Skip to content

Commit

Permalink
Merge pull request #26 from go-pkgz/pubsub
Browse files Browse the repository at this point in the history
Basic support for distributed cache for LRU and Expirable caches
  • Loading branch information
umputun authored Jun 10, 2020
2 parents e5a17b1 + 19be776 commit 392920a
Show file tree
Hide file tree
Showing 15 changed files with 453 additions and 10 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ jobs:
- name: checkout
uses: actions/checkout@v2

- name: start Redis
uses: supercharge/[email protected]

- name: build and test
run: |
go get -v
Expand All @@ -29,6 +32,7 @@ jobs:
env:
GO111MODULE: "on"
TZ: "America/Chicago"
ENABLE_REDIS_TESTS: "true"

- name: install golangci-lint and goveralls
run: |
Expand Down
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ Main features:

## Usage

```
cache := lcw.NewLruCache(lcw.MaxKeys(500), lcw.MaxCacheSize(65536), lcw.MaxValSize(200), lcw.MaxKeySize(32))
```go
cache, err := lcw.NewLruCache(lcw.MaxKeys(500), lcw.MaxCacheSize(65536), lcw.MaxValSize(200), lcw.MaxKeySize(32))
if err != nil {
panic("failed to create cache")
}
defer cache.Close()

val, err := cache.Get("key123", func() (lcw.Value, error) {
Expand Down Expand Up @@ -59,7 +62,7 @@ Cache can be created with URIs:
1. Key is not a string, but a composed type made from partition, key-id and list of scopes (tags).
1. Value type limited to `[]byte`
1. Added `Flush` method for scoped/tagged invalidation of multiple records in a given partition
1. A simplified interface with Get, Stat and Flush only.
1. A simplified interface with Get, Stat, Flush and Close only.

## Details

Expand Down
3 changes: 3 additions & 0 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
// 3 flavors of cache provided - NoP (do-nothing cache), ExpirableCache (TTL based), and LruCache
package lcw

//go:generate sh -c "mockery -inpkg -name LoadingCache -print > /tmp/cache-mock.tmp && mv /tmp/cache-mock.tmp cache_mock.go"

import (
"fmt"
)
Expand Down Expand Up @@ -81,3 +83,4 @@ func (n *Nop) Stat() CacheStat {
func (n *Nop) Close() error {
return nil
}

36 changes: 36 additions & 0 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,3 +656,39 @@ func (s sizedString) Size() int { return len(s) }
func (s sizedString) MarshalBinary() (data []byte, err error) {
return []byte(s), nil
}

type mockPubSub struct {
calledKeys []string
fns []func(fromID, key string)
sync.Mutex
sync.WaitGroup
}

func (m *mockPubSub) CalledKeys() []string {
m.Lock()
defer m.Unlock()
return m.calledKeys
}

func (m *mockPubSub) Subscribe(fn func(fromID, key string)) error {
m.Lock()
defer m.Unlock()
m.fns = append(m.fns, fn)
return nil
}

func (m *mockPubSub) Publish(fromID, key string) error {
m.Lock()
defer m.Unlock()
m.calledKeys = append(m.calledKeys, key)
for _, fn := range m.fns {
fn := fn
m.Add(1)
// run in goroutine to prevent deadlock
go func() {
fn(fromID, key)
m.Done()
}()
}
return nil
}
24 changes: 24 additions & 0 deletions eventbus/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Package eventbus provides PubSub interface used for distributed cache invalidation,
// as well as NopPubSub and RedisPubSub implementations.
package eventbus

// PubSub interface is used for distributed cache invalidation.
// Publish is called on each entry invalidation,
// Subscribe is used for subscription for these events.
type PubSub interface {
Publish(fromID, key string) error
Subscribe(fn func(fromID, key string)) error
}

// NopPubSub implements default do-nothing pub-sub (event bus)
type NopPubSub struct{}

// Subscribe does nothing for NopPubSub
func (n *NopPubSub) Subscribe(fn func(fromID, key string)) error {
return nil
}

// Publish does nothing for NopPubSub
func (n *NopPubSub) Publish(fromID, key string) error {
return nil
}
13 changes: 13 additions & 0 deletions eventbus/pubsub_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package eventbus

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestNopPubSub(t *testing.T) {
nopPubSub := NopPubSub{}
assert.NoError(t, nopPubSub.Subscribe(nil))
assert.NoError(t, nopPubSub.Publish("", ""))
}
71 changes: 71 additions & 0 deletions eventbus/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package eventbus

import (
"strings"
"time"

"github.com/go-redis/redis/v7"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
)

// NewRedisPubSub creates new RedisPubSub with given parameters.
// Returns an error in case of problems with creating PubSub client for specified channel.
func NewRedisPubSub(addr, channel string) (*RedisPubSub, error) {
client := redis.NewClient(&redis.Options{Addr: addr})
pubSub := client.Subscribe(channel)
// wait for subscription to be created and ignore the message
if _, err := pubSub.Receive(); err != nil {
return nil, errors.Wrapf(err, "problem subscribing to channel %s on address %s", channel, addr)
}
return &RedisPubSub{client: client, pubSub: pubSub, channel: channel, done: make(chan struct{})}, nil
}

// RedisPubSub provides Redis implementation for PubSub interface
type RedisPubSub struct {
client *redis.Client
pubSub *redis.PubSub
channel string

done chan struct{}
}

// Subscribe calls provided function on subscription channel provided on new RedisPubSub instance creation.
// Should not be called more than once. Spawns a goroutine and does not return an error.
func (m *RedisPubSub) Subscribe(fn func(fromID, key string)) error {
go func(done <-chan struct{}, pubsub *redis.PubSub) {
for {
select {
case <-done:
return
default:
}
msg, err := pubsub.ReceiveTimeout(time.Second * 10)
if err != nil {
continue
}

// Process the message
if msg, ok := msg.(*redis.Message); ok {
payload := strings.Split(msg.Payload, "$")
fn(payload[0], strings.Join(payload[1:], "$"))
}
}
}(m.done, m.pubSub)

return nil
}

// Publish publishes provided message to channel provided on new RedisPubSub instance creation
func (m *RedisPubSub) Publish(fromID, key string) error {
return m.client.Publish(m.channel, fromID+"$"+key).Err()
}

// Close cleans up running goroutines and closes Redis clients
func (m *RedisPubSub) Close() error {
close(m.done)
errs := new(multierror.Error)
errs = multierror.Append(errs, errors.Wrap(m.pubSub.Close(), "problem closing pubSub client"))
errs = multierror.Append(errs, errors.Wrap(m.client.Close(), "problem closing redis client"))
return errs.ErrorOrNil()
}
38 changes: 38 additions & 0 deletions eventbus/redis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package eventbus

import (
"math/rand"
"os"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewRedisPubSub_Error(t *testing.T) {
redisPubSub, err := NewRedisPubSub("127.0.0.1:99999", "test")
require.Error(t, err)
require.Nil(t, redisPubSub)
}

func TestRedisPubSub(t *testing.T) {
if _, ok := os.LookupEnv("ENABLE_REDIS_TESTS"); !ok {
t.Skip("ENABLE_REDIS_TESTS env variable is not set, not expecting Redis to be ready at 127.0.0.1:6379")
}

channel := "lcw-test-" + strconv.Itoa(rand.Intn(1000000))
redisPubSub, err := NewRedisPubSub("127.0.0.1:6379", channel)
require.NoError(t, err)
require.NotNil(t, redisPubSub)
var called []string
assert.Nil(t, redisPubSub.Subscribe(func(fromID, key string) {
called = append(called, fromID, key)
}))
assert.NoError(t, redisPubSub.Publish("test_fromID", "$test$key$"))
// Sleep which waits for Subscribe goroutine to pick up published changes
time.Sleep(time.Second)
assert.NoError(t, redisPubSub.Close())
assert.Equal(t, []string{"test_fromID", "$test$key$"}, called)
}
20 changes: 20 additions & 0 deletions expirable_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"sync/atomic"
"time"

"github.com/google/uuid"
"github.com/pkg/errors"

"github.com/go-pkgz/lcw/eventbus"
"github.com/go-pkgz/lcw/internal/cache"
)

Expand All @@ -14,6 +16,7 @@ type ExpirableCache struct {
options
CacheStat
currentSize int64
id string
backend *cache.LoadingCache
}

Expand All @@ -24,7 +27,9 @@ func NewExpirableCache(opts ...Option) (*ExpirableCache, error) {
maxKeys: 1000,
maxValueSize: 0,
ttl: 5 * time.Minute,
eventBus: &eventbus.NopPubSub{},
},
id: uuid.New().String(),
}

for _, opt := range opts {
Expand All @@ -33,6 +38,10 @@ func NewExpirableCache(opts ...Option) (*ExpirableCache, error) {
}
}

if err := res.eventBus.Subscribe(res.onBusEvent); err != nil {
return nil, errors.Wrapf(err, "can't subscribe to event bus")
}

backend, err := cache.NewLoadingCache(
cache.MaxKeys(res.maxKeys),
cache.TTL(res.ttl),
Expand All @@ -45,6 +54,10 @@ func NewExpirableCache(opts ...Option) (*ExpirableCache, error) {
size := s.Size()
atomic.AddInt64(&res.currentSize, -1*int64(size))
}
// ignore the error on Publish as we don't have log inside the module and
// there is no other way to handle it: we publish the cache invalidation
// and hope for the best
_ = res.eventBus.Publish(res.id, key)
}),
)
if err != nil {
Expand Down Expand Up @@ -128,6 +141,13 @@ func (c *ExpirableCache) Close() error {
return nil
}

// onBusEvent reacts on invalidation message triggered by event bus from another cache instance
func (c *ExpirableCache) onBusEvent(id, key string) {
if id != c.id {
c.backend.Invalidate(key)
}
}

func (c *ExpirableCache) size() int64 {
return atomic.LoadInt64(&c.currentSize)
}
Expand Down
55 changes: 54 additions & 1 deletion expirable_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ func TestExpirableCache(t *testing.T) {
assert.Equal(t, 5, lc.Stat().Keys)
assert.Equal(t, int64(6), lc.Stat().Misses)

time.Sleep(55 * time.Millisecond)
// let key-0 expire, GitHub Actions friendly way
for lc.Stat().Keys > 4 {
lc.backend.DeleteExpired() // enforce DeleteExpired for GitHub earlier than TTL/2
time.Sleep(time.Millisecond * 10)
}
assert.Equal(t, 4, lc.Stat().Keys)

time.Sleep(210 * time.Millisecond)
Expand Down Expand Up @@ -112,3 +116,52 @@ func TestExpirableCache_BadOptions(t *testing.T) {
_, err = NewExpirableCache(TTL(-1))
assert.EqualError(t, err, "failed to set cache option: negative ttl")
}

func TestExpirableCacheWithBus(t *testing.T) {
ps := &mockPubSub{}
lc1, err := NewExpirableCache(MaxKeys(5), TTL(time.Millisecond*100), EventBus(ps))
require.NoError(t, err)
defer lc1.Close()

lc2, err := NewExpirableCache(MaxKeys(50), TTL(time.Millisecond*5000), EventBus(ps))
require.NoError(t, err)
defer lc2.Close()

// add 5 keys to the first node cache
for i := 0; i < 5; i++ {
i := i
_, e := lc1.Get(fmt.Sprintf("key-%d", i), func() (Value, error) {
return fmt.Sprintf("result-%d", i), nil
})
assert.NoError(t, e)
time.Sleep(10 * time.Millisecond)
}

assert.Equal(t, 0, len(ps.CalledKeys()), "no events")
assert.Equal(t, 5, lc1.Stat().Keys)
assert.Equal(t, int64(5), lc1.Stat().Misses)

// add key-1 key to the second node
_, e := lc2.Get("key-1", func() (Value, error) {
return "result-111", nil
})
assert.NoError(t, e)
assert.Equal(t, 1, lc2.Stat().Keys)
assert.Equal(t, int64(1), lc2.Stat().Misses, lc2.Stat())

// let key-0 expire, GitHub Actions friendly way
for lc1.Stat().Keys > 4 {
lc1.backend.DeleteExpired() // enforce DeleteExpired for GitHub earlier than TTL/2
ps.Wait() // wait for onBusEvent goroutines to finish
time.Sleep(time.Millisecond * 10)
}
assert.Equal(t, 4, lc1.Stat().Keys)
assert.Equal(t, 1, lc2.Stat().Keys, "key-1 still in cache2")
assert.Equal(t, 1, len(ps.CalledKeys()))

time.Sleep(210 * time.Millisecond) // let all keys expire
ps.Wait() // wait for onBusEvent goroutines to finish
assert.Equal(t, 6, len(ps.CalledKeys()), "6 events, key-1 expired %+v", ps.calledKeys)
assert.Equal(t, 0, lc1.Stat().Keys)
assert.Equal(t, 0, lc2.Stat().Keys, "key-1 removed from cache2")
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/go-pkgz/lcw
require (
github.com/alicebob/miniredis/v2 v2.11.4
github.com/go-redis/redis/v7 v7.2.0
github.com/google/uuid v1.1.1
github.com/hashicorp/go-multierror v1.1.0
github.com/hashicorp/golang-lru v0.5.4
github.com/pkg/errors v0.9.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gomodule/redigo v1.7.1-0.20190322064113-39e2c31b7ca3 h1:6amM4HsNPOvMLVc2ZnyqrjeQ92YAVWn7T4WBKK87inY=
github.com/gomodule/redigo v1.7.1-0.20190322064113-39e2c31b7ca3/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI=
Expand Down
Loading

0 comments on commit 392920a

Please sign in to comment.