Skip to content

Commit

Permalink
fix(sfu): will move layers as they arrive (#469)
Browse files Browse the repository at this point in the history
  • Loading branch information
OrlandoCo authored Mar 15, 2021
1 parent ba98f1c commit 003fae8
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 18 deletions.
2 changes: 1 addition & 1 deletion pkg/sfu/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func NewPublisher(session *Session, id string, cfg WebRTCTransportConfig) (*Publ
p := &Publisher{
id: id,
pc: pc,
session: session,
router: newRouter(id, pc, session, cfg.router),
session: session,
}

pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
Expand Down
55 changes: 47 additions & 8 deletions pkg/sfu/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Receiver interface {
Codec() webrtc.RTPCodecParameters
Kind() webrtc.RTPCodecType
SSRC(layer int) uint32
AddUpTrack(track *webrtc.TrackRemote, buffer *buffer.Buffer)
AddUpTrack(track *webrtc.TrackRemote, buffer *buffer.Buffer, bestQualityFirst bool)
AddDownTrack(track *DownTrack, bestQualityFirst bool)
SubDownTrack(track *DownTrack, layer int) error
GetBitrate() [3]uint64
Expand Down Expand Up @@ -94,7 +94,7 @@ func (w *WebRTCReceiver) Kind() webrtc.RTPCodecType {
return w.kind
}

func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buffer) {
func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buffer, bestQualityFirst bool) {
var layer int

switch track.RID() {
Expand All @@ -106,9 +106,31 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff
layer = 0
}

if w.isSimulcast {
if bestQualityFirst {
for l := 0; l < layer; l++ {
w.locks[l].Lock()
for _, dt := range w.downTracks[l] {
dt.SwitchSpatialLayer(int64(layer), false)
}
w.locks[l].Unlock()
}
} else {
for l := 2; l != layer; l-- {
w.locks[l].Lock()
for _, dt := range w.downTracks[l] {
dt.SwitchSpatialLayer(int64(layer), false)
}
w.locks[l].Unlock()
}
}
}

w.locks[layer].Lock()
w.upTracks[layer] = track
w.buffers[layer] = buff
w.downTracks[layer] = make([]*DownTrack, 0, 10)
w.locks[layer].Unlock()
go w.writeRTP(layer)
}

Expand All @@ -123,31 +145,39 @@ func (w *WebRTCReceiver) AddDownTrack(track *DownTrack, bestQualityFirst bool) {
}
}
}
w.locks[layer].Lock()
if downTrackSubscribed(w.downTracks[layer], track) {
w.locks[layer].Unlock()
return
}
track.SetInitialLayers(int64(layer), 2)
track.maxSpatialLayer = 2
track.maxTemporalLayer = 2
track.trackType = SimulcastDownTrack
track.payload = packetFactory.Get().([]byte)
} else {
w.locks[layer].Lock()
if downTrackSubscribed(w.downTracks[layer], track) {
w.locks[layer].Unlock()
return
}
track.SetInitialLayers(0, 0)
track.trackType = SimpleDownTrack
}

w.locks[layer].Lock()
w.downTracks[layer] = append(w.downTracks[layer], track)
w.locks[layer].Unlock()
}

func (w *WebRTCReceiver) SubDownTrack(track *DownTrack, layer int) error {
w.locks[layer].Lock()
if dts := w.downTracks[layer]; dts != nil {
w.downTracks[layer] = append(dts, track)
} else {
if buf := w.buffers[layer]; buf != nil {
w.downTracks[layer] = append(w.downTracks[layer], track)
w.locks[layer].Unlock()
return errNoReceiverFound
return nil
}
w.locks[layer].Unlock()
return nil
return errNoReceiverFound
}

func (w *WebRTCReceiver) GetBitrate() [3]uint64 {
Expand Down Expand Up @@ -310,3 +340,12 @@ func (w *WebRTCReceiver) closeTracks() {
w.onCloseHandler()
}
}

func downTrackSubscribed(dts []*DownTrack, dt *DownTrack) bool {
for _, cdt := range dts {
if cdt == dt {
return true
}
}
return false
}
10 changes: 1 addition & 9 deletions pkg/sfu/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ func (r *router) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRe

publish := false
trackID := track.ID()
rid := track.RID()

buff, rtcpReader := r.bufferFactory.GetBufferPair(uint32(track.SSRC()))

Expand Down Expand Up @@ -164,17 +163,10 @@ func (r *router) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRe
}
r.deleteReceiver(trackID, uint32(track.SSRC()))
})
if len(rid) == 0 || r.config.Simulcast.BestQualityFirst && rid == fullResolution ||
!r.config.Simulcast.BestQualityFirst && rid == quarterResolution {
publish = true
}
} else if r.config.Simulcast.BestQualityFirst && rid == fullResolution ||
!r.config.Simulcast.BestQualityFirst && rid == quarterResolution ||
!r.config.Simulcast.BestQualityFirst && rid == halfResolution {
publish = true
}

recv.AddUpTrack(track, buff)
recv.AddUpTrack(track, buff, r.config.Simulcast.BestQualityFirst)

buff.Bind(receiver.GetParameters(), buffer.Options{
MaxBitRate: r.config.MaxBandwidth,
Expand Down

0 comments on commit 003fae8

Please sign in to comment.