diff --git a/README.md b/README.md index 6d7980a..f560733 100644 --- a/README.md +++ b/README.md @@ -66,12 +66,14 @@ func main() { // Send a message to the queue. // Note that the body is an arbitrary byte slice, so you can decide // what kind of payload you have. You can also set a message delay. - err = q.Send(context.Background(), goqite.Message{ + // You can use the returned ID to interact with the message. + id, err := q.Send(context.Background(), goqite.Message{ Body: []byte("yo"), }) if err != nil { log.Fatalln(err) } + log.Println(id) // Receive a message from the queue, during which time it's not available to // other consumers (until the message timeout has passed). diff --git a/docs/examples/queue/main.go b/docs/examples/queue/main.go index 85137d8..3e781d0 100644 --- a/docs/examples/queue/main.go +++ b/docs/examples/queue/main.go @@ -39,12 +39,14 @@ func main() { // Send a message to the queue. // Note that the body is an arbitrary byte slice, so you can decide // what kind of payload you have. You can also set a message delay. - err = q.Send(context.Background(), goqite.Message{ + // You can use the returned ID to interact with the message. + id, err := q.Send(context.Background(), goqite.Message{ Body: []byte("yo"), }) if err != nil { log.Fatalln(err) } + log.Println(id) // Receive a message from the queue, during which time it's not available to // other consumers (until the message timeout has passed). diff --git a/docs/index.html b/docs/index.html index e377ecf..327a0e3 100644 --- a/docs/index.html +++ b/docs/index.html @@ -67,12 +67,14 @@

Queue

// Send a message to the queue. // Note that the body is an arbitrary byte slice, so you can decide // what kind of payload you have. You can also set a message delay. - err = q.Send(context.Background(), goqite.Message{ + // You can use the returned ID to interact with the message. + id, err := q.Send(context.Background(), goqite.Message{ Body: []byte("yo"), }) if err != nil { log.Fatalln(err) } + log.Println(id) // Receive a message from the queue, during which time it's not available to // other consumers (until the message timeout has passed). diff --git a/goqite.go b/goqite.go index 27fb730..099f694 100644 --- a/goqite.go +++ b/goqite.go @@ -80,25 +80,31 @@ type Message struct { } // Send a Message to the queue with an optional delay. -func (q *Queue) Send(ctx context.Context, m Message) error { - return internalsql.InTx(q.db, func(tx *sql.Tx) error { - return q.SendTx(ctx, tx, m) +// Returns the ID of the message. +func (q *Queue) Send(ctx context.Context, m Message) (ID, error) { + var id ID + err := internalsql.InTx(q.db, func(tx *sql.Tx) error { + var err error + id, err = q.SendTx(ctx, tx, m) + return err }) + return id, err } // SendTx is like Send, but within an existing transaction. -func (q *Queue) SendTx(ctx context.Context, tx *sql.Tx, m Message) error { +func (q *Queue) SendTx(ctx context.Context, tx *sql.Tx, m Message) (ID, error) { if m.Delay < 0 { panic("delay cannot be negative") } timeout := time.Now().Add(m.Delay).Format(rfc3339Milli) - _, err := tx.ExecContext(ctx, `insert into goqite (queue, body, timeout) values (?, ?, ?)`, q.name, m.Body, timeout) - if err != nil { - return err + var id ID + query := `insert into goqite (queue, body, timeout) values (?, ?, ?) returning id` + if err := tx.QueryRowContext(ctx, query, q.name, m.Body, timeout).Scan(&id); err != nil { + return id, err } - return nil + return id, nil } // Receive a Message from the queue, or nil if there is none. diff --git a/goqite_test.go b/goqite_test.go index fd9e2cf..5ed791b 100644 --- a/goqite_test.go +++ b/goqite_test.go @@ -31,8 +31,9 @@ func TestQueue(t *testing.T) { Body: []byte("yo"), } - err = q.Send(context.Background(), *m) + id, err := q.Send(context.Background(), *m) is.NotError(t, err) + is.True(t, len(id) > 0) m, err = q.Receive(context.Background()) is.NotError(t, err) @@ -99,7 +100,7 @@ func TestQueue_Send(t *testing.T) { is.Equal(t, "delay cannot be negative", r) }() - err = q.Send(context.Background(), goqite.Message{Delay: -1}) + _, _ = q.Send(context.Background(), goqite.Message{Delay: -1}) }) } @@ -112,7 +113,7 @@ func TestQueue_Receive(t *testing.T) { Delay: 2 * time.Millisecond, } - err := q.Send(context.Background(), *m) + _, err := q.Send(context.Background(), *m) is.NotError(t, err) m, err = q.Receive(context.Background()) @@ -134,7 +135,7 @@ func TestQueue_Receive(t *testing.T) { Body: []byte("yo"), } - err := q.Send(context.Background(), *m) + _, err := q.Send(context.Background(), *m) is.NotError(t, err) m, err = q.Receive(context.Background()) @@ -154,7 +155,7 @@ func TestQueue_Receive(t *testing.T) { Body: []byte("yo"), } - err := q.Send(context.Background(), *m) + _, err := q.Send(context.Background(), *m) is.NotError(t, err) m, err = q.Receive(context.Background()) @@ -180,7 +181,7 @@ func TestQueue_Receive(t *testing.T) { q1 := newQ(t, goqite.NewOpts{}, "test.db") q2 := newQ(t, goqite.NewOpts{Name: "q2"}, "test.db") - err := q1.Send(context.Background(), goqite.Message{Body: []byte("yo")}) + _, err := q1.Send(context.Background(), goqite.Message{Body: []byte("yo")}) is.NotError(t, err) m, err := q2.Receive(context.Background()) @@ -197,7 +198,7 @@ func TestQueue_Extend(t *testing.T) { Body: []byte("yo"), } - err := q.Send(context.Background(), *m) + _, err := q.Send(context.Background(), *m) is.NotError(t, err) m, err = q.Receive(context.Background()) @@ -228,14 +229,14 @@ func TestQueue_Extend(t *testing.T) { Body: []byte("yo"), } - err = q.Send(context.Background(), *m) + _, err = q.Send(context.Background(), *m) is.NotError(t, err) m, err = q.Receive(context.Background()) is.NotError(t, err) is.NotNil(t, m) - err = q.Extend(context.Background(), m.ID, -1) + _ = q.Extend(context.Background(), m.ID, -1) }) } @@ -254,7 +255,7 @@ func TestQueue_ReceiveAndWait(t *testing.T) { t.Run("gets a message immediately if there is one", func(t *testing.T) { q := newQ(t, goqite.NewOpts{Timeout: time.Millisecond}, ":memory:") - err := q.Send(context.Background(), goqite.Message{Body: []byte("yo")}) + _, err := q.Send(context.Background(), goqite.Message{Body: []byte("yo")}) is.NotError(t, err) m, err := q.ReceiveAndWait(context.Background(), time.Millisecond) @@ -290,7 +291,7 @@ func BenchmarkQueue(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - err := q.Send(context.Background(), goqite.Message{ + _, err := q.Send(context.Background(), goqite.Message{ Body: []byte("yo"), }) is.NotError(b, err) @@ -336,7 +337,7 @@ func BenchmarkQueue(b *testing.B) { for i := 0; i < 100_000; i++ { q := queues[rand.Intn(len(queues))] - err := q.Send(context.Background(), goqite.Message{ + _, err := q.Send(context.Background(), goqite.Message{ Body: []byte("yo"), }) is.NotError(b, err) diff --git a/http/handler.go b/http/handler.go index cccdd20..41b0c85 100644 --- a/http/handler.go +++ b/http/handler.go @@ -16,7 +16,7 @@ import ( ) type queue interface { - Send(ctx context.Context, m goqite.Message) error + Send(ctx context.Context, m goqite.Message) (goqite.ID, error) Receive(ctx context.Context) (*goqite.Message, error) ReceiveAndWait(ctx context.Context, interval time.Duration) (*goqite.Message, error) Extend(ctx context.Context, id goqite.ID, delay time.Duration) error @@ -103,7 +103,7 @@ func NewHandler(q queue) http.HandlerFunc { return } - if err := q.Send(r.Context(), req.Message); err != nil { + if _, err := q.Send(r.Context(), req.Message); err != nil { http.Error(w, "error sending message: "+err.Error(), http.StatusInternalServerError) return } diff --git a/http/handler_test.go b/http/handler_test.go index a6cba30..6890477 100644 --- a/http/handler_test.go +++ b/http/handler_test.go @@ -29,8 +29,8 @@ type queueMock struct { err error } -func (q *queueMock) Send(ctx context.Context, m goqite.Message) error { - return q.err +func (q *queueMock) Send(ctx context.Context, m goqite.Message) (goqite.ID, error) { + return "", q.err } func (q *queueMock) Receive(ctx context.Context) (*goqite.Message, error) { diff --git a/jobs/runner.go b/jobs/runner.go index 76d9acb..8afce58 100644 --- a/jobs/runner.go +++ b/jobs/runner.go @@ -214,7 +214,8 @@ func Create(ctx context.Context, q *goqite.Queue, name string, m []byte) error { if err := gob.NewEncoder(&buf).Encode(message{Name: name, Message: m}); err != nil { return err } - return q.Send(ctx, goqite.Message{Body: buf.Bytes()}) + _, err := q.Send(ctx, goqite.Message{Body: buf.Bytes()}) + return err } // CreateTx is like Create, but within an existing transaction. @@ -223,7 +224,8 @@ func CreateTx(ctx context.Context, tx *sql.Tx, q *goqite.Queue, name string, m [ if err := gob.NewEncoder(&buf).Encode(message{Name: name, Message: m}); err != nil { return err } - return q.SendTx(ctx, tx, goqite.Message{Body: buf.Bytes()}) + _, err := q.SendTx(ctx, tx, goqite.Message{Body: buf.Bytes()}) + return err } // logger matches the info level method from the slog.Logger.