diff --git a/config.toml.example b/config.toml.example index 3599250..f6effad 100644 --- a/config.toml.example +++ b/config.toml.example @@ -27,3 +27,4 @@ service = "dummy" orchestrator = "dummy" http_server_type = "http" http_address = "localhost:8091" +save_video = false diff --git a/config/config.go b/config/config.go index 83ab6ee..8b7f8ec 100644 --- a/config/config.go +++ b/config/config.go @@ -73,6 +73,8 @@ type Config struct { HTTPSHostname string `fig:"https_hostname"` HTTPSCert string `fig:"https_cert"` HTTPSKey string `fig:"https_key"` + + SaveVideo bool `fig:"save_video"` } } @@ -80,7 +82,7 @@ func Load() (Config, error) { var cfg Config if err := fig.Load(&cfg, fig.File("config.toml"), fig.Dirs(".")); err != nil { - return cfg, nil + return cfg, err } return cfg, nil diff --git a/pkg/control/control.go b/pkg/control/control.go index 12088f3..70eb4f9 100644 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -11,6 +11,7 @@ import ( "github.com/Glimesh/waveguide/config" "github.com/Glimesh/waveguide/pkg/h264" + "github.com/Glimesh/waveguide/pkg/keyframer" "github.com/Glimesh/waveguide/pkg/orchestrator" "github.com/Glimesh/waveguide/pkg/service" "github.com/Glimesh/waveguide/pkg/types" @@ -43,6 +44,10 @@ type Control struct { HTTPSHostname string `mapstructure:"https_hostname"` HTTPSCert string `mapstructure:"https_cert"` HTTPSKey string `mapstructure:"https_key"` + + // Flag to enable saving video stream to file + // Currently it's global flag toggled from the config file + SaveVideo bool `mapstructure:"save_video"` } func New( @@ -81,6 +86,9 @@ func New( HTTPSHostname: httpCfg.HTTPSHostname, HTTPSCert: httpCfg.HTTPSCert, HTTPSKey: httpCfg.HTTPSKey, + + // this should be controlled at a stream level + SaveVideo: cfg.Control.SaveVideo, }, nil } @@ -358,7 +366,7 @@ func (ctrl *Control) newStream(channelID types.ChannelID, cancelFunc context.Can authenticated: true, cancelFunc: cancelFunc, - keyframer: NewKeyframer(), + kf: keyframer.New(), rtpIngest: make(chan *rtp.Packet), stopHeartbeat: make(chan struct{}, 1), stopThumbnailer: make(chan struct{}, 1), @@ -370,6 +378,13 @@ func (ctrl *Control) newStream(channelID types.ChannelID, cancelFunc context.Can startTime: time.Now().Unix(), } + // TODO: this shouldn't be a global flag + // but rather configured on per stream basis + if ctrl.SaveVideo { + stream.saveVideo = true + stream.videoWriterChan = make(chan *rtp.Packet, 100) // not sure what the buffer size here should be + } + if _, exists := ctrl.streams[channelID]; exists { return stream, errors.New("stream already exists in stream manager state") } diff --git a/pkg/control/disk.go b/pkg/control/disk.go new file mode 100644 index 0000000..d4702dc --- /dev/null +++ b/pkg/control/disk.go @@ -0,0 +1,35 @@ +package control + +import "github.com/Glimesh/waveguide/pkg/disk" + +func (s *Stream) configureVideoWriter(codec string) { + videoWriter := disk.NewNoopVideoWriter() + if s.saveVideo { + if vw, err := disk.NewVideoWriter(codec, "out.h264"); err == nil { + videoWriter = vw + } else { + s.log.Debug("video save enabled but failed to create video writer") + s.log.Warnf("video writer: %v", err) + s.log.Debug("falling back to noop video writer") + } + } + s.videoWriter = videoWriter +} + +func (s *Stream) writer(done chan struct{}) { + s.log.Debug("starting file writer") +LOOP: + for { + select { + case <-done: + break LOOP + case p := <-s.videoWriterChan: + if err := s.videoWriter.WriteVideo(p); err != nil { + s.log.Debugf("writer: %v", err) + break LOOP + } + } + } + s.log.Debug("ending writer") + s.videoWriter.Close() +} diff --git a/pkg/control/thumbnailer.go b/pkg/control/ingest.go similarity index 70% rename from pkg/control/thumbnailer.go rename to pkg/control/ingest.go index a72bb68..75d06ab 100644 --- a/pkg/control/thumbnailer.go +++ b/pkg/control/ingest.go @@ -6,20 +6,14 @@ import ( "io" "net/http" "strings" - "time" - "github.com/pion/rtp" "github.com/pion/webrtc/v3" ) -type header struct { - key string - value string -} - func (s *Stream) Ingest(ctx context.Context) error { logger := s.log.WithField("app", "ingest") done := make(chan struct{}, 1) + go s.startIngestor() pc, err := webrtc.NewPeerConnection(webrtc.Configuration{}) //nolint exhaustive struct @@ -47,6 +41,7 @@ func (s *Stream) Ingest(ctx context.Context) error { codec := track.Codec() if codec.MimeType == "video/H264" { + s.configureVideoWriter(webrtc.MimeTypeH264) for { select { case <-cancelRead: @@ -124,89 +119,52 @@ func (s *Stream) Ingest(ctx context.Context) error { return nil } -func doHTTPRequest(uri, method string, body io.Reader, headers ...header) (*http.Response, error) { - req, err := http.NewRequest(method, uri, body) - if err != nil { - return nil, err - } - - for _, header := range headers { - req.Header.Set(header.key, header.value) - } - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, err - } - - return resp, nil -} - func (s *Stream) startIngestor() { - done := make(chan struct{}, 1) + doneThumb := make(chan struct{}, 1) + doneWriter := make(chan struct{}, 1) - go s.thumbnailer(done) + go s.thumbnailer(doneThumb) + if s.saveVideo { + go s.writer(doneWriter) + } for p := range s.rtpIngest { select { - case s.thumbnailReceiver <- p: + case s.thumbnailReceiver <- p.Clone(): + default: + } + + select { + case s.videoWriterChan <- p.Clone(): default: } } s.log.Debug("closed ingestor listener") - done <- struct{}{} + doneThumb <- struct{}{} + doneWriter <- struct{}{} s.log.Debug("ending rtp ingestor") } -func (s *Stream) thumbnailer(done chan struct{}) { -OUTER: - for { - s.log.Debug("waiting for thumbnail request signal") - select { - case <-s.requestThumbnail: - case <-done: - break OUTER - } - s.log.Debug("thumbnail request received") +type header struct { + key string + value string +} - for len(s.thumbnailReceiver) > 0 { - <-s.thumbnailReceiver - } - s.log.Debug("thumbnail buffer drained") - - var pkt *rtp.Packet - - t := time.Now() - INNER: - for { - select { - case pkt = <-s.thumbnailReceiver: - case <-done: - s.log.Debug("stopping thumbnail receiver") - break OUTER - } +func doHTTPRequest(uri, method string, body io.Reader, headers ...header) (*http.Response, error) { + req, err := http.NewRequest(method, uri, body) + if err != nil { + return nil, err + } - select { - case <-done: - break OUTER - default: - // use a deadline of 10 seconds to retrieve a keyframe - if time.Since(t) > time.Second*10 { - s.log.Warn("keyframe not available") - break INNER - } - keyframe := s.keyframer.NewKeyframe(pkt) - if keyframe != nil { - s.log.Debug("got keyframe") - s.lastThumbnail <- keyframe - s.log.Debug("sent keyframe") - // reset and sleep after sending one keyframe - s.keyframer.Reset() - break INNER - } - } - } + for _, header := range headers { + req.Header.Set(header.key, header.value) } - s.log.Debug("ending thumbnailer") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + + return resp, nil } diff --git a/pkg/control/stream.go b/pkg/control/stream.go index ff1c945..1c19e55 100644 --- a/pkg/control/stream.go +++ b/pkg/control/stream.go @@ -4,6 +4,8 @@ import ( "context" "errors" + "github.com/Glimesh/waveguide/pkg/disk" + "github.com/Glimesh/waveguide/pkg/keyframer" "github.com/Glimesh/waveguide/pkg/types" "github.com/pion/rtp" @@ -28,12 +30,15 @@ type Stream struct { whepURI string + saveVideo bool + videoWriter disk.VideoWriter + // mediaStarted is set after media bytes have come in from the client mediaStarted bool hasSomeAudio bool hasSomeVideo bool - keyframer *Keyframer + kf *keyframer.Keyframer rtpIngest chan *rtp.Packet lastThumbnail chan []byte // channel used to signal thumbnailer to stop @@ -42,6 +47,8 @@ type Stream struct { requestThumbnail chan struct{} thumbnailReceiver chan *rtp.Packet + videoWriterChan chan *rtp.Packet + ChannelID types.ChannelID StreamID types.StreamID StreamKey types.StreamKey diff --git a/pkg/control/thumbnail.go b/pkg/control/thumbnail.go new file mode 100644 index 0000000..8752ff5 --- /dev/null +++ b/pkg/control/thumbnail.go @@ -0,0 +1,59 @@ +package control + +import ( + "time" + + "github.com/pion/rtp" +) + +func (s *Stream) thumbnailer(done chan struct{}) { +OUTER: + for { + s.log.Debug("waiting for thumbnail request signal") + select { + case <-s.requestThumbnail: + case <-done: + break OUTER + } + s.log.Debug("thumbnail request received") + + for len(s.thumbnailReceiver) > 0 { + <-s.thumbnailReceiver + } + s.log.Debug("thumbnail buffer drained") + + var pkt *rtp.Packet + + t := time.Now() + INNER: + for { + select { + case pkt = <-s.thumbnailReceiver: + case <-done: + s.log.Debug("stopping thumbnail receiver") + break OUTER + } + + select { + case <-done: + break OUTER + default: + // use a deadline of 10 seconds to retrieve a keyframe + if time.Since(t) > time.Second*10 { + s.log.Warn("keyframe not available") + break INNER + } + keyframe := s.kf.GetKeyframe(pkt) + if keyframe != nil { + s.log.Debug("got keyframe") + s.lastThumbnail <- keyframe + s.log.Debug("sent keyframe") + // reset and sleep after sending one keyframe + s.kf.Reset() + break INNER + } + } + } + } + s.log.Debug("ending thumbnailer") +} diff --git a/pkg/disk/media.go b/pkg/disk/media.go new file mode 100644 index 0000000..59f1c10 --- /dev/null +++ b/pkg/disk/media.go @@ -0,0 +1,63 @@ +package disk + +import ( + "errors" + + "github.com/pion/rtp" + "github.com/pion/webrtc/v3" + "github.com/pion/webrtc/v3/pkg/media" + "github.com/pion/webrtc/v3/pkg/media/h264writer" +) + +var ( + _ VideoWriter = (*noopWriter)(nil) + _ AudioWriter = (*noopWriter)(nil) + + _ VideoWriter = (*videoWriter)(nil) +) + +type noopWriter struct{} + +func NewNoopVideoWriter() VideoWriter { + return &noopWriter{} +} + +func NewNoopAudioWriter() AudioWriter { + return &noopWriter{} +} + +func (nw *noopWriter) WriteVideo(_ *rtp.Packet) error { return nil } +func (nw *noopWriter) Close() error { return nil } + +func (nw *noopWriter) WriteAudio() {} + +// TODO: figure out if wrapping this is needed +// Wrapped media.Writer into a custom type +// that may be useful later on +type videoWriter struct { + media.Writer +} + +func (w *videoWriter) WriteVideo(p *rtp.Packet) error { + return w.WriteRTP(p) +} + +func (w *videoWriter) Close() error { + return w.Writer.Close() +} + +func NewVideoWriter(codec, filename string) (VideoWriter, error) { + switch codec { + case webrtc.MimeTypeH264: + w, err := h264writer.New(filename) + if err != nil { + return nil, err + } + return &videoWriter{w}, nil + case webrtc.MimeTypeVP8: + // TODO: implement vp8 + panic("vp8 file output unimplemented") + default: + return nil, errors.New("unsupported codec") + } +} diff --git a/pkg/disk/writer.go b/pkg/disk/writer.go new file mode 100644 index 0000000..9743a86 --- /dev/null +++ b/pkg/disk/writer.go @@ -0,0 +1,12 @@ +package disk + +import "github.com/pion/rtp" + +type VideoWriter interface { + WriteVideo(p *rtp.Packet) error + Close() error +} + +type AudioWriter interface { + WriteAudio() +} diff --git a/pkg/control/keyframer.go b/pkg/keyframer/keyframer.go similarity index 94% rename from pkg/control/keyframer.go rename to pkg/keyframer/keyframer.go index 6cd7258..4ddd7b9 100644 --- a/pkg/control/keyframer.go +++ b/pkg/keyframer/keyframer.go @@ -1,4 +1,4 @@ -package control +package keyframer import ( "crypto/sha256" @@ -11,7 +11,7 @@ import ( "github.com/pion/rtp/codecs" ) -func NewKeyframer() *Keyframer { +func New() *Keyframer { return &Keyframer{ timestamp: 0, frameStarted: false, @@ -32,7 +32,7 @@ func (kf *Keyframer) Reset() { kf.packets = make(map[uint16][]byte) } -func (kf *Keyframer) NewKeyframe(p *rtp.Packet) []byte { +func (kf *Keyframer) GetKeyframe(p *rtp.Packet) []byte { // fmt.Printf("frameStarted=%t\n", kf.frameStarted) // Frame has started, but timestamps don't match, continue if kf.frameStarted && kf.timestamp != p.Timestamp { @@ -87,7 +87,7 @@ func (kf *Keyframer) NewKeyframe(p *rtp.Packet) []byte { return nil } -func (kf *Keyframer) Keyframe() []byte { +func (kf *Keyframer) LastFullKeyframe() []byte { return kf.lastFullKeyframe }