Skip to content

Commit

Permalink
Add jobs.CreateTx to create a job inside a current transaction (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
markuswustenberg authored May 3, 2024
1 parent e6219ed commit 95eb0b8
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 1 deletion.
4 changes: 3 additions & 1 deletion internal/testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ func NewDB(t testing.TB, path string) *sql.DB {
func NewQ(t testing.TB, opts goqite.NewOpts, path string) *goqite.Queue {
t.Helper()

opts.DB = NewDB(t, path)
if opts.DB == nil {
opts.DB = NewDB(t, path)
}

if opts.Name == "" {
opts.Name = "test"
Expand Down
11 changes: 11 additions & 0 deletions jobs/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package jobs
import (
"bytes"
"context"
"database/sql"
"encoding/gob"
"errors"
"fmt"
Expand Down Expand Up @@ -197,6 +198,7 @@ func (r *Runner) Register(name string, job Func) {
r.jobs[name] = job
}

// Create a message for the named job in the given queue.
func Create(ctx context.Context, q *goqite.Queue, name string, m []byte) error {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(message{Name: name, Message: m}); err != nil {
Expand All @@ -205,6 +207,15 @@ func Create(ctx context.Context, q *goqite.Queue, name string, m []byte) error {
return q.Send(ctx, goqite.Message{Body: buf.Bytes()})
}

// CreateTx is like Create, but within an existing transaction.
func CreateTx(ctx context.Context, tx *sql.Tx, q *goqite.Queue, name string, m []byte) error {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(message{Name: name, Message: m}); err != nil {
return err
}
return q.SendTx(ctx, tx, goqite.Message{Body: buf.Bytes()})
}

// logger matches the info level method from the slog.Logger.
type logger interface {
Info(msg string, args ...any)
Expand Down
26 changes: 26 additions & 0 deletions jobs/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
_ "github.com/mattn/go-sqlite3"

"github.com/maragudk/goqite"
internalsql "github.com/maragudk/goqite/internal/sql"
internaltesting "github.com/maragudk/goqite/internal/testing"
"github.com/maragudk/goqite/jobs"
)
Expand Down Expand Up @@ -121,6 +122,31 @@ func TestRunner_Start(t *testing.T) {
})
}

func TestCreateTx(t *testing.T) {
t.Run("can create a job inside a transaction", func(t *testing.T) {
db := internaltesting.NewDB(t, ":memory:")
q := internaltesting.NewQ(t, goqite.NewOpts{DB: db}, ":memory:")
r := jobs.NewRunner(jobs.NewRunnerOpts{Log: internaltesting.NewLogger(t), Queue: q})

var ran bool
ctx, cancel := context.WithCancel(context.Background())
r.Register("test", func(ctx context.Context, m []byte) error {
ran = true
is.Equal(t, "yo", string(m))
cancel()
return nil
})

err := internalsql.InTx(db, func(tx *sql.Tx) error {
return jobs.CreateTx(ctx, tx, q, "test", []byte("yo"))
})
is.NotError(t, err)

r.Start(ctx)
is.True(t, ran)
})
}

func ExampleRunner_Start() {
log := slog.Default()

Expand Down

0 comments on commit 95eb0b8

Please sign in to comment.