Skip to content

Commit

Permalink
Add Redis support (#1612)
Browse files Browse the repository at this point in the history
* adding support for Redis cache

Signed-off-by: Dmitry Shmulevich <[email protected]>

* fix lint errors

Signed-off-by: Dmitry Shmulevich <[email protected]>

* minor bugfix

Signed-off-by: Dmitry Shmulevich <[email protected]>

* proper use of pool connections

Signed-off-by: Dmitry Shmulevich <[email protected]>

* fix lint errors

Signed-off-by: Dmitry Shmulevich <[email protected]>

* addressed comments
added unit test

Signed-off-by: Dmitry Shmulevich <[email protected]>

* update unit tests

Signed-off-by: Dmitry Shmulevich <[email protected]>

* reuse context in request timeout

Signed-off-by: Dmitry Shmulevich <[email protected]>

* use correct function names in logs

Signed-off-by: Dmitry Shmulevich <[email protected]>

* use common config for cache storage

Signed-off-by: Dmitry Shmulevich <[email protected]>

* added missing module dependency

Signed-off-by: Dmitry Shmulevich <[email protected]>

* delete extra newline

Signed-off-by: Dmitry Shmulevich <[email protected]>

* optimize redis SET

Signed-off-by: Dmitry Shmulevich <[email protected]>

* adding support for Redis cache

Signed-off-by: Dmitry Shmulevich <[email protected]>

* fix lint errors

Signed-off-by: Dmitry Shmulevich <[email protected]>

* minor bugfix

Signed-off-by: Dmitry Shmulevich <[email protected]>

* proper use of pool connections

Signed-off-by: Dmitry Shmulevich <[email protected]>

* fix lint errors

Signed-off-by: Dmitry Shmulevich <[email protected]>

* addressed comments
added unit test

Signed-off-by: Dmitry Shmulevich <[email protected]>

* update unit tests

Signed-off-by: Dmitry Shmulevich <[email protected]>

* reuse context in request timeout

Signed-off-by: Dmitry Shmulevich <[email protected]>

* use correct function names in logs

Signed-off-by: Dmitry Shmulevich <[email protected]>

* use common config for cache storage

Signed-off-by: Dmitry Shmulevich <[email protected]>

* added missing module dependency

Signed-off-by: Dmitry Shmulevich <[email protected]>

* delete extra newline

Signed-off-by: Dmitry Shmulevich <[email protected]>

* optimize redis SET

Signed-off-by: Dmitry Shmulevich <[email protected]>

* undo common config for storage systems

Signed-off-by: Dmitry Shmulevich <[email protected]>

* restore changes in CHANGELOG.md

Signed-off-by: Dmitry Shmulevich <[email protected]>

* fixed lint error

Signed-off-by: Dmitry Shmulevich <[email protected]>

* fixed mod-check error

Signed-off-by: Dmitry Shmulevich <[email protected]>

* fix metric labels

Signed-off-by: Dmitry Shmulevich <[email protected]>

* addressed comments

Signed-off-by: Dmitry Shmulevich <[email protected]>
  • Loading branch information
Dmitry Shmulevich authored and csmarchbanks committed Oct 3, 2019
1 parent 0be2dd5 commit 857bb84
Show file tree
Hide file tree
Showing 34 changed files with 4,858 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

* [CHANGE] In table-manager, default DynamoDB capacity was reduced from 3,000 units to 1,000 units. We recommend you do not run with the defaults: find out what figures are needed for your environment and set that via `-dynamodb.periodic-table.write-throughput` and `-dynamodb.chunk-table.write-throughput`.
* [CHANGE] `--alertmanager.configs.auto-slack-root` flag was dropped as auto Slack root is not supported anymore. #1597
* [FEATURE] Add Redis support for caching #1612
* [ENHANCEMENT] Upgraded Prometheus to 2.12.0 and Alertmanager to 0.19.0. #1597


## 0.2.0 / 2019-09-05

This release has several exciting features, the most notable of them being setting `-ingester.spread-flushes` to potentially reduce your storage space by upto 50%.
Expand Down
4 changes: 4 additions & 0 deletions docs/arguments.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ The ingester query API was improved over time, but defaults to the old behaviour

Use these flags to specify the location and timeout of the memcached cluster used to cache query results.

- `-redis.{endpoint, timeout}`

Use these flags to specify the location and timeout of the Redis service used to cache query results.

## Distributor

- `-distributor.shard-by-all-labels`
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
github.com/gogo/status v1.0.3
github.com/golang/protobuf v1.3.2
github.com/golang/snappy v0.0.1
github.com/gomodule/redigo v2.0.0+incompatible
github.com/gorilla/context v1.1.1 // indirect
github.com/gorilla/mux v1.6.2
github.com/gorilla/websocket v1.4.0 // indirect
Expand All @@ -56,6 +57,7 @@ require (
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/common v0.7.0
github.com/prometheus/prometheus v1.8.2-0.20190918104050-8744afdd1ea0
github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1
github.com/satori/go.uuid v1.2.0 // indirect
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e
github.com/sercand/kuberesolver v2.1.0+incompatible // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/google/btree v0.0.0-20160524151835-7d79101e329e/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v0.0.0-20180124185431-e89373fe6b4a/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
Expand Down Expand Up @@ -495,6 +497,8 @@ github.com/prometheus/prometheus v0.0.0-20190818123050-43acd0e2e93f h1:7C9G4yUog
github.com/prometheus/prometheus v0.0.0-20190818123050-43acd0e2e93f/go.mod h1:rMTlmxGCvukf2KMu3fClMDKLLoJ5hl61MhcJ7xKakf0=
github.com/prometheus/prometheus v1.8.2-0.20190918104050-8744afdd1ea0 h1:W4dTblzSVIBNfDimJhh70OpZQQMwLVpwK50scXdH94w=
github.com/prometheus/prometheus v1.8.2-0.20190918104050-8744afdd1ea0/go.mod h1:elNqjVbwD3sCZJqKzyN7uEuwGcCpeJvv67D6BrHsDbw=
github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1 h1:+kGqA4dNN5hn7WwvKdzHl0rdN5AEkbNZd0VjRltAiZg=
github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1/go.mod h1:JaY6n2sDr+z2WTsXkOmNRUfDy6FN0L6Nk7x06ndm4tY=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rs/cors v1.6.0 h1:G9tHG9lebljV9mfp9SNPDL36nCDxmo3zTlAf1YgvzmI=
github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
Expand Down
16 changes: 16 additions & 0 deletions pkg/chunk/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"context"
"errors"
"flag"
"time"
)
Expand All @@ -28,6 +29,7 @@ type Config struct {
Background BackgroundConfig `yaml:"background,omitempty"`
Memcache MemcachedConfig `yaml:"memcached,omitempty"`
MemcacheClient MemcachedClientConfig `yaml:"memcached_client,omitempty"`
Redis RedisConfig `yaml:"redis,omitempty"`
Fifocache FifoCacheConfig `yaml:"fifocache,omitempty"`

// This is to name the cache metrics properly.
Expand All @@ -42,6 +44,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f
cfg.Background.RegisterFlagsWithPrefix(prefix, description, f)
cfg.Memcache.RegisterFlagsWithPrefix(prefix, description, f)
cfg.MemcacheClient.RegisterFlagsWithPrefix(prefix, description, f)
cfg.Redis.RegisterFlagsWithPrefix(prefix, description, f)
cfg.Fifocache.RegisterFlagsWithPrefix(prefix, description, f)

f.BoolVar(&cfg.EnableFifoCache, prefix+"cache.enable-fifocache", false, description+"Enable in-memory cache.")
Expand All @@ -67,6 +70,10 @@ func New(cfg Config) (Cache, error) {
caches = append(caches, Instrument(cfg.Prefix+"fifocache", cache))
}

if cfg.MemcacheClient.Host != "" && cfg.Redis.Endpoint != "" {
return nil, errors.New("use of multiple cache storage systems is not supported")
}

if cfg.MemcacheClient.Host != "" {
if cfg.Memcache.Expiration == 0 && cfg.DefaultValidity != 0 {
cfg.Memcache.Expiration = cfg.DefaultValidity
Expand All @@ -79,6 +86,15 @@ func New(cfg Config) (Cache, error) {
caches = append(caches, NewBackground(cacheName, cfg.Background, Instrument(cacheName, cache)))
}

if cfg.Redis.Endpoint != "" {
if cfg.Redis.Expiration == 0 && cfg.DefaultValidity != 0 {
cfg.Redis.Expiration = cfg.DefaultValidity
}
cacheName := cfg.Prefix + "redis"
cache := NewRedisCache(cfg.Redis, cacheName, nil)
caches = append(caches, NewBackground(cacheName, cfg.Background, Instrument(cacheName, cache)))
}

cache := NewTiered(caches)
if len(caches) > 1 {
cache = Instrument(cfg.Prefix+"tiered", cache)
Expand Down
154 changes: 154 additions & 0 deletions pkg/chunk/cache/redis_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package cache

import (
"context"
"flag"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/gomodule/redigo/redis"
)

// RedisCache type caches chunks in redis
type RedisCache struct {
name string
expiration int
timeout time.Duration
pool *redis.Pool
}

// RedisConfig defines how a RedisCache should be constructed.
type RedisConfig struct {
Endpoint string `yaml:"endpoint,omitempty"`
Timeout time.Duration `yaml:"timeout,omitempty"`
Expiration time.Duration `yaml:"expiration,omitempty"`
MaxIdleConns int `yaml:"max_idle_conns,omitempty"`
MaxActiveConns int `yaml:"max_active_conns,omitempty"`
}

// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *RedisConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
f.StringVar(&cfg.Endpoint, prefix+"redis.endpoint", "", description+"Redis service endpoint to use when caching chunks. If empty, no redis will be used.")
f.DurationVar(&cfg.Timeout, prefix+"redis.timeout", 100*time.Millisecond, description+"Maximum time to wait before giving up on redis requests.")
f.DurationVar(&cfg.Expiration, prefix+"redis.expiration", 0, description+"How long keys stay in the redis.")
f.IntVar(&cfg.MaxIdleConns, prefix+"redis.max-idle-conns", 80, description+"Maximum number of idle connections in pool.")
f.IntVar(&cfg.MaxActiveConns, prefix+"redis.max-active-conns", 0, description+"Maximum number of active connections in pool.")
}

// NewRedisCache creates a new RedisCache
func NewRedisCache(cfg RedisConfig, name string, pool *redis.Pool) *RedisCache {
// pool != nil only in unit tests
if pool == nil {
pool = &redis.Pool{
MaxIdle: cfg.MaxIdleConns,
MaxActive: cfg.MaxActiveConns,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", cfg.Endpoint)
if err != nil {
return nil, err
}
return c, err
},
}
}

cache := &RedisCache{
expiration: int(cfg.Expiration.Seconds()),
timeout: cfg.Timeout,
name: name,
pool: pool,
}

if err := cache.ping(context.Background()); err != nil {
level.Error(util.Logger).Log("msg", "error connecting to redis", "endpoint", cfg.Endpoint, "err", err)
}

return cache
}

// Fetch gets keys from the cache. The keys that are found must be in the order of the keys requested.
func (c *RedisCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) {
data, err := c.mget(ctx, keys)

if err != nil {
level.Error(util.Logger).Log("msg", "failed to get from redis", "name", c.name, "err", err)
missed = make([]string, len(keys))
copy(missed, keys)
return
}
for i, key := range keys {
if data[i] != nil {
found = append(found, key)
bufs = append(bufs, data[i])
} else {
missed = append(missed, key)
}
}
return
}

// Store stores the key in the cache.
func (c *RedisCache) Store(ctx context.Context, keys []string, bufs [][]byte) {
err := c.mset(ctx, keys, bufs, c.expiration)
if err != nil {
level.Error(util.Logger).Log("msg", "failed to put to redis", "name", c.name, "err", err)
}
}

// Stop stops the redis client.
func (c *RedisCache) Stop() error {
return c.pool.Close()
}

// mset adds key-value pairs to the cache.
func (c *RedisCache) mset(ctx context.Context, keys []string, bufs [][]byte, ttl int) error {
conn := c.pool.Get()
defer conn.Close()

if err := conn.Send("MULTI"); err != nil {
return err
}
for i := range keys {
if err := conn.Send("SETEX", keys[i], ttl, bufs[i]); err != nil {
return err
}
}
_, err := redis.DoWithTimeout(conn, c.timeout, "EXEC")
return err
}

// mget retrieves values from the cache.
func (c *RedisCache) mget(ctx context.Context, keys []string) ([][]byte, error) {
intf := make([]interface{}, len(keys))
for i, key := range keys {
intf[i] = key
}

conn := c.pool.Get()
defer conn.Close()

return redis.ByteSlices(redis.DoWithTimeout(conn, c.timeout, "MGET", intf...))
}

func (c *RedisCache) ping(ctx context.Context) error {
conn := c.pool.Get()
defer conn.Close()

pong, err := redis.DoWithTimeout(conn, c.timeout, "PING")
if err == nil {
_, err = redis.String(pong, err)
}
return err
}

func redisStatusCode(err error) string {
switch err {
case nil:
return "200"
case redis.ErrNil:
return "404"
default:
return "500"
}
}
88 changes: 88 additions & 0 deletions pkg/chunk/cache/redis_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cache_test

import (
"context"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/gomodule/redigo/redis"
"github.com/rafaeljusto/redigomock"
"github.com/stretchr/testify/require"
)

func TestRedisCache(t *testing.T) {
cfg := cache.RedisConfig{
Timeout: 10 * time.Millisecond,
}

conn := redigomock.NewConn()
conn.Clear()
pool := redis.NewPool(func() (redis.Conn, error) {
return conn, nil
}, 10)

keys := []string{"key1", "key2", "key3"}
bufs := [][]byte{[]byte("data1"), []byte("data2"), []byte("data3")}
miss := []string{"miss1", "miss2"}

// ensure input correctness
nHit := len(keys)
require.Len(t, bufs, nHit)

// mock Redis Store
mockRedisStore(conn, keys, bufs)

//mock cache hit
keyIntf := make([]interface{}, nHit)
bufIntf := make([]interface{}, nHit)

for i := 0; i < nHit; i++ {
keyIntf[i] = keys[i]
bufIntf[i] = bufs[i]
}
conn.Command("MGET", keyIntf...).Expect(bufIntf)

// mock cache miss
nMiss := len(miss)
missIntf := make([]interface{}, nMiss)
for i, s := range miss {
missIntf[i] = s
}
conn.Command("MGET", missIntf...).ExpectError(nil)

// mock the cache
c := cache.NewRedisCache(cfg, "mock", pool)
ctx := context.Background()

c.Store(ctx, keys, bufs)

// test hits
found, data, missed := c.Fetch(ctx, keys)

require.Len(t, found, nHit)
require.Len(t, missed, 0)
for i := 0; i < nHit; i++ {
require.Equal(t, keys[i], found[i])
require.Equal(t, bufs[i], data[i])
}

// test misses
found, _, missed = c.Fetch(ctx, miss)

require.Len(t, found, 0)
require.Len(t, missed, nMiss)
for i := 0; i < nMiss; i++ {
require.Equal(t, miss[i], missed[i])
}
}

func mockRedisStore(conn *redigomock.Conn, keys []string, bufs [][]byte) {
conn.Command("MULTI")
ret := []interface{}{}
for i := range keys {
conn.Command("SETEX", keys[i], 0, bufs[i])
ret = append(ret, "OK")
}
conn.Command("EXEC").Expect(ret)
}
Loading

0 comments on commit 857bb84

Please sign in to comment.