From 003fae81d7a01b7898f947dd8546f87b9ddb8f3c Mon Sep 17 00:00:00 2001 From: OrlandoCo Date: Sun, 14 Mar 2021 18:54:20 -0600 Subject: [PATCH] fix(sfu): will move layers as they arrive (#469) --- pkg/sfu/publisher.go | 2 +- pkg/sfu/receiver.go | 55 +++++++++++++++++++++++++++++++++++++------- pkg/sfu/router.go | 10 +------- 3 files changed, 49 insertions(+), 18 deletions(-) diff --git a/pkg/sfu/publisher.go b/pkg/sfu/publisher.go index bcf6a246f..80b9c21b1 100644 --- a/pkg/sfu/publisher.go +++ b/pkg/sfu/publisher.go @@ -39,8 +39,8 @@ func NewPublisher(session *Session, id string, cfg WebRTCTransportConfig) (*Publ p := &Publisher{ id: id, pc: pc, - session: session, router: newRouter(id, pc, session, cfg.router), + session: session, } pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 0dd4ae367..293f86649 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -20,7 +20,7 @@ type Receiver interface { Codec() webrtc.RTPCodecParameters Kind() webrtc.RTPCodecType SSRC(layer int) uint32 - AddUpTrack(track *webrtc.TrackRemote, buffer *buffer.Buffer) + AddUpTrack(track *webrtc.TrackRemote, buffer *buffer.Buffer, bestQualityFirst bool) AddDownTrack(track *DownTrack, bestQualityFirst bool) SubDownTrack(track *DownTrack, layer int) error GetBitrate() [3]uint64 @@ -94,7 +94,7 @@ func (w *WebRTCReceiver) Kind() webrtc.RTPCodecType { return w.kind } -func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buffer) { +func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buffer, bestQualityFirst bool) { var layer int switch track.RID() { @@ -106,9 +106,31 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff layer = 0 } + if w.isSimulcast { + if bestQualityFirst { + for l := 0; l < layer; l++ { + w.locks[l].Lock() + for _, dt := range w.downTracks[l] { + dt.SwitchSpatialLayer(int64(layer), false) + } + w.locks[l].Unlock() + } + } else { + for l := 2; l != layer; l-- { + w.locks[l].Lock() + for _, dt := range w.downTracks[l] { + dt.SwitchSpatialLayer(int64(layer), false) + } + w.locks[l].Unlock() + } + } + } + + w.locks[layer].Lock() w.upTracks[layer] = track w.buffers[layer] = buff w.downTracks[layer] = make([]*DownTrack, 0, 10) + w.locks[layer].Unlock() go w.writeRTP(layer) } @@ -123,31 +145,39 @@ func (w *WebRTCReceiver) AddDownTrack(track *DownTrack, bestQualityFirst bool) { } } } + w.locks[layer].Lock() + if downTrackSubscribed(w.downTracks[layer], track) { + w.locks[layer].Unlock() + return + } track.SetInitialLayers(int64(layer), 2) track.maxSpatialLayer = 2 track.maxTemporalLayer = 2 track.trackType = SimulcastDownTrack track.payload = packetFactory.Get().([]byte) } else { + w.locks[layer].Lock() + if downTrackSubscribed(w.downTracks[layer], track) { + w.locks[layer].Unlock() + return + } track.SetInitialLayers(0, 0) track.trackType = SimpleDownTrack } - w.locks[layer].Lock() w.downTracks[layer] = append(w.downTracks[layer], track) w.locks[layer].Unlock() } func (w *WebRTCReceiver) SubDownTrack(track *DownTrack, layer int) error { w.locks[layer].Lock() - if dts := w.downTracks[layer]; dts != nil { - w.downTracks[layer] = append(dts, track) - } else { + if buf := w.buffers[layer]; buf != nil { + w.downTracks[layer] = append(w.downTracks[layer], track) w.locks[layer].Unlock() - return errNoReceiverFound + return nil } w.locks[layer].Unlock() - return nil + return errNoReceiverFound } func (w *WebRTCReceiver) GetBitrate() [3]uint64 { @@ -310,3 +340,12 @@ func (w *WebRTCReceiver) closeTracks() { w.onCloseHandler() } } + +func downTrackSubscribed(dts []*DownTrack, dt *DownTrack) bool { + for _, cdt := range dts { + if cdt == dt { + return true + } + } + return false +} diff --git a/pkg/sfu/router.go b/pkg/sfu/router.go index fa5a64e91..48c5d62cf 100644 --- a/pkg/sfu/router.go +++ b/pkg/sfu/router.go @@ -84,7 +84,6 @@ func (r *router) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRe publish := false trackID := track.ID() - rid := track.RID() buff, rtcpReader := r.bufferFactory.GetBufferPair(uint32(track.SSRC())) @@ -164,17 +163,10 @@ func (r *router) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRe } r.deleteReceiver(trackID, uint32(track.SSRC())) }) - if len(rid) == 0 || r.config.Simulcast.BestQualityFirst && rid == fullResolution || - !r.config.Simulcast.BestQualityFirst && rid == quarterResolution { - publish = true - } - } else if r.config.Simulcast.BestQualityFirst && rid == fullResolution || - !r.config.Simulcast.BestQualityFirst && rid == quarterResolution || - !r.config.Simulcast.BestQualityFirst && rid == halfResolution { publish = true } - recv.AddUpTrack(track, buff) + recv.AddUpTrack(track, buff, r.config.Simulcast.BestQualityFirst) buff.Bind(receiver.GetParameters(), buffer.Options{ MaxBitRate: r.config.MaxBandwidth,