Skip to content

Commit

Permalink
Merge pull request #48 from LdDl/structure
Browse files Browse the repository at this point in the history
Lock problem
  • Loading branch information
LdDl authored Sep 30, 2024
2 parents b3bb327 + 4995a23 commit c0e0239
Show file tree
Hide file tree
Showing 12 changed files with 305 additions and 170 deletions.
37 changes: 0 additions & 37 deletions application.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/pkg/errors"

"github.com/deepch/vdk/av"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
)
Expand Down Expand Up @@ -181,39 +180,3 @@ func (app *Application) setCors(cfg configuration.CORSConfiguration) {
}
}
}

func (app *Application) startHlsCast(streamID uuid.UUID, stopCast chan bool) error {
app.Streams.Lock()
defer app.Streams.Unlock()
stream, ok := app.Streams.store[streamID]
if !ok {
return ErrStreamNotFound
}
go func(id uuid.UUID, hlsChanel chan av.Packet, stop chan bool) {
err := app.startHls(id, hlsChanel, stop)
if err != nil {
log.Error().Err(err).Str("scope", SCOPE_HLS).Str("event", EVENT_HLS_START_CAST).Str("stream_id", id.String()).Msg("Error on HLS cast start")
}
}(streamID, stream.hlsChanel, stopCast)
return nil
}

func (app *Application) startMP4Cast(archive *StreamArchiveWrapper, streamID uuid.UUID, stopCast chan bool, streamVerboseLevel VerboseLevel) error {
if archive == nil {
return ErrNullArchive
}
app.Streams.Lock()
defer app.Streams.Unlock()
stream, ok := app.Streams.store[streamID]
if !ok {
return ErrStreamNotFound
}
channel := stream.mp4Chanel
go func(arch *StreamArchiveWrapper, id uuid.UUID, mp4Chanel chan av.Packet, stop chan bool, verbose VerboseLevel) {
err := app.startMP4(arch, id, mp4Chanel, stop, verbose)
if err != nil {
log.Error().Err(err).Str("scope", SCOPE_ARCHIVE).Str("event", EVENT_ARCHIVE_START_CAST).Str("stream_id", id.String()).Msg("Error on MP4 cast start")
}
}(archive, streamID, channel, stopCast, streamVerboseLevel)
return nil
}
3 changes: 2 additions & 1 deletion cmd/video_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ var (
)

func init() {
zerolog.TimeFieldFormat = time.RFC3339
zerolog.TimeFieldFormat = time.RFC3339Nano
zerolog.DurationFieldUnit = time.Second
}

