Skip to content

Commit

Permalink
Merge pull request #44 from LdDl/structure
Browse files Browse the repository at this point in the history
Code structure changes + CORS Security issue
  • Loading branch information
LdDl authored Sep 16, 2024
2 parents 90a2ec5 + 36c14e0 commit 2c14f8e
Show file tree
Hide file tree
Showing 13 changed files with 171 additions and 169 deletions.
62 changes: 21 additions & 41 deletions application.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ func NewApplication(cfg *configuration.Configuration) (*Application, error) {
outputTypes = append(outputTypes, typ)
}

tmp.Streams.Streams[validUUID] = NewStreamConfiguration(rtspStream.URL, outputTypes)
tmp.Streams.Streams[validUUID].verboseLevel = NewVerboseLevelFrom(rtspStream.Verbose)
tmp.Streams.store[validUUID] = NewStreamConfiguration(rtspStream.URL, outputTypes)
tmp.Streams.store[validUUID].verboseLevel = NewVerboseLevelFrom(rtspStream.Verbose)
if rtspStream.Archive.Enabled && cfg.ArchiveCfg.Enabled {
if rtspStream.Archive.MsPerSegment == 0 {
return nil, fmt.Errorf("bad ms per segment archive stream")
Expand Down Expand Up @@ -150,7 +150,7 @@ func NewApplication(cfg *configuration.Configuration) (*Application, error) {
default:
return nil, fmt.Errorf("unsupported archive type")
}
err = tmp.Streams.setArchiveStream(validUUID, &archiveStorage)
err = tmp.Streams.UpdateArchiveStorageForStream(validUUID, &archiveStorage)
if err != nil {
return nil, errors.Wrap(err, "can't set archive for given stream")
}
Expand All @@ -171,44 +171,21 @@ func (app *Application) setCors(cfg configuration.CORSConfiguration) {
}
app.CorsConfig.ExposeHeaders = cfg.ExposeHeaders
app.CorsConfig.AllowCredentials = cfg.AllowCredentials
}

func (app *Application) cast(streamID uuid.UUID, pck av.Packet, hlsEnabled, archiveEnabled bool) error {
return app.Streams.cast(streamID, pck, hlsEnabled, archiveEnabled)
}

func (app *Application) streamExists(streamID uuid.UUID) bool {
return app.Streams.streamExists(streamID)
}

func (app *Application) existsWithType(streamID uuid.UUID, streamType StreamType) bool {
return app.Streams.existsWithType(streamID, streamType)
}

func (app *Application) addCodec(streamID uuid.UUID, codecs []av.CodecData) {
app.Streams.addCodec(streamID, codecs)
}

func (app *Application) getCodec(streamID uuid.UUID) ([]av.CodecData, error) {
return app.Streams.getCodec(streamID)
}

func (app *Application) updateStreamStatus(streamID uuid.UUID, status bool) error {
return app.Streams.updateStreamStatus(streamID, status)
}

func (app *Application) addClient(streamID uuid.UUID) (uuid.UUID, chan av.Packet, error) {
return app.Streams.addClient(streamID)
}

func (app *Application) clientDelete(streamID, clientID uuid.UUID) {
app.Streams.deleteClient(streamID, clientID)
// See https://github.com/gofiber/fiber/security/advisories/GHSA-fmg4-x8pw-hjhg
if app.CorsConfig.AllowCredentials {
for _, v := range app.CorsConfig.AllowOrigins {
if v == "*" {
log.Warn().Str("scope", SCOPE_APP).Str("event", EVENT_APP_CORS_CONFIG).Msg("[CORS] Insecure setup, 'AllowCredentials' is set to true, and 'AllowOrigins' is set to a wildcard. Settings 'AllowCredentials' to be 'false'. See https://github.com/gofiber/fiber/security/advisories/GHSA-fmg4-x8pw-hjhg")
app.CorsConfig.AllowCredentials = false
}
}
}
}

func (app *Application) startHlsCast(streamID uuid.UUID, stopCast chan bool) error {
app.Streams.Lock()
defer app.Streams.Unlock()
stream, ok := app.Streams.Streams[streamID]
stream, ok := app.Streams.store[streamID]
if !ok {
return ErrStreamNotFound
}
Expand All @@ -221,18 +198,21 @@ func (app *Application) startHlsCast(streamID uuid.UUID, stopCast chan bool) err
return nil
}

func (app *Application) startMP4Cast(streamID uuid.UUID, stopCast chan bool) error {
func (app *Application) startMP4Cast(archive *streamArhive, streamID uuid.UUID, stopCast chan bool) error {
if archive == nil {
return ErrNullArchive
}
app.Streams.Lock()
defer app.Streams.Unlock()
stream, ok := app.Streams.Streams[streamID]
stream, ok := app.Streams.store[streamID]
if !ok {
return ErrStreamNotFound
}
go func(id uuid.UUID, mp4Chanel chan av.Packet, stop chan bool) {
err := app.startMP4(id, mp4Chanel, stop)
go func(arch *streamArhive, id uuid.UUID, mp4Chanel chan av.Packet, stop chan bool) {
err := app.startMP4(arch, id, mp4Chanel, stop)
if err != nil {
log.Error().Err(err).Str("scope", "archive").Str("event", "archive_start_cast").Str("stream_id", id.String()).Msg("Error on MP4 cast start")
}
}(streamID, stream.mp4Chanel, stopCast)
}(archive, streamID, stream.mp4Chanel, stopCast)
return nil
}
3 changes: 1 addition & 2 deletions cmd/video_server/conf.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@
"allow_origins": ["*"],
"allow_methods": ["GET", "PUT", "POST", "DELETE"],
"allow_headers": ["Origin", "Authorization", "Content-Type", "Content-Length", "Accept", "Accept-Encoding", "X-HttpRequest"],
"expose_headers": ["Content-Length"],
"allow_credentials":true
"expose_headers": ["Content-Length"]
},
"rtsp_streams": [
{
Expand Down
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ var (
ErrStreamTypeNotExists = fmt.Errorf("stream type does not exists")
ErrStreamTypeNotSupported = fmt.Errorf("stream type is not supported")
ErrNotSupportedStorage = fmt.Errorf("not supported storage")
ErrNullArchive = fmt.Errorf("archive == nil")
)
11 changes: 1 addition & 10 deletions hls.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (app *Application) startHls(streamID uuid.UUID, ch chan av.Packet, stopCast
tsMuxer := ts.NewMuxer(outFile)

// Write header
codecData, err := app.getCodec(streamID)
codecData, err := app.Streams.GetCodecsDataForStream(streamID)
if err != nil {
return errors.Wrap(err, streamID.String())
}
Expand Down Expand Up @@ -202,12 +202,3 @@ func (app *Application) removeOutdatedSegments(streamID uuid.UUID, playlist *m3u
}
return nil
}

// ensureDir alias to 'mkdir -p'
func ensureDir(dirName string) error {
err := os.MkdirAll(dirName, 0777)
if err == nil || os.IsExist(err) {
return nil
}
return err
}
10 changes: 5 additions & 5 deletions http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func ListWrapper(app *Application, verboseLevel VerboseLevel) func(ctx *gin.Cont
if verboseLevel > VERBOSE_SIMPLE {
log.Info().Str("scope", SCOPE_API_SERVER).Str("event", EVENT_API_REQUEST).Str("method", ctx.Request.Method).Str("route", ctx.Request.URL.Path).Str("remote", ctx.Request.RemoteAddr).Msg("Call streams list")
}
allStreamsIDs := app.Streams.getKeys()
allStreamsIDs := app.Streams.GetAllStreamsIDS()
ans := StreamsInfoShortenList{
Data: make([]StreamInfoShorten, len(allStreamsIDs)),
}
Expand Down Expand Up @@ -123,7 +123,7 @@ func EnableCamera(app *Application, verboseLevel VerboseLevel) func(ctx *gin.Con
ctx.JSON(http.StatusBadRequest, gin.H{"Error": errReason})
return
}
if exist := app.streamExists(postData.GUID); !exist {
if exist := app.Streams.StreamExists(postData.GUID); !exist {
outputTypes := make([]StreamType, 0, len(postData.OutputTypes))
for _, v := range postData.OutputTypes {
typ, ok := streamTypeExists(v)
Expand All @@ -146,7 +146,7 @@ func EnableCamera(app *Application, verboseLevel VerboseLevel) func(ctx *gin.Con
outputTypes = append(outputTypes, typ)
}
app.Streams.Lock()
app.Streams.Streams[postData.GUID] = NewStreamConfiguration(postData.URL, outputTypes)
app.Streams.store[postData.GUID] = NewStreamConfiguration(postData.URL, outputTypes)
app.Streams.Unlock()
app.StartStream(postData.GUID)
}
Expand All @@ -169,9 +169,9 @@ func DisableCamera(app *Application, verboseLevel VerboseLevel) func(ctx *gin.Co
ctx.JSON(http.StatusBadRequest, gin.H{"Error": errReason})
return
}
if exist := app.streamExists(postData.GUID); exist {
if exist := app.Streams.StreamExists(postData.GUID); exist {
app.Streams.Lock()
delete(app.Streams.Streams, postData.GUID)
delete(app.Streams.store, postData.GUID)
app.Streams.Unlock()
}
ctx.JSON(200, app)
Expand Down
3 changes: 3 additions & 0 deletions logger.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package videoserver

const (
SCOPE_APP = "app"
SCOPE_STREAM = "stream"
SCOPE_WS_HANDLER = "ws_handler"
SCOPE_API_SERVER = "api_server"
SCOPE_WS_SERVER = "ws_server"

EVENT_APP_CORS_CONFIG = "app_cors_config"

EVENT_API_PREPARE = "api_server_prepare"
EVENT_API_START = "api_server_start"
EVENT_API_CORS_ENABLE = "api_server_cors_enable"
Expand Down
9 changes: 4 additions & 5 deletions mp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ import (
"github.com/rs/zerolog/log"
)

func (app *Application) startMP4(streamID uuid.UUID, ch chan av.Packet, stopCast chan bool) error {
var err error
archive := app.Streams.getStreamArchive(streamID)
func (app *Application) startMP4(archive *streamArhive, streamID uuid.UUID, ch chan av.Packet, stopCast chan bool) error {
if archive == nil {
return errors.Wrap(err, "Bad archive stream")
return ErrNullArchive
}
var err error
err = archive.store.MakeBucket(archive.bucket)
if err != nil {
return errors.Wrap(err, "Can't prepare bucket")
Expand All @@ -48,7 +47,7 @@ func (app *Application) startMP4(streamID uuid.UUID, ch chan av.Packet, stopCast
}
tsMuxer := mp4.NewMuxer(outFile)
log.Info().Str("scope", "archive").Str("event", "archive_create_file").Str("stream_id", streamID.String()).Str("segment_path", segmentPath).Msg("Create segment")
codecData, err := app.getCodec(streamID)
codecData, err := app.Streams.GetCodecsDataForStream(streamID)
if err != nil {
return errors.Wrap(err, streamID.String())
}
Expand Down
18 changes: 9 additions & 9 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar
if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", "streaming").Str("event", "stream_codec_met").Str("stream_id", streamID.String()).Str("stream_url", url).Bool("hls_enabled", hlsEnabled).Any("codec_data", session.CodecData).Msg("Found codec. Adding this one")
}
app.addCodec(streamID, session.CodecData)
app.Streams.AddCodecForStream(streamID, session.CodecData)
if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", "streaming").Str("event", "stream_status_update").Str("stream_id", streamID.String()).Str("stream_url", url).Bool("hls_enabled", hlsEnabled).Msg("Update stream status")
}
err = app.updateStreamStatus(streamID, true)
err = app.Streams.UpdateStreamStatus(streamID, true)
if err != nil {
return errors.Wrapf(err, "Can't update status for stream %s", streamID)
}
Expand Down Expand Up @@ -75,12 +75,12 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar
if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", "streaming").Str("event", "stream_mp4_req").Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Need to start casting to MP4 archive")
}
archive := app.Streams.getStreamArchive(streamID)
archive := app.Streams.GetStreamArchiveStorage(streamID)
if archive == nil {
log.Warn().Str("scope", "streaming").Str("event", "stream_mp4_req").Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Empty archive configuration for the given stream")
} else {
stopMP4Cast = make(chan bool, 1)
err = app.startMP4Cast(streamID, stopMP4Cast)
err = app.startMP4Cast(archive, streamID, stopMP4Cast)
if err != nil {
if streamVerboseLevel > VERBOSE_NONE {
log.Warn().Str("scope", "streaming").Str("event", "stream_mp4_req").Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Can't start MP4 archive process")
Expand All @@ -101,16 +101,16 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar
if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", "streaming").Str("event", "stream_codec_update_signal").Str("stream_id", streamID.String()).Str("stream_url", url).Any("codec_data", session.CodecData).Msg("Recieved update codec signal")
}
app.addCodec(streamID, session.CodecData)
err = app.updateStreamStatus(streamID, true)
app.Streams.AddCodecForStream(streamID, session.CodecData)
err = app.Streams.UpdateStreamStatus(streamID, true)
if err != nil {
return errors.Wrapf(err, "Can't update status for stream %s", streamID)
}
case rtspv2.SignalStreamRTPStop:
if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", "streaming").Str("event", "stream_stop_signal").Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Recieved stop signal")
}
err = app.updateStreamStatus(streamID, false)
err = app.Streams.UpdateStreamStatus(streamID, false)
if err != nil {
errors.Wrapf(err, "Can't switch status to False for stream '%s'", url)
}
Expand All @@ -129,7 +129,7 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar
if streamVerboseLevel > VERBOSE_ADD {
log.Info().Str("scope", "streaming").Str("event", "stream_packet_signal").Str("stream_id", streamID.String()).Str("stream_url", url).Bool("only_audio", isAudioOnly).Bool("is_keyframe", packetAV.IsKeyFrame).Msg("Casting packet")
}
err = app.cast(streamID, *packetAV, hlsEnabled, archiveEnabled)
err = app.Streams.CastPacket(streamID, *packetAV, hlsEnabled, archiveEnabled)
if err != nil {
if hlsEnabled {
if streamVerboseLevel > VERBOSE_NONE {
Expand All @@ -143,7 +143,7 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar
}
stopMP4Cast <- true
}
errStatus := app.updateStreamStatus(streamID, false)
errStatus := app.Streams.UpdateStreamStatus(streamID, false)
if errStatus != nil {
errors.Wrapf(errors.Wrapf(err, "Can't cast packet %s (%s)", streamID, url), "Can't switch status to False for stream '%s'", url)
}
Expand Down
3 changes: 0 additions & 3 deletions stream_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ func (iotaIdx StreamType) String() string {
}

var (
supportedInputStreamTypes = map[StreamType]struct{}{
STREAM_TYPE_RTSP: {},
}
supportedOutputStreamTypes = map[StreamType]struct{}{
STREAM_TYPE_HLS: {},
STREAM_TYPE_MSE: {},
Expand Down
8 changes: 4 additions & 4 deletions streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const (

// StartStreams starts all video streams
func (app *Application) StartStreams() {
streamsIDs := app.Streams.getKeys()
streamsIDs := app.Streams.GetAllStreamsIDS()
for i := range streamsIDs {
app.StartStream(streamsIDs[i])
}
Expand All @@ -32,16 +32,16 @@ func (app *Application) StartStream(streamID uuid.UUID) {
}

func (app *Application) RunStream(ctx context.Context, streamID uuid.UUID) error {
url, supportedTypes := app.Streams.GetStream(streamID)
url, supportedTypes := app.Streams.GetStreamInfo(streamID)
if url == "" {
return ErrStreamNotFound
}
hlsEnabled := typeExists(STREAM_TYPE_HLS, supportedTypes)
archiveEnabled, err := app.Streams.archiveEnabled(streamID)
archiveEnabled, err := app.Streams.IsArchiveEnabledForStream(streamID)
if err != nil {
return errors.Wrap(err, "Can't enable archive")
}
streamVerboseLevel := app.Streams.getVerboseLevel(streamID)
streamVerboseLevel := app.Streams.GetVerboseLevelForStream(streamID)
app.startLoop(ctx, streamID, url, hlsEnabled, archiveEnabled, streamVerboseLevel)
return nil
}
Expand Down
Loading

0 comments on commit 2c14f8e

Please sign in to comment.