diff --git a/e2e/service.go b/e2e/service.go index 53e24d0..195e9a3 100644 --- a/e2e/service.go +++ b/e2e/service.go @@ -155,14 +155,6 @@ func (s *Service) Start(ctx context.Context) error { return fmt.Errorf("could not validate end-to-end topic: %w", err) } - // Get up-to-date metadata and inform our custom partitioner about the partition count - topicMetadata, err := s.getTopicMetadata(ctx) - if err != nil { - return fmt.Errorf("could not get topic metadata after validation: %w", err) - } - partitions := len(topicMetadata.Topics[0].Partitions) - s.partitionCount = partitions - // finally start everything else (producing, consuming, continuous validation, consumer group tracking) go s.startReconciliation(ctx) diff --git a/e2e/topic.go b/e2e/topic.go index d54291a..57e7b0a 100644 --- a/e2e/topic.go +++ b/e2e/topic.go @@ -3,6 +3,7 @@ package e2e import ( "context" "fmt" + "github.com/pkg/errors" "math" "time" @@ -68,7 +69,41 @@ func (s *Service) validateManagementTopic(ctx context.Context) error { return fmt.Errorf("failed to create partitions: %w", err) } - return nil + return s.updatePartitionCount(ctx) +} + +// The partition count must be updated after topic validation because the validation process may lead to the +// creation of new partitions. This can occur when new brokers are added to the cluster. +func (s *Service) updatePartitionCount(ctx context.Context) error { + retryTicker := time.NewTicker(1 * time.Second) + defer retryTicker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-retryTicker.C: + meta, err := s.getTopicMetadata(ctx) + if err != nil { + return fmt.Errorf("could not get topic metadata while updating partition count: %w", err) + } + + typedErr := kerr.TypedErrorForCode(meta.Topics[0].ErrorCode) + if typedErr == nil { + s.partitionCount = len(meta.Topics[0].Partitions) + s.logger.Debug("updatePartitionCount: successfully updated partition count", zap.Int("partition_count", s.partitionCount)) + return nil + } + if !errors.Is(typedErr, kerr.UnknownTopicOrPartition) { + return fmt.Errorf("unexpected error while updating partition count: %w", typedErr) + } + s.logger.Warn("updatePartitionCount: received UNKNOWN_TOPIC_OR_PARTITION error, possibly due to timing issue. Retrying...") + // The UNKNOWN_TOPIC_OR_PARTITION error occurs occasionally even though the topic is created + // in the validateManagementTopic function. It appears to be a timing issue where the topic metadata + // is not immediately available after creation. In practice, waiting for a short period and then retrying + // the operation resolves the issue. + } + } } func (s *Service) executeCreatePartitions(ctx context.Context, req *kmsg.CreatePartitionsRequest) error {