func main() {
Expand Down
2 changes: 1 addition & 1 deletion hls.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

// startHls starts routine to create m3u8 playlists
func (app *Application) startHls(streamID uuid.UUID, ch chan av.Packet, stopCast chan bool) error {
func (app *Application) startHls(streamID uuid.UUID, ch chan av.Packet, stopCast chan StopSignal) error {
err := ensureDir(app.HLS.Directory)
if err != nil {
return errors.Wrap(err, "Can't create directory for HLS temporary files")
Expand Down
1 change: 1 addition & 0 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
EVENT_STREAMING_STATUS_UPDATE = "streaming_status_update"
EVENT_STREAMING_PACKET_SIGNAL = "streaming_packet_signal"
EVENT_STREAMING_STOP_SIGNAL = "streaming_stop_signal"
EVENT_STREAMING_UNKNOWN_SIGNAL = "streaming_unknown_signal"
EVENT_STREAMING_CODEC_UPDATE_SIGNAL = "streaming_codec_update_signal"
EVENT_STREAMING_EXIT_SIGNAL = "streaming_codec_exit_signal"
EVENT_STREAMING_CODEC_MET = "streaming_codec_met"
Expand Down
243 changes: 159 additions & 84 deletions mp4.go

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions storage/archive_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ package storage

import (
"context"
"io"
)

type ArchiveUnit struct {
Payload io.Reader
Bucket string
SegmentName string
FileName string
}

type ArchiveStorage interface {
Expand Down
15 changes: 3 additions & 12 deletions storage/minio.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package storage

import (
"bytes"
"context"
"fmt"
"io"

"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/lifecycle"
Expand Down Expand Up @@ -50,25 +48,18 @@ func (m *MinioProvider) MakeBucket(bucket string) error {
return nil
}

// UploadFile loads file to MinIO. Do not provide FileName field in ArchiveUnit object if you want to use Payload bytes; otherwise file will be loaded from filesystem by FileName field
func (m *MinioProvider) UploadFile(ctx context.Context, object ArchiveUnit) (string, error) {
fname := fmt.Sprintf("%s/%s", m.Path, object.SegmentName)

buf := &bytes.Buffer{}

size, err := io.Copy(buf, object.Payload)
if err != nil {
return "", err
}
bucket := m.DefaultBucket
if object.Bucket != "" {
bucket = object.Bucket
}
_, err = m.client.PutObject(
_, err := m.client.FPutObject(
ctx,
bucket,
fname,
buf,
size,
object.FileName,
minio.PutObjectOptions{
ContentType: "application/octet-stream",
},
Expand Down
78 changes: 64 additions & 14 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,28 @@ const (
readTimeoutDuration = 33 * time.Second
)

type StopSignal uint8

const (
STOP_SIGNAL_ERR = StopSignal(iota)
STOP_SIGNAL_NO_VIDEO
STOP_SIGNAL_DISCONNECT
STOP_SIGNAL_STOP_DIAL
)

// runStream runs RTSP grabbing process
func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, archiveEnabled bool, streamVerboseLevel VerboseLevel) error {
var stopHlsCast, stopMP4Cast chan StopSignal

if hlsEnabled {
stopHlsCast = make(chan StopSignal, 1)
}
if archiveEnabled {
stopMP4Cast = make(chan StopSignal, 1)
}

errorSignal := make(chan error, 1)

if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_DIAL).Str("stream_id", streamID.String()).Str("stream_url", url).Bool("hls_enabled", hlsEnabled).Msg("Trying to dial")
}
Expand All @@ -31,18 +51,33 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar
if err != nil {
return errors.Wrapf(err, "Can't connect to stream '%s'", url)
}
defer session.Close()
defer func() {
if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_DIAL).Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Closing connection")
}
if hlsEnabled {
stopHlsCast <- STOP_SIGNAL_STOP_DIAL
}
if archiveEnabled {
stopMP4Cast <- STOP_SIGNAL_STOP_DIAL
}
session.Close()
}()

if len(session.CodecData) != 0 {
if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_CODEC_MET).Str("stream_id", streamID.String()).Str("stream_url", url).Bool("hls_enabled", hlsEnabled).Any("codec_data", session.CodecData).Msg("Found codec. Adding this one")
}
app.Streams.AddCodecForStream(streamID, session.CodecData)
err = app.Streams.AddCodecForStream(streamID, session.CodecData)
if err != nil {
return errors.Wrapf(err, "Can't update codec data for stream %s on empty codecs", streamID)
}
if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_STATUS_UPDATE).Str("stream_id", streamID.String()).Str("stream_url", url).Bool("hls_enabled", hlsEnabled).Msg("Update stream status")
}
err = app.Streams.UpdateStreamStatus(streamID, true)
if err != nil {
return errors.Wrapf(err, "Can't update status for stream %s", streamID)
return errors.Wrapf(err, "Can't update status for stream %s on empty codecs", streamID)
}
}

Expand All @@ -56,12 +91,10 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar
}
}

var stopHlsCast chan bool
if hlsEnabled {
if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_HLS_CAST).Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Need to start casting for HLS")
}
stopHlsCast = make(chan bool, 1)
err = app.startHlsCast(streamID, stopHlsCast)
if err != nil {
if streamVerboseLevel > VERBOSE_NONE {
Expand All @@ -70,7 +103,6 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar
}
}

