Skip to content

Commit

Permalink
WIP finish PscSourceBuilderTest
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffxiang committed Oct 9, 2024
1 parent 7fa8184 commit 4a3a57b
Showing 1 changed file with 11 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package com.pinterest.flink.connector.psc.source;

import com.pinterest.flink.connector.psc.PscFlinkConfiguration;
import com.pinterest.flink.connector.psc.source.enumerator.initializer.OffsetsInitializer;
import com.pinterest.flink.connector.psc.source.reader.deserializer.PscRecordDeserializationSchema;
import com.pinterest.flink.connector.psc.source.split.PscTopicUriPartitionSplit;
import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub;
import com.pinterest.psc.common.TopicUriPartition;
import com.pinterest.psc.config.PscConfiguration;
import com.pinterest.psc.serde.StringDeserializer;
Expand Down Expand Up @@ -90,7 +92,7 @@ public void testEnableCommitOnCheckpointWithoutGroupId() {
MatcherAssert.assertThat(
exception.getMessage(),
CoreMatchers.containsString(
"Property group.id is required when offset commit is enabled"));
"Property psc.consumer.group.id is required when offset commit is enabled"));
}

@Test
Expand All @@ -106,7 +108,7 @@ public void testEnableAutoCommitWithoutGroupId() {
MatcherAssert.assertThat(
exception.getMessage(),
CoreMatchers.containsString(
"Property group.id is required when offset commit is enabled"));
"Property psc.consumer.group.id is required when offset commit is enabled"));
}

@Test
Expand All @@ -130,7 +132,7 @@ public void testUsingCommittedOffsetsInitializerWithoutGroupId() {
MatcherAssert.assertThat(
startingOffsetException.getMessage(),
CoreMatchers.containsString(
"Property group.id is required when using committed offset for offsets initializer"));
"Property psc.consumer.group.id is required when using committed offset for offsets initializer"));

// Using OffsetsInitializer#committedOffsets as stopping offsets
final IllegalStateException stoppingOffsetException =
Expand All @@ -143,16 +145,17 @@ public void testUsingCommittedOffsetsInitializerWithoutGroupId() {
MatcherAssert.assertThat(
stoppingOffsetException.getMessage(),
CoreMatchers.containsString(
"Property group.id is required when using committed offset for offsets initializer"));
"Property psc.consumer.group.id is required when using committed offset for offsets initializer"));

// Using OffsetsInitializer#offsets to manually specify committed offset as starting offset
TopicUriPartition tup = new TopicUriPartition(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + "topic", 0);
final IllegalStateException specificStartingOffsetException =
assertThrows(
IllegalStateException.class,
() -> {
final Map<TopicUriPartition, Long> offsetMap = new HashMap<>();
offsetMap.put(
new TopicUriPartition("topic", 0),
tup,
PscTopicUriPartitionSplit.COMMITTED_OFFSET);
getBasicBuilder()
.setStartingOffsets(OffsetsInitializer.offsets(offsetMap))
Expand All @@ -161,12 +164,13 @@ public void testUsingCommittedOffsetsInitializerWithoutGroupId() {
MatcherAssert.assertThat(
specificStartingOffsetException.getMessage(),
CoreMatchers.containsString(
"Property group.id is required because partition topic-0 is initialized with committed offset"));
"Property psc.consumer.group.id is required because partition " + tup + " is initialized with committed offset"));
}

private PscSourceBuilder<String> getBasicBuilder() {
return new PscSourceBuilder<String>()
.setTopicUris("topic")
.setTopicUris(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + "topic")
.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
.setDeserializer(
PscRecordDeserializationSchema.valueOnly(StringDeserializer.class));
}
Expand Down

0 comments on commit 4a3a57b

Please sign in to comment.