Skip to content

Commit

Permalink
feat: implement live count method (#234)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcbinz authored May 2, 2024
1 parent b9f209d commit d5ebeca
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 3 deletions.
51 changes: 50 additions & 1 deletion core/embed/query/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,60 @@ func (b Builder[M, C]) Live(ctx context.Context) (<-chan LiveResult[*M], error)
req := b.query.BuildAsLive()
resChan, err := b.db.Live(ctx, req.Statement, req.Variables)
if err != nil {
return nil, fmt.Errorf("could not query live records: %w", err)
return nil, fmt.Errorf("failed to query live records: %w", err)
}
return live(ctx, resChan, b.unmarshal, b.convTo), nil
}

// LiveCount is the live version of Count.
// Whenever a record is created or deleted that matches the
// conditions of the query, the count will be updated.
func (b Builder[M, C]) LiveCount(ctx context.Context) (<-chan int, error) {
count, err := b.Count(ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute initial count: %w", err)
}

resChan, err := b.Live(ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute live query: %w", err)
}

countChan := make(chan int, 1)

go func() {
defer close(countChan)

for {
select {

case <-ctx.Done():
return

case res, open := <-resChan:
if !open {
return
}

switch res.(type) {

case LiveCreate[*M]:
count++

case LiveDelete[*M]:
count--
}

countChan <- count
}
}
}()

countChan <- count

return countChan, nil
}

// LiveDiff behaves like Live, but instead of receiving the full result
// set on every change, it only receives the actual changes.
//func (b builder[M, C]) LiveDiff(ctx context.Context) (<-chan LiveResult[*M], error) {
Expand Down
51 changes: 50 additions & 1 deletion tests/basic/gen/som/query/builder.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 72 additions & 1 deletion tests/basic/som_live_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/go-surreal/som/tests/basic/model"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
"math/rand"
"testing"
"time"
)
Expand Down Expand Up @@ -146,7 +147,7 @@ func TestLiveQueries(t *testing.T) {

select {

case _, more = <-liveChan:
case _, more := <-liveChan:
if more {
t.Fatal("liveChan did not close after context was canceled")
}
Expand Down Expand Up @@ -231,3 +232,73 @@ func TestLiveQueriesFilter(t *testing.T) {
}
}
}

func TestLiveQueryCount(t *testing.T) {
ctx := context.Background()

client, cleanup := prepareDatabase(ctx, t)
defer cleanup()

ctx, cancel := context.WithCancel(ctx)
defer cancel()

if err := client.ApplySchema(ctx); err != nil {
t.Fatal(err)
}

liveCount, err := client.AllFieldTypesRepo().Query().LiveCount(ctx)
if err != nil {
t.Fatal(err)
}

count := rand.Intn(randMax-randMin) + randMin

var models []*model.AllFieldTypes

for i := 0; i < count; i++ {
newModel := &model.AllFieldTypes{}

if err := client.AllFieldTypesRepo().Create(ctx, newModel); err != nil {
t.Fatal(err)
}

models = append(models, newModel)
}

for i := 0; i <= count; i++ {
assert.Equal(t, i, <-liveCount)
}

for _, delModel := range models {
if err := client.AllFieldTypesRepo().Delete(ctx, delModel); err != nil {
t.Fatal(err)
}
}

for i := count; i > 0; i-- {
assert.Equal(t, i-1, <-liveCount)
}

select {

case <-liveCount:
t.Fatal("liveCount should not receive any more messages")

case <-time.After(1 * time.Second):
}

// Test the automatic closing of the live channel when the context is canceled:

cancel()

select {

case _, more := <-liveCount:
if more {
t.Fatal("liveCount did not close after context was canceled")
}

case <-time.After(1 * time.Second):
t.Fatal("timeout waiting for live channel to close after context was canceled")
}
}

0 comments on commit d5ebeca

Please sign in to comment.