
WIP finish PscSourceReaderTest
jeffxiang committed Oct 9, 2024
1 parent c42d001 commit 18cba80
Showing 2 changed files with 12 additions and 10 deletions.
First changed file (the partition split reader):
@@ -29,7 +29,6 @@
 import com.pinterest.psc.consumer.PscConsumer;
 import com.pinterest.psc.consumer.PscConsumerMessage;
 import com.pinterest.psc.consumer.PscConsumerMessagesIterable;
-import com.pinterest.psc.consumer.PscConsumerPollMessageIterator;
 import com.pinterest.psc.exception.ClientException;
 import com.pinterest.psc.exception.consumer.ConsumerException;
 import com.pinterest.psc.exception.consumer.WakeupException;
@@ -169,7 +168,6 @@ private void markEmptySplitsAsFinished(PscPartitionSplitRecords recordsBySplits)
 
     @Override
     public void handleSplitsChanges(SplitsChange<PscTopicUriPartitionSplit> splitsChange) {
-        System.out.println("handling splits changes");
         // Get all the partition assignments and stopping offsets.
         if (!(splitsChange instanceof SplitsAddition)) {
             throw new UnsupportedOperationException(
@@ -215,7 +213,7 @@ public void handleSplitsChanges(SplitsChange<PscTopicUriPartitionSplit> splitsChange) {
 
         // Metric registration
         try {
-            maybeRegisterKafkaConsumerMetrics(props, pscSourceReaderMetrics, consumer);
+            maybeRegisterPscConsumerMetrics(props, pscSourceReaderMetrics, consumer);
             this.pscSourceReaderMetrics.registerNumBytesIn(consumer);
         } catch (ClientException e) {
             throw new RuntimeException("Failed to register metrics for PscConsumer", e);
@@ -254,7 +252,6 @@ public void close() throws Exception {
     public void notifyCheckpointComplete(
            Collection<MessageId> offsetsToCommit,
            OffsetCommitCallback offsetCommitCallback) throws ConfigurationException, ConsumerException {
-        System.out.println("commitAsync: " + offsetsToCommit);
         consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
     }

@@ -452,7 +449,7 @@ private long getStoppingOffset(TopicUriPartition tp) {
         return stoppingOffsets.getOrDefault(tp, Long.MAX_VALUE);
     }
 
-    private void maybeRegisterKafkaConsumerMetrics(
+    private void maybeRegisterPscConsumerMetrics(
            Properties props,
            PscSourceReaderMetrics pscSourceReaderMetrics,
            PscConsumer<?, ?> consumer) throws ClientException {
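
The body of the renamed helper is outside this hunk. As a rough sketch only, assuming (hypothetically, not from this diff) that registration is gated by a consumer property and delegated to PscSourceReaderMetrics, the shape would be roughly:

    // Sketch only: the gating property key and the registration hook on
    // PscSourceReaderMetrics are assumptions, not the actual PSC API.
    private void maybeRegisterPscConsumerMetrics(
            Properties props,
            PscSourceReaderMetrics pscSourceReaderMetrics,
            PscConsumer<?, ?> consumer) throws ClientException {
        final boolean register = Boolean.parseBoolean(
                props.getProperty("register.consumer.metrics", "true")); // assumed key
        if (register) {
            pscSourceReaderMetrics.registerPscConsumerMetrics(consumer); // assumed hook
        }
    }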
@@ -548,7 +545,6 @@ public PscConsumerMessage<byte[], byte[]> nextRecordFromSplit() {
                currentTopicPartition,
                "Make sure nextSplit() did not return null before "
                        + "iterate over the records split.");
-        System.out.println("recordIteratorClass: " + recordIterator.getClass().getName());
         if (recordIterator.hasNext()) {
             final PscConsumerMessage<byte[], byte[]> message = recordIterator.next();
             // Only emit records before stopping offset
Second changed file (PscSourceReaderTest):
@@ -296,7 +296,7 @@ void testDisableOffsetCommit() throws Exception {
     @Test
     void testPscSourceMetrics() throws Exception {
         final MetricListener metricListener = new MetricListener();
-        final String groupId = "testKafkaSourceMetrics";
+        final String groupId = "testPscSourceMetrics";
         final TopicUriPartition tp0 = new TopicUriPartition(TOPIC_URI_STR, 0);
         final TopicUriPartition tp1 = new TopicUriPartition(TOPIC_URI_STR, 1);

@@ -320,7 +320,7 @@ void testPscSourceMetrics() throws Exception {
                () -> output.getEmittedRecords().size() == NUM_RECORDS_PER_SPLIT * 2,
                String.format(
                        "Failed to poll %d records until timeout", NUM_RECORDS_PER_SPLIT * 2));
-
+        Thread.sleep(100); // Wait for the metric to be updated
         // Metric "records-consumed-total" of KafkaConsumer should be NUM_RECORDS_PER_SPLIT
         assertThat(getPscConsumerMetric("records-consumed-total", metricListener))
                 .isEqualTo(NUM_RECORDS_PER_SPLIT * 2);
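
A fixed Thread.sleep(100), paired with the 100 ms PSC_METRICS_FREQUENCY_MS set below in createReader, can still be timing-sensitive on a slow machine. A common alternative is to poll until the metric reaches the expected value or a deadline passes; a minimal sketch (waitForMetric is a hypothetical helper, not part of this change):

    import java.time.Duration;
    import java.util.function.LongSupplier;

    // Polls the metric instead of sleeping once; fails with a clear message
    // if the expected value is never observed within the timeout.
    static void waitForMetric(LongSupplier readMetric, long expected, Duration timeout)
            throws InterruptedException {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (readMetric.getAsLong() != expected) {
            if (System.nanoTime() > deadline) {
                throw new AssertionError(
                        "Metric did not reach " + expected + " within " + timeout);
            }
            Thread.sleep(10); // short poll interval instead of one fixed sleep
        }
    }

Assuming getPscConsumerMetric returns a numeric value, the call site would read: waitForMetric(() -> (long) getPscConsumerMetric("records-consumed-total", metricListener), NUM_RECORDS_PER_SPLIT * 2, Duration.ofSeconds(10));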
@@ -427,7 +427,7 @@ void testAssigningEmptySplitOnly() throws Exception {
                        (PscSourceReader<Integer>)
                                createReader(
                                        Boundedness.BOUNDED,
-                                        "KafkaSourceReaderTestGroup",
+                                        "PscSourceReaderTestGroup",
                                        new TestingReaderContext(),
                                        splitFinishedHook)) {
             reader.addSplits(Arrays.asList(emptySplit0, emptySplit1));
@@ -437,8 +437,10 @@
                    () -> reader.getNumAliveFetchers() == 0,
                    "The split fetcher did not exit before timeout.");
             assertThat(reader.getNumAliveFetchers()).isEqualTo(0);
+
+            // upstream asserts containsExactly (in order) but that is not necessary given that finishedSplits is a Set
             assertThat(finishedSplits)
-                    .containsExactly(emptySplit0.splitId(), emptySplit1.splitId());
+                    .contains(emptySplit0.splitId(), emptySplit1.splitId());
         }
     }
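
Since finishedSplits is a Set, iteration order is unspecified, so dropping the ordering requirement is sound. Note, though, that contains() also tolerates extra elements; if the intent is "exactly these two splits, in any order", AssertJ's containsExactlyInAnyOrder() keeps the exactness check. A small illustration with hypothetical split IDs:

    import static org.assertj.core.api.Assertions.assertThat;
    import java.util.Set;

    Set<String> finishedSplits = Set.of("split-0", "split-1");

    // Passes as long as both elements are present, even if others are too.
    assertThat(finishedSplits).contains("split-0", "split-1");

    // Additionally fails if the set holds anything beyond these two elements.
    assertThat(finishedSplits).containsExactlyInAnyOrder("split-0", "split-1");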

@@ -508,6 +510,10 @@ private SourceReader<Integer, PscTopicUriPartitionSplit> createReader(
            Properties props)
            throws Exception {
         props.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
+        props.setProperty(PscConfiguration.PSC_METRICS_FREQUENCY_MS, "100");
+        if (!props.containsKey(PscConfiguration.PSC_CONSUMER_GROUP_ID)) {
+            props.setProperty(PscConfiguration.PSC_CONSUMER_GROUP_ID, "test-group-id");
+        }
         putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
         PscSourceBuilder<Integer> builder =
                 PscSource.<Integer>builder()
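
The containsKey()/setProperty() pair keeps an explicitly configured group ID ahead of the default. java.util.Properties also inherits putIfAbsent from Hashtable, which expresses the same intent in one call; a sketch (the literal key string is an assumption standing in for PscConfiguration.PSC_CONSUMER_GROUP_ID):

    import java.util.Properties;

    Properties props = new Properties();
    // Equivalent to: if (!props.containsKey(key)) props.setProperty(key, value);
    props.putIfAbsent("psc.consumer.group.id", "test-group-id");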
