Implement consumer group patch (reset offsets) "dry run" option
Signed-off-by: Michael Edgar <[email protected]>
MikeEdgar committed Sep 11, 2024
1 parent cf98df0 commit e1c87c6
Showing 4 changed files with 168 additions and 31 deletions.
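
The dry run is requested through a JSON:API meta flag on the PATCH body: with "dryRun": true the server responds 200 OK with the offsets that would result and commits nothing; otherwise the reset is applied and the server responds 204 No Content. A minimal sketch of such a payload, built with Jakarta JSON the same way the new integration test builds it (the group id and topic id values are placeholders):

import jakarta.json.Json;
import jakarta.json.JsonObject;

public class DryRunPatchPayload {
    public static void main(String[] args) {
        // Sketch of a dry-run offset-reset body; "my-group" and "<topic-id>" are placeholders.
        JsonObject patchBody = Json.createObjectBuilder()
                .add("meta", Json.createObjectBuilder()
                        .add("dryRun", true)) // omit or set false to actually apply the reset
                .add("data", Json.createObjectBuilder()
                        .add("id", "my-group")
                        .add("type", "consumerGroups")
                        .add("attributes", Json.createObjectBuilder()
                                .add("offsets", Json.createArrayBuilder()
                                        .add(Json.createObjectBuilder()
                                                .add("topicId", "<topic-id>")
                                                .add("offset", "earliest")))))
                .build();
        System.out.println(patchBody);
    }
}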
@@ -184,7 +184,12 @@ public CompletionStage<Response> describeConsumerGroup(
@PATCH
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@APIResponseSchema(responseCode = "204", value = Void.class)
@APIResponse(responseCode = "200",
description = "Consumer group patch dry run successful, nothing was applied",
content = @Content(schema = @Schema(implementation = ConsumerGroup.ConsumerGroupDocument.class)))
@APIResponse(responseCode = "204",
description = "Consumer group patch successful, changes applied",
content = @Content(schema = @Schema(implementation = Void.class)))
@Expression(
targetName = "args",
// Only check when the request body Id is present (separately checked for @NotNull)
@@ -218,8 +223,21 @@ public CompletionStage<Response> patchConsumerGroup(
)
ConsumerGroup.ConsumerGroupDocument patch) {

return consumerGroupService.patchConsumerGroup(patch.getData().getAttributes())
.thenApply(nothing -> Response.noContent())
final boolean dryRun = Boolean.TRUE.equals(patch.meta("dryRun"));

if (dryRun) {
requestedFields.accept(List.of(
ConsumerGroup.Fields.STATE,
ConsumerGroup.Fields.MEMBERS,
ConsumerGroup.Fields.OFFSETS
));
}

return consumerGroupService.patchConsumerGroup(patch.getData().getAttributes(), dryRun)
.thenApply(optionalGroup -> optionalGroup
.map(ConsumerGroup.ConsumerGroupDocument::new)
.map(Response::ok)
.orElseGet(Response::noContent))
.thenApply(Response.ResponseBuilder::build);
}
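
Note the field projection above: when dryRun is set, the handler forces STATE, MEMBERS, and OFFSETS into the requested fields, so the 200 response always carries the projected state and offsets even when the client did not ask for those fields explicitly.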

@@ -29,8 +29,13 @@ public boolean handlesException(Throwable thrown) {
@Override
public List<Error> buildErrors(JsonProcessingException exception) {
var errorLocation = exception.getLocation();
Error error = category.createError("Unable to parse JSON at line %d, column %d"
.formatted(errorLocation.getLineNr(), errorLocation.getColumnNr()), exception, null);
Error error;
if (errorLocation != null) {
error = category.createError("Unable to process JSON at line %d, column %d"
.formatted(errorLocation.getLineNr(), errorLocation.getColumnNr()), exception, null);
} else {
error = category.createError("Unable to process JSON", exception, null);
}
LOGGER.debugf("error=%s", error);
return List.of(error);
}
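
The null guard matters because Jackson only attaches a JsonLocation when the failure happened inside the parser; exceptions raised during mapping may carry no location at all. A small illustration, assuming Jackson 2.x (the demo class is illustrative, not part of this commit):

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class LocationGuardDemo {
    public static void main(String[] args) {
        try {
            new ObjectMapper().readValue("{ \"a\": ", java.util.Map.class);
        } catch (JsonProcessingException e) {
            System.out.println(e.getLocation()); // parse error: non-null line/column
        }

        // A mapping exception created without a parser has no location attached.
        JsonProcessingException mappingError =
                new JsonMappingException((java.io.Closeable) null, "mapping failed");
        System.out.println(mappingError.getLocation()); // null — the branch handled above
    }
}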
@@ -209,15 +209,15 @@ public CompletionStage<Map<String, List<String>>> listConsumerGroupMembership(Co
(e1, e2) -> { }));
}

public CompletionStage<Void> patchConsumerGroup(ConsumerGroup patch) {
public CompletionStage<Optional<ConsumerGroup>> patchConsumerGroup(ConsumerGroup patch, boolean dryRun) {
Admin adminClient = kafkaContext.admin();
String groupId = preprocessGroupId(patch.getGroupId());

return assertConsumerGroupExists(adminClient, groupId)
.thenComposeAsync(nothing -> Optional.ofNullable(patch.getOffsets())
.filter(Predicate.not(Collection::isEmpty))
.map(patchedOffsets -> alterConsumerGroupOffsets(adminClient, groupId, patch))
.orElseGet(() -> CompletableFuture.completedStage(null)),
.map(patchedOffsets -> alterConsumerGroupOffsets(adminClient, groupId, patch, dryRun))
.orElseGet(() -> CompletableFuture.completedStage(Optional.empty())),
threadContext.currentContextExecutor());
}

@@ -232,7 +232,7 @@ CompletionStage<Void> assertConsumerGroupExists(Admin adminClient, String groupI
});
}

CompletionStage<Void> alterConsumerGroupOffsets(Admin adminClient, String groupId, ConsumerGroup patch) {
CompletionStage<Optional<ConsumerGroup>> alterConsumerGroupOffsets(Admin adminClient, String groupId, ConsumerGroup patch, boolean dryRun) {
var topicsToDescribe = patch.getOffsets()
.stream()
.map(OffsetAndMetadata::topicId)
@@ -329,29 +329,64 @@ CompletionStage<Void> alterConsumerGroupOffsets(Admin adminClient, String groupI
.thenApply(nothing1 -> targetOffsets);
})
.thenCompose(alterRequest -> {
var alterResults = adminClient.alterConsumerGroupOffsets(groupId, alterRequest);
if (dryRun) {
return alterConsumerGroupOffsetsDryRun(adminClient, groupId, alterRequest)
.thenApply(Optional::of);
} else {
return alterConsumerGroupOffsets(adminClient, groupId, alterRequest)
.thenApply(nothing -> Optional.empty());
}
});
}

Map<TopicPartition, CompletableFuture<Void>> offsetResults = alterRequest.keySet()
.stream()
.collect(Collectors.toMap(
Function.identity(),
partition -> alterResults.partitionResult(partition)
.toCompletionStage()
.exceptionally(error -> {
if (error instanceof UnknownMemberIdException) {
throw GROUP_NOT_EMPTY;
}
if (error instanceof CompletionException ce) {
throw ce;
}
throw new CompletionException(error);
})
.toCompletableFuture()));

return allOf(offsetResults.values());
CompletionStage<ConsumerGroup> alterConsumerGroupOffsetsDryRun(Admin adminClient, String groupId,
Map<TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> alterRequest) {
var pendingTopicsIds = fetchTopicIdMap(adminClient);

return describeConsumerGroups(adminClient, List.of(groupId), Collections.emptyList())
.thenApply(groups -> groups.get(groupId))
.thenApply(result -> result.getOrThrow(CompletionException::new))
.thenCombine(pendingTopicsIds, (group, topicIds) -> {
group.setOffsets(alterRequest.entrySet().stream().map(e -> {
String topicName = e.getKey().topic();
return new OffsetAndMetadata(topicIds.get(topicName),
topicName,
e.getKey().partition(),
Either.of(e.getValue().offset()),
null,
null,
e.getValue().metadata(),
e.getValue().leaderEpoch().orElse(null));
}).toList());

return group;
});
}

CompletableFuture<Void> alterConsumerGroupOffsets(Admin adminClient, String groupId,
Map<TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> alterRequest) {
var alterResults = adminClient.alterConsumerGroupOffsets(groupId, alterRequest);

Map<TopicPartition, CompletableFuture<Void>> offsetResults = alterRequest.keySet()
.stream()
.collect(Collectors.toMap(
Function.identity(),
partition -> alterResults.partitionResult(partition)
.toCompletionStage()
.exceptionally(error -> {
if (error instanceof UnknownMemberIdException) {
throw GROUP_NOT_EMPTY;
}
if (error instanceof CompletionException ce) {
throw ce;
}
throw new CompletionException(error);
})
.toCompletableFuture()));

return allOf(offsetResults.values());
}
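
Kafka's Admin.alterConsumerGroupOffsets has no dry-run mode of its own, so the dry run is simulated client-side: the requested offset specs are resolved to concrete offsets via listOffsets, then overlaid on a freshly described group document instead of being committed. A sketch of that resolution step using only standard Admin API calls (the resolve helper is illustrative, not part of the commit):

import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class OffsetSpecResolution {
    // Resolves earliest/latest/timestamp/maxTimestamp to the concrete offset an
    // actual reset would commit, without committing anything.
    static long resolve(Admin admin, TopicPartition partition, OffsetSpec spec) throws Exception {
        return admin.listOffsets(Map.of(partition, spec))
                .partitionResult(partition)
                .get()
                .offset();
    }
}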

Map<TopicPartition, CompletableFuture<ListOffsetsResultInfo>> getListOffsetsResults(
Set<TopicPartition> partitions,
ListOffsetsResult topicOffsetsResult) {
@@ -436,9 +471,7 @@ CompletionStage<Map<String, Either<ConsumerGroup, Throwable>>> describeConsumerG

Map<String, Either<ConsumerGroup, Throwable>> result = new LinkedHashMap<>(groupIds.size());

var pendingTopicsIds = topicService.listTopics(adminClient, true)
.thenApply(topics -> topics.stream()
.collect(Collectors.toMap(TopicListing::name, l -> l.topicId().toString())));
var pendingTopicsIds = fetchTopicIdMap(adminClient);

var pendingDescribes = adminClient.describeConsumerGroups(groupIds,
new DescribeConsumerGroupsOptions()
@@ -480,6 +513,12 @@ CompletionStage<Map<String, Either<ConsumerGroup, Throwable>>> describeConsumerG
});
}

CompletableFuture<Map<String, String>> fetchTopicIdMap(Admin adminClient) {
return topicService.listTopics(adminClient, true)
.thenApply(topics -> topics.stream()
.collect(Collectors.toMap(TopicListing::name, l -> l.topicId().toString())));
}

CompletableFuture<Void> fetchOffsets(Admin adminClient, Map<String, ConsumerGroup> groups, Map<String, String> topicIds) {
var groupOffsetsRequest = groups.keySet()
.stream()
@@ -70,6 +70,7 @@
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasEntry;
@@ -757,4 +758,78 @@ void testPatchConsumerGroupToOffsetSpecWithMultiplePartitions(
assertEquals(afterOffset, offset.offset());
});
}

@ParameterizedTest
@CsvSource({
"false, 5, 'earliest' , 0",
"false, 5, '2023-01-01T00:00:00.000Z' , 0",
"true , 0, 'latest' , 5", // latest resets to after the last offset
"true , 0, 'maxTimestamp' , 4", // maxTimestamp resets to before the offset of latest timestamp
})
void testPatchConsumerGroupToOffsetSpecWithMultiplePartitionsDryRun(
boolean resetEarliestBefore,
long beforeOffset,
String offsetSpec,
int afterOffset) {
final int partitionCount = 2;
String topic1 = "t1-" + UUID.randomUUID().toString();
String topic1Id = topicUtils.createTopics(clusterId1, List.of(topic1), partitionCount).get(topic1);
String group1 = "g1-" + UUID.randomUUID().toString();
String client1 = "c1-" + UUID.randomUUID().toString();

groupUtils.request()
.groupId(group1)
.topic(topic1, partitionCount)
.createTopic(false)
.clientId(client1)
.messagesPerTopic(10)
.consumeMessages(10)
.autoClose(true)
.consume();

if (resetEarliestBefore) {
groupUtils.alterConsumerGroupOffsets(group1, Map.ofEntries(
Map.entry(new TopicPartition(topic1, 0), new OffsetAndMetadata(0)),
Map.entry(new TopicPartition(topic1, 1), new OffsetAndMetadata(0))));
}

var offsetBefore = groupUtils.consumerGroupOffsets(group1);

assertEquals(partitionCount, offsetBefore.size());
offsetBefore.forEach((partition, offset) -> {
assertEquals(beforeOffset, offset.offset());
});

whenRequesting(req -> req
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
.body(Json.createObjectBuilder()
.add("meta", Json.createObjectBuilder()
.add("dryRun", true))
.add("data", Json.createObjectBuilder()
.add("id", group1)
.add("type", "consumerGroups")
.add("attributes", Json.createObjectBuilder()
.add("offsets", Json.createArrayBuilder()
.add(Json.createObjectBuilder()
.add("topicId", topic1Id)
.add("offset", offsetSpec)))))
.build()
.toString())
.patch("{groupId}", clusterId1, group1))
.assertThat()
.statusCode(is(Status.OK.getStatusCode()))
.body("data.attributes.state", is(ConsumerGroupState.EMPTY.name()))
.body("data.attributes.offsets.topicId", everyItem(is(topic1Id)))
.body("data.attributes.offsets.topicName", everyItem(is(topic1)))
.body("data.attributes.offsets.partition", containsInAnyOrder(0, 1))
.body("data.attributes.offsets.offset", everyItem(is(afterOffset)));

var offsetAfter = groupUtils.consumerGroupOffsets(group1);

assertEquals(partitionCount, offsetAfter.size());
offsetAfter.forEach((partition, offset) -> {
// unchanged
assertEquals(beforeOffset, offset.offset());
});
}
}
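
Taken together, the assertions verify both halves of the dry-run contract: the 200 body reports the offsets the reset would produce for each partition, while the follow-up read of the group's committed offsets confirms that nothing actually moved.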
