diff --git a/core/stores/redis/redis.go b/core/stores/redis/redis.go index 3758877fb789..e0c3e686e2f9 100644 --- a/core/stores/redis/redis.go +++ b/core/stores/redis/redis.go @@ -1197,6 +1197,18 @@ func (s *Redis) PipelinedCtx(ctx context.Context, fn func(Pipeliner) error) erro return err } +func (s *Redis) Publish(channel string, message interface{}) (int64, error) { + return s.PublishCtx(context.Background(), channel, message) +} + +func (s *Redis) PublishCtx(ctx context.Context, channel string, message interface{}) (int64, error) { + conn, err := getRedis(s) + if err != nil { + return 0, err + } + return conn.Publish(ctx, channel, message).Result() +} + // Rpop is the implementation of redis rpop command. func (s *Redis) Rpop(key string) (string, error) { return s.RpopCtx(context.Background(), key) @@ -1247,6 +1259,18 @@ func (s *Redis) RpushCtx(ctx context.Context, key string, values ...any) (int, e return int(v), nil } +func (s *Redis) RPopLPush(source string, destination string) (string, error) { + return s.RPopLPushCtx(context.Background(), source, destination) +} + +func (s *Redis) RPopLPushCtx(ctx context.Context, source string, destination string) (string, error) { + conn, err := getRedis(s) + if err != nil { + return "", err + } + return conn.RPopLPush(ctx, source, destination).Result() +} + // Sadd is the implementation of redis sadd command. func (s *Redis) Sadd(key string, values ...any) (int, error) { return s.SaddCtx(context.Background(), key, values...) @@ -1645,6 +1669,26 @@ func (s *Redis) TtlCtx(ctx context.Context, key string) (int, error) { return int(duration), nil } +func (s *Redis) TxPipeline() (pipe Pipeliner, err error) { + conn, err := getRedis(s) + if err != nil { + return nil, err + } + return conn.TxPipeline(), nil +} + +func (s *Redis) Unlink(keys ...string) (int64, error) { + return s.UnlinkCtx(context.Background(), keys...) +} + +func (s *Redis) UnlinkCtx(ctx context.Context, keys ...string) (int64, error) { + conn, err := getRedis(s) + if err != nil { + return 0, err + } + return conn.Unlink(ctx, keys...).Result() +} + // Zadd is the implementation of redis zadd command. func (s *Redis) Zadd(key string, score int64, value string) (bool, error) { return s.ZaddCtx(context.Background(), key, score, value) diff --git a/core/stores/redis/redis_test.go b/core/stores/redis/redis_test.go index 032ac6b8e6f8..5712d7d064d9 100644 --- a/core/stores/redis/redis_test.go +++ b/core/stores/redis/redis_test.go @@ -2080,3 +2080,70 @@ func (n mockedNode) BLPop(_ context.Context, _ time.Duration, _ ...string) *red. return cmd } + +func TestRedisPublish(t *testing.T) { + runOnRedis(t, func(client *Redis) { + _, err := newRedis(client.Addr, badType()).Publish("Test", "message") + assert.NotNil(t, err) + _, err = client.Publish("Test", "message") + assert.Nil(t, err) + }) +} + +func TestRedisRPopLPush(t *testing.T) { + runOnRedis(t, func(client *Redis) { + _, err := newRedis(client.Addr, badType()).RPopLPush("Source", "Destination") + assert.NotNil(t, err) + _, err = client.Rpush("Source", "Destination") + assert.Nil(t, err) + _, err = client.RPopLPush("Source", "Destination") + assert.Nil(t, err) + }) +} + +func TestRedisUnlink(t *testing.T) { + runOnRedis(t, func(client *Redis) { + _, err := newRedis(client.Addr, badType()).Unlink("Key1", "Key2") + assert.NotNil(t, err) + err = client.Set("Key1", "Key2") + assert.Nil(t, err) + get, err := client.Get("Key1") + assert.Nil(t, err) + assert.Equal(t, "Key2", get) + res, err := client.Unlink("Key1") + assert.Nil(t, err) + assert.Equal(t, int64(1), res) + }) +} + +func TestRedisTxPipeline(t *testing.T) { + runOnRedis(t, func(client *Redis) { + ctx := context.Background() + pipe, err := newRedis(client.Addr, badType()).TxPipeline() + assert.NotNil(t, err) + pipe, err = client.TxPipeline() + assert.Nil(t, err) + key := "key" + hashKey := "field" + hashValue := "value" + + // setting value + pipe.HSet(ctx, key, hashKey, hashValue) + + existsCmd := pipe.Exists(ctx, key) + getCmd := pipe.HGet(ctx, key, hashKey) + + // execution + _, err = pipe.Exec(ctx) + assert.Nil(t, err) + + // verification results + exists, err := existsCmd.Result() + assert.Nil(t, err) + assert.Equal(t, int64(1), exists) + + value, err := getCmd.Result() + assert.Nil(t, err) + assert.Equal(t, hashValue, value) + }) +}