From 94a872b0ec99308c980222554ed9402f3c7023db Mon Sep 17 00:00:00 2001 From: tjzhang-BQ <111323543+tjzhang-BQ@users.noreply.github.com> Date: Tue, 21 Jan 2025 15:38:07 -0800 Subject: [PATCH] Go: Add command XClaim (#2984) * Go: Add command XClaim Signed-off-by: TJ Zhang --- go/api/base_client.go | 182 +++++++++++++++ go/api/options/stream_options.go | 72 ++++++ go/api/response_handlers.go | 37 ++++ go/api/stream_commands.go | 28 +++ go/integTest/shared_commands_test.go | 320 ++++++++++++++++++++++++++- 5 files changed, 638 insertions(+), 1 deletion(-) diff --git a/go/api/base_client.go b/go/api/base_client.go index 3defba9f9b..e28c28a4c7 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -3310,3 +3310,185 @@ func (client *baseClient) BitCountWithOptions(key string, opts *options.BitCount } return handleIntResponse(result) } + +// Changes the ownership of a pending message. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// group - The name of the consumer group. +// consumer - The name of the consumer. +// minIdleTime - The minimum idle time in milliseconds. +// ids - The ids of the entries to claim. +// +// Return value: +// +// A `map of message entries with the format `{"entryId": [["entry", "data"], ...], ...}` that were claimed by +// the consumer. +// +// Example: +// +// result, err := client.XClaim("key", "group", "consumer", 1000, []string{"streamId1", "streamId2"}) +// fmt.Println(result) // Output: map[streamId1:[["entry1", "data1"], ["entry2", "data2"]] streamId2:[["entry3", "data3"]]] +// +// [valkey.io]: https://valkey.io/commands/xclaim/ +func (client *baseClient) XClaim( + key string, + group string, + consumer string, + minIdleTime int64, + ids []string, +) (map[string][][]string, error) { + return client.XClaimWithOptions(key, group, consumer, minIdleTime, ids, nil) +} + +// Changes the ownership of a pending message. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// group - The name of the consumer group. +// consumer - The name of the consumer. +// minIdleTime - The minimum idle time in milliseconds. +// ids - The ids of the entries to claim. +// options - Stream claim options. +// +// Return value: +// +// A `map` of message entries with the format `{"entryId": [["entry", "data"], ...], ...}` that were claimed by +// the consumer. +// +// Example: +// +// result, err := client.XClaimWithOptions( +// "key", +// "group", +// "consumer", +// 1000, +// []string{"streamId1", "streamId2"}, +// options.NewStreamClaimOptions().SetIdleTime(1), +// ) +// fmt.Println(result) // Output: map[streamId1:[["entry1", "data1"], ["entry2", "data2"]] streamId2:[["entry3", "data3"]]] +// +// [valkey.io]: https://valkey.io/commands/xclaim/ +func (client *baseClient) XClaimWithOptions( + key string, + group string, + consumer string, + minIdleTime int64, + ids []string, + opts *options.StreamClaimOptions, +) (map[string][][]string, error) { + args := append([]string{key, group, consumer, utils.IntToString(minIdleTime)}, ids...) + if opts != nil { + optionArgs, err := opts.ToArgs() + if err != nil { + return nil, err + } + args = append(args, optionArgs...) + } + result, err := client.executeCommand(C.XClaim, args) + if err != nil { + return nil, err + } + return handleMapOfArrayOfStringArrayResponse(result) +} + +// Changes the ownership of a pending message. This function returns an `array` with +// only the message/entry IDs, and is equivalent to using `JUSTID` in the Valkey API. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// group - The name of the consumer group. +// consumer - The name of the consumer. +// minIdleTime - The minimum idle time in milliseconds. +// ids - The ids of the entries to claim. +// options - Stream claim options. +// +// Return value: +// +// An array of the ids of the entries that were claimed by the consumer. +// +// Example: +// +// result, err := client.XClaimJustId( +// "key", +// "group", +// "consumer", +// 1000, +// []string{"streamId1", "streamId2"}, +// ) +// fmt.Println(result) // Output: ["streamId1", "streamId2"] +// +// [valkey.io]: https://valkey.io/commands/xclaim/ +func (client *baseClient) XClaimJustId( + key string, + group string, + consumer string, + minIdleTime int64, + ids []string, +) ([]string, error) { + return client.XClaimJustIdWithOptions(key, group, consumer, minIdleTime, ids, nil) +} + +// Changes the ownership of a pending message. This function returns an `array` with +// only the message/entry IDs, and is equivalent to using `JUSTID` in the Valkey API. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// group - The name of the consumer group. +// consumer - The name of the consumer. +// minIdleTime - The minimum idle time in milliseconds. +// ids - The ids of the entries to claim. +// options - Stream claim options. +// +// Return value: +// +// An array of the ids of the entries that were claimed by the consumer. +// +// Example: +// +// result, err := client.XClaimJustIdWithOptions( +// "key", +// "group", +// "consumer", +// 1000, +// []string{"streamId1", "streamId2"}, +// options.NewStreamClaimOptions().SetIdleTime(1), +// ) +// fmt.Println(result) // Output: ["streamId1", "streamId2"] +// +// [valkey.io]: https://valkey.io/commands/xclaim/ +func (client *baseClient) XClaimJustIdWithOptions( + key string, + group string, + consumer string, + minIdleTime int64, + ids []string, + opts *options.StreamClaimOptions, +) ([]string, error) { + args := append([]string{key, group, consumer, utils.IntToString(minIdleTime)}, ids...) + if opts != nil { + optionArgs, err := opts.ToArgs() + if err != nil { + return nil, err + } + args = append(args, optionArgs...) + } + args = append(args, options.JUST_ID_VALKEY_API) + result, err := client.executeCommand(C.XClaim, args) + if err != nil { + return nil, err + } + return handleStringArrayResponse(result) +} diff --git a/go/api/options/stream_options.go b/go/api/options/stream_options.go index 71a76dc284..ef2876f6a6 100644 --- a/go/api/options/stream_options.go +++ b/go/api/options/stream_options.go @@ -326,3 +326,75 @@ func (xgsio *XGroupSetIdOptions) ToArgs() ([]string, error) { return args, nil } + +// Optional arguments for `XClaim` in [StreamCommands] +type StreamClaimOptions struct { + idleTime int64 + idleUnixTime int64 + retryCount int64 + isForce bool +} + +func NewStreamClaimOptions() *StreamClaimOptions { + return &StreamClaimOptions{} +} + +// Set the idle time in milliseconds. +func (sco *StreamClaimOptions) SetIdleTime(idleTime int64) *StreamClaimOptions { + sco.idleTime = idleTime + return sco +} + +// Set the idle time in unix-milliseconds. +func (sco *StreamClaimOptions) SetIdleUnixTime(idleUnixTime int64) *StreamClaimOptions { + sco.idleUnixTime = idleUnixTime + return sco +} + +// Set the retry count. +func (sco *StreamClaimOptions) SetRetryCount(retryCount int64) *StreamClaimOptions { + sco.retryCount = retryCount + return sco +} + +// Set the force flag. +func (sco *StreamClaimOptions) SetForce() *StreamClaimOptions { + sco.isForce = true + return sco +} + +// Valkey API keywords for stream claim options +const ( + // ValKey API string to designate IDLE time in milliseconds + IDLE_VALKEY_API string = "IDLE" + // ValKey API string to designate TIME time in unix-milliseconds + TIME_VALKEY_API string = "TIME" + // ValKey API string to designate RETRYCOUNT + RETRY_COUNT_VALKEY_API string = "RETRYCOUNT" + // ValKey API string to designate FORCE + FORCE_VALKEY_API string = "FORCE" + // ValKey API string to designate JUSTID + JUST_ID_VALKEY_API string = "JUSTID" +) + +func (sco *StreamClaimOptions) ToArgs() ([]string, error) { + optionArgs := []string{} + + if sco.idleTime > 0 { + optionArgs = append(optionArgs, IDLE_VALKEY_API, utils.IntToString(sco.idleTime)) + } + + if sco.idleUnixTime > 0 { + optionArgs = append(optionArgs, TIME_VALKEY_API, utils.IntToString(sco.idleUnixTime)) + } + + if sco.retryCount > 0 { + optionArgs = append(optionArgs, RETRY_COUNT_VALKEY_API, utils.IntToString(sco.retryCount)) + } + + if sco.isForce { + optionArgs = append(optionArgs, FORCE_VALKEY_API) + } + + return optionArgs, nil +} diff --git a/go/api/response_handlers.go b/go/api/response_handlers.go index 8f74a15132..e49c2a5db5 100644 --- a/go/api/response_handlers.go +++ b/go/api/response_handlers.go @@ -588,6 +588,7 @@ type mapConverter[T any] struct { canBeNil bool } +// Converts an untyped map into a map[string]T func (node mapConverter[T]) convert(data interface{}) (interface{}, error) { if data == nil { if node.canBeNil { @@ -598,14 +599,17 @@ func (node mapConverter[T]) convert(data interface{}) (interface{}, error) { } result := make(map[string]T) + // Iterate over the map and convert each value to T for key, value := range data.(map[string]interface{}) { if node.next == nil { + // try direct conversion to T when there is no next converter valueT, ok := value.(T) if !ok { return nil, &RequestError{fmt.Sprintf("Unexpected type of map element: %T, expected: %v", value, getType[T]())} } result[key] = valueT } else { + // nested iteration when there is a next converter val, err := node.next.convert(value) if err != nil { return nil, err @@ -615,6 +619,7 @@ func (node mapConverter[T]) convert(data interface{}) (interface{}, error) { result[key] = null continue } + // convert to T valueT, ok := val.(T) if !ok { return nil, &RequestError{fmt.Sprintf("Unexpected type of map element: %T, expected: %v", val, getType[T]())} @@ -674,6 +679,38 @@ func (node arrayConverter[T]) convert(data interface{}) (interface{}, error) { // TODO: convert sets +func handleMapOfArrayOfStringArrayResponse(response *C.struct_CommandResponse) (map[string][][]string, error) { + defer C.free_command_response(response) + + typeErr := checkResponseType(response, C.Map, false) + if typeErr != nil { + return nil, typeErr + } + mapData, err := parseMap(response) + if err != nil { + return nil, err + } + converted, err := mapConverter[[][]string]{ + arrayConverter[[]string]{ + arrayConverter[string]{ + nil, + false, + }, + false, + }, + false, + }.convert(mapData) + if err != nil { + return nil, err + } + claimedEntries, ok := converted.(map[string][][]string) + if !ok { + return nil, &RequestError{fmt.Sprintf("unexpected type of second element: %T", converted)} + } + + return claimedEntries, nil +} + func handleXAutoClaimResponse(response *C.struct_CommandResponse) (XAutoClaimResponse, error) { defer C.free_command_response(response) var null XAutoClaimResponse // default response diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go index 0dc369482a..40ae0fb520 100644 --- a/go/api/stream_commands.go +++ b/go/api/stream_commands.go @@ -164,4 +164,32 @@ type StreamCommands interface { XGroupDelConsumer(key string, group string, consumer string) (int64, error) XAck(key string, group string, ids []string) (int64, error) + + XClaim( + key string, + group string, + consumer string, + minIdleTime int64, + ids []string, + ) (map[string][][]string, error) + + XClaimWithOptions( + key string, + group string, + consumer string, + minIdleTime int64, + ids []string, + options *options.StreamClaimOptions, + ) (map[string][][]string, error) + + XClaimJustId(key string, group string, consumer string, minIdleTime int64, ids []string) ([]string, error) + + XClaimJustIdWithOptions( + key string, + group string, + consumer string, + minIdleTime int64, + ids []string, + options *options.StreamClaimOptions, + ) ([]string, error) } diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index e92c5e8df8..02fc2fc4c2 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -5757,7 +5757,12 @@ func (suite *GlideTestSuite) TestXPendingFailures() { invalidConsumer := "invalid-consumer-" + uuid.New().String() suite.verifyOK( - client.XGroupCreateWithOptions(key, groupName, zeroStreamId, options.NewXGroupCreateOptions().SetMakeStream()), + client.XGroupCreateWithOptions( + key, + groupName, + zeroStreamId, + options.NewXGroupCreateOptions().SetMakeStream(), + ), ) command := []string{"XGroup", "CreateConsumer", key, groupName, consumer1} @@ -6892,3 +6897,316 @@ func (suite *GlideTestSuite) TestBitCountWithOptions_StartEndBit() { assert.Equal(suite.T(), int64(3), result) }) } + +func (suite *GlideTestSuite) TestXPendingAndXClaim() { + suite.runWithDefaultClients(func(client api.BaseClient) { + // 1. Arrange the data + key := uuid.New().String() + groupName := "group" + uuid.New().String() + zeroStreamId := "0" + consumer1 := "consumer-1-" + uuid.New().String() + consumer2 := "consumer-2-" + uuid.New().String() + + resp, err := client.XGroupCreateWithOptions( + key, + groupName, + zeroStreamId, + options.NewXGroupCreateOptions().SetMakeStream(), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "OK", resp) + + respBool, err := client.XGroupCreateConsumer(key, groupName, consumer1) + assert.NoError(suite.T(), err) + assert.True(suite.T(), respBool) + + respBool, err = client.XGroupCreateConsumer(key, groupName, consumer2) + assert.NoError(suite.T(), err) + assert.True(suite.T(), respBool) + + // Add two stream entries for consumer 1 + streamid_1, err := client.XAdd(key, [][]string{{"field1", "value1"}}) + assert.NoError(suite.T(), err) + streamid_2, err := client.XAdd(key, [][]string{{"field2", "value2"}}) + assert.NoError(suite.T(), err) + + // Read the stream entries for consumer 1 and mark messages as pending + xReadGroupResult1, err := client.XReadGroup(groupName, consumer1, map[string]string{key: ">"}) + assert.NoError(suite.T(), err) + expectedResult := map[string]map[string][][]string{ + key: { + streamid_1.Value(): {{"field1", "value1"}}, + streamid_2.Value(): {{"field2", "value2"}}, + }, + } + assert.Equal(suite.T(), expectedResult, xReadGroupResult1) + + // Add 3 more stream entries for consumer 2 + streamid_3, err := client.XAdd(key, [][]string{{"field3", "value3"}}) + assert.NoError(suite.T(), err) + streamid_4, err := client.XAdd(key, [][]string{{"field4", "value4"}}) + assert.NoError(suite.T(), err) + streamid_5, err := client.XAdd(key, [][]string{{"field5", "value5"}}) + assert.NoError(suite.T(), err) + + // read the entire stream for consumer 2 and mark messages as pending + xReadGroupResult2, err := client.XReadGroup(groupName, consumer2, map[string]string{key: ">"}) + assert.NoError(suite.T(), err) + expectedResult2 := map[string]map[string][][]string{ + key: { + streamid_3.Value(): {{"field3", "value3"}}, + streamid_4.Value(): {{"field4", "value4"}}, + streamid_5.Value(): {{"field5", "value5"}}, + }, + } + assert.Equal(suite.T(), expectedResult2, xReadGroupResult2) + + expectedSummary := api.XPendingSummary{ + NumOfMessages: 5, + StartId: streamid_1, + EndId: streamid_5, + ConsumerMessages: []api.ConsumerPendingMessage{ + {ConsumerName: consumer1, MessageCount: 2}, + {ConsumerName: consumer2, MessageCount: 3}, + }, + } + summaryResult, err := client.XPending(key, groupName) + assert.NoError(suite.T(), err) + assert.True( + suite.T(), + reflect.DeepEqual(expectedSummary, summaryResult), + "Expected and actual results do not match", + ) + + // ensure idle time > 0 + time.Sleep(2000 * time.Millisecond) + pendingResultExtended, err := client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions("-", "+", 10), + ) + assert.NoError(suite.T(), err) + + assert.Greater(suite.T(), len(pendingResultExtended), 2) + // because of the idle time return, we have to exclude it from the expected result + // and check separately + assert.Equal(suite.T(), pendingResultExtended[0].Id, streamid_1.Value()) + assert.Equal(suite.T(), pendingResultExtended[0].ConsumerName, consumer1) + assert.GreaterOrEqual(suite.T(), pendingResultExtended[0].DeliveryCount, int64(0)) + + assert.Equal(suite.T(), pendingResultExtended[1].Id, streamid_2.Value()) + assert.Equal(suite.T(), pendingResultExtended[1].ConsumerName, consumer1) + assert.GreaterOrEqual(suite.T(), pendingResultExtended[1].DeliveryCount, int64(0)) + + assert.Equal(suite.T(), pendingResultExtended[2].Id, streamid_3.Value()) + assert.Equal(suite.T(), pendingResultExtended[2].ConsumerName, consumer2) + assert.GreaterOrEqual(suite.T(), pendingResultExtended[2].DeliveryCount, int64(0)) + + assert.Equal(suite.T(), pendingResultExtended[3].Id, streamid_4.Value()) + assert.Equal(suite.T(), pendingResultExtended[3].ConsumerName, consumer2) + assert.GreaterOrEqual(suite.T(), pendingResultExtended[3].DeliveryCount, int64(0)) + + assert.Equal(suite.T(), pendingResultExtended[4].Id, streamid_5.Value()) + assert.Equal(suite.T(), pendingResultExtended[4].ConsumerName, consumer2) + assert.GreaterOrEqual(suite.T(), pendingResultExtended[4].DeliveryCount, int64(0)) + + // use claim to claim stream 3 and 5 for consumer 1 + claimResult, err := client.XClaim( + key, + groupName, + consumer1, + int64(0), + []string{streamid_3.Value(), streamid_5.Value()}, + ) + assert.NoError(suite.T(), err) + expectedClaimResult := map[string][][]string{ + streamid_3.Value(): {{"field3", "value3"}}, + streamid_5.Value(): {{"field5", "value5"}}, + } + assert.Equal(suite.T(), expectedClaimResult, claimResult) + + claimResultJustId, err := client.XClaimJustId( + key, + groupName, + consumer1, + int64(0), + []string{streamid_3.Value(), streamid_5.Value()}, + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), []string{streamid_3.Value(), streamid_5.Value()}, claimResultJustId) + + // add one more stream + streamid_6, err := client.XAdd(key, [][]string{{"field6", "value6"}}) + assert.NoError(suite.T(), err) + + // using force, we can xclaim the message without reading it + claimResult, err = client.XClaimWithOptions( + key, + groupName, + consumer1, + int64(0), + []string{streamid_6.Value()}, + options.NewStreamClaimOptions().SetForce().SetRetryCount(99), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), map[string][][]string{streamid_6.Value(): {{"field6", "value6"}}}, claimResult) + + forcePendingResult, err := client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions(streamid_6.Value(), streamid_6.Value(), 1), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), 1, len(forcePendingResult)) + assert.Equal(suite.T(), streamid_6.Value(), forcePendingResult[0].Id) + assert.Equal(suite.T(), consumer1, forcePendingResult[0].ConsumerName) + assert.Equal(suite.T(), int64(99), forcePendingResult[0].DeliveryCount) + + // acknowledge streams 2, 3, 4 and 6 and remove them from xpending results + xackResult, err := client.XAck( + key, groupName, + []string{streamid_2.Value(), streamid_3.Value(), streamid_4.Value(), streamid_6.Value()}) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), int64(4), xackResult) + + pendingResultExtended, err = client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions(streamid_3.Value(), "+", 10), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), 1, len(pendingResultExtended)) + assert.Equal(suite.T(), streamid_5.Value(), pendingResultExtended[0].Id) + assert.Equal(suite.T(), consumer1, pendingResultExtended[0].ConsumerName) + + pendingResultExtended, err = client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions("-", "("+streamid_5.Value(), 10), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), 1, len(pendingResultExtended)) + assert.Equal(suite.T(), streamid_1.Value(), pendingResultExtended[0].Id) + assert.Equal(suite.T(), consumer1, pendingResultExtended[0].ConsumerName) + + pendingResultExtended, err = client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions("-", "+", 10).SetMinIdleTime(1).SetConsumer(consumer1), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), 2, len(pendingResultExtended)) + }) +} + +func (suite *GlideTestSuite) TestXClaimFailure() { + suite.runWithDefaultClients(func(client api.BaseClient) { + key := uuid.New().String() + stringKey := "string-key-" + uuid.New().String() + groupName := "group" + uuid.New().String() + zeroStreamId := "0" + consumer1 := "consumer-1-" + uuid.New().String() + + // create group and consumer for the group + groupCreateResult, err := client.XGroupCreateWithOptions( + key, + groupName, + zeroStreamId, + options.NewXGroupCreateOptions().SetMakeStream(), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "OK", groupCreateResult) + + consumerCreateResult, err := client.XGroupCreateConsumer(key, groupName, consumer1) + assert.NoError(suite.T(), err) + assert.True(suite.T(), consumerCreateResult) + + // Add stream entry and mark as pending + streamid_1, err := client.XAdd(key, [][]string{{"field1", "value1"}}) + assert.NoError(suite.T(), err) + assert.NotNil(suite.T(), streamid_1) + + readGroupResult, err := client.XReadGroup(groupName, consumer1, map[string]string{key: ">"}) + assert.NoError(suite.T(), err) + assert.NotNil(suite.T(), readGroupResult) + + // claim with invalid stream entry IDs + _, err = client.XClaimJustId(key, groupName, consumer1, int64(1), []string{"invalid-stream-id"}) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + + // claim with empty stream entry IDs returns empty map + claimResult, err := client.XClaimJustId(key, groupName, consumer1, int64(1), []string{}) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), []string{}, claimResult) + + // non existent key causes a RequestError + claimOptions := options.NewStreamClaimOptions().SetIdleTime(1) + _, err = client.XClaim(stringKey, groupName, consumer1, int64(1), []string{streamid_1.Value()}) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + assert.Contains(suite.T(), err.Error(), "NOGROUP") + + _, err = client.XClaimWithOptions( + stringKey, + groupName, + consumer1, + int64(1), + []string{streamid_1.Value()}, + claimOptions, + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + assert.Contains(suite.T(), err.Error(), "NOGROUP") + + _, err = client.XClaimJustId(stringKey, groupName, consumer1, int64(1), []string{streamid_1.Value()}) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + assert.Contains(suite.T(), err.Error(), "NOGROUP") + + _, err = client.XClaimJustIdWithOptions( + stringKey, + groupName, + consumer1, + int64(1), + []string{streamid_1.Value()}, + claimOptions, + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + assert.Contains(suite.T(), err.Error(), "NOGROUP") + + // key exists, but is not a stream + _, err = client.Set(stringKey, "test") + assert.NoError(suite.T(), err) + _, err = client.XClaim(stringKey, groupName, consumer1, int64(1), []string{streamid_1.Value()}) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + + _, err = client.XClaimWithOptions( + stringKey, + groupName, + consumer1, + int64(1), + []string{streamid_1.Value()}, + claimOptions, + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + + _, err = client.XClaimJustId(stringKey, groupName, consumer1, int64(1), []string{streamid_1.Value()}) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + + _, err = client.XClaimJustIdWithOptions( + stringKey, + groupName, + consumer1, + int64(1), + []string{streamid_1.Value()}, + claimOptions, + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + }) +}