diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java
index 3b64849..32718fa 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java
@@ -121,8 +121,8 @@ public class PscConnectorOptions {
-                            "Topic names from which the table is read. Either 'topic-uri' or 'topic-uri-pattern' must be set for source. "
+                            "Topic names from which the table is read. Either 'topic-uri' or 'topic-pattern' must be set for source. "
                                     + "Option 'topic-uri' is required for sink.");
 
-    public static final ConfigOption<String> TOPIC_URI_PATTERN =
-            ConfigOptions.key("topic-uri-pattern")
+    public static final ConfigOption<String> TOPIC_PATTERN =
+            ConfigOptions.key("topic-pattern")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptionsUtil.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptionsUtil.java
index c3bebfd..57f8dcc 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptionsUtil.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptionsUtil.java
@@ -59,7 +59,7 @@
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARTITIONER;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_URI;
-import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_URI_PATTERN;
+import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_PATTERN;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TRANSACTIONAL_ID_PREFIX;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.VALUE_FIELDS_INCLUDE;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.VALUE_FORMAT;
@@ -108,11 +108,11 @@ public static void validateTableSinkOptions(ReadableConfig tableOptions) {
 
     public static void validateSourceTopic(ReadableConfig tableOptions) {
         Optional<List<String>> topic = tableOptions.getOptional(TOPIC_URI);
-        Optional<String> pattern = tableOptions.getOptional(TOPIC_URI_PATTERN);
+        Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN);
 
         if (topic.isPresent() && pattern.isPresent()) {
             throw new ValidationException(
-                    "Option 'topic' and 'topic-pattern' shouldn't be set together.");
+                    "Option 'topic-uri' and 'topic-pattern' shouldn't be set together.");
         }
 
         if (!topic.isPresent() && !pattern.isPresent()) {
@@ -122,17 +122,17 @@ public static void validateSourceTopic(ReadableConfig tableOptions) {
 
     public static void validateSinkTopic(ReadableConfig tableOptions) {
         String errorMessageTemp =
-                "Flink Kafka sink currently only supports single topic, but got %s: %s.";
+                "Flink PSC sink currently only supports single topic, but got %s: %s.";
         if (!isSingleTopicUri(tableOptions)) {
-            if (tableOptions.getOptional(TOPIC_URI_PATTERN).isPresent()) {
+            if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()) {
                 throw new ValidationException(
                         String.format(
                                 errorMessageTemp,
                                 "'topic-pattern'",
-                                tableOptions.get(TOPIC_URI_PATTERN)));
+                                tableOptions.get(TOPIC_PATTERN)));
             } else {
                 throw new ValidationException(
-                        String.format(errorMessageTemp, "'topic'", tableOptions.get(TOPIC_URI)));
+                        String.format(errorMessageTemp, "'topic-uri'", tableOptions.get(TOPIC_URI)));
             }
         }
     }
