diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..12fc1ca --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,29 @@ +name: Build Test + +on: + push: + branches: + - "master" + tags: + - "v*" + pull_request: + branches: + - "master" + +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Go 1.22 + uses: actions/setup-go@v5 + with: + go-version: "1.22" + + - name: Test + run: | + sudo apt update -y + sudo apt install -y golang git + go test -v diff --git a/README.md b/README.md index dcd293e..be94ba8 100644 --- a/README.md +++ b/README.md @@ -7,11 +7,12 @@
## Notice @@ -71,7 +72,7 @@ import "github.com/JustinTimperio/gpq" ``` ### Prerequisites -For this you will need Go >= `1.22` and gpq itself uses [hashmap](https://github.com/cornelk/hashmap) and [BadgerDB](https://github.com/dgraph-io/badger). +For this you will need Go >= `1.22` and gpq itself uses [hashmap](https://github.com/cornelk/hashmap), [btree](https://github.com/tidwall/btree) and [BadgerDB](https://github.com/dgraph-io/badger). ### API Reference - `NewGPQ[d any](options schema.GPQOptions) (uint, *GPQ[d], error)` - Creates a new GPQ with the specified options and returns the number of restored items, the GPQ, and an error if one occurred. @@ -102,7 +103,7 @@ func main() { ShouldEscalate: true, EscalationRate: time.Duration(time.Second), CanTimeout: true, - Timeout: time.Duration(time.Second * 1), + Timeout: time.Duration(time.Second * 5), } opts := schema.GPQOptions{ diff --git a/gpq.go b/gpq.go index 7425d5f..24823cd 100644 --- a/gpq.go +++ b/gpq.go @@ -35,9 +35,9 @@ type GPQ[d any] struct { // lazyDiskDeleteChan is a channel used to send messages to the lazy disk cache lazyDiskDeleteChan chan schema.DeleteMessage // batchHandler allows for synchronization of disk cache batches - batchHandler *BatchHandler[d] + batchHandler *batchHandler[d] // batchCounter is used to keep track the current batch number - batchCounter *BatchCounter + batchCounter *batchCounter } // NewGPQ creates a new GPQ with the given number of buckets @@ -72,8 +72,8 @@ func NewGPQ[d any](Options schema.GPQOptions) (uint, *GPQ[d], error) { lazyDiskSendChan: sender, lazyDiskDeleteChan: receiver, - batchHandler: NewBatchHandler(diskCache), - batchCounter: NewBatchCounter(Options.LazyDiskBatchSize), + batchHandler: newBatchHandler(diskCache), + batchCounter: newBatchCounter(Options.LazyDiskBatchSize), } var restored uint @@ -130,7 +130,7 @@ func (g *GPQ[d]) Enqueue(item schema.Item[d]) error { item.DiskUUID = key if g.options.LazyDiskCacheEnabled { - item.BatchNumber = g.batchCounter.Increment() + item.BatchNumber = g.batchCounter.increment() g.lazyDiskSendChan <- item } else { err = g.diskCache.WriteSingle(key, item) @@ -169,7 +169,7 @@ func (g *GPQ[d]) EnqueueBatch(items []schema.Item[d]) []error { items[i].DiskUUID = key if g.options.LazyDiskCacheEnabled { - items[i].BatchNumber = g.batchCounter.Increment() + items[i].BatchNumber = g.batchCounter.increment() g.lazyDiskSendChan <- items[i] } else { err = g.diskCache.WriteSingle(items[i].DiskUUID, items[i]) diff --git a/gpq_base_test.go b/gpq_base_test.go index 860eefc..f081401 100644 --- a/gpq_base_test.go +++ b/gpq_base_test.go @@ -3,6 +3,7 @@ package gpq_test import ( "log" "sync" + "sync/atomic" "testing" "time" @@ -113,9 +114,9 @@ func TestPrioritize(t *testing.T) { } var ( - escalated uint - removed uint - received uint + escalated uint64 + removed uint64 + received uint64 ) var wg sync.WaitGroup @@ -135,9 +136,10 @@ func TestPrioritize(t *testing.T) { if err != nil { log.Fatalln(err) } - removed += r - escalated += e - t.Log("Received:", received, "Removed:", removed, "Escalated:", escalated) + + atomic.AddUint64(&removed, uint64(r)) + atomic.AddUint64(&escalated, uint64(e)) + t.Log("Received:", atomic.LoadUint64(&received), "Removed:", atomic.LoadUint64(&removed), "Escalated:", atomic.LoadUint64(&escalated)) case <-shutdown: break forloop @@ -164,7 +166,7 @@ func TestPrioritize(t *testing.T) { go func() { defer wg.Done() for { - if received+removed >= tm { + if atomic.LoadUint64(&received)+atomic.LoadUint64(&removed) >= uint64(tm) { break } time.Sleep(time.Millisecond * 10) @@ -172,7 +174,7 @@ func TestPrioritize(t *testing.T) { if err != nil { continue } - received++ + atomic.AddUint64(&received, 1) } t.Log("Dequeued all items") shutdown <- struct{}{} diff --git a/gpq_e2e_test.go b/gpq_e2e_test.go index 25a1032..9380941 100644 --- a/gpq_e2e_test.go +++ b/gpq_e2e_test.go @@ -52,8 +52,8 @@ func TestE2E(t *testing.T) { var ( received uint64 - removed uint - escalated uint + removed uint64 + escalated uint64 ) var wg sync.WaitGroup @@ -72,9 +72,10 @@ func TestE2E(t *testing.T) { if err != nil { log.Fatalln(err) } - removed += r - escalated += e - t.Log("Received:", received, "Removed:", removed, "Escalated:", escalated) + + atomic.AddUint64(&received, uint64(r)) + atomic.AddUint64(&escalated, uint64(e)) + t.Log("Received:", atomic.LoadUint64(&received), "Removed:", atomic.LoadUint64(&removed), "Escalated:", atomic.LoadUint64(&escalated)) case <-shutdown: break breaker @@ -111,7 +112,7 @@ func TestE2E(t *testing.T) { go func() { defer wg.Done() for { - if atomic.LoadUint64(&received)+uint64(removed) >= total { + if atomic.LoadUint64(&received)+atomic.LoadUint64(&removed) >= total { break } items, err := queue.DequeueBatch(batchSize) diff --git a/gpq_parallel_test.go b/gpq_parallel_test.go index 9812474..8807145 100644 --- a/gpq_parallel_test.go +++ b/gpq_parallel_test.go @@ -13,7 +13,7 @@ import ( // Tests pushing and pulling single messages in parallel func TestSingleParallel(t *testing.T) { var ( - total uint = 10_000_000 + total uint = 1_000_000 syncToDisk bool = false lazySync bool = false maxBuckets uint = 10 @@ -95,7 +95,7 @@ func TestSingleParallel(t *testing.T) { // Tests pushing and pulling batches of messages in parallel func TestBatchParallel(t *testing.T) { var ( - total uint = 10_000_000 + total uint = 1_000_000 syncToDisk bool = false lazySync bool = false maxBuckets uint = 10 diff --git a/helpers.go b/helpers.go index c092335..81ec1e3 100644 --- a/helpers.go +++ b/helpers.go @@ -7,15 +7,15 @@ import ( "github.com/JustinTimperio/gpq/schema" ) -type BatchHandler[T any] struct { +type batchHandler[T any] struct { mux *sync.Mutex syncedBatches map[uint]bool deletedBatches map[uint]bool diskCache *disk.Disk[T] } -func NewBatchHandler[T any](diskCache *disk.Disk[T]) *BatchHandler[T] { - return &BatchHandler[T]{ +func newBatchHandler[T any](diskCache *disk.Disk[T]) *batchHandler[T] { + return &batchHandler[T]{ mux: &sync.Mutex{}, syncedBatches: make(map[uint]bool), deletedBatches: make(map[uint]bool), @@ -23,7 +23,7 @@ func NewBatchHandler[T any](diskCache *disk.Disk[T]) *BatchHandler[T] { } } -func (bh *BatchHandler[T]) processBatch(batch []*schema.Item[T], batchNumber uint) { +func (bh *batchHandler[T]) processBatch(batch []*schema.Item[T], batchNumber uint) { bh.mux.Lock() defer bh.mux.Unlock() @@ -36,7 +36,7 @@ func (bh *BatchHandler[T]) processBatch(batch []*schema.Item[T], batchNumber uin bh.deletedBatches[batchNumber] = false } -func (bh *BatchHandler[T]) deleteBatch(batch []*schema.DeleteMessage, batchNumber uint, wasRestored bool) { +func (bh *batchHandler[T]) deleteBatch(batch []*schema.DeleteMessage, batchNumber uint, wasRestored bool) { bh.mux.Lock() defer bh.mux.Unlock() @@ -55,15 +55,15 @@ func (bh *BatchHandler[T]) deleteBatch(batch []*schema.DeleteMessage, batchNumbe } -type BatchCounter struct { +type batchCounter struct { mux *sync.Mutex batchNumber uint batchCounter uint batchSize uint } -func NewBatchCounter(batchSize uint) *BatchCounter { - return &BatchCounter{ +func newBatchCounter(batchSize uint) *batchCounter { + return &batchCounter{ mux: &sync.Mutex{}, batchNumber: 0, batchCounter: 0, @@ -71,7 +71,7 @@ func NewBatchCounter(batchSize uint) *BatchCounter { } } -func (bc *BatchCounter) Increment() (batchNumber uint) { +func (bc *batchCounter) increment() (batchNumber uint) { bc.mux.Lock() defer bc.mux.Unlock()