From ef32fbb0199b852bd0ee41fcbf63effd8e0dea57 Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:04:50 +0300 Subject: [PATCH 01/22] rename field --- application.go | 8 ++++---- http_server.go | 4 ++-- streams.go | 2 +- streams_storage.go | 36 ++++++++++++++++++------------------ 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/application.go b/application.go index c3c4b3b..25df731 100644 --- a/application.go +++ b/application.go @@ -103,8 +103,8 @@ func NewApplication(cfg *configuration.Configuration) (*Application, error) { outputTypes = append(outputTypes, typ) } - tmp.Streams.Streams[validUUID] = NewStreamConfiguration(rtspStream.URL, outputTypes) - tmp.Streams.Streams[validUUID].verboseLevel = NewVerboseLevelFrom(rtspStream.Verbose) + tmp.Streams.store[validUUID] = NewStreamConfiguration(rtspStream.URL, outputTypes) + tmp.Streams.store[validUUID].verboseLevel = NewVerboseLevelFrom(rtspStream.Verbose) if rtspStream.Archive.Enabled && cfg.ArchiveCfg.Enabled { if rtspStream.Archive.MsPerSegment == 0 { return nil, fmt.Errorf("bad ms per segment archive stream") @@ -208,7 +208,7 @@ func (app *Application) clientDelete(streamID, clientID uuid.UUID) { func (app *Application) startHlsCast(streamID uuid.UUID, stopCast chan bool) error { app.Streams.Lock() defer app.Streams.Unlock() - stream, ok := app.Streams.Streams[streamID] + stream, ok := app.Streams.store[streamID] if !ok { return ErrStreamNotFound } @@ -224,7 +224,7 @@ func (app *Application) startHlsCast(streamID uuid.UUID, stopCast chan bool) err func (app *Application) startMP4Cast(streamID uuid.UUID, stopCast chan bool) error { app.Streams.Lock() defer app.Streams.Unlock() - stream, ok := app.Streams.Streams[streamID] + stream, ok := app.Streams.store[streamID] if !ok { return ErrStreamNotFound } diff --git a/http_server.go b/http_server.go index d52d5b8..e822996 100644 --- a/http_server.go +++ b/http_server.go @@ -146,7 +146,7 @@ func EnableCamera(app *Application, verboseLevel VerboseLevel) func(ctx *gin.Con outputTypes = append(outputTypes, typ) } app.Streams.Lock() - app.Streams.Streams[postData.GUID] = NewStreamConfiguration(postData.URL, outputTypes) + app.Streams.store[postData.GUID] = NewStreamConfiguration(postData.URL, outputTypes) app.Streams.Unlock() app.StartStream(postData.GUID) } @@ -171,7 +171,7 @@ func DisableCamera(app *Application, verboseLevel VerboseLevel) func(ctx *gin.Co } if exist := app.streamExists(postData.GUID); exist { app.Streams.Lock() - delete(app.Streams.Streams, postData.GUID) + delete(app.Streams.store, postData.GUID) app.Streams.Unlock() } ctx.JSON(200, app) diff --git a/streams.go b/streams.go index ac834f4..831fad8 100644 --- a/streams.go +++ b/streams.go @@ -32,7 +32,7 @@ func (app *Application) StartStream(streamID uuid.UUID) { } func (app *Application) RunStream(ctx context.Context, streamID uuid.UUID) error { - url, supportedTypes := app.Streams.GetStream(streamID) + url, supportedTypes := app.Streams.GetStreamInfo(streamID) if url == "" { return ErrStreamNotFound } diff --git a/streams_storage.go b/streams_storage.go index 9f1963a..08a04ba 100644 --- a/streams_storage.go +++ b/streams_storage.go @@ -14,18 +14,18 @@ import ( // StreamsStorage Map wrapper for map[uuid.UUID]*StreamConfiguration with mutex for concurrent usage type StreamsStorage struct { sync.RWMutex - Streams map[uuid.UUID]*StreamConfiguration `json:"rtsp_streams"` + store map[uuid.UUID]*StreamConfiguration `json:"rtsp_streams"` } // NewStreamsStorageDefault prepares new allocated storage func NewStreamsStorageDefault() StreamsStorage { - return StreamsStorage{Streams: make(map[uuid.UUID]*StreamConfiguration)} + return StreamsStorage{store: make(map[uuid.UUID]*StreamConfiguration)} } -func (sm *StreamsStorage) GetStream(id uuid.UUID) (string, []StreamType) { +func (sm *StreamsStorage) GetStreamInfo(id uuid.UUID) (string, []StreamType) { sm.Lock() defer sm.Unlock() - stream, ok := sm.Streams[id] + stream, ok := sm.store[id] if !ok { return "", []StreamType{} } @@ -36,8 +36,8 @@ func (sm *StreamsStorage) GetStream(id uuid.UUID) (string, []StreamType) { func (sm *StreamsStorage) getKeys() []uuid.UUID { sm.Lock() defer sm.Unlock() - keys := make([]uuid.UUID, 0, len(sm.Streams)) - for k := range sm.Streams { + keys := make([]uuid.UUID, 0, len(sm.store)) + for k := range sm.store { keys = append(keys, k) } return keys @@ -46,7 +46,7 @@ func (sm *StreamsStorage) getKeys() []uuid.UUID { func (streams *StreamsStorage) archiveEnabled(streamID uuid.UUID) (bool, error) { streams.RLock() defer streams.RUnlock() - stream, ok := streams.Streams[streamID] + stream, ok := streams.store[streamID] if !ok { return false, ErrStreamNotFound } @@ -56,7 +56,7 @@ func (streams *StreamsStorage) archiveEnabled(streamID uuid.UUID) (bool, error) func (streams *StreamsStorage) getVerboseLevel(streamID uuid.UUID) VerboseLevel { streams.RLock() defer streams.RUnlock() - stream, ok := streams.Streams[streamID] + stream, ok := streams.store[streamID] if !ok { return VERBOSE_NONE } @@ -66,14 +66,14 @@ func (streams *StreamsStorage) getVerboseLevel(streamID uuid.UUID) VerboseLevel func (streams *StreamsStorage) streamExists(streamID uuid.UUID) bool { streams.RLock() defer streams.RUnlock() - _, ok := streams.Streams[streamID] + _, ok := streams.store[streamID] return ok } func (streams *StreamsStorage) existsWithType(streamID uuid.UUID, streamType StreamType) bool { streams.Lock() defer streams.Unlock() - stream, ok := streams.Streams[streamID] + stream, ok := streams.store[streamID] if !ok { return false } @@ -85,7 +85,7 @@ func (streams *StreamsStorage) existsWithType(streamID uuid.UUID, streamType Str func (streams *StreamsStorage) addCodec(streamID uuid.UUID, codecs []av.CodecData) { streams.Lock() defer streams.Unlock() - stream, ok := streams.Streams[streamID] + stream, ok := streams.store[streamID] if !ok { return } @@ -98,7 +98,7 @@ func (streams *StreamsStorage) addCodec(streamID uuid.UUID, codecs []av.CodecDat func (streams *StreamsStorage) getCodec(streamID uuid.UUID) ([]av.CodecData, error) { streams.Lock() defer streams.Unlock() - stream, ok := streams.Streams[streamID] + stream, ok := streams.store[streamID] if !ok { return nil, ErrStreamNotFound } @@ -117,7 +117,7 @@ func (streams *StreamsStorage) getCodec(streamID uuid.UUID) ([]av.CodecData, err func (streams *StreamsStorage) updateStreamStatus(streamID uuid.UUID, status bool) error { streams.Lock() defer streams.Unlock() - stream, ok := streams.Streams[streamID] + stream, ok := streams.store[streamID] if !ok { return ErrStreamNotFound } @@ -131,7 +131,7 @@ func (streams *StreamsStorage) updateStreamStatus(streamID uuid.UUID, status boo func (streams *StreamsStorage) addClient(streamID uuid.UUID) (uuid.UUID, chan av.Packet, error) { streams.Lock() defer streams.Unlock() - stream, ok := streams.Streams[streamID] + stream, ok := streams.store[streamID] if !ok { return uuid.UUID{}, nil, ErrStreamNotFound } @@ -150,7 +150,7 @@ func (streams *StreamsStorage) addClient(streamID uuid.UUID) (uuid.UUID, chan av func (streams *StreamsStorage) deleteClient(streamID, clientID uuid.UUID) { streams.Lock() defer streams.Unlock() - stream, ok := streams.Streams[streamID] + stream, ok := streams.store[streamID] if !ok { return } @@ -163,7 +163,7 @@ func (streams *StreamsStorage) deleteClient(streamID, clientID uuid.UUID) { func (streams *StreamsStorage) cast(streamID uuid.UUID, pck av.Packet, hlsEnabled, archiveEnabled bool) error { streams.Lock() defer streams.Unlock() - stream, ok := streams.Streams[streamID] + stream, ok := streams.store[streamID] if !ok { return ErrStreamNotFound } @@ -187,7 +187,7 @@ func (streams *StreamsStorage) cast(streamID uuid.UUID, pck av.Packet, hlsEnable func (streams *StreamsStorage) setArchiveStream(streamID uuid.UUID, archiveStorage *streamArhive) error { streams.Lock() defer streams.Unlock() - stream, ok := streams.Streams[streamID] + stream, ok := streams.store[streamID] if !ok { return ErrStreamNotFound } @@ -198,7 +198,7 @@ func (streams *StreamsStorage) setArchiveStream(streamID uuid.UUID, archiveStora func (streams *StreamsStorage) getStreamArchive(streamID uuid.UUID) *streamArhive { streams.Lock() defer streams.Unlock() - stream, ok := streams.Streams[streamID] + stream, ok := streams.store[streamID] if !ok { return nil } From 31038ae1bef54c93e27e52009891219342862404 Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:05:33 +0300 Subject: [PATCH 02/22] minor rename --- streams_storage.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/streams_storage.go b/streams_storage.go index 08a04ba..d59461e 100644 --- a/streams_storage.go +++ b/streams_storage.go @@ -22,10 +22,10 @@ func NewStreamsStorageDefault() StreamsStorage { return StreamsStorage{store: make(map[uuid.UUID]*StreamConfiguration)} } -func (sm *StreamsStorage) GetStreamInfo(id uuid.UUID) (string, []StreamType) { - sm.Lock() - defer sm.Unlock() - stream, ok := sm.store[id] +func (streams *StreamsStorage) GetStreamInfo(streamID uuid.UUID) (string, []StreamType) { + streams.Lock() + defer streams.Unlock() + stream, ok := streams.store[streamID] if !ok { return "", []StreamType{} } @@ -33,11 +33,11 @@ func (sm *StreamsStorage) GetStreamInfo(id uuid.UUID) (string, []StreamType) { } // getKeys returns all storage streams' keys as slice -func (sm *StreamsStorage) getKeys() []uuid.UUID { - sm.Lock() - defer sm.Unlock() - keys := make([]uuid.UUID, 0, len(sm.store)) - for k := range sm.store { +func (streams *StreamsStorage) getKeys() []uuid.UUID { + streams.Lock() + defer streams.Unlock() + keys := make([]uuid.UUID, 0, len(streams.store)) + for k := range streams.store { keys = append(keys, k) } return keys From de4dcbe674792cedd2f491cfa05791a047906440 Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:10:47 +0300 Subject: [PATCH 03/22] continue to remove non usable fns --- application.go | 8 -------- http_server.go | 6 +++--- streams.go | 6 +++--- streams_storage.go | 17 ++++++++++------- ws_handler.go | 2 +- 5 files changed, 17 insertions(+), 22 deletions(-) diff --git a/application.go b/application.go index 25df731..07e3233 100644 --- a/application.go +++ b/application.go @@ -177,14 +177,6 @@ func (app *Application) cast(streamID uuid.UUID, pck av.Packet, hlsEnabled, arch return app.Streams.cast(streamID, pck, hlsEnabled, archiveEnabled) } -func (app *Application) streamExists(streamID uuid.UUID) bool { - return app.Streams.streamExists(streamID) -} - -func (app *Application) existsWithType(streamID uuid.UUID, streamType StreamType) bool { - return app.Streams.existsWithType(streamID, streamType) -} - func (app *Application) addCodec(streamID uuid.UUID, codecs []av.CodecData) { app.Streams.addCodec(streamID, codecs) } diff --git a/http_server.go b/http_server.go index e822996..2472c5f 100644 --- a/http_server.go +++ b/http_server.go @@ -78,7 +78,7 @@ func ListWrapper(app *Application, verboseLevel VerboseLevel) func(ctx *gin.Cont if verboseLevel > VERBOSE_SIMPLE { log.Info().Str("scope", SCOPE_API_SERVER).Str("event", EVENT_API_REQUEST).Str("method", ctx.Request.Method).Str("route", ctx.Request.URL.Path).Str("remote", ctx.Request.RemoteAddr).Msg("Call streams list") } - allStreamsIDs := app.Streams.getKeys() + allStreamsIDs := app.Streams.GetAllStreamsIDS() ans := StreamsInfoShortenList{ Data: make([]StreamInfoShorten, len(allStreamsIDs)), } @@ -123,7 +123,7 @@ func EnableCamera(app *Application, verboseLevel VerboseLevel) func(ctx *gin.Con ctx.JSON(http.StatusBadRequest, gin.H{"Error": errReason}) return } - if exist := app.streamExists(postData.GUID); !exist { + if exist := app.Streams.StreamExists(postData.GUID); !exist { outputTypes := make([]StreamType, 0, len(postData.OutputTypes)) for _, v := range postData.OutputTypes { typ, ok := streamTypeExists(v) @@ -169,7 +169,7 @@ func DisableCamera(app *Application, verboseLevel VerboseLevel) func(ctx *gin.Co ctx.JSON(http.StatusBadRequest, gin.H{"Error": errReason}) return } - if exist := app.streamExists(postData.GUID); exist { + if exist := app.Streams.StreamExists(postData.GUID); exist { app.Streams.Lock() delete(app.Streams.store, postData.GUID) app.Streams.Unlock() diff --git a/streams.go b/streams.go index 831fad8..c66c4e7 100644 --- a/streams.go +++ b/streams.go @@ -15,7 +15,7 @@ const ( // StartStreams starts all video streams func (app *Application) StartStreams() { - streamsIDs := app.Streams.getKeys() + streamsIDs := app.Streams.GetAllStreamsIDS() for i := range streamsIDs { app.StartStream(streamsIDs[i]) } @@ -37,11 +37,11 @@ func (app *Application) RunStream(ctx context.Context, streamID uuid.UUID) error return ErrStreamNotFound } hlsEnabled := typeExists(STREAM_TYPE_HLS, supportedTypes) - archiveEnabled, err := app.Streams.archiveEnabled(streamID) + archiveEnabled, err := app.Streams.archiveEnabledForStream(streamID) if err != nil { return errors.Wrap(err, "Can't enable archive") } - streamVerboseLevel := app.Streams.getVerboseLevel(streamID) + streamVerboseLevel := app.Streams.getVerboseLevelForStream(streamID) app.startLoop(ctx, streamID, url, hlsEnabled, archiveEnabled, streamVerboseLevel) return nil } diff --git a/streams_storage.go b/streams_storage.go index d59461e..035a455 100644 --- a/streams_storage.go +++ b/streams_storage.go @@ -14,7 +14,7 @@ import ( // StreamsStorage Map wrapper for map[uuid.UUID]*StreamConfiguration with mutex for concurrent usage type StreamsStorage struct { sync.RWMutex - store map[uuid.UUID]*StreamConfiguration `json:"rtsp_streams"` + store map[uuid.UUID]*StreamConfiguration } // NewStreamsStorageDefault prepares new allocated storage @@ -22,6 +22,7 @@ func NewStreamsStorageDefault() StreamsStorage { return StreamsStorage{store: make(map[uuid.UUID]*StreamConfiguration)} } +// GetStreamInfo returns stream URL and its supported output types func (streams *StreamsStorage) GetStreamInfo(streamID uuid.UUID) (string, []StreamType) { streams.Lock() defer streams.Unlock() @@ -32,8 +33,8 @@ func (streams *StreamsStorage) GetStreamInfo(streamID uuid.UUID) (string, []Stre return stream.URL, stream.SupportedOutputTypes } -// getKeys returns all storage streams' keys as slice -func (streams *StreamsStorage) getKeys() []uuid.UUID { +// GetAllStreamsIDS returns all storage streams' keys as slice +func (streams *StreamsStorage) GetAllStreamsIDS() []uuid.UUID { streams.Lock() defer streams.Unlock() keys := make([]uuid.UUID, 0, len(streams.store)) @@ -43,7 +44,7 @@ func (streams *StreamsStorage) getKeys() []uuid.UUID { return keys } -func (streams *StreamsStorage) archiveEnabled(streamID uuid.UUID) (bool, error) { +func (streams *StreamsStorage) archiveEnabledForStream(streamID uuid.UUID) (bool, error) { streams.RLock() defer streams.RUnlock() stream, ok := streams.store[streamID] @@ -53,7 +54,7 @@ func (streams *StreamsStorage) archiveEnabled(streamID uuid.UUID) (bool, error) return stream.archive != nil, nil } -func (streams *StreamsStorage) getVerboseLevel(streamID uuid.UUID) VerboseLevel { +func (streams *StreamsStorage) getVerboseLevelForStream(streamID uuid.UUID) VerboseLevel { streams.RLock() defer streams.RUnlock() stream, ok := streams.store[streamID] @@ -63,14 +64,16 @@ func (streams *StreamsStorage) getVerboseLevel(streamID uuid.UUID) VerboseLevel return stream.verboseLevel } -func (streams *StreamsStorage) streamExists(streamID uuid.UUID) bool { +// StreamExists checks whenever given stream ID exists in storage +func (streams *StreamsStorage) StreamExists(streamID uuid.UUID) bool { streams.RLock() defer streams.RUnlock() _, ok := streams.store[streamID] return ok } -func (streams *StreamsStorage) existsWithType(streamID uuid.UUID, streamType StreamType) bool { +// TypeExistsForStream checks whenever specific stream ID supports then given output stream type +func (streams *StreamsStorage) TypeExistsForStream(streamID uuid.UUID, streamType StreamType) bool { streams.Lock() defer streams.Unlock() stream, ok := streams.store[streamID] diff --git a/ws_handler.go b/ws_handler.go index 4cf2d09..7552df6 100644 --- a/ws_handler.go +++ b/ws_handler.go @@ -48,7 +48,7 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re closeWSwithError(conn, 1011, errReason) return } - mseExists := app.existsWithType(streamID, STREAM_TYPE_MSE) + mseExists := app.Streams.TypeExistsForStream(streamID, STREAM_TYPE_MSE) if verboseLevel > VERBOSE_SIMPLE { log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Bool("mse_exists", mseExists).Msg("Validate stream type") } From 916b18d0fe181c8052a3fa69afa2d2a9944184bf Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:11:43 +0300 Subject: [PATCH 04/22] remove addCodec shortcut --- application.go | 4 ---- stream.go | 4 ++-- streams_storage.go | 3 ++- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/application.go b/application.go index 07e3233..9b1ad67 100644 --- a/application.go +++ b/application.go @@ -177,10 +177,6 @@ func (app *Application) cast(streamID uuid.UUID, pck av.Packet, hlsEnabled, arch return app.Streams.cast(streamID, pck, hlsEnabled, archiveEnabled) } -func (app *Application) addCodec(streamID uuid.UUID, codecs []av.CodecData) { - app.Streams.addCodec(streamID, codecs) -} - func (app *Application) getCodec(streamID uuid.UUID) ([]av.CodecData, error) { return app.Streams.getCodec(streamID) } diff --git a/stream.go b/stream.go index 2eb8104..429dc69 100644 --- a/stream.go +++ b/stream.go @@ -36,7 +36,7 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar if streamVerboseLevel > VERBOSE_NONE { log.Info().Str("scope", "streaming").Str("event", "stream_codec_met").Str("stream_id", streamID.String()).Str("stream_url", url).Bool("hls_enabled", hlsEnabled).Any("codec_data", session.CodecData).Msg("Found codec. Adding this one") } - app.addCodec(streamID, session.CodecData) + app.Streams.AddCodecForStream(streamID, session.CodecData) if streamVerboseLevel > VERBOSE_NONE { log.Info().Str("scope", "streaming").Str("event", "stream_status_update").Str("stream_id", streamID.String()).Str("stream_url", url).Bool("hls_enabled", hlsEnabled).Msg("Update stream status") } @@ -101,7 +101,7 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar if streamVerboseLevel > VERBOSE_NONE { log.Info().Str("scope", "streaming").Str("event", "stream_codec_update_signal").Str("stream_id", streamID.String()).Str("stream_url", url).Any("codec_data", session.CodecData).Msg("Recieved update codec signal") } - app.addCodec(streamID, session.CodecData) + app.Streams.AddCodecForStream(streamID, session.CodecData) err = app.updateStreamStatus(streamID, true) if err != nil { return errors.Wrapf(err, "Can't update status for stream %s", streamID) diff --git a/streams_storage.go b/streams_storage.go index 035a455..061f6e2 100644 --- a/streams_storage.go +++ b/streams_storage.go @@ -85,7 +85,8 @@ func (streams *StreamsStorage) TypeExistsForStream(streamID uuid.UUID, streamTyp return ok && typeEnabled } -func (streams *StreamsStorage) addCodec(streamID uuid.UUID, codecs []av.CodecData) { +// AddCodecForStream appends new codecs data for the given stream +func (streams *StreamsStorage) AddCodecForStream(streamID uuid.UUID, codecs []av.CodecData) { streams.Lock() defer streams.Unlock() stream, ok := streams.store[streamID] From 221cce87fe74c3237594947d5856b579e0da88d4 Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:12:43 +0300 Subject: [PATCH 05/22] remove getCodec shortcut --- application.go | 4 ---- hls.go | 2 +- mp4.go | 2 +- streams_storage.go | 3 ++- ws_handler.go | 2 +- 5 files changed, 5 insertions(+), 8 deletions(-) diff --git a/application.go b/application.go index 9b1ad67..1a2d42b 100644 --- a/application.go +++ b/application.go @@ -177,10 +177,6 @@ func (app *Application) cast(streamID uuid.UUID, pck av.Packet, hlsEnabled, arch return app.Streams.cast(streamID, pck, hlsEnabled, archiveEnabled) } -func (app *Application) getCodec(streamID uuid.UUID) ([]av.CodecData, error) { - return app.Streams.getCodec(streamID) -} - func (app *Application) updateStreamStatus(streamID uuid.UUID, status bool) error { return app.Streams.updateStreamStatus(streamID, status) } diff --git a/hls.go b/hls.go index a1c1822..71bbde5 100644 --- a/hls.go +++ b/hls.go @@ -47,7 +47,7 @@ func (app *Application) startHls(streamID uuid.UUID, ch chan av.Packet, stopCast tsMuxer := ts.NewMuxer(outFile) // Write header - codecData, err := app.getCodec(streamID) + codecData, err := app.Streams.GetCodecsDataForStream(streamID) if err != nil { return errors.Wrap(err, streamID.String()) } diff --git a/mp4.go b/mp4.go index 8a047d5..264d375 100644 --- a/mp4.go +++ b/mp4.go @@ -48,7 +48,7 @@ func (app *Application) startMP4(streamID uuid.UUID, ch chan av.Packet, stopCast } tsMuxer := mp4.NewMuxer(outFile) log.Info().Str("scope", "archive").Str("event", "archive_create_file").Str("stream_id", streamID.String()).Str("segment_path", segmentPath).Msg("Create segment") - codecData, err := app.getCodec(streamID) + codecData, err := app.Streams.GetCodecsDataForStream(streamID) if err != nil { return errors.Wrap(err, streamID.String()) } diff --git a/streams_storage.go b/streams_storage.go index 061f6e2..4b97bee 100644 --- a/streams_storage.go +++ b/streams_storage.go @@ -99,7 +99,8 @@ func (streams *StreamsStorage) AddCodecForStream(streamID uuid.UUID, codecs []av } } -func (streams *StreamsStorage) getCodec(streamID uuid.UUID) ([]av.CodecData, error) { +// GetCodecsDataForStream returns COPY of codecs data for the given stream +func (streams *StreamsStorage) GetCodecsDataForStream(streamID uuid.UUID) ([]av.CodecData, error) { streams.Lock() defer streams.Unlock() stream, ok := streams.store[streamID] diff --git a/ws_handler.go b/ws_handler.go index 7552df6..ce605b1 100644 --- a/ws_handler.go +++ b/ws_handler.go @@ -76,7 +76,7 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg("Client has been added") } - codecData, err := app.getCodec(streamID) + codecData, err := app.Streams.GetCodecsDataForStream(streamID) if err != nil { errReason := "Can't extract codec for stream" if verboseLevel > VERBOSE_NONE { From 1276c918ad1920dcb87d3fc36792b844497e7eab Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:14:07 +0300 Subject: [PATCH 06/22] remove updateStreamStatus shortcut --- application.go | 4 ---- stream.go | 8 ++++---- streams_storage.go | 3 ++- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/application.go b/application.go index 1a2d42b..14a17ee 100644 --- a/application.go +++ b/application.go @@ -177,10 +177,6 @@ func (app *Application) cast(streamID uuid.UUID, pck av.Packet, hlsEnabled, arch return app.Streams.cast(streamID, pck, hlsEnabled, archiveEnabled) } -func (app *Application) updateStreamStatus(streamID uuid.UUID, status bool) error { - return app.Streams.updateStreamStatus(streamID, status) -} - func (app *Application) addClient(streamID uuid.UUID) (uuid.UUID, chan av.Packet, error) { return app.Streams.addClient(streamID) } diff --git a/stream.go b/stream.go index 429dc69..0cbaab4 100644 --- a/stream.go +++ b/stream.go @@ -40,7 +40,7 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar if streamVerboseLevel > VERBOSE_NONE { log.Info().Str("scope", "streaming").Str("event", "stream_status_update").Str("stream_id", streamID.String()).Str("stream_url", url).Bool("hls_enabled", hlsEnabled).Msg("Update stream status") } - err = app.updateStreamStatus(streamID, true) + err = app.Streams.UpdateStreamStatus(streamID, true) if err != nil { return errors.Wrapf(err, "Can't update status for stream %s", streamID) } @@ -102,7 +102,7 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar log.Info().Str("scope", "streaming").Str("event", "stream_codec_update_signal").Str("stream_id", streamID.String()).Str("stream_url", url).Any("codec_data", session.CodecData).Msg("Recieved update codec signal") } app.Streams.AddCodecForStream(streamID, session.CodecData) - err = app.updateStreamStatus(streamID, true) + err = app.Streams.UpdateStreamStatus(streamID, true) if err != nil { return errors.Wrapf(err, "Can't update status for stream %s", streamID) } @@ -110,7 +110,7 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar if streamVerboseLevel > VERBOSE_NONE { log.Info().Str("scope", "streaming").Str("event", "stream_stop_signal").Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Recieved stop signal") } - err = app.updateStreamStatus(streamID, false) + err = app.Streams.UpdateStreamStatus(streamID, false) if err != nil { errors.Wrapf(err, "Can't switch status to False for stream '%s'", url) } @@ -143,7 +143,7 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar } stopMP4Cast <- true } - errStatus := app.updateStreamStatus(streamID, false) + errStatus := app.Streams.UpdateStreamStatus(streamID, false) if errStatus != nil { errors.Wrapf(errors.Wrapf(err, "Can't cast packet %s (%s)", streamID, url), "Can't switch status to False for stream '%s'", url) } diff --git a/streams_storage.go b/streams_storage.go index 4b97bee..a56d4a9 100644 --- a/streams_storage.go +++ b/streams_storage.go @@ -119,7 +119,8 @@ func (streams *StreamsStorage) GetCodecsDataForStream(streamID uuid.UUID) ([]av. return codecs, nil } -func (streams *StreamsStorage) updateStreamStatus(streamID uuid.UUID, status bool) error { +// UpdateStreamStatus sets new status value for the given stream +func (streams *StreamsStorage) UpdateStreamStatus(streamID uuid.UUID, status bool) error { streams.Lock() defer streams.Unlock() stream, ok := streams.store[streamID] From 852ffd582bcfa85d30cc2c23448702e3f7e7472e Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:16:15 +0300 Subject: [PATCH 07/22] remove addClient shortcut --- application.go | 4 ---- streams_storage.go | 3 ++- ws_handler.go | 2 +- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/application.go b/application.go index 14a17ee..31c7f0a 100644 --- a/application.go +++ b/application.go @@ -177,10 +177,6 @@ func (app *Application) cast(streamID uuid.UUID, pck av.Packet, hlsEnabled, arch return app.Streams.cast(streamID, pck, hlsEnabled, archiveEnabled) } -func (app *Application) addClient(streamID uuid.UUID) (uuid.UUID, chan av.Packet, error) { - return app.Streams.addClient(streamID) -} - func (app *Application) clientDelete(streamID, clientID uuid.UUID) { app.Streams.deleteClient(streamID, clientID) } diff --git a/streams_storage.go b/streams_storage.go index a56d4a9..87d6f77 100644 --- a/streams_storage.go +++ b/streams_storage.go @@ -134,7 +134,8 @@ func (streams *StreamsStorage) UpdateStreamStatus(streamID uuid.UUID, status boo return nil } -func (streams *StreamsStorage) addClient(streamID uuid.UUID) (uuid.UUID, chan av.Packet, error) { +// AddViewer adds client to the given stream. Return newly client ID, buffered channel for stream on success +func (streams *StreamsStorage) AddViewer(streamID uuid.UUID) (uuid.UUID, chan av.Packet, error) { streams.Lock() defer streams.Unlock() stream, ok := streams.store[streamID] diff --git a/ws_handler.go b/ws_handler.go index ce605b1..232fd59 100644 --- a/ws_handler.go +++ b/ws_handler.go @@ -62,7 +62,7 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re closeWSwithError(conn, 1011, errReason) return } - cuuid, ch, err := app.addClient(streamID) + cuuid, ch, err := app.Streams.AddViewer(streamID) if err != nil { errReason := "Can't add client to the queue" if verboseLevel > VERBOSE_NONE { From d889da836d54c6652b30c42fc23320922aec469f Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:17:16 +0300 Subject: [PATCH 08/22] remove deleteClient shortcut --- application.go | 4 --- streams_storage.go | 3 ++- ws_handler.go | 64 +++++++++++++++++++++++----------------------- 3 files changed, 34 insertions(+), 37 deletions(-) diff --git a/application.go b/application.go index 31c7f0a..382b044 100644 --- a/application.go +++ b/application.go @@ -177,10 +177,6 @@ func (app *Application) cast(streamID uuid.UUID, pck av.Packet, hlsEnabled, arch return app.Streams.cast(streamID, pck, hlsEnabled, archiveEnabled) } -func (app *Application) clientDelete(streamID, clientID uuid.UUID) { - app.Streams.deleteClient(streamID, clientID) -} - func (app *Application) startHlsCast(streamID uuid.UUID, stopCast chan bool) error { app.Streams.Lock() defer app.Streams.Unlock() diff --git a/streams_storage.go b/streams_storage.go index 87d6f77..31bbfff 100644 --- a/streams_storage.go +++ b/streams_storage.go @@ -154,7 +154,8 @@ func (streams *StreamsStorage) AddViewer(streamID uuid.UUID) (uuid.UUID, chan av return clientID, ch, nil } -func (streams *StreamsStorage) deleteClient(streamID, clientID uuid.UUID) { +// DeleteViewer removes given client from the stream +func (streams *StreamsStorage) DeleteViewer(streamID, clientID uuid.UUID) { streams.Lock() defer streams.Unlock() stream, ok := streams.store[streamID] diff --git a/ws_handler.go b/ws_handler.go index 232fd59..e229dee 100644 --- a/ws_handler.go +++ b/ws_handler.go @@ -62,7 +62,7 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re closeWSwithError(conn, 1011, errReason) return } - cuuid, ch, err := app.Streams.AddViewer(streamID) + clientID, ch, err := app.Streams.AddViewer(streamID) if err != nil { errReason := "Can't add client to the queue" if verboseLevel > VERBOSE_NONE { @@ -71,28 +71,28 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re closeWSwithError(conn, 1011, errReason) return } - defer app.clientDelete(streamID, cuuid) + defer app.Streams.DeleteViewer(streamID, clientID) if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg("Client has been added") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("Client has been added") } codecData, err := app.Streams.GetCodecsDataForStream(streamID) if err != nil { errReason := "Can't extract codec for stream" if verboseLevel > VERBOSE_NONE { - log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg(errReason) + log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg(errReason) } closeWSwithError(conn, 1011, errReason) return } if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("codecs", codecData).Msg("Validate codecs") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Msg("Validate codecs") } if len(codecData) == 0 { errReason := "No codec information" if verboseLevel > VERBOSE_NONE { - log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg(errReason) + log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg(errReason) } closeWSwithError(conn, 1011, errReason) return @@ -102,43 +102,43 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re if err != nil { errReason := "Can't write codec information to the header" if verboseLevel > VERBOSE_NONE { - log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("codecs", codecData).Msg(errReason) + log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Msg(errReason) } closeWSwithError(conn, 1011, errReason) return } if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("codecs", codecData).Msg("Write header to muxer") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Msg("Write header to muxer") } meta, init := muxer.GetInit(codecData) if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg("Get meta information") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg("Get meta information") } err = conn.WriteMessage(websocket.BinaryMessage, append([]byte{9}, meta...)) if err != nil { errReason := "Can't write meta information" if verboseLevel > VERBOSE_NONE { - log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("codecs", codecData).Str("meta", meta).Msg(errReason) + log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Str("meta", meta).Msg(errReason) } closeWSwithError(conn, 1011, errReason) return } if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg("Send meta information") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg("Send meta information") } err = conn.WriteMessage(websocket.BinaryMessage, init) if err != nil { errReason := "Can't write initialization information" if verboseLevel > VERBOSE_NONE { - log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg(errReason) + log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg(errReason) } closeWSwithError(conn, 1011, errReason) return } if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("remote_addr", r.RemoteAddr).Str("event", EVENT_WS_UPGRADER).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg("Send initialization message") + log.Info().Str("remote_addr", r.RemoteAddr).Str("event", EVENT_WS_UPGRADER).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg("Send initialization message") } var start bool @@ -147,7 +147,7 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re go func(q, p chan bool) { if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg("Start loop in goroutine") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("Start loop in goroutine") } for { msgType, data, err := conn.ReadMessage() @@ -155,22 +155,22 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re q <- true errReason := "Can't read message" if verboseLevel > VERBOSE_NONE { - log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg(errReason) + log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg(errReason) } closeWSwithError(conn, 1011, errReason) return } if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Int("message_type", msgType).Int("data_len", len(data)).Msg("Read message in a loop") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Int("message_type", msgType).Int("data_len", len(data)).Msg("Read message in a loop") } if msgType == websocket.TextMessage && len(data) > 0 && string(data) == "ping" { select { case p <- true: - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Int("message_type", msgType).Int("data_len", len(data)).Msg("Message has been sent") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Int("message_type", msgType).Int("data_len", len(data)).Msg("Message has been sent") // message sent default: // message dropped - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Int("message_type", msgType).Int("data_len", len(data)).Msg("Message has been dropped") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Int("message_type", msgType).Int("data_len", len(data)).Msg("Message has been dropped") } } } @@ -179,47 +179,47 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re noKeyFrames := time.NewTimer(keyFramesTimeout) if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg("Start loop") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("Start loop") } for { select { case <-noKeyFrames.C: if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg("No keyframes has been met") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("No keyframes has been met") } return case <-quitCh: if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg("Quit") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("Quit") } return case <-rxPingCh: if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg("'Ping' has been recieved") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("'Ping' has been recieved") } err := conn.WriteMessage(websocket.TextMessage, []byte("pong")) if err != nil { errReason := "Can't write PONG message" if verboseLevel > VERBOSE_NONE { - log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("event", "ping").Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg(errReason) + log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("event", "ping").Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg(errReason) } closeWSwithError(conn, 1011, errReason) return } case pck := <-ch: if verboseLevel > VERBOSE_ADD { - log.Info().Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg("Packet has been recieved from stream source") + log.Info().Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("Packet has been recieved from stream source") } if pck.IsKeyFrame { if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg("Packet is a keyframe") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("Packet is a keyframe") } noKeyFrames.Reset(keyFramesTimeout) start = true } if !start { if verboseLevel > VERBOSE_ADD { - log.Info().Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg("Stream has not been started") + log.Info().Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("Stream has not been started") } continue } @@ -227,23 +227,23 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re if err != nil { errReason := "Can't write packet to the muxer" if verboseLevel > VERBOSE_NONE { - log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("event", "ping").Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("packet_len", len(pck.Data)).Msg(errReason) + log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("event", "ping").Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("packet_len", len(pck.Data)).Msg(errReason) } closeWSwithError(conn, 1011, errReason) return } if verboseLevel > VERBOSE_ADD { - log.Info().Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Bool("ready", ready).Int("buf_len", len(buf)).Msg("Write packet to the muxer") + log.Info().Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Bool("ready", ready).Int("buf_len", len(buf)).Msg("Write packet to the muxer") } if ready { if verboseLevel > VERBOSE_ADD { - log.Info().Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Bool("ready", ready).Int("buf_len", len(buf)).Msg("Muxer is ready to write another packet") + log.Info().Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Bool("ready", ready).Int("buf_len", len(buf)).Msg("Muxer is ready to write another packet") } err = conn.SetWriteDeadline(time.Now().Add(deadlineTimeout)) if err != nil { errReason := "Can't set new deadline" if verboseLevel > VERBOSE_NONE { - log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("event", "ping").Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("packet_len", len(pck.Data)).Bool("ready", ready).Int("buf_len", len(buf)).Msg(errReason) + log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("event", "ping").Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("packet_len", len(pck.Data)).Bool("ready", ready).Int("buf_len", len(buf)).Msg(errReason) } closeWSwithError(conn, 1011, errReason) return @@ -252,13 +252,13 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re if err != nil { errReason := "Can't write buffered message" if verboseLevel > VERBOSE_NONE { - log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("event", "ping").Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("packet_len", len(pck.Data)).Bool("ready", ready).Int("buf_len", len(buf)).Msg(errReason) + log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("event", "ping").Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("packet_len", len(pck.Data)).Bool("ready", ready).Int("buf_len", len(buf)).Msg(errReason) } closeWSwithError(conn, 1011, errReason) return } if verboseLevel > VERBOSE_ADD { - log.Info().Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Bool("ready", ready).Int("buf_len", len(buf)).Msg("Write buffer to the client") + log.Info().Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Bool("ready", ready).Int("buf_len", len(buf)).Msg("Write buffer to the client") } } } From 26cba6a6f18f034397e2bfbd9f18dac17b849031 Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:19:16 +0300 Subject: [PATCH 09/22] remove cast shortcut + rename client->viewer --- application.go | 4 ---- stream.go | 2 +- streams_storage.go | 43 ++++++++++++++++++++++--------------------- 3 files changed, 23 insertions(+), 26 deletions(-) diff --git a/application.go b/application.go index 382b044..3cbc43d 100644 --- a/application.go +++ b/application.go @@ -173,10 +173,6 @@ func (app *Application) setCors(cfg configuration.CORSConfiguration) { app.CorsConfig.AllowCredentials = cfg.AllowCredentials } -func (app *Application) cast(streamID uuid.UUID, pck av.Packet, hlsEnabled, archiveEnabled bool) error { - return app.Streams.cast(streamID, pck, hlsEnabled, archiveEnabled) -} - func (app *Application) startHlsCast(streamID uuid.UUID, stopCast chan bool) error { app.Streams.Lock() defer app.Streams.Unlock() diff --git a/stream.go b/stream.go index 0cbaab4..4c40560 100644 --- a/stream.go +++ b/stream.go @@ -129,7 +129,7 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar if streamVerboseLevel > VERBOSE_ADD { log.Info().Str("scope", "streaming").Str("event", "stream_packet_signal").Str("stream_id", streamID.String()).Str("stream_url", url).Bool("only_audio", isAudioOnly).Bool("is_keyframe", packetAV.IsKeyFrame).Msg("Casting packet") } - err = app.cast(streamID, *packetAV, hlsEnabled, archiveEnabled) + err = app.Streams.CastPacket(streamID, *packetAV, hlsEnabled, archiveEnabled) if err != nil { if hlsEnabled { if streamVerboseLevel > VERBOSE_NONE { diff --git a/streams_storage.go b/streams_storage.go index 31bbfff..e02166a 100644 --- a/streams_storage.go +++ b/streams_storage.go @@ -44,26 +44,6 @@ func (streams *StreamsStorage) GetAllStreamsIDS() []uuid.UUID { return keys } -func (streams *StreamsStorage) archiveEnabledForStream(streamID uuid.UUID) (bool, error) { - streams.RLock() - defer streams.RUnlock() - stream, ok := streams.store[streamID] - if !ok { - return false, ErrStreamNotFound - } - return stream.archive != nil, nil -} - -func (streams *StreamsStorage) getVerboseLevelForStream(streamID uuid.UUID) VerboseLevel { - streams.RLock() - defer streams.RUnlock() - stream, ok := streams.store[streamID] - if !ok { - return VERBOSE_NONE - } - return stream.verboseLevel -} - // StreamExists checks whenever given stream ID exists in storage func (streams *StreamsStorage) StreamExists(streamID uuid.UUID) bool { streams.RLock() @@ -168,7 +148,8 @@ func (streams *StreamsStorage) DeleteViewer(streamID, clientID uuid.UUID) { delete(stream.Clients, clientID) } -func (streams *StreamsStorage) cast(streamID uuid.UUID, pck av.Packet, hlsEnabled, archiveEnabled bool) error { +// CastPacket cast AV Packet to viewers and possible to HLS/MP4 channels +func (streams *StreamsStorage) CastPacket(streamID uuid.UUID, pck av.Packet, hlsEnabled, archiveEnabled bool) error { streams.Lock() defer streams.Unlock() stream, ok := streams.store[streamID] @@ -192,6 +173,26 @@ func (streams *StreamsStorage) cast(streamID uuid.UUID, pck av.Packet, hlsEnable return nil } +func (streams *StreamsStorage) getVerboseLevelForStream(streamID uuid.UUID) VerboseLevel { + streams.RLock() + defer streams.RUnlock() + stream, ok := streams.store[streamID] + if !ok { + return VERBOSE_NONE + } + return stream.verboseLevel +} + +func (streams *StreamsStorage) archiveEnabledForStream(streamID uuid.UUID) (bool, error) { + streams.RLock() + defer streams.RUnlock() + stream, ok := streams.store[streamID] + if !ok { + return false, ErrStreamNotFound + } + return stream.archive != nil, nil +} + func (streams *StreamsStorage) setArchiveStream(streamID uuid.UUID, archiveStorage *streamArhive) error { streams.Lock() defer streams.Unlock() From 5967619667c50cffaee155d87d16ec28d2e91eb9 Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:20:02 +0300 Subject: [PATCH 10/22] export verbose level for stream --- streams.go | 2 +- streams_storage.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/streams.go b/streams.go index c66c4e7..af44936 100644 --- a/streams.go +++ b/streams.go @@ -41,7 +41,7 @@ func (app *Application) RunStream(ctx context.Context, streamID uuid.UUID) error if err != nil { return errors.Wrap(err, "Can't enable archive") } - streamVerboseLevel := app.Streams.getVerboseLevelForStream(streamID) + streamVerboseLevel := app.Streams.GetVerboseLevelForStream(streamID) app.startLoop(ctx, streamID, url, hlsEnabled, archiveEnabled, streamVerboseLevel) return nil } diff --git a/streams_storage.go b/streams_storage.go index e02166a..45f75c0 100644 --- a/streams_storage.go +++ b/streams_storage.go @@ -173,7 +173,8 @@ func (streams *StreamsStorage) CastPacket(streamID uuid.UUID, pck av.Packet, hls return nil } -func (streams *StreamsStorage) getVerboseLevelForStream(streamID uuid.UUID) VerboseLevel { +// GetVerboseLevelForStream returst verbose level for the given stream +func (streams *StreamsStorage) GetVerboseLevelForStream(streamID uuid.UUID) VerboseLevel { streams.RLock() defer streams.RUnlock() stream, ok := streams.store[streamID] From c102ca5915f8237a3112664dc5849068e6420be4 Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:21:19 +0300 Subject: [PATCH 11/22] minor doc --- streams.go | 2 +- streams_storage.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/streams.go b/streams.go index af44936..4aeec38 100644 --- a/streams.go +++ b/streams.go @@ -37,7 +37,7 @@ func (app *Application) RunStream(ctx context.Context, streamID uuid.UUID) error return ErrStreamNotFound } hlsEnabled := typeExists(STREAM_TYPE_HLS, supportedTypes) - archiveEnabled, err := app.Streams.archiveEnabledForStream(streamID) + archiveEnabled, err := app.Streams.IsArchiveEnabledForStream(streamID) if err != nil { return errors.Wrap(err, "Can't enable archive") } diff --git a/streams_storage.go b/streams_storage.go index 45f75c0..f3a51a9 100644 --- a/streams_storage.go +++ b/streams_storage.go @@ -184,7 +184,8 @@ func (streams *StreamsStorage) GetVerboseLevelForStream(streamID uuid.UUID) Verb return stream.verboseLevel } -func (streams *StreamsStorage) archiveEnabledForStream(streamID uuid.UUID) (bool, error) { +// IsArchiveEnabledForStream returns whenever archive has been enabled for stream +func (streams *StreamsStorage) IsArchiveEnabledForStream(streamID uuid.UUID) (bool, error) { streams.RLock() defer streams.RUnlock() stream, ok := streams.store[streamID] From f756fd334877efe81d4d9e33b8effceb7bef92d0 Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:23:00 +0300 Subject: [PATCH 12/22] minor doc on archive storage config --- application.go | 2 +- streams_storage.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/application.go b/application.go index 3cbc43d..6529294 100644 --- a/application.go +++ b/application.go @@ -150,7 +150,7 @@ func NewApplication(cfg *configuration.Configuration) (*Application, error) { default: return nil, fmt.Errorf("unsupported archive type") } - err = tmp.Streams.setArchiveStream(validUUID, &archiveStorage) + err = tmp.Streams.UpdateArchiveStorageForStream(validUUID, &archiveStorage) if err != nil { return nil, errors.Wrap(err, "can't set archive for given stream") } diff --git a/streams_storage.go b/streams_storage.go index f3a51a9..186bce6 100644 --- a/streams_storage.go +++ b/streams_storage.go @@ -195,7 +195,8 @@ func (streams *StreamsStorage) IsArchiveEnabledForStream(streamID uuid.UUID) (bo return stream.archive != nil, nil } -func (streams *StreamsStorage) setArchiveStream(streamID uuid.UUID, archiveStorage *streamArhive) error { +// UpdateArchiveStorageForStream updates archive storage configuration (it override existing one!) +func (streams *StreamsStorage) UpdateArchiveStorageForStream(streamID uuid.UUID, archiveStorage *streamArhive) error { streams.Lock() defer streams.Unlock() stream, ok := streams.store[streamID] From 8228fff07711f012d8126a8295f006e941b83557 Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:26:28 +0300 Subject: [PATCH 13/22] extrect archive storage --- application.go | 8 ++++---- mp4.go | 6 +----- stream.go | 4 ++-- streams_storage.go | 3 ++- 4 files changed, 9 insertions(+), 12 deletions(-) diff --git a/application.go b/application.go index 6529294..4dcb857 100644 --- a/application.go +++ b/application.go @@ -189,18 +189,18 @@ func (app *Application) startHlsCast(streamID uuid.UUID, stopCast chan bool) err return nil } -func (app *Application) startMP4Cast(streamID uuid.UUID, stopCast chan bool) error { +func (app *Application) startMP4Cast(archive *streamArhive, streamID uuid.UUID, stopCast chan bool) error { app.Streams.Lock() defer app.Streams.Unlock() stream, ok := app.Streams.store[streamID] if !ok { return ErrStreamNotFound } - go func(id uuid.UUID, mp4Chanel chan av.Packet, stop chan bool) { - err := app.startMP4(id, mp4Chanel, stop) + go func(arch *streamArhive, id uuid.UUID, mp4Chanel chan av.Packet, stop chan bool) { + err := app.startMP4(arch, id, mp4Chanel, stop) if err != nil { log.Error().Err(err).Str("scope", "archive").Str("event", "archive_start_cast").Str("stream_id", id.String()).Msg("Error on MP4 cast start") } - }(streamID, stream.mp4Chanel, stopCast) + }(archive, streamID, stream.mp4Chanel, stopCast) return nil } diff --git a/mp4.go b/mp4.go index 264d375..b30c204 100644 --- a/mp4.go +++ b/mp4.go @@ -16,12 +16,8 @@ import ( "github.com/rs/zerolog/log" ) -func (app *Application) startMP4(streamID uuid.UUID, ch chan av.Packet, stopCast chan bool) error { +func (app *Application) startMP4(archive *streamArhive, streamID uuid.UUID, ch chan av.Packet, stopCast chan bool) error { var err error - archive := app.Streams.getStreamArchive(streamID) - if archive == nil { - return errors.Wrap(err, "Bad archive stream") - } err = archive.store.MakeBucket(archive.bucket) if err != nil { return errors.Wrap(err, "Can't prepare bucket") diff --git a/stream.go b/stream.go index 4c40560..8de57a6 100644 --- a/stream.go +++ b/stream.go @@ -75,12 +75,12 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar if streamVerboseLevel > VERBOSE_NONE { log.Info().Str("scope", "streaming").Str("event", "stream_mp4_req").Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Need to start casting to MP4 archive") } - archive := app.Streams.getStreamArchive(streamID) + archive := app.Streams.GetStreamArchiveStorage(streamID) if archive == nil { log.Warn().Str("scope", "streaming").Str("event", "stream_mp4_req").Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Empty archive configuration for the given stream") } else { stopMP4Cast = make(chan bool, 1) - err = app.startMP4Cast(streamID, stopMP4Cast) + err = app.startMP4Cast(archive, streamID, stopMP4Cast) if err != nil { if streamVerboseLevel > VERBOSE_NONE { log.Warn().Str("scope", "streaming").Str("event", "stream_mp4_req").Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Can't start MP4 archive process") diff --git a/streams_storage.go b/streams_storage.go index 186bce6..5151ac9 100644 --- a/streams_storage.go +++ b/streams_storage.go @@ -207,7 +207,8 @@ func (streams *StreamsStorage) UpdateArchiveStorageForStream(streamID uuid.UUID, return nil } -func (streams *StreamsStorage) getStreamArchive(streamID uuid.UUID) *streamArhive { +// GetStreamArchiveStorage returns pointer to the archive storage for the given stream +func (streams *StreamsStorage) GetStreamArchiveStorage(streamID uuid.UUID) *streamArhive { streams.Lock() defer streams.Unlock() stream, ok := streams.store[streamID] From e6d5c836b5f3f384a843eb8f9d95355f69a2aaf9 Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:27:07 +0300 Subject: [PATCH 14/22] possible err on empty archive --- mp4.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/mp4.go b/mp4.go index b30c204..4162bca 100644 --- a/mp4.go +++ b/mp4.go @@ -18,6 +18,9 @@ import ( func (app *Application) startMP4(archive *streamArhive, streamID uuid.UUID, ch chan av.Packet, stopCast chan bool) error { var err error + if archive == nil { + return errors.Wrap(err, "Bad archive stream") + } err = archive.store.MakeBucket(archive.bucket) if err != nil { return errors.Wrap(err, "Can't prepare bucket") From 9095a75ec58ee89a383bb7e8b722763c4734b2c0 Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:28:13 +0300 Subject: [PATCH 15/22] null archive error --- application.go | 3 +++ errors.go | 1 + mp4.go | 4 ++-- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/application.go b/application.go index 4dcb857..1ad4128 100644 --- a/application.go +++ b/application.go @@ -190,6 +190,9 @@ func (app *Application) startHlsCast(streamID uuid.UUID, stopCast chan bool) err } func (app *Application) startMP4Cast(archive *streamArhive, streamID uuid.UUID, stopCast chan bool) error { + if archive == nil { + return ErrNullArchive + } app.Streams.Lock() defer app.Streams.Unlock() stream, ok := app.Streams.store[streamID] diff --git a/errors.go b/errors.go index 0086ec1..939c798 100644 --- a/errors.go +++ b/errors.go @@ -11,4 +11,5 @@ var ( ErrStreamTypeNotExists = fmt.Errorf("stream type does not exists") ErrStreamTypeNotSupported = fmt.Errorf("stream type is not supported") ErrNotSupportedStorage = fmt.Errorf("not supported storage") + ErrNullArchive = fmt.Errorf("archive == nil") ) diff --git a/mp4.go b/mp4.go index 4162bca..74252dd 100644 --- a/mp4.go +++ b/mp4.go @@ -17,10 +17,10 @@ import ( ) func (app *Application) startMP4(archive *streamArhive, streamID uuid.UUID, ch chan av.Packet, stopCast chan bool) error { - var err error if archive == nil { - return errors.Wrap(err, "Bad archive stream") + return ErrNullArchive } + var err error err = archive.store.MakeBucket(archive.bucket) if err != nil { return errors.Wrap(err, "Can't prepare bucket") From 598064bc7498b3a103ab2e8ff2f53f5f060f0cf3 Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:34:22 +0300 Subject: [PATCH 16/22] move to util --- hls.go | 9 --------- utils.go | 10 ++++++++++ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/hls.go b/hls.go index 71bbde5..790a4b1 100644 --- a/hls.go +++ b/hls.go @@ -202,12 +202,3 @@ func (app *Application) removeOutdatedSegments(streamID uuid.UUID, playlist *m3u } return nil } - -// ensureDir alias to 'mkdir -p' -func ensureDir(dirName string) error { - err := os.MkdirAll(dirName, 0777) - if err == nil || os.IsExist(err) { - return nil - } - return err -} diff --git a/utils.go b/utils.go index 22f9153..ab376e8 100644 --- a/utils.go +++ b/utils.go @@ -1,6 +1,7 @@ package videoserver import ( + "os" "reflect" "sync" "sync/atomic" @@ -37,3 +38,12 @@ func readerCount(rw *sync.RWMutex) int64 { // func readerCount(rw *sync.RWMutex) int64 { // return reflect.ValueOf(rw).Elem().FieldByName("readerCount").Int() // } + +// ensureDir alias to 'mkdir -p' +func ensureDir(dirName string) error { + err := os.MkdirAll(dirName, 0777) + if err == nil || os.IsExist(err) { + return nil + } + return err +} From cda485da8c8dac92586ac92f53a3adf70de5e6c6 Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:34:53 +0300 Subject: [PATCH 17/22] redundant input types --- stream_types.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/stream_types.go b/stream_types.go index aba29a8..f2a76ca 100644 --- a/stream_types.go +++ b/stream_types.go @@ -16,9 +16,6 @@ func (iotaIdx StreamType) String() string { } var ( - supportedInputStreamTypes = map[StreamType]struct{}{ - STREAM_TYPE_RTSP: {}, - } supportedOutputStreamTypes = map[StreamType]struct{}{ STREAM_TYPE_HLS: {}, STREAM_TYPE_MSE: {}, From e085cb33bda8b5f7a193b0ab1cb6ba737bfd34a5 Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:41:01 +0300 Subject: [PATCH 18/22] cors --- application.go | 9 +++++++++ logger.go | 3 +++ 2 files changed, 12 insertions(+) diff --git a/application.go b/application.go index 1ad4128..4bc1bb4 100644 --- a/application.go +++ b/application.go @@ -171,6 +171,15 @@ func (app *Application) setCors(cfg configuration.CORSConfiguration) { } app.CorsConfig.ExposeHeaders = cfg.ExposeHeaders app.CorsConfig.AllowCredentials = cfg.AllowCredentials + // See https://github.com/gofiber/fiber/security/advisories/GHSA-fmg4-x8pw-hjhg + if app.CorsConfig.AllowCredentials { + for _, v := range app.CorsConfig.AllowOrigins { + if v == "*" { + log.Warn().Str("scope", SCOPE_APP).Str("event", EVENT_APP_CORS_CONFIG).Msg("[CORS] Insecure setup, 'AllowCredentials' is set to true, and 'AllowOrigins' is set to a wildcard. Settings 'AllowCredentials' to be 'false'. See https://github.com/gofiber/fiber/security/advisories/GHSA-fmg4-x8pw-hjhg") + app.CorsConfig.AllowCredentials = false + } + } + } } func (app *Application) startHlsCast(streamID uuid.UUID, stopCast chan bool) error { diff --git a/logger.go b/logger.go index 8b60892..cb0a2ef 100644 --- a/logger.go +++ b/logger.go @@ -1,11 +1,14 @@ package videoserver const ( + SCOPE_APP = "app" SCOPE_STREAM = "stream" SCOPE_WS_HANDLER = "ws_handler" SCOPE_API_SERVER = "api_server" SCOPE_WS_SERVER = "ws_server" + EVENT_APP_CORS_CONFIG = "app_cors_config" + EVENT_API_PREPARE = "api_server_prepare" EVENT_API_START = "api_server_start" EVENT_API_CORS_ENABLE = "api_server_cors_enable" From 9d30c0d8e28f6069feaf50894eaba28526d87e37 Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:42:03 +0300 Subject: [PATCH 19/22] rm allow creds --- cmd/video_server/conf.json | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/video_server/conf.json b/cmd/video_server/conf.json index c120b5f..79fbafa 100644 --- a/cmd/video_server/conf.json +++ b/cmd/video_server/conf.json @@ -36,8 +36,7 @@ "allow_origins": ["*"], "allow_methods": ["GET", "PUT", "POST", "DELETE"], "allow_headers": ["Origin", "Authorization", "Content-Type", "Content-Length", "Accept", "Accept-Encoding", "X-HttpRequest"], - "expose_headers": ["Content-Length"], - "allow_credentials":true + "expose_headers": ["Content-Length"] }, "rtsp_streams": [ { From f114243dbc7ffe6c5dec258c516b02d069199c48 Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:46:34 +0300 Subject: [PATCH 20/22] single literal name --- ws_handler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ws_handler.go b/ws_handler.go index e229dee..b9acf07 100644 --- a/ws_handler.go +++ b/ws_handler.go @@ -145,14 +145,14 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re quitCh := make(chan bool) rxPingCh := make(chan bool) - go func(q, p chan bool) { + go func(quit, ping chan bool) { if verboseLevel > VERBOSE_SIMPLE { log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("Start loop in goroutine") } for { msgType, data, err := conn.ReadMessage() if err != nil { - q <- true + quit <- true errReason := "Can't read message" if verboseLevel > VERBOSE_NONE { log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg(errReason) @@ -165,7 +165,7 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re } if msgType == websocket.TextMessage && len(data) > 0 && string(data) == "ping" { select { - case p <- true: + case ping <- true: log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Int("message_type", msgType).Int("data_len", len(data)).Msg("Message has been sent") // message sent default: From 66f265b926ddc90986d5b537d45c1748b8892a8b Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:49:13 +0300 Subject: [PATCH 21/22] single defer in ws handler --- ws_handler.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/ws_handler.go b/ws_handler.go index b9acf07..bc289bd 100644 --- a/ws_handler.go +++ b/ws_handler.go @@ -19,6 +19,9 @@ var ( // wshandler is a websocket handler for user connection func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Request, app *Application, verboseLevel VerboseLevel) { + var streamID, clientID uuid.UUID + var mseExists bool + streamIDSTR := r.FormValue("stream_id") if verboseLevel > VERBOSE_SIMPLE { log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Msg("MSE Connected") @@ -36,10 +39,16 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re if verboseLevel > VERBOSE_SIMPLE { log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Msg("Connection has been closed") } + if mseExists { + app.Streams.DeleteViewer(streamID, clientID) + if verboseLevel > VERBOSE_SIMPLE { + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("Client has been removed") + } + } conn.Close() }() - streamID, err := uuid.Parse(streamIDSTR) + streamID, err = uuid.Parse(streamIDSTR) if err != nil { errReason := fmt.Sprintf("Not valid UUID: '%s'", streamIDSTR) if verboseLevel > VERBOSE_NONE { @@ -48,7 +57,7 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re closeWSwithError(conn, 1011, errReason) return } - mseExists := app.Streams.TypeExistsForStream(streamID, STREAM_TYPE_MSE) + mseExists = app.Streams.TypeExistsForStream(streamID, STREAM_TYPE_MSE) if verboseLevel > VERBOSE_SIMPLE { log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Bool("mse_exists", mseExists).Msg("Validate stream type") } @@ -71,7 +80,6 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re closeWSwithError(conn, 1011, errReason) return } - defer app.Streams.DeleteViewer(streamID, clientID) if verboseLevel > VERBOSE_SIMPLE { log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("Client has been added") } From 36c14e0a48b3390667340277c5dde6c3166c6ac4 Mon Sep 17 00:00:00 2001 From: LdDl Date: Mon, 16 Sep 2024 18:51:59 +0300 Subject: [PATCH 22/22] make it explicit --- ws_handler.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ws_handler.go b/ws_handler.go index bc289bd..937166d 100644 --- a/ws_handler.go +++ b/ws_handler.go @@ -20,7 +20,7 @@ var ( // wshandler is a websocket handler for user connection func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Request, app *Application, verboseLevel VerboseLevel) { var streamID, clientID uuid.UUID - var mseExists bool + var mseExists, clientAdded bool streamIDSTR := r.FormValue("stream_id") if verboseLevel > VERBOSE_SIMPLE { @@ -39,7 +39,7 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re if verboseLevel > VERBOSE_SIMPLE { log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Msg("Connection has been closed") } - if mseExists { + if mseExists && clientAdded { app.Streams.DeleteViewer(streamID, clientID) if verboseLevel > VERBOSE_SIMPLE { log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("Client has been removed") @@ -80,6 +80,7 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re closeWSwithError(conn, 1011, errReason) return } + clientAdded = true if verboseLevel > VERBOSE_SIMPLE { log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("Client has been added") }