Skip to content

Commit

Permalink
apply maxAdjustment to initial track sync
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Feb 19, 2025
1 parent 3ccd0ff commit 526cd04
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 50 deletions.
62 changes: 29 additions & 33 deletions pkg/synchronizer/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,65 +28,61 @@ type participantSynchronizer struct {
sync.Mutex

ntpStart time.Time
firstReport time.Time
tracks map[uint32]*TrackSynchronizer
senderReports map[uint32]*rtcp.SenderReport
}

func newParticipantSynchronizer() *participantSynchronizer {
return &participantSynchronizer{
tracks: make(map[uint32]*TrackSynchronizer),
senderReports: make(map[uint32]*rtcp.SenderReport),
}
}

func (p *participantSynchronizer) onSenderReport(pkt *rtcp.SenderReport) {
p.Lock()
defer p.Unlock()

if p.ntpStart.IsZero() {
if p.firstReport.IsZero() {
p.firstReport = time.Now()
}

p.senderReports[pkt.SSRC] = pkt
if len(p.senderReports) == len(p.tracks) {
p.synchronizeTracks()
if len(p.senderReports) < len(p.tracks) && time.Since(p.firstReport) < 5*time.Second {
return
}

// update ntp start time
for ssrc, report := range p.senderReports {
if t := p.tracks[ssrc]; t != nil {
pts := t.getSenderReportPTS(report)
ntpStart := mediatransportutil.NtpTime(report.NTPTime).Time().Add(-pts)
if p.ntpStart.IsZero() || ntpStart.Before(p.ntpStart) {
p.ntpStart = ntpStart
}
}
}
return
}

if t := p.tracks[pkt.SSRC]; t != nil {
t.onSenderReport(pkt, p.ntpStart)
}
}

func (p *participantSynchronizer) synchronizeTracks() {
// get estimated ntp start times for all tracks
estimatedStartTimes := make(map[uint32]time.Time)

// we will sync all tracks to the earliest
var earliestStart time.Time
for ssrc, pkt := range p.senderReports {
t := p.tracks[ssrc]
pts := t.getSenderReportPTS(pkt)
ntpStart := mediatransportutil.NtpTime(pkt.NTPTime).Time().Add(-pts)
if earliestStart.IsZero() || ntpStart.Before(earliestStart) {
earliestStart = ntpStart
}
estimatedStartTimes[ssrc] = ntpStart
}
p.ntpStart = earliestStart

// update pts delay so all ntp start times will match the earliest
for ssrc, startedAt := range estimatedStartTimes {
t := p.tracks[ssrc]
if diff := startedAt.Sub(earliestStart); diff != 0 {
t.Lock()
t.ptsOffset += diff
t.Unlock()
}
}
}

func (p *participantSynchronizer) getMaxOffset() time.Duration {
var maxOffset time.Duration

p.Lock()
for _, t := range p.tracks {
t.Lock()
if o := t.ptsOffset; o > maxOffset {
o := t.ptsOffset
t.Unlock()

if o > maxOffset {
maxOffset = o
}
t.Unlock()
}
p.Unlock()

Expand Down
6 changes: 2 additions & 4 deletions pkg/synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ func (s *Synchronizer) AddTrack(track TrackRemote, identity string) *TrackSynchr
s.Lock()
p := s.psByIdentity[identity]
if p == nil {
p = &participantSynchronizer{
tracks: make(map[uint32]*TrackSynchronizer),
senderReports: make(map[uint32]*rtcp.SenderReport),
}
p = newParticipantSynchronizer()
s.psByIdentity[identity] = p
}
ssrc := uint32(track.SSRC())
Expand Down Expand Up @@ -100,6 +97,7 @@ func (s *Synchronizer) getOrSetStartedAt(now int64) int64 {

if s.startedAt == 0 {
s.startedAt = now

if s.onStarted != nil {
s.onStarted()
}
Expand Down
34 changes: 21 additions & 13 deletions pkg/synchronizer/track.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

const (
ewmaWeight = 0.9
maxDrift = time.Millisecond * 15
maxAdjustment = time.Millisecond * 15
maxTSDiff = time.Minute
maxSNDropout = 3000 // max sequence number skip
uint32Half int64 = 2147483648
Expand Down Expand Up @@ -167,7 +167,7 @@ func (t *TrackSynchronizer) adjust(pkt *rtp.Packet) (int64, time.Duration, bool)
for ts < t.firstTS-uint32Half {
ts += uint32Overflow
}
pts := t.getElapsed(ts) + t.ptsOffset
pts := t.getPTS(ts)
return ts, pts, true
}

Expand Down Expand Up @@ -198,7 +198,7 @@ func (t *TrackSynchronizer) adjust(pkt *rtp.Packet) (int64, time.Duration, bool)
}

// sanity check
pts := t.getElapsed(ts) + t.ptsOffset
pts := t.getPTS(ts)
if expected := time.Since(t.startedAt.Add(t.ptsOffset)); pts > expected+maxTSDiff {
// reset RTP timestamps
ts, pts = t.resetRTP(pkt, []any{
Expand All @@ -212,8 +212,8 @@ func (t *TrackSynchronizer) adjust(pkt *rtp.Packet) (int64, time.Duration, bool)
return ts, pts, true
}

func (t *TrackSynchronizer) getElapsed(ts int64) time.Duration {
return t.rtpConverter.toDuration(ts - t.firstTS)
func (t *TrackSynchronizer) getPTS(ts int64) time.Duration {
return t.rtpConverter.toDuration(ts-t.firstTS) + t.ptsOffset
}

func (t *TrackSynchronizer) resetRTP(pkt *rtp.Packet, fields []any) (int64, time.Duration) {
Expand Down Expand Up @@ -334,31 +334,38 @@ func (t *TrackSynchronizer) getSenderReportPTSLocked(pkt *rtcp.SenderReport) tim
ts += uint32Overflow
}

return t.getElapsed(ts) + t.ptsOffset
return t.getPTS(ts)
}

// onSenderReport handles pts adjustments for a track
func (t *TrackSynchronizer) onSenderReport(pkt *rtcp.SenderReport, ntpStart time.Time) {
t.Lock()
defer t.Unlock()

// we receive every sender report twice
if pkt.RTPTime == t.lastSR {
if pkt.RTPTime == t.lastSR || t.startedAt.IsZero() {
return
}

pts := t.getSenderReportPTSLocked(pkt)
calculatedNTPStart := mediatransportutil.NtpTime(pkt.NTPTime).Time().Add(-pts)
drift := calculatedNTPStart.Sub(ntpStart)

t.adjustOffsetLocked(drift)
t.lastSR = pkt.RTPTime
}

func (t *TrackSynchronizer) adjustOffsetLocked(drift time.Duration) {
if drift == 0 {
return
}

t.stats.updateDrift(drift)
if drift > maxDrift {
drift = maxDrift
} else if drift < -maxDrift {
drift = -maxDrift
if drift > maxAdjustment {
drift = maxAdjustment
} else if drift < -maxAdjustment {
drift = -maxAdjustment
}
t.ptsOffset += drift
t.lastSR = pkt.RTPTime
}

type TrackStats struct {
Expand All @@ -371,6 +378,7 @@ func (t *TrackStats) updateDrift(drift time.Duration) {
if drift < 0 {
drift = -drift
}

t.AvgDrift = ewmaWeight*t.AvgDrift + (1-ewmaWeight)*float64(drift)
if drift > t.MaxDrift {
t.MaxDrift = drift
Expand Down

0 comments on commit 526cd04

Please sign in to comment.