Commit: Address comments
jeffxiang committed Sep 11, 2024
1 parent f044efe, commit f033886
Showing 4 changed files with 79 additions and 104 deletions.
@@ -30,6 +30,7 @@
import org.junit.jupiter.api.extension.RegisterExtension;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -39,7 +40,6 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -121,21 +121,21 @@ public void tearDown() throws ExecutionException, InterruptedException {
}

/**
* Tests that {@link PscMetadataClient#listTopicRns(TopicUri, long, TimeUnit)} returns the correct list of topic RNs
* Tests that {@link PscMetadataClient#listTopicRns(TopicUri, Duration)} returns the correct list of topic RNs
*
* @throws Exception
*/
@Test
public void testListTopicRns() throws Exception {
PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration);
List<TopicRn> topicRnList = client.listTopicRns(KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)), 10000, TimeUnit.MILLISECONDS);
List<TopicRn> topicRnList = client.listTopicRns(KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)), Duration.ofMillis(10000));
List<TopicRn> expectedTopicRnList = Arrays.asList(topic1Rn, topic2Rn, topic3Rn);
assertEquals(expectedTopicRnList, topicRnList);
client.close();
}

/**
* Tests that {@link PscMetadataClient#describeTopicRns(TopicUri, java.util.Set, long, TimeUnit)} returns the correct
* Tests that {@link PscMetadataClient#describeTopicRns(TopicUri, java.util.Set, Duration)} returns the correct
* metadata for the supplied topic RNs
*
* @throws Exception
@@ -146,8 +146,7 @@ public void testDescribeTopicRns() throws Exception {
Map<TopicRn, TopicRnMetadata> topicRnDescriptionMap = client.describeTopicRns(
KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)),
new HashSet<>(Arrays.asList(topic1Rn, topic2Rn, topic3Rn)),
10000,
TimeUnit.MILLISECONDS
Duration.ofMillis(10000)
);
assertEquals(3, topicRnDescriptionMap.size());

@@ -173,7 +172,7 @@ }
}

/**
* Tests that {@link PscMetadataClient#listOffsets(TopicUri, Map, long, TimeUnit)} returns the correct offsets for the
* Tests that {@link PscMetadataClient#listOffsets(TopicUri, Map, Duration)} returns the correct offsets for the
* supplied topic partitions and specs
*
* @throws Exception
@@ -185,35 +184,33 @@ public void testListOffsets() throws Exception {
topicUriPartitionsAndOptions.put(new TopicUriPartition(topic1Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_EARLIEST);
topicUriPartitionsAndOptions.put(new TopicUriPartition(topic2Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST);
TopicUri clusterUri = KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr));
Map<TopicUriPartition, MessageId> offsets = client.listOffsets(
Map<TopicUriPartition, Long> offsets = client.listOffsets(
clusterUri,
topicUriPartitionsAndOptions,
10000,
TimeUnit.MILLISECONDS
Duration.ofMillis(10000)
);
assertEquals(2, offsets.size());

// ensure that the offsets are 0 when the topic is empty
assertEquals(0, offsets.get(new TopicUriPartition(topic1Uri, 0)).getOffset());
assertEquals(0, offsets.get(new TopicUriPartition(topic2Uri, 0)).getOffset());
assertEquals(0, (long) offsets.get(new TopicUriPartition(topic1Uri, 0)));
assertEquals(0, (long) offsets.get(new TopicUriPartition(topic2Uri, 0)));

// send one message to partition 0 for both topics
PscProducer<Integer, Integer> pscProducer = getPscProducer();
pscProducer.send(new PscProducerMessage(topicUriStr1, 0,0,0));
pscProducer.send(new PscProducerMessage(topicUriStr2, 0,0,0));
pscProducer.send(new PscProducerMessage<>(topicUriStr1, 0,0,0));
pscProducer.send(new PscProducerMessage<>(topicUriStr2, 0,0,0));
pscProducer.flush();

offsets = client.listOffsets(
clusterUri,
topicUriPartitionsAndOptions,
10000,
TimeUnit.MILLISECONDS
Duration.ofMillis(10000)
);

// ensure sent offsets are captured in next metadataClient call
assertEquals(2, offsets.size());
assertEquals(0, offsets.get(new TopicUriPartition(topic1Uri, 0)).getOffset()); // earliest offset
assertEquals(1, offsets.get(new TopicUriPartition(topic2Uri, 0)).getOffset()); // latest offset
assertEquals(0, (long) offsets.get(new TopicUriPartition(topic1Uri, 0))); // earliest offset
assertEquals(1, (long) offsets.get(new TopicUriPartition(topic2Uri, 0))); // latest offset

// now change the spec to latest for both topics, and add topic1 partitions 5 and 10 to the query
topicUriPartitionsAndOptions.put(new TopicUriPartition(topic1Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST);
@@ -233,23 +230,22 @@ offsets = client.listOffsets(
offsets = client.listOffsets(
clusterUri,
topicUriPartitionsAndOptions,
10000,
TimeUnit.MILLISECONDS
Duration.ofMillis(10000)
);

// ensure sent offsets are captured in next metadataClient call
assertEquals(4, offsets.size());
assertEquals(3, offsets.get(new TopicUriPartition(topic1Uri, 0)).getOffset());
assertEquals(1, offsets.get(new TopicUriPartition(topic1Uri, 5)).getOffset());
assertEquals(0, offsets.get(new TopicUriPartition(topic1Uri, 10)).getOffset());
assertEquals(2, offsets.get(new TopicUriPartition(topic2Uri, 0)).getOffset());
assertEquals(3, (long) offsets.get(new TopicUriPartition(topic1Uri, 0)));
assertEquals(1, (long) offsets.get(new TopicUriPartition(topic1Uri, 5)));
assertEquals(0, (long) offsets.get(new TopicUriPartition(topic1Uri, 10)));
assertEquals(2, (long) offsets.get(new TopicUriPartition(topic2Uri, 0)));

client.close();
pscProducer.close();
}

/**
* Tests that {@link PscMetadataClient#listOffsetsForConsumerGroup(TopicUri, String, Collection, long, TimeUnit)} returns
* Tests that {@link PscMetadataClient#listOffsetsForConsumerGroup(TopicUri, String, Collection, Duration)} returns
* the correct offsets for the supplied consumer group and topic partitions. Also tests correct behavior when supplied
* with edge case scenarios such as non-existent consumerGroupId, non-existent partitions, etc.
*
@@ -286,21 +282,19 @@ public void testListOffsetsForConsumerGroup() throws Exception {
pollNMessages(1, pscConsumer);
pscConsumer.commitSync();

Map<TopicUriPartition, MessageId> offsets = client.listOffsetsForConsumerGroup(
Map<TopicUriPartition, Long> offsets = client.listOffsetsForConsumerGroup(
clusterUri,
consumerGroupId,
topicUriPartitions,
10000,
TimeUnit.MILLISECONDS
Duration.ofMillis(10000)
);

assertEquals(4, offsets.size());
assertTrue(offsets.containsKey(t1p0));
assertTrue(offsets.containsKey(t1p1));
assertTrue(offsets.containsKey(t2p23));
assertTrue(offsets.containsKey(t3p0));
assertEquals(t1p0, offsets.get(t1p0).getTopicUriPartition());
assertEquals(1, offsets.get(t1p0).getOffset());
assertEquals(1, (long) offsets.get(t1p0));
assertNull(offsets.get(t1p1));
assertNull(offsets.get(t2p23));
assertNull(offsets.get(t3p0));
@@ -314,11 +308,10 @@ public void testListOffsetsForConsumerGroup() throws Exception {
clusterUri,
consumerGroupId,
topicUriPartitions,
10000,
TimeUnit.MILLISECONDS
Duration.ofMillis(10000)
);

assertEquals(901, offsets.get(t1p0).getOffset());
assertEquals(901, (long) offsets.get(t1p0));
assertNull(offsets.get(t1p1));
assertNull(offsets.get(t2p23));
assertNull(offsets.get(t3p0));
@@ -334,12 +327,11 @@ public void testListOffsetsForConsumerGroup() throws Exception {
clusterUri,
consumerGroupId,
topicUriPartitions,
10000,
TimeUnit.MILLISECONDS
Duration.ofMillis(10000)
);

assertEquals(901, offsets.get(t1p0).getOffset());
assertEquals(500, offsets.get(t1p1).getOffset());
assertEquals(901, (long) offsets.get(t1p0));
assertEquals(500, (long) offsets.get(t1p1));
assertNull(offsets.get(t2p23));
assertNull(offsets.get(t3p0));

@@ -354,13 +346,12 @@ public void testListOffsetsForConsumerGroup() throws Exception {
clusterUri,
consumerGroupId,
topicUriPartitions,
10000,
TimeUnit.MILLISECONDS
Duration.ofMillis(10000)
);

assertEquals(901, offsets.get(t1p0).getOffset());
assertEquals(500, offsets.get(t1p1).getOffset());
assertEquals(1000, offsets.get(t2p23).getOffset());
assertEquals(901, (long) offsets.get(t1p0));
assertEquals(500, (long) offsets.get(t1p1));
assertEquals(1000, (long) offsets.get(t2p23));
assertNull(offsets.get(t3p0));

// assign to t3p0, poll 999 messages, commit - this should set the offset to 999
@@ -374,22 +365,20 @@ public void testListOffsetsForConsumerGroup() throws Exception {
clusterUri,
consumerGroupId,
topicUriPartitions,
10000,
TimeUnit.MILLISECONDS
Duration.ofMillis(10000)
);

assertEquals(901, offsets.get(t1p0).getOffset());
assertEquals(500, offsets.get(t1p1).getOffset());
assertEquals(1000, offsets.get(t2p23).getOffset());
assertEquals(999, offsets.get(t3p0).getOffset());
assertEquals(901, (long) offsets.get(t1p0));
assertEquals(500, (long) offsets.get(t1p1));
assertEquals(1000, (long) offsets.get(t2p23));
assertEquals(999, (long) offsets.get(t3p0));

// query a non-existent consumer group
offsets = client.listOffsetsForConsumerGroup(
clusterUri,
"non-existent-consumer-group",
topicUriPartitions,
10000,
TimeUnit.MILLISECONDS
Duration.ofMillis(10000)
);

assertEquals(4, offsets.size());
@@ -407,8 +396,7 @@ public void testListOffsetsForConsumerGroup() throws Exception {
clusterUri,
consumerGroupId,
Collections.singleton(new TopicUriPartition(topic1Uri, 100)),
10000,
TimeUnit.MILLISECONDS
Duration.ofMillis(10000)
);

assertEquals(1, offsets.size());
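For orientation, here is a minimal sketch of how the updated Duration-based API is exercised in these tests. It is illustrative only, not part of the commit: the fixture names (metadataClientConfiguration, clusterUriStr, topic1Uri) are borrowed from the test class above, and their setup plus imports are elided.

    // Timeouts are now a single java.time.Duration instead of a (long, TimeUnit) pair.
    TopicUri clusterUri = KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr));
    PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration);

    List<TopicRn> topicRns = client.listTopicRns(clusterUri, Duration.ofMillis(10000));

    // listOffsets now returns plain Long offsets rather than MessageId wrappers.
    Map<TopicUriPartition, PscMetadataClient.MetadataClientOption> specs = new HashMap<>();
    specs.put(new TopicUriPartition(topic1Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_EARLIEST);
    Map<TopicUriPartition, Long> offsets = client.listOffsets(clusterUri, specs, Duration.ofMillis(10000));
    long earliest = offsets.get(new TopicUriPartition(topic1Uri, 0));

    client.close();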
@@ -12,11 +12,11 @@
import com.pinterest.psc.exception.startup.ConfigurationException;
import com.pinterest.psc.metadata.TopicRnMetadata;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
@@ -36,28 +36,23 @@ public void initialize(TopicUri topicUri, Environment env, PscConfigurationInter
ServiceDiscoveryManager.getServiceDiscoveryConfig(env, pscConfigurationInternal.getDiscoveryConfiguration(), topicUri);
}

public abstract List<TopicRn> listTopicRns(
long timeout,
TimeUnit timeUnit
) throws ExecutionException, InterruptedException, TimeoutException;
public abstract List<TopicRn> listTopicRns(Duration duration)
throws ExecutionException, InterruptedException, TimeoutException;

public abstract Map<TopicRn, TopicRnMetadata> describeTopicRns(
Collection<TopicRn> topicRns,
long timeout,
TimeUnit timeUnit
Duration duration
) throws ExecutionException, InterruptedException, TimeoutException;

public abstract Map<TopicUriPartition, MessageId> listOffsets(
public abstract Map<TopicUriPartition, Long> listOffsets(
Map<TopicUriPartition, PscMetadataClient.MetadataClientOption> topicRnsAndOptions,
long timeout,
TimeUnit timeUnit
Duration duration
) throws ExecutionException, InterruptedException, TimeoutException;

public abstract Map<TopicUriPartition, MessageId> listOffsetsForConsumerGroup(
public abstract Map<TopicUriPartition, Long> listOffsetsForConsumerGroup(
String consumerGroupId,
Collection<TopicUriPartition> topicUriPartitions,
long timeout,
TimeUnit timeUnit
Duration duration
) throws ExecutionException, InterruptedException, TimeoutException;

public abstract void close() throws Exception;
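A concrete backend now implements these Duration-based signatures directly. As a rough, hypothetical sketch of what that adaptation can look like for a Kafka-backed client (the adminClient field, its use of Kafka's AdminClient API, and the convertToTopicRns helper are assumptions for illustration, not part of this commit):

    // Inside a hypothetical Kafka-backed subclass of PscBackendMetadataClient:
    @Override
    public List<TopicRn> listTopicRns(Duration duration)
            throws ExecutionException, InterruptedException, TimeoutException {
        // The public API takes a Duration; an underlying client may still want milliseconds.
        long timeoutMs = duration.toMillis();
        Set<String> topicNames = adminClient.listTopics()
                .names()
                .get(timeoutMs, TimeUnit.MILLISECONDS);
        // Mapping topic names to TopicRn objects is backend-specific and omitted here.
        return convertToTopicRns(topicNames);
    }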
@@ -13,13 +13,13 @@
import com.pinterest.psc.metadata.creation.PscBackendMetadataClientCreator;
import com.pinterest.psc.metadata.creation.PscMetadataClientCreatorManager;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
@@ -68,33 +68,31 @@ private void initialize() {
* List all the {@link TopicRn}'s in the cluster.
*
* @param clusterUri
* @param timeout
* @param timeUnit
* @param duration
* @return the list of {@link TopicRn}'s in the cluster
* @throws ExecutionException
* @throws InterruptedException
* @throws TimeoutException
*/
public List<TopicRn> listTopicRns(TopicUri clusterUri, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException {
public List<TopicRn> listTopicRns(TopicUri clusterUri, Duration duration) throws ExecutionException, InterruptedException, TimeoutException {
PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri);
return backendMetadataClient.listTopicRns(timeout, timeUnit);
return backendMetadataClient.listTopicRns(duration);
}

/**
* Describe the metadata for the given {@link TopicRn}'s in the cluster.
*
* @param clusterUri
* @param topicRns
* @param timeout
* @param timeUnit
* @param duration
* @return a map of {@link TopicRn} to {@link TopicRnMetadata}
* @throws ExecutionException
* @throws InterruptedException
* @throws TimeoutException
*/
public Map<TopicRn, TopicRnMetadata> describeTopicRns(TopicUri clusterUri, Set<TopicRn> topicRns, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException {
public Map<TopicRn, TopicRnMetadata> describeTopicRns(TopicUri clusterUri, Set<TopicRn> topicRns, Duration duration) throws ExecutionException, InterruptedException, TimeoutException {
PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri);
return backendMetadataClient.describeTopicRns(topicRns, timeout, timeUnit);
return backendMetadataClient.describeTopicRns(topicRns, duration);
}

/**
@@ -108,17 +106,16 @@ public Map<TopicRn, TopicRnMetadata> describeTopicRns(TopicUri clusterUri, Set<T
*
* @param clusterUri
* @param topicRnsAndOptions
* @param timeout
* @param timeUnit
* @param duration
* @return a map of {@link TopicUriPartition} to the corresponding offset as a {@link Long}
* @throws ExecutionException
* @throws InterruptedException
* @throws TimeoutException
*/
public Map<TopicUriPartition, MessageId> listOffsets(TopicUri clusterUri, Map<TopicUriPartition, PscMetadataClient.MetadataClientOption> topicRnsAndOptions, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException {
public Map<TopicUriPartition, Long> listOffsets(TopicUri clusterUri, Map<TopicUriPartition, PscMetadataClient.MetadataClientOption> topicRnsAndOptions, Duration duration) throws ExecutionException, InterruptedException, TimeoutException {
PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri);
return backendMetadataClient.listOffsets(topicRnsAndOptions, timeout, timeUnit);
return backendMetadataClient.listOffsets(topicRnsAndOptions, duration);
}

/**
Expand All @@ -127,17 +124,16 @@ public Map<TopicUriPartition, MessageId> listOffsets(TopicUri clusterUri, Map<To
* @param clusterUri
* @param consumerGroup
* @param topicUriPartitions
* @param timeout
* @param timeUnit
* @param duration
* @return a map of {@link TopicUriPartition} to the committed offset as a {@link Long}, or null if the
* consumer group has no committed offset for that partition
* @throws ExecutionException
* @throws InterruptedException
* @throws TimeoutException
*/
public Map<TopicUriPartition, MessageId> listOffsetsForConsumerGroup(TopicUri clusterUri, String consumerGroup, Collection<TopicUriPartition> topicUriPartitions, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException {
public Map<TopicUriPartition, Long> listOffsetsForConsumerGroup(TopicUri clusterUri, String consumerGroup, Collection<TopicUriPartition> topicUriPartitions, Duration duration) throws ExecutionException, InterruptedException, TimeoutException {
PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri);
return backendMetadataClient.listOffsetsForConsumerGroup(consumerGroup, topicUriPartitions, timeout, timeUnit);
return backendMetadataClient.listOffsetsForConsumerGroup(consumerGroup, topicUriPartitions, duration);
}

@VisibleForTesting
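As a usage illustration of the updated facade, a short sketch of querying committed consumer-group offsets with the new signature; the metadataClient instance, group name, and partition are made up for the example, while clusterUri and topic1Uri follow the names used in the tests above:

    // Committed offsets for a consumer group, keyed by partition; the value is null
    // when the group has no committed offset for that partition.
    Map<TopicUriPartition, Long> committed = metadataClient.listOffsetsForConsumerGroup(
            clusterUri,
            "example-consumer-group",
            Collections.singleton(new TopicUriPartition(topic1Uri, 0)),
            Duration.ofMillis(10000)
    );
    Long offset = committed.get(new TopicUriPartition(topic1Uri, 0));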