Add test for listConsumerGroupOffsets

jeffxiang committed Aug 29, 2024
1 parent 00562d5 commit 7b83ee8

Showing 3 changed files with 197 additions and 19 deletions.
@@ -8,6 +8,9 @@
import com.pinterest.psc.common.TopicUriPartition;
import com.pinterest.psc.common.kafka.KafkaTopicUri;
import com.pinterest.psc.config.PscConfiguration;
import com.pinterest.psc.consumer.PscConsumer;
import com.pinterest.psc.consumer.PscConsumerPollMessageIterator;
import com.pinterest.psc.exception.consumer.ConsumerException;
import com.pinterest.psc.exception.producer.ProducerException;
import com.pinterest.psc.exception.startup.ConfigurationException;
import com.pinterest.psc.exception.startup.TopicUriSyntaxException;
@@ -16,6 +19,7 @@
import com.pinterest.psc.metadata.TopicRnMetadata;
import com.pinterest.psc.producer.PscProducer;
import com.pinterest.psc.producer.PscProducerMessage;
import com.pinterest.psc.serde.IntegerDeserializer;
import com.pinterest.psc.serde.IntegerSerializer;
import com.pinterest.psc.utils.PscTestUtils;
import com.salesforce.kafka.test.junit5.SharedKafkaTestResource;
@@ -27,6 +31,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -36,6 +41,8 @@
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

public class TestPscMetadataClient {

@@ -53,6 +60,7 @@ public class TestPscMetadataClient {
private KafkaCluster kafkaCluster1;
private String topicUriStr1, topicUriStr2, topicUriStr3, clusterUriStr;
private TopicRn topic1Rn, topic2Rn, topic3Rn;
private TopicUri topic1Uri, topic2Uri, topic3Uri;

/**
* Initializes two Kafka clusters that are commonly used by all tests, and creates a single topic on each.
@@ -84,9 +92,13 @@ public void setup() throws IOException, InterruptedException, TopicUriSyntaxExce
PscTestUtils.createTopicAndVerify(sharedKafkaTestResource1, topic2, partitions2);
PscTestUtils.createTopicAndVerify(sharedKafkaTestResource1, topic3, partitions3);

topic1Rn = MetadataUtils.createTopicRn(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), topic1);
topic2Rn = MetadataUtils.createTopicRn(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), topic2);
topic3Rn = MetadataUtils.createTopicRn(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr3)), topic3);
topic1Uri = KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1));
topic2Uri = KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2));
topic3Uri = KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr3));

topic1Rn = MetadataUtils.createTopicRn(topic1Uri, topic1);
topic2Rn = MetadataUtils.createTopicRn(topic2Uri, topic2);
topic3Rn = MetadataUtils.createTopicRn(topic3Uri, topic3);
}

/**
@@ -149,8 +161,8 @@ public void testDescribeTopicRns() throws Exception {
public void testListOffsets() throws Exception {
PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration);
Map<TopicUriPartition, PscMetadataClient.MetadataClientOption> topicUriPartitionsAndOptions = new HashMap<>();
topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_EARLIEST);
topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST);
topicUriPartitionsAndOptions.put(new TopicUriPartition(topic1Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_EARLIEST);
topicUriPartitionsAndOptions.put(new TopicUriPartition(topic2Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST);
TopicUri clusterUri = KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr));
Map<TopicUriPartition, MessageId> offsets = client.listOffsets(
clusterUri,
@@ -161,8 +173,8 @@
assertEquals(2, offsets.size());

// ensure that the offsets are 0 when the topic is empty
assertEquals(0, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 0)).getOffset());
assertEquals(0, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), 0)).getOffset());
assertEquals(0, offsets.get(new TopicUriPartition(topic1Uri, 0)).getOffset());
assertEquals(0, offsets.get(new TopicUriPartition(topic2Uri, 0)).getOffset());

// send one message to partition 0 for both topics
PscProducer<Integer, Integer> pscProducer = getPscProducer();
@@ -179,14 +191,14 @@

// ensure sent offsets are captured in next metadataClient call
assertEquals(2, offsets.size());
assertEquals(0, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 0)).getOffset()); // earliest offset
assertEquals(1, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), 0)).getOffset()); // latest offset
assertEquals(0, offsets.get(new TopicUriPartition(topic1Uri, 0)).getOffset()); // earliest offset
assertEquals(1, offsets.get(new TopicUriPartition(topic2Uri, 0)).getOffset()); // latest offset

// now change the spec to latest for both topics, and add topic1 partitions 5 and 10 to the query
topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST);
topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST);
topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 5), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST);
topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 10), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST);
topicUriPartitionsAndOptions.put(new TopicUriPartition(topic1Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST);
topicUriPartitionsAndOptions.put(new TopicUriPartition(topic2Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST);
topicUriPartitionsAndOptions.put(new TopicUriPartition(topic1Uri, 5), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST);
topicUriPartitionsAndOptions.put(new TopicUriPartition(topic1Uri, 10), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST);

// send 2 messages to topic1 partition 0 - now the latest offset should be 3
pscProducer.send(new PscProducerMessage<>(topicUriStr1, 0, 0, 0));
@@ -206,18 +218,177 @@

// ensure sent offsets are captured in next metadataClient call
assertEquals(4, offsets.size());
assertEquals(3, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 0)).getOffset());
assertEquals(1, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 5)).getOffset());
assertEquals(0, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 10)).getOffset());
assertEquals(2, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), 0)).getOffset());
assertEquals(3, offsets.get(new TopicUriPartition(topic1Uri, 0)).getOffset());
assertEquals(1, offsets.get(new TopicUriPartition(topic1Uri, 5)).getOffset());
assertEquals(0, offsets.get(new TopicUriPartition(topic1Uri, 10)).getOffset());
assertEquals(2, offsets.get(new TopicUriPartition(topic2Uri, 0)).getOffset());

client.close();
pscProducer.close();
}

@Test
public void testListOffsetsForConsumerGroup() {
public void testListOffsetsForConsumerGroup() throws Exception {
PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration);
TopicUri clusterUri = KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr));

String consumerGroupId = "test-psc-consumer-group";
PscProducer<Integer, Integer> pscProducer = getPscProducer();
PscConsumer<Integer, Integer> pscConsumer = getPscConsumer(consumerGroupId);

sendNMessages(pscProducer, topicUriStr1, 0, 1000);
sendNMessages(pscProducer, topicUriStr1, 1, 1000);
sendNMessages(pscProducer, topicUriStr2, 23, 1000);
sendNMessages(pscProducer, topicUriStr3, 0, 1000);

TopicUriPartition t1p0 = new TopicUriPartition(topic1Uri, 0);
TopicUriPartition t1p1 = new TopicUriPartition(topic1Uri, 1);
TopicUriPartition t2p23 = new TopicUriPartition(topic2Uri, 23);
TopicUriPartition t3p0 = new TopicUriPartition(topic3Uri, 0);

List<TopicUriPartition> topicUriPartitions = Arrays.asList(
t1p0,
t1p1,
t2p23,
t3p0
);

// assign to t1p0, poll 1 message, commit
pscConsumer.assign(Collections.singleton(t1p0));
pollNMessages(1, pscConsumer);
pscConsumer.commitSync();

Map<TopicUriPartition, MessageId> offsets = client.listOffsetsForConsumerGroup(
clusterUri,
consumerGroupId,
topicUriPartitions,
10000,
TimeUnit.MILLISECONDS
);

assertEquals(4, offsets.size());
assertTrue(offsets.containsKey(t1p0));
assertTrue(offsets.containsKey(t1p1));
assertTrue(offsets.containsKey(t2p23));
assertTrue(offsets.containsKey(t3p0));
assertEquals(t1p0, offsets.get(t1p0).getTopicUriPartition());
assertEquals(1, offsets.get(t1p0).getOffset());
assertNull(offsets.get(t1p1));
assertNull(offsets.get(t2p23));
assertNull(offsets.get(t3p0));

// already assigned to t1p0, poll 900 messages, commit - this should set the offset to 901
pollNMessages(900, pscConsumer);
pscConsumer.commitSync();

// query again
offsets = client.listOffsetsForConsumerGroup(
clusterUri,
consumerGroupId,
topicUriPartitions,
10000,
TimeUnit.MILLISECONDS
);

assertEquals(901, offsets.get(t1p0).getOffset());
assertNull(offsets.get(t1p1));
assertNull(offsets.get(t2p23));
assertNull(offsets.get(t3p0));

// assign to t1p1, poll 500 messages, commit - this should set the offset to 500
pscConsumer.unassign();
pscConsumer.assign(Collections.singleton(t1p1));
pollNMessages(500, pscConsumer);
pscConsumer.commitSync();

// query again
offsets = client.listOffsetsForConsumerGroup(
clusterUri,
consumerGroupId,
topicUriPartitions,
10000,
TimeUnit.MILLISECONDS
);

assertEquals(901, offsets.get(t1p0).getOffset());
assertEquals(500, offsets.get(t1p1).getOffset());
assertNull(offsets.get(t2p23));
assertNull(offsets.get(t3p0));

// assign to t2p23, poll 1000 messages, commit - this should set the offset to 1000
pscConsumer.unassign();
pscConsumer.assign(Collections.singleton(t2p23));
pollNMessages(1000, pscConsumer);
pscConsumer.commitSync();

// query again
offsets = client.listOffsetsForConsumerGroup(
clusterUri,
consumerGroupId,
topicUriPartitions,
10000,
TimeUnit.MILLISECONDS
);

assertEquals(901, offsets.get(t1p0).getOffset());
assertEquals(500, offsets.get(t1p1).getOffset());
assertEquals(1000, offsets.get(t2p23).getOffset());
assertNull(offsets.get(t3p0));

// assign to t3p0, poll 999 messages, commit - this should set the offset to 999
pscConsumer.unassign();
pscConsumer.assign(Collections.singleton(t3p0));
pollNMessages(999, pscConsumer);
pscConsumer.commitSync();

// query again
offsets = client.listOffsetsForConsumerGroup(
clusterUri,
consumerGroupId,
topicUriPartitions,
10000,
TimeUnit.MILLISECONDS
);

assertEquals(901, offsets.get(t1p0).getOffset());
assertEquals(500, offsets.get(t1p1).getOffset());
assertEquals(1000, offsets.get(t2p23).getOffset());
assertEquals(999, offsets.get(t3p0).getOffset());

pscConsumer.close();
pscProducer.close();
client.close();
}

private static void pollNMessages(int numberOfMessages, PscConsumer<Integer, Integer> pscConsumer) throws ConsumerException {
int messagesSeen = 0;
while (messagesSeen < numberOfMessages) {
PscConsumerPollMessageIterator<Integer, Integer> it = pscConsumer.poll();
while (it.hasNext()) {
it.next();
messagesSeen++;
}
}
}

private static void sendNMessages(PscProducer<Integer, Integer> producer, String topicUriStr, int partition, int numberOfMessages) throws ConfigurationException, ProducerException {
for (int i = 0; i < numberOfMessages; i++) {
producer.send(new PscProducerMessage<>(topicUriStr, partition, i, i));
}
producer.flush();
}

private static PscConsumer<Integer, Integer> getPscConsumer(String groupId) throws ConfigurationException, ConsumerException {
PscConfiguration consumerConfiguration = new PscConfiguration();
consumerConfiguration.setProperty(PscConfiguration.PSC_METRICS_REPORTER_CLASS, TestUtils.DEFAULT_METRICS_REPORTER);
consumerConfiguration.setProperty(PscConfiguration.PSC_CONSUMER_CLIENT_ID, "test-psc-consumer");
consumerConfiguration.setProperty(PscConfiguration.PSC_CONFIG_LOGGING_ENABLED, "false");
consumerConfiguration.setProperty(PscConfiguration.PSC_CONSUMER_GROUP_ID, groupId);
consumerConfiguration.setProperty(PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET, PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_EARLIEST);
consumerConfiguration.setProperty(PscConfiguration.PSC_CONSUMER_KEY_DESERIALIZER, IntegerDeserializer.class.getName());
consumerConfiguration.setProperty(PscConfiguration.PSC_CONSUMER_VALUE_DESERIALIZER, IntegerDeserializer.class.getName());
consumerConfiguration.setProperty(PscConfiguration.PSC_CONSUMER_POLL_MESSAGES_MAX, "1");
return new PscConsumer<>(consumerConfiguration);
}

private static PscProducer<Integer, Integer> getPscProducer() throws ConfigurationException, ProducerException {
@@ -13,6 +13,7 @@
import com.pinterest.psc.metadata.creation.PscBackendMetadataClientCreator;
import com.pinterest.psc.metadata.creation.PscMetadataClientCreatorManager;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -55,6 +56,11 @@ public Map<TopicUriPartition, MessageId> listOffsets(TopicUri clusterUri, Map<To
return backendMetadataClient.listOffsets(topicRnsAndOptions, timeout, timeUnit);
}

public Map<TopicUriPartition, MessageId> listOffsetsForConsumerGroup(TopicUri clusterUri, String consumerGroup, Collection<TopicUriPartition> topicUriPartitions, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException {
PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri);
return backendMetadataClient.listOffsetsForConsumerGroup(consumerGroup, topicUriPartitions, timeout, timeUnit);
}
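
For reference, a minimal caller-side sketch of the new listOffsetsForConsumerGroup API (an illustration, not part of the commit; the configuration, URIs, and group id mirror the test class above):

    PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration);
    TopicUri clusterUri = KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr));
    TopicUriPartition t1p0 = new TopicUriPartition(topic1Uri, 0);

    // Committed offsets for the group on the requested partitions.
    Map<TopicUriPartition, MessageId> committed = client.listOffsetsForConsumerGroup(
            clusterUri,
            "test-psc-consumer-group",
            Collections.singletonList(t1p0),
            10000,
            TimeUnit.MILLISECONDS
    );

    // A null value means the group has no committed offset on that partition.
    MessageId messageId = committed.get(t1p0);
    long committedOffset = messageId == null ? -1L : messageId.getOffset();
    client.close();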

@VisibleForTesting
protected PscBackendMetadataClient getBackendMetadataClient(TopicUri clusterUri) {
String topicUriPrefix = clusterUri.getTopicUriPrefix();
@@ -143,9 +143,10 @@ public Map<TopicUriPartition, MessageId> listOffsetsForConsumerGroup(
offsets.forEach((tp, offsetAndMetadata) -> {
TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, tp.topic());
TopicUriPartition tup = createKafkaTopicUriPartition(topicRn, tp.partition());
MessageId messageId = offsetAndMetadata == null ? null : new MessageId(tup, offsetAndMetadata.offset());
result.put(
tup,
new MessageId(tup, offsetAndMetadata.offset())
messageId
);
});
return result;
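
The null guard above encodes the contract the new test exercises: every queried partition appears as a key in the returned map, and its value is null when the consumer group has never committed an offset for it. A short sketch of consuming that result (illustrative only; the names follow the test above):

    Map<TopicUriPartition, MessageId> committed = client.listOffsetsForConsumerGroup(
            clusterUri, consumerGroupId, topicUriPartitions, 10000, TimeUnit.MILLISECONDS);
    for (TopicUriPartition tup : topicUriPartitions) {
        MessageId messageId = committed.get(tup);
        if (messageId == null) {
            // no commit yet on this partition (e.g. t3p0 before it is consumed)
        } else {
            long offset = messageId.getOffset(); // e.g. 901 for t1p0 after 901 messages are consumed
        }
    }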
