From 6ffb8ccc7c4ba4c7f7d548eb4c349de8c58cbc24 Mon Sep 17 00:00:00 2001 From: Zhanghao Chen Date: Sat, 16 Nov 2024 15:17:56 +0800 Subject: [PATCH 1/2] [FLINK-33265] Support source parallelism setting for Kafka connector --- .../content.zh/docs/connectors/table/kafka.md | 8 +++ .../docs/connectors/table/upsert-kafka.md | 8 +++ docs/content/docs/connectors/table/kafka.md | 8 +++ .../docs/connectors/table/upsert-kafka.md | 8 +++ .../kafka/table/KafkaConnectorOptions.java | 1 + .../kafka/table/KafkaDynamicSource.java | 22 +++++-- .../kafka/table/KafkaDynamicTableFactory.java | 14 +++- .../table/UpsertKafkaDynamicTableFactory.java | 7 +- .../table/KafkaDynamicTableFactoryTest.java | 65 ++++++++++++++++--- .../UpsertKafkaDynamicTableFactoryTest.java | 45 +++++++++++-- 10 files changed, 166 insertions(+), 20 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/kafka.md b/docs/content.zh/docs/connectors/table/kafka.md index 286a922ef..072c7a43a 100644 --- a/docs/content.zh/docs/connectors/table/kafka.md +++ b/docs/content.zh/docs/connectors/table/kafka.md @@ -342,6 +342,14 @@ CREATE TABLE KafkaTable ( Duration Consumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。需要显式地设置'scan.topic-partition-discovery.interval'为0才能关闭此功能 + +
scan.parallelism
+ optional + no + (none) + Integer + 定义 Kafka source 算子的并行度。默认情况下会使用全局默认并发。 +
sink.partitioner
可选 diff --git a/docs/content.zh/docs/connectors/table/upsert-kafka.md b/docs/content.zh/docs/connectors/table/upsert-kafka.md index 3d28ae56b..78c40fbca 100644 --- a/docs/content.zh/docs/connectors/table/upsert-kafka.md +++ b/docs/content.zh/docs/connectors/table/upsert-kafka.md @@ -180,6 +180,14 @@ of all available metadata fields. + +
scan.parallelism
+ optional + no + (none) + Integer + 定义 upsert-kafka source 算子的并行度。默认情况下会使用全局默认并发。 +
sink.parallelism
可选 diff --git a/docs/content/docs/connectors/table/kafka.md b/docs/content/docs/connectors/table/kafka.md index 5756315bc..12b0821c3 100644 --- a/docs/content/docs/connectors/table/kafka.md +++ b/docs/content/docs/connectors/table/kafka.md @@ -369,6 +369,14 @@ Connector Options Duration Interval for consumer to discover dynamically created Kafka topics and partitions periodically. To disable this feature, you need to explicitly set the 'scan.topic-partition-discovery.interval' value to 0. + +
scan.parallelism
+ optional + no + (none) + Integer + Defines the parallelism of the Kafka source operator. If not set, the global default parallelism is used. +
sink.partitioner
optional diff --git a/docs/content/docs/connectors/table/upsert-kafka.md b/docs/content/docs/connectors/table/upsert-kafka.md index e8e38aeda..db75309a2 100644 --- a/docs/content/docs/connectors/table/upsert-kafka.md +++ b/docs/content/docs/connectors/table/upsert-kafka.md @@ -192,6 +192,14 @@ Connector Options format which means that key columns appear in the data type for both the key and value format. + +
scan.parallelism
+ optional + no + (none) + Integer + Defines the parallelism of the upsert-kafka source operator. If not set, the global default parallelism is used. +
sink.parallelism
optional diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java index 11d3c659f..c64ab0bef 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java @@ -106,6 +106,7 @@ public class KafkaConnectorOptions { ValueFieldsStrategy.EXCEPT_KEY)) .build()); + public static final ConfigOption SCAN_PARALLELISM = FactoryUtil.SOURCE_PARALLELISM; public static final ConfigOption SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM; // -------------------------------------------------------------------------------------------- diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java index c963da762..012068085 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java @@ -71,6 +71,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Properties; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -171,6 +172,9 @@ public class KafkaDynamicSource protected final String tableIdentifier; + /** Parallelism of the physical Kafka consumer. * */ + protected final @Nullable Integer parallelism; + public KafkaDynamicSource( DataType physicalDataType, @Nullable DecodingFormat> keyDecodingFormat, @@ -188,7 +192,8 @@ public KafkaDynamicSource( Map specificBoundedOffsets, long boundedTimestampMillis, boolean upsertMode, - String tableIdentifier) { + String tableIdentifier, + @Nullable Integer parallelism) { // Format attributes this.physicalDataType = Preconditions.checkNotNull( @@ -228,6 +233,7 @@ public KafkaDynamicSource( this.boundedTimestampMillis = boundedTimestampMillis; this.upsertMode = upsertMode; this.tableIdentifier = tableIdentifier; + this.parallelism = parallelism; } @Override @@ -267,6 +273,11 @@ public DataStream produceDataStream( public boolean isBounded() { return kafkaSource.getBoundedness() == Boundedness.BOUNDED; } + + @Override + public Optional getParallelism() { + return Optional.ofNullable(parallelism); + } }; } @@ -344,7 +355,8 @@ public DynamicTableSource copy() { specificBoundedOffsets, boundedTimestampMillis, upsertMode, - tableIdentifier); + tableIdentifier, + parallelism); copy.producedDataType = producedDataType; copy.metadataKeys = metadataKeys; copy.watermarkStrategy = watermarkStrategy; @@ -384,7 +396,8 @@ public boolean equals(Object o) { && boundedTimestampMillis == that.boundedTimestampMillis && Objects.equals(upsertMode, that.upsertMode) && Objects.equals(tableIdentifier, that.tableIdentifier) - && Objects.equals(watermarkStrategy, that.watermarkStrategy); + && Objects.equals(watermarkStrategy, that.watermarkStrategy) + && Objects.equals(parallelism, that.parallelism); } @Override @@ -409,7 +422,8 @@ public int hashCode() { boundedTimestampMillis, upsertMode, tableIdentifier, - watermarkStrategy); + watermarkStrategy, + parallelism); } // 
-------------------------------------------------------------------------------------------- diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java index 8124691a5..34f57ff15 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java @@ -74,6 +74,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; @@ -152,6 +153,7 @@ public Set> optionalOptions() { options.add(SCAN_BOUNDED_MODE); options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS); options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS); + options.add(SCAN_PARALLELISM); return options; } @@ -166,6 +168,7 @@ public Set> forwardOptions() { SCAN_STARTUP_SPECIFIC_OFFSETS, SCAN_TOPIC_PARTITION_DISCOVERY, SCAN_STARTUP_TIMESTAMP_MILLIS, + SCAN_PARALLELISM, SINK_PARTITIONER, SINK_PARALLELISM, TRANSACTIONAL_ID_PREFIX) @@ -215,6 +218,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); + final Integer parallelism = tableOptions.getOptional(SCAN_PARALLELISM).orElse(null); + return createKafkaTableSource( physicalDataType, keyDecodingFormat.orElse(null), @@ -231,7 +236,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { boundedOptions.boundedMode, boundedOptions.specificOffsets, boundedOptions.boundedTimestampMillis, - context.getObjectIdentifier().asSummaryString()); + context.getObjectIdentifier().asSummaryString(), + parallelism); } @Override @@ -396,7 +402,8 @@ protected KafkaDynamicSource createKafkaTableSource( BoundedMode boundedMode, Map specificEndOffsets, long endTimestampMillis, - String tableIdentifier) { + String tableIdentifier, + Integer parallelism) { return new KafkaDynamicSource( physicalDataType, keyDecodingFormat, @@ -414,7 +421,8 @@ protected KafkaDynamicSource createKafkaTableSource( specificEndOffsets, endTimestampMillis, false, - tableIdentifier); + tableIdentifier, + parallelism); } protected KafkaDynamicSink createKafkaTableSink( diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java index 78debc175..275aebd3d 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java +++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java @@ -62,6 +62,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM; @@ -115,6 +116,7 @@ public Set> optionalOptions() { options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS); options.add(DELIVERY_GUARANTEE); options.add(TRANSACTIONAL_ID_PREFIX); + options.add(SCAN_PARALLELISM); return options; } @@ -150,6 +152,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { final BoundedOptions boundedOptions = getBoundedOptions(tableOptions); + Integer parallelism = tableOptions.get(SCAN_PARALLELISM); + return new KafkaDynamicSource( context.getPhysicalRowDataType(), keyDecodingFormat, @@ -167,7 +171,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { boundedOptions.specificOffsets, boundedOptions.boundedTimestampMillis, true, - context.getObjectIdentifier().asSummaryString()); + context.getObjectIdentifier().asSummaryString(), + parallelism); } @Override diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java index c1d796d08..dc624df5c 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java @@ -212,7 +212,8 @@ public void testTableSource() { KAFKA_SOURCE_PROPERTIES, StartupMode.SPECIFIC_OFFSETS, specificOffsets, - 0); + 0, + null); assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource); ScanTableSource.ScanRuntimeProvider provider = @@ -220,6 +221,47 @@ public void testTableSource() { assertKafkaSource(provider); } + @Test + public void testTableSourceWithParallelism() { + final Map modifiedOptions = + getModifiedOptions( + getBasicSourceOptions(), options -> options.put("scan.parallelism", "100")); + final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); + final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) actualSource; + + final Map specificOffsets = new HashMap<>(); + specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0); + specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1); + + final DecodingFormat> valueDecodingFormat = + new DecodingFormatMock(",", true); + + // Test scan source equals + final KafkaDynamicSource expectedKafkaSource = + createExpectedScanSource( + SCHEMA_DATA_TYPE, + null, + valueDecodingFormat, + new int[0], + new int[] {0, 1, 2}, + null, + Collections.singletonList(TOPIC), + null, + KAFKA_SOURCE_PROPERTIES, + 
StartupMode.SPECIFIC_OFFSETS, + specificOffsets, + 0, + 100); + assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource); + + ScanTableSource.ScanRuntimeProvider provider = + actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + assertThat(provider).isInstanceOf(DataStreamScanProvider.class); + final DataStreamScanProvider sourceProvider = (DataStreamScanProvider) provider; + assertThat(sourceProvider.getParallelism().isPresent()).isTrue(); + assertThat(sourceProvider.getParallelism().get()).isEqualTo(100); + } + @Test public void testTableSourceWithPattern() { final Map modifiedOptions = @@ -254,7 +296,8 @@ public void testTableSourceWithPattern() { KAFKA_SOURCE_PROPERTIES, StartupMode.EARLIEST, specificOffsets, - 0); + 0, + null); final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) actualSource; assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource); @@ -295,7 +338,8 @@ public void testTableSourceWithKeyValue() { KAFKA_FINAL_SOURCE_PROPERTIES, StartupMode.GROUP_OFFSETS, Collections.emptyMap(), - 0); + 0, + null); assertThat(actualSource).isEqualTo(expectedKafkaSource); } @@ -346,7 +390,8 @@ public void testTableSourceWithKeyValueAndMetadata() { KAFKA_FINAL_SOURCE_PROPERTIES, StartupMode.GROUP_OFFSETS, Collections.emptyMap(), - 0); + 0, + null); expectedKafkaSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedKafkaSource.metadataKeys = Collections.singletonList("timestamp"); @@ -1188,7 +1233,8 @@ public void testDiscoverPartitionByDefault() { props, StartupMode.SPECIFIC_OFFSETS, specificOffsets, - 0); + 0, + null); assertThat(actualSource).isEqualTo(expectedKafkaSource); ScanTableSource.ScanRuntimeProvider provider = actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); @@ -1226,7 +1272,8 @@ public void testDisableDiscoverPartition() { props, StartupMode.SPECIFIC_OFFSETS, specificOffsets, - 0); + 0, + null); assertThat(actualSource).isEqualTo(expectedKafkaSource); ScanTableSource.ScanRuntimeProvider provider = actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); @@ -1249,7 +1296,8 @@ private static KafkaDynamicSource createExpectedScanSource( Properties properties, StartupMode startupMode, Map specificStartupOffsets, - long startupTimestampMillis) { + long startupTimestampMillis, + @Nullable Integer parallelism) { return new KafkaDynamicSource( physicalDataType, keyDecodingFormat, @@ -1267,7 +1315,8 @@ private static KafkaDynamicSource createExpectedScanSource( Collections.emptyMap(), 0, false, - FactoryMocks.IDENTIFIER.asSummaryString()); + FactoryMocks.IDENTIFIER.asSummaryString(), + parallelism); } private static KafkaDynamicSink createExpectedSink( diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java index 1bcd775a1..855873cec 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java @@ -74,6 +74,8 @@ import org.junit.Test; import org.junit.rules.ExpectedException; +import javax.annotation.Nullable; + import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -168,7 +170,8 @@ public void testTableSource() { 
SOURCE_VALUE_FIELDS, null, Collections.singletonList(SOURCE_TOPIC), - UPSERT_KAFKA_SOURCE_PROPERTIES); + UPSERT_KAFKA_SOURCE_PROPERTIES, + null); assertThat(actualSource).isEqualTo(expectedSource); final KafkaDynamicSource actualUpsertKafkaSource = (KafkaDynamicSource) actualSource; @@ -177,6 +180,37 @@ public void testTableSource() { assertKafkaSource(provider); } + @Test + public void testTableSourceWithParallelism() { + final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType(); + // Construct table source using options and table source factory + final Map modifiedOptions = + getModifiedOptions( + getFullSourceOptions(), options -> options.put("scan.parallelism", "100")); + final DynamicTableSource actualSource = createTableSource(SOURCE_SCHEMA, modifiedOptions); + + final KafkaDynamicSource expectedSource = + createExpectedScanSource( + producedDataType, + keyDecodingFormat, + valueDecodingFormat, + SOURCE_KEY_FIELDS, + SOURCE_VALUE_FIELDS, + null, + Collections.singletonList(SOURCE_TOPIC), + UPSERT_KAFKA_SOURCE_PROPERTIES, + 100); + assertThat(actualSource).isEqualTo(expectedSource); + + final KafkaDynamicSource actualUpsertKafkaSource = (KafkaDynamicSource) actualSource; + ScanTableSource.ScanRuntimeProvider provider = + actualUpsertKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + assertThat(provider).isInstanceOf(DataStreamScanProvider.class); + final DataStreamScanProvider sourceProvider = (DataStreamScanProvider) provider; + assertThat(sourceProvider.getParallelism().isPresent()).isTrue(); + assertThat(sourceProvider.getParallelism().get()).isEqualTo(100); + } + @Test public void testTableSourceWithTopicList() { final Map modifiedOptions = @@ -199,7 +233,8 @@ public void testTableSourceWithTopicList() { SOURCE_VALUE_FIELDS, null, Arrays.asList(SOURCE_TOPIC, SOURCE_TOPIC), - UPSERT_KAFKA_SOURCE_PROPERTIES); + UPSERT_KAFKA_SOURCE_PROPERTIES, + null); assertThat(actualSource).isEqualTo(expectedSource); final KafkaDynamicSource actualUpsertKafkaSource = (KafkaDynamicSource) actualSource; @@ -851,7 +886,8 @@ private KafkaDynamicSource createExpectedScanSource( int[] valueFields, String keyPrefix, List topic, - Properties properties) { + Properties properties, + @Nullable Integer parallelism) { return new KafkaDynamicSource( producedDataType, keyDecodingFormat, @@ -869,7 +905,8 @@ private KafkaDynamicSource createExpectedScanSource( Collections.emptyMap(), 0, true, - FactoryMocks.IDENTIFIER.asSummaryString()); + FactoryMocks.IDENTIFIER.asSummaryString(), + parallelism); } private static KafkaDynamicSink createExpectedSink( From 82db2f2c406c7517c9cb0bd6ecc9f8cea7cb3193 Mon Sep 17 00:00:00 2001 From: Zhanghao Chen Date: Fri, 29 Nov 2024 22:01:16 +0800 Subject: [PATCH 2/2] address comments --- docs/content.zh/docs/connectors/table/kafka.md | 2 +- docs/content.zh/docs/connectors/table/upsert-kafka.md | 2 +- .../kafka/table/KafkaDynamicTableFactoryTest.java | 8 +++++--- .../kafka/table/UpsertKafkaDynamicTableFactoryTest.java | 8 +++++--- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/kafka.md b/docs/content.zh/docs/connectors/table/kafka.md index 072c7a43a..9df680df8 100644 --- a/docs/content.zh/docs/connectors/table/kafka.md +++ b/docs/content.zh/docs/connectors/table/kafka.md @@ -348,7 +348,7 @@ CREATE TABLE KafkaTable ( no (none) Integer - 定义 Kafka source 算子的并行度。默认情况下会使用全局默认并发。 + 定义 Kafka source 算子的并行度。默认情况下会使用全局默认并行度。
sink.partitioner
diff --git a/docs/content.zh/docs/connectors/table/upsert-kafka.md b/docs/content.zh/docs/connectors/table/upsert-kafka.md index 78c40fbca..bacaae52b 100644 --- a/docs/content.zh/docs/connectors/table/upsert-kafka.md +++ b/docs/content.zh/docs/connectors/table/upsert-kafka.md @@ -186,7 +186,7 @@ of all available metadata fields. no (none) Integer - 定义 upsert-kafka source 算子的并行度。默认情况下会使用全局默认并发。 + 定义 upsert-kafka source 算子的并行度。默认情况下会使用全局默认并行度。
sink.parallelism
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java index dc624df5c..10bfe5939 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java @@ -101,6 +101,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.AVRO_CONFLUENT; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.DEBEZIUM_AVRO_CONFLUENT; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX; +import static org.apache.flink.table.factories.FactoryUtil.SOURCE_PARALLELISM; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; import static org.assertj.core.api.Assertions.assertThat; @@ -225,7 +226,8 @@ public void testTableSource() { public void testTableSourceWithParallelism() { final Map modifiedOptions = getModifiedOptions( - getBasicSourceOptions(), options -> options.put("scan.parallelism", "100")); + getBasicSourceOptions(), + options -> options.put(SOURCE_PARALLELISM.key(), "100")); final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions); final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) actualSource; @@ -258,8 +260,8 @@ public void testTableSourceWithParallelism() { actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); assertThat(provider).isInstanceOf(DataStreamScanProvider.class); final DataStreamScanProvider sourceProvider = (DataStreamScanProvider) provider; - assertThat(sourceProvider.getParallelism().isPresent()).isTrue(); - assertThat(sourceProvider.getParallelism().get()).isEqualTo(100); + assertThat(sourceProvider.getParallelism()).isPresent(); + assertThat(sourceProvider.getParallelism()).hasValue(100); } @Test diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java index 855873cec..abde43dca 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java @@ -89,6 +89,7 @@ import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.AVRO_CONFLUENT; +import static org.apache.flink.table.factories.FactoryUtil.SOURCE_PARALLELISM; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; import static org.assertj.core.api.Assertions.assertThat; @@ -186,7 +187,8 @@ public void testTableSourceWithParallelism() { // Construct table source using options and table source factory final Map 
modifiedOptions = getModifiedOptions( - getFullSourceOptions(), options -> options.put("scan.parallelism", "100")); + getFullSourceOptions(), + options -> options.put(SOURCE_PARALLELISM.key(), "100")); final DynamicTableSource actualSource = createTableSource(SOURCE_SCHEMA, modifiedOptions); final KafkaDynamicSource expectedSource = @@ -207,8 +209,8 @@ public void testTableSourceWithParallelism() { actualUpsertKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); assertThat(provider).isInstanceOf(DataStreamScanProvider.class); final DataStreamScanProvider sourceProvider = (DataStreamScanProvider) provider; - assertThat(sourceProvider.getParallelism().isPresent()).isTrue(); - assertThat(sourceProvider.getParallelism().get()).isEqualTo(100); + assertThat(sourceProvider.getParallelism()).isPresent(); + assertThat(sourceProvider.getParallelism()).hasValue(100); } @Test
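For reference, a minimal usage sketch of the new option from SQL, written in Java against the Table API. This is an illustration only and not part of the patch: the table name, topic, broker address, and format below are made-up placeholders, and it assumes a Kafka connector build that already contains this change. Per the documentation added above, 'scan.parallelism' overrides the parallelism of the Kafka source operator only; every other operator keeps the global default.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ScanParallelismSqlExample {
    public static void main(String[] args) {
        // Streaming table environment; the global default parallelism still
        // applies to every operator that does not set its own.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical table: schema, topic, and connection settings are placeholders.
        // 'scan.parallelism' = '2' is the option introduced by this patch and only
        // affects the Kafka source operator.
        tEnv.executeSql(
                "CREATE TABLE orders (\n"
                        + "  order_id STRING,\n"
                        + "  amount DOUBLE,\n"
                        + "  ts TIMESTAMP(3)\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'orders',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'properties.group.id' = 'demo',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'format' = 'json',\n"
                        + "  'scan.parallelism' = '2'\n"
                        + ")");

        tEnv.executeSql("SELECT * FROM orders").print();
    }
}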
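The same setting can also be passed programmatically. Since the patch defines KafkaConnectorOptions.SCAN_PARALLELISM as an alias of FactoryUtil.SOURCE_PARALLELISM, the typed option from FactoryUtil can be used with a TableDescriptor. Again a sketch under assumed placeholders (schema, topic, broker, group id), not part of the patch itself.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.factories.FactoryUtil;

public class ScanParallelismDescriptorExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical schema and Kafka settings; the point of the example is the
        // typed option. FactoryUtil.SOURCE_PARALLELISM is the ConfigOption that
        // KafkaConnectorOptions.SCAN_PARALLELISM points at in this patch.
        TableDescriptor kafkaSource =
                TableDescriptor.forConnector("kafka")
                        .schema(
                                Schema.newBuilder()
                                        .column("msg", DataTypes.STRING())
                                        .build())
                        .option("topic", "demo-topic")
                        .option("properties.bootstrap.servers", "localhost:9092")
                        .option("properties.group.id", "demo")
                        .option("scan.startup.mode", "earliest-offset")
                        .option(FactoryUtil.SOURCE_PARALLELISM, 4)
                        .format("json")
                        .build();

        tEnv.createTemporaryTable("demo_source", kafkaSource);
        tEnv.executeSql("SELECT * FROM demo_source").print();
    }
}

Reusing the FactoryUtil option rather than introducing a connector-specific key keeps the option name aligned with Flink's common source-parallelism option, which is also why the factory simply forwards it in forwardOptions().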