var stopMP4Cast chan bool
if archiveEnabled {
if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_MP4_CAST).Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Need to start casting to MP4 archive")
Expand All @@ -79,8 +111,7 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar
if archive == nil {
log.Warn().Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_MP4_CAST).Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Empty archive configuration for the given stream")
} else {
stopMP4Cast = make(chan bool, 1)
err = app.startMP4Cast(archive, streamID, stopMP4Cast, streamVerboseLevel)
err = app.startMP4Cast(archive, streamID, stopMP4Cast, errorSignal, streamVerboseLevel)
if err != nil {
if streamVerboseLevel > VERBOSE_NONE {
log.Warn().Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_MP4_CAST).Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Can't start MP4 archive process")
Expand All @@ -94,27 +125,44 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar
select {
case <-pingStream.C:
log.Error().Err(ErrStreamHasNoVideo).Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_EXIT_SIGNAL).Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Stream has no video")
if hlsEnabled {
stopHlsCast <- STOP_SIGNAL_NO_VIDEO
}
if archiveEnabled {
stopMP4Cast <- STOP_SIGNAL_NO_VIDEO
}
return errors.Wrapf(ErrStreamHasNoVideo, "URL is '%s'", url)
case signals := <-session.Signals:
switch signals {
case rtspv2.SignalCodecUpdate:
if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_CODEC_UPDATE_SIGNAL).Str("stream_id", streamID.String()).Str("stream_url", url).Any("codec_data", session.CodecData).Msg("Recieved update codec signal")
}
app.Streams.AddCodecForStream(streamID, session.CodecData)
err = app.Streams.AddCodecForStream(streamID, session.CodecData)
if err != nil {
return errors.Wrapf(err, "Can't update codec data for stream %s on codecs update signal", streamID)
}
err = app.Streams.UpdateStreamStatus(streamID, true)
if err != nil {
return errors.Wrapf(err, "Can't update status for stream %s", streamID)
return errors.Wrapf(err, "Can't update status for stream %s after codecs update", streamID)
}
case rtspv2.SignalStreamRTPStop:
if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_STOP_SIGNAL).Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Recieved stop signal")
}
if hlsEnabled {
stopHlsCast <- STOP_SIGNAL_DISCONNECT
}
if archiveEnabled {
stopMP4Cast <- STOP_SIGNAL_DISCONNECT
}
err = app.Streams.UpdateStreamStatus(streamID, false)
if err != nil {
errors.Wrapf(err, "Can't switch status to False for stream '%s'", url)
return errors.Wrapf(err, "Can't update status for stream %s after RTP stops", streamID)
}
return errors.Wrapf(ErrStreamDisconnected, "URL is '%s'", url)
default:
log.Info().Str("warn", SCOPE_STREAMING).Str("event", EVENT_STREAMING_UNKNOWN_SIGNAL).Str("stream_id", streamID.String()).Str("stream_url", url).Int("signal", signals).Msg("Other signal")
}
case packetAV := <-session.OutgoingPacketQueue:
if streamVerboseLevel > VERBOSE_ADD {
Expand All @@ -135,20 +183,22 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled, ar
if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_PACKET_SIGNAL).Str("stream_id", streamID.String()).Str("stream_url", url).Bool("only_audio", isAudioOnly).Bool("is_keyframe", packetAV.IsKeyFrame).Msg("Need to stop HLS cast")
}
stopHlsCast <- true
stopHlsCast <- STOP_SIGNAL_ERR
}
if archiveEnabled {
if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_PACKET_SIGNAL).Str("stream_id", streamID.String()).Str("stream_url", url).Bool("only_audio", isAudioOnly).Bool("is_keyframe", packetAV.IsKeyFrame).Msg("Need to stop MP4 cast")
}
stopMP4Cast <- true
stopMP4Cast <- STOP_SIGNAL_ERR
}
errStatus := app.Streams.UpdateStreamStatus(streamID, false)
if errStatus != nil {
errors.Wrapf(errors.Wrapf(err, "Can't cast packet %s (%s)", streamID, url), "Can't switch status to False for stream '%s'", url)
return errors.Wrapf(err, "Can't update status for stream %s after casting", streamID)
}
return errors.Wrapf(err, "Can't cast packet %s (%s)", streamID, url)
}
case errS := <-errorSignal:
return errors.Wrapf(errS, "Recieved error signal from MP4 casting")
}
}
}
23 changes: 23 additions & 0 deletions stream_hls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package videoserver

