diff --git a/application.go b/application.go index c3c4b3b..4bc1bb4 100644 --- a/application.go +++ b/application.go @@ -103,8 +103,8 @@ func NewApplication(cfg *configuration.Configuration) (*Application, error) { outputTypes = append(outputTypes, typ) } - tmp.Streams.Streams[validUUID] = NewStreamConfiguration(rtspStream.URL, outputTypes) - tmp.Streams.Streams[validUUID].verboseLevel = NewVerboseLevelFrom(rtspStream.Verbose) + tmp.Streams.store[validUUID] = NewStreamConfiguration(rtspStream.URL, outputTypes) + tmp.Streams.store[validUUID].verboseLevel = NewVerboseLevelFrom(rtspStream.Verbose) if rtspStream.Archive.Enabled && cfg.ArchiveCfg.Enabled { if rtspStream.Archive.MsPerSegment == 0 { return nil, fmt.Errorf("bad ms per segment archive stream") @@ -150,7 +150,7 @@ func NewApplication(cfg *configuration.Configuration) (*Application, error) { default: return nil, fmt.Errorf("unsupported archive type") } - err = tmp.Streams.setArchiveStream(validUUID, &archiveStorage) + err = tmp.Streams.UpdateArchiveStorageForStream(validUUID, &archiveStorage) if err != nil { return nil, errors.Wrap(err, "can't set archive for given stream") } @@ -171,44 +171,21 @@ func (app *Application) setCors(cfg configuration.CORSConfiguration) { } app.CorsConfig.ExposeHeaders = cfg.ExposeHeaders app.CorsConfig.AllowCredentials = cfg.AllowCredentials -} - -func (app *Application) cast(streamID uuid.UUID, pck av.Packet, hlsEnabled, archiveEnabled bool) error { - return app.Streams.cast(streamID, pck, hlsEnabled, archiveEnabled) -} - -func (app *Application) streamExists(streamID uuid.UUID) bool { - return app.Streams.streamExists(streamID) -} - -func (app *Application) existsWithType(streamID uuid.UUID, streamType StreamType) bool { - return app.Streams.existsWithType(streamID, streamType) -} - -func (app *Application) addCodec(streamID uuid.UUID, codecs []av.CodecData) { - app.Streams.addCodec(streamID, codecs) -} - -func (app *Application) getCodec(streamID uuid.UUID) ([]av.CodecData, error) { - return app.Streams.getCodec(streamID) -} - -func (app *Application) updateStreamStatus(streamID uuid.UUID, status bool) error { - return app.Streams.updateStreamStatus(streamID, status) -} - -func (app *Application) addClient(streamID uuid.UUID) (uuid.UUID, chan av.Packet, error) { - return app.Streams.addClient(streamID) -} - -func (app *Application) clientDelete(streamID, clientID uuid.UUID) { - app.Streams.deleteClient(streamID, clientID) + // See https://github.com/gofiber/fiber/security/advisories/GHSA-fmg4-x8pw-hjhg + if app.CorsConfig.AllowCredentials { + for _, v := range app.CorsConfig.AllowOrigins { + if v == "*" { + log.Warn().Str("scope", SCOPE_APP).Str("event", EVENT_APP_CORS_CONFIG).Msg("[CORS] Insecure setup, 'AllowCredentials' is set to true, and 'AllowOrigins' is set to a wildcard. Settings 'AllowCredentials' to be 'false'. See https://github.com/gofiber/fiber/security/advisories/GHSA-fmg4-x8pw-hjhg") + app.CorsConfig.AllowCredentials = false + } + } + } } func (app *Application) startHlsCast(streamID uuid.UUID, stopCast chan bool) error { app.Streams.Lock() defer app.Streams.Unlock() - stream, ok := app.Streams.Streams[streamID] + stream, ok := app.Streams.store[streamID] if !ok { return ErrStreamNotFound } @@ -221,18 +198,21 @@ func (app *Application) startHlsCast(streamID uuid.UUID, stopCast chan bool) err return nil } -func (app *Application) startMP4Cast(streamID uuid.UUID, stopCast chan bool) error { +func (app *Application) startMP4Cast(archive *streamArhive, streamID uuid.UUID, stopCast chan bool) error { + if archive == nil { + return ErrNullArchive + } app.Streams.Lock() defer app.Streams.Unlock() - stream, ok := app.Streams.Streams[streamID] + stream, ok := app.Streams.store[streamID] if !ok { return ErrStreamNotFound } - go func(id uuid.UUID, mp4Chanel chan av.Packet, stop chan bool) { - err := app.startMP4(id, mp4Chanel, stop) + go func(arch *streamArhive, id uuid.UUID, mp4Chanel chan av.Packet, stop chan bool) { + err := app.startMP4(arch, id, mp4Chanel, stop) if err != nil { log.Error().Err(err).Str("scope", "archive").Str("event", "archive_start_cast").Str("stream_id", id.String()).Msg("Error on MP4 cast start") } - }(streamID, stream.mp4Chanel, stopCast) + }(archive, streamID, stream.mp4Chanel, stopCast) return nil } diff --git a/cmd/video_server/conf.json b/cmd/video_server/conf.json index c120b5f..79fbafa 100644 --- a/cmd/video_server/conf.json +++ b/cmd/video_server/conf.json @@ -36,8 +36,7 @@ "allow_origins": ["*"], "allow_methods": ["GET", "PUT", "POST", "DELETE"], "allow_headers": ["Origin", "Authorization", "Content-Type", "Content-Length", "Accept", "Accept-Encoding", "X-HttpRequest"], - "expose_headers": ["Content-Length"], - "allow_credentials":true + "expose_headers": ["Content-Length"] }, "rtsp_streams": [ { diff --git a/errors.go b/errors.go index 0086ec1..939c798 100644 --- a/errors.go +++ b/errors.go @@ -11,4 +11,5 @@ var ( ErrStreamTypeNotExists = fmt.Errorf("stream type does not exists") ErrStreamTypeNotSupported = fmt.Errorf("stream type is not supported") ErrNotSupportedStorage = fmt.Errorf("not supported storage") + ErrNullArchive = fmt.Errorf("archive == nil") ) diff --git a/hls.go b/hls.go index a1c1822..790a4b1 100644 --- a/hls.go +++ b/hls.go @@ -47,7 +47,7 @@ func (app *Application) startHls(streamID uuid.UUID, ch chan av.Packet, stopCast tsMuxer := ts.NewMuxer(outFile) // Write header - codecData, err := app.getCodec(streamID) + codecData, err := app.Streams.GetCodecsDataForStream(streamID) if err != nil { return errors.Wrap(err, streamID.String()) } @@ -202,12 +202,3 @@ func (app *Application) removeOutdatedSegments(streamID uuid.UUID, playlist *m3u } return nil } - -// ensureDir alias to 'mkdir -p' -func ensureDir(dirName string) error { - err := os.MkdirAll(dirName, 0777) - if err == nil || os.IsExist(err) { - return nil - } - return err -} diff --git a/http_server.go b/http_server.go index d52d5b8..2472c5f 100644 --- a/http_server.go +++ b/http_server.go @@ -78,7 +78,7 @@ func ListWrapper(app *Application, verboseLevel VerboseLevel) func(ctx *gin.Cont if verboseLevel > VERBOSE_SIMPLE { log.Info().Str("scope", SCOPE_API_SERVER).Str("event", EVENT_API_REQUEST).Str("method", ctx.Request.Method).Str("route", ctx.Request.URL.Path).Str("remote", ctx.Request.RemoteAddr).Msg("Call streams list") } - allStreamsIDs := app.Streams.getKeys() + allStreamsIDs := app.Streams.GetAllStreamsIDS() ans := StreamsInfoShortenList{ Data: make([]StreamInfoShorten, len(allStreamsIDs)), } @@ -123,7 +123,7 @@ func EnableCamera(app *Application, verboseLevel VerboseLevel) func(ctx *gin.Con ctx.JSON(http.StatusBadRequest, gin.H{"Error": errReason}) return } - if exist := app.streamExists(postData.GUID); !exist { + if exist := app.Streams.StreamExists(postData.GUID); !exist { outputTypes := make([]StreamType, 0, len(postData.OutputTypes)) for _, v := range postData.OutputTypes { typ, ok := streamTypeExists(v) @@ -146,7 +146,7 @@ func EnableCamera(app *Application, verboseLevel VerboseLevel) func(ctx *gin.Con outputTypes = append(outputTypes, typ) } app.Streams.Lock() - app.Streams.Streams[postData.GUID] = NewStreamConfiguration(postData.URL, outputTypes) + app.Streams.store[postData.GUID] = NewStreamConfiguration(postData.URL, outputTypes) app.Streams.Unlock() app.StartStream(postData.GUID) } @@ -169,9 +169,9 @@ func DisableCamera(app *Application, verboseLevel VerboseLevel) func(ctx *gin.Co ctx.JSON(http.StatusBadRequest, gin.H{"Error": errReason}) return } - if exist := app.streamExists(postData.GUID); exist { + if exist := app.Streams.StreamExists(postData.GUID); exist { app.Streams.Lock() - delete(app.Streams.Streams, postData.GUID) + delete(app.Streams.store, postData.GUID) app.Streams.Unlock() } ctx.JSON(200, app) diff --git a/logger.go b/logger.go index 8b60892..cb0a2ef 100644 --- a/logger.go +++ b/logger.go @@ -1,11 +1,14 @@ package videoserver const ( + SCOPE_APP = "app" SCOPE_STREAM = "stream" SCOPE_WS_HANDLER = "ws_handler" SCOPE_API_SERVER = "api_server" SCOPE_WS_SERVER = "ws_server" + EVENT_APP_CORS_CONFIG = "app_cors_config" + EVENT_API_PREPARE = "api_server_prepare" EVENT_API_START = "api_server_start" EVENT_API_CORS_ENABLE = "api_server_cors_enable" diff --git a/mp4.go b/mp4.go index 8a047d5..74252dd 100644 --- a/mp4.go +++ b/mp4.go @@ -16,12 +16,11 @@ import ( "github.com/rs/zerolog/log" ) -func (app *Application) startMP4(streamID uuid.UUID, ch chan av.Packet, stopCast chan bool) error { - var err error - archive := app.Streams.getStreamArchive(streamID) +func (app *Application) startMP4(archive *streamArhive, streamID uuid.UUID, ch chan av.Packet, stopCast chan bool) error { if archive == nil { - return errors.Wrap(err, "Bad archive stream") + return ErrNullArchive } + var err error err = archive.store.MakeBucket(archive.bucket) if err != nil { return errors.Wrap(err, "Can't prepare bucket") @@ -48,7 +47,7 @@ func (app *Application) startMP4(streamID uuid.UUID, ch chan av.Packet, stopCast } tsMuxer := mp4.NewMuxer(outFile) log.Info().Str("scope", "archive").Str("event", "archive_create_file").Str("stream_id", streamID.String()).Str("segment_path", segmentPath).Msg("Create segment") - codecData, err := app.getCodec(streamID) + codecData, err := app.Streams.GetCodecsDataForStream(streamID) if err != nil { return errors.Wrap(err, streamID.String()) } diff --git a/stream.go b/stream.go index 2eb8104..8de57a6 100644 --- a/stream.go +++ b/stream.go @@ -36,11 +36,11 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar if streamVerboseLevel > VERBOSE_NONE { log.Info().Str("scope", "streaming").Str("event", "stream_codec_met").Str("stream_id", streamID.String()).Str("stream_url", url).Bool("hls_enabled", hlsEnabled).Any("codec_data", session.CodecData).Msg("Found codec. Adding this one") } - app.addCodec(streamID, session.CodecData) + app.Streams.AddCodecForStream(streamID, session.CodecData) if streamVerboseLevel > VERBOSE_NONE { log.Info().Str("scope", "streaming").Str("event", "stream_status_update").Str("stream_id", streamID.String()).Str("stream_url", url).Bool("hls_enabled", hlsEnabled).Msg("Update stream status") } - err = app.updateStreamStatus(streamID, true) + err = app.Streams.UpdateStreamStatus(streamID, true) if err != nil { return errors.Wrapf(err, "Can't update status for stream %s", streamID) } @@ -75,12 +75,12 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar if streamVerboseLevel > VERBOSE_NONE { log.Info().Str("scope", "streaming").Str("event", "stream_mp4_req").Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Need to start casting to MP4 archive") } - archive := app.Streams.getStreamArchive(streamID) + archive := app.Streams.GetStreamArchiveStorage(streamID) if archive == nil { log.Warn().Str("scope", "streaming").Str("event", "stream_mp4_req").Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Empty archive configuration for the given stream") } else { stopMP4Cast = make(chan bool, 1) - err = app.startMP4Cast(streamID, stopMP4Cast) + err = app.startMP4Cast(archive, streamID, stopMP4Cast) if err != nil { if streamVerboseLevel > VERBOSE_NONE { log.Warn().Str("scope", "streaming").Str("event", "stream_mp4_req").Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Can't start MP4 archive process") @@ -101,8 +101,8 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar if streamVerboseLevel > VERBOSE_NONE { log.Info().Str("scope", "streaming").Str("event", "stream_codec_update_signal").Str("stream_id", streamID.String()).Str("stream_url", url).Any("codec_data", session.CodecData).Msg("Recieved update codec signal") } - app.addCodec(streamID, session.CodecData) - err = app.updateStreamStatus(streamID, true) + app.Streams.AddCodecForStream(streamID, session.CodecData) + err = app.Streams.UpdateStreamStatus(streamID, true) if err != nil { return errors.Wrapf(err, "Can't update status for stream %s", streamID) } @@ -110,7 +110,7 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar if streamVerboseLevel > VERBOSE_NONE { log.Info().Str("scope", "streaming").Str("event", "stream_stop_signal").Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Recieved stop signal") } - err = app.updateStreamStatus(streamID, false) + err = app.Streams.UpdateStreamStatus(streamID, false) if err != nil { errors.Wrapf(err, "Can't switch status to False for stream '%s'", url) } @@ -129,7 +129,7 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar if streamVerboseLevel > VERBOSE_ADD { log.Info().Str("scope", "streaming").Str("event", "stream_packet_signal").Str("stream_id", streamID.String()).Str("stream_url", url).Bool("only_audio", isAudioOnly).Bool("is_keyframe", packetAV.IsKeyFrame).Msg("Casting packet") } - err = app.cast(streamID, *packetAV, hlsEnabled, archiveEnabled) + err = app.Streams.CastPacket(streamID, *packetAV, hlsEnabled, archiveEnabled) if err != nil { if hlsEnabled { if streamVerboseLevel > VERBOSE_NONE { @@ -143,7 +143,7 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar } stopMP4Cast <- true } - errStatus := app.updateStreamStatus(streamID, false) + errStatus := app.Streams.UpdateStreamStatus(streamID, false) if errStatus != nil { errors.Wrapf(errors.Wrapf(err, "Can't cast packet %s (%s)", streamID, url), "Can't switch status to False for stream '%s'", url) } diff --git a/stream_types.go b/stream_types.go index aba29a8..f2a76ca 100644 --- a/stream_types.go +++ b/stream_types.go @@ -16,9 +16,6 @@ func (iotaIdx StreamType) String() string { } var ( - supportedInputStreamTypes = map[StreamType]struct{}{ - STREAM_TYPE_RTSP: {}, - } supportedOutputStreamTypes = map[StreamType]struct{}{ STREAM_TYPE_HLS: {}, STREAM_TYPE_MSE: {}, diff --git a/streams.go b/streams.go index ac834f4..4aeec38 100644 --- a/streams.go +++ b/streams.go @@ -15,7 +15,7 @@ const ( // StartStreams starts all video streams func (app *Application) StartStreams() { - streamsIDs := app.Streams.getKeys() + streamsIDs := app.Streams.GetAllStreamsIDS() for i := range streamsIDs { app.StartStream(streamsIDs[i]) } @@ -32,16 +32,16 @@ func (app *Application) StartStream(streamID uuid.UUID) { } func (app *Application) RunStream(ctx context.Context, streamID uuid.UUID) error { - url, supportedTypes := app.Streams.GetStream(streamID) + url, supportedTypes := app.Streams.GetStreamInfo(streamID) if url == "" { return ErrStreamNotFound } hlsEnabled := typeExists(STREAM_TYPE_HLS, supportedTypes) - archiveEnabled, err := app.Streams.archiveEnabled(streamID) + archiveEnabled, err := app.Streams.IsArchiveEnabledForStream(streamID) if err != nil { return errors.Wrap(err, "Can't enable archive") } - streamVerboseLevel := app.Streams.getVerboseLevel(streamID) + streamVerboseLevel := app.Streams.GetVerboseLevelForStream(streamID) app.startLoop(ctx, streamID, url, hlsEnabled, archiveEnabled, streamVerboseLevel) return nil } diff --git a/streams_storage.go b/streams_storage.go index 9f1963a..5151ac9 100644 --- a/streams_storage.go +++ b/streams_storage.go @@ -14,66 +14,49 @@ import ( // StreamsStorage Map wrapper for map[uuid.UUID]*StreamConfiguration with mutex for concurrent usage type StreamsStorage struct { sync.RWMutex - Streams map[uuid.UUID]*StreamConfiguration `json:"rtsp_streams"` + store map[uuid.UUID]*StreamConfiguration } // NewStreamsStorageDefault prepares new allocated storage func NewStreamsStorageDefault() StreamsStorage { - return StreamsStorage{Streams: make(map[uuid.UUID]*StreamConfiguration)} + return StreamsStorage{store: make(map[uuid.UUID]*StreamConfiguration)} } -func (sm *StreamsStorage) GetStream(id uuid.UUID) (string, []StreamType) { - sm.Lock() - defer sm.Unlock() - stream, ok := sm.Streams[id] +// GetStreamInfo returns stream URL and its supported output types +func (streams *StreamsStorage) GetStreamInfo(streamID uuid.UUID) (string, []StreamType) { + streams.Lock() + defer streams.Unlock() + stream, ok := streams.store[streamID] if !ok { return "", []StreamType{} } return stream.URL, stream.SupportedOutputTypes } -// getKeys returns all storage streams' keys as slice -func (sm *StreamsStorage) getKeys() []uuid.UUID { - sm.Lock() - defer sm.Unlock() - keys := make([]uuid.UUID, 0, len(sm.Streams)) - for k := range sm.Streams { +// GetAllStreamsIDS returns all storage streams' keys as slice +func (streams *StreamsStorage) GetAllStreamsIDS() []uuid.UUID { + streams.Lock() + defer streams.Unlock() + keys := make([]uuid.UUID, 0, len(streams.store)) + for k := range streams.store { keys = append(keys, k) } return keys } -func (streams *StreamsStorage) archiveEnabled(streamID uuid.UUID) (bool, error) { - streams.RLock() - defer streams.RUnlock() - stream, ok := streams.Streams[streamID] - if !ok { - return false, ErrStreamNotFound - } - return stream.archive != nil, nil -} - -func (streams *StreamsStorage) getVerboseLevel(streamID uuid.UUID) VerboseLevel { - streams.RLock() - defer streams.RUnlock() - stream, ok := streams.Streams[streamID] - if !ok { - return VERBOSE_NONE - } - return stream.verboseLevel -} - -func (streams *StreamsStorage) streamExists(streamID uuid.UUID) bool { +// StreamExists checks whenever given stream ID exists in storage +func (streams *StreamsStorage) StreamExists(streamID uuid.UUID) bool { streams.RLock() defer streams.RUnlock() - _, ok := streams.Streams[streamID] + _, ok := streams.store[streamID] return ok } -func (streams *StreamsStorage) existsWithType(streamID uuid.UUID, streamType StreamType) bool { +// TypeExistsForStream checks whenever specific stream ID supports then given output stream type +func (streams *StreamsStorage) TypeExistsForStream(streamID uuid.UUID, streamType StreamType) bool { streams.Lock() defer streams.Unlock() - stream, ok := streams.Streams[streamID] + stream, ok := streams.store[streamID] if !ok { return false } @@ -82,10 +65,11 @@ func (streams *StreamsStorage) existsWithType(streamID uuid.UUID, streamType Str return ok && typeEnabled } -func (streams *StreamsStorage) addCodec(streamID uuid.UUID, codecs []av.CodecData) { +// AddCodecForStream appends new codecs data for the given stream +func (streams *StreamsStorage) AddCodecForStream(streamID uuid.UUID, codecs []av.CodecData) { streams.Lock() defer streams.Unlock() - stream, ok := streams.Streams[streamID] + stream, ok := streams.store[streamID] if !ok { return } @@ -95,10 +79,11 @@ func (streams *StreamsStorage) addCodec(streamID uuid.UUID, codecs []av.CodecDat } } -func (streams *StreamsStorage) getCodec(streamID uuid.UUID) ([]av.CodecData, error) { +// GetCodecsDataForStream returns COPY of codecs data for the given stream +func (streams *StreamsStorage) GetCodecsDataForStream(streamID uuid.UUID) ([]av.CodecData, error) { streams.Lock() defer streams.Unlock() - stream, ok := streams.Streams[streamID] + stream, ok := streams.store[streamID] if !ok { return nil, ErrStreamNotFound } @@ -114,10 +99,11 @@ func (streams *StreamsStorage) getCodec(streamID uuid.UUID) ([]av.CodecData, err return codecs, nil } -func (streams *StreamsStorage) updateStreamStatus(streamID uuid.UUID, status bool) error { +// UpdateStreamStatus sets new status value for the given stream +func (streams *StreamsStorage) UpdateStreamStatus(streamID uuid.UUID, status bool) error { streams.Lock() defer streams.Unlock() - stream, ok := streams.Streams[streamID] + stream, ok := streams.store[streamID] if !ok { return ErrStreamNotFound } @@ -128,10 +114,11 @@ func (streams *StreamsStorage) updateStreamStatus(streamID uuid.UUID, status boo return nil } -func (streams *StreamsStorage) addClient(streamID uuid.UUID) (uuid.UUID, chan av.Packet, error) { +// AddViewer adds client to the given stream. Return newly client ID, buffered channel for stream on success +func (streams *StreamsStorage) AddViewer(streamID uuid.UUID) (uuid.UUID, chan av.Packet, error) { streams.Lock() defer streams.Unlock() - stream, ok := streams.Streams[streamID] + stream, ok := streams.store[streamID] if !ok { return uuid.UUID{}, nil, ErrStreamNotFound } @@ -147,10 +134,11 @@ func (streams *StreamsStorage) addClient(streamID uuid.UUID) (uuid.UUID, chan av return clientID, ch, nil } -func (streams *StreamsStorage) deleteClient(streamID, clientID uuid.UUID) { +// DeleteViewer removes given client from the stream +func (streams *StreamsStorage) DeleteViewer(streamID, clientID uuid.UUID) { streams.Lock() defer streams.Unlock() - stream, ok := streams.Streams[streamID] + stream, ok := streams.store[streamID] if !ok { return } @@ -160,10 +148,11 @@ func (streams *StreamsStorage) deleteClient(streamID, clientID uuid.UUID) { delete(stream.Clients, clientID) } -func (streams *StreamsStorage) cast(streamID uuid.UUID, pck av.Packet, hlsEnabled, archiveEnabled bool) error { +// CastPacket cast AV Packet to viewers and possible to HLS/MP4 channels +func (streams *StreamsStorage) CastPacket(streamID uuid.UUID, pck av.Packet, hlsEnabled, archiveEnabled bool) error { streams.Lock() defer streams.Unlock() - stream, ok := streams.Streams[streamID] + stream, ok := streams.store[streamID] if !ok { return ErrStreamNotFound } @@ -184,10 +173,33 @@ func (streams *StreamsStorage) cast(streamID uuid.UUID, pck av.Packet, hlsEnable return nil } -func (streams *StreamsStorage) setArchiveStream(streamID uuid.UUID, archiveStorage *streamArhive) error { +// GetVerboseLevelForStream returst verbose level for the given stream +func (streams *StreamsStorage) GetVerboseLevelForStream(streamID uuid.UUID) VerboseLevel { + streams.RLock() + defer streams.RUnlock() + stream, ok := streams.store[streamID] + if !ok { + return VERBOSE_NONE + } + return stream.verboseLevel +} + +// IsArchiveEnabledForStream returns whenever archive has been enabled for stream +func (streams *StreamsStorage) IsArchiveEnabledForStream(streamID uuid.UUID) (bool, error) { + streams.RLock() + defer streams.RUnlock() + stream, ok := streams.store[streamID] + if !ok { + return false, ErrStreamNotFound + } + return stream.archive != nil, nil +} + +// UpdateArchiveStorageForStream updates archive storage configuration (it override existing one!) +func (streams *StreamsStorage) UpdateArchiveStorageForStream(streamID uuid.UUID, archiveStorage *streamArhive) error { streams.Lock() defer streams.Unlock() - stream, ok := streams.Streams[streamID] + stream, ok := streams.store[streamID] if !ok { return ErrStreamNotFound } @@ -195,10 +207,11 @@ func (streams *StreamsStorage) setArchiveStream(streamID uuid.UUID, archiveStora return nil } -func (streams *StreamsStorage) getStreamArchive(streamID uuid.UUID) *streamArhive { +// GetStreamArchiveStorage returns pointer to the archive storage for the given stream +func (streams *StreamsStorage) GetStreamArchiveStorage(streamID uuid.UUID) *streamArhive { streams.Lock() defer streams.Unlock() - stream, ok := streams.Streams[streamID] + stream, ok := streams.store[streamID] if !ok { return nil } diff --git a/utils.go b/utils.go index 22f9153..ab376e8 100644 --- a/utils.go +++ b/utils.go @@ -1,6 +1,7 @@ package videoserver import ( + "os" "reflect" "sync" "sync/atomic" @@ -37,3 +38,12 @@ func readerCount(rw *sync.RWMutex) int64 { // func readerCount(rw *sync.RWMutex) int64 { // return reflect.ValueOf(rw).Elem().FieldByName("readerCount").Int() // } + +// ensureDir alias to 'mkdir -p' +func ensureDir(dirName string) error { + err := os.MkdirAll(dirName, 0777) + if err == nil || os.IsExist(err) { + return nil + } + return err +} diff --git a/ws_handler.go b/ws_handler.go index 4cf2d09..937166d 100644 --- a/ws_handler.go +++ b/ws_handler.go @@ -19,6 +19,9 @@ var ( // wshandler is a websocket handler for user connection func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Request, app *Application, verboseLevel VerboseLevel) { + var streamID, clientID uuid.UUID + var mseExists, clientAdded bool + streamIDSTR := r.FormValue("stream_id") if verboseLevel > VERBOSE_SIMPLE { log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Msg("MSE Connected") @@ -36,10 +39,16 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re if verboseLevel > VERBOSE_SIMPLE { log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Msg("Connection has been closed") } + if mseExists && clientAdded { + app.Streams.DeleteViewer(streamID, clientID) + if verboseLevel > VERBOSE_SIMPLE { + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("Client has been removed") + } + } conn.Close() }() - streamID, err := uuid.Parse(streamIDSTR) + streamID, err = uuid.Parse(streamIDSTR) if err != nil { errReason := fmt.Sprintf("Not valid UUID: '%s'", streamIDSTR) if verboseLevel > VERBOSE_NONE { @@ -48,7 +57,7 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re closeWSwithError(conn, 1011, errReason) return } - mseExists := app.existsWithType(streamID, STREAM_TYPE_MSE) + mseExists = app.Streams.TypeExistsForStream(streamID, STREAM_TYPE_MSE) if verboseLevel > VERBOSE_SIMPLE { log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Bool("mse_exists", mseExists).Msg("Validate stream type") } @@ -62,7 +71,7 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re closeWSwithError(conn, 1011, errReason) return } - cuuid, ch, err := app.addClient(streamID) + clientID, ch, err := app.Streams.AddViewer(streamID) if err != nil { errReason := "Can't add client to the queue" if verboseLevel > VERBOSE_NONE { @@ -71,28 +80,28 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re closeWSwithError(conn, 1011, errReason) return } - defer app.clientDelete(streamID, cuuid) + clientAdded = true if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg("Client has been added") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("Client has been added") } - codecData, err := app.getCodec(streamID) + codecData, err := app.Streams.GetCodecsDataForStream(streamID) if err != nil { errReason := "Can't extract codec for stream" if verboseLevel > VERBOSE_NONE { - log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg(errReason) + log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg(errReason) } closeWSwithError(conn, 1011, errReason) return } if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("codecs", codecData).Msg("Validate codecs") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Msg("Validate codecs") } if len(codecData) == 0 { errReason := "No codec information" if verboseLevel > VERBOSE_NONE { - log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg(errReason) + log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg(errReason) } closeWSwithError(conn, 1011, errReason) return @@ -102,75 +111,75 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re if err != nil { errReason := "Can't write codec information to the header" if verboseLevel > VERBOSE_NONE { - log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("codecs", codecData).Msg(errReason) + log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Msg(errReason) } closeWSwithError(conn, 1011, errReason) return } if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("codecs", codecData).Msg("Write header to muxer") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Msg("Write header to muxer") } meta, init := muxer.GetInit(codecData) if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg("Get meta information") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg("Get meta information") } err = conn.WriteMessage(websocket.BinaryMessage, append([]byte{9}, meta...)) if err != nil { errReason := "Can't write meta information" if verboseLevel > VERBOSE_NONE { - log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("codecs", codecData).Str("meta", meta).Msg(errReason) + log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Str("meta", meta).Msg(errReason) } closeWSwithError(conn, 1011, errReason) return } if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg("Send meta information") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg("Send meta information") } err = conn.WriteMessage(websocket.BinaryMessage, init) if err != nil { errReason := "Can't write initialization information" if verboseLevel > VERBOSE_NONE { - log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg(errReason) + log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg(errReason) } closeWSwithError(conn, 1011, errReason) return } if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("remote_addr", r.RemoteAddr).Str("event", EVENT_WS_UPGRADER).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg("Send initialization message") + log.Info().Str("remote_addr", r.RemoteAddr).Str("event", EVENT_WS_UPGRADER).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg("Send initialization message") } var start bool quitCh := make(chan bool) rxPingCh := make(chan bool) - go func(q, p chan bool) { + go func(quit, ping chan bool) { if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg("Start loop in goroutine") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("Start loop in goroutine") } for { msgType, data, err := conn.ReadMessage() if err != nil { - q <- true + quit <- true errReason := "Can't read message" if verboseLevel > VERBOSE_NONE { - log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg(errReason) + log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg(errReason) } closeWSwithError(conn, 1011, errReason) return } if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Int("message_type", msgType).Int("data_len", len(data)).Msg("Read message in a loop") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Int("message_type", msgType).Int("data_len", len(data)).Msg("Read message in a loop") } if msgType == websocket.TextMessage && len(data) > 0 && string(data) == "ping" { select { - case p <- true: - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Int("message_type", msgType).Int("data_len", len(data)).Msg("Message has been sent") + case ping <- true: + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Int("message_type", msgType).Int("data_len", len(data)).Msg("Message has been sent") // message sent default: // message dropped - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Int("message_type", msgType).Int("data_len", len(data)).Msg("Message has been dropped") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Int("message_type", msgType).Int("data_len", len(data)).Msg("Message has been dropped") } } } @@ -179,47 +188,47 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re noKeyFrames := time.NewTimer(keyFramesTimeout) if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg("Start loop") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("Start loop") } for { select { case <-noKeyFrames.C: if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg("No keyframes has been met") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("No keyframes has been met") } return case <-quitCh: if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg("Quit") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("Quit") } return case <-rxPingCh: if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg("'Ping' has been recieved") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("'Ping' has been recieved") } err := conn.WriteMessage(websocket.TextMessage, []byte("pong")) if err != nil { errReason := "Can't write PONG message" if verboseLevel > VERBOSE_NONE { - log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("event", "ping").Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg(errReason) + log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("event", "ping").Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("codecs", codecData).Str("meta", meta).Any("init", init).Msg(errReason) } closeWSwithError(conn, 1011, errReason) return } case pck := <-ch: if verboseLevel > VERBOSE_ADD { - log.Info().Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg("Packet has been recieved from stream source") + log.Info().Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("Packet has been recieved from stream source") } if pck.IsKeyFrame { if verboseLevel > VERBOSE_SIMPLE { - log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg("Packet is a keyframe") + log.Info().Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("Packet is a keyframe") } noKeyFrames.Reset(keyFramesTimeout) start = true } if !start { if verboseLevel > VERBOSE_ADD { - log.Info().Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Msg("Stream has not been started") + log.Info().Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Msg("Stream has not been started") } continue } @@ -227,23 +236,23 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re if err != nil { errReason := "Can't write packet to the muxer" if verboseLevel > VERBOSE_NONE { - log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("event", "ping").Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("packet_len", len(pck.Data)).Msg(errReason) + log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("event", "ping").Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("packet_len", len(pck.Data)).Msg(errReason) } closeWSwithError(conn, 1011, errReason) return } if verboseLevel > VERBOSE_ADD { - log.Info().Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Bool("ready", ready).Int("buf_len", len(buf)).Msg("Write packet to the muxer") + log.Info().Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Bool("ready", ready).Int("buf_len", len(buf)).Msg("Write packet to the muxer") } if ready { if verboseLevel > VERBOSE_ADD { - log.Info().Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Bool("ready", ready).Int("buf_len", len(buf)).Msg("Muxer is ready to write another packet") + log.Info().Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Bool("ready", ready).Int("buf_len", len(buf)).Msg("Muxer is ready to write another packet") } err = conn.SetWriteDeadline(time.Now().Add(deadlineTimeout)) if err != nil { errReason := "Can't set new deadline" if verboseLevel > VERBOSE_NONE { - log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("event", "ping").Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("packet_len", len(pck.Data)).Bool("ready", ready).Int("buf_len", len(buf)).Msg(errReason) + log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("event", "ping").Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("packet_len", len(pck.Data)).Bool("ready", ready).Int("buf_len", len(buf)).Msg(errReason) } closeWSwithError(conn, 1011, errReason) return @@ -252,13 +261,13 @@ func wshandler(wsUpgrader *websocket.Upgrader, w http.ResponseWriter, r *http.Re if err != nil { errReason := "Can't write buffered message" if verboseLevel > VERBOSE_NONE { - log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("event", "ping").Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Any("packet_len", len(pck.Data)).Bool("ready", ready).Int("buf_len", len(buf)).Msg(errReason) + log.Error().Err(err).Str("scope", SCOPE_WS_HANDLER).Str("event", EVENT_WS_UPGRADER).Str("event", "ping").Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Any("packet_len", len(pck.Data)).Bool("ready", ready).Int("buf_len", len(buf)).Msg(errReason) } closeWSwithError(conn, 1011, errReason) return } if verboseLevel > VERBOSE_ADD { - log.Info().Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", cuuid.String()).Bool("ready", ready).Int("buf_len", len(buf)).Msg("Write buffer to the client") + log.Info().Str("remote_addr", r.RemoteAddr).Str("stream_id", streamIDSTR).Str("client_id", clientID.String()).Bool("ready", ready).Int("buf_len", len(buf)).Msg("Write buffer to the client") } } }