diff --git a/README.md b/README.md
index 6d7980a..f560733 100644
--- a/README.md
+++ b/README.md
@@ -66,12 +66,14 @@ func main() {
// Send a message to the queue.
// Note that the body is an arbitrary byte slice, so you can decide
// what kind of payload you have. You can also set a message delay.
- err = q.Send(context.Background(), goqite.Message{
+ // You can use the returned ID to interact with the message.
+ id, err := q.Send(context.Background(), goqite.Message{
Body: []byte("yo"),
})
if err != nil {
log.Fatalln(err)
}
+ log.Println(id)
// Receive a message from the queue, during which time it's not available to
// other consumers (until the message timeout has passed).
diff --git a/docs/examples/queue/main.go b/docs/examples/queue/main.go
index 85137d8..3e781d0 100644
--- a/docs/examples/queue/main.go
+++ b/docs/examples/queue/main.go
@@ -39,12 +39,14 @@ func main() {
// Send a message to the queue.
// Note that the body is an arbitrary byte slice, so you can decide
// what kind of payload you have. You can also set a message delay.
- err = q.Send(context.Background(), goqite.Message{
+ // You can use the returned ID to interact with the message.
+ id, err := q.Send(context.Background(), goqite.Message{
Body: []byte("yo"),
})
if err != nil {
log.Fatalln(err)
}
+ log.Println(id)
// Receive a message from the queue, during which time it's not available to
// other consumers (until the message timeout has passed).
diff --git a/docs/index.html b/docs/index.html
index e377ecf..327a0e3 100644
--- a/docs/index.html
+++ b/docs/index.html
@@ -67,12 +67,14 @@
Queue
// Send a message to the queue.
// Note that the body is an arbitrary byte slice, so you can decide
// what kind of payload you have. You can also set a message delay.
- err = q.Send(context.Background(), goqite.Message{
+ // You can use the returned ID to interact with the message.
+ id, err := q.Send(context.Background(), goqite.Message{
Body: []byte("yo"),
})
if err != nil {
log.Fatalln(err)
}
+ log.Println(id)
// Receive a message from the queue, during which time it's not available to
// other consumers (until the message timeout has passed).
diff --git a/goqite.go b/goqite.go
index 27fb730..099f694 100644
--- a/goqite.go
+++ b/goqite.go
@@ -80,25 +80,31 @@ type Message struct {
}
// Send a Message to the queue with an optional delay.
-func (q *Queue) Send(ctx context.Context, m Message) error {
- return internalsql.InTx(q.db, func(tx *sql.Tx) error {
- return q.SendTx(ctx, tx, m)
+// Returns the ID of the message.
+func (q *Queue) Send(ctx context.Context, m Message) (ID, error) {
+ var id ID
+ err := internalsql.InTx(q.db, func(tx *sql.Tx) error {
+ var err error
+ id, err = q.SendTx(ctx, tx, m)
+ return err
})
+ return id, err
}
// SendTx is like Send, but within an existing transaction.
-func (q *Queue) SendTx(ctx context.Context, tx *sql.Tx, m Message) error {
+func (q *Queue) SendTx(ctx context.Context, tx *sql.Tx, m Message) (ID, error) {
if m.Delay < 0 {
panic("delay cannot be negative")
}
timeout := time.Now().Add(m.Delay).Format(rfc3339Milli)
- _, err := tx.ExecContext(ctx, `insert into goqite (queue, body, timeout) values (?, ?, ?)`, q.name, m.Body, timeout)
- if err != nil {
- return err
+ var id ID
+ query := `insert into goqite (queue, body, timeout) values (?, ?, ?) returning id`
+ if err := tx.QueryRowContext(ctx, query, q.name, m.Body, timeout).Scan(&id); err != nil {
+ return id, err
}
- return nil
+ return id, nil
}
// Receive a Message from the queue, or nil if there is none.
diff --git a/goqite_test.go b/goqite_test.go
index fd9e2cf..5ed791b 100644
--- a/goqite_test.go
+++ b/goqite_test.go
@@ -31,8 +31,9 @@ func TestQueue(t *testing.T) {
Body: []byte("yo"),
}
- err = q.Send(context.Background(), *m)
+ id, err := q.Send(context.Background(), *m)
is.NotError(t, err)
+ is.True(t, len(id) > 0)
m, err = q.Receive(context.Background())
is.NotError(t, err)
@@ -99,7 +100,7 @@ func TestQueue_Send(t *testing.T) {
is.Equal(t, "delay cannot be negative", r)
}()
- err = q.Send(context.Background(), goqite.Message{Delay: -1})
+ _, _ = q.Send(context.Background(), goqite.Message{Delay: -1})
})
}
@@ -112,7 +113,7 @@ func TestQueue_Receive(t *testing.T) {
Delay: 2 * time.Millisecond,
}
- err := q.Send(context.Background(), *m)
+ _, err := q.Send(context.Background(), *m)
is.NotError(t, err)
m, err = q.Receive(context.Background())
@@ -134,7 +135,7 @@ func TestQueue_Receive(t *testing.T) {
Body: []byte("yo"),
}
- err := q.Send(context.Background(), *m)
+ _, err := q.Send(context.Background(), *m)
is.NotError(t, err)
m, err = q.Receive(context.Background())
@@ -154,7 +155,7 @@ func TestQueue_Receive(t *testing.T) {
Body: []byte("yo"),
}
- err := q.Send(context.Background(), *m)
+ _, err := q.Send(context.Background(), *m)
is.NotError(t, err)
m, err = q.Receive(context.Background())
@@ -180,7 +181,7 @@ func TestQueue_Receive(t *testing.T) {
q1 := newQ(t, goqite.NewOpts{}, "test.db")
q2 := newQ(t, goqite.NewOpts{Name: "q2"}, "test.db")
- err := q1.Send(context.Background(), goqite.Message{Body: []byte("yo")})
+ _, err := q1.Send(context.Background(), goqite.Message{Body: []byte("yo")})
is.NotError(t, err)
m, err := q2.Receive(context.Background())
@@ -197,7 +198,7 @@ func TestQueue_Extend(t *testing.T) {
Body: []byte("yo"),
}
- err := q.Send(context.Background(), *m)
+ _, err := q.Send(context.Background(), *m)
is.NotError(t, err)
m, err = q.Receive(context.Background())
@@ -228,14 +229,14 @@ func TestQueue_Extend(t *testing.T) {
Body: []byte("yo"),
}
- err = q.Send(context.Background(), *m)
+ _, err = q.Send(context.Background(), *m)
is.NotError(t, err)
m, err = q.Receive(context.Background())
is.NotError(t, err)
is.NotNil(t, m)
- err = q.Extend(context.Background(), m.ID, -1)
+ _ = q.Extend(context.Background(), m.ID, -1)
})
}
@@ -254,7 +255,7 @@ func TestQueue_ReceiveAndWait(t *testing.T) {
t.Run("gets a message immediately if there is one", func(t *testing.T) {
q := newQ(t, goqite.NewOpts{Timeout: time.Millisecond}, ":memory:")
- err := q.Send(context.Background(), goqite.Message{Body: []byte("yo")})
+ _, err := q.Send(context.Background(), goqite.Message{Body: []byte("yo")})
is.NotError(t, err)
m, err := q.ReceiveAndWait(context.Background(), time.Millisecond)
@@ -290,7 +291,7 @@ func BenchmarkQueue(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
- err := q.Send(context.Background(), goqite.Message{
+ _, err := q.Send(context.Background(), goqite.Message{
Body: []byte("yo"),
})
is.NotError(b, err)
@@ -336,7 +337,7 @@ func BenchmarkQueue(b *testing.B) {
for i := 0; i < 100_000; i++ {
q := queues[rand.Intn(len(queues))]
- err := q.Send(context.Background(), goqite.Message{
+ _, err := q.Send(context.Background(), goqite.Message{
Body: []byte("yo"),
})
is.NotError(b, err)
diff --git a/http/handler.go b/http/handler.go
index cccdd20..41b0c85 100644
--- a/http/handler.go
+++ b/http/handler.go
@@ -16,7 +16,7 @@ import (
)
type queue interface {
- Send(ctx context.Context, m goqite.Message) error
+ Send(ctx context.Context, m goqite.Message) (goqite.ID, error)
Receive(ctx context.Context) (*goqite.Message, error)
ReceiveAndWait(ctx context.Context, interval time.Duration) (*goqite.Message, error)
Extend(ctx context.Context, id goqite.ID, delay time.Duration) error
@@ -103,7 +103,7 @@ func NewHandler(q queue) http.HandlerFunc {
return
}
- if err := q.Send(r.Context(), req.Message); err != nil {
+ if _, err := q.Send(r.Context(), req.Message); err != nil {
http.Error(w, "error sending message: "+err.Error(), http.StatusInternalServerError)
return
}
diff --git a/http/handler_test.go b/http/handler_test.go
index a6cba30..6890477 100644
--- a/http/handler_test.go
+++ b/http/handler_test.go
@@ -29,8 +29,8 @@ type queueMock struct {
err error
}
-func (q *queueMock) Send(ctx context.Context, m goqite.Message) error {
- return q.err
+func (q *queueMock) Send(ctx context.Context, m goqite.Message) (goqite.ID, error) {
+ return "", q.err
}
func (q *queueMock) Receive(ctx context.Context) (*goqite.Message, error) {
diff --git a/jobs/runner.go b/jobs/runner.go
index 76d9acb..8afce58 100644
--- a/jobs/runner.go
+++ b/jobs/runner.go
@@ -214,7 +214,8 @@ func Create(ctx context.Context, q *goqite.Queue, name string, m []byte) error {
if err := gob.NewEncoder(&buf).Encode(message{Name: name, Message: m}); err != nil {
return err
}
- return q.Send(ctx, goqite.Message{Body: buf.Bytes()})
+ _, err := q.Send(ctx, goqite.Message{Body: buf.Bytes()})
+ return err
}
// CreateTx is like Create, but within an existing transaction.
@@ -223,7 +224,8 @@ func CreateTx(ctx context.Context, tx *sql.Tx, q *goqite.Queue, name string, m [
if err := gob.NewEncoder(&buf).Encode(message{Name: name, Message: m}); err != nil {
return err
}
- return q.SendTx(ctx, tx, goqite.Message{Body: buf.Bytes()})
+ _, err := q.SendTx(ctx, tx, goqite.Message{Body: buf.Bytes()})
+ return err
}
// logger matches the info level method from the slog.Logger.