Skip to content

Commit

Permalink
Updated writer
Browse files Browse the repository at this point in the history
  • Loading branch information
djthorpe committed Jun 30, 2024
1 parent c932e17 commit 80bf31d
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 55 deletions.
18 changes: 10 additions & 8 deletions pkg/ffmpeg/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ffmpeg
import (
"encoding/json"
"errors"
"fmt"
"io"
"syscall"

Expand All @@ -21,16 +20,19 @@ type Encoder struct {
ctx *ff.AVCodecContext
stream *ff.AVStream
packet *ff.AVPacket

// We are flushing the encoder
eof bool
//next_pts int64
}

// EncoderFrameFn is a function which is called to receive a frame to encode. It should
// return nil to continue encoding or io.EOF to stop encoding.
type EncoderFrameFn func(int) (*ff.AVFrame, error)

// EncoderPacketFn is a function which is called for each packet encoded. It should
// return nil to continue encoding or io.EOF to stop encoding immediately.
type EncoderPacketFn func(*ff.AVPacket) error
// EncoderPacketFn is a function which is called for each packet encoded, with
// the stream timebase.
type EncoderPacketFn func(*ff.AVPacket, *ff.AVRational) error

////////////////////////////////////////////////////////////////////////////////
// LIFECYCLE
Expand Down Expand Up @@ -178,8 +180,9 @@ func (e *Encoder) Par() *Par {
// PRIVATE METHODS

func (e *Encoder) encode(frame *ff.AVFrame, fn EncoderPacketFn) error {
timebase := e.stream.TimeBase()

// Send the frame to the encoder
fmt.Println("Sending frame", frame)
if err := ff.AVCodec_send_frame(e.ctx, frame); err != nil {
if errors.Is(err, syscall.EAGAIN) || errors.Is(err, io.EOF) {
return nil
Expand All @@ -191,7 +194,6 @@ func (e *Encoder) encode(frame *ff.AVFrame, fn EncoderPacketFn) error {
var result error
for {
// Receive the packet
fmt.Println("Receiving packet")
if err := ff.AVCodec_receive_packet(e.ctx, e.packet); errors.Is(err, syscall.EAGAIN) || errors.Is(err, io.EOF) {
// Finished receiving packet or EOF
break
Expand All @@ -200,7 +202,7 @@ func (e *Encoder) encode(frame *ff.AVFrame, fn EncoderPacketFn) error {
}

// Pass back to the caller
if err := fn(e.packet); errors.Is(err, io.EOF) {
if err := fn(e.packet, &timebase); errors.Is(err, io.EOF) {
// End early, return EOF
result = io.EOF
break
Expand All @@ -214,7 +216,7 @@ func (e *Encoder) encode(frame *ff.AVFrame, fn EncoderPacketFn) error {

// Flush
if result == nil {
result = fn(nil)
result = fn(nil, &timebase)
}

// Return success or EOF
Expand Down
35 changes: 35 additions & 0 deletions pkg/ffmpeg/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package ffmpeg

import (
"encoding/json"
)

////////////////////////////////////////////////////////////////////////////////
// TYPES

type Metadata struct {
Key string `json:"key" writer:",width:30"`
Value any `json:"value,omitempty" writer:",wrap,width:50"`
}

const (
MetaArtwork = "artwork" // Metadata key for artwork, set the value as []byte
)

////////////////////////////////////////////////////////////////////////////////
// LIFECYCLE

func NewMetadata(key string, value any) *Metadata {
return &Metadata{
Key: key,
Value: value,
}
}

////////////////////////////////////////////////////////////////////////////////
// STRINIGY

func (m *Metadata) String() string {
data, _ := json.MarshalIndent(m, "", " ")
return string(data)
}
22 changes: 15 additions & 7 deletions pkg/ffmpeg/opts.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package ffmpeg

import (
// Namespace imports
// Package imports
ffmpeg "github.com/mutablelogic/go-media/sys/ffmpeg61"

// Namespace imports
. "github.com/djthorpe/go-errors"
ffmpeg "github.com/mutablelogic/go-media/sys/ffmpeg61"
)

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -17,11 +18,10 @@ type opts struct {
force bool
par *Par

// Format options
oformat *ffmpeg.AVOutputFormat

// Stream options
streams map[int]*Par
// Writer options
oformat *ffmpeg.AVOutputFormat
streams map[int]*Par
metadata []*Metadata
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -103,6 +103,14 @@ func OptForce() Opt {
}
}

// Append metadata to the output file, including artwork
func OptMetadata(entry ...*Metadata) Opt {
return func(o *opts) error {
o.metadata = append(o.metadata, entry...)
return nil
}
}

// Pixel format of the output frame
func OptPixFormat(format string) Opt {
return func(o *opts) error {
Expand Down
105 changes: 89 additions & 16 deletions pkg/ffmpeg/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,40 @@ const (
//////////////////////////////////////////////////////////////////////////////
// LIFECYCLE

// Create a new writer with a URL and options
func Create(url string, opt ...Opt) (*Writer, error) {
options := newOpts()
writer := new(Writer)

// Apply options
for _, opt := range opt {
if err := opt(options); err != nil {
return nil, err
}
}

// Guess the output format
var ofmt *ff.AVOutputFormat
if options.oformat == nil && url != "" {
options.oformat = ff.AVFormat_guess_format("", url, "")
}
if options.oformat == nil {
return nil, ErrBadParameter.With("unable to guess the output format")
}

// Allocate the output media context
ctx, err := ff.AVFormat_create_file(url, ofmt)
if err != nil {
return nil, err
} else {
writer.output = ctx
}

// Continue with open
return writer.open(options)
}

// Create a new writer with an io.Writer and options
func NewWriter(w io.Writer, opt ...Opt) (*Writer, error) {
options := newOpts()
writer := new(Writer)
Expand Down Expand Up @@ -68,6 +102,11 @@ func NewWriter(w io.Writer, opt ...Opt) (*Writer, error) {
writer.output = ctx
}

// Continue with open
return writer.open(options)
}

func (writer *Writer) open(options *opts) (*Writer, error) {
// Create codec contexts for each stream
var result error
keys := sort.IntSlice(maps.Keys(options.streams))
Expand All @@ -86,7 +125,25 @@ func NewWriter(w io.Writer, opt ...Opt) (*Writer, error) {
return nil, errors.Join(result, writer.Close())
}

// Write the header
// Add metadata
metadata := ff.AVUtil_dict_alloc()
if metadata == nil {
return nil, errors.Join(errors.New("unable to allocate metadata dictionary"), writer.Close())
}
for _, entry := range options.metadata {
// Ignore artwork fields
if entry.Key == MetaArtwork || entry.Key == "" || entry.Value == nil {
continue
}
// Set dictionary entry
if err := ff.AVUtil_dict_set(metadata, entry.Key, fmt.Sprint(entry.Value), ff.AV_DICT_APPEND); err != nil {
return nil, errors.Join(err, writer.Close())
}
}

// Set metadata, write the header
// Metadata ownership is transferred to the output context
writer.output.SetMetadata(metadata)
if err := ff.AVFormat_write_header(writer.output, nil); err != nil {
return nil, errors.Join(err, writer.Close())
} else {
Expand All @@ -97,6 +154,7 @@ func NewWriter(w io.Writer, opt ...Opt) (*Writer, error) {
return writer, nil
}

// Close a writer and release resources
func (w *Writer) Close() error {
var result error

Expand Down Expand Up @@ -148,7 +206,9 @@ func (w *Writer) Encode(in EncoderFrameFn, out EncoderPacketFn) error {
}
if out == nil {
// By default, write packet to output
out = w.Write
out = func(pkt *ff.AVPacket, tb *ff.AVRational) error {
return w.Write(pkt)
}
}

// Initialise encoders
Expand All @@ -159,29 +219,41 @@ func (w *Writer) Encode(in EncoderFrameFn, out EncoderPacketFn) error {
return ErrBadParameter.Withf("duplicate stream %v", stream)
}
encoders[stream] = encoder

// Initialize the encoder
encoder.eof = false
}

// Continue until all encoders have returned io.EOF
// Continue until all encoders have returned io.EOF and have been flushed
for {
// No more encoding to do
if len(encoders) == 0 {
break
}

// TODO: We get the encoder with the lowest timestamp
for stream, encoder := range encoders {
// Receive a frame for the encoder
frame, err := in(stream)
if errors.Is(err, io.EOF) {
fmt.Println("EOF for frame on stream", stream)
delete(encoders, stream)
} else if err != nil {
var frame *ff.AVFrame
var err error

// Receive a frame if not EOF
if !encoder.eof {
frame, err = in(stream)
if errors.Is(err, io.EOF) {
encoder.eof = true
} else if err != nil {
return fmt.Errorf("stream %v: %w", stream, err)
}
}

// Send a frame for encoding
if err := encoder.Encode(frame, out); err != nil {
return fmt.Errorf("stream %v: %w", stream, err)
} else if frame == nil {
return fmt.Errorf("stream %v: nil frame received", stream)
} else if err := encoder.Encode(frame, out); errors.Is(err, io.EOF) {
fmt.Println("EOF for packet on stream", stream)
}

// If eof then delete the encoder
if encoder.eof {
delete(encoders, stream)
} else if err != nil {
return fmt.Errorf("stream %v: %w", stream, err)
}
}
}
Expand All @@ -190,7 +262,8 @@ func (w *Writer) Encode(in EncoderFrameFn, out EncoderPacketFn) error {
return nil
}

// Write a packet to the output
// Write a packet to the output. If you intercept the packets in the
// Encode method, then you can use this method to write packets to the output.
func (w *Writer) Write(packet *ff.AVPacket) error {
return ff.AVCodec_interleaved_write_frame(w.output, packet)
}
Expand Down
60 changes: 53 additions & 7 deletions pkg/ffmpeg/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func Test_writer_001(t *testing.T) {
// Create a writer with an audio stream
writer, err := ffmpeg.NewWriter(w,
ffmpeg.OptOutputFormat(w.Name()),
ffmpeg.OptMetadata(ffmpeg.NewMetadata("title", t.Name())),
ffmpeg.OptAudioStream(1, ffmpeg.AudioPar("fltp", "mono", 22050)),
)
if !assert.NoError(err) {
Expand All @@ -40,21 +41,66 @@ func Test_writer_001(t *testing.T) {
defer audio.Close()

// Write frames
n := 0
duration := 1000 * time.Minute
assert.NoError(writer.Encode(func(stream int) (*ff.AVFrame, error) {
frame := audio.Frame()
t.Log("Frame s", frame.Time().Truncate(time.Millisecond))
if frame.Time() > 10*time.Second {
if frame.Time() >= duration {
return nil, io.EOF
} else {
t.Log("Frame s", frame.Time().Truncate(time.Millisecond))
return frame.(*ffmpeg.Frame).AVFrame(), nil
}
}, func(packet *ff.AVPacket) error {
}, func(packet *ff.AVPacket, timebase *ff.AVRational) error {
if packet != nil {
t.Log("Packet ts", packet.Pts())
n += packet.Size()
t.Log("Packet", packet)
}
return writer.Write(packet)
}))
t.Log("Written", n, "bytes to", w.Name())
t.Log("Written to", w.Name())
}

func Test_writer_002(t *testing.T) {
assert := assert.New(t)

// Write to a file
w, err := os.CreateTemp("", t.Name()+"_*.mp3")
if !assert.NoError(err) {
t.FailNow()
}
defer w.Close()

// Create a writer with an audio stream
writer, err := ffmpeg.Create(w.Name(),
ffmpeg.OptMetadata(ffmpeg.NewMetadata("title", t.Name())),
ffmpeg.OptAudioStream(1, ffmpeg.AudioPar("fltp", "mono", 22050)),
)
if !assert.NoError(err) {
t.FailNow()
}
defer writer.Close()

// Make an audio generator
audio, err := generator.NewSine(440, -5, writer.Stream(1).Par())
if !assert.NoError(err) {
t.FailNow()
}
defer audio.Close()

// Write frames
duration := 1000 * time.Minute
assert.NoError(writer.Encode(func(stream int) (*ff.AVFrame, error) {
frame := audio.Frame()
if frame.Time() >= duration {
return nil, io.EOF
} else {
t.Log("Frame s", frame.Time().Truncate(time.Millisecond))
return frame.(*ffmpeg.Frame).AVFrame(), nil
}
}, func(packet *ff.AVPacket, timebase *ff.AVRational) error {
if packet != nil {
t.Log("Packet", packet)
}
return writer.Write(packet)
}))
t.Log("Written to", w.Name())
}
Loading

0 comments on commit 80bf31d

Please sign in to comment.