Skip to content

Commit

Permalink
feat:New redis method TxPipeline (#4417)
Browse files Browse the repository at this point in the history
Co-authored-by: fish <[email protected]>
  • Loading branch information
fishJack01 and fish authored Oct 13, 2024
1 parent 24450f1 commit f52af1e
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 0 deletions.
44 changes: 44 additions & 0 deletions core/stores/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,18 @@ func (s *Redis) PipelinedCtx(ctx context.Context, fn func(Pipeliner) error) erro
return err
}

func (s *Redis) Publish(channel string, message interface{}) (int64, error) {
return s.PublishCtx(context.Background(), channel, message)
}

func (s *Redis) PublishCtx(ctx context.Context, channel string, message interface{}) (int64, error) {
conn, err := getRedis(s)
if err != nil {
return 0, err
}
return conn.Publish(ctx, channel, message).Result()
}

// Rpop is the implementation of redis rpop command.
func (s *Redis) Rpop(key string) (string, error) {
return s.RpopCtx(context.Background(), key)
Expand Down Expand Up @@ -1247,6 +1259,18 @@ func (s *Redis) RpushCtx(ctx context.Context, key string, values ...any) (int, e
return int(v), nil
}

func (s *Redis) RPopLPush(source string, destination string) (string, error) {
return s.RPopLPushCtx(context.Background(), source, destination)
}

func (s *Redis) RPopLPushCtx(ctx context.Context, source string, destination string) (string, error) {
conn, err := getRedis(s)
if err != nil {
return "", err
}
return conn.RPopLPush(ctx, source, destination).Result()
}

// Sadd is the implementation of redis sadd command.
func (s *Redis) Sadd(key string, values ...any) (int, error) {
return s.SaddCtx(context.Background(), key, values...)
Expand Down Expand Up @@ -1645,6 +1669,26 @@ func (s *Redis) TtlCtx(ctx context.Context, key string) (int, error) {
return int(duration), nil
}

func (s *Redis) TxPipeline() (pipe Pipeliner, err error) {
conn, err := getRedis(s)
if err != nil {
return nil, err
}
return conn.TxPipeline(), nil
}

func (s *Redis) Unlink(keys ...string) (int64, error) {
return s.UnlinkCtx(context.Background(), keys...)
}

func (s *Redis) UnlinkCtx(ctx context.Context, keys ...string) (int64, error) {
conn, err := getRedis(s)
if err != nil {
return 0, err
}
return conn.Unlink(ctx, keys...).Result()
}

// Zadd is the implementation of redis zadd command.
func (s *Redis) Zadd(key string, score int64, value string) (bool, error) {
return s.ZaddCtx(context.Background(), key, score, value)
Expand Down
67 changes: 67 additions & 0 deletions core/stores/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2080,3 +2080,70 @@ func (n mockedNode) BLPop(_ context.Context, _ time.Duration, _ ...string) *red.

return cmd
}

func TestRedisPublish(t *testing.T) {
runOnRedis(t, func(client *Redis) {
_, err := newRedis(client.Addr, badType()).Publish("Test", "message")
assert.NotNil(t, err)
_, err = client.Publish("Test", "message")
assert.Nil(t, err)
})
}

func TestRedisRPopLPush(t *testing.T) {
runOnRedis(t, func(client *Redis) {
_, err := newRedis(client.Addr, badType()).RPopLPush("Source", "Destination")
assert.NotNil(t, err)
_, err = client.Rpush("Source", "Destination")
assert.Nil(t, err)
_, err = client.RPopLPush("Source", "Destination")
assert.Nil(t, err)
})
}

func TestRedisUnlink(t *testing.T) {
runOnRedis(t, func(client *Redis) {
_, err := newRedis(client.Addr, badType()).Unlink("Key1", "Key2")
assert.NotNil(t, err)
err = client.Set("Key1", "Key2")
assert.Nil(t, err)
get, err := client.Get("Key1")
assert.Nil(t, err)
assert.Equal(t, "Key2", get)
res, err := client.Unlink("Key1")
assert.Nil(t, err)
assert.Equal(t, int64(1), res)
})
}

func TestRedisTxPipeline(t *testing.T) {
runOnRedis(t, func(client *Redis) {
ctx := context.Background()
pipe, err := newRedis(client.Addr, badType()).TxPipeline()
assert.NotNil(t, err)
pipe, err = client.TxPipeline()
assert.Nil(t, err)
key := "key"
hashKey := "field"
hashValue := "value"

// setting value
pipe.HSet(ctx, key, hashKey, hashValue)

existsCmd := pipe.Exists(ctx, key)
getCmd := pipe.HGet(ctx, key, hashKey)

// execution
_, err = pipe.Exec(ctx)
assert.Nil(t, err)

// verification results
exists, err := existsCmd.Result()
assert.Nil(t, err)
assert.Equal(t, int64(1), exists)

value, err := getCmd.Result()
assert.Nil(t, err)
assert.Equal(t, hashValue, value)
})
}

0 comments on commit f52af1e

Please sign in to comment.