import (
"github.com/deepch/vdk/av"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
)

func (app *Application) startHlsCast(streamID uuid.UUID, stopCast chan StopSignal) error {
app.Streams.Lock()
defer app.Streams.Unlock()
stream, ok := app.Streams.store[streamID]
if !ok {
return ErrStreamNotFound
}
go func(id uuid.UUID, hlsChanel chan av.Packet, stop chan StopSignal) {
err := app.startHls(id, hlsChanel, stop)
if err != nil {
log.Error().Err(err).Str("scope", SCOPE_HLS).Str("event", EVENT_HLS_START_CAST).Str("stream_id", id.String()).Msg("Error on HLS cast start")
}
}(streamID, stream.hlsChanel, stopCast)
return nil
}
28 changes: 28 additions & 0 deletions stream_mp4.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package videoserver

import (
"github.com/deepch/vdk/av"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
)

func (app *Application) startMP4Cast(archive *StreamArchiveWrapper, streamID uuid.UUID, stopCast chan StopSignal, errorSignal chan error, streamVerboseLevel VerboseLevel) error {
if archive == nil {
return ErrNullArchive
}
app.Streams.Lock()
defer app.Streams.Unlock()
stream, ok := app.Streams.store[streamID]
if !ok {
return ErrStreamNotFound
}
channel := stream.mp4Chanel
go func(arch *StreamArchiveWrapper, id uuid.UUID, mp4Chanel chan av.Packet, stop chan StopSignal, verbose VerboseLevel) {
err := app.startMP4(arch, id, mp4Chanel, stop, verbose)
if err != nil {
log.Error().Err(err).Str("scope", SCOPE_ARCHIVE).Str("event", EVENT_ARCHIVE_START_CAST).Str("stream_id", id.String()).Msg("Error on MP4 cast start")
}
errorSignal <- err
}(archive, streamID, channel, stopCast, streamVerboseLevel)
return nil
}
34 changes: 18 additions & 16 deletions streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,24 @@ func (app *Application) RunStream(ctx context.Context, streamID uuid.UUID) error

// startLoop starts stream loop with dialing to certain RTSP
func (app *Application) startLoop(ctx context.Context, streamID uuid.UUID, url string, hlsEnabled, archiveEnabled bool, streamVerboseLevel VerboseLevel) {
select {
case <-ctx.Done():
if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_DONE).Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Stream is done")
}
return
default:
if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_START).Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Stream must be establishment")
}
err := app.runStream(streamID, url, hlsEnabled, archiveEnabled, streamVerboseLevel)
if err != nil {
log.Error().Err(err).Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_RESTART).Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Can't start stream")
}
if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_RESTART).Str("stream_id", streamID.String()).Str("stream_url", url).Any("restart_duration", restartStreamDuration).Msg("Stream must be re-establishment")
for {
select {
case <-ctx.Done():
if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_DONE).Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Stream is done")
}
return
default:
if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_START).Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Stream must be establishment")
}
err := app.runStream(streamID, url, hlsEnabled, archiveEnabled, streamVerboseLevel)
if err != nil {
log.Error().Err(err).Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_RESTART).Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Can't start stream")
}
if streamVerboseLevel > VERBOSE_NONE {
log.Info().Str("scope", SCOPE_STREAMING).Str("event", EVENT_STREAMING_RESTART).Str("stream_id", streamID.String()).Str("stream_url", url).Dur("restart_duration", restartStreamDuration).Msg("Stream must be re-establishment")
}
}
time.Sleep(restartStreamDuration)
}
Expand Down
Loading

0 comments on commit c0e0239

Please sign in to comment.