Skip to content

Commit

Permalink
Finish table tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffxiang committed Oct 22, 2024
1 parent e4bbc9b commit b7f30bc
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
public class UpsertPscDynamicTableFactory
implements DynamicTableSourceFactory, DynamicTableSinkFactory {

public static final String IDENTIFIER = "upsert-kafka";
public static final String IDENTIFIER = "upsert-psc";

@Override
public String factoryIdentifier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void testKafkaDebeziumChangelogSource() throws Exception {
+ " description STRING,"
+ " weight DECIMAL(10,3),"
// test connector metadata
+ " origin_topic STRING METADATA FROM 'topic' VIRTUAL,"
+ " origin_topic STRING METADATA FROM 'topic-uri' VIRTUAL,"
+ " origin_partition STRING METADATA FROM 'partition' VIRTUAL" // unused
+ ") WITH ("
+ " 'connector' = 'psc',"
Expand Down Expand Up @@ -228,7 +228,7 @@ public void testPscCanalChangelogSource() throws Exception {
+ " description STRING,"
+ " weight DECIMAL(10,3),"
// test connector metadata
+ " origin_topic STRING METADATA FROM 'topic' VIRTUAL,"
+ " origin_topic STRING METADATA FROM 'topic-uri' VIRTUAL,"
+ " origin_partition STRING METADATA FROM 'partition' VIRTUAL," // unused
+ " WATERMARK FOR origin_es AS origin_es - INTERVAL '5' SECOND"
+ ") WITH ("
Expand Down Expand Up @@ -374,7 +374,7 @@ public void testKafkaMaxwellChangelogSource() throws Exception {
+ " description STRING,"
+ " weight DECIMAL(10,3),"
// test connector metadata
+ " origin_topic STRING METADATA FROM 'topic' VIRTUAL,"
+ " origin_topic STRING METADATA FROM 'topic-uri' VIRTUAL,"
+ " origin_partition STRING METADATA FROM 'partition' VIRTUAL" // unused
+ ") WITH ("
+ " 'connector' = 'psc',"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@

package com.pinterest.flink.streaming.connectors.psc.table;

import com.pinterest.flink.connector.psc.PscFlinkConfiguration;
import com.pinterest.flink.connector.psc.sink.PscSink;
import com.pinterest.flink.connector.psc.source.PscSource;
import com.pinterest.flink.connector.psc.source.enumerator.PscSourceEnumState;
import com.pinterest.flink.connector.psc.source.split.PscTopicUriPartitionSplit;
import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub;
import com.pinterest.flink.streaming.connectors.psc.config.StartupMode;
import com.pinterest.psc.config.PscConfiguration;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink2.Sink;
Expand Down Expand Up @@ -87,12 +90,14 @@
public class UpsertPscDynamicTableFactoryTest extends TestLogger {

private static final String SOURCE_TOPIC = "sourceTopic_1";
private static final String SOURCE_TOPIC_URI = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + SOURCE_TOPIC;

private static final String SINK_TOPIC = "sinkTopic";
private static final String SINK_TOPIC_URI = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + SINK_TOPIC;

private static final String TEST_REGISTRY_URL = "http://localhost:8081";
private static final String DEFAULT_VALUE_SUBJECT = SINK_TOPIC + "-value";
private static final String DEFAULT_KEY_SUBJECT = SINK_TOPIC + "-key";
private static final String DEFAULT_VALUE_SUBJECT = SINK_TOPIC_URI + "-value";
private static final String DEFAULT_KEY_SUBJECT = SINK_TOPIC_URI + "-key";

private static final ResolvedSchema SOURCE_SCHEMA =
new ResolvedSchema(
Expand Down Expand Up @@ -120,13 +125,13 @@ public class UpsertPscDynamicTableFactoryTest extends TestLogger {

private static final int[] SINK_VALUE_FIELDS = new int[] {0, 1};

private static final Properties UPSERT_KAFKA_SOURCE_PROPERTIES = new Properties();
private static final Properties UPSERT_KAFKA_SINK_PROPERTIES = new Properties();
private static final Properties UPSERT_PSC_SOURCE_PROPERTIES = new Properties();
private static final Properties UPSERT_PSC_SINK_PROPERTIES = new Properties();

static {
UPSERT_KAFKA_SOURCE_PROPERTIES.setProperty("bootstrap.servers", "dummy");
UPSERT_PSC_SOURCE_PROPERTIES.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);

UPSERT_KAFKA_SINK_PROPERTIES.setProperty("bootstrap.servers", "dummy");
UPSERT_PSC_SINK_PROPERTIES.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
}

static EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat =
Expand Down Expand Up @@ -158,8 +163,8 @@ public void testTableSource() {
SOURCE_KEY_FIELDS,
SOURCE_VALUE_FIELDS,
null,
SOURCE_TOPIC,
UPSERT_KAFKA_SOURCE_PROPERTIES);
SOURCE_TOPIC_URI,
UPSERT_PSC_SOURCE_PROPERTIES);
assertEquals(actualSource, expectedSource);

final PscDynamicSource actualUpsertKafkaSource = (PscDynamicSource) actualSource;
Expand All @@ -181,8 +186,8 @@ public void testTableSink() {
SINK_KEY_FIELDS,
SINK_VALUE_FIELDS,
null,
SINK_TOPIC,
UPSERT_KAFKA_SINK_PROPERTIES,
SINK_TOPIC_URI,
UPSERT_PSC_SINK_PROPERTIES,
DeliveryGuarantee.AT_LEAST_ONCE,
SinkBufferFlushMode.DISABLED,
null);
Expand Down Expand Up @@ -222,8 +227,8 @@ public void testBufferedTableSink() {
SINK_KEY_FIELDS,
SINK_VALUE_FIELDS,
null,
SINK_TOPIC,
UPSERT_KAFKA_SINK_PROPERTIES,
SINK_TOPIC_URI,
UPSERT_PSC_SINK_PROPERTIES,
DeliveryGuarantee.AT_LEAST_ONCE,
new SinkBufferFlushMode(100, 1000L),
null);
Expand Down Expand Up @@ -270,8 +275,8 @@ public void testTableSinkWithParallelism() {
SINK_KEY_FIELDS,
SINK_VALUE_FIELDS,
null,
SINK_TOPIC,
UPSERT_KAFKA_SINK_PROPERTIES,
SINK_TOPIC_URI,
UPSERT_PSC_SINK_PROPERTIES,
DeliveryGuarantee.AT_LEAST_ONCE,
SinkBufferFlushMode.DISABLED,
100);
Expand Down Expand Up @@ -350,9 +355,9 @@ private void verifyEncoderSubject(
Map<String, String> options = new HashMap<>();
// Kafka specific options.
options.put("connector", UpsertPscDynamicTableFactory.IDENTIFIER);
options.put("topic", SINK_TOPIC);
options.put("properties.group.id", "dummy");
options.put("properties.bootstrap.servers", "dummy");
options.put("topic-uri", SINK_TOPIC_URI);
options.put("properties." + PscConfiguration.PSC_CONSUMER_GROUP_ID, "dummy");
options.put("properties." + PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
optionModifier.accept(options);

final RowType rowType = (RowType) SINK_SCHEMA.toSinkRowDataType().getLogicalType();
Expand Down Expand Up @@ -519,8 +524,8 @@ private static Map<String, String> getFullSourceOptions() {
// table options
Map<String, String> options = new HashMap<>();
options.put("connector", UpsertPscDynamicTableFactory.IDENTIFIER);
options.put("topic", SOURCE_TOPIC);
options.put("properties.bootstrap.servers", "dummy");
options.put("topic-uri", SOURCE_TOPIC_URI);
options.put("properties." + PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
// key format options
options.put("key.format", TestFormatFactory.IDENTIFIER);
options.put(
Expand Down Expand Up @@ -561,8 +566,8 @@ private static Map<String, String> getFullSourceOptions() {
private static Map<String, String> getFullSinkOptions() {
Map<String, String> options = new HashMap<>();
options.put("connector", UpsertPscDynamicTableFactory.IDENTIFIER);
options.put("topic", SINK_TOPIC);
options.put("properties.bootstrap.servers", "dummy");
options.put("topic-uri", SINK_TOPIC_URI);
options.put("properties." + PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
// key format options
options.put("value.format", TestFormatFactory.IDENTIFIER);
options.put(
Expand Down Expand Up @@ -597,7 +602,7 @@ private PscDynamicSource createExpectedScanSource(
int[] keyFields,
int[] valueFields,
String keyPrefix,
String topic,
String topicUri,
Properties properties) {
return new PscDynamicSource(
producedDataType,
Expand All @@ -606,7 +611,7 @@ private PscDynamicSource createExpectedScanSource(
keyFields,
valueFields,
keyPrefix,
Collections.singletonList(topic),
Collections.singletonList(topicUri),
null,
properties,
StartupMode.EARLIEST,
Expand Down
Loading

0 comments on commit b7f30bc

Please sign in to comment.