Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement save to disk for h264 video stream #27

Merged
merged 6 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ service = "dummy"
orchestrator = "dummy"
http_server_type = "http"
http_address = "localhost:8091"
save_video = false
4 changes: 3 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,16 @@ type Config struct {
HTTPSHostname string `fig:"https_hostname"`
HTTPSCert string `fig:"https_cert"`
HTTPSKey string `fig:"https_key"`

SaveVideo bool `fig:"save_video"`
}
}

func Load() (Config, error) {
var cfg Config

if err := fig.Load(&cfg, fig.File("config.toml"), fig.Dirs(".")); err != nil {
return cfg, nil
return cfg, err
}

return cfg, nil
Expand Down
17 changes: 16 additions & 1 deletion pkg/control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/Glimesh/waveguide/config"
"github.com/Glimesh/waveguide/pkg/h264"
"github.com/Glimesh/waveguide/pkg/keyframer"
"github.com/Glimesh/waveguide/pkg/orchestrator"
"github.com/Glimesh/waveguide/pkg/service"
"github.com/Glimesh/waveguide/pkg/types"
Expand Down Expand Up @@ -43,6 +44,10 @@ type Control struct {
HTTPSHostname string `mapstructure:"https_hostname"`
HTTPSCert string `mapstructure:"https_cert"`
HTTPSKey string `mapstructure:"https_key"`

// Flag to enable saving video stream to file
// Currently it's global flag toggled from the config file
SaveVideo bool `mapstructure:"save_video"`
}

func New(
Expand Down Expand Up @@ -81,6 +86,9 @@ func New(
HTTPSHostname: httpCfg.HTTPSHostname,
HTTPSCert: httpCfg.HTTPSCert,
HTTPSKey: httpCfg.HTTPSKey,

// this should be controlled at a stream level
SaveVideo: cfg.Control.SaveVideo,
}, nil
}

Expand Down Expand Up @@ -358,7 +366,7 @@ func (ctrl *Control) newStream(channelID types.ChannelID, cancelFunc context.Can
authenticated: true,

cancelFunc: cancelFunc,
keyframer: NewKeyframer(),
kf: keyframer.New(),
rtpIngest: make(chan *rtp.Packet),
stopHeartbeat: make(chan struct{}, 1),
stopThumbnailer: make(chan struct{}, 1),
Expand All @@ -370,6 +378,13 @@ func (ctrl *Control) newStream(channelID types.ChannelID, cancelFunc context.Can
startTime: time.Now().Unix(),
}

// TODO: this shouldn't be a global flag
// but rather configured on per stream basis
if ctrl.SaveVideo {
stream.saveVideo = true
stream.videoWriterChan = make(chan *rtp.Packet, 100) // not sure what the buffer size here should be
}

if _, exists := ctrl.streams[channelID]; exists {
return stream, errors.New("stream already exists in stream manager state")
}
Expand Down
35 changes: 35 additions & 0 deletions pkg/control/disk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package control

import "github.com/Glimesh/waveguide/pkg/disk"

func (s *Stream) configureVideoWriter(codec string) {
videoWriter := disk.NewNoopVideoWriter()
if s.saveVideo {
if vw, err := disk.NewVideoWriter(codec, "out.h264"); err == nil {
videoWriter = vw
} else {
s.log.Debug("video save enabled but failed to create video writer")
s.log.Warnf("video writer: %v", err)
s.log.Debug("falling back to noop video writer")
}
}
s.videoWriter = videoWriter
}

func (s *Stream) writer(done chan struct{}) {
s.log.Debug("starting file writer")
LOOP:
for {
select {
case <-done:
break LOOP
case p := <-s.videoWriterChan:
if err := s.videoWriter.WriteVideo(p); err != nil {
s.log.Debugf("writer: %v", err)
break LOOP
}
}
}
s.log.Debug("ending writer")
s.videoWriter.Close()
}
110 changes: 34 additions & 76 deletions pkg/control/thumbnailer.go → pkg/control/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,14 @@ import (
"io"
"net/http"
"strings"
"time"

"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
)

type header struct {
key string
value string
}

func (s *Stream) Ingest(ctx context.Context) error {
logger := s.log.WithField("app", "ingest")
done := make(chan struct{}, 1)

go s.startIngestor()

pc, err := webrtc.NewPeerConnection(webrtc.Configuration{}) //nolint exhaustive struct
Expand Down Expand Up @@ -47,6 +41,7 @@ func (s *Stream) Ingest(ctx context.Context) error {
codec := track.Codec()

if codec.MimeType == "video/H264" {
s.configureVideoWriter(webrtc.MimeTypeH264)
for {
select {
case <-cancelRead:
Expand Down Expand Up @@ -124,89 +119,52 @@ func (s *Stream) Ingest(ctx context.Context) error {
return nil
}

func doHTTPRequest(uri, method string, body io.Reader, headers ...header) (*http.Response, error) {
req, err := http.NewRequest(method, uri, body)
if err != nil {
return nil, err
}

for _, header := range headers {
req.Header.Set(header.key, header.value)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}

return resp, nil
}

func (s *Stream) startIngestor() {
done := make(chan struct{}, 1)
doneThumb := make(chan struct{}, 1)
doneWriter := make(chan struct{}, 1)

go s.thumbnailer(done)
go s.thumbnailer(doneThumb)
if s.saveVideo {
go s.writer(doneWriter)
}

for p := range s.rtpIngest {
select {
case s.thumbnailReceiver <- p:
case s.thumbnailReceiver <- p.Clone():
default:
}

select {
case s.videoWriterChan <- p.Clone():
default:
}
}
s.log.Debug("closed ingestor listener")

done <- struct{}{}
doneThumb <- struct{}{}
doneWriter <- struct{}{}
s.log.Debug("ending rtp ingestor")
}

func (s *Stream) thumbnailer(done chan struct{}) {
OUTER:
for {
s.log.Debug("waiting for thumbnail request signal")
select {
case <-s.requestThumbnail:
case <-done:
break OUTER
}
s.log.Debug("thumbnail request received")
type header struct {
key string
value string
}

for len(s.thumbnailReceiver) > 0 {
<-s.thumbnailReceiver
}
s.log.Debug("thumbnail buffer drained")

var pkt *rtp.Packet

t := time.Now()
INNER:
for {
select {
case pkt = <-s.thumbnailReceiver:
case <-done:
s.log.Debug("stopping thumbnail receiver")
break OUTER
}
func doHTTPRequest(uri, method string, body io.Reader, headers ...header) (*http.Response, error) {
req, err := http.NewRequest(method, uri, body)
if err != nil {
return nil, err
}

select {
case <-done:
break OUTER
default:
// use a deadline of 10 seconds to retrieve a keyframe
if time.Since(t) > time.Second*10 {
s.log.Warn("keyframe not available")
break INNER
}
keyframe := s.keyframer.NewKeyframe(pkt)
if keyframe != nil {
s.log.Debug("got keyframe")
s.lastThumbnail <- keyframe
s.log.Debug("sent keyframe")
// reset and sleep after sending one keyframe
s.keyframer.Reset()
break INNER
}
}
}
for _, header := range headers {
req.Header.Set(header.key, header.value)
}
s.log.Debug("ending thumbnailer")

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}

return resp, nil
}
9 changes: 8 additions & 1 deletion pkg/control/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"

"github.com/Glimesh/waveguide/pkg/disk"
"github.com/Glimesh/waveguide/pkg/keyframer"
"github.com/Glimesh/waveguide/pkg/types"

"github.com/pion/rtp"
Expand All @@ -28,12 +30,15 @@ type Stream struct {

whepURI string

saveVideo bool
videoWriter disk.VideoWriter

// mediaStarted is set after media bytes have come in from the client
mediaStarted bool
hasSomeAudio bool
hasSomeVideo bool

keyframer *Keyframer
kf *keyframer.Keyframer
rtpIngest chan *rtp.Packet
lastThumbnail chan []byte
// channel used to signal thumbnailer to stop
Expand All @@ -42,6 +47,8 @@ type Stream struct {
requestThumbnail chan struct{}
thumbnailReceiver chan *rtp.Packet

videoWriterChan chan *rtp.Packet

ChannelID types.ChannelID
StreamID types.StreamID
StreamKey types.StreamKey
Expand Down
59 changes: 59 additions & 0 deletions pkg/control/thumbnail.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package control

import (
"time"

"github.com/pion/rtp"
)

func (s *Stream) thumbnailer(done chan struct{}) {
OUTER:
for {
s.log.Debug("waiting for thumbnail request signal")
select {
case <-s.requestThumbnail:
case <-done:
break OUTER
}
s.log.Debug("thumbnail request received")

for len(s.thumbnailReceiver) > 0 {
<-s.thumbnailReceiver
}
s.log.Debug("thumbnail buffer drained")

var pkt *rtp.Packet

t := time.Now()
INNER:
for {
select {
case pkt = <-s.thumbnailReceiver:
case <-done:
s.log.Debug("stopping thumbnail receiver")
break OUTER
}

select {
case <-done:
break OUTER
default:
// use a deadline of 10 seconds to retrieve a keyframe
if time.Since(t) > time.Second*10 {
s.log.Warn("keyframe not available")
break INNER
}
keyframe := s.kf.GetKeyframe(pkt)
if keyframe != nil {
s.log.Debug("got keyframe")
s.lastThumbnail <- keyframe
s.log.Debug("sent keyframe")
// reset and sleep after sending one keyframe
s.kf.Reset()
break INNER
}
}
}
}
s.log.Debug("ending thumbnailer")
}
Loading