From 526cd047871c1d4ec3a0218b7df0f86237ad6df9 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Wed, 19 Feb 2025 18:21:36 +0800 Subject: [PATCH] apply maxAdjustment to initial track sync --- pkg/synchronizer/participant.go | 62 +++++++++++++++----------------- pkg/synchronizer/synchronizer.go | 6 ++-- pkg/synchronizer/track.go | 34 +++++++++++------- 3 files changed, 52 insertions(+), 50 deletions(-) diff --git a/pkg/synchronizer/participant.go b/pkg/synchronizer/participant.go index dee5ab1a..71e4bfa0 100644 --- a/pkg/synchronizer/participant.go +++ b/pkg/synchronizer/participant.go @@ -28,20 +28,42 @@ type participantSynchronizer struct { sync.Mutex ntpStart time.Time + firstReport time.Time tracks map[uint32]*TrackSynchronizer senderReports map[uint32]*rtcp.SenderReport } +func newParticipantSynchronizer() *participantSynchronizer { + return &participantSynchronizer{ + tracks: make(map[uint32]*TrackSynchronizer), + senderReports: make(map[uint32]*rtcp.SenderReport), + } +} + func (p *participantSynchronizer) onSenderReport(pkt *rtcp.SenderReport) { p.Lock() defer p.Unlock() if p.ntpStart.IsZero() { + if p.firstReport.IsZero() { + p.firstReport = time.Now() + } + p.senderReports[pkt.SSRC] = pkt - if len(p.senderReports) == len(p.tracks) { - p.synchronizeTracks() + if len(p.senderReports) < len(p.tracks) && time.Since(p.firstReport) < 5*time.Second { + return + } + + // update ntp start time + for ssrc, report := range p.senderReports { + if t := p.tracks[ssrc]; t != nil { + pts := t.getSenderReportPTS(report) + ntpStart := mediatransportutil.NtpTime(report.NTPTime).Time().Add(-pts) + if p.ntpStart.IsZero() || ntpStart.Before(p.ntpStart) { + p.ntpStart = ntpStart + } + } } - return } if t := p.tracks[pkt.SSRC]; t != nil { @@ -49,44 +71,18 @@ func (p *participantSynchronizer) onSenderReport(pkt *rtcp.SenderReport) { } } -func (p *participantSynchronizer) synchronizeTracks() { - // get estimated ntp start times for all tracks - estimatedStartTimes := make(map[uint32]time.Time) - - // we will sync all tracks to the earliest - var earliestStart time.Time - for ssrc, pkt := range p.senderReports { - t := p.tracks[ssrc] - pts := t.getSenderReportPTS(pkt) - ntpStart := mediatransportutil.NtpTime(pkt.NTPTime).Time().Add(-pts) - if earliestStart.IsZero() || ntpStart.Before(earliestStart) { - earliestStart = ntpStart - } - estimatedStartTimes[ssrc] = ntpStart - } - p.ntpStart = earliestStart - - // update pts delay so all ntp start times will match the earliest - for ssrc, startedAt := range estimatedStartTimes { - t := p.tracks[ssrc] - if diff := startedAt.Sub(earliestStart); diff != 0 { - t.Lock() - t.ptsOffset += diff - t.Unlock() - } - } -} - func (p *participantSynchronizer) getMaxOffset() time.Duration { var maxOffset time.Duration p.Lock() for _, t := range p.tracks { t.Lock() - if o := t.ptsOffset; o > maxOffset { + o := t.ptsOffset + t.Unlock() + + if o > maxOffset { maxOffset = o } - t.Unlock() } p.Unlock() diff --git a/pkg/synchronizer/synchronizer.go b/pkg/synchronizer/synchronizer.go index d626d9ca..8f060fe6 100644 --- a/pkg/synchronizer/synchronizer.go +++ b/pkg/synchronizer/synchronizer.go @@ -49,10 +49,7 @@ func (s *Synchronizer) AddTrack(track TrackRemote, identity string) *TrackSynchr s.Lock() p := s.psByIdentity[identity] if p == nil { - p = &participantSynchronizer{ - tracks: make(map[uint32]*TrackSynchronizer), - senderReports: make(map[uint32]*rtcp.SenderReport), - } + p = newParticipantSynchronizer() s.psByIdentity[identity] = p } ssrc := uint32(track.SSRC()) @@ -100,6 +97,7 @@ func (s *Synchronizer) getOrSetStartedAt(now int64) int64 { if s.startedAt == 0 { s.startedAt = now + if s.onStarted != nil { s.onStarted() } diff --git a/pkg/synchronizer/track.go b/pkg/synchronizer/track.go index bd6d6335..c2105e12 100644 --- a/pkg/synchronizer/track.go +++ b/pkg/synchronizer/track.go @@ -31,7 +31,7 @@ import ( const ( ewmaWeight = 0.9 - maxDrift = time.Millisecond * 15 + maxAdjustment = time.Millisecond * 15 maxTSDiff = time.Minute maxSNDropout = 3000 // max sequence number skip uint32Half int64 = 2147483648 @@ -167,7 +167,7 @@ func (t *TrackSynchronizer) adjust(pkt *rtp.Packet) (int64, time.Duration, bool) for ts < t.firstTS-uint32Half { ts += uint32Overflow } - pts := t.getElapsed(ts) + t.ptsOffset + pts := t.getPTS(ts) return ts, pts, true } @@ -198,7 +198,7 @@ func (t *TrackSynchronizer) adjust(pkt *rtp.Packet) (int64, time.Duration, bool) } // sanity check - pts := t.getElapsed(ts) + t.ptsOffset + pts := t.getPTS(ts) if expected := time.Since(t.startedAt.Add(t.ptsOffset)); pts > expected+maxTSDiff { // reset RTP timestamps ts, pts = t.resetRTP(pkt, []any{ @@ -212,8 +212,8 @@ func (t *TrackSynchronizer) adjust(pkt *rtp.Packet) (int64, time.Duration, bool) return ts, pts, true } -func (t *TrackSynchronizer) getElapsed(ts int64) time.Duration { - return t.rtpConverter.toDuration(ts - t.firstTS) +func (t *TrackSynchronizer) getPTS(ts int64) time.Duration { + return t.rtpConverter.toDuration(ts-t.firstTS) + t.ptsOffset } func (t *TrackSynchronizer) resetRTP(pkt *rtp.Packet, fields []any) (int64, time.Duration) { @@ -334,7 +334,7 @@ func (t *TrackSynchronizer) getSenderReportPTSLocked(pkt *rtcp.SenderReport) tim ts += uint32Overflow } - return t.getElapsed(ts) + t.ptsOffset + return t.getPTS(ts) } // onSenderReport handles pts adjustments for a track @@ -342,8 +342,7 @@ func (t *TrackSynchronizer) onSenderReport(pkt *rtcp.SenderReport, ntpStart time t.Lock() defer t.Unlock() - // we receive every sender report twice - if pkt.RTPTime == t.lastSR { + if pkt.RTPTime == t.lastSR || t.startedAt.IsZero() { return } @@ -351,14 +350,22 @@ func (t *TrackSynchronizer) onSenderReport(pkt *rtcp.SenderReport, ntpStart time calculatedNTPStart := mediatransportutil.NtpTime(pkt.NTPTime).Time().Add(-pts) drift := calculatedNTPStart.Sub(ntpStart) + t.adjustOffsetLocked(drift) + t.lastSR = pkt.RTPTime +} + +func (t *TrackSynchronizer) adjustOffsetLocked(drift time.Duration) { + if drift == 0 { + return + } + t.stats.updateDrift(drift) - if drift > maxDrift { - drift = maxDrift - } else if drift < -maxDrift { - drift = -maxDrift + if drift > maxAdjustment { + drift = maxAdjustment + } else if drift < -maxAdjustment { + drift = -maxAdjustment } t.ptsOffset += drift - t.lastSR = pkt.RTPTime } type TrackStats struct { @@ -371,6 +378,7 @@ func (t *TrackStats) updateDrift(drift time.Duration) { if drift < 0 { drift = -drift } + t.AvgDrift = ewmaWeight*t.AvgDrift + (1-ewmaWeight)*float64(drift) if drift > t.MaxDrift { t.MaxDrift = drift