Skip to content

Commit

Permalink
Implement save to disk for h264 video stream (#27)
Browse files Browse the repository at this point in the history
* Add 'save_video' global flag

* Handle 'save_video' flag

* Move keyframer to its own pkg

* Rename 'thumbnailer.go' -> 'thumbnail.go'

* Implement h264 file writer

* Move to ingest.go
  • Loading branch information
nassah221 authored Apr 17, 2023
1 parent 0b15778 commit 1cab06f
Show file tree
Hide file tree
Showing 10 changed files with 235 additions and 83 deletions.
1 change: 1 addition & 0 deletions config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ service = "dummy"
orchestrator = "dummy"
http_server_type = "http"
http_address = "localhost:8091"
save_video = false
4 changes: 3 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,16 @@ type Config struct {
HTTPSHostname string `fig:"https_hostname"`
HTTPSCert string `fig:"https_cert"`
HTTPSKey string `fig:"https_key"`

SaveVideo bool `fig:"save_video"`
}
}

func Load() (Config, error) {
var cfg Config

if err := fig.Load(&cfg, fig.File("config.toml"), fig.Dirs(".")); err != nil {
return cfg, nil
return cfg, err
}

return cfg, nil
Expand Down
17 changes: 16 additions & 1 deletion pkg/control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/Glimesh/waveguide/config"
"github.com/Glimesh/waveguide/pkg/h264"
"github.com/Glimesh/waveguide/pkg/keyframer"
"github.com/Glimesh/waveguide/pkg/orchestrator"
"github.com/Glimesh/waveguide/pkg/service"
"github.com/Glimesh/waveguide/pkg/types"
Expand Down Expand Up @@ -43,6 +44,10 @@ type Control struct {
HTTPSHostname string `mapstructure:"https_hostname"`
HTTPSCert string `mapstructure:"https_cert"`
HTTPSKey string `mapstructure:"https_key"`

// Flag to enable saving video stream to file
// Currently it's global flag toggled from the config file
SaveVideo bool `mapstructure:"save_video"`
}

func New(
Expand Down Expand Up @@ -81,6 +86,9 @@ func New(
HTTPSHostname: httpCfg.HTTPSHostname,
HTTPSCert: httpCfg.HTTPSCert,
HTTPSKey: httpCfg.HTTPSKey,

// this should be controlled at a stream level
SaveVideo: cfg.Control.SaveVideo,
}, nil
}

Expand Down Expand Up @@ -358,7 +366,7 @@ func (ctrl *Control) newStream(channelID types.ChannelID, cancelFunc context.Can
authenticated: true,

cancelFunc: cancelFunc,
keyframer: NewKeyframer(),
kf: keyframer.New(),
rtpIngest: make(chan *rtp.Packet),
stopHeartbeat: make(chan struct{}, 1),
stopThumbnailer: make(chan struct{}, 1),
Expand All @@ -370,6 +378,13 @@ func (ctrl *Control) newStream(channelID types.ChannelID, cancelFunc context.Can
startTime: time.Now().Unix(),
}

// TODO: this shouldn't be a global flag
// but rather configured on per stream basis
if ctrl.SaveVideo {
stream.saveVideo = true
stream.videoWriterChan = make(chan *rtp.Packet, 100) // not sure what the buffer size here should be
}

if _, exists := ctrl.streams[channelID]; exists {
return stream, errors.New("stream already exists in stream manager state")
}
Expand Down
35 changes: 35 additions & 0 deletions pkg/control/disk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package control

import "github.com/Glimesh/waveguide/pkg/disk"

func (s *Stream) configureVideoWriter(codec string) {
videoWriter := disk.NewNoopVideoWriter()
if s.saveVideo {
if vw, err := disk.NewVideoWriter(codec, "out.h264"); err == nil {
videoWriter = vw
} else {
s.log.Debug("video save enabled but failed to create video writer")
s.log.Warnf("video writer: %v", err)
s.log.Debug("falling back to noop video writer")
}
}
s.videoWriter = videoWriter
}

func (s *Stream) writer(done chan struct{}) {
s.log.Debug("starting file writer")
LOOP:
for {
select {
case <-done:
break LOOP
case p := <-s.videoWriterChan:
if err := s.videoWriter.WriteVideo(p); err != nil {
s.log.Debugf("writer: %v", err)
break LOOP
}
}
}
s.log.Debug("ending writer")
s.videoWriter.Close()
}
110 changes: 34 additions & 76 deletions pkg/control/thumbnailer.go → pkg/control/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,14 @@ import (
"io"
"net/http"
"strings"
"time"

"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
)

type header struct {
key string
value string
}

func (s *Stream) Ingest(ctx context.Context) error {
logger := s.log.WithField("app", "ingest")
done := make(chan struct{}, 1)

go s.startIngestor()

pc, err := webrtc.NewPeerConnection(webrtc.Configuration{}) //nolint exhaustive struct
Expand Down Expand Up @@ -47,6 +41,7 @@ func (s *Stream) Ingest(ctx context.Context) error {
codec := track.Codec()

if codec.MimeType == "video/H264" {
s.configureVideoWriter(webrtc.MimeTypeH264)
for {
select {
case <-cancelRead:
Expand Down Expand Up @@ -124,89 +119,52 @@ func (s *Stream) Ingest(ctx context.Context) error {
return nil
}

func doHTTPRequest(uri, method string, body io.Reader, headers ...header) (*http.Response, error) {
req, err := http.NewRequest(method, uri, body)
if err != nil {
return nil, err
}

for _, header := range headers {
req.Header.Set(header.key, header.value)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}

return resp, nil
}

func (s *Stream) startIngestor() {
done := make(chan struct{}, 1)
doneThumb := make(chan struct{}, 1)
doneWriter := make(chan struct{}, 1)

go s.thumbnailer(done)
go s.thumbnailer(doneThumb)
if s.saveVideo {
go s.writer(doneWriter)
}

for p := range s.rtpIngest {
select {
case s.thumbnailReceiver <- p:
case s.thumbnailReceiver <- p.Clone():
default:
}

select {
case s.videoWriterChan <- p.Clone():
default:
}
}
s.log.Debug("closed ingestor listener")

done <- struct{}{}
doneThumb <- struct{}{}
doneWriter <- struct{}{}
s.log.Debug("ending rtp ingestor")
}

func (s *Stream) thumbnailer(done chan struct{}) {
OUTER:
for {
s.log.Debug("waiting for thumbnail request signal")
select {
case <-s.requestThumbnail:
case <-done:
break OUTER
}
s.log.Debug("thumbnail request received")
type header struct {
key string
value string
}

for len(s.thumbnailReceiver) > 0 {
<-s.thumbnailReceiver
}
s.log.Debug("thumbnail buffer drained")

var pkt *rtp.Packet

t := time.Now()
INNER:
for {
select {
case pkt = <-s.thumbnailReceiver:
case <-done:
s.log.Debug("stopping thumbnail receiver")
break OUTER
}
func doHTTPRequest(uri, method string, body io.Reader, headers ...header) (*http.Response, error) {
req, err := http.NewRequest(method, uri, body)
if err != nil {
return nil, err
}

select {
case <-done:
break OUTER
default:
// use a deadline of 10 seconds to retrieve a keyframe
if time.Since(t) > time.Second*10 {
s.log.Warn("keyframe not available")
break INNER
}
keyframe := s.keyframer.NewKeyframe(pkt)
if keyframe != nil {
s.log.Debug("got keyframe")
s.lastThumbnail <- keyframe
s.log.Debug("sent keyframe")
// reset and sleep after sending one keyframe
s.keyframer.Reset()
break INNER
}
}
}
for _, header := range headers {
req.Header.Set(header.key, header.value)
}
s.log.Debug("ending thumbnailer")

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}

return resp, nil
}
9 changes: 8 additions & 1 deletion pkg/control/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"

"github.com/Glimesh/waveguide/pkg/disk"
"github.com/Glimesh/waveguide/pkg/keyframer"
"github.com/Glimesh/waveguide/pkg/types"

"github.com/pion/rtp"
Expand All @@ -28,12 +30,15 @@ type Stream struct {

whepURI string

saveVideo bool
videoWriter disk.VideoWriter

// mediaStarted is set after media bytes have come in from the client
mediaStarted bool
hasSomeAudio bool
hasSomeVideo bool

keyframer *Keyframer
kf *keyframer.Keyframer
rtpIngest chan *rtp.Packet
lastThumbnail chan []byte
// channel used to signal thumbnailer to stop
Expand All @@ -42,6 +47,8 @@ type Stream struct {
requestThumbnail chan struct{}
thumbnailReceiver chan *rtp.Packet

videoWriterChan chan *rtp.Packet

ChannelID types.ChannelID
StreamID types.StreamID
StreamKey types.StreamKey
Expand Down
59 changes: 59 additions & 0 deletions pkg/control/thumbnail.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package control

import (
"time"

"github.com/pion/rtp"
)

func (s *Stream) thumbnailer(done chan struct{}) {
OUTER:
for {
s.log.Debug("waiting for thumbnail request signal")
select {
case <-s.requestThumbnail:
case <-done:
break OUTER
}
s.log.Debug("thumbnail request received")

for len(s.thumbnailReceiver) > 0 {
<-s.thumbnailReceiver
}
s.log.Debug("thumbnail buffer drained")

var pkt *rtp.Packet

t := time.Now()
INNER:
for {
select {
case pkt = <-s.thumbnailReceiver:
case <-done:
s.log.Debug("stopping thumbnail receiver")
break OUTER
}

select {
case <-done:
break OUTER
default:
// use a deadline of 10 seconds to retrieve a keyframe
if time.Since(t) > time.Second*10 {
s.log.Warn("keyframe not available")
break INNER
}
keyframe := s.kf.GetKeyframe(pkt)
if keyframe != nil {
s.log.Debug("got keyframe")
s.lastThumbnail <- keyframe
s.log.Debug("sent keyframe")
// reset and sleep after sending one keyframe
s.kf.Reset()
break INNER
}
}
}
}
s.log.Debug("ending thumbnailer")
}
Loading

0 comments on commit 1cab06f

Please sign in to comment.