Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic queue implementation #7

Merged
merged 1 commit into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# template
# goqite

goqite (pronounced Go-queue-ite) is a Go queue library built on SQLite and inspired by AWS SQS.

See [www.goqite.com](https://www.goqite.com) for an introduction.

Made in 🇩🇰 by [maragu](https://www.maragu.dk/), maker of [online Go courses](https://www.golang.dk/).
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
module template
module github.com/maragudk/goqite

go 1.21

require github.com/mattn/go-sqlite3 v1.14.19

require github.com/maragudk/is v0.1.0
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
github.com/maragudk/is v0.1.0 h1:obq9anZNmOYcaNbeT0LMyjIexdNeYTw/TLAPD/BnZHA=
github.com/maragudk/is v0.1.0/go.mod h1:W/r6+TpnISu+a88OLXQy5JQGCOhXQXXLD2e5b4xMn5c=
github.com/mattn/go-sqlite3 v1.14.19 h1:fhGleo2h1p8tVChob4I9HpmVFIAkKGpiukdrgQbWfGI=
github.com/mattn/go-sqlite3 v1.14.19/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
124 changes: 124 additions & 0 deletions goqite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package goqite

import (
"context"
"database/sql"
"errors"
"time"
)

// rfc3339Milli is like time.RFC3339Nano, but with millisecond precision, and fractional seconds do not have trailing
// zeros removed.
const rfc3339Milli = "2006-01-02T15:04:05.000Z07:00"

type logger interface {
Println(v ...any)
}

type NewOpts struct {
DB *sql.DB
Log logger
MaxReceive int
Timeout time.Duration
}

func New(opts NewOpts) *Queue {
if opts.DB == nil {
panic("DB cannot be nil")
}

if opts.Log == nil {
opts.Log = &discardLogger{}
}

if opts.MaxReceive < 0 {
panic("max receive cannot be negative")
}

if opts.MaxReceive == 0 {
opts.MaxReceive = 3
}

if opts.Timeout < 0 {
panic("timeout cannot be negative")
}

if opts.Timeout == 0 {
opts.Timeout = time.Second
}

return &Queue{
db: opts.DB,
log: opts.Log,
maxReceive: opts.MaxReceive,
timeout: opts.Timeout,
}
}

type Queue struct {
db *sql.DB
log logger
maxReceive int
timeout time.Duration
}

type ID string

type Message struct {
ID ID
Delay time.Duration
Body []byte
}

func (q *Queue) Send(ctx context.Context, m Message) error {
if m.Delay < 0 {
return errors.New("delay cannot be negative")
}

timeout := time.Now().Add(m.Delay).Format(rfc3339Milli)

_, err := q.db.ExecContext(ctx, `insert into queue (body, timeout) values (?, ?)`, m.Body, timeout)
if err != nil {
return err
}
return nil
}

func (q *Queue) Receive(ctx context.Context) (*Message, error) {
now := time.Now()
nowFormatted := now.Format(rfc3339Milli)
timeoutFormatted := now.Add(q.timeout).Format(rfc3339Milli)

query := `
update queue
set
timeout = ?,
received = received + 1
where id = (
select id from queue
where
? >= timeout and
received < ?
order by created
limit 1
)
returning id, body`

var m Message
if err := q.db.QueryRowContext(ctx, query, timeoutFormatted, nowFormatted, q.maxReceive).Scan(&m.ID, &m.Body); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, err
}
return &m, nil
}

func (q *Queue) Delete(ctx context.Context, id ID) error {
_, err := q.db.ExecContext(ctx, `delete from queue where id = ?`, id)
return err
}

type discardLogger struct{}

func (l *discardLogger) Println(v ...any) {}
139 changes: 139 additions & 0 deletions goqite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package goqite_test

import (
"context"
"database/sql"
_ "embed"
"testing"
"time"

"github.com/maragudk/is"
_ "github.com/mattn/go-sqlite3"

"github.com/maragudk/goqite"
)

//go:embed schema.sql
var schema string

func TestQueue(t *testing.T) {
t.Run("can send and receive and delete a message", func(t *testing.T) {
q := newQ(t, goqite.NewOpts{Timeout: time.Millisecond})

m, err := q.Receive(context.Background())
is.NotError(t, err)
is.Nil(t, m)

m = &goqite.Message{
Body: []byte("yo"),
}

err = q.Send(context.Background(), *m)
is.NotError(t, err)

m, err = q.Receive(context.Background())
is.NotError(t, err)
is.NotNil(t, m)
is.Equal(t, "yo", string(m.Body))

err = q.Delete(context.Background(), m.ID)
is.NotError(t, err)

time.Sleep(time.Millisecond)

m, err = q.Receive(context.Background())
is.NotError(t, err)
is.Nil(t, m)
})

t.Run("does not receive a delayed message immediately", func(t *testing.T) {
q := newQ(t, goqite.NewOpts{})

m := &goqite.Message{
Body: []byte("yo"),
Delay: time.Millisecond,
}

err := q.Send(context.Background(), *m)
is.NotError(t, err)

m, err = q.Receive(context.Background())
is.NotError(t, err)
is.Nil(t, m)

time.Sleep(time.Millisecond)

m, err = q.Receive(context.Background())
is.NotError(t, err)
is.NotNil(t, m)
is.Equal(t, "yo", string(m.Body))
})

t.Run("does not receive a message twice in a row", func(t *testing.T) {
q := newQ(t, goqite.NewOpts{Timeout: time.Second})

m := &goqite.Message{
Body: []byte("yo"),
}

err := q.Send(context.Background(), *m)
is.NotError(t, err)

m, err = q.Receive(context.Background())
is.NotError(t, err)
is.NotNil(t, m)
is.Equal(t, "yo", string(m.Body))

m, err = q.Receive(context.Background())
is.NotError(t, err)
is.Nil(t, m)
})

t.Run("does receive a message up to two times if set and timeout has passed", func(t *testing.T) {
q := newQ(t, goqite.NewOpts{Timeout: time.Millisecond, MaxReceive: 2})

m := &goqite.Message{
Body: []byte("yo"),
}

err := q.Send(context.Background(), *m)
is.NotError(t, err)

m, err = q.Receive(context.Background())
is.NotError(t, err)
is.NotNil(t, m)
is.Equal(t, "yo", string(m.Body))

time.Sleep(time.Millisecond)

m, err = q.Receive(context.Background())
is.NotError(t, err)
is.NotNil(t, m)
is.Equal(t, "yo", string(m.Body))

time.Sleep(time.Millisecond)

m, err = q.Receive(context.Background())
is.NotError(t, err)
is.Nil(t, m)
})
}

func newQ(t *testing.T, opts goqite.NewOpts) *goqite.Queue {
t.Helper()

db, err := sql.Open("sqlite3", ":memory:?_journal=WAL&_timeout=5000&_fk=true")
if err != nil {
t.Fatal(err)
}
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
_, err = db.Exec(schema)
if err != nil {
t.Fatal(err)
}

opts.DB = db

return goqite.New(opts)
}
12 changes: 12 additions & 0 deletions schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
create table queue (
id text primary key default ('m_' || lower(hex(randomblob(16)))),
created text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
updated text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
body blob not null,
timeout text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
received integer not null default 0
) strict;

create trigger queue_updated_timestamp after update on queue begin
update queue set updated = strftime('%Y-%m-%dT%H:%M:%fZ') where id = old.id;
end;
1 change: 0 additions & 1 deletion template.go

This file was deleted.

Loading