From 80eefb8d7a23730c188428dbf993c1af473b660c Mon Sep 17 00:00:00 2001 From: "aiden.ma" Date: Sun, 22 Sep 2024 21:09:30 +0800 Subject: [PATCH] feat: add redis bigkey hook --- core/stores/redis/bigkeyhook.go | 252 ++++++++++++++++ core/stores/redis/bigkeyhook_test.go | 355 +++++++++++++++++++++++ core/stores/redis/conf.go | 12 + core/stores/redis/redis.go | 21 +- core/stores/redis/redisclustermanager.go | 1 + 5 files changed, 635 insertions(+), 6 deletions(-) create mode 100644 core/stores/redis/bigkeyhook.go create mode 100644 core/stores/redis/bigkeyhook_test.go diff --git a/core/stores/redis/bigkeyhook.go b/core/stores/redis/bigkeyhook.go new file mode 100644 index 000000000000..50acb2be2f3d --- /dev/null +++ b/core/stores/redis/bigkeyhook.go @@ -0,0 +1,252 @@ +package redis + +import ( + "context" + "errors" + "strings" + "time" + + red "github.com/redis/go-redis/v9" + + "github.com/zeromicro/go-zero/core/logc" + "github.com/zeromicro/go-zero/core/logx" + "github.com/zeromicro/go-zero/core/mapping" + "github.com/zeromicro/go-zero/core/threading" +) + +type ( + bigKeyHook struct { + config BigKeyHookConfig + buffer chan bigKeyData + } + + BigKeyHookConfig struct { + Enable bool `json:",default=true"` + LimitSize int `json:",default=10240"` + LimitCount int `json:",default=5"` + BufferLen int `json:",default=100"` + StatInterval time.Duration `json:",default=5m"` + } + + bigKeyData struct { + key string + size int + count int + } +) + +func NewBigKeyHook(config BigKeyHookConfig) (*bigKeyHook, error) { + if config.LimitSize <= 0 { + return nil, errors.New("limit size must be greater than 0") + } + if config.LimitCount <= 0 { + config.LimitCount = 5 + } + + if config.BufferLen > 0 && config.StatInterval <= 0 { + return nil, errors.New("stat interval must be greater than 0") + } + + hook := &bigKeyHook{ + config: config, + buffer: make(chan bigKeyData, config.BufferLen), + } + + threading.GoSafe(hook.stat) + + return hook, nil +} + +func (h *bigKeyHook) DialHook(next red.DialHook) red.DialHook { + return next +} + +func (h *bigKeyHook) ProcessHook(next red.ProcessHook) red.ProcessHook { + return func(ctx context.Context, cmd red.Cmder) error { + if !h.config.Enable { + return next(ctx, cmd) + } + + defer h.cmdCheck(ctx, cmd) + + return next(ctx, cmd) + } +} + +func (h *bigKeyHook) ProcessPipelineHook(next red.ProcessPipelineHook) red.ProcessPipelineHook { + return func(ctx context.Context, cmds []red.Cmder) error { + if !h.config.Enable { + return next(ctx, cmds) + } + + defer func() { + for _, cmd := range cmds { + h.cmdCheck(ctx, cmd) + } + }() + + return next(ctx, cmds) + } +} + +func (h *bigKeyHook) cmdCheck(ctx context.Context, cmd red.Cmder) { + if h.config.LimitSize <= 0 || len(cmd.Args()) < 2 || cmd.Err() != nil { + return + } + + var ( + size int + key = mapping.Repr(cmd.Args()[1]) + ) + + switch strings.ToLower(cmd.Name()) { + case "get": + c, ok := cmd.(*red.StringCmd) + if !ok { + return + } + size = len(c.Val()) + case "set", "setnx": + if len(cmd.Args()) >= 3 { + size = len(mapping.Repr(cmd.Args()[2])) + } + case "getset": + c, ok := cmd.(*red.StringCmd) + if !ok { + return + } + + if c.Err() == nil && len(c.Val()) > 0 { + size = len(c.Val()) + } else if len(c.Args()) >= 3 { + size = len(mapping.Repr(c.Args()[2])) + } + case "hgetall": + c, ok := cmd.(*red.MapStringStringCmd) + if !ok { + return + } + println(c.Val()) + for _, v := range c.Val() { + size += len(v) + } + case "hget": + if cmd.Err() != nil { + return + } + c, ok := cmd.(*red.StringCmd) + if !ok { + return + } + if len(cmd.Args()) >= 3 { + key += ":" + mapping.Repr(cmd.Args()[2]) + } + size = len(c.Val()) + case "hmget": + c, ok := cmd.(*red.SliceCmd) + if !ok { + return + } + for _, v := range c.Val() { + size += len(mapping.Repr(v)) + } + case "hset", "hsetnx": + if len(cmd.Args()) >= 4 { + key += ":" + mapping.Repr(cmd.Args()[2]) + size = len(mapping.Repr(cmd.Args()[3])) + } + case "hmset": + for i := 3; i < len(cmd.Args()); i += 2 { + size += len(mapping.Repr(cmd.Args()[i])) + } + case "sadd": + for i := 2; i < len(cmd.Args()); i++ { + size += len(mapping.Repr(cmd.Args()[i])) + } + case "smembers": + c, ok := cmd.(*red.StringSliceCmd) + if !ok { + return + } + for _, v := range c.Val() { + size += len(v) + } + case "zrange": + switch cmd.(type) { + case *red.StringSliceCmd: + for _, v := range cmd.(*red.StringSliceCmd).Val() { + size += len(v) + } + case *red.ZSliceCmd: + for _, v := range cmd.(*red.ZSliceCmd).Val() { + size += len(mapping.Repr(v.Member)) + } + } + case "zadd": + for i := 3; i < len(cmd.Args()); i += 2 { + size += len(mapping.Repr(cmd.Args()[i])) + } + case "zrangebyscore": + c, ok := cmd.(*red.ZSliceCmd) + if !ok { + return + } + + for _, v := range c.Val() { + size += len(mapping.Repr(v.Member)) + } + default: + return + } + + if size > h.config.LimitSize { + if h.config.BufferLen <= 0 { + logc.Infof(ctx, "[REDIS] BigKey limit, key: %s, size: %d", key, size) + } else { + select { + case h.buffer <- bigKeyData{key: key, size: size}: + default: + logc.Infof(ctx, "[REDIS] BigKey limit, key: %s, size: %d", key, size) + } + } + } + + return +} + +func (h *bigKeyHook) stat() { + if !h.config.Enable || h.config.BufferLen <= 0 { + return + } + + // getIntervalData returns the data of the interval. + getIntervalData := func() map[string]bigKeyData { + timeout := time.NewTimer(h.config.StatInterval) + var m = make(map[string]bigKeyData) + for { + select { + case data := <-h.buffer: + if _, ok := m[data.key]; !ok { + m[data.key] = data + } + + m[data.key] = bigKeyData{ + key: data.key, + size: data.size, + count: m[data.key].count + 1, + } + case <-timeout.C: + return m + } + } + } + + // log the big key. + for { + for key, data := range getIntervalData() { + if data.count >= h.config.LimitCount { + logx.Infof("[REDIS] BigKey limit, key: %s, size: %d, count: %d", key, data.size, data.count) + } + } + } +} diff --git a/core/stores/redis/bigkeyhook_test.go b/core/stores/redis/bigkeyhook_test.go new file mode 100644 index 000000000000..f1ddf79a25b7 --- /dev/null +++ b/core/stores/redis/bigkeyhook_test.go @@ -0,0 +1,355 @@ +package redis + +import ( + "bytes" + "context" + "testing" + "time" + + "github.com/alicebob/miniredis/v2" + "github.com/stretchr/testify/assert" + + "github.com/zeromicro/go-zero/core/logx" +) + +func TestBigKeyHook_AfterProcess_Get(t *testing.T) { + var buf bytes.Buffer + logx.Reset() + logx.SetLevel(logx.InfoLevel) + logx.SetWriter(logx.NewWriter(&buf)) + defer logx.Reset() + + ctx := context.Background() + + r := MustNewRedis(RedisConf{ + Host: miniredis.RunT(t).Addr(), + Type: "node", + VerifyBigKey: BigKeyHookConfig{ + Enable: true, + LimitSize: 5, + LimitCount: 1, + BufferLen: 0, + }, + }) + + err := r.Set("foo", "123456") + assert.NoError(t, err) + + _, _ = r.Get("foo") + assert.Contains(t, buf.String(), "BigKey limit") + + buf.Reset() + err = r.Set("foo2", "1234") + assert.NoError(t, err) + + _, _ = r.Get("foo2") + assert.NotContains(t, buf.String(), "BigKey limit") + + buf.Reset() + _, _ = r.GetCtx(ctx, "foo") + assert.Contains(t, buf.String(), "BigKey limit") +} + +func TestBigKeyHook_AfterProcess_Set(t *testing.T) { + var buf bytes.Buffer + logx.Reset() + logx.SetLevel(logx.InfoLevel) + logx.SetWriter(logx.NewWriter(&buf)) + defer logx.Reset() + + r := MustNewRedis(RedisConf{ + Host: miniredis.RunT(t).Addr(), + Type: "node", + VerifyBigKey: BigKeyHookConfig{ + Enable: true, + LimitSize: 5, + LimitCount: 1, + BufferLen: 0, + }, + }) + + _ = r.Set("foo", "123456") + assert.Contains(t, buf.String(), "BigKey limit") + + buf.Reset() + _ = r.Set("foo2", "1234") + assert.NotContains(t, buf.String(), "BigKey limit") + + buf.Reset() + _, _ = r.Setnx("foo3", "123456") + assert.Contains(t, buf.String(), "BigKey limit") + + buf.Reset() + _ = r.Setex("foo4", "123456", 10) + assert.Contains(t, buf.String(), "BigKey limit") + + buf.Reset() + _, _ = r.SetnxExCtx(context.Background(), "foo5", "123456", 10) + assert.Contains(t, buf.String(), "BigKey limit") + + buf.Reset() + _, _ = r.SetBit("foo6", 1, 1) + assert.NotContains(t, buf.String(), "BigKey limit") +} + +func TestBigKeyHook_AfterProcess_GetSet(t *testing.T) { + var buf bytes.Buffer + logx.Reset() + logx.SetLevel(logx.InfoLevel) + logx.SetWriter(logx.NewWriter(&buf)) + defer logx.Reset() + + r := MustNewRedis(RedisConf{ + Host: miniredis.RunT(t).Addr(), + Type: "node", + VerifyBigKey: BigKeyHookConfig{ + Enable: true, + LimitSize: 5, + LimitCount: 1, + BufferLen: 0, + }, + }) + + _, _ = r.GetSet("foo", "123456") + assert.Contains(t, buf.String(), "BigKey limit") + + buf.Reset() + _, _ = r.GetSet("foo2", "1234") + + _, _ = r.GetSet("foo2", "123456") + assert.NotContains(t, buf.String(), "BigKey limit") +} + +func TestBigKeyHook_AfterProcess_Hgetall(t *testing.T) { + var buf bytes.Buffer + logx.Reset() + logx.SetLevel(logx.InfoLevel) + logx.SetWriter(logx.NewWriter(&buf)) + defer logx.Reset() + + r := MustNewRedis(RedisConf{ + Host: miniredis.RunT(t).Addr(), + Type: "node", + VerifyBigKey: BigKeyHookConfig{ + Enable: true, + LimitSize: 5, + LimitCount: 1, + BufferLen: 0, + }, + }) + + _, _ = r.Hgetall("foo") + assert.NotContains(t, buf.String(), "BigKey limit") + + buf.Reset() + _ = r.Hset("foo", "bar", "123456") + _ = r.Hset("foo", "bar2", "123456") + _, _ = r.Hgetall("foo") + assert.Contains(t, buf.String(), "BigKey limit") +} + +func TestBigKeyHook_AfterProcess_Hget(t *testing.T) { + var buf bytes.Buffer + logx.Reset() + logx.SetLevel(logx.InfoLevel) + logx.SetWriter(logx.NewWriter(&buf)) + defer logx.Reset() + + r := MustNewRedis(RedisConf{ + Host: miniredis.RunT(t).Addr(), + Type: "node", + VerifyBigKey: BigKeyHookConfig{ + Enable: true, + LimitSize: 5, + LimitCount: 1, + BufferLen: 0, + }, + }) + + _, _ = r.Hget("foo", "bar") + assert.NotContains(t, buf.String(), "BigKey limit") + + buf.Reset() + _ = r.Hset("foo", "bar", "123456") + _, _ = r.Hget("foo", "bar") + assert.Contains(t, buf.String(), "BigKey limit") + + buf.Reset() + _ = r.Hset("foo", "bar2", "123456") + _, _ = r.Hmget("foo", "bar1", "bar2") + assert.Contains(t, buf.String(), "BigKey limit") +} + +func TestBigKeyHook_AfterProcess_Hset(t *testing.T) { + var buf bytes.Buffer + logx.Reset() + logx.SetLevel(logx.InfoLevel) + logx.SetWriter(logx.NewWriter(&buf)) + defer logx.Reset() + + r := MustNewRedis(RedisConf{ + Host: miniredis.RunT(t).Addr(), + Type: "node", + VerifyBigKey: BigKeyHookConfig{ + Enable: true, + LimitSize: 5, + LimitCount: 1, + BufferLen: 0, + }, + }) + + _ = r.Hset("foo", "bar", "123456") + assert.Contains(t, buf.String(), "BigKey limit") + + buf.Reset() + _, _ = r.Hsetnx("foo2", "bar", "123456") + assert.Contains(t, buf.String(), "BigKey limit") + + buf.Reset() + _ = r.Hmset("foo3", map[string]string{"bar": "123456", "bar2": "123456"}) + assert.Contains(t, buf.String(), "BigKey limit") +} + +func TestBigKeyHook_AfterProcess_Sadd(t *testing.T) { + var buf bytes.Buffer + logx.Reset() + logx.SetLevel(logx.InfoLevel) + logx.SetWriter(logx.NewWriter(&buf)) + defer logx.Reset() + + r := MustNewRedis(RedisConf{ + Host: miniredis.RunT(t).Addr(), + Type: "node", + VerifyBigKey: BigKeyHookConfig{ + Enable: true, + LimitSize: 5, + LimitCount: 1, + BufferLen: 0, + }, + }) + + _, _ = r.Sadd("foo", "123456", "123456") + assert.Contains(t, buf.String(), "BigKey limit") +} + +func TestBigKeyHook_AfterProcess_Smembers(t *testing.T) { + var buf bytes.Buffer + logx.Reset() + logx.SetLevel(logx.InfoLevel) + logx.SetWriter(logx.NewWriter(&buf)) + defer logx.Reset() + + r := MustNewRedis(RedisConf{ + Host: miniredis.RunT(t).Addr(), + Type: "node", + VerifyBigKey: BigKeyHookConfig{ + Enable: true, + LimitSize: 5, + LimitCount: 1, + BufferLen: 0, + }, + }) + + _, _ = r.Sadd("foo", "123456", "123456") + _, _ = r.Smembers("foo") + assert.Contains(t, buf.String(), "BigKey limit") +} + +func TestBigKeyHook_AfterProcess_Zadd(t *testing.T) { + var buf bytes.Buffer + logx.Reset() + logx.SetLevel(logx.InfoLevel) + logx.SetWriter(logx.NewWriter(&buf)) + defer logx.Reset() + + r := MustNewRedis(RedisConf{ + Host: miniredis.RunT(t).Addr(), + Type: "node", + VerifyBigKey: BigKeyHookConfig{ + Enable: true, + LimitSize: 5, + LimitCount: 1, + BufferLen: 0, + }, + }) + + _, _ = r.Zadd("foo", 1, "123456") + assert.Contains(t, buf.String(), "BigKey limit") + + buf.Reset() + _, _ = r.ZaddFloat("foo", 1, "123456") + assert.Contains(t, buf.String(), "BigKey limit") + + buf.Reset() + r.Zadds("foo2", Pair{"111111", 1}, Pair{"2222222", 2}) + assert.Contains(t, buf.String(), "BigKey limit") +} + +func TestBigKeyHook_AfterProcess_Zrange(t *testing.T) { + var buf bytes.Buffer + logx.Reset() + logx.SetLevel(logx.InfoLevel) + logx.SetWriter(logx.NewWriter(&buf)) + defer logx.Reset() + + r := MustNewRedis(RedisConf{ + Host: miniredis.RunT(t).Addr(), + Type: "node", + VerifyBigKey: BigKeyHookConfig{ + Enable: true, + LimitSize: 5, + LimitCount: 1, + BufferLen: 0, + }, + }) + + _, _ = r.Zadd("foo", 1, "123456") + + buf.Reset() + _, _ = r.Zrange("foo", 0, -1) + assert.Contains(t, buf.String(), "BigKey limit") + + buf.Reset() + _, _ = r.ZrangebyscoreWithScoresCtx(context.Background(), "foo", 0, 100) + assert.Contains(t, buf.String(), "BigKey limit") + + buf.Reset() + _, _ = r.ZrangeWithScores("foo", 0, 100) + assert.Contains(t, buf.String(), "BigKey limit") + + buf.Reset() + _, _ = r.ZrangebyscoreWithScoresAndLimit("foo", 0, 100, 0, 10) + assert.Contains(t, buf.String(), "BigKey limit") +} + +func TestBigKeyHook_stat(t *testing.T) { + var buf bytes.Buffer + logx.Reset() + logx.SetLevel(logx.InfoLevel) + logx.SetWriter(logx.NewWriter(&buf)) + defer logx.Reset() + + r := MustNewRedis(RedisConf{ + Host: miniredis.RunT(t).Addr(), + Type: "node", + VerifyBigKey: BigKeyHookConfig{ + Enable: true, + LimitSize: 5, + LimitCount: 1, + BufferLen: 100, + StatInterval: time.Millisecond * 100, + }, + }) + + err := r.Set("foo", "123456") + assert.NoError(t, err) + + for i := 0; i < 99; i++ { + _, _ = r.Get("foo") + } + + time.Sleep(time.Second) + + assert.Contains(t, buf.String(), "[REDIS] BigKey limit, key: foo, size: 6, count: 100") + +} diff --git a/core/stores/redis/conf.go b/core/stores/redis/conf.go index 5328757fddc4..6254dcea4417 100644 --- a/core/stores/redis/conf.go +++ b/core/stores/redis/conf.go @@ -3,6 +3,8 @@ package redis import ( "errors" "time" + + "github.com/zeromicro/go-zero/core/logx" ) var ( @@ -24,6 +26,8 @@ type ( NonBlock bool `json:",default=true"` // PingTimeout is the timeout for ping redis. PingTimeout time.Duration `json:",default=1s"` + // VerifyBigKey is the config for big key hook. + VerifyBigKey BigKeyHookConfig } // A RedisKeyConf is a redis config with key. @@ -46,6 +50,14 @@ func (rc RedisConf) NewRedis() *Redis { if rc.Tls { opts = append(opts, WithTLS()) } + if rc.VerifyBigKey.Enable { + bigKeyHook, err := NewBigKeyHook(rc.VerifyBigKey) + if err != nil { + logx.Errorf("NewBigKeyHook error: %v", err) + } else { + opts = append(opts, WithHook(bigKeyHook)) + } + } return newRedis(rc.Host, opts...) } diff --git a/core/stores/redis/redis.go b/core/stores/redis/redis.go index 3758877fb789..0de86e6cec8f 100644 --- a/core/stores/redis/redis.go +++ b/core/stores/redis/redis.go @@ -8,6 +8,7 @@ import ( "time" red "github.com/redis/go-redis/v9" + "github.com/zeromicro/go-zero/core/breaker" "github.com/zeromicro/go-zero/core/errorx" "github.com/zeromicro/go-zero/core/logx" @@ -53,12 +54,13 @@ type ( // Redis defines a redis node/cluster. It is thread-safe. Redis struct { - Addr string - Type string - Pass string - tls bool - brk breaker.Breaker - hooks []red.Hook + Addr string + Type string + Pass string + tls bool + brk breaker.Breaker + hooks []red.Hook + verifyBigKeyConfig BigKeyHookConfig } // RedisNode interface represents a redis node. @@ -132,6 +134,13 @@ func NewRedis(conf RedisConf, opts ...Option) (*Redis, error) { if conf.Tls { opts = append([]Option{WithTLS()}, opts...) } + if conf.VerifyBigKey.Enable { + bigKeyHook, err := NewBigKeyHook(conf.VerifyBigKey) + if err != nil { + return nil, err + } + opts = append(opts, WithHook(bigKeyHook)) + } rds := newRedis(conf.Host, opts...) if !conf.NonBlock { diff --git a/core/stores/redis/redisclustermanager.go b/core/stores/redis/redisclustermanager.go index 5013de83c612..3c41cd5a018d 100644 --- a/core/stores/redis/redisclustermanager.go +++ b/core/stores/redis/redisclustermanager.go @@ -7,6 +7,7 @@ import ( "strings" red "github.com/redis/go-redis/v9" + "github.com/zeromicro/go-zero/core/syncx" )