From de12b1e7b8744ed2c15337a2219b2d81be95fa31 Mon Sep 17 00:00:00 2001 From: Umputun Date: Sun, 10 Nov 2019 03:15:07 -0600 Subject: [PATCH 1/7] wip: start with pubsub --- cache.go | 2 ++ cache_test.go | 18 ++++++++++++++ eventbus/pubsub.go | 17 +++++++++++++ expirable_cache.go | 16 +++++++++++++ expirable_cache_test.go | 41 +++++++++++++++++++++++++++++++ go.mod | 1 + go.sum | 2 ++ lru_cache.go | 19 +++++++++++++++ lru_cache_test.go | 53 +++++++++++++++++++++++++++++++++++++++++ options.go | 11 +++++++++ 10 files changed, 180 insertions(+) create mode 100644 eventbus/pubsub.go diff --git a/cache.go b/cache.go index b917bd5..acc6a3d 100644 --- a/cache.go +++ b/cache.go @@ -6,6 +6,8 @@ // 3 flavors of cache provided - NoP (do-nothing cache), ExpirableCache (TTL based), and LruCache package lcw +//go:generate sh -c "mockery -inpkg -name LoadingCache -print > /tmp/cache-mock.tmp && mv /tmp/cache-mock.tmp cache_mock.go" + import ( "fmt" ) diff --git a/cache_test.go b/cache_test.go index 640f04e..346be4b 100644 --- a/cache_test.go +++ b/cache_test.go @@ -656,3 +656,21 @@ func (s sizedString) Size() int { return len(s) } func (s sizedString) MarshalBinary() (data []byte, err error) { return []byte(s), nil } + +type mockPubSub struct { + calledKeys []string + fns []func(fromID string, key string) +} + +func (m *mockPubSub) Subscribe(fn func(fromID string, key string)) error { + m.fns = append(m.fns, fn) + return nil +} + +func (m *mockPubSub) Publish(fromID string, key string) error { + m.calledKeys = append(m.calledKeys, key) + for _, fn := range m.fns { + fn(fromID, key) + } + return nil +} diff --git a/eventbus/pubsub.go b/eventbus/pubsub.go new file mode 100644 index 0000000..479fdf4 --- /dev/null +++ b/eventbus/pubsub.go @@ -0,0 +1,17 @@ +package eventbus + +type PubSub interface { + Publish(fromID string, key string) error + Subscribe(fn func(fromID string, key string)) error +} + +// NopPubSub implements default do-nothing pub-sub (event bus) +type NopPubSub struct{} + +func (n *NopPubSub) Subscribe(fn func(fromID string, key string)) error { + return nil +} + +func (n *NopPubSub) Publish(fromID string, key string) error { + return nil +} diff --git a/expirable_cache.go b/expirable_cache.go index 4f862ef..886e6ce 100644 --- a/expirable_cache.go +++ b/expirable_cache.go @@ -4,8 +4,10 @@ import ( "sync/atomic" "time" + "github.com/google/uuid" "github.com/pkg/errors" + "github.com/go-pkgz/lcw/eventbus" "github.com/go-pkgz/lcw/internal/cache" ) @@ -14,6 +16,7 @@ type ExpirableCache struct { options CacheStat currentSize int64 + id string backend *cache.LoadingCache } @@ -24,7 +27,9 @@ func NewExpirableCache(opts ...Option) (*ExpirableCache, error) { maxKeys: 1000, maxValueSize: 0, ttl: 5 * time.Minute, + eventBus: &eventbus.NopPubSub{}, }, + id: uuid.New().String(), } for _, opt := range opts { @@ -33,6 +38,10 @@ func NewExpirableCache(opts ...Option) (*ExpirableCache, error) { } } + if err := res.eventBus.Subscribe(res.onBusEvent); err != nil { + return nil, errors.Wrapf(err, "can't subscribe to event bus") + } + backend, err := cache.NewLoadingCache( cache.MaxKeys(res.maxKeys), cache.TTL(res.ttl), @@ -45,6 +54,7 @@ func NewExpirableCache(opts ...Option) (*ExpirableCache, error) { size := s.Size() atomic.AddInt64(&res.currentSize, -1*int64(size)) } + _ = res.eventBus.Publish(res.id, key) }), ) if err != nil { @@ -128,6 +138,12 @@ func (c *ExpirableCache) Close() error { return nil } +func (c *ExpirableCache) onBusEvent(id, key string) { + if id != c.id { + c.backend.Invalidate(key) + } +} + 
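// How the new pieces fit together: the backend's OnEvicted callback publishes
// (c.id, key) to the event bus whenever an entry expires or is deleted, and
// onBusEvent on every other cache instance drops its local copy of that key;
// the id comparison above keeps an instance from reacting to its own events.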
func (c *ExpirableCache) size() int64 { return atomic.LoadInt64(&c.currentSize) } diff --git a/expirable_cache_test.go b/expirable_cache_test.go index 8326bb6..5fa30fe 100644 --- a/expirable_cache_test.go +++ b/expirable_cache_test.go @@ -112,3 +112,44 @@ func TestExpirableCache_BadOptions(t *testing.T) { _, err = NewExpirableCache(TTL(-1)) assert.EqualError(t, err, "failed to set cache option: negative ttl") } + +func TestExpirableCacheWithBus(t *testing.T) { + ps := &mockPubSub{} + lc1, err := NewExpirableCache(MaxKeys(5), TTL(time.Millisecond*100), EventBus(ps)) + require.NoError(t, err) + + lc2, err := NewExpirableCache(MaxKeys(50), TTL(time.Millisecond*5000), EventBus(ps)) + require.NoError(t, err) + + // add 5 keys to the first node cache + for i := 0; i < 5; i++ { + _, e := lc1.Get(fmt.Sprintf("key-%d", i), func() (Value, error) { + return fmt.Sprintf("result-%d", i), nil + }) + assert.NoError(t, e) + time.Sleep(10 * time.Millisecond) + } + + assert.Equal(t, 0, len(ps.calledKeys), "no events") + assert.Equal(t, 5, lc1.Stat().Keys) + assert.Equal(t, int64(5), lc1.Stat().Misses) + + // add key-1 key to the second node + _, e := lc2.Get(fmt.Sprintf("key-1"), func() (Value, error) { + return "result-111", nil + }) + assert.NoError(t, e) + assert.Equal(t, 1, lc2.Stat().Keys) + assert.Equal(t, int64(1), lc2.Stat().Misses, lc2.Stat()) + + time.Sleep(55 * time.Millisecond) // let key-1 expire + assert.Equal(t, 1, len(ps.calledKeys), "1 event, key-0 expired") + assert.Equal(t, 4, lc1.Stat().Keys) + assert.Equal(t, 1, lc2.Stat().Keys, "key-1 still in cache2") + + time.Sleep(210 * time.Millisecond) // let all keys expire + assert.Equal(t, 6, len(ps.calledKeys), "6 events, key-1 expired %+v", ps.calledKeys) + assert.Equal(t, 0, lc1.Stat().Keys) + assert.Equal(t, 0, lc2.Stat().Keys, "key-1 removed from cache2") + +} diff --git a/go.mod b/go.mod index 5ac87b0..bd7894d 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/go-pkgz/lcw require ( github.com/alicebob/miniredis/v2 v2.11.4 github.com/go-redis/redis/v7 v7.2.0 + github.com/google/uuid v1.1.1 github.com/hashicorp/go-multierror v1.1.0 github.com/hashicorp/golang-lru v0.5.4 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index 38edae2..8f189aa 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,8 @@ github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/gomodule/redigo v1.7.1-0.20190322064113-39e2c31b7ca3 h1:6amM4HsNPOvMLVc2ZnyqrjeQ92YAVWn7T4WBKK87inY= github.com/gomodule/redigo v1.7.1-0.20190322064113-39e2c31b7ca3/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI= diff --git a/lru_cache.go b/lru_cache.go index fee675d..0fae37b 100644 --- a/lru_cache.go +++ b/lru_cache.go @@ -1,10 +1,14 @@ package lcw import ( + "log" "sync/atomic" + "github.com/google/uuid" lru "github.com/hashicorp/golang-lru" "github.com/pkg/errors" + + "github.com/go-pkgz/lcw/eventbus" ) // LruCache wraps lru.LruCache with loading cache Get and size limits @@ -13,6 +17,7 @@ type LruCache struct { 
CacheStat backend *lru.Cache currentSize int64 + id string } // NewLruCache makes LRU LoadingCache implementation, 1000 max keys by default @@ -21,7 +26,9 @@ func NewLruCache(opts ...Option) (*LruCache, error) { options: options{ maxKeys: 1000, maxValueSize: 0, + eventBus: &eventbus.NopPubSub{}, }, + id: uuid.New().String(), } for _, opt := range opts { if err := opt(&res.options); err != nil { @@ -29,6 +36,10 @@ func NewLruCache(opts ...Option) (*LruCache, error) { } } + if err := res.eventBus.Subscribe(res.onBusEvent); err != nil { + return nil, errors.Wrapf(err, "can't subscribe to event bus") + } + onEvicted := func(key interface{}, value interface{}) { if res.onEvicted != nil { res.onEvicted(key.(string), value) @@ -37,6 +48,7 @@ func NewLruCache(opts ...Option) (*LruCache, error) { size := s.Size() atomic.AddInt64(&res.currentSize, -1*int64(size)) } + _ = res.eventBus.Publish(res.id, key.(string)) } var err error @@ -131,6 +143,13 @@ func (c *LruCache) Close() error { return nil } +func (c *LruCache) onBusEvent(id, key string) { + log.Println("!! ", id, key) + if id != c.id { // prevent reaction on event from this cache + c.backend.Remove(key) + } +} + func (c *LruCache) size() int64 { return atomic.LoadInt64(&c.currentSize) } diff --git a/lru_cache_test.go b/lru_cache_test.go index ffc30af..663d460 100644 --- a/lru_cache_test.go +++ b/lru_cache_test.go @@ -82,6 +82,59 @@ func TestLruCache_BadOptions(t *testing.T) { assert.EqualError(t, err, "failed to set cache option: negative ttl") } +func TestLruCache_MaxKeysWithBus(t *testing.T) { + + ps := &mockPubSub{} + + var coldCalls int32 + lc1, err := NewLruCache(MaxKeys(5), MaxValSize(10), EventBus(ps)) + require.Nil(t, err) + + lc2, err := NewLruCache(MaxKeys(50), MaxValSize(100), EventBus(ps)) + require.Nil(t, err) + + // put 5 keys to cache1 + for i := 0; i < 5; i++ { + res, e := lc1.Get(fmt.Sprintf("key-%d", i), func() (Value, error) { + atomic.AddInt32(&coldCalls, 1) + return fmt.Sprintf("result-%d", i), nil + }) + assert.Nil(t, e) + assert.Equal(t, fmt.Sprintf("result-%d", i), res.(string)) + assert.Equal(t, int32(i+1), atomic.LoadInt32(&coldCalls)) + } + // check if really cached + res, err := lc1.Get("key-3", func() (Value, error) { + return "result-blah", nil + }) + assert.Nil(t, err) + assert.Equal(t, "result-3", res.(string), "should be cached") + + // put 1 key to cache2 + res, e := lc2.Get(fmt.Sprintf("key-1"), func() (Value, error) { + return fmt.Sprintf("result-111"), nil + }) + assert.Nil(t, e) + assert.Equal(t, fmt.Sprintf("result-111"), res.(string)) + + // try to cache1 after maxKeys reached, will remove key-0 + res, err = lc1.Get("key-X", func() (Value, error) { + return "result-X", nil + }) + assert.Nil(t, err) + assert.Equal(t, "result-X", res.(string)) + assert.Equal(t, 5, lc1.backend.Len()) + + assert.Equal(t, 1, lc2.backend.Len(), "cache2 still has key-1") + + // try to cache1 after maxKeys reached, will remove key-1 + res, err = lc1.Get("key-X2", func() (Value, error) { + return "result-X", nil + }) + assert.Nil(t, err) + assert.Equal(t, 1, lc2.backend.Len(), "cache2 removed key-1") +} + // LruCache illustrates the use of LRU loading cache func ExampleLruCache() { // set up test server for single response diff --git a/options.go b/options.go index 4ee964a..00ac140 100644 --- a/options.go +++ b/options.go @@ -3,6 +3,8 @@ package lcw import ( "errors" "time" + + "github.com/go-pkgz/lcw/eventbus" ) type options struct { @@ -12,6 +14,7 @@ type options struct { maxCacheSize int64 ttl time.Duration onEvicted func(key 
string, value Value) + eventBus eventbus.PubSub } // Option func type @@ -84,3 +87,11 @@ func OnEvicted(fn func(key string, value Value)) Option { return nil } } + +// OnEvicted sets callback on invalidation event +func EventBus(pubsub eventbus.PubSub) Option { + return func(o *options) error { + o.eventBus = pubsub + return nil + } +} From 8270e7d4b95640b8ada144563b91a3395fb91604 Mon Sep 17 00:00:00 2001 From: Umputun Date: Sun, 10 Nov 2019 20:16:02 -0600 Subject: [PATCH 2/7] wip: fails on lru --- cache_test.go | 2 ++ lru_cache.go | 30 ++++++++++++++++++------------ lru_cache_test.go | 1 - 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/cache_test.go b/cache_test.go index 346be4b..93572b5 100644 --- a/cache_test.go +++ b/cache_test.go @@ -3,6 +3,7 @@ package lcw import ( "errors" "fmt" + "log" "math/rand" "strings" "sync" @@ -671,6 +672,7 @@ func (m *mockPubSub) Publish(fromID string, key string) error { m.calledKeys = append(m.calledKeys, key) for _, fn := range m.fns { fn(fromID, key) + log.Println("!!!", fromID, key) } return nil } diff --git a/lru_cache.go b/lru_cache.go index 0fae37b..3de0cb9 100644 --- a/lru_cache.go +++ b/lru_cache.go @@ -17,7 +17,7 @@ type LruCache struct { CacheStat backend *lru.Cache currentSize int64 - id string + id string // uuid identifying cache instance } // NewLruCache makes LRU LoadingCache implementation, 1000 max keys by default @@ -36,28 +36,33 @@ func NewLruCache(opts ...Option) (*LruCache, error) { } } - if err := res.eventBus.Subscribe(res.onBusEvent); err != nil { - return nil, errors.Wrapf(err, "can't subscribe to event bus") + err := res.init() + return &res, err +} + +func (c *LruCache) init() error { + if err := c.eventBus.Subscribe(c.onBusEvent); err != nil { + return errors.Wrapf(err, "can't subscribe to event bus") } onEvicted := func(key interface{}, value interface{}) { - if res.onEvicted != nil { - res.onEvicted(key.(string), value) + if c.onEvicted != nil { + c.onEvicted(key.(string), value) } if s, ok := value.(Sizer); ok { size := s.Size() - atomic.AddInt64(&res.currentSize, -1*int64(size)) + atomic.AddInt64(&c.currentSize, -1*int64(size)) } - _ = res.eventBus.Publish(res.id, key.(string)) + _ = c.eventBus.Publish(c.id, key.(string)) // signal invalidation to other nodes } var err error // OnEvicted called automatically for expired and manually deleted - if res.backend, err = lru.NewWithEvict(res.maxKeys, onEvicted); err != nil { - return nil, errors.Wrap(err, "failed to make lru cache backend") + if c.backend, err = lru.NewWithEvict(c.maxKeys, onEvicted); err != nil { + return errors.Wrap(err, "failed to make lru cache backend") } - return &res, nil + return nil } // Get gets value by key or load with fn if not found in cache @@ -143,9 +148,10 @@ func (c *LruCache) Close() error { return nil } +// onBusEvent reacts on invalidation message triggered by event bus from another cache instance func (c *LruCache) onBusEvent(id, key string) { - log.Println("!! ", id, key) - if id != c.id { // prevent reaction on event from this cache + if id != c.id && c.backend.Contains(key) { // prevent reaction on event from this cache + log.Println("!! 
", id, key) c.backend.Remove(key) } } diff --git a/lru_cache_test.go b/lru_cache_test.go index 663d460..960962e 100644 --- a/lru_cache_test.go +++ b/lru_cache_test.go @@ -83,7 +83,6 @@ func TestLruCache_BadOptions(t *testing.T) { } func TestLruCache_MaxKeysWithBus(t *testing.T) { - ps := &mockPubSub{} var coldCalls int32 From c85df6e0c95c9b10bcf53e4434dc11468d9dc094 Mon Sep 17 00:00:00 2001 From: Dmitry Verkhoturov Date: Mon, 11 May 2020 22:46:59 +0200 Subject: [PATCH 3/7] golangci-lint fixes --- cache_test.go | 2 +- eventbus/pubsub.go | 9 ++++++++- expirable_cache.go | 1 + expirable_cache_test.go | 1 + lru_cache_test.go | 2 ++ options.go | 2 +- 6 files changed, 14 insertions(+), 3 deletions(-) diff --git a/cache_test.go b/cache_test.go index 93572b5..c39469a 100644 --- a/cache_test.go +++ b/cache_test.go @@ -668,7 +668,7 @@ func (m *mockPubSub) Subscribe(fn func(fromID string, key string)) error { return nil } -func (m *mockPubSub) Publish(fromID string, key string) error { +func (m *mockPubSub) Publish(fromID, key string) error { m.calledKeys = append(m.calledKeys, key) for _, fn := range m.fns { fn(fromID, key) diff --git a/eventbus/pubsub.go b/eventbus/pubsub.go index 479fdf4..5f41cf6 100644 --- a/eventbus/pubsub.go +++ b/eventbus/pubsub.go @@ -1,5 +1,10 @@ +// Package eventbus provides PubSub interface used for distributed cache invalidation, +// as well as NopPubSub implementation. package eventbus +// PubSub interface is used for distributed cache invalidation. +// Publish is called on each entry invalidation, +// Subscribe is used for subscription for these events. type PubSub interface { Publish(fromID string, key string) error Subscribe(fn func(fromID string, key string)) error @@ -8,10 +13,12 @@ type PubSub interface { // NopPubSub implements default do-nothing pub-sub (event bus) type NopPubSub struct{} +// Subscribe does nothing for NopPubSub func (n *NopPubSub) Subscribe(fn func(fromID string, key string)) error { return nil } -func (n *NopPubSub) Publish(fromID string, key string) error { +// Publish does nothing for NopPubSub +func (n *NopPubSub) Publish(fromID, key string) error { return nil } diff --git a/expirable_cache.go b/expirable_cache.go index 886e6ce..c657d28 100644 --- a/expirable_cache.go +++ b/expirable_cache.go @@ -138,6 +138,7 @@ func (c *ExpirableCache) Close() error { return nil } +// onBusEvent reacts on invalidation message triggered by event bus from another cache instance func (c *ExpirableCache) onBusEvent(id, key string) { if id != c.id { c.backend.Invalidate(key) diff --git a/expirable_cache_test.go b/expirable_cache_test.go index 5fa30fe..fab293f 100644 --- a/expirable_cache_test.go +++ b/expirable_cache_test.go @@ -123,6 +123,7 @@ func TestExpirableCacheWithBus(t *testing.T) { // add 5 keys to the first node cache for i := 0; i < 5; i++ { + i := i _, e := lc1.Get(fmt.Sprintf("key-%d", i), func() (Value, error) { return fmt.Sprintf("result-%d", i), nil }) diff --git a/lru_cache_test.go b/lru_cache_test.go index 960962e..ff8375e 100644 --- a/lru_cache_test.go +++ b/lru_cache_test.go @@ -94,6 +94,7 @@ func TestLruCache_MaxKeysWithBus(t *testing.T) { // put 5 keys to cache1 for i := 0; i < 5; i++ { + i := i res, e := lc1.Get(fmt.Sprintf("key-%d", i), func() (Value, error) { atomic.AddInt32(&coldCalls, 1) return fmt.Sprintf("result-%d", i), nil @@ -131,6 +132,7 @@ func TestLruCache_MaxKeysWithBus(t *testing.T) { return "result-X", nil }) assert.Nil(t, err) + assert.Equal(t, "result-2", res.(string)) assert.Equal(t, 1, lc2.backend.Len(), "cache2 removed 
key-1") } diff --git a/options.go b/options.go index 00ac140..36bca12 100644 --- a/options.go +++ b/options.go @@ -88,7 +88,7 @@ func OnEvicted(fn func(key string, value Value)) Option { } } -// OnEvicted sets callback on invalidation event +// EventBus sets PubSub for distributed cache invalidation func EventBus(pubsub eventbus.PubSub) Option { return func(o *options) error { o.eventBus = pubsub From 1de0f866b4df472de67a3baa7bc65c245412488f Mon Sep 17 00:00:00 2001 From: Dmitry Verkhoturov Date: Thu, 14 May 2020 00:04:02 +0200 Subject: [PATCH 4/7] fix deadlock in mockPubSub --- cache_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cache_test.go b/cache_test.go index c39469a..0bea918 100644 --- a/cache_test.go +++ b/cache_test.go @@ -671,7 +671,8 @@ func (m *mockPubSub) Subscribe(fn func(fromID string, key string)) error { func (m *mockPubSub) Publish(fromID, key string) error { m.calledKeys = append(m.calledKeys, key) for _, fn := range m.fns { - fn(fromID, key) + // run in goroutine to prevent deadlock + go fn(fromID, key) log.Println("!!!", fromID, key) } return nil From 9df5066b341b9ac6f51ff6c4cff7d6b971136961 Mon Sep 17 00:00:00 2001 From: Dmitry Verkhoturov Date: Thu, 28 May 2020 21:36:14 +0200 Subject: [PATCH 5/7] make PubSub tests more robust --- README.md | 9 ++++++--- cache_test.go | 21 ++++++++++++++++++--- expirable_cache_test.go | 25 ++++++++++++++++++------- lru_cache.go | 2 -- lru_cache_test.go | 32 ++++++++++++++++++++++---------- 5 files changed, 64 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 0bdeaf6..1ca9ffa 100644 --- a/README.md +++ b/README.md @@ -27,8 +27,11 @@ Main features: ## Usage -``` -cache := lcw.NewLruCache(lcw.MaxKeys(500), lcw.MaxCacheSize(65536), lcw.MaxValSize(200), lcw.MaxKeySize(32)) +```go +cache, err := lcw.NewLruCache(lcw.MaxKeys(500), lcw.MaxCacheSize(65536), lcw.MaxValSize(200), lcw.MaxKeySize(32)) +if err != nil { + panic("failed to create cache") +} defer cache.Close() val, err := cache.Get("key123", func() (lcw.Value, error) { @@ -59,7 +62,7 @@ Cache can be created with URIs: 1. Key is not a string, but a composed type made from partition, key-id and list of scopes (tags). 1. Value type limited to `[]byte` 1. Added `Flush` method for scoped/tagged invalidation of multiple records in a given partition -1. A simplified interface with Get, Stat and Flush only. +1. A simplified interface with Get, Stat, Flush and Close only. 
## Details diff --git a/cache_test.go b/cache_test.go index 0bea918..ce50cb5 100644 --- a/cache_test.go +++ b/cache_test.go @@ -3,7 +3,6 @@ package lcw import ( "errors" "fmt" - "log" "math/rand" "strings" "sync" @@ -661,19 +660,35 @@ func (s sizedString) MarshalBinary() (data []byte, err error) { type mockPubSub struct { calledKeys []string fns []func(fromID string, key string) + sync.Mutex + sync.WaitGroup +} + +func (m *mockPubSub) CalledKeys() []string { + m.Lock() + defer m.Unlock() + return m.calledKeys } func (m *mockPubSub) Subscribe(fn func(fromID string, key string)) error { + m.Lock() + defer m.Unlock() m.fns = append(m.fns, fn) return nil } func (m *mockPubSub) Publish(fromID, key string) error { + m.Lock() + defer m.Unlock() m.calledKeys = append(m.calledKeys, key) for _, fn := range m.fns { + fn := fn + m.Add(1) // run in goroutine to prevent deadlock - go fn(fromID, key) - log.Println("!!!", fromID, key) + go func() { + fn(fromID, key) + m.Done() + }() } return nil } diff --git a/expirable_cache_test.go b/expirable_cache_test.go index fab293f..7eb2275 100644 --- a/expirable_cache_test.go +++ b/expirable_cache_test.go @@ -37,7 +37,11 @@ func TestExpirableCache(t *testing.T) { assert.Equal(t, 5, lc.Stat().Keys) assert.Equal(t, int64(6), lc.Stat().Misses) - time.Sleep(55 * time.Millisecond) + // let key-0 expire, GitHub Actions friendly way + for lc.Stat().Keys > 4 { + lc.backend.DeleteExpired() // enforce DeleteExpired for GitHub earlier than TTL/2 + time.Sleep(time.Millisecond * 10) + } assert.Equal(t, 4, lc.Stat().Keys) time.Sleep(210 * time.Millisecond) @@ -117,9 +121,11 @@ func TestExpirableCacheWithBus(t *testing.T) { ps := &mockPubSub{} lc1, err := NewExpirableCache(MaxKeys(5), TTL(time.Millisecond*100), EventBus(ps)) require.NoError(t, err) + defer lc1.Close() lc2, err := NewExpirableCache(MaxKeys(50), TTL(time.Millisecond*5000), EventBus(ps)) require.NoError(t, err) + defer lc2.Close() // add 5 keys to the first node cache for i := 0; i < 5; i++ { @@ -131,26 +137,31 @@ func TestExpirableCacheWithBus(t *testing.T) { time.Sleep(10 * time.Millisecond) } - assert.Equal(t, 0, len(ps.calledKeys), "no events") + assert.Equal(t, 0, len(ps.CalledKeys()), "no events") assert.Equal(t, 5, lc1.Stat().Keys) assert.Equal(t, int64(5), lc1.Stat().Misses) // add key-1 key to the second node - _, e := lc2.Get(fmt.Sprintf("key-1"), func() (Value, error) { + _, e := lc2.Get("key-1", func() (Value, error) { return "result-111", nil }) assert.NoError(t, e) assert.Equal(t, 1, lc2.Stat().Keys) assert.Equal(t, int64(1), lc2.Stat().Misses, lc2.Stat()) - time.Sleep(55 * time.Millisecond) // let key-1 expire - assert.Equal(t, 1, len(ps.calledKeys), "1 event, key-0 expired") + // let key-0 expire, GitHub Actions friendly way + for lc1.Stat().Keys > 4 { + lc1.backend.DeleteExpired() // enforce DeleteExpired for GitHub earlier than TTL/2 + ps.Wait() // wait for onBusEvent goroutines to finish + time.Sleep(time.Millisecond * 10) + } assert.Equal(t, 4, lc1.Stat().Keys) assert.Equal(t, 1, lc2.Stat().Keys, "key-1 still in cache2") + assert.Equal(t, 1, len(ps.CalledKeys())) time.Sleep(210 * time.Millisecond) // let all keys expire - assert.Equal(t, 6, len(ps.calledKeys), "6 events, key-1 expired %+v", ps.calledKeys) + ps.Wait() // wait for onBusEvent goroutines to finish + assert.Equal(t, 6, len(ps.CalledKeys()), "6 events, key-1 expired %+v", ps.calledKeys) assert.Equal(t, 0, lc1.Stat().Keys) assert.Equal(t, 0, lc2.Stat().Keys, "key-1 removed from cache2") - } diff --git a/lru_cache.go b/lru_cache.go 
index 3de0cb9..914ab95 100644 --- a/lru_cache.go +++ b/lru_cache.go @@ -1,7 +1,6 @@ package lcw import ( - "log" "sync/atomic" "github.com/google/uuid" @@ -151,7 +150,6 @@ func (c *LruCache) Close() error { // onBusEvent reacts on invalidation message triggered by event bus from another cache instance func (c *LruCache) onBusEvent(id, key string) { if id != c.id && c.backend.Contains(key) { // prevent reaction on event from this cache - log.Println("!! ", id, key) c.backend.Remove(key) } } diff --git a/lru_cache_test.go b/lru_cache_test.go index ff8375e..7d78e77 100644 --- a/lru_cache_test.go +++ b/lru_cache_test.go @@ -88,9 +88,11 @@ func TestLruCache_MaxKeysWithBus(t *testing.T) { var coldCalls int32 lc1, err := NewLruCache(MaxKeys(5), MaxValSize(10), EventBus(ps)) require.Nil(t, err) + defer lc1.Close() lc2, err := NewLruCache(MaxKeys(50), MaxValSize(100), EventBus(ps)) require.Nil(t, err) + defer lc2.Close() // put 5 keys to cache1 for i := 0; i < 5; i++ { @@ -99,7 +101,7 @@ func TestLruCache_MaxKeysWithBus(t *testing.T) { atomic.AddInt32(&coldCalls, 1) return fmt.Sprintf("result-%d", i), nil }) - assert.Nil(t, e) + assert.NoError(t, e) assert.Equal(t, fmt.Sprintf("result-%d", i), res.(string)) assert.Equal(t, int32(i+1), atomic.LoadInt32(&coldCalls)) } @@ -107,33 +109,43 @@ func TestLruCache_MaxKeysWithBus(t *testing.T) { res, err := lc1.Get("key-3", func() (Value, error) { return "result-blah", nil }) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, "result-3", res.(string), "should be cached") + assert.Equal(t, 0, len(ps.CalledKeys()), "no events") + // put 1 key to cache2 - res, e := lc2.Get(fmt.Sprintf("key-1"), func() (Value, error) { - return fmt.Sprintf("result-111"), nil + res, e := lc2.Get("key-1", func() (Value, error) { + return "result-111", nil }) - assert.Nil(t, e) - assert.Equal(t, fmt.Sprintf("result-111"), res.(string)) + assert.NoError(t, e) + assert.Equal(t, "result-111", res.(string)) // try to cache1 after maxKeys reached, will remove key-0 res, err = lc1.Get("key-X", func() (Value, error) { return "result-X", nil }) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, "result-X", res.(string)) assert.Equal(t, 5, lc1.backend.Len()) + assert.Equal(t, 1, len(ps.CalledKeys()), "1 event, key-0 expired") + assert.Equal(t, 1, lc2.backend.Len(), "cache2 still has key-1") // try to cache1 after maxKeys reached, will remove key-1 res, err = lc1.Get("key-X2", func() (Value, error) { return "result-X", nil }) - assert.Nil(t, err) - assert.Equal(t, "result-2", res.(string)) - assert.Equal(t, 1, lc2.backend.Len(), "cache2 removed key-1") + assert.NoError(t, err) + assert.Equal(t, "result-X", res.(string)) + + assert.Equal(t, 2, len(ps.CalledKeys()), "2 events, key-1 expired") + + // wait for onBusEvent goroutines to finish + ps.Wait() + + assert.Equal(t, 0, lc2.backend.Len(), "cache2 removed key-1") } // LruCache illustrates the use of LRU loading cache From fd40e5bc1513506dbf2724f33ff95806e9033ae9 Mon Sep 17 00:00:00 2001 From: Dmitry Verkhoturov Date: Wed, 3 Jun 2020 00:03:23 +0200 Subject: [PATCH 6/7] add real Redis PubSub, tests with LRU --- .github/workflows/ci.yml | 4 ++ cache.go | 1 + eventbus/pubsub.go | 2 +- eventbus/pubsub_test.go | 13 +++++++ eventbus/redis.go | 71 ++++++++++++++++++++++++++++++++++++ eventbus/redis_test.go | 38 +++++++++++++++++++ lru_cache_test.go | 79 +++++++++++++++++++++++++++++++++++++++- 7 files changed, 205 insertions(+), 3 deletions(-) create mode 100644 eventbus/pubsub_test.go create mode 100644 eventbus/redis.go 
create mode 100644 eventbus/redis_test.go diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 46ddffc..cde9b38 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,6 +20,9 @@ jobs: - name: checkout uses: actions/checkout@v2 + - name: start Redis + uses: supercharge/redis-github-action@1.1.0 + - name: build and test run: | go get -v @@ -29,6 +32,7 @@ jobs: env: GO111MODULE: "on" TZ: "America/Chicago" + ENABLE_REDIS_TESTS: "true" - name: install golangci-lint and goveralls run: | diff --git a/cache.go b/cache.go index acc6a3d..e7452a4 100644 --- a/cache.go +++ b/cache.go @@ -83,3 +83,4 @@ func (n *Nop) Stat() CacheStat { func (n *Nop) Close() error { return nil } + diff --git a/eventbus/pubsub.go b/eventbus/pubsub.go index 5f41cf6..5a3da9f 100644 --- a/eventbus/pubsub.go +++ b/eventbus/pubsub.go @@ -1,5 +1,5 @@ // Package eventbus provides PubSub interface used for distributed cache invalidation, -// as well as NopPubSub implementation. +// as well as NopPubSub and RedisPubSub implementations. package eventbus // PubSub interface is used for distributed cache invalidation. diff --git a/eventbus/pubsub_test.go b/eventbus/pubsub_test.go new file mode 100644 index 0000000..a154733 --- /dev/null +++ b/eventbus/pubsub_test.go @@ -0,0 +1,13 @@ +package eventbus + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNopPubSub(t *testing.T) { + nopPubSub := NopPubSub{} + assert.NoError(t, nopPubSub.Subscribe(nil)) + assert.NoError(t, nopPubSub.Publish("", "")) +} diff --git a/eventbus/redis.go b/eventbus/redis.go new file mode 100644 index 0000000..b02dabf --- /dev/null +++ b/eventbus/redis.go @@ -0,0 +1,71 @@ +package eventbus + +import ( + "strings" + "time" + + "github.com/go-redis/redis/v7" + "github.com/hashicorp/go-multierror" + "github.com/pkg/errors" +) + +// NewRedisPubSub creates new RedisPubSub with given parameters. +// Returns an error in case of problems with creating PubSub client for specified channel. +func NewRedisPubSub(addr, channel string) (*RedisPubSub, error) { + client := redis.NewClient(&redis.Options{Addr: addr}) + pubSub := client.Subscribe(channel) + // wait for subscription to be created and ignore the message + if _, err := pubSub.Receive(); err != nil { + return nil, errors.Wrapf(err, "problem subscribing to channel %s on address %s", channel, addr) + } + return &RedisPubSub{client: client, pubSub: pubSub, channel: channel, done: make(chan struct{})}, nil +} + +// RedisPubSub provides Redis implementation for PubSub interface +type RedisPubSub struct { + client *redis.Client + pubSub *redis.PubSub + channel string + + done chan struct{} +} + +// Subscribe calls provided function on subscription channel provided on new RedisPubSub instance creation. +// Should not be called more than once. Spawns a gorouting and does not return an error. 
+func (m *RedisPubSub) Subscribe(fn func(fromID string, key string)) error { + go func(done <-chan struct{}, pubsub *redis.PubSub) { + for { + select { + case <-done: + return + default: + } + msg, err := pubsub.ReceiveTimeout(time.Second * 10) + if err != nil { + continue + } + + // Process the message + if msg, ok := msg.(*redis.Message); ok { + payload := strings.Split(msg.Payload, "$") + fn(payload[0], payload[1]) + } + } + }(m.done, m.pubSub) + + return nil +} + +// Publish publishes provided message to channel provided on new RedisPubSub instance creation +func (m *RedisPubSub) Publish(fromID, key string) error { + return m.client.Publish(m.channel, fromID+"$"+key).Err() +} + +// Close cleans up running goroutines and closes Redis clients +func (m *RedisPubSub) Close() error { + close(m.done) + errs := new(multierror.Error) + errs = multierror.Append(errs, errors.Wrap(m.pubSub.Close(), "problem closing pubSub client")) + errs = multierror.Append(errs, errors.Wrap(m.client.Close(), "problem closing redis client")) + return errs.ErrorOrNil() +} diff --git a/eventbus/redis_test.go b/eventbus/redis_test.go new file mode 100644 index 0000000..4b6c4a2 --- /dev/null +++ b/eventbus/redis_test.go @@ -0,0 +1,38 @@ +package eventbus + +import ( + "math/rand" + "os" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewRedisPubSub_Error(t *testing.T) { + redisPubSub, err := NewRedisPubSub("127.0.0.1:99999", "test") + require.Error(t, err) + require.Nil(t, redisPubSub) +} + +func TestRedisPubSub(t *testing.T) { + if _, ok := os.LookupEnv("ENABLE_REDIS_TESTS"); !ok { + t.Skip("ENABLE_REDIS_TESTS env variable is not set, not expecting Redis to be ready at 127.0.0.1:6379") + } + + channel := "lcw-test-" + strconv.Itoa(rand.Intn(1000000)) + redisPubSub, err := NewRedisPubSub("127.0.0.1:6379", channel) + require.NoError(t, err) + require.NotNil(t, redisPubSub) + var called []string + assert.Nil(t, redisPubSub.Subscribe(func(fromID string, key string) { + called = append(called, fromID, key) + })) + assert.NoError(t, redisPubSub.Publish("test_fromID", "test_key")) + // Sleep which waits for Subscribe goroutine to pick up published changes + time.Sleep(time.Second) + assert.NoError(t, redisPubSub.Close()) + assert.Equal(t, []string{"test_fromID", "test_key"}, called) +} diff --git a/lru_cache_test.go b/lru_cache_test.go index 7d78e77..f77fa15 100644 --- a/lru_cache_test.go +++ b/lru_cache_test.go @@ -4,14 +4,20 @@ import ( "fmt" "io/ioutil" "log" + "math/rand" "net/http" "net/http/httptest" + "os" "sort" + "strconv" "sync/atomic" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/go-pkgz/lcw/eventbus" ) func TestLruCache_MaxKeys(t *testing.T) { @@ -87,11 +93,11 @@ func TestLruCache_MaxKeysWithBus(t *testing.T) { var coldCalls int32 lc1, err := NewLruCache(MaxKeys(5), MaxValSize(10), EventBus(ps)) - require.Nil(t, err) + require.NoError(t, err) defer lc1.Close() lc2, err := NewLruCache(MaxKeys(50), MaxValSize(100), EventBus(ps)) - require.Nil(t, err) + require.NoError(t, err) defer lc2.Close() // put 5 keys to cache1 @@ -148,6 +154,75 @@ func TestLruCache_MaxKeysWithBus(t *testing.T) { assert.Equal(t, 0, lc2.backend.Len(), "cache2 removed key-1") } +func TestLruCache_MaxKeysWithRedis(t *testing.T) { + if _, ok := os.LookupEnv("ENABLE_REDIS_TESTS"); !ok { + t.Skip("ENABLE_REDIS_TESTS env variable is not set, not expecting Redis to be ready at 127.0.0.1:6379") + } + + var 
coldCalls int32 + + channel := "lcw-test-" + strconv.Itoa(rand.Intn(1000000)) + + redisPubSub1, err := eventbus.NewRedisPubSub("127.0.0.1:6379", channel) + require.NoError(t, err) + lc1, err := NewLruCache(MaxKeys(5), MaxValSize(10), EventBus(redisPubSub1)) + require.NoError(t, err) + defer lc1.Close() + + redisPubSub2, err := eventbus.NewRedisPubSub("127.0.0.1:6379", channel) + require.NoError(t, err) + lc2, err := NewLruCache(MaxKeys(50), MaxValSize(100), EventBus(redisPubSub2)) + require.NoError(t, err) + defer lc2.Close() + + // put 5 keys to cache1 + for i := 0; i < 5; i++ { + i := i + res, e := lc1.Get(fmt.Sprintf("key-%d", i), func() (Value, error) { + atomic.AddInt32(&coldCalls, 1) + return fmt.Sprintf("result-%d", i), nil + }) + assert.NoError(t, e) + assert.Equal(t, fmt.Sprintf("result-%d", i), res.(string)) + assert.Equal(t, int32(i+1), atomic.LoadInt32(&coldCalls)) + } + // check if really cached + res, err := lc1.Get("key-3", func() (Value, error) { + return "result-blah", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-3", res.(string), "should be cached") + + // put 1 key to cache2 + res, e := lc2.Get("key-1", func() (Value, error) { + return "result-111", nil + }) + assert.NoError(t, e) + assert.Equal(t, "result-111", res.(string)) + + // try to cache1 after maxKeys reached, will remove key-0 + res, err = lc1.Get("key-X", func() (Value, error) { + return "result-X", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-X", res.(string)) + assert.Equal(t, 5, lc1.backend.Len()) + + assert.Equal(t, 1, lc2.backend.Len(), "cache2 still has key-1") + + // try to cache1 after maxKeys reached, will remove key-1 + res, err = lc1.Get("key-X2", func() (Value, error) { + return "result-X", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-X", res.(string)) + + time.Sleep(time.Second) + assert.Equal(t, 0, lc2.backend.Len(), "cache2 removed key-1") + assert.NoError(t, redisPubSub1.Close()) + assert.NoError(t, redisPubSub2.Close()) +} + // LruCache illustrates the use of LRU loading cache func ExampleLruCache() { // set up test server for single response From 19be7768f976cd10a891440c4b83d7ed7adc9784 Mon Sep 17 00:00:00 2001 From: Dmitry Verkhoturov Date: Wed, 10 Jun 2020 17:10:50 +0200 Subject: [PATCH 7/7] multiple review fixes --- cache_test.go | 4 ++-- eventbus/pubsub.go | 6 +++--- eventbus/redis.go | 6 +++--- eventbus/redis_test.go | 6 +++--- expirable_cache.go | 3 +++ options.go | 4 ++-- 6 files changed, 16 insertions(+), 13 deletions(-) diff --git a/cache_test.go b/cache_test.go index ce50cb5..3f3720b 100644 --- a/cache_test.go +++ b/cache_test.go @@ -659,7 +659,7 @@ func (s sizedString) MarshalBinary() (data []byte, err error) { type mockPubSub struct { calledKeys []string - fns []func(fromID string, key string) + fns []func(fromID, key string) sync.Mutex sync.WaitGroup } @@ -670,7 +670,7 @@ func (m *mockPubSub) CalledKeys() []string { return m.calledKeys } -func (m *mockPubSub) Subscribe(fn func(fromID string, key string)) error { +func (m *mockPubSub) Subscribe(fn func(fromID, key string)) error { m.Lock() defer m.Unlock() m.fns = append(m.fns, fn) diff --git a/eventbus/pubsub.go b/eventbus/pubsub.go index 5a3da9f..3f2909f 100644 --- a/eventbus/pubsub.go +++ b/eventbus/pubsub.go @@ -6,15 +6,15 @@ package eventbus // Publish is called on each entry invalidation, // Subscribe is used for subscription for these events. 
type PubSub interface { - Publish(fromID string, key string) error - Subscribe(fn func(fromID string, key string)) error + Publish(fromID, key string) error + Subscribe(fn func(fromID, key string)) error } // NopPubSub implements default do-nothing pub-sub (event bus) type NopPubSub struct{} // Subscribe does nothing for NopPubSub -func (n *NopPubSub) Subscribe(fn func(fromID string, key string)) error { +func (n *NopPubSub) Subscribe(fn func(fromID, key string)) error { return nil } diff --git a/eventbus/redis.go b/eventbus/redis.go index b02dabf..4406b4f 100644 --- a/eventbus/redis.go +++ b/eventbus/redis.go @@ -31,8 +31,8 @@ type RedisPubSub struct { } // Subscribe calls provided function on subscription channel provided on new RedisPubSub instance creation. -// Should not be called more than once. Spawns a gorouting and does not return an error. -func (m *RedisPubSub) Subscribe(fn func(fromID string, key string)) error { +// Should not be called more than once. Spawns a goroutine and does not return an error. +func (m *RedisPubSub) Subscribe(fn func(fromID, key string)) error { go func(done <-chan struct{}, pubsub *redis.PubSub) { for { select { @@ -48,7 +48,7 @@ func (m *RedisPubSub) Subscribe(fn func(fromID string, key string)) error { // Process the message if msg, ok := msg.(*redis.Message); ok { payload := strings.Split(msg.Payload, "$") - fn(payload[0], payload[1]) + fn(payload[0], strings.Join(payload[1:], "$")) } } }(m.done, m.pubSub) diff --git a/eventbus/redis_test.go b/eventbus/redis_test.go index 4b6c4a2..0c77fc6 100644 --- a/eventbus/redis_test.go +++ b/eventbus/redis_test.go @@ -27,12 +27,12 @@ func TestRedisPubSub(t *testing.T) { require.NoError(t, err) require.NotNil(t, redisPubSub) var called []string - assert.Nil(t, redisPubSub.Subscribe(func(fromID string, key string) { + assert.Nil(t, redisPubSub.Subscribe(func(fromID, key string) { called = append(called, fromID, key) })) - assert.NoError(t, redisPubSub.Publish("test_fromID", "test_key")) + assert.NoError(t, redisPubSub.Publish("test_fromID", "$test$key$")) // Sleep which waits for Subscribe goroutine to pick up published changes time.Sleep(time.Second) assert.NoError(t, redisPubSub.Close()) - assert.Equal(t, []string{"test_fromID", "test_key"}, called) + assert.Equal(t, []string{"test_fromID", "$test$key$"}, called) } diff --git a/expirable_cache.go b/expirable_cache.go index c657d28..8b20f7e 100644 --- a/expirable_cache.go +++ b/expirable_cache.go @@ -54,6 +54,9 @@ func NewExpirableCache(opts ...Option) (*ExpirableCache, error) { size := s.Size() atomic.AddInt64(&res.currentSize, -1*int64(size)) } + // ignore the error on Publish as we don't have log inside the module and + // there is no other way to handle it: we publish the cache invalidation + // and hope for the best _ = res.eventBus.Publish(res.id, key) }), ) diff --git a/options.go b/options.go index 36bca12..5a93d2b 100644 --- a/options.go +++ b/options.go @@ -89,9 +89,9 @@ func OnEvicted(fn func(key string, value Value)) Option { } // EventBus sets PubSub for distributed cache invalidation -func EventBus(pubsub eventbus.PubSub) Option { +func EventBus(pubSub eventbus.PubSub) Option { return func(o *options) error { - o.eventBus = pubsub + o.eventBus = pubSub return nil } }
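A note on the wire format used by `RedisPubSub` in the last two patches: `Publish` sends `fromID + "$" + key` as the message payload, and after the review fix the subscriber treats everything past the first `$` as the key, so keys may themselves contain `$` (instance ids are UUIDs and never do). A small standalone sketch of that round trip, with hypothetical helper names:

```go
package main

import (
	"fmt"
	"strings"
)

// encode/decode mirror what RedisPubSub.Publish and Subscribe do with the
// payload; the helper names are illustrative and not part of the package.
func encode(fromID, key string) string { return fromID + "$" + key }

func decode(payload string) (fromID, key string) {
	parts := strings.Split(payload, "$")
	// everything after the first "$" belongs to the key, matching the
	// strings.Join(payload[1:], "$") fix in the final patch
	return parts[0], strings.Join(parts[1:], "$")
}

func main() {
	id, key := decode(encode("node-1", "$test$key$"))
	fmt.Println(id, key) // prints: node-1 $test$key$
}
```

This matches the updated `TestRedisPubSub`, which publishes the key `"$test$key$"` and expects it back unchanged.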