From 490ebef18555dff92d4fd0d4e97ec2d21fb96ff1 Mon Sep 17 00:00:00 2001 From: Dexter Miguel <5452298+divmgl@users.noreply.github.com> Date: Mon, 3 Jan 2022 00:34:51 -0500 Subject: [PATCH] jackd can now be used from concurrent goroutines --- README.md | 8 +- main.go | 260 ++++++++++++++++++++++++++++++--------------------- main_test.go | 13 +-- types.go | 41 ++++++++ 4 files changed, 206 insertions(+), 116 deletions(-) create mode 100644 types.go diff --git a/README.md b/README.md index c919372..65389e1 100644 --- a/README.md +++ b/README.md @@ -242,7 +242,13 @@ func worker(conn *jackd.Client, fn func(id uint32, body []byte) error) error { ## Concurrency -Please keep in mind that `beanstalkd` processes commands for a given connection serially. This means that you should never access the same `jackd` client from different goroutines. You've been warned! +`jackd` as of 1.1.0 supports issuing commands from multiple goroutines. In order to avoid concurrency issues, all `jackd` commands are synchronized with a mutex. This is because `beanstalkd` processes commands per connection serially. + +Please keep this in mind as your goroutines may block each other if they're utilizing the same `jackd` instance (especially with long-running commands, like the `reserve` commands). This is normally not a problem in most architectures, but if you do run into issues, you have several options: + +* If you need to publish and consume from the same process, use two separate `jackd` instances: one for publishing and one for consuming +* Ensure that you create individual `jackd` instances per goroutine. Keep in mind that this opens a new connection to `beanstalkd`. +* Keep all of your code synchronous when dealing with `jackd` (specifically, use mutexes, wait groups, or simply do not use multiple goroutines with `jackd`) # License diff --git a/main.go b/main.go index f86b700..a6687bc 100644 --- a/main.go +++ b/main.go @@ -6,32 +6,13 @@ import ( "fmt" "log" "net" + "sync" "time" ) var Delimiter = []byte("\r\n") var MaxTubeName = 200 -type Client struct { - conn net.Conn - buffer *bufio.ReadWriter - scanner *bufio.Scanner -} - -type PutOpts struct { - Priority uint32 - Delay time.Duration - TTR time.Duration -} - -func DefaultPutOpts() PutOpts { - return PutOpts{ - Priority: 0, - Delay: 0, - TTR: 60 * time.Second, - } -} - func Dial(addr string) (*Client, error) { conn, err := net.Dial("tcp", addr) if err != nil { @@ -49,18 +30,14 @@ func Dial(addr string) (*Client, error) { bufio.NewWriter(conn), ), scanner: scanner, + mutex: new(sync.Mutex), }, nil } -func Must(client *Client, err error) *Client { - if err != nil { - log.Fatalf("unable to connect to beanstalkd instance %v", err) - } - - return client -} - func (jackd *Client) Put(body []byte, opts PutOpts) (uint32, error) { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + command := []byte(fmt.Sprintf( "put %d %d %d %d\r\n", opts.Priority, @@ -114,6 +91,9 @@ func (jackd *Client) Put(body []byte, opts PutOpts) (uint32, error) { } func (jackd *Client) Use(tube string) (usingTube string, err error) { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err = validateTubeName(tube); err != nil { return } @@ -139,6 +119,9 @@ func (jackd *Client) Use(tube string) (usingTube string, err error) { } func (jackd *Client) Kick(numJobs uint32) (kicked uint32, err error) { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err = jackd.write([]byte(fmt.Sprintf("kick %d\r\n", numJobs))); err != nil { return } @@ -160,6 +143,9 @@ func (jackd *Client) Kick(numJobs uint32) (kicked uint32, err error) { } func (jackd *Client) KickJob(id uint32) error { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err := jackd.write([]byte(fmt.Sprintf("kick-job %d\r\n", id))); err != nil { return err } @@ -167,17 +153,10 @@ func (jackd *Client) KickJob(id uint32) error { return jackd.expectedResponse("KICKED", []string{NotFound}) } -func (jackd *Client) write(command []byte) error { - if _, err := jackd.buffer.Write(command); err != nil { - return err - } - if err := jackd.buffer.Flush(); err != nil { - return err - } - return nil -} - func (jackd *Client) Delete(job uint32) error { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err := jackd.write([]byte(fmt.Sprintf("delete %d\r\n", job))); err != nil { return err } @@ -186,6 +165,9 @@ func (jackd *Client) Delete(job uint32) error { } func (jackd *Client) PauseTube(tube string, delay time.Duration) error { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err := jackd.write([]byte(fmt.Sprintf( "pause-tube %s %d\r\n", tube, @@ -197,19 +179,10 @@ func (jackd *Client) PauseTube(tube string, delay time.Duration) error { return jackd.expectedResponse("PAUSED", []string{NotFound}) } -type ReleaseOpts struct { - Priority uint32 - Delay time.Duration -} - -func DefaultReleaseOpts() ReleaseOpts { - return ReleaseOpts{ - Priority: 0, - Delay: 0, - } -} - func (jackd *Client) Release(job uint32, opts ReleaseOpts) error { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err := jackd.write([]byte(fmt.Sprintf( "release %d %d %d\r\n", job, @@ -223,6 +196,9 @@ func (jackd *Client) Release(job uint32, opts ReleaseOpts) error { } func (jackd *Client) Bury(job uint32, priority uint32) error { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err := jackd.write([]byte(fmt.Sprintf( "bury %d %d\r\n", job, @@ -235,6 +211,9 @@ func (jackd *Client) Bury(job uint32, priority uint32) error { } func (jackd *Client) Touch(job uint32) error { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err := jackd.write([]byte(fmt.Sprintf("touch %d\r\n", job))); err != nil { return err } @@ -243,6 +222,9 @@ func (jackd *Client) Touch(job uint32) error { } func (jackd *Client) Watch(tube string) (watched uint32, err error) { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err = validateTubeName(tube); err != nil { return } @@ -268,6 +250,9 @@ func (jackd *Client) Watch(tube string) (watched uint32, err error) { } func (jackd *Client) Ignore(tube string) (watched uint32, err error) { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err = validateTubeName(tube); err != nil { return } @@ -292,22 +277,10 @@ func (jackd *Client) Ignore(tube string) (watched uint32, err error) { return } -func (jackd *Client) expectedResponse(expected string, errs []string) error { - if jackd.scanner.Scan() { - resp := jackd.scanner.Text() - if err := validate(resp, errs); err != nil { - return err - } - - if resp != expected { - return unexpectedResponseError(resp) - } - } - - return nil -} - func (jackd *Client) Reserve() (uint32, []byte, error) { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err := jackd.write([]byte("reserve\r\n")); err != nil { return 0, nil, err } @@ -316,6 +289,9 @@ func (jackd *Client) Reserve() (uint32, []byte, error) { } func (jackd *Client) ReserveJob(job uint32) (uint32, []byte, error) { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err := jackd.write([]byte(fmt.Sprintf("reserve-job %d\r\n", job))); err != nil { return 0, nil, err } @@ -324,6 +300,9 @@ func (jackd *Client) ReserveJob(job uint32) (uint32, []byte, error) { } func (jackd *Client) Peek(job uint32) (uint32, []byte, error) { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err := jackd.write([]byte(fmt.Sprintf("peek %d\r\n", job))); err != nil { return 0, nil, err } @@ -332,6 +311,9 @@ func (jackd *Client) Peek(job uint32) (uint32, []byte, error) { } func (jackd *Client) PeekReady() (uint32, []byte, error) { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err := jackd.write([]byte("peek-ready\r\n")); err != nil { return 0, nil, err } @@ -340,6 +322,9 @@ func (jackd *Client) PeekReady() (uint32, []byte, error) { } func (jackd *Client) PeekDelayed() (uint32, []byte, error) { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err := jackd.write([]byte("peek-delayed\r\n")); err != nil { return 0, nil, err } @@ -348,6 +333,9 @@ func (jackd *Client) PeekDelayed() (uint32, []byte, error) { } func (jackd *Client) PeekBuried() (uint32, []byte, error) { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err := jackd.write([]byte("peek-buried\r\n")); err != nil { return 0, nil, err } @@ -355,43 +343,10 @@ func (jackd *Client) PeekBuried() (uint32, []byte, error) { return jackd.responseJobChunk("FOUND", []string{NotFound}) } -func (jackd *Client) responseJobChunk(expected string, errs []string) (uint32, []byte, error) { - var id uint32 = 0 - var payloadLength = 0 - - if jackd.scanner.Scan() { - resp := jackd.scanner.Text() - if err := validate(resp, errs); err != nil { - return 0, nil, err - } - - parsed, err := fmt.Sscanf(resp, expected+" %d %d", &id, &payloadLength) - if err != nil { - return 0, nil, err - } - if parsed != 2 { - return 0, nil, unexpectedResponseError(resp) - } - } - - if err := jackd.scanner.Err(); err != nil { - return 0, nil, err - } - - body := []byte("") - - for jackd.scanner.Scan() { - body = append(body, jackd.scanner.Bytes()...) - if len(body) == payloadLength { - break - } - body = append(body, Delimiter...) - } - - return id, body, jackd.scanner.Err() -} - func (jackd *Client) StatsJob(id uint32) ([]byte, error) { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err := jackd.write([]byte(fmt.Sprintf("stats-job %d\r\n", id))); err != nil { return nil, err } @@ -400,6 +355,9 @@ func (jackd *Client) StatsJob(id uint32) ([]byte, error) { } func (jackd *Client) StatsTube(tubeName string) ([]byte, error) { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err := jackd.write([]byte(fmt.Sprintf("stats-tube %s\r\n", tubeName))); err != nil { return nil, err } @@ -408,6 +366,9 @@ func (jackd *Client) StatsTube(tubeName string) ([]byte, error) { } func (jackd *Client) Stats() ([]byte, error) { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err := jackd.write([]byte(fmt.Sprintf("stats\r\n"))); err != nil { return nil, err } @@ -416,6 +377,9 @@ func (jackd *Client) Stats() ([]byte, error) { } func (jackd *Client) ListTubes() ([]byte, error) { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err := jackd.write([]byte(fmt.Sprintf("list-tubes\r\n"))); err != nil { return nil, err } @@ -424,6 +388,9 @@ func (jackd *Client) ListTubes() ([]byte, error) { } func (jackd *Client) ListTubeUsed() (tube string, err error) { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err = jackd.write([]byte(fmt.Sprintf("list-tube-used\r\n"))); err != nil { return } @@ -445,6 +412,9 @@ func (jackd *Client) ListTubeUsed() (tube string, err error) { } func (jackd *Client) ListTubesWatched() ([]byte, error) { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + if err := jackd.write([]byte(fmt.Sprintf("list-tubes-watched\r\n"))); err != nil { return nil, err } @@ -452,6 +422,76 @@ func (jackd *Client) ListTubesWatched() ([]byte, error) { return jackd.responseDataChunk([]string{NotFound}) } +func (jackd *Client) Quit() error { + jackd.mutex.Lock() + defer jackd.mutex.Unlock() + + if err := jackd.write([]byte("quit\r\n")); err != nil { + return err + } + + return jackd.conn.Close() +} + +func (jackd *Client) expectedResponse(expected string, errs []string) error { + if jackd.scanner.Scan() { + resp := jackd.scanner.Text() + if err := validate(resp, errs); err != nil { + return err + } + + if resp != expected { + return unexpectedResponseError(resp) + } + } + + return nil +} + +func (jackd *Client) responseJobChunk(expected string, errs []string) (uint32, []byte, error) { + var id uint32 = 0 + var payloadLength = 0 + + if jackd.scanner.Scan() { + resp := jackd.scanner.Text() + if err := validate(resp, errs); err != nil { + return 0, nil, err + } + + parsed, err := fmt.Sscanf(resp, expected+" %d %d", &id, &payloadLength) + if err != nil { + return 0, nil, err + } + if parsed != 2 { + return 0, nil, unexpectedResponseError(resp) + } + } + + if err := jackd.scanner.Err(); err != nil { + return 0, nil, err + } + + body := []byte("") + + for jackd.scanner.Scan() { + body = append(body, jackd.scanner.Bytes()...) + if len(body) == payloadLength { + break + } + body = append(body, Delimiter...) + } + + return id, body, jackd.scanner.Err() +} + +func Must(client *Client, err error) *Client { + if err != nil { + log.Fatalf("unable to connect to beanstalkd instance %v", err) + } + + return client +} + func (jackd *Client) responseDataChunk(errs []string) ([]byte, error) { var payloadLength = 0 @@ -487,14 +527,6 @@ func (jackd *Client) responseDataChunk(errs []string) ([]byte, error) { return body, jackd.scanner.Err() } -func (jackd *Client) Quit() error { - if err := jackd.write([]byte("quit\r\n")); err != nil { - return err - } - - return jackd.conn.Close() -} - var unexpectedResponseError = func(resp string) error { return fmt.Errorf("unexpected response: %s", resp) } @@ -515,3 +547,13 @@ func splitCRLF(buf []byte, atEOF bool) (advance int, token []byte, err error) { // Request more data. return 0, nil, nil } + +func (jackd *Client) write(command []byte) error { + if _, err := jackd.buffer.Write(command); err != nil { + return err + } + if err := jackd.buffer.Flush(); err != nil { + return err + } + return nil +} diff --git a/main_test.go b/main_test.go index e01cf5e..e56e726 100644 --- a/main_test.go +++ b/main_test.go @@ -2,11 +2,12 @@ package jackd_test import ( "crypto/rand" - "github.com/goccy/go-yaml" - "github.com/stretchr/testify/assert" "testing" "time" + "github.com/goccy/go-yaml" + "github.com/stretchr/testify/assert" + "github.com/getjackd/go-jackd" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -18,6 +19,10 @@ type JackdSuite struct { beanstalkd2 *jackd.Client } +func TestJackdSuite(t *testing.T) { + suite.Run(t, new(JackdSuite)) +} + func (suite *JackdSuite) SetupTest() { beanstalkd, err := jackd.Dial("localhost:11300") require.NoError(suite.T(), err) @@ -361,7 +366,3 @@ func (suite *JackdSuite) TestPauseTube() { assert.Equal(suite.T(), job, reservedJob) assert.Equal(suite.T(), payload, reservedPayload) } - -func TestJackdSuite(t *testing.T) { - suite.Run(t, new(JackdSuite)) -} diff --git a/types.go b/types.go new file mode 100644 index 0000000..0a870e3 --- /dev/null +++ b/types.go @@ -0,0 +1,41 @@ +package jackd + +import ( + "bufio" + "net" + "sync" + "time" +) + +type Client struct { + conn net.Conn + buffer *bufio.ReadWriter + scanner *bufio.Scanner + mutex *sync.Mutex +} + +type PutOpts struct { + Priority uint32 + Delay time.Duration + TTR time.Duration +} + +func DefaultPutOpts() PutOpts { + return PutOpts{ + Priority: 0, + Delay: 0, + TTR: 60 * time.Second, + } +} + +type ReleaseOpts struct { + Priority uint32 + Delay time.Duration +} + +func DefaultReleaseOpts() ReleaseOpts { + return ReleaseOpts{ + Priority: 0, + Delay: 0, + } +}