From 8b149ef906fddc064cc0b0206e5170443fcb6e53 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 17 Feb 2025 17:46:19 +0800 Subject: [PATCH 1/3] add PacketsDropped to jitter buffer, avoid divide by 0 --- pkg/jitter/buffer.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pkg/jitter/buffer.go b/pkg/jitter/buffer.go index 9e090c9..ae96790 100644 --- a/pkg/jitter/buffer.go +++ b/pkg/jitter/buffer.go @@ -236,10 +236,21 @@ func (b *Buffer) PopSamples(force bool) [][]*rtp.Packet { } } +func (b *Buffer) PacketsDropped() int { + b.mu.Lock() + defer b.mu.Unlock() + + return b.packetsDropped +} + func (b *Buffer) PacketLoss() float64 { b.mu.Lock() defer b.mu.Unlock() + if b.packetsTotal == 0 { + return 0 + } + return float64(b.packetsDropped) / float64(b.packetsTotal) } From 52045e3713063e14da73eece95304b3f551791b0 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 17 Feb 2025 18:14:35 +0800 Subject: [PATCH 2/3] more stats --- egressclient.go | 3 +- pkg/jitter/buffer.go | 79 +++++++++++++++++++++++++++++++++----------- 2 files changed, 62 insertions(+), 20 deletions(-) diff --git a/egressclient.go b/egressclient.go index 8a72c72..0dd08fc 100644 --- a/egressclient.go +++ b/egressclient.go @@ -18,9 +18,10 @@ import ( "context" "net/http" + "github.com/twitchtv/twirp" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils/xtwirp" - "github.com/twitchtv/twirp" ) type EgressClient struct { diff --git a/pkg/jitter/buffer.go b/pkg/jitter/buffer.go index ae96790..b4377d0 100644 --- a/pkg/jitter/buffer.go +++ b/pkg/jitter/buffer.go @@ -29,8 +29,7 @@ type Buffer struct { maxLate uint32 clockRate uint32 onPacketDropped func() - packetsDropped int - packetsTotal int + stats *BufferStats logger logger.Logger mu sync.Mutex @@ -45,6 +44,14 @@ type Buffer struct { minTS uint32 } +type BufferStats struct { + PacketsPushed uint64 + PacketsDropped uint64 + PaddingPackets uint64 + PacketsPopped uint64 + SamplesPopped uint64 +} + type packet struct { prev, next *packet start, end bool @@ -57,6 +64,7 @@ func NewBuffer(depacketizer rtp.Depacketizer, clockRate uint32, maxLatency time. depacketizer: depacketizer, maxLate: uint32(float64(maxLatency) / float64(time.Second) * float64(clockRate)), clockRate: clockRate, + stats: &BufferStats{}, logger: logger.LogRLogger(logr.Discard()), } for _, opt := range opts { @@ -78,7 +86,7 @@ func (b *Buffer) Push(pkt *rtp.Packet) { b.mu.Lock() defer b.mu.Unlock() - b.packetsTotal++ + b.stats.PacketsPushed++ var start, end, padding bool if len(pkt.Payload) == 0 { // drop padding packets from the beginning of the stream @@ -121,7 +129,7 @@ func (b *Buffer) Push(pkt *rtp.Packet) { } else if beforePrev && !outsidePrevRange { // drop if packet comes before previously pushed packet if !p.padding { - b.packetsDropped++ + b.stats.PacketsDropped++ if b.onPacketDropped != nil { b.onPacketDropped() } @@ -236,22 +244,28 @@ func (b *Buffer) PopSamples(force bool) [][]*rtp.Packet { } } -func (b *Buffer) PacketsDropped() int { +func (b *Buffer) Stats() *BufferStats { b.mu.Lock() defer b.mu.Unlock() - return b.packetsDropped + return &BufferStats{ + PacketsPushed: b.stats.PacketsPushed, + PacketsDropped: b.stats.PacketsDropped, + PaddingPackets: b.stats.PaddingPackets, + PacketsPopped: b.stats.PacketsPopped, + SamplesPopped: b.stats.SamplesPopped, + } } func (b *Buffer) PacketLoss() float64 { b.mu.Lock() defer b.mu.Unlock() - if b.packetsTotal == 0 { + if b.stats.PacketsPushed == 0 { return 0 } - return float64(b.packetsDropped) / float64(b.packetsTotal) + return float64(b.stats.PacketsDropped) / float64(b.stats.PacketsPushed) } func (b *Buffer) forcePop() []*rtp.Packet { @@ -260,7 +274,13 @@ func (b *Buffer) forcePop() []*rtp.Packet { var next *packet for c := b.head; c != nil; c = next { next = c.next - packets = append(packets, c.packet) + if !c.padding { + packets = append(packets, c.packet) + b.stats.PacketsPopped++ + } + if c.end { + b.stats.SamplesPopped++ + } b.free(c) } @@ -276,15 +296,24 @@ func (b *Buffer) forcePopSamples() [][]*rtp.Packet { var next *packet for c := b.head; c != nil; c = next { next = c.next + if c.start && len(sample) > 0 { + b.stats.SamplesPopped++ packets = append(packets, sample) sample = make([]*rtp.Packet, 0) } - sample = append(sample, c.packet) + + if !c.padding { + sample = append(sample, c.packet) + b.stats.PacketsPopped++ + } + if c.end { + b.stats.SamplesPopped++ packets = append(packets, sample) sample = make([]*rtp.Packet, 0) } + b.free(c) } @@ -312,9 +341,6 @@ func (b *Buffer) pop() []*rtp.Packet { var next *packet for c := b.head; ; c = next { next = c.next - if !c.padding { - packets = append(packets, c.packet) - } if next != nil { if outsideRange(next.packet.SequenceNumber, c.packet.SequenceNumber) { // adjust minTS to account for sequence number reset @@ -322,6 +348,15 @@ func (b *Buffer) pop() []*rtp.Packet { } next.prev = nil } + + if !c.padding { + packets = append(packets, c.packet) + b.stats.PacketsPopped++ + } + if c.end { + b.stats.SamplesPopped++ + } + if c == end { b.prevSN = c.packet.SequenceNumber b.head = next @@ -331,6 +366,7 @@ func (b *Buffer) pop() []*rtp.Packet { b.free(c) return packets } + b.free(c) } } @@ -355,9 +391,6 @@ func (b *Buffer) popSamples() [][]*rtp.Packet { var next *packet for c := b.head; ; c = next { next = c.next - if !c.padding { - sample = append(sample, c.packet) - } if next != nil { if outsideRange(next.packet.SequenceNumber, c.packet.SequenceNumber) { // adjust minTS to account for sequence number reset @@ -365,10 +398,17 @@ func (b *Buffer) popSamples() [][]*rtp.Packet { } next.prev = nil } + + if !c.padding { + sample = append(sample, c.packet) + b.stats.PacketsPopped++ + } if c.end { + b.stats.SamplesPopped++ packets = append(packets, sample) sample = make([]*rtp.Packet, 0) } + if c == end { b.prevSN = c.packet.SequenceNumber b.head = next @@ -378,6 +418,7 @@ func (b *Buffer) popSamples() [][]*rtp.Packet { b.free(c) return packets } + b.free(c) } } @@ -419,13 +460,13 @@ func (b *Buffer) drop() { // lost packets will now be too old even if we receive them // on sequence number reset, skip callback because we don't know whether we lost any if !b.head.reset { - b.packetsDropped++ dropped = true + b.stats.PacketsDropped++ } for b.head != nil && !b.head.start && before32(b.head.packet.Timestamp-b.maxSampleSize, b.minTS) { dropped = true - b.packetsDropped++ + b.stats.PacketsDropped++ b.prevSN = b.head.packet.SequenceNumber - 1 b.dropHead() } @@ -446,7 +487,7 @@ func (b *Buffer) drop() { count := 0 ts := c.packet.Timestamp for { - b.packetsDropped++ + b.stats.PacketsDropped++ count++ b.dropHead() c = b.head From cfb1b7bf6ce6423253273c9a624396021a9e6b3c Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 17 Feb 2025 18:17:51 +0800 Subject: [PATCH 3/3] add lock around PopSamples --- pkg/jitter/buffer.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/jitter/buffer.go b/pkg/jitter/buffer.go index b4377d0..e8bcee9 100644 --- a/pkg/jitter/buffer.go +++ b/pkg/jitter/buffer.go @@ -46,8 +46,8 @@ type Buffer struct { type BufferStats struct { PacketsPushed uint64 + PaddingPushed uint64 PacketsDropped uint64 - PaddingPackets uint64 PacketsPopped uint64 SamplesPopped uint64 } @@ -87,6 +87,10 @@ func (b *Buffer) Push(pkt *rtp.Packet) { defer b.mu.Unlock() b.stats.PacketsPushed++ + if pkt.Padding { + b.stats.PaddingPushed++ + } + var start, end, padding bool if len(pkt.Payload) == 0 { // drop padding packets from the beginning of the stream @@ -237,6 +241,9 @@ func (b *Buffer) Pop(force bool) []*rtp.Packet { } func (b *Buffer) PopSamples(force bool) [][]*rtp.Packet { + b.mu.Lock() + defer b.mu.Unlock() + if force { return b.forcePopSamples() } else { @@ -251,7 +258,7 @@ func (b *Buffer) Stats() *BufferStats { return &BufferStats{ PacketsPushed: b.stats.PacketsPushed, PacketsDropped: b.stats.PacketsDropped, - PaddingPackets: b.stats.PaddingPackets, + PaddingPushed: b.stats.PaddingPushed, PacketsPopped: b.stats.PacketsPopped, SamplesPopped: b.stats.SamplesPopped, }