Skip to content

Commit

Permalink
Feat: correct timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
zijiren233 committed Oct 5, 2023
1 parent 7893feb commit eecb81a
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 45 deletions.
12 changes: 11 additions & 1 deletion av/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,17 @@ type Packet struct {
Data []byte
}

func (p *Packet) NewPacketData() *Packet {
func (p *Packet) Type() int {
if p.IsVideo {
return TAG_VIDEO
} else if p.IsMetadata {
return TAG_SCRIPTDATAAMF0
} else {
return TAG_AUDIO
}
}

func (p *Packet) Clone() *Packet {
var tp = *p
tp.Data = make([]byte, len(p.Data))
copy(tp.Data, p.Data)
Expand Down
10 changes: 4 additions & 6 deletions container/flv/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (
)

type Writer struct {
t *utils.Timestamp
t utils.Timestamp
headerBuf []byte
w *stream.Writer
inited bool
Expand All @@ -43,7 +43,6 @@ func WithWriterBuffer(size int) WriterConf {

func NewWriter(w io.Writer, conf ...WriterConf) *Writer {
writer := &Writer{
t: utils.NewTimestamp(),
headerBuf: make([]byte, headerLen),
bufSize: 1024,
}
Expand Down Expand Up @@ -75,7 +74,7 @@ func (w *Writer) Write(p *av.Packet) error {
} else if p.IsMetadata {
var err error
typeID = av.TAG_SCRIPTDATAAMF0
p = p.NewPacketData()
p = p.Clone()
p.Data, err = amf.MetaDataReform(p.Data, amf.DEL)
if err != nil {
return err
Expand All @@ -86,16 +85,15 @@ func (w *Writer) Write(p *av.Packet) error {
return nil
}
dataLen := len(p.Data)
timestamp := p.TimeStamp + w.t.BaseTimeStamp()
w.t.RecTimeStamp(timestamp, uint32(typeID))
timestamp := w.t.RecTimeStamp(p.TimeStamp, uint32(typeID))

preDataLen := dataLen + headerLen
timestampExt := timestamp >> 24

return w.w.
U8(typeID).
U24(uint32(dataLen)).
U24(uint32(timestamp)).
U24(timestamp).
U8(uint8(timestampExt)).
U24(0).
Bytes(p.Data).
Expand Down
5 changes: 4 additions & 1 deletion protocol/hls/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/zijiren233/livelib/container/flv"
"github.com/zijiren233/livelib/container/ts"
"github.com/zijiren233/livelib/protocol/hls/parser"
"github.com/zijiren233/livelib/utils"
)

const (
Expand All @@ -23,6 +24,7 @@ const (

type Source struct {
seq int
t utils.Timestamp
bwriter *bytes.Buffer
btswriter *bytes.Buffer
demuxer *flv.Demuxer
Expand Down Expand Up @@ -79,7 +81,7 @@ func (source *Source) SendPacket() error {
if p.IsMetadata {
continue
}
p = p.NewPacketData()
p = p.Clone()
err := source.demuxer.Demux(p)
if err != nil {
if err == flv.ErrAvcEndSEQ {
Expand All @@ -92,6 +94,7 @@ func (source *Source) SendPacket() error {
if err != nil || isSeq {
continue
}
p.TimeStamp = source.t.RecTimeStamp(p.TimeStamp, uint32(p.Type()))
if source.btswriter != nil {
source.stat.update(p.IsVideo, p.TimeStamp)
source.calcPtsDts(p.IsVideo, p.TimeStamp, uint32(compositionTime))
Expand Down
13 changes: 5 additions & 8 deletions protocol/httpflv/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
)

type HttpFlvWriter struct {
t *utils.Timestamp
t utils.Timestamp
headerBuf []byte
w *stream.Writer
inited bool
Expand All @@ -41,7 +41,6 @@ func WithWriterBuffer(size int) HttpFlvWriterConf {

func NewHttpFLVWriter(w io.Writer, conf ...HttpFlvWriterConf) *HttpFlvWriter {
writer := &HttpFlvWriter{
t: utils.NewTimestamp(),
headerBuf: make([]byte, headerLen),
bufSize: 1024,
packetQueue: make(chan *av.Packet, maxQueueNum),
Expand Down Expand Up @@ -73,6 +72,7 @@ func (w *HttpFlvWriter) Write(p *av.Packet) (err error) {
}

func (w *HttpFlvWriter) SendPacket() error {
var typeID uint8
for p := range w.packetQueue {
if !w.inited {
if err := w.w.Bytes(flv.FlvFirstHeader).Error(); err != nil {
Expand All @@ -81,14 +81,12 @@ func (w *HttpFlvWriter) SendPacket() error {
w.inited = true
}

var typeID uint8

if p.IsVideo {
typeID = av.TAG_VIDEO
} else if p.IsMetadata {
var err error
typeID = av.TAG_SCRIPTDATAAMF0
p = p.NewPacketData()
p = p.Clone()
p.Data, err = amf.MetaDataReform(p.Data, amf.DEL)
if err != nil {
return err
Expand All @@ -99,16 +97,15 @@ func (w *HttpFlvWriter) SendPacket() error {
return errors.New("not allowed packet type")
}
dataLen := len(p.Data)
timestamp := p.TimeStamp + w.t.BaseTimeStamp()
w.t.RecTimeStamp(timestamp, uint32(typeID))
timestamp := w.t.RecTimeStamp(p.TimeStamp, uint32(typeID))

preDataLen := dataLen + headerLen
timestampExt := timestamp >> 24

if err := w.w.
U8(typeID).
U24(uint32(dataLen)).
U24(uint32(timestamp)).
U24(timestamp).
U8(uint8(timestampExt)).
U24(0).
Bytes(p.Data).
Expand Down
19 changes: 4 additions & 15 deletions protocol/rtmp/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

type Writer struct {
t *utils.Timestamp
t utils.Timestamp
conn ChunkWriter
packetQueue chan *av.Packet
WriteBWInfo StaticsBW
Expand All @@ -24,7 +24,6 @@ type Writer struct {
func NewWriter(conn ChunkWriter) *Writer {
w := &Writer{
conn: conn,
t: utils.NewTimestamp(),
packetQueue: make(chan *av.Packet, maxQueueNum),
WriteBWInfo: StaticsBW{0, 0, 0, 0, 0, 0, 0, 0},
}
Expand Down Expand Up @@ -79,21 +78,11 @@ func (w *Writer) SendPacket() error {
cs.Data = p.Data
cs.Length = uint32(len(p.Data))
cs.StreamID = p.StreamID
cs.Timestamp = p.TimeStamp
cs.Timestamp += w.t.BaseTimeStamp()

if p.IsVideo {
cs.TypeID = av.TAG_VIDEO
} else {
if p.IsMetadata {
cs.TypeID = av.TAG_SCRIPTDATAAMF0
} else {
cs.TypeID = av.TAG_AUDIO
}
}

cs.TypeID = uint32(p.Type())

w.SaveStatics(p.StreamID, uint64(cs.Length), p.IsVideo)
w.t.RecTimeStamp(cs.Timestamp, cs.TypeID)
cs.Timestamp = w.t.RecTimeStamp(p.TimeStamp, cs.TypeID)
if err := w.conn.Write(cs); err != nil {
return err
}
Expand Down
40 changes: 26 additions & 14 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,43 @@ package utils
import "github.com/zijiren233/livelib/av"

type Timestamp struct {
t uint32
videoTimestamp uint32
audioTimestamp uint32
lastVideoTimestamp uint32
lastAudioTimestamp uint32
}

func NewTimestamp() *Timestamp {
return new(Timestamp)
}

func (rw *Timestamp) BaseTimeStamp() uint32 {
return rw.t
}

func (rw *Timestamp) CalcBaseTimestamp() {
if rw.lastAudioTimestamp > rw.lastVideoTimestamp {
rw.t = rw.lastAudioTimestamp
func (rw *Timestamp) timeStamp() uint32 {
if rw.audioTimestamp > rw.videoTimestamp {
return rw.audioTimestamp
} else {
rw.t = rw.lastVideoTimestamp
return rw.videoTimestamp
}
}

func (rw *Timestamp) RecTimeStamp(timestamp, typeID uint32) {
func (rw *Timestamp) RecTimeStamp(timestamp, typeID uint32) uint32 {
if typeID == av.TAG_VIDEO {
if timestamp < rw.videoTimestamp {
if rw.lastVideoTimestamp > timestamp {
rw.videoTimestamp += timestamp
} else {
rw.videoTimestamp += timestamp - rw.lastVideoTimestamp
}
} else {
rw.videoTimestamp = timestamp
}
rw.lastVideoTimestamp = timestamp
} else if typeID == av.TAG_AUDIO {
if timestamp < rw.audioTimestamp {
if rw.lastAudioTimestamp > timestamp {
rw.audioTimestamp += timestamp
} else {
rw.audioTimestamp += timestamp - rw.lastAudioTimestamp
}
} else {
rw.audioTimestamp = timestamp
}
rw.lastAudioTimestamp = timestamp
}
return rw.timeStamp()
}

0 comments on commit eecb81a

Please sign in to comment.