Commit
WIP finish PscDynamicTableFactoryTest
jeffxiang committed Oct 16, 2024
1 parent e713312 commit 2cba3b1
Showing 5 changed files with 120 additions and 92 deletions.
@@ -121,8 +121,8 @@ public class PscConnectorOptions {
                             "Topic names from which the table is read. Either 'topic-uri' or 'topic-uri-pattern' must be set for source. "
                                     + "Option 'topic-uri' is required for sink.");
 
-    public static final ConfigOption<String> TOPIC_URI_PATTERN =
-            ConfigOptions.key("topic-uri-pattern")
+    public static final ConfigOption<String> TOPIC_PATTERN =
+            ConfigOptions.key("topic-pattern")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
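
Note on the hunk above: the rename touches both the Java constant (TOPIC_URI_PATTERN becomes TOPIC_PATTERN) and the table option key ('topic-uri-pattern' becomes 'topic-pattern'). Below is a minimal sketch of how such an option is built and read, assuming Flink's standard ConfigOptions/Configuration API; the description text and the example topic URI pattern are illustrative placeholders, not values from this commit.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class TopicPatternOptionSketch {
    // Mirrors the renamed option: the key is now "topic-pattern".
    static final ConfigOption<String> TOPIC_PATTERN =
            ConfigOptions.key("topic-pattern")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Placeholder description for this sketch.");

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Hypothetical topic URI pattern, for illustration only.
        conf.setString(TOPIC_PATTERN.key(), "plaintext:/rn:kafka:env:region::cluster:topic_.*");
        // getOptional is the same lookup the validation utilities below use.
        System.out.println(conf.getOptional(TOPIC_PATTERN).orElse("<unset>"));
    }
}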
@@ -59,7 +59,7 @@
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARTITIONER;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_URI;
-import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_URI_PATTERN;
+import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_PATTERN;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TRANSACTIONAL_ID_PREFIX;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.VALUE_FIELDS_INCLUDE;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.VALUE_FORMAT;
@@ -108,11 +108,11 @@ public static void validateTableSinkOptions(ReadableConfig tableOptions) {
 
     public static void validateSourceTopic(ReadableConfig tableOptions) {
         Optional<List<String>> topic = tableOptions.getOptional(TOPIC_URI);
-        Optional<String> pattern = tableOptions.getOptional(TOPIC_URI_PATTERN);
+        Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN);
 
         if (topic.isPresent() && pattern.isPresent()) {
             throw new ValidationException(
-                    "Option 'topic' and 'topic-pattern' shouldn't be set together.");
+                    "Option 'topic-uri' and 'topic-pattern' shouldn't be set together.");
         }
 
         if (!topic.isPresent() && !pattern.isPresent()) {
@@ -122,17 +122,17 @@ public static void validateSourceTopic(ReadableConfig tableOptions) {
 
     public static void validateSinkTopic(ReadableConfig tableOptions) {
         String errorMessageTemp =
-                "Flink Kafka sink currently only supports single topic, but got %s: %s.";
+                "Flink PSC sink currently only supports single topic, but got %s: %s.";
         if (!isSingleTopicUri(tableOptions)) {
-            if (tableOptions.getOptional(TOPIC_URI_PATTERN).isPresent()) {
+            if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()) {
                 throw new ValidationException(
                         String.format(
                                 errorMessageTemp,
                                 "'topic-pattern'",
-                                tableOptions.get(TOPIC_URI_PATTERN)));
+                                tableOptions.get(TOPIC_PATTERN)));
             } else {
                 throw new ValidationException(
-                        String.format(errorMessageTemp, "'topic'", tableOptions.get(TOPIC_URI)));
+                        String.format(errorMessageTemp, "'topic-uri'", tableOptions.get(TOPIC_URI)));
             }
         }
     }
@@ -208,7 +208,7 @@ public static List<String> getSourceTopicUris(ReadableConfig tableOptions) {
     }
 
     public static Pattern getSourceTopicUriPattern(ReadableConfig tableOptions) {
-        return tableOptions.getOptional(TOPIC_URI_PATTERN).map(Pattern::compile).orElse(null);
+        return tableOptions.getOptional(TOPIC_PATTERN).map(Pattern::compile).orElse(null);
     }
 
     private static boolean isSingleTopicUri(ReadableConfig tableOptions) {
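
The two validators above enforce that a source table sets exactly one of 'topic-uri' and 'topic-pattern', and that a sink table sets a single 'topic-uri'. A self-contained sketch of the source-side check follows; the ValidationException stand-in, the example topic URI, and the error message for the neither-option branch (collapsed in the diff above) are assumptions.

import java.util.Optional;

public class SourceTopicValidationSketch {
    // Simplified stand-in for Flink's ValidationException.
    static class ValidationException extends RuntimeException {
        ValidationException(String message) {
            super(message);
        }
    }

    // Mirrors validateSourceTopic: exactly one of the two options may be set.
    static void validateSourceTopic(Optional<String> topicUri, Optional<String> topicPattern) {
        if (topicUri.isPresent() && topicPattern.isPresent()) {
            throw new ValidationException(
                    "Option 'topic-uri' and 'topic-pattern' shouldn't be set together.");
        }
        if (!topicUri.isPresent() && !topicPattern.isPresent()) {
            // Assumed wording; the real message is collapsed in the diff above.
            throw new ValidationException(
                    "Either 'topic-uri' or 'topic-pattern' must be set.");
        }
    }

    public static void main(String[] args) {
        // Passes: exactly one option set (hypothetical topic URI).
        validateSourceTopic(
                Optional.of("plaintext:/rn:kafka:env:region::cluster:topic01"), Optional.empty());
        try {
            validateSourceTopic(Optional.empty(), Optional.empty());
        } catch (ValidationException e) {
            System.out.println("Rejected as expected: " + e.getMessage());
        }
    }
}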
@@ -61,6 +61,7 @@
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
@@ -215,7 +216,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
                 context.createTypeInformation(producedDataType);
 
         final PscSource<RowData> kafkaSource =
-                createKafkaSource(keyDeserialization, valueDeserialization, producedTypeInfo);
+                createPscSource(keyDeserialization, valueDeserialization, producedTypeInfo);
 
         return new DataStreamScanProvider() {
             @Override
@@ -373,7 +374,7 @@ public int hashCode() {
 
     // --------------------------------------------------------------------------------------------
 
-    protected PscSource<RowData> createKafkaSource(
+    protected PscSource<RowData> createPscSource(
             DeserializationSchema<RowData> keyDeserialization,
             DeserializationSchema<RowData> valueDeserialization,
             TypeInformation<RowData> producedTypeInfo) {
@@ -402,6 +403,7 @@ protected PscSource<RowData> createKafkaSource(
                         properties.getProperty(
                                 PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET,
                                 PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_NONE);
+                offsetResetConfig = getResetStrategy(offsetResetConfig);
                 pscSourceBuilder.setStartingOffsets(
                         OffsetsInitializer.committedOffsets(offsetResetConfig));
                 break;
@@ -427,6 +429,23 @@
         return pscSourceBuilder.build();
     }
 
+    private String getResetStrategy(String offsetResetConfig) {
+        final String[] validResetStrategies = {"EARLIEST", "LATEST", "NONE"};
+        return Arrays.stream(validResetStrategies)
+                .filter(ors -> ors.equals(offsetResetConfig.toUpperCase(Locale.ROOT)))
+                .findAny()
+                .orElseThrow(
+                        () ->
+                                new IllegalArgumentException(
+                                        String.format(
+                                                "%s can not be set to %s. Valid values: [%s]",
+                                                PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET,
+                                                offsetResetConfig,
+                                                Arrays.stream(validResetStrategies)
+                                                        .map(String::toLowerCase)
+                                                        .collect(Collectors.joining(",")))));
+    }
+
     private PscDeserializationSchema<RowData> createPscDeserializationSchema(
             DeserializationSchema<RowData> keyDeserialization,
             DeserializationSchema<RowData> valueDeserialization,
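
The new getResetStrategy helper normalizes the configured reset value to upper case and rejects anything other than earliest/latest/none before it reaches OffsetsInitializer.committedOffsets. A standalone sketch of the same logic follows; the literal key in the error message is a placeholder for PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET, whose string value this page does not show.

import java.util.Arrays;
import java.util.Locale;
import java.util.stream.Collectors;

public class ResetStrategySketch {
    static String getResetStrategy(String offsetResetConfig) {
        final String[] validResetStrategies = {"EARLIEST", "LATEST", "NONE"};
        return Arrays.stream(validResetStrategies)
                // Case-insensitive match against the allowed strategies.
                .filter(s -> s.equals(offsetResetConfig.toUpperCase(Locale.ROOT)))
                .findAny()
                .orElseThrow(
                        () -> new IllegalArgumentException(String.format(
                                // "<offset reset key>" stands in for the real config constant.
                                "<offset reset key> can not be set to %s. Valid values: [%s]",
                                offsetResetConfig,
                                Arrays.stream(validResetStrategies)
                                        .map(String::toLowerCase)
                                        .collect(Collectors.joining(",")))));
    }

    public static void main(String[] args) {
        System.out.println(getResetStrategy("earliest")); // prints EARLIEST
        try {
            getResetStrategy("oldest"); // not a valid strategy
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}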
@@ -73,7 +73,7 @@
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARALLELISM;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARTITIONER;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_URI;
-import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_URI_PATTERN;
+import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_PATTERN;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TRANSACTIONAL_ID_PREFIX;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.VALUE_FIELDS_INCLUDE;
 import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.VALUE_FORMAT;
@@ -128,7 +128,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         options.add(VALUE_FORMAT);
         options.add(VALUE_FIELDS_INCLUDE);
         options.add(TOPIC_URI);
-        options.add(TOPIC_URI_PATTERN);
+        options.add(TOPIC_PATTERN);
         options.add(PROPS_GROUP_ID);
         options.add(SCAN_STARTUP_MODE);
         options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
@@ -147,7 +147,7 @@ public Set<ConfigOption<?>> forwardOptions() {
         return Stream.of(
                 PROPS_GROUP_ID,
                 TOPIC_URI,
-                TOPIC_URI_PATTERN,
+                TOPIC_PATTERN,
                 SCAN_STARTUP_MODE,
                 SCAN_STARTUP_SPECIFIC_OFFSETS,
                 SCAN_TOPIC_PARTITION_DISCOVERY,
[Diff for the fifth changed file did not load.]
