WIP finish fixing streaming.connectors.psc internals, shuffle, table tests
jeffxiang committed Dec 12, 2024
1 parent 8b6cea8 commit a789999
Showing 10 changed files with 110 additions and 54 deletions.
6 changes: 0 additions & 6 deletions psc-flink/pom.xml

@@ -448,12 +448,6 @@
             <artifactId>flink-avro-confluent-registry</artifactId>
             <version>${flink.version}</version>
             <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>io.confluent</groupId>
-                    <artifactId>kafka-schema-registry-client</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -178,7 +178,6 @@ public long currentWatermark() {

     @Override
     public Long timestamp() {
-        checkNotNull(timestamp, "timestamp must to be set before retrieving it.");
         return timestamp;
     }

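Removing the checkNotNull relaxes the method's contract: timestamp() now returns null when no timestamp has been set, instead of throwing. A minimal sketch of the new behavior, using a hypothetical holder class as a stand-in for the one touched in this hunk:

    // Hypothetical stand-in illustrating the relaxed contract of timestamp().
    public class TimestampHolderSketch {
        private Long timestamp; // null until a timestamp is assigned

        public Long timestamp() {
            // checkNotNull removed: callers must now tolerate a null result.
            return timestamp;
        }

        public void setTimestamp(long ts) {
            this.timestamp = ts;
        }

        public static void main(String[] args) {
            TimestampHolderSketch holder = new TimestampHolderSketch();
            Long before = holder.timestamp();                  // null, no longer throws
            System.out.println(before != null ? before : "unset");
            holder.setTimestamp(42L);
            System.out.println(holder.timestamp());            // 42
        }
    }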
@@ -24,6 +24,7 @@
 import com.pinterest.flink.connector.psc.PscFlinkConfiguration;
 import com.pinterest.flink.connector.psc.dynamic.metadata.ClusterMetadata;
 import com.pinterest.flink.connector.psc.dynamic.metadata.PscStream;
+import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub;
 import org.junit.jupiter.api.Test;

 import java.io.IOException;

@@ -45,13 +46,18 @@ public void testParseFile() throws IOException {
         Properties propertiesForCluster0 = new Properties();
         propertiesForCluster0.setProperty(
-                PscFlinkConfiguration.CLUSTER_URI_CONFIG, "bootstrap-server-0:443");
+                PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX);
+        PscTestUtils.putDiscoveryProperties(propertiesForCluster0, "bootstrap-server-0:443", PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX);
         Properties propertiesForCluster1 = new Properties();
         propertiesForCluster1.setProperty(
-                PscFlinkConfiguration.CLUSTER_URI_CONFIG, "bootstrap-server-1:443");
+                PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER1_URI_PREFIX);
+        PscTestUtils.putDiscoveryProperties(propertiesForCluster1, "bootstrap-server-1:443", PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER1_URI_PREFIX);

         Properties propertiesForCluster2 = new Properties();
+        String cluster2Uri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER1_URI_PREFIX.replace("cluster1", "cluster2");
         propertiesForCluster2.setProperty(
-                PscFlinkConfiguration.CLUSTER_URI_CONFIG, "bootstrap-server-2:443");
+                PscFlinkConfiguration.CLUSTER_URI_CONFIG, cluster2Uri);
+        PscTestUtils.putDiscoveryProperties(propertiesForCluster2, "bootstrap-server-2:443", cluster2Uri);

         assertThat(kafkaStreams)
                 .containsExactlyInAnyOrderElementsOf(

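The pattern applied above: CLUSTER_URI_CONFIG now carries the logical PSC cluster URI rather than a raw bootstrap address, and the physical address moves into discovery properties bound to that URI via PscTestUtils.putDiscoveryProperties. A sketch of the pairing; the helper's internals are not shown in this diff, so the property key shapes and the prefix value below are assumptions:

    import java.util.Properties;

    // Sketch of the cluster-URI/discovery pairing. The real helper is
    // PscTestUtils.putDiscoveryProperties(props, bootstrapServer, clusterUri);
    // the key names used here are assumed for illustration only.
    public class DiscoveryPropertiesSketch {

        static void putDiscoveryProperties(Properties props, String bootstrapServer, String clusterUri) {
            // Assumed shape: resolve the logical cluster URI to a physical address.
            props.setProperty("discovery.uri." + clusterUri, bootstrapServer);
        }

        public static void main(String[] args) {
            String cluster0Uri = "plaintext:/rn:kafka:env:cloud_region1::cluster0:"; // assumed, per the table tests below
            Properties props = new Properties();
            props.setProperty("psc.cluster.uri", cluster0Uri); // stands in for PscFlinkConfiguration.CLUSTER_URI_CONFIG
            putDiscoveryProperties(props, "bootstrap-server-0:443", cluster0Uri);
            props.forEach((k, v) -> System.out.println(k + " = " + v));
        }
    }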
@@ -18,6 +18,7 @@

 package com.pinterest.flink.streaming.connectors.psc.internals.metrics;

+import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub;
 import com.pinterest.psc.config.PscConfiguration;
 import com.pinterest.psc.config.PscConfigurationUtils;
 import com.pinterest.psc.consumer.PscConsumer;

@@ -47,6 +48,7 @@
 import java.util.stream.Stream;

 import static com.pinterest.flink.connector.psc.testutils.DockerImageVersions.KAFKA;
+import static com.pinterest.flink.connector.psc.testutils.PscTestUtils.putDiscoveryProperties;
 import static com.pinterest.flink.connector.psc.testutils.PscUtil.createKafkaContainer;

 @Testcontainers

@@ -100,8 +102,11 @@ private static PscConfiguration getPscClientConfiguration() {
         standardProps.put(PscConfiguration.PSC_CONSUMER_VALUE_DESERIALIZER, ByteArrayDeserializer.class.getName());
         standardProps.put(PscConfiguration.PSC_PRODUCER_KEY_SERIALIZER, ByteArraySerializer.class.getName());
         standardProps.put(PscConfiguration.PSC_PRODUCER_VALUE_SERIALIZER, ByteArraySerializer.class.getName());
+        standardProps.put(PscConfiguration.PSC_CONSUMER_CLIENT_ID, "psc-metric-mutable-wrapper-test");
+        standardProps.put(PscConfiguration.PSC_PRODUCER_CLIENT_ID, "psc-metric-mutable-wrapper-test");
         standardProps.put(PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET, "earliest");
         standardProps.put(PscConfiguration.PSC_CONSUMER_PARTITION_FETCH_MAX_BYTES, 256);
+        putDiscoveryProperties(standardProps, KAFKA_CONTAINER.getBootstrapServers(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX);
         return PscConfigurationUtils.propertiesToPscConfiguration(standardProps);
     }
 }

@@ -368,7 +368,8 @@ private void testRecordSerDe(TimeCharacteristic timeCharacteristic) throws Exception

         // Records in a single partition are kept in order
         Collection<PscConsumerMessage<byte[], byte[]>> records = testPscShuffleProducer(
-                topic("test_serde-" + UUID.randomUUID(), timeCharacteristic),
+                PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX
+                        + topic("test_serde-" + UUID.randomUUID(), timeCharacteristic),
                 env,
                 1,
                 1,

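The shuffle fix qualifies the bare topic name with a cluster URI prefix so the producer receives a fully qualified PSC topic URI. A small sketch of the composition; the prefix value is assumed from the expected rows in the table tests below:

    import java.util.UUID;

    // Sketch: composing a fully qualified PSC topic URI from a cluster URI
    // prefix and a bare topic name, mirroring the shuffle-test fix above.
    public class TopicUriCompositionSketch {
        public static void main(String[] args) {
            String clusterUriPrefix = "plaintext:/rn:kafka:env:cloud_region1::cluster0:"; // assumed prefix
            String topic = "test_serde-" + UUID.randomUUID();
            String topicUri = clusterUriPrefix + topic; // what testPscShuffleProducer now receives
            System.out.println(topicUri);
        }
    }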
@@ -171,13 +171,13 @@ public void testPscDebeziumChangelogSource() throws Exception {
          */
         List<String> expected =
                 Arrays.asList(
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_topic, products, scooter, 3.140]",
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_topic, products, car battery, 8.100]",
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_topic, products, 12-pack drill bits, 0.800]",
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_topic, products, hammer, 2.625]",
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_topic, products, rocks, 5.100]",
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_topic, products, jacket, 0.600]",
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_topic, products, spare tire, 22.200]");
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_topic, products, scooter, 3.140]",
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_topic, products, car battery, 8.100]",
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_topic, products, 12-pack drill bits, 0.800]",
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_topic, products, hammer, 2.625]",
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_topic, products, rocks, 5.100]",
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_topic, products, jacket, 0.600]",
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_topic, products, spare tire, 22.200]");

         waitingExpectedResults("sink", expected, Duration.ofSeconds(10));

@@ -319,13 +319,13 @@ public void testPscCanalChangelogSource() throws Exception {

         List<String> expected =
                 Arrays.asList(
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:38:35.477, 2020-05-13T12:38:35, 12-pack drill bits]",
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:38:35.477, 2020-05-13T12:38:35, spare tire]",
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:06.301, 2020-05-13T12:39:06, hammer]",
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:09.489, 2020-05-13T12:39:09, rocks]",
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:18.230, 2020-05-13T12:39:18, jacket]",
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:42:33.939, 2020-05-13T12:42:33, car battery]",
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:42:33.939, 2020-05-13T12:42:33, scooter]");
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:38:35.477, 2020-05-13T12:38:35, 12-pack drill bits]",
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:38:35.477, 2020-05-13T12:38:35, spare tire]",
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:06.301, 2020-05-13T12:39:06, hammer]",
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:09.489, 2020-05-13T12:39:09, rocks]",
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:18.230, 2020-05-13T12:39:18, jacket]",
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:42:33.939, 2020-05-13T12:42:33, car battery]",
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:42:33.939, 2020-05-13T12:42:33, scooter]");

         waitingExpectedResults("sink", expected, Duration.ofSeconds(10));

@@ -462,13 +462,13 @@ public void testPscMaxwellChangelogSource() throws Exception {

         List<String> expected =
                 Arrays.asList(
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_maxwell, test, product, null, 2020-08-06T03:34:43, 12-pack drill bits]",
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_maxwell, test, product, null, 2020-08-06T03:34:43, spare tire]",
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_maxwell, test, product, null, 2020-08-06T03:34:53, hammer]",
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_maxwell, test, product, null, 2020-08-06T03:34:57, rocks]",
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_maxwell, test, product, null, 2020-08-06T03:35:06, jacket]",
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_maxwell, test, product, null, 2020-08-06T03:35:28, car battery]",
-                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_maxwell, test, product, null, 2020-08-06T03:35:28, scooter]");
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_maxwell, test, product, null, 2020-08-06T03:34:43, 12-pack drill bits]",
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_maxwell, test, product, null, 2020-08-06T03:34:43, spare tire]",
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_maxwell, test, product, null, 2020-08-06T03:34:53, hammer]",
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_maxwell, test, product, null, 2020-08-06T03:34:57, rocks]",
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_maxwell, test, product, null, 2020-08-06T03:35:06, jacket]",
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_maxwell, test, product, null, 2020-08-06T03:35:28, car battery]",
+                        "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_maxwell, test, product, null, 2020-08-06T03:35:28, scooter]");

         waitingExpectedResults("sink", expected, Duration.ofSeconds(10));

@@ -524,7 +524,7 @@ public void testBoundedTimestamp() {
         assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED);
         OffsetsInitializer offsetsInitializer =
                 PscSourceTestUtils.getStoppingOffsetsInitializer(source);
-        TopicUriPartition partition = new TopicUriPartition(TOPIC, 0);
+        TopicUriPartition partition = new TopicUriPartition(TOPIC_URI, 0);
         long offsetForTimestamp = 123L;
         Map<TopicUriPartition, Long> partitionOffsets =
                 offsetsInitializer.getPartitionOffsets(

@@ -538,7 +538,7 @@
                                 new HashMap<>();
                         result.put(
                                 partition,
-                                1L);
+                                123L);
                         return result;
                     },
                     partitions -> {

@@ -895,7 +895,7 @@ public void testSourceTableWithTopicAndTopicPattern() {
                 .satisfies(
                         anyCauseMatches(
                                 ValidationException.class,
-                                "Option 'topic' and 'topic-pattern' shouldn't be set together."));
+                                "Option 'topic-uri' and 'topic-pattern' shouldn't be set together."));
     }

     @Test

@@ -1125,7 +1125,7 @@ public void testDiscoverPartitionByDefault() {
                 new int[0],
                 new int[] {0, 1, 2},
                 null,
-                Collections.singletonList(TOPIC),
+                Collections.singletonList(TOPIC_URI),
                 null,
                 props,
                 StartupMode.SPECIFIC_OFFSETS,

@@ -1163,7 +1163,7 @@ public void testDisableDiscoverPartition() {
                 new int[0],
                 new int[] {0, 1, 2},
                 null,
-                Collections.singletonList(TOPIC),
+                Collections.singletonList(TOPIC_URI),
                 null,
                 props,
                 StartupMode.SPECIFIC_OFFSETS,
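These hunks align the table-source tests with PSC's URI-first naming: partitions and topic lists are built from TOPIC_URI rather than the bare TOPIC, the expected validation message now says 'topic-uri', and the mocked timestamp-to-offset lookup returns the same 123L the test asserts on. A minimal re-creation of that last fix's logic, with hypothetical names and an assumed TOPIC_URI value:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical re-creation of the testBoundedTimestamp fix: the mocked
    // lookup must return the same offset (123L) the initializer is asserted
    // to stop at, keyed by the full topic URI partition.
    public class BoundedTimestampSketch {
        public static void main(String[] args) {
            String topicUri = "plaintext:/rn:kafka:env:cloud_region1::cluster0:topic"; // assumed TOPIC_URI
            long offsetForTimestamp = 123L;

            Map<String, Long> expected = new HashMap<>();
            expected.put(topicUri, offsetForTimestamp);

            Map<String, Long> resolved = new HashMap<>();
            resolved.put(topicUri, offsetForTimestamp); // was 1L, which could never match

            System.out.println(expected.equals(resolved)); // true after the fix
        }
    }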