@@ -208,7 +208,7 @@ public static List<String> getSourceTopicUris(ReadableConfig tableOptions) {
     }
 
     public static Pattern getSourceTopicUriPattern(ReadableConfig tableOptions) {
-        return tableOptions.getOptional(TOPIC_URI_PATTERN).map(Pattern::compile).orElse(null);
+        return tableOptions.getOptional(TOPIC_PATTERN).map(Pattern::compile).orElse(null);
     }
 
     private static boolean isSingleTopicUri(ReadableConfig tableOptions) {
diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java
index f5432c3..80340d5 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java
@@ -61,6 +61,7 @@
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
@@ -215,7 +216,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
                 context.createTypeInformation(producedDataType);
 
         final PscSource<RowData> kafkaSource =
-                createKafkaSource(keyDeserialization, valueDeserialization, producedTypeInfo);
+                createPscSource(keyDeserialization, valueDeserialization, producedTypeInfo);
 
         return new DataStreamScanProvider() {
             @Override
@@ -373,7 +374,7 @@ public int hashCode() {
 
     // --------------------------------------------------------------------------------------------
 
-    protected PscSource<RowData> createKafkaSource(
+    protected PscSource<RowData> createPscSource(
             DeserializationSchema<RowData> keyDeserialization,
             DeserializationSchema<RowData> valueDeserialization,
             TypeInformation<RowData> producedTypeInfo) {
@@ -402,6 +403,7 @@ protected PscSource<RowData> createKafkaSource(
                         properties.getProperty(
                                 PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET,
                                 PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_NONE);
+                offsetResetConfig = getResetStrategy(offsetResetConfig);
                 pscSourceBuilder.setStartingOffsets(
                         OffsetsInitializer.committedOffsets(offsetResetConfig));
                 break;
@@ -427,6 +429,23 @@ protected PscSource<RowData> createKafkaSource(
         return pscSourceBuilder.build();
     }
 
+    private String getResetStrategy(String offsetResetConfig) {
+        final String[] validResetStrategies = {"EARLIEST", "LATEST", "NONE"};
+        return Arrays.stream(validResetStrategies)
+                .filter(ors -> ors.equals(offsetResetConfig.toUpperCase(Locale.ROOT)))
+                .findAny()
+                .orElseThrow(
+                        () ->
+                                new IllegalArgumentException(
+                                        String.format(
+                                                "%s can not be set to %s. Valid values: [%s]",
+                                                PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET,
+                                                offsetResetConfig,
+                                                Arrays.stream(validResetStrategies)
+                                                        .map(String::toLowerCase)
+                                                        .collect(Collectors.joining(","))))));
+    }
+
     private PscDeserializationSchema createPscDeserializationSchema(
             DeserializationSchema<RowData> keyDeserialization,
             DeserializationSchema<RowData> valueDeserialization,
diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java
index 6146a77..4f44fd5 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java
@@ -73,7 +73,7 @@
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARALLELISM;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARTITIONER;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_URI;
-import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_URI_PATTERN;
+import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_PATTERN;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TRANSACTIONAL_ID_PREFIX;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.VALUE_FIELDS_INCLUDE;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.VALUE_FORMAT;
@@ -128,7 +128,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         options.add(VALUE_FORMAT);
         options.add(VALUE_FIELDS_INCLUDE);
         options.add(TOPIC_URI);
-        options.add(TOPIC_URI_PATTERN);
+        options.add(TOPIC_PATTERN);
         options.add(PROPS_GROUP_ID);
         options.add(SCAN_STARTUP_MODE);
         options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
@@ -147,7 +147,7 @@ public Set<ConfigOption<?>> forwardOptions() {
         return Stream.of(
                         PROPS_GROUP_ID,
                         TOPIC_URI,
-                        TOPIC_URI_PATTERN,
+                        TOPIC_PATTERN,
                         SCAN_STARTUP_MODE,
                         SCAN_STARTUP_SPECIFIC_OFFSETS,
                         SCAN_TOPIC_PARTITION_DISCOVERY,
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java
index 2d0e988..e42c052 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java
@@ -18,16 +18,20 @@
 package com.pinterest.flink.streaming.connectors.psc.table;
 
+import com.pinterest.flink.connector.psc.PscFlinkConfiguration;
 import com.pinterest.flink.connector.psc.sink.PscSink;
 import com.pinterest.flink.connector.psc.source.PscSource;
 import com.pinterest.flink.connector.psc.source.PscSourceOptions;
 import com.pinterest.flink.connector.psc.source.PscSourceTestUtils;
 import com.pinterest.flink.connector.psc.source.enumerator.PscSourceEnumState;
 import com.pinterest.flink.connector.psc.source.split.PscTopicUriPartitionSplit;
+import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub;
 import com.pinterest.flink.streaming.connectors.psc.config.StartupMode;
 import com.pinterest.flink.streaming.connectors.psc.internals.PscTopicUriPartition;
 import com.pinterest.flink.streaming.connectors.psc.partitioner.FlinkFixedPartitioner;
 import com.pinterest.flink.streaming.connectors.psc.partitioner.FlinkPscPartitioner;
+import com.pinterest.psc.config.PscConfiguration;
+import com.pinterest.psc.serde.ByteArraySerializer;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.connector.sink2.Sink;
@@ -69,7 +73,6 @@
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.TestLoggerExtension;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -88,6 +91,7 @@
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.AVRO_CONFLUENT;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.DEBEZIUM_AVRO_CONFLUENT;
@@ -104,10 +108,13 @@ public class PscDynamicTableFactoryTest {
 
     private static final String TOPIC = "myTopic";
+    private static final String TOPIC_URI = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + TOPIC;
     private static final String TOPICS = "myTopic-1;myTopic-2;myTopic-3";
+    private static final String TOPIC_URIS = Arrays.stream(TOPICS.split(";")).map(topic -> PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topic).reduce((a, b) -> a + ";" + b).get();
     private static final String TOPIC_REGEX = "myTopic-\\d+";
     private static final List<String> TOPIC_LIST =
             Arrays.asList("myTopic-1", "myTopic-2", "myTopic-3");
+    private static final List<String> TOPIC_URI_LIST = TOPIC_LIST.stream().map(topic -> PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topic).collect(Collectors.toList());
     private static final String TEST_REGISTRY_URL = "http://localhost:8081";
     private static final String DEFAULT_VALUE_SUBJECT = TOPIC + "-value";
     private static final String DEFAULT_KEY_SUBJECT = TOPIC + "-key";
@@ -126,27 +133,27 @@ public class PscDynamicTableFactoryTest {
     private static final DataType COMPUTED_COLUMN_DATATYPE = DataTypes.DECIMAL(10, 3);
     private static final String DISCOVERY_INTERVAL = "1000 ms";
 
-    private static final Properties KAFKA_SOURCE_PROPERTIES = new Properties();
-    private static final Properties KAFKA_FINAL_SOURCE_PROPERTIES = new Properties();
-    private static final Properties KAFKA_SINK_PROPERTIES = new Properties();
-    private static final Properties KAFKA_FINAL_SINK_PROPERTIES = new Properties();
+    private static final Properties PSC_SOURCE_PROPERTIES = new Properties();
+    private static final Properties PSC_FINAL_SOURCE_PROPERTIES = new Properties();
+    private static final Properties PSC_SINK_PROPERTIES = new Properties();
+    private static final Properties PSC_FINAL_SINK_PROPERTIES = new Properties();
 
     static {
-        KAFKA_SOURCE_PROPERTIES.setProperty("group.id", "dummy");
-        KAFKA_SOURCE_PROPERTIES.setProperty("bootstrap.servers", "dummy");
-        KAFKA_SOURCE_PROPERTIES.setProperty("partition.discovery.interval.ms", "1000");
+        PSC_SOURCE_PROPERTIES.setProperty(PscConfiguration.PSC_CONSUMER_GROUP_ID, "dummy");
+        PSC_SOURCE_PROPERTIES.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
+        PSC_SOURCE_PROPERTIES.setProperty("partition.discovery.interval.ms", "1000");
 
-        KAFKA_SINK_PROPERTIES.setProperty("group.id", "dummy");
-        KAFKA_SINK_PROPERTIES.setProperty("bootstrap.servers", "dummy");
+        PSC_SINK_PROPERTIES.setProperty(PscConfiguration.PSC_CONSUMER_GROUP_ID, "dummy");
+        PSC_SINK_PROPERTIES.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
 
-        KAFKA_FINAL_SINK_PROPERTIES.putAll(KAFKA_SINK_PROPERTIES);
-        KAFKA_FINAL_SINK_PROPERTIES.setProperty(
-                "value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-        KAFKA_FINAL_SINK_PROPERTIES.setProperty(
-                "key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-        KAFKA_FINAL_SINK_PROPERTIES.put("transaction.timeout.ms", 3600000);
+        PSC_FINAL_SINK_PROPERTIES.putAll(PSC_SINK_PROPERTIES);
+        PSC_FINAL_SINK_PROPERTIES.setProperty(
+                PscConfiguration.PSC_PRODUCER_VALUE_SERIALIZER, ByteArraySerializer.class.getName());
+        PSC_FINAL_SINK_PROPERTIES.setProperty(
+                PscConfiguration.PSC_PRODUCER_KEY_SERIALIZER, ByteArraySerializer.class.getName());
+        PSC_FINAL_SINK_PROPERTIES.put(PscConfiguration.PSC_PRODUCER_TRANSACTION_TIMEOUT_MS, 3600000);
 
-        KAFKA_FINAL_SOURCE_PROPERTIES.putAll(KAFKA_SOURCE_PROPERTIES);
+        PSC_FINAL_SOURCE_PROPERTIES.putAll(PSC_SOURCE_PROPERTIES);
     }
 
     private static final String PROPS_SCAN_OFFSETS =
@@ -190,8 +197,8 @@ public void testTableSource() {
         final PscDynamicSource actualKafkaSource = (PscDynamicSource) actualSource;
 
         final Map<PscTopicUriPartition, Long> specificOffsets = new HashMap<>();
-        specificOffsets.put(new PscTopicUriPartition(TOPIC, PARTITION_0), OFFSET_0);
-        specificOffsets.put(new PscTopicUriPartition(TOPIC, PARTITION_1), OFFSET_1);
+        specificOffsets.put(new PscTopicUriPartition(TOPIC_URI, PARTITION_0), OFFSET_0);
+        specificOffsets.put(new PscTopicUriPartition(TOPIC_URI, PARTITION_1), OFFSET_1);
 
         final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
                 new DecodingFormatMock(",", true);
@@ -205,9 +212,9 @@ public void testTableSource() {
                         new int[0],
                         new int[] {0, 1, 2},
                         null,
-                        Collections.singletonList(TOPIC),
+                        Collections.singletonList(TOPIC_URI),
                         null,
-                        KAFKA_SOURCE_PROPERTIES,
+                        PSC_SOURCE_PROPERTIES,
                         StartupMode.SPECIFIC_OFFSETS,
                         specificOffsets,
                         0);
@@ -224,7 +231,7 @@ public void testTableSourceWithPattern() {
                 getModifiedOptions(
                         getBasicSourceOptions(),
                         options -> {
-                            options.remove("topic");
+                            options.remove("topic-uri");
                             options.put("topic-pattern", TOPIC_REGEX);
                             options.put(
                                     "scan.startup.mode",
@@ -249,7 +256,7 @@ public void testTableSourceWithPattern() {
                         null,
                         null,
                         Pattern.compile(TOPIC_REGEX),
-                        KAFKA_SOURCE_PROPERTIES,
+                        PSC_SOURCE_PROPERTIES,
                         StartupMode.EARLIEST,
                         specificOffsets,
                         0);
@@ -288,9 +295,9 @@ public void testTableSourceWithKeyValue() {
                         new int[] {0},
                         new int[] {1, 2},
                         null,
-                        Collections.singletonList(TOPIC),
+                        Collections.singletonList(TOPIC_URI),
                         null,
-                        KAFKA_FINAL_SOURCE_PROPERTIES,
+                        PSC_FINAL_SOURCE_PROPERTIES,
                         StartupMode.GROUP_OFFSETS,
                         Collections.emptyMap(),
                         0);
@@ -339,9 +346,9 @@ public void testTableSourceWithKeyValueAndMetadata() {
                         new int[] {0},
                         new int[] {1},
                         null,
-                        Collections.singletonList(TOPIC),
+                        Collections.singletonList(TOPIC_URI),
                         null,
-                        KAFKA_FINAL_SOURCE_PROPERTIES,
+                        PSC_FINAL_SOURCE_PROPERTIES,
                         StartupMode.GROUP_OFFSETS,
                         Collections.emptyMap(),
                         0);
@@ -355,7 +362,7 @@ public void testTableSourceWithKeyValueAndMetadata() {
     public void testTableSourceCommitOnCheckpointDisabled() {
         final Map<String, String> modifiedOptions =
                 getModifiedOptions(
-                        getBasicSourceOptions(), options -> options.remove("properties.group.id"));
+                        getBasicSourceOptions(), options -> options.remove("properties." + PscConfiguration.PSC_CONSUMER_GROUP_ID));
 
         final DynamicTableSource tableSource = createTableSource(SCHEMA, modifiedOptions);
         assertThat(tableSource).isInstanceOf(PscDynamicSource.class);
@@ -371,7 +378,7 @@ public void testTableSourceCommitOnCheckpointDisabled() {
         assertThat(configuration.get(PscSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT)).isFalse();
         assertThat(
                         configuration.get(
-                                ConfigOptions.key(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+                                ConfigOptions.key(PscConfiguration.PSC_CONSUMER_COMMIT_AUTO_ENABLED)
                                         .booleanType()
                                         .noDefaultValue()))
                 .isFalse();
@@ -391,8 +398,8 @@ public void testTableSourceSetOffsetResetWithException() {
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessage(
                         String.format(
-                                "%s can not be set to %s. Valid values: [latest,earliest,none]",
-                                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, errorStrategy));
+                                "%s can not be set to %s. Valid values: [earliest,latest,none]",
+                                PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET, errorStrategy));
     }
 
     private void testSetOffsetResetForStartFromGroupOffsets(String value) {
@@ -405,7 +412,7 @@ private void testSetOffsetResetForStartFromGroupOffsets(String value) {
                                 return;
                             }
                             options.put(
-                                    PROPERTIES_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                                    PROPERTIES_PREFIX + PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET,
                                     value);
                         });
         final DynamicTableSource tableSource = createTableSource(SCHEMA, modifiedOptions);
@@ -419,10 +426,10 @@ private void testSetOffsetResetForStartFromGroupOffsets(String value) {
                 PscSourceTestUtils.getPscSourceConfiguration(pscSource);
 
         if (value == null) {
-            assertThat(configuration.toMap().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+            assertThat(configuration.toMap().get(PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET))
                     .isEqualTo("none");
         } else {
-            assertThat(configuration.toMap().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+            assertThat(configuration.toMap().get(PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET))
                     .isEqualTo(value);
         }
     }
@@ -434,7 +441,7 @@ public void testTableSink() {
                         getBasicSinkOptions(),
                         options -> {
                             options.put("sink.delivery-guarantee", "exactly-once");
-                            options.put("sink.transactional-id-prefix", "kafka-sink");
+                            options.put("sink.transactional-id-prefix", "psc-sink");
                         });
 
         final DynamicTableSink actualSink = createTableSink(SCHEMA, modifiedOptions);
@@ -449,12 +456,12 @@ public void testTableSink() {
                         new int[0],
                         new int[] {0, 1, 2},
                         null,
-                        TOPIC,
-                        KAFKA_SINK_PROPERTIES,
+                        TOPIC_URI,
+                        PSC_SINK_PROPERTIES,
                         new FlinkFixedPartitioner<>(),
                         DeliveryGuarantee.EXACTLY_ONCE,
                         null,
-                        "kafka-sink");
+                        "psc-sink");
         assertThat(actualSink).isEqualTo(expectedSink);
 
         // Test kafka producer.
@@ -478,7 +485,7 @@ public void testTableSinkSemanticTranslation() {
                             getBasicSinkOptions(),
                             options -> {
                                 options.put("sink.semantic", semantic);
-                                options.put("sink.transactional-id-prefix", "kafka-sink");
+                                options.put("sink.transactional-id-prefix", "psc-sink");
                             });
             final DynamicTableSink actualSink = createTableSink(SCHEMA, modifiedOptions);
             final DynamicTableSink expectedSink =
@@ -489,12 +496,12 @@ public void testTableSinkSemanticTranslation() {
                             new int[0],
                             new int[] {0, 1, 2},
                             null,
-                            TOPIC,
-                            KAFKA_SINK_PROPERTIES,
+                            TOPIC_URI,
+                            PSC_SINK_PROPERTIES,
                             new FlinkFixedPartitioner<>(),
                             DeliveryGuarantee.valueOf(semantic.toUpperCase().replace("-", "_")),
                             null,
-                            "kafka-sink");
+                            "psc-sink");
             assertThat(actualSink).isEqualTo(expectedSink);
         }
     }
@@ -506,7 +513,7 @@ public void testTableSinkWithKeyValue() {
                         getKeyValueOptions(),
                         options -> {
                             options.put("sink.delivery-guarantee", "exactly-once");
-                            options.put("sink.transactional-id-prefix", "kafka-sink");
+                            options.put("sink.transactional-id-prefix", "psc-sink");
                         });
         final DynamicTableSink actualSink = createTableSink(SCHEMA, modifiedOptions);
         final PscDynamicSink actualKafkaSink = (PscDynamicSink) actualSink;
@@ -532,12 +539,12 @@ public void testTableSinkWithKeyValue() {
                         new int[] {0},
                         new int[] {1, 2},
                         null,
-                        TOPIC,
-                        KAFKA_FINAL_SINK_PROPERTIES,
+                        TOPIC_URI,
+                        PSC_FINAL_SINK_PROPERTIES,
                         new FlinkFixedPartitioner<>(),
                         DeliveryGuarantee.EXACTLY_ONCE,
                         null,
-                        "kafka-sink");
+                        "psc-sink");
 
         assertThat(actualSink).isEqualTo(expectedSink);
     }
@@ -560,12 +567,12 @@ public void testTableSinkWithParallelism() {
                        new int[0],
                         new int[] {0, 1, 2},
                         null,
-                        TOPIC,
-                        KAFKA_SINK_PROPERTIES,
+                        TOPIC_URI,
+                        PSC_SINK_PROPERTIES,
                         new FlinkFixedPartitioner<>(),
                         DeliveryGuarantee.EXACTLY_ONCE,
                         100,
-                        "kafka-sink");
+                        "psc-sink");
         assertThat(actualSink).isEqualTo(expectedSink);
 
         final DynamicTableSink.SinkRuntimeProvider provider =
@@ -584,7 +591,7 @@ public void testTableSinkAutoCompleteSchemaRegistrySubject() {
                     options.put("format", "debezium-avro-confluent");
                     options.put("debezium-avro-confluent.url", TEST_REGISTRY_URL);
                 },
-                DEFAULT_VALUE_SUBJECT,
+                PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + DEFAULT_VALUE_SUBJECT,
                 "N/A");
 
         // only value.format
@@ -593,7 +600,7 @@ public void testTableSinkAutoCompleteSchemaRegistrySubject() {
                     options.put("value.format", "avro-confluent");
                     options.put("value.avro-confluent.url", TEST_REGISTRY_URL);
                 },
-                DEFAULT_VALUE_SUBJECT,
+                PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + DEFAULT_VALUE_SUBJECT,
                 "N/A");
 
         // value.format + key.format
@@ -605,8 +612,9 @@ public void testTableSinkAutoCompleteSchemaRegistrySubject() {
                     options.put("key.avro-confluent.url", TEST_REGISTRY_URL);
                     options.put("key.fields", NAME);
                 },
-                DEFAULT_VALUE_SUBJECT,
-                DEFAULT_KEY_SUBJECT);
+                PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + DEFAULT_VALUE_SUBJECT,
+                PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + DEFAULT_KEY_SUBJECT
+        );
 
         // value.format + non-avro key.format
@@ -616,7 +624,7 @@ public void testTableSinkAutoCompleteSchemaRegistrySubject() {
                     options.put("key.format", "csv");
                     options.put("key.fields", NAME);
                 },
-                DEFAULT_VALUE_SUBJECT,
+                PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + DEFAULT_VALUE_SUBJECT,
                 "N/A");
 
         // non-avro value.format + key.format
@@ -628,7 +636,7 @@ public void testTableSinkAutoCompleteSchemaRegistrySubject() {
                     options.put("key.fields", NAME);
                 },
                 "N/A",
-                DEFAULT_KEY_SUBJECT);
+                PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + DEFAULT_KEY_SUBJECT);
 
         // not override for 'format'
         verifyEncoderSubject(
@@ -650,7 +658,7 @@ public void testTableSinkAutoCompleteSchemaRegistrySubject() {
                     options.put("key.avro-confluent.subject", "sub2");
                     options.put("key.fields", NAME);
                 },
-                DEFAULT_VALUE_SUBJECT,
+                PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + DEFAULT_VALUE_SUBJECT,
                 "sub2");
     }
 
@@ -659,11 +667,11 @@ private void verifyEncoderSubject(
             String expectedValueSubject,
             String expectedKeySubject) {
         Map<String, String> options = new HashMap<>();
-        // Kafka specific options.
+        // PSC specific options.
         options.put("connector", PscDynamicTableFactory.IDENTIFIER);
-        options.put("topic", TOPIC);
-        options.put("properties.group.id", "dummy");
-        options.put("properties.bootstrap.servers", "dummy");
+        options.put("topic-uri", TOPIC_URI);
+        options.put("properties." + PscConfiguration.PSC_CONSUMER_GROUP_ID, "dummy");
+        options.put("properties." + PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
         optionModifier.accept(options);
 
         final RowType rowType = (RowType) SCHEMA_DATA_TYPE.getLogicalType();
@@ -732,7 +740,7 @@ public void testSourceTableWithTopicAndTopicPattern() {
                 getModifiedOptions(
                         getBasicSourceOptions(),
                         options -> {
-                            options.put("topic", TOPICS);
+                            options.put("topic-uri", TOPIC_URIS);
                             options.put("topic-pattern", TOPIC_REGEX);
                         });
 
@@ -742,7 +750,7 @@ public void testSourceTableWithTopicAndTopicPattern() {
                 .satisfies(
                         containsCause(
                                 new ValidationException(
-                                        "Option 'topic' and 'topic-pattern' shouldn't be set together.")));
+                                        "Option 'topic-uri' and 'topic-pattern' shouldn't be set together.")));
     }
 
     @Test
@@ -857,12 +865,12 @@ public void testSinkWithTopicListOrTopicPattern() {
                 getModifiedOptions(
                         getBasicSinkOptions(),
                         options -> {
-                            options.put("topic", TOPICS);
+                            options.put("topic-uri", TOPIC_URIS);
                             options.put("scan.startup.mode", "earliest-offset");
                             options.remove("specific-offsets");
                         });
         final String errorMessageTemp =
-                "Flink Kafka sink currently only supports single topic, but got %s: %s.";
+                "Flink PSC sink currently only supports single topic, but got %s: %s.";
 
         try {
             createTableSink(SCHEMA, modifiedOptions);
@@ -871,8 +879,8 @@ public void testSinkWithTopicListOrTopicPattern() {
                     .isEqualTo(
                             String.format(
                                     errorMessageTemp,
-                                    "'topic'",
-                                    String.format("[%s]", String.join(", ", TOPIC_LIST))));
+                                    "'topic-uri'",
+                                    String.format("[%s]", String.join(", ", TOPIC_URI_LIST))));
         }
 
         modifiedOptions =
@@ -1027,9 +1035,9 @@ private static Map<String, String> getBasicSourceOptions() {
         Map<String, String> tableOptions = new HashMap<>();
         // Kafka specific options.
         tableOptions.put("connector", PscDynamicTableFactory.IDENTIFIER);
-        tableOptions.put("topic", TOPIC);
-        tableOptions.put("properties.group.id", "dummy");
-        tableOptions.put("properties.bootstrap.servers", "dummy");
+        tableOptions.put("topic-uri", TOPIC_URI);
+        tableOptions.put("properties." + PscConfiguration.PSC_CONSUMER_GROUP_ID, "dummy");
+        tableOptions.put("properties." + PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
         tableOptions.put("scan.startup.mode", "specific-offsets");
         tableOptions.put("scan.startup.specific-offsets", PROPS_SCAN_OFFSETS);
         tableOptions.put("scan.topic-partition-discovery.interval", DISCOVERY_INTERVAL);
@@ -1051,13 +1059,13 @@ private static Map<String, String> getBasicSinkOptions() {
         Map<String, String> tableOptions = new HashMap<>();
         // Kafka specific options.
tableOptions.put("connector", PscDynamicTableFactory.IDENTIFIER); - tableOptions.put("topic", TOPIC); - tableOptions.put("properties.group.id", "dummy"); - tableOptions.put("properties.bootstrap.servers", "dummy"); + tableOptions.put("topic-uri", TOPIC_URI); + tableOptions.put("properties." + PscConfiguration.PSC_CONSUMER_GROUP_ID, "dummy"); + tableOptions.put("properties." + PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX); tableOptions.put( "sink.partitioner", PscConnectorOptionsUtil.SINK_PARTITIONER_VALUE_FIXED); tableOptions.put("sink.delivery-guarantee", DeliveryGuarantee.EXACTLY_ONCE.toString()); - tableOptions.put("sink.transactional-id-prefix", "kafka-sink"); + tableOptions.put("sink.transactional-id-prefix", "psc-sink"); // Format options. tableOptions.put("format", TestFormatFactory.IDENTIFIER); final String formatDelimiterKey = @@ -1071,14 +1080,14 @@ private static Map getKeyValueOptions() { Map tableOptions = new HashMap<>(); // Kafka specific options. tableOptions.put("connector", PscDynamicTableFactory.IDENTIFIER); - tableOptions.put("topic", TOPIC); - tableOptions.put("properties.group.id", "dummy"); - tableOptions.put("properties.bootstrap.servers", "dummy"); + tableOptions.put("topic-uri", TOPIC_URI); + tableOptions.put("properties." + PscConfiguration.PSC_CONSUMER_GROUP_ID, "dummy"); + tableOptions.put("properties." + PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX); tableOptions.put("scan.topic-partition-discovery.interval", DISCOVERY_INTERVAL); tableOptions.put( "sink.partitioner", PscConnectorOptionsUtil.SINK_PARTITIONER_VALUE_FIXED); tableOptions.put("sink.delivery-guarantee", DeliveryGuarantee.EXACTLY_ONCE.toString()); - tableOptions.put("sink.transactional-id-prefix", "kafka-sink"); + tableOptions.put("sink.transactional-id-prefix", "psc-sink"); // Format options. tableOptions.put("key.format", TestFormatFactory.IDENTIFIER); tableOptions.put(