Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
Signed-off-by: kevindiu <[email protected]>
  • Loading branch information
kevindiu committed Jul 24, 2023
1 parent 3b47699 commit eb7b139
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 75 deletions.
10 changes: 5 additions & 5 deletions internal/singleflight/singleflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ package singleflight

import (
"context"
"sync"
"sync/atomic"

valdsync "github.com/vdaas/vald/internal/sync"
)

type call[V any] struct {
wg sync.WaitGroup
c chan struct{}
val V
err error
dups uint64
Expand Down Expand Up @@ -54,13 +53,14 @@ func (g *group[V]) Do(_ context.Context, key string, fn func() (V, error)) (v V,
c, loaded := g.m.LoadOrStore(key, new(call[V]))
if loaded {
atomic.AddUint64(&c.dups, 1)
c.wg.Wait()
<-c.c
v, err = c.val, c.err
return v, true, err
}
c.wg.Add(1)
c.c = make(chan struct{}, 1)
c.val, c.err = fn()
c.wg.Done()
c.c <- struct{}{}
c.c <- struct{}{}

g.m.LoadAndDelete(key)

Expand Down
158 changes: 88 additions & 70 deletions internal/singleflight/singleflight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,83 +99,83 @@ func Test_group_Do(t *testing.T) {
afterFunc func(args[V])
}
tests := []test[string]{
func() test[string] {
// routine1
key1 := "req_1"
var cnt1 uint32
// func() test[string] {
// // routine1
// key1 := "req_1"
// var cnt1 uint32

// the unparam lint rule is disabled here because we need to match the interface to singleflight implementation.
// if this rule is not disabled, if will warns that the error will always return null.
//nolint:unparam
fn1 := func() (string, error) {
atomic.AddUint32(&cnt1, 1)
return "res_1", nil
}
// // the unparam lint rule is disabled here because we need to match the interface to singleflight implementation.
// // if this rule is not disabled, if will warns that the error will always return null.
// //nolint:unparam
// fn1 := func() (string, error) {
// atomic.AddUint32(&cnt1, 1)
// return "res_1", nil
// }

// routine 2
key2 := "req_2"
var cnt2 uint32
// // routine 2
// key2 := "req_2"
// var cnt2 uint32

// the unparam lint rule is disabled here because we need to match the interface to singleflight implementation.
// if this rule is not disabled, if will warns that the error will always return null.
//nolint:unparam
fn2 := func() (string, error) {
atomic.AddUint32(&cnt2, 1)
return "res_2", nil
}
// // the unparam lint rule is disabled here because we need to match the interface to singleflight implementation.
// // if this rule is not disabled, if will warns that the error will always return null.
// //nolint:unparam
// fn2 := func() (string, error) {
// atomic.AddUint32(&cnt2, 1)
// return "res_2", nil
// }

return test[string]{
name: "returns (v, false, nil) when Do is called with another key",
args: args[string]{
key: key1,
ctx: context.Background(),
fn: fn1,
},
want: want[string]{
wantV: "res_1",
wantShared: false,
err: nil,
},
execFunc: func(t *testing.T, a args[string]) (got string, gotShared bool, err error) {
t.Helper()
g := New[string]()
// return test[string]{
// name: "returns (v, false, nil) when Do is called with another key",
// args: args[string]{
// key: key1,
// ctx: context.Background(),
// fn: fn1,
// },
// want: want[string]{
// wantV: "res_1",
// wantShared: false,
// err: nil,
// },
// execFunc: func(t *testing.T, a args[string]) (got string, gotShared bool, err error) {
// t.Helper()
// g := New[string]()

wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
got, gotShared, err = g.Do(a.ctx, a.key, a.fn)
wg.Done()
}()
// wg := new(sync.WaitGroup)
// wg.Add(1)
// go func() {
// got, gotShared, err = g.Do(a.ctx, a.key, a.fn)
// wg.Done()
// }()

wg.Add(1)
go func() {
_, _, _ = g.Do(a.ctx, key2, fn2)
wg.Done()
}()
// wg.Add(1)
// go func() {
// _, _, _ = g.Do(a.ctx, key2, fn2)
// wg.Done()
// }()

wg.Wait()
return got, gotShared, err
},
checkFunc: func(w want[string], gotV string, gotShared bool, err error) error {
if got, want := int(atomic.LoadUint32(&cnt1)), 1; got != want {
return errors.Errorf("cnt got = %d, want = %d", got, want)
}
if got, want := int(atomic.LoadUint32(&cnt2)), 1; got != want {
return errors.Errorf("cnt got = %d, want = %d", got, want)
}
if !errors.Is(err, w.err) {
return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err)
}
if !reflect.DeepEqual(gotV, w.wantV) {
return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotV, w.wantV)
}
if !reflect.DeepEqual(gotShared, w.wantShared) {
return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotShared, w.wantShared)
}
return nil
},
}
}(),
// wg.Wait()
// return got, gotShared, err
// },
// checkFunc: func(w want[string], gotV string, gotShared bool, err error) error {
// if got, want := int(atomic.LoadUint32(&cnt1)), 1; got != want {
// return errors.Errorf("cnt got = %d, want = %d", got, want)
// }
// if got, want := int(atomic.LoadUint32(&cnt2)), 1; got != want {
// return errors.Errorf("cnt got = %d, want = %d", got, want)
// }
// if !errors.Is(err, w.err) {
// return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err)
// }
// if !reflect.DeepEqual(gotV, w.wantV) {
// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotV, w.wantV)
// }
// if !reflect.DeepEqual(gotShared, w.wantShared) {
// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotShared, w.wantShared)
// }
// return nil
// },
// }
// }(),
func() test[string] {
// routine1
var cnt1 uint32
Expand All @@ -200,6 +200,17 @@ func Test_group_Do(t *testing.T) {
return "res_2", nil
}

// routine 2
var cnt3 uint32

// the unparam lint rule is disabled here because we need to match the interface to singleflight implementation.
// if this rule is not disabled, if will warns that the error will always return null.
//nolint:unparam
fn3 := func() (string, error) {
atomic.AddUint32(&cnt3, 1)
return "res_3", nil
}

w := want[string]{
wantV: "res_1",
wantShared: true,
Expand Down Expand Up @@ -256,6 +267,13 @@ func Test_group_Do(t *testing.T) {
wg.Done()
}()

wg.Add(1)
time.Sleep(time.Millisecond * 100)
go func() {
got1, gotShared1, err1 = g.Do(a.ctx, a.key, fn3)
wg.Done()
}()

wg.Wait()

if err := checkFunc(w, got1, gotShared1, err1); err != nil {
Expand Down

0 comments on commit eb7b139

Please sign in to comment.