diff --git a/pubsub/consumer.go b/pubsub/consumer.go index 7344eb2479..6207fde1f5 100644 --- a/pubsub/consumer.go +++ b/pubsub/consumer.go @@ -24,23 +24,33 @@ type ConsumerConfig struct { ResponseEntryTimeout time.Duration `koanf:"response-entry-timeout"` // Minimum idle time after which messages will be autoclaimed IdletimeToAutoclaim time.Duration `koanf:"idletime-to-autoclaim"` + // Enables retrying messages that have been pending for too long + Retry bool `koanf:"retry"` + // Number of retries after which an error response is set for the message + MaxRetryCount int64 `koanf:"max-retry-count"` } var DefaultConsumerConfig = ConsumerConfig{ ResponseEntryTimeout: time.Hour, IdletimeToAutoclaim: 5 * time.Minute, + Retry: true, + MaxRetryCount: -1, } var TestConsumerConfig = ConsumerConfig{ ResponseEntryTimeout: time.Minute, IdletimeToAutoclaim: 30 * time.Millisecond, + Retry: true, + MaxRetryCount: -1, } var AlreadySetError = errors.New("redis key already set") func ConsumerConfigAddOptions(prefix string, f *pflag.FlagSet) { f.Duration(prefix+".response-entry-timeout", DefaultConsumerConfig.ResponseEntryTimeout, "timeout for response entry") - f.Duration(prefix+".idletime-to-autoclaim", DefaultConsumerConfig.IdletimeToAutoclaim, "After a message spends this amount of time in PEL (Pending Entries List i.e claimed by another consumer but not Acknowledged) it will be allowed to be autoclaimed by other consumers") + f.Duration(prefix+".idletime-to-autoclaim", DefaultConsumerConfig.IdletimeToAutoclaim, "After a message spends this amount of time in PEL (Pending Entries List i.e claimed by another consumer but not Acknowledged) it will be allowed to be autoclaimed by other consumers. This option should be set to the same value for all consumers and producers.") + f.Bool(prefix+".retry", DefaultConsumerConfig.Retry, "enables autoclaim for this consumer; if set to false this consumer will not check messages from the PEL (Pending Entries List)") + f.Int64(prefix+".max-retry-count", DefaultConsumerConfig.MaxRetryCount, "number of message retries after which this consumer will set an error response and Acknowledge the message (-1 = no limit)") } // Consumer implements a consumer for redis stream provides heartbeat to @@ -111,32 +121,55 @@ func decrementMsgIdByOne(msgId string) string { // Consumer first checks it there exists pending message that is claimed by // unresponsive consumer, if not then reads from the stream.
func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Request], error) { - // First try to XAUTOCLAIM, with start as a random messageID from PEL with MinIdle as IdletimeToAutoclaim - // this prioritizes processing PEL messages that have been waiting for more than IdletimeToAutoclaim duration var messages []redis.XMessage - if pendingMsgs, err := c.client.XPendingExt(ctx, &redis.XPendingExtArgs{ - Stream: c.redisStream, - Group: c.redisGroup, - Start: "-", - End: "+", - Count: 50, - Idle: c.cfg.IdletimeToAutoclaim, - }).Result(); err != nil { - if !errors.Is(err, redis.Nil) { - log.Error("Error from XpendingExt in getting PEL for auto claim", "err", err, "penindlen", len(pendingMsgs)) - } - } else if len(pendingMsgs) > 0 { - idx := rand.Intn(len(pendingMsgs)) - messages, _, err = c.client.XAutoClaim(ctx, &redis.XAutoClaimArgs{ - Group: c.redisGroup, - Consumer: c.id, - MinIdle: c.cfg.IdletimeToAutoclaim, // Minimum idle time for messages to claim (in milliseconds) - Stream: c.redisStream, - Start: decrementMsgIdByOne(pendingMsgs[idx].ID), - Count: 1, - }).Result() - if err != nil { - log.Info("error from xautoclaim", "err", err) + if c.cfg.Retry { + // First try to XAUTOCLAIM, with start as a random messageID from PEL with MinIdle as IdletimeToAutoclaim + // this prioritizes processing PEL messages that have been waiting for more than IdletimeToAutoclaim duration + if pendingMsgs, err := c.client.XPendingExt(ctx, &redis.XPendingExtArgs{ + Stream: c.redisStream, + Group: c.redisGroup, + Start: "-", + End: "+", + Count: 50, + Idle: c.cfg.IdletimeToAutoclaim, + }).Result(); err != nil { + if !errors.Is(err, redis.Nil) { + log.Error("Error from XpendingExt in getting PEL for auto claim", "err", err, "pendingLen", len(pendingMsgs)) + } + } else if len(pendingMsgs) > 0 { + if c.cfg.MaxRetryCount != -1 { + var exceededRetries []redis.XPendingExt + var filtered []redis.XPendingExt + for _, msg := range pendingMsgs { + if msg.RetryCount > c.cfg.MaxRetryCount { + exceededRetries = append(exceededRetries, msg) + } else { + filtered = append(filtered, msg) + } + } + if len(exceededRetries) > 0 { + idx := rand.Intn(len(exceededRetries)) + if err := c.SetError(ctx, exceededRetries[idx].ID, "too many retries"); err != nil { + // TODO(magic): don't log error when other consumer set the error before us + log.Error("Failed to set error response for a message that exceeded retries limit", "err", err, "retryCount", exceededRetries[idx].RetryCount) + } + } + pendingMsgs = filtered + } + if len(pendingMsgs) > 0 { + idx := rand.Intn(len(pendingMsgs)) + messages, _, err = c.client.XAutoClaim(ctx, &redis.XAutoClaimArgs{ + Group: c.redisGroup, + Consumer: c.id, + MinIdle: c.cfg.IdletimeToAutoclaim, // Minimum idle time for messages to claim (in milliseconds) + Stream: c.redisStream, + Start: decrementMsgIdByOne(pendingMsgs[idx].ID), + Count: 1, + }).Result() + if err != nil { + log.Info("error from xautoclaim", "err", err) + } + } } } if len(messages) == 0 { @@ -233,6 +266,7 @@ func (c *Consumer[Request, Response]) SetResult(ctx context.Context, messageID s if _, err := c.client.XAck(ctx, c.redisStream, c.redisGroup, messageID).Result(); err != nil { return fmt.Errorf("acking message: %v, error: %w", messageID, err) } + log.Debug("consumer: xdel", "cid", c.id, "messageId", messageID) if _, err := c.client.XDel(ctx, c.redisStream, messageID).Result(); err != nil { return fmt.Errorf("deleting message: %v, error: %w", messageID, err) } @@ -257,6 +291,7 @@ func (c *Consumer[Request, Response]) 
SetError(ctx context.Context, messageID st if _, err := c.client.XAck(ctx, c.redisStream, c.redisGroup, messageID).Result(); err != nil { return fmt.Errorf("acking message: %v, error: %w", messageID, err) } + log.Debug("consumer: xdel", "cid", c.id, "messageId", messageID) if _, err := c.client.XDel(ctx, c.redisStream, messageID).Result(); err != nil { return fmt.Errorf("deleting message: %v, error: %w", messageID, err) } diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index c82a35e0b8..ad5627ebe8 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "sort" + "strings" "testing" "time" @@ -48,48 +49,69 @@ func destroyRedisGroup(ctx context.Context, t *testing.T, streamName string, cli } } -func producerCfg() *ProducerConfig { - return &ProducerConfig{ - CheckResultInterval: TestProducerConfig.CheckResultInterval, - RequestTimeout: 2 * time.Second, +func producerConfigs(count int) []*ProducerConfig { + var configs []*ProducerConfig + for i := 0; i < count; i++ { + config := TestProducerConfig + config.RequestTimeout = 2 * time.Second + configs = append(configs, &config) } + return configs } -func consumerCfg() *ConsumerConfig { - return &ConsumerConfig{ - ResponseEntryTimeout: TestConsumerConfig.ResponseEntryTimeout, - IdletimeToAutoclaim: TestConsumerConfig.IdletimeToAutoclaim, +func consumerConfigs(count int) []*ConsumerConfig { + var configs []*ConsumerConfig + for i := 0; i < count; i++ { + config := TestConsumerConfig + configs = append(configs, &config) } + return configs } -func newProducerConsumers(ctx context.Context, t *testing.T) (redis.UniversalClient, string, *Producer[testRequest, testResponse], []*Consumer[testRequest, testResponse]) { +func newProducersConsumers(ctx context.Context, t *testing.T, producersCount, consumersCount, notRetryingConsumers int) (redis.UniversalClient, string, []*Producer[testRequest, testResponse], []*Consumer[testRequest, testResponse]) { t.Helper() + if notRetryingConsumers > consumersCount { + t.Fatal("internal test error - notRetryingConsumers > consumersCount") + } + producerConfigs, consumerConfigs := producerConfigs(producersCount), consumerConfigs(consumersCount) + for i := 0; i < notRetryingConsumers; i++ { + consumerConfigs[i].Retry = false + } + return newProducersConsumersForConfigs(ctx, t, producerConfigs, consumerConfigs) +} + +func newProducersConsumersForConfigs(ctx context.Context, t *testing.T, producerConfigs []*ProducerConfig, consumerConfigs []*ConsumerConfig) (redis.UniversalClient, string, []*Producer[testRequest, testResponse], []*Consumer[testRequest, testResponse]) { + t.Helper() + if len(producerConfigs) == 0 { + t.Fatalf("internal test error - helper got empty producer configs list") + } redisClient, err := redisutil.RedisClientFromURL(redisutil.CreateTestRedis(ctx, t)) if err != nil { t.Fatalf("RedisClientFromURL() unexpected error: %v", err) } - prodCfg, consCfg := producerCfg(), consumerCfg() streamName := fmt.Sprintf("stream:%s", uuid.NewString()) - - producer, err := NewProducer[testRequest, testResponse](redisClient, streamName, prodCfg) - if err != nil { - t.Fatalf("Error creating new producer: %v", err) + var producers []*Producer[testRequest, testResponse] + for _, producerConfig := range producerConfigs { + producer, err := NewProducer[testRequest, testResponse](redisClient, streamName, producerConfig) + if err != nil { + t.Fatalf("Error creating new producer: %v", err) + } + producers = append(producers, producer) } - var consumers []*Consumer[testRequest, 
testResponse] - for i := 0; i < consumersCount; i++ { - c, err := NewConsumer[testRequest, testResponse](redisClient, streamName, consCfg) + for _, consumerConfig := range consumerConfigs { + c, err := NewConsumer[testRequest, testResponse](redisClient, streamName, consumerConfig) if err != nil { t.Fatalf("Error creating new consumer: %v", err) } consumers = append(consumers, c) } - createRedisGroup(ctx, t, streamName, producer.client) + createRedisGroup(ctx, t, streamName, producers[0].client) t.Cleanup(func() { ctx := context.Background() - destroyRedisGroup(ctx, t, streamName, producer.client) + destroyRedisGroup(ctx, t, streamName, producers[0].client) }) - return redisClient, streamName, producer, consumers + return redisClient, streamName, producers, consumers } func messagesMaps(n int) []map[string]string { @@ -104,12 +126,18 @@ func msgForIndex(idx int) string { return fmt.Sprintf("msg: %d", idx) } -func wantMessages(n int, group string) []string { - var ret []string - for i := 0; i < n; i++ { - ret = append(ret, group+msgForIndex(i)) +func wantMessages(entriesCounts []int) [][]string { + ret := make([][]string, len(entriesCounts)) + for i, n := range entriesCounts { + group := "" + if len(entriesCounts) > 1 { + group = fmt.Sprintf("%d.", i) + } + for j := 0; j < n; j++ { + ret[i] = append(ret[i], group+msgForIndex(j)) + } + sort.Strings(ret[i]) } - sort.Strings(ret) return ret } @@ -122,34 +150,33 @@ func flatten(responses [][]string) []string { return ret } -func produceMessages(ctx context.Context, msgs []string, producer *Producer[testRequest, testResponse], withInvalidEntries bool) ([]*containers.Promise[testResponse], error) { +func produceMessages(ctx context.Context, msgs []string, producer *Producer[testRequest, testResponse], withInvalidEntries bool) ([]*containers.Promise[testResponse], []testRequest, error) { var promises []*containers.Promise[testResponse] + var requests []testRequest for i := 0; i < len(msgs); i++ { - req := testRequest{Request: msgs[i]} + request := testRequest{Request: msgs[i]} if withInvalidEntries && i%50 == 0 { - req.IsInvalid = true + request.IsInvalid = true } - promise, err := producer.Produce(ctx, req) + promise, err := producer.Produce(ctx, request) if err != nil { - return nil, err + return nil, nil, err } promises = append(promises, promise) + requests = append(requests, request) } - return promises, nil + return promises, requests, nil } -func awaitResponses(ctx context.Context, promises []*containers.Promise[testResponse]) ([]string, []int) { +func awaitResponses(ctx context.Context, promises []*containers.Promise[testResponse]) ([]string, []error) { var ( responses []string - errs []int + errs []error ) - for idx, p := range promises { + for _, p := range promises { res, err := p.Await(ctx) - if err != nil { - errs = append(errs, idx) - continue - } responses = append(responses, res.Response) + errs = append(errs, err) } return responses, errs } @@ -168,7 +195,7 @@ func consume(ctx context.Context, t *testing.T, consumers []*Consumer[testReques func(ctx context.Context) { for { - res, err := c.Consume(ctx) + msg, err := c.Consume(ctx) if err != nil { if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { t.Errorf("Consume() unexpected error: %v", err) @@ -176,18 +203,18 @@ func consume(ctx context.Context, t *testing.T, consumers []*Consumer[testReques } return } - if res == nil { + if msg == nil { continue } - gotMessages[idx][res.ID] = res.Value.Request - if !res.Value.IsInvalid { - resp := 
fmt.Sprintf("result for: %v", res.ID) - if err := c.SetResult(ctx, res.ID, testResponse{Response: resp}); err != nil { + gotMessages[idx][msg.ID] = msg.Value.Request + if !msg.Value.IsInvalid { + resp := fmt.Sprintf("result for: %v", msg.ID) + if err := c.SetResult(ctx, msg.ID, testResponse{Response: resp}); err != nil { t.Errorf("Error setting a result: %v", err) } wantResponses[idx] = append(wantResponses[idx], resp) } - res.Ack() + msg.Ack() } }) } @@ -198,11 +225,12 @@ func TestRedisProduceComplex(t *testing.T) { log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true))) t.Parallel() for _, tc := range []struct { - name string - entriesCount []int - numProducers int - killConsumers bool - withInvalidEntries bool // If this is set, then every 50th entry is invalid (requests that can't be solved by any consumer) + name string + entriesCount []int + numProducers int + killConsumers bool + withInvalidEntries bool // If this is set, then every 50th entry is invalid (requests that can't be solved by any consumer) + notRetryingConsumers int // number of consumers started with retry disabled (they will not autoclaim timed out messages) }{ { name: "one producer, all consumers are active", entriesCount: []int{messagesCount}, numProducers: 1, }, { name: "one producer, some consumers killed, others should take over their work", entriesCount: []int{messagesCount}, numProducers: 1, killConsumers: true, }, - + { + name: "one producer, some consumers killed, others should NOT take over their work", + entriesCount: []int{messagesCount}, + numProducers: 1, + killConsumers: true, + notRetryingConsumers: consumersCount, + }, + { + name: "one producer, some consumers killed, one retrying consumer should take over their work", + entriesCount: []int{messagesCount}, + numProducers: 1, + killConsumers: true, + notRetryingConsumers: consumersCount - 1, + }, { name: "two producers, some consumers killed, others should take over their work, unequal number of requests from producers", entriesCount: []int{messagesCount, 2 * messagesCount}, numProducers: 2, killConsumers: true, }, + { + name: "two producers, some consumers killed, others should NOT take over their work, unequal number of requests from producers", + entriesCount: []int{messagesCount, 2 * messagesCount}, + numProducers: 2, + killConsumers: true, + notRetryingConsumers: consumersCount, + }, + { + name: "two producers, some consumers killed, one retrying consumer should take over their work, unequal number of requests from producers", + entriesCount: []int{messagesCount, 2 * messagesCount}, + numProducers: 2, + killConsumers: true, + notRetryingConsumers: consumersCount - 1, + }, { name: "two producers, some consumers killed, others should take over their work, some invalid entries, unequal number of requests from producers", entriesCount: []int{messagesCount, 2 * messagesCount}, @@ -238,44 +293,48 @@ func TestRedisProduceComplex(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - - var producers []*Producer[testRequest, testResponse] - redisClient, streamName, producer, consumers := newProducerConsumers(ctx, t) - producers = append(producers, producer) - if tc.numProducers == 2 { - producer, err := NewProducer[testRequest, testResponse](redisClient, streamName, producerCfg()) - if err != nil { - t.Fatalf("Error creating second producer: %v", err) - } - producers = append(producers, producer) - } + redisClient, streamName, producers, consumers := newProducersConsumers(ctx, t, tc.numProducers, consumersCount, tc.notRetryingConsumers) for _, producer := range producers { producer.Start(ctx) } -
var entries [][]string - if tc.numProducers == 2 { - entries = append(entries, wantMessages(tc.entriesCount[0], "1.")) - entries = append(entries, wantMessages(tc.entriesCount[1], "2.")) - } else { - entries = append(entries, wantMessages(tc.entriesCount[0], "")) - } + entries := wantMessages(tc.entriesCount) - var promises [][]*containers.Promise[testResponse] + var allPromises [][]*containers.Promise[testResponse] + var allRequests [][]testRequest for i := 0; i < tc.numProducers; i++ { - prs, err := produceMessages(ctx, entries[i], producers[i], tc.withInvalidEntries) + prs, rqs, err := produceMessages(ctx, entries[i], producers[i], tc.withInvalidEntries) if err != nil { t.Fatalf("Error producing messages from producer%d: %v", i, err) } - promises = append(promises, prs) + allPromises = append(allPromises, prs) + allRequests = append(allRequests, rqs) } gotMessages := messagesMaps(len(consumers)) + var killedEntries []string if tc.killConsumers { // Consumer messages in every third consumer but don't ack them to check // that other consumers will claim ownership on those messages. + + // make sure not to remove the only retrying / not-retrying consumer + var keepOneNotRetrying, keepOneRetrying bool + if tc.notRetryingConsumers == 1 { + keepOneNotRetrying = true + } + if tc.notRetryingConsumers == consumersCount-1 { + keepOneRetrying = true + } for i := 0; i < len(consumers); i += 3 { + if keepOneRetrying && consumers[i].cfg.Retry { + keepOneRetrying = false + continue + } + if keepOneNotRetrying && !consumers[i].cfg.Retry { + keepOneNotRetrying = false + continue + } consumers[i].Start(ctx) req, err := consumers[i].Consume(ctx) if err != nil { @@ -283,32 +342,54 @@ func TestRedisProduceComplex(t *testing.T) { } if req == nil { t.Error("Didn't consume any message") + } else { + killedEntries = append(killedEntries, req.Value.Request) } // Kills the actnotifier hence allowing XAUTOCLAIM consumers[i].StopAndWait() } } + killedConsumers := len(killedEntries) time.Sleep(time.Second) wantResponses := consume(ctx, t, consumers, gotMessages) var gotResponses []string + waitingForTooLong := 0 for i := 0; i < tc.numProducers; i++ { - grs, errIndexes := awaitResponses(ctx, promises[i]) - if tc.withInvalidEntries { - if errIndexes[len(errIndexes)-1]+50 < len(entries[i]) { - t.Fatalf("Unexpected number of invalid requests while awaiting responses") + requests := allRequests[i] + responses, errors := awaitResponses(ctx, allPromises[i]) + if len(responses) != len(requests) { + t.Errorf("Unexpected number of responses, producer: %d, len(responses): %d, len(requests): %d", i, len(responses), len(requests)) + } + if len(errors) != len(responses) { + t.Errorf("internal test error - got unexpected number of errors, should be equal number of responses, producer: %d, len(errors): %d, len(responses): %d", i, len(errors), len(responses)) + } + if t.Failed() { + continue + } + for j := 0; j < len(responses); j++ { + request, response, err := requests[j], responses[j], errors[j] + if err != nil && strings.Contains(err.Error(), "request has been waiting for too long") && tc.notRetryingConsumers == consumersCount && killedConsumers > 0 { + // we expect this error in case all consumers are not retrying and some have been killed + waitingForTooLong++ + continue } - for j, idx := range errIndexes { - if idx != j*50 { - t.Fatalf("Invalid request' index mismatch want: %d got %d", j*50, idx) - } + if !request.IsInvalid && err != nil { + t.Errorf("Unexpected error while awaiting responses, producer: %d, response: %d, err: 
%v", i, j, err) + } else if request.IsInvalid && err == nil { + t.Errorf("Did not get expected error while awaiting responses, producer: %d, response: %d, err: %v", i, j, err) + } else if err == nil { + gotResponses = append(gotResponses, response) } - } else if len(errIndexes) != 0 { - t.Fatalf("Error awaiting responses from promises %d: %v", i, errIndexes) } - gotResponses = append(gotResponses, grs...) + } + if waitingForTooLong > killedConsumers { + t.Errorf("Got too many \"request has been waiting for too long\" errors, got: %d, expected at most: %d", waitingForTooLong, killedConsumers) + } + if t.Failed() { + t.FailNow() } for _, c := range consumers { @@ -326,7 +407,11 @@ func TestRedisProduceComplex(t *testing.T) { var combinedEntries []string for i := 0; i < tc.numProducers; i++ { - combinedEntries = append(combinedEntries, entries[i]...) + producerEntries := entries[i] + if len(killedEntries) > 0 && tc.notRetryingConsumers == consumersCount { + producerEntries = filterEntries(producerEntries, killedEntries) + } + combinedEntries = append(combinedEntries, producerEntries...) } wantMsgs := combinedEntries if diff := cmp.Diff(wantMsgs, got); diff != "" { @@ -395,3 +480,20 @@ func mergeValues(messages []map[string]string, withInvalidEntries bool) ([]strin sort.Strings(ret) return ret, nil } + +func filterEntries(entries []string, toSkip []string) []string { + var res []string + for _, e := range entries { + skip := false + for _, s := range toSkip { + if s == e { + skip = true + break + } + } + if !skip { + res = append(res, e) + } + } + return res +}
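Note on the new consumer options: retry and max-retry-count are plain pflag/koanf settings registered by ConsumerConfigAddOptions above. The snippet below is a minimal, self-contained sketch of how an operator might configure a consumer to give up on a request after three redeliveries; the standalone FlagSet, the "consumer" prefix, and the hard-coded argument list are assumptions for illustration and are not part of this patch.

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func main() {
	// Register flags with the same names and defaults the patch adds in
	// ConsumerConfigAddOptions; the "consumer" prefix is illustrative only.
	f := pflag.NewFlagSet("example", pflag.ContinueOnError)
	retry := f.Bool("consumer.retry", true, "enables autoclaim (retrying) of timed out messages")
	maxRetries := f.Int64("consumer.max-retry-count", -1, "retries after which an error response is set (-1 = no limit)")

	// A worker started with these arguments keeps autoclaiming stuck messages,
	// but gives up on any single request after three delivery attempts.
	if err := f.Parse([]string{"--consumer.retry=true", "--consumer.max-retry-count=3"}); err != nil {
		panic(err)
	}
	fmt.Printf("retry=%v max-retry-count=%d\n", *retry, *maxRetries)
}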
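The heart of the consumer change is the new claim path in Consume: list pending entries that have been idle for at least IdletimeToAutoclaim, answer any entry whose delivery count exceeds MaxRetryCount with an error response, and autoclaim one of the remaining candidates chosen at random. The sketch below condenses that flow into a standalone function, assuming the go-redis v9 client (github.com/redis/go-redis/v9; adjust the import if the module pins a different major version). Error handling, logging, and the SetError call are elided, claimOnePending is a made-up name, and it starts the XAUTOCLAIM scan at the chosen entry's ID, whereas the patch starts one ID earlier via decrementMsgIdByOne.

package sketch

import (
	"context"
	"math/rand"
	"time"

	"github.com/redis/go-redis/v9"
)

// claimOnePending mirrors the retry branch added to Consume; it is a sketch,
// not the patch's code. Entries whose RetryCount exceeds maxRetryCount are
// only skipped here; the patch answers them with SetError("too many retries").
func claimOnePending(ctx context.Context, client redis.UniversalClient,
	stream, group, consumerID string, minIdle time.Duration, maxRetryCount int64) ([]redis.XMessage, error) {
	pending, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{
		Stream: stream,
		Group:  group,
		Start:  "-",
		End:    "+",
		Count:  50,
		Idle:   minIdle, // only entries that have been idle at least this long
	}).Result()
	if err != nil || len(pending) == 0 {
		return nil, err
	}
	// Keep only entries that still have delivery budget left.
	var retryable []redis.XPendingExt
	for _, p := range pending {
		if maxRetryCount == -1 || p.RetryCount <= maxRetryCount {
			retryable = append(retryable, p)
		}
	}
	if len(retryable) == 0 {
		return nil, nil
	}
	// Pick a random candidate so concurrent consumers spread out over the PEL,
	// then claim at most one message starting from that ID.
	chosen := retryable[rand.Intn(len(retryable))]
	msgs, _, err := client.XAutoClaim(ctx, &redis.XAutoClaimArgs{
		Group:    group,
		Consumer: consumerID,
		MinIdle:  minIdle,
		Stream:   stream,
		Start:    chosen.ID,
		Count:    1,
	}).Result()
	return msgs, err
}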
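The filterEntries helper added to pubsub_test.go does a nested scan, which is fine for the small slices this test uses. If the inputs ever grow, a set-based variant keeps the same behavior in linear time; the drop-in alternative below is an optional simplification, not something the patch requires, and filterEntriesSet is a name invented here.

// filterEntriesSet behaves like filterEntries but builds a lookup set once,
// so it runs in O(len(entries)+len(toSkip)) instead of the nested-loop product.
func filterEntriesSet(entries []string, toSkip []string) []string {
	skip := make(map[string]struct{}, len(toSkip))
	for _, s := range toSkip {
		skip[s] = struct{}{}
	}
	var res []string
	for _, e := range entries {
		if _, ok := skip[e]; !ok {
			res = append(res, e)
		}
	}
	return res
}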