From 2f5cec1c8b03bbdd4f2102fa122d5d039fc8ae2d Mon Sep 17 00:00:00 2001 From: Muhammad Hassan Date: Tue, 11 Apr 2023 01:50:48 +0500 Subject: [PATCH 1/6] Add 'save_video' global flag --- config.toml.example | 1 + config/config.go | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/config.toml.example b/config.toml.example index 3599250..f6effad 100644 --- a/config.toml.example +++ b/config.toml.example @@ -27,3 +27,4 @@ service = "dummy" orchestrator = "dummy" http_server_type = "http" http_address = "localhost:8091" +save_video = false diff --git a/config/config.go b/config/config.go index 83ab6ee..8b7f8ec 100644 --- a/config/config.go +++ b/config/config.go @@ -73,6 +73,8 @@ type Config struct { HTTPSHostname string `fig:"https_hostname"` HTTPSCert string `fig:"https_cert"` HTTPSKey string `fig:"https_key"` + + SaveVideo bool `fig:"save_video"` } } @@ -80,7 +82,7 @@ func Load() (Config, error) { var cfg Config if err := fig.Load(&cfg, fig.File("config.toml"), fig.Dirs(".")); err != nil { - return cfg, nil + return cfg, err } return cfg, nil From 52a5caf21e7a33194487296e611ed4311023dbad Mon Sep 17 00:00:00 2001 From: Muhammad Hassan Date: Tue, 11 Apr 2023 01:51:56 +0500 Subject: [PATCH 2/6] Handle 'save_video' flag --- pkg/control/control.go | 17 ++++++++++++++++- pkg/control/stream.go | 9 ++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/pkg/control/control.go b/pkg/control/control.go index 12088f3..70eb4f9 100644 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -11,6 +11,7 @@ import ( "github.com/Glimesh/waveguide/config" "github.com/Glimesh/waveguide/pkg/h264" + "github.com/Glimesh/waveguide/pkg/keyframer" "github.com/Glimesh/waveguide/pkg/orchestrator" "github.com/Glimesh/waveguide/pkg/service" "github.com/Glimesh/waveguide/pkg/types" @@ -43,6 +44,10 @@ type Control struct { HTTPSHostname string `mapstructure:"https_hostname"` HTTPSCert string `mapstructure:"https_cert"` HTTPSKey string `mapstructure:"https_key"` + + // Flag to enable saving video stream to file + // Currently it's global flag toggled from the config file + SaveVideo bool `mapstructure:"save_video"` } func New( @@ -81,6 +86,9 @@ func New( HTTPSHostname: httpCfg.HTTPSHostname, HTTPSCert: httpCfg.HTTPSCert, HTTPSKey: httpCfg.HTTPSKey, + + // this should be controlled at a stream level + SaveVideo: cfg.Control.SaveVideo, }, nil } @@ -358,7 +366,7 @@ func (ctrl *Control) newStream(channelID types.ChannelID, cancelFunc context.Can authenticated: true, cancelFunc: cancelFunc, - keyframer: NewKeyframer(), + kf: keyframer.New(), rtpIngest: make(chan *rtp.Packet), stopHeartbeat: make(chan struct{}, 1), stopThumbnailer: make(chan struct{}, 1), @@ -370,6 +378,13 @@ func (ctrl *Control) newStream(channelID types.ChannelID, cancelFunc context.Can startTime: time.Now().Unix(), } + // TODO: this shouldn't be a global flag + // but rather configured on per stream basis + if ctrl.SaveVideo { + stream.saveVideo = true + stream.videoWriterChan = make(chan *rtp.Packet, 100) // not sure what the buffer size here should be + } + if _, exists := ctrl.streams[channelID]; exists { return stream, errors.New("stream already exists in stream manager state") } diff --git a/pkg/control/stream.go b/pkg/control/stream.go index ff1c945..1c19e55 100644 --- a/pkg/control/stream.go +++ b/pkg/control/stream.go @@ -4,6 +4,8 @@ import ( "context" "errors" + "github.com/Glimesh/waveguide/pkg/disk" + "github.com/Glimesh/waveguide/pkg/keyframer" "github.com/Glimesh/waveguide/pkg/types" "github.com/pion/rtp" @@ -28,12 +30,15 @@ type Stream struct { whepURI string + saveVideo bool + videoWriter disk.VideoWriter + // mediaStarted is set after media bytes have come in from the client mediaStarted bool hasSomeAudio bool hasSomeVideo bool - keyframer *Keyframer + kf *keyframer.Keyframer rtpIngest chan *rtp.Packet lastThumbnail chan []byte // channel used to signal thumbnailer to stop @@ -42,6 +47,8 @@ type Stream struct { requestThumbnail chan struct{} thumbnailReceiver chan *rtp.Packet + videoWriterChan chan *rtp.Packet + ChannelID types.ChannelID StreamID types.StreamID StreamKey types.StreamKey From 4c8a40e13c32da59d2f2cc34c70190206d68a607 Mon Sep 17 00:00:00 2001 From: Muhammad Hassan Date: Tue, 11 Apr 2023 01:52:33 +0500 Subject: [PATCH 3/6] Move keyframer to its own pkg --- pkg/{control => keyframer}/keyframer.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) rename pkg/{control => keyframer}/keyframer.go (94%) diff --git a/pkg/control/keyframer.go b/pkg/keyframer/keyframer.go similarity index 94% rename from pkg/control/keyframer.go rename to pkg/keyframer/keyframer.go index 6cd7258..4ddd7b9 100644 --- a/pkg/control/keyframer.go +++ b/pkg/keyframer/keyframer.go @@ -1,4 +1,4 @@ -package control +package keyframer import ( "crypto/sha256" @@ -11,7 +11,7 @@ import ( "github.com/pion/rtp/codecs" ) -func NewKeyframer() *Keyframer { +func New() *Keyframer { return &Keyframer{ timestamp: 0, frameStarted: false, @@ -32,7 +32,7 @@ func (kf *Keyframer) Reset() { kf.packets = make(map[uint16][]byte) } -func (kf *Keyframer) NewKeyframe(p *rtp.Packet) []byte { +func (kf *Keyframer) GetKeyframe(p *rtp.Packet) []byte { // fmt.Printf("frameStarted=%t\n", kf.frameStarted) // Frame has started, but timestamps don't match, continue if kf.frameStarted && kf.timestamp != p.Timestamp { @@ -87,7 +87,7 @@ func (kf *Keyframer) NewKeyframe(p *rtp.Packet) []byte { return nil } -func (kf *Keyframer) Keyframe() []byte { +func (kf *Keyframer) LastFullKeyframe() []byte { return kf.lastFullKeyframe } From aa19ccb78f00bfb65cd3ad5603be7f4eea00be7d Mon Sep 17 00:00:00 2001 From: Muhammad Hassan Date: Tue, 11 Apr 2023 01:53:14 +0500 Subject: [PATCH 4/6] Rename 'thumbnailer.go' -> 'thumbnail.go' --- pkg/control/thumbnail.go | 59 +++++++++++ pkg/control/thumbnailer.go | 212 ------------------------------------- 2 files changed, 59 insertions(+), 212 deletions(-) create mode 100644 pkg/control/thumbnail.go delete mode 100644 pkg/control/thumbnailer.go diff --git a/pkg/control/thumbnail.go b/pkg/control/thumbnail.go new file mode 100644 index 0000000..8752ff5 --- /dev/null +++ b/pkg/control/thumbnail.go @@ -0,0 +1,59 @@ +package control + +import ( + "time" + + "github.com/pion/rtp" +) + +func (s *Stream) thumbnailer(done chan struct{}) { +OUTER: + for { + s.log.Debug("waiting for thumbnail request signal") + select { + case <-s.requestThumbnail: + case <-done: + break OUTER + } + s.log.Debug("thumbnail request received") + + for len(s.thumbnailReceiver) > 0 { + <-s.thumbnailReceiver + } + s.log.Debug("thumbnail buffer drained") + + var pkt *rtp.Packet + + t := time.Now() + INNER: + for { + select { + case pkt = <-s.thumbnailReceiver: + case <-done: + s.log.Debug("stopping thumbnail receiver") + break OUTER + } + + select { + case <-done: + break OUTER + default: + // use a deadline of 10 seconds to retrieve a keyframe + if time.Since(t) > time.Second*10 { + s.log.Warn("keyframe not available") + break INNER + } + keyframe := s.kf.GetKeyframe(pkt) + if keyframe != nil { + s.log.Debug("got keyframe") + s.lastThumbnail <- keyframe + s.log.Debug("sent keyframe") + // reset and sleep after sending one keyframe + s.kf.Reset() + break INNER + } + } + } + } + s.log.Debug("ending thumbnailer") +} diff --git a/pkg/control/thumbnailer.go b/pkg/control/thumbnailer.go deleted file mode 100644 index a72bb68..0000000 --- a/pkg/control/thumbnailer.go +++ /dev/null @@ -1,212 +0,0 @@ -package control - -import ( - "context" - "errors" - "io" - "net/http" - "strings" - "time" - - "github.com/pion/rtp" - "github.com/pion/webrtc/v3" -) - -type header struct { - key string - value string -} - -func (s *Stream) Ingest(ctx context.Context) error { - logger := s.log.WithField("app", "ingest") - done := make(chan struct{}, 1) - go s.startIngestor() - - pc, err := webrtc.NewPeerConnection(webrtc.Configuration{}) //nolint exhaustive struct - if err != nil { - return err - } - - pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { - cancelRead := make(chan struct{}, 1) - go func() { - <-done - s.log.Debug("exiting on track") - LOOP: - for { - select { - case <-s.lastThumbnail: - default: - s.log.Debug("thumbnail channel drained") - break LOOP - } - } - cancelRead <- struct{}{} - }() - - codec := track.Codec() - - if codec.MimeType == "video/H264" { - for { - select { - case <-cancelRead: - s.log.Debug("on track stop signal") - close(s.rtpIngest) - return - default: - pkt, _, readErr := track.ReadRTP() - if readErr != nil { - // terminate the ingestor is input stream is EOF - if errors.Is(readErr, io.EOF) { - s.log.Debugf("read: %v", readErr) - close(s.rtpIngest) - return - } - } - s.rtpIngest <- pkt - } - } - } - }) - - sdpHeader := header{"Accept", "application/sdp"} - resp, err := doHTTPRequest( - s.whepURI, - http.MethodPost, - strings.NewReader(""), - sdpHeader, - ) - if err != nil { - return err - } - defer resp.Body.Close() - - offer, err := io.ReadAll(resp.Body) - if err != nil { - return err - } - - if err := pc.SetRemoteDescription( //nolint shadow - webrtc.SessionDescription{ - Type: webrtc.SDPTypeOffer, - SDP: string(offer), - }); err != nil { - return err - } - - answerSDP, err := pc.CreateAnswer(nil) - if err != nil { - return err - } - - gatherComplete := webrtc.GatheringCompletePromise(pc) - if err := pc.SetLocalDescription(answerSDP); err != nil { //nolint shadow - return err - } - <-gatherComplete - - answer := pc.LocalDescription().SDP - _, err = doHTTPRequest( //nolint response is ignored - resp.Header.Get("location"), - http.MethodPost, - strings.NewReader(answer), - sdpHeader, - ) - if err != nil { - return err - } - - <-ctx.Done() - pc.Close() - done <- struct{}{} - logger.Debug("received ctx done signal") - - return nil -} - -func doHTTPRequest(uri, method string, body io.Reader, headers ...header) (*http.Response, error) { - req, err := http.NewRequest(method, uri, body) - if err != nil { - return nil, err - } - - for _, header := range headers { - req.Header.Set(header.key, header.value) - } - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, err - } - - return resp, nil -} - -func (s *Stream) startIngestor() { - done := make(chan struct{}, 1) - - go s.thumbnailer(done) - - for p := range s.rtpIngest { - select { - case s.thumbnailReceiver <- p: - default: - } - } - s.log.Debug("closed ingestor listener") - - done <- struct{}{} - s.log.Debug("ending rtp ingestor") -} - -func (s *Stream) thumbnailer(done chan struct{}) { -OUTER: - for { - s.log.Debug("waiting for thumbnail request signal") - select { - case <-s.requestThumbnail: - case <-done: - break OUTER - } - s.log.Debug("thumbnail request received") - - for len(s.thumbnailReceiver) > 0 { - <-s.thumbnailReceiver - } - s.log.Debug("thumbnail buffer drained") - - var pkt *rtp.Packet - - t := time.Now() - INNER: - for { - select { - case pkt = <-s.thumbnailReceiver: - case <-done: - s.log.Debug("stopping thumbnail receiver") - break OUTER - } - - select { - case <-done: - break OUTER - default: - // use a deadline of 10 seconds to retrieve a keyframe - if time.Since(t) > time.Second*10 { - s.log.Warn("keyframe not available") - break INNER - } - keyframe := s.keyframer.NewKeyframe(pkt) - if keyframe != nil { - s.log.Debug("got keyframe") - s.lastThumbnail <- keyframe - s.log.Debug("sent keyframe") - // reset and sleep after sending one keyframe - s.keyframer.Reset() - break INNER - } - } - } - } - s.log.Debug("ending thumbnailer") -} From 5a8295825003585952fa77fbce522767c91c5074 Mon Sep 17 00:00:00 2001 From: Muhammad Hassan Date: Tue, 11 Apr 2023 01:54:14 +0500 Subject: [PATCH 5/6] Implement h264 file writer --- pkg/control/disk.go | 35 +++++++++++++++++++++++++ pkg/disk/media.go | 63 +++++++++++++++++++++++++++++++++++++++++++++ pkg/disk/writer.go | 12 +++++++++ 3 files changed, 110 insertions(+) create mode 100644 pkg/control/disk.go create mode 100644 pkg/disk/media.go create mode 100644 pkg/disk/writer.go diff --git a/pkg/control/disk.go b/pkg/control/disk.go new file mode 100644 index 0000000..d4702dc --- /dev/null +++ b/pkg/control/disk.go @@ -0,0 +1,35 @@ +package control + +import "github.com/Glimesh/waveguide/pkg/disk" + +func (s *Stream) configureVideoWriter(codec string) { + videoWriter := disk.NewNoopVideoWriter() + if s.saveVideo { + if vw, err := disk.NewVideoWriter(codec, "out.h264"); err == nil { + videoWriter = vw + } else { + s.log.Debug("video save enabled but failed to create video writer") + s.log.Warnf("video writer: %v", err) + s.log.Debug("falling back to noop video writer") + } + } + s.videoWriter = videoWriter +} + +func (s *Stream) writer(done chan struct{}) { + s.log.Debug("starting file writer") +LOOP: + for { + select { + case <-done: + break LOOP + case p := <-s.videoWriterChan: + if err := s.videoWriter.WriteVideo(p); err != nil { + s.log.Debugf("writer: %v", err) + break LOOP + } + } + } + s.log.Debug("ending writer") + s.videoWriter.Close() +} diff --git a/pkg/disk/media.go b/pkg/disk/media.go new file mode 100644 index 0000000..59f1c10 --- /dev/null +++ b/pkg/disk/media.go @@ -0,0 +1,63 @@ +package disk + +import ( + "errors" + + "github.com/pion/rtp" + "github.com/pion/webrtc/v3" + "github.com/pion/webrtc/v3/pkg/media" + "github.com/pion/webrtc/v3/pkg/media/h264writer" +) + +var ( + _ VideoWriter = (*noopWriter)(nil) + _ AudioWriter = (*noopWriter)(nil) + + _ VideoWriter = (*videoWriter)(nil) +) + +type noopWriter struct{} + +func NewNoopVideoWriter() VideoWriter { + return &noopWriter{} +} + +func NewNoopAudioWriter() AudioWriter { + return &noopWriter{} +} + +func (nw *noopWriter) WriteVideo(_ *rtp.Packet) error { return nil } +func (nw *noopWriter) Close() error { return nil } + +func (nw *noopWriter) WriteAudio() {} + +// TODO: figure out if wrapping this is needed +// Wrapped media.Writer into a custom type +// that may be useful later on +type videoWriter struct { + media.Writer +} + +func (w *videoWriter) WriteVideo(p *rtp.Packet) error { + return w.WriteRTP(p) +} + +func (w *videoWriter) Close() error { + return w.Writer.Close() +} + +func NewVideoWriter(codec, filename string) (VideoWriter, error) { + switch codec { + case webrtc.MimeTypeH264: + w, err := h264writer.New(filename) + if err != nil { + return nil, err + } + return &videoWriter{w}, nil + case webrtc.MimeTypeVP8: + // TODO: implement vp8 + panic("vp8 file output unimplemented") + default: + return nil, errors.New("unsupported codec") + } +} diff --git a/pkg/disk/writer.go b/pkg/disk/writer.go new file mode 100644 index 0000000..9743a86 --- /dev/null +++ b/pkg/disk/writer.go @@ -0,0 +1,12 @@ +package disk + +import "github.com/pion/rtp" + +type VideoWriter interface { + WriteVideo(p *rtp.Packet) error + Close() error +} + +type AudioWriter interface { + WriteAudio() +} From 3e92af8ca2fa8ad43dd0ee985c71b77a89587666 Mon Sep 17 00:00:00 2001 From: Muhammad Hassan Date: Tue, 11 Apr 2023 01:54:39 +0500 Subject: [PATCH 6/6] Move to ingest.go --- pkg/control/ingest.go | 170 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 pkg/control/ingest.go diff --git a/pkg/control/ingest.go b/pkg/control/ingest.go new file mode 100644 index 0000000..75d06ab --- /dev/null +++ b/pkg/control/ingest.go @@ -0,0 +1,170 @@ +package control + +import ( + "context" + "errors" + "io" + "net/http" + "strings" + + "github.com/pion/webrtc/v3" +) + +func (s *Stream) Ingest(ctx context.Context) error { + logger := s.log.WithField("app", "ingest") + done := make(chan struct{}, 1) + + go s.startIngestor() + + pc, err := webrtc.NewPeerConnection(webrtc.Configuration{}) //nolint exhaustive struct + if err != nil { + return err + } + + pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { + cancelRead := make(chan struct{}, 1) + go func() { + <-done + s.log.Debug("exiting on track") + LOOP: + for { + select { + case <-s.lastThumbnail: + default: + s.log.Debug("thumbnail channel drained") + break LOOP + } + } + cancelRead <- struct{}{} + }() + + codec := track.Codec() + + if codec.MimeType == "video/H264" { + s.configureVideoWriter(webrtc.MimeTypeH264) + for { + select { + case <-cancelRead: + s.log.Debug("on track stop signal") + close(s.rtpIngest) + return + default: + pkt, _, readErr := track.ReadRTP() + if readErr != nil { + // terminate the ingestor is input stream is EOF + if errors.Is(readErr, io.EOF) { + s.log.Debugf("read: %v", readErr) + close(s.rtpIngest) + return + } + } + s.rtpIngest <- pkt + } + } + } + }) + + sdpHeader := header{"Accept", "application/sdp"} + resp, err := doHTTPRequest( + s.whepURI, + http.MethodPost, + strings.NewReader(""), + sdpHeader, + ) + if err != nil { + return err + } + defer resp.Body.Close() + + offer, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + + if err := pc.SetRemoteDescription( //nolint shadow + webrtc.SessionDescription{ + Type: webrtc.SDPTypeOffer, + SDP: string(offer), + }); err != nil { + return err + } + + answerSDP, err := pc.CreateAnswer(nil) + if err != nil { + return err + } + + gatherComplete := webrtc.GatheringCompletePromise(pc) + if err := pc.SetLocalDescription(answerSDP); err != nil { //nolint shadow + return err + } + <-gatherComplete + + answer := pc.LocalDescription().SDP + _, err = doHTTPRequest( //nolint response is ignored + resp.Header.Get("location"), + http.MethodPost, + strings.NewReader(answer), + sdpHeader, + ) + if err != nil { + return err + } + + <-ctx.Done() + pc.Close() + done <- struct{}{} + logger.Debug("received ctx done signal") + + return nil +} + +func (s *Stream) startIngestor() { + doneThumb := make(chan struct{}, 1) + doneWriter := make(chan struct{}, 1) + + go s.thumbnailer(doneThumb) + if s.saveVideo { + go s.writer(doneWriter) + } + + for p := range s.rtpIngest { + select { + case s.thumbnailReceiver <- p.Clone(): + default: + } + + select { + case s.videoWriterChan <- p.Clone(): + default: + } + } + s.log.Debug("closed ingestor listener") + + doneThumb <- struct{}{} + doneWriter <- struct{}{} + s.log.Debug("ending rtp ingestor") +} + +type header struct { + key string + value string +} + +func doHTTPRequest(uri, method string, body io.Reader, headers ...header) (*http.Response, error) { + req, err := http.NewRequest(method, uri, body) + if err != nil { + return nil, err + } + + for _, header := range headers { + req.Header.Set(header.key, header.value) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + + return resp, nil +}