From 8eac82ff113de48dd34e5f3ef2f3b42a85a1831f Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Fri, 13 Sep 2024 12:26:47 -0400 Subject: [PATCH] Finish code changes to source and sink API's --- .../flink/connector/psc/source/PscSource.java | 99 ++++++----- .../psc/source/PscSourceBuilder.java | 168 +++++++++--------- .../psc/source/PscSourceOptions.java | 8 +- .../enumerator/subscriber/PscSubscriber.java | 3 +- .../PscTopicUriPartitionSplitReader.java | 4 +- .../TestMultiKafkaClusterBackends.java | 2 +- 6 files changed, 147 insertions(+), 137 deletions(-) diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSource.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSource.java index 53e53b0..c9ded40 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSource.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSource.java @@ -18,6 +18,22 @@ package com.pinterest.flink.connector.psc.source; +import com.pinterest.flink.connector.psc.source.enumerator.PscSourceEnumState; +import com.pinterest.flink.connector.psc.source.enumerator.PscSourceEnumStateSerializer; +import com.pinterest.flink.connector.psc.source.enumerator.PscSourceEnumerator; +import com.pinterest.flink.connector.psc.source.enumerator.initializer.OffsetsInitializer; +import com.pinterest.flink.connector.psc.source.enumerator.subscriber.PscSubscriber; +import com.pinterest.flink.connector.psc.source.metrics.PscSourceReaderMetrics; +import com.pinterest.flink.connector.psc.source.reader.PscRecordEmitter; +import com.pinterest.flink.connector.psc.source.reader.PscSourceReader; +import com.pinterest.flink.connector.psc.source.reader.PscTopicUriPartitionSplitReader; +import com.pinterest.flink.connector.psc.source.reader.deserializer.PscRecordDeserializationSchema; +import com.pinterest.flink.connector.psc.source.reader.fetcher.PscSourceFetcherManager; +import com.pinterest.flink.connector.psc.source.split.PscTopicUriPartitionSplit; +import com.pinterest.flink.connector.psc.source.split.PscTopicUriPartitionSplitSerializer; +import com.pinterest.psc.consumer.PscConsumerMessage; +import com.pinterest.psc.exception.ClientException; +import com.pinterest.psc.exception.startup.ConfigurationException; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; @@ -33,24 +49,9 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; -import org.apache.flink.connector.kafka.source.KafkaSourceBuilder; -import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState; -import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer; -import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator; -import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; -import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber; -import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics; -import org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader; -import org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter; -import org.apache.flink.connector.kafka.source.reader.KafkaSourceReader; -import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; -import org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager; -import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; -import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.util.UserCodeClassLoader; -import org.apache.kafka.clients.consumer.ConsumerRecord; import javax.annotation.Nullable; import java.io.IOException; @@ -60,7 +61,7 @@ import java.util.function.Supplier; /** - * The Source implementation of Kafka. Please use a {@link KafkaSourceBuilder} to construct a {@link + * The Source implementation of Kafka. Please use a {@link PscSourceBuilder} to construct a {@link * PscSource}. The following example shows how to create a KafkaSource emitting records of * String type. * @@ -75,32 +76,32 @@ * .build(); * } * - *

See {@link KafkaSourceBuilder} for more details. + *

See {@link PscSourceBuilder} for more details. * * @param the output type of the source. */ @PublicEvolving public class PscSource - implements Source, + implements Source, ResultTypeQueryable { private static final long serialVersionUID = -8755372893283732098L; // Users can choose only one of the following ways to specify the topics to consume from. - private final KafkaSubscriber subscriber; + private final PscSubscriber subscriber; // Users can specify the starting / stopping offset initializer. private final OffsetsInitializer startingOffsetsInitializer; private final OffsetsInitializer stoppingOffsetsInitializer; // Boundedness private final Boundedness boundedness; - private final KafkaRecordDeserializationSchema deserializationSchema; + private final PscRecordDeserializationSchema deserializationSchema; // The configurations. private final Properties props; PscSource( - KafkaSubscriber subscriber, + PscSubscriber subscriber, OffsetsInitializer startingOffsetsInitializer, @Nullable OffsetsInitializer stoppingOffsetsInitializer, Boundedness boundedness, - KafkaRecordDeserializationSchema deserializationSchema, + PscRecordDeserializationSchema deserializationSchema, Properties props) { this.subscriber = subscriber; this.startingOffsetsInitializer = startingOffsetsInitializer; @@ -115,8 +116,8 @@ public class PscSource * * @return a Kafka source builder. */ - public static KafkaSourceBuilder builder() { - return new KafkaSourceBuilder<>(); + public static PscSourceBuilder builder() { + return new PscSourceBuilder<>(); } @Override @@ -126,16 +127,16 @@ public Boundedness getBoundedness() { @Internal @Override - public SourceReader createReader(SourceReaderContext readerContext) + public SourceReader createReader(SourceReaderContext readerContext) throws Exception { return createReader(readerContext, (ignore) -> {}); } @VisibleForTesting - SourceReader createReader( + SourceReader createReader( SourceReaderContext readerContext, Consumer> splitFinishedHook) throws Exception { - FutureCompletingBlockingQueue>> + FutureCompletingBlockingQueue>> elementsQueue = new FutureCompletingBlockingQueue<>(); deserializationSchema.open( new DeserializationSchema.InitializationContext() { @@ -149,16 +150,22 @@ public UserCodeClassLoader getUserCodeClassLoader() { return readerContext.getUserCodeClassLoader(); } }); - final KafkaSourceReaderMetrics kafkaSourceReaderMetrics = - new KafkaSourceReaderMetrics(readerContext.metricGroup()); - - Supplier splitReaderSupplier = - () -> new KafkaPartitionSplitReader(props, readerContext, kafkaSourceReaderMetrics); - KafkaRecordEmitter recordEmitter = new KafkaRecordEmitter<>(deserializationSchema); + final PscSourceReaderMetrics kafkaSourceReaderMetrics = + new PscSourceReaderMetrics(readerContext.metricGroup()); + + Supplier splitReaderSupplier = + () -> { + try { + return new PscTopicUriPartitionSplitReader(props, readerContext, kafkaSourceReaderMetrics); + } catch (ConfigurationException | ClientException e) { + throw new RuntimeException("Failed to create new PscTopicUriParititionSplitReader", e); + } + }; + PscRecordEmitter recordEmitter = new PscRecordEmitter<>(deserializationSchema); - return new KafkaSourceReader<>( + return new PscSourceReader<>( elementsQueue, - new KafkaSourceFetcherManager( + new PscSourceFetcherManager( elementsQueue, splitReaderSupplier::get, splitFinishedHook), recordEmitter, toConfiguration(props), @@ -168,9 +175,9 @@ public UserCodeClassLoader getUserCodeClassLoader() { @Internal @Override - public SplitEnumerator 
createEnumerator( - SplitEnumeratorContext enumContext) { - return new KafkaSourceEnumerator( + public SplitEnumerator createEnumerator( + SplitEnumeratorContext enumContext) { + return new PscSourceEnumerator( subscriber, startingOffsetsInitializer, stoppingOffsetsInitializer, @@ -181,11 +188,11 @@ public SplitEnumerator createEnumerat @Internal @Override - public SplitEnumerator restoreEnumerator( - SplitEnumeratorContext enumContext, - KafkaSourceEnumState checkpoint) + public SplitEnumerator restoreEnumerator( + SplitEnumeratorContext enumContext, + PscSourceEnumState checkpoint) throws IOException { - return new KafkaSourceEnumerator( + return new PscSourceEnumerator( subscriber, startingOffsetsInitializer, stoppingOffsetsInitializer, @@ -197,14 +204,14 @@ public SplitEnumerator restoreEnumera @Internal @Override - public SimpleVersionedSerializer getSplitSerializer() { - return new KafkaPartitionSplitSerializer(); + public SimpleVersionedSerializer getSplitSerializer() { + return new PscTopicUriPartitionSplitSerializer(); } @Internal @Override - public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { - return new KafkaSourceEnumStateSerializer(); + public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { + return new PscSourceEnumStateSerializer(); } @Override diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceBuilder.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceBuilder.java index eba59dd..c0f5699 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceBuilder.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceBuilder.java @@ -18,19 +18,20 @@ package com.pinterest.flink.connector.psc.source; +import com.pinterest.flink.connector.psc.PscFlinkUtil; +import com.pinterest.flink.connector.psc.source.enumerator.initializer.NoStoppingOffsetsInitializer; +import com.pinterest.flink.connector.psc.source.enumerator.initializer.OffsetsInitializer; +import com.pinterest.flink.connector.psc.source.enumerator.initializer.OffsetsInitializerValidator; +import com.pinterest.flink.connector.psc.source.enumerator.subscriber.PscSubscriber; +import com.pinterest.flink.connector.psc.source.reader.deserializer.PscRecordDeserializationSchema; +import com.pinterest.psc.common.MessageId; +import com.pinterest.psc.common.TopicRn; +import com.pinterest.psc.common.TopicUriPartition; +import com.pinterest.psc.config.PscConfiguration; +import com.pinterest.psc.serde.ByteArrayDeserializer; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.connector.source.Boundedness; -import org.apache.flink.connector.kafka.source.KafkaSource; -import org.apache.flink.connector.kafka.source.KafkaSourceOptions; -import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer; -import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; -import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator; -import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber; -import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,15 +91,17 @@ @PublicEvolving public class PscSourceBuilder { private static final Logger LOG = LoggerFactory.getLogger(PscSourceBuilder.class); - private static final String[] REQUIRED_CONFIGS = {ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG}; + private static final String[] REQUIRED_CONFIGS = { + PscFlinkUtil.CLUSTER_URI_CONFIG, + }; // The subscriber specifies the partitions to subscribe to. - private KafkaSubscriber subscriber; + private PscSubscriber subscriber; // Users can specify the starting / stopping offset initializer. private OffsetsInitializer startingOffsetsInitializer; private OffsetsInitializer stoppingOffsetsInitializer; // Boundedness private Boundedness boundedness; - private KafkaRecordDeserializationSchema deserializationSchema; + private PscRecordDeserializationSchema deserializationSchema; // The configurations. protected Properties props; @@ -112,63 +115,63 @@ public class PscSourceBuilder { } /** - * Sets the bootstrap servers for the KafkaConsumer of the KafkaSource. + * Sets the clusterUri for the PscConsumer of the PscSource. * - * @param bootstrapServers the bootstrap servers of the Kafka cluster. - * @return this KafkaSourceBuilder. + * @param clusterUri the clusterUri of the PubSub cluster. + * @return this PscSourceBuilder. */ - public PscSourceBuilder setBootstrapServers(String bootstrapServers) { - return setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + public PscSourceBuilder setClusterUri(String clusterUri) { + return setProperty(PscFlinkUtil.CLUSTER_URI_CONFIG, clusterUri); } /** - * Sets the consumer group id of the KafkaSource. + * Sets the consumer group id of the PscSource. * - * @param groupId the group id of the KafkaSource. - * @return this KafkaSourceBuilder. + * @param groupId the group id of the PscSource. + * @return this PscSourceBuilder. */ public PscSourceBuilder setGroupId(String groupId) { - return setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + return setProperty(PscConfiguration.PSC_CONSUMER_GROUP_ID, groupId); } /** - * Set a list of topics the KafkaSource should consume from. All the topics in the list should - * have existed in the Kafka cluster. Otherwise an exception will be thrown. To allow some of + * Set a list of topicRns the PscSource should consume from. All the topics in the list should + * have existed in the PubSub cluster. Otherwise an exception will be thrown. To allow some of * the topics to be created lazily, please use {@link #setTopicPattern(Pattern)} instead. * - * @param topics the list of topics to consume from. - * @return this KafkaSourceBuilder. + * @param topics the list of topicRns to consume from. + * @return this PscSourceBuilder. * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection) */ - public PscSourceBuilder setTopics(List topics) { + public PscSourceBuilder setTopicRns(List topics) { ensureSubscriberIsNull("topics"); - subscriber = KafkaSubscriber.getTopicListSubscriber(topics); + subscriber = PscSubscriber.getTopicRnListSubscriber(topics); return this; } /** - * Set a list of topics the KafkaSource should consume from. All the topics in the list should - * have existed in the Kafka cluster. Otherwise an exception will be thrown. To allow some of + * Set a list of topicRns the PscSource should consume from. All the topics in the list should + * have existed in the PubSub cluster. Otherwise an exception will be thrown. 
To allow some of * the topics to be created lazily, please use {@link #setTopicPattern(Pattern)} instead. * - * @param topics the list of topics to consume from. - * @return this KafkaSourceBuilder. + * @param topicRns the list of topicRns to consume from. + * @return this PscSourceBuilder. * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection) */ - public PscSourceBuilder setTopics(String... topics) { - return setTopics(Arrays.asList(topics)); + public PscSourceBuilder setTopicRns(TopicRn... topicRns) { + return setTopicRns(Arrays.asList(topicRns)); } /** * Set a topic pattern to consume from use the java {@link Pattern}. * * @param topicPattern the pattern of the topic name to consume from. - * @return this KafkaSourceBuilder. + * @return this PscSourceBuilder. * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Pattern) */ public PscSourceBuilder setTopicPattern(Pattern topicPattern) { ensureSubscriberIsNull("topic pattern"); - subscriber = KafkaSubscriber.getTopicPatternSubscriber(topicPattern); + subscriber = PscSubscriber.getTopicPatternSubscriber(topicPattern); return this; } @@ -179,9 +182,9 @@ public PscSourceBuilder setTopicPattern(Pattern topicPattern) { * @return this KafkaSourceBuilder. * @see org.apache.kafka.clients.consumer.KafkaConsumer#assign(Collection) */ - public PscSourceBuilder setPartitions(Set partitions) { + public PscSourceBuilder setPartitions(Set partitions) { ensureSubscriberIsNull("partitions"); - subscriber = KafkaSubscriber.getPartitionSetSubscriber(partitions); + subscriber = PscSubscriber.getPartitionSetSubscriber(partitions); return this; } @@ -199,15 +202,14 @@ public PscSourceBuilder setPartitions(Set partitions) { *

  • {@link OffsetsInitializer#committedOffsets()} - starting from the committed offsets of * the consumer group. *
  • {@link - * OffsetsInitializer#committedOffsets(org.apache.kafka.clients.consumer.OffsetResetStrategy)} + * OffsetsInitializer#committedOffsets(String)} * - starting from the committed offsets of the consumer group. If there are no committed - * offsets, starting from the offsets specified by the {@link - * org.apache.kafka.clients.consumer.OffsetResetStrategy OffsetResetStrategy}. + * offsets, starting from the offsets specified by the offset reset strategy. *
  • {@link OffsetsInitializer#offsets(Map)} - starting from the specified offsets for each * partition. *
  • {@link OffsetsInitializer#timestamp(long)} - starting from the specified timestamp for * each partition. Note that the guarantee here is that all the records in Kafka whose - * {@link org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()} is greater than + * {@link MessageId#getTimestamp()} is greater than * the given starting timestamp will be consumed. However, it is possible that some * consumer records whose timestamp is smaller than the given starting timestamp are also * consumed. @@ -224,14 +226,14 @@ public PscSourceBuilder setStartingOffsets( } /** - * By default the KafkaSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner + * By default the PscSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner * and thus never stops until the Flink job fails or is canceled. To let the KafkaSource run as * a streaming source but still stops at some point, one can set an {@link OffsetsInitializer} * to specify the stopping offsets for each partition. When all the partitions have reached - * their stopping offsets, the KafkaSource will then exit. + * their stopping offsets, the PscSource will then exit. * *

    This method is different from {@link #setBounded(OffsetsInitializer)} that after setting - * the stopping offsets with this method, {@link org.apache.flink.connector.kafka.source.KafkaSource#getBoundedness()} will still return + * the stopping offsets with this method, {@link PscSource#getBoundedness()} will still return * {@link Boundedness#CONTINUOUS_UNBOUNDED} even though it will stop at the stopping offsets * specified by the stopping offsets {@link OffsetsInitializer}. * @@ -247,7 +249,7 @@ public PscSourceBuilder setStartingOffsets( * partition. *

  • {@link OffsetsInitializer#timestamp(long)} - stops at the specified timestamp for each * partition. The guarantee of setting the stopping timestamp is that no Kafka records - * whose {@link org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()} is greater + * whose {@link MessageId#getTimestamp()} is greater * than the given stopping timestamp will be consumed. However, it is possible that some * records whose timestamp is smaller than the specified stopping timestamp are not * consumed. @@ -265,14 +267,14 @@ public PscSourceBuilder setUnbounded(OffsetsInitializer stoppingOffsetsInit } /** - * By default the KafkaSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner + * By default the PscSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner * and thus never stops until the Flink job fails or is canceled. To let the KafkaSource run in * {@link Boundedness#BOUNDED} manner and stops at some point, one can set an {@link * OffsetsInitializer} to specify the stopping offsets for each partition. When all the - * partitions have reached their stopping offsets, the KafkaSource will then exit. + * partitions have reached their stopping offsets, the PscSource will then exit. * *

    This method is different from {@link #setUnbounded(OffsetsInitializer)} that after setting - * the stopping offsets with this method, {@link org.apache.flink.connector.kafka.source.KafkaSource#getBoundedness()} will return + * the stopping offsets with this method, {@link PscSource#getBoundedness()} will return * {@link Boundedness#BOUNDED} instead of {@link Boundedness#CONTINUOUS_UNBOUNDED}. * *

    The following {@link OffsetsInitializer} are commonly used and provided out of the box. @@ -287,7 +289,7 @@ public PscSourceBuilder setUnbounded(OffsetsInitializer stoppingOffsetsInit * partition. *

  • {@link OffsetsInitializer#timestamp(long)} - stops at the specified timestamp for each * partition. The guarantee of setting the stopping timestamp is that no Kafka records - * whose {@link org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()} is greater + * whose {@link MessageId#getTimestamp()} is greater * than the given stopping timestamp will be consumed. However, it is possible that some * records whose timestamp is smaller than the specified stopping timestamp are not * consumed. @@ -305,7 +307,7 @@ public PscSourceBuilder setBounded(OffsetsInitializer stoppingOffsetsInitia } /** - * Sets the {@link KafkaRecordDeserializationSchema deserializer} of the {@link + * Sets the {@link PscRecordDeserializationSchema deserializer} of the {@link * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource. * * @param recordDeserializer the deserializer for Kafka {@link @@ -313,13 +315,13 @@ public PscSourceBuilder setBounded(OffsetsInitializer stoppingOffsetsInitia * @return this KafkaSourceBuilder. */ public PscSourceBuilder setDeserializer( - KafkaRecordDeserializationSchema recordDeserializer) { + PscRecordDeserializationSchema recordDeserializer) { this.deserializationSchema = recordDeserializer; return this; } /** - * Sets the {@link KafkaRecordDeserializationSchema deserializer} of the {@link + * Sets the {@link PscRecordDeserializationSchema deserializer} of the {@link * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource. The given * {@link DeserializationSchema} will be used to deserialize the value of ConsumerRecord. The * other information (e.g. key) in a ConsumerRecord will be ignored. @@ -330,7 +332,7 @@ public PscSourceBuilder setDeserializer( public PscSourceBuilder setValueOnlyDeserializer( DeserializationSchema deserializationSchema) { this.deserializationSchema = - KafkaRecordDeserializationSchema.valueOnly(deserializationSchema); + PscRecordDeserializationSchema.valueOnly(deserializationSchema); return this; } @@ -341,19 +343,19 @@ public PscSourceBuilder setValueOnlyDeserializer( * @return this KafkaSourceBuilder. */ public PscSourceBuilder setClientIdPrefix(String prefix) { - return setProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), prefix); + return setProperty(PscSourceOptions.CLIENT_ID_PREFIX.key(), prefix); } /** * Set an arbitrary property for the KafkaSource and KafkaConsumer. The valid keys can be found - * in {@link ConsumerConfig} and {@link KafkaSourceOptions}. + * in {@link PscConfiguration} and {@link PscSourceOptions}. * *

    Note that the following keys will be overridden by the builder when the KafkaSource is * created. * *

      - *
    • key.deserializer is always set to {@link ByteArrayDeserializer}. - *
    • value.deserializer is always set to {@link ByteArrayDeserializer}. + *
    • key.deserializer is always set to {@link com.pinterest.psc.serde.ByteArrayDeserializer}. + *
    • value.deserializer is always set to {@link com.pinterest.psc.serde.ByteArrayDeserializer}. *
    • auto.offset.reset.strategy is overridden by {@link * OffsetsInitializer#getAutoOffsetResetStrategy()} for the starting offsets, which is by * default {@link OffsetsInitializer#earliest()}. @@ -371,15 +373,15 @@ public PscSourceBuilder setProperty(String key, String value) { } /** - * Set arbitrary properties for the KafkaSource and KafkaConsumer. The valid keys can be found - * in {@link ConsumerConfig} and {@link KafkaSourceOptions}. + * Set arbitrary properties for the PscSource and PscConsumer. The valid keys can be found + * in {@link PscConfiguration} and {@link PscSourceOptions}. * *

      Note that the following keys will be overridden by the builder when the KafkaSource is * created. * *

        - *
      • key.deserializer is always set to {@link ByteArrayDeserializer}. - *
      • value.deserializer is always set to {@link ByteArrayDeserializer}. + *
      • key.deserializer is always set to {@link com.pinterest.psc.serde.ByteArrayDeserializer}. + *
      • value.deserializer is always set to {@link com.pinterest.psc.serde.ByteArrayDeserializer}. *
      • auto.offset.reset.strategy is overridden by {@link * OffsetsInitializer#getAutoOffsetResetStrategy()} for the starting offsets, which is by * default {@link OffsetsInitializer#earliest()}. @@ -402,10 +404,10 @@ public PscSourceBuilder setProperties(Properties props) { * * @return a KafkaSource with the settings made for this builder. */ - public org.apache.flink.connector.kafka.source.KafkaSource build() { + public PscSource build() { sanityCheck(); parseAndSetRequiredProperties(); - return new KafkaSource<>( + return new PscSource<>( subscriber, startingOffsetsInitializer, stoppingOffsetsInitializer, @@ -427,38 +429,38 @@ private void ensureSubscriberIsNull(String attemptingSubscribeMode) { private void parseAndSetRequiredProperties() { maybeOverride( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + PscConfiguration.PSC_CONSUMER_KEY_DESERIALIZER, ByteArrayDeserializer.class.getName(), true); maybeOverride( - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + PscConfiguration.PSC_CONSUMER_VALUE_DESERIALIZER, ByteArrayDeserializer.class.getName(), true); - if (!props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { + if (!props.containsKey(PscConfiguration.PSC_CONSUMER_GROUP_ID)) { LOG.warn( "Offset commit on checkpoint is disabled because {} is not specified", - ConsumerConfig.GROUP_ID_CONFIG); - maybeOverride(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false", false); + PscConfiguration.PSC_CONSUMER_GROUP_ID); + maybeOverride(PscSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false", false); } - maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false); + maybeOverride(PscConfiguration.PSC_CONSUMER_COMMIT_AUTO_ENABLED, "false", false); maybeOverride( - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(), + PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET, + startingOffsetsInitializer.getAutoOffsetResetStrategy().toLowerCase(), true); // If the source is bounded, do not run periodic partition discovery. maybeOverride( - KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), + PscSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1", boundedness == Boundedness.BOUNDED); // If the client id prefix is not set, reuse the consumer group id as the client id prefix, // or generate a random string if consumer group id is not specified. maybeOverride( - KafkaSourceOptions.CLIENT_ID_PREFIX.key(), - props.containsKey(ConsumerConfig.GROUP_ID_CONFIG) - ? props.getProperty(ConsumerConfig.GROUP_ID_CONFIG) - : "KafkaSource-" + new Random().nextLong(), + PscSourceOptions.CLIENT_ID_PREFIX.key(), + props.containsKey(PscConfiguration.PSC_CONSUMER_GROUP_ID) + ? 
props.getProperty(PscConfiguration.PSC_CONSUMER_GROUP_ID) + : "PscSource-" + new Random().nextLong(), false); } @@ -495,10 +497,10 @@ private void sanityCheck() { checkNotNull(deserializationSchema, "Deserialization schema is required but not provided."); // Check consumer group ID checkState( - props.containsKey(ConsumerConfig.GROUP_ID_CONFIG) || !offsetCommitEnabledManually(), + props.containsKey(PscConfiguration.PSC_CONSUMER_GROUP_ID) || !offsetCommitEnabledManually(), String.format( "Property %s is required when offset commit is enabled", - ConsumerConfig.GROUP_ID_CONFIG)); + PscConfiguration.PSC_CONSUMER_GROUP_ID)); // Check offsets initializers if (startingOffsetsInitializer instanceof OffsetsInitializerValidator) { ((OffsetsInitializerValidator) startingOffsetsInitializer).validate(props); @@ -510,14 +512,14 @@ private void sanityCheck() { private boolean offsetCommitEnabledManually() { boolean autoCommit = - props.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) + props.containsKey(PscConfiguration.PSC_CONSUMER_COMMIT_AUTO_ENABLED) && Boolean.parseBoolean( - props.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); + props.getProperty(PscConfiguration.PSC_CONSUMER_COMMIT_AUTO_ENABLED)); boolean commitOnCheckpoint = - props.containsKey(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key()) + props.containsKey(PscSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key()) && Boolean.parseBoolean( props.getProperty( - KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key())); + PscSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key())); return autoCommit || commitOnCheckpoint; } } diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceOptions.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceOptions.java index 28a7c23..1742b9d 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceOptions.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceOptions.java @@ -33,22 +33,22 @@ public class PscSourceOptions { ConfigOptions.key("client.id.prefix") .stringType() .noDefaultValue() - .withDescription("The prefix to use for the Kafka consumers."); + .withDescription("The prefix to use for the PSC consumers."); public static final ConfigOption PARTITION_DISCOVERY_INTERVAL_MS = ConfigOptions.key("partition.discovery.interval.ms") .longType() .noDefaultValue() .withDescription( - "The interval in milliseconds for the Kafka source to discover " + "The interval in milliseconds for the PSC source to discover " + "the new partitions. 
A non-positive value disables the partition discovery."); - public static final ConfigOption REGISTER_KAFKA_CONSUMER_METRICS = + public static final ConfigOption REGISTER_PSC_CONSUMER_METRICS = ConfigOptions.key("register.consumer.metrics") .booleanType() .defaultValue(true) .withDescription( - "Whether to register metrics of KafkaConsumer into Flink metric group"); + "Whether to register metrics of PscConsumer into Flink metric group"); public static final ConfigOption COMMIT_OFFSETS_ON_CHECKPOINT = ConfigOptions.key("commit.offsets.on.checkpoint") diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscSubscriber.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscSubscriber.java index a54f320..676bd1a 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscSubscriber.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscSubscriber.java @@ -54,11 +54,12 @@ public interface PscSubscriber extends Serializable { // ----------------- factory methods -------------- - static PscSubscriber getTopicListSubscriber(List topicRns) { + static PscSubscriber getTopicRnListSubscriber(List topicRns) { return new PscTopicUriListSubscriber(topicRns); } static PscSubscriber getTopicPatternSubscriber(Pattern topicUriPattern) { + // TODO: should this be topic pattern or topic uri pattern? return new TopicUriPatternSubscriber(topicUriPattern); } diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java index 521f855..85263d6 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java @@ -61,7 +61,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -/** A {@link SplitReader} implementation that reads records from Kafka partitions. */ +/** A {@link SplitReader} implementation that reads records from PSC TopicUriPartitions. 
*/ @Internal public class PscTopicUriPartitionSplitReader implements SplitReader, PscTopicUriPartitionSplit> { @@ -450,7 +450,7 @@ private void maybeRegisterKafkaConsumerMetrics( final Boolean needToRegister = PscSourceOptions.getOption( props, - PscSourceOptions.REGISTER_KAFKA_CONSUMER_METRICS, + PscSourceOptions.REGISTER_PSC_CONSUMER_METRICS, Boolean::parseBoolean); if (needToRegister) { pscSourceReaderMetrics.registerPscConsumerMetrics(consumer); diff --git a/psc-integration-test/src/test/java/com/pinterest/psc/producer/TestMultiKafkaClusterBackends.java b/psc-integration-test/src/test/java/com/pinterest/psc/producer/TestMultiKafkaClusterBackends.java index 2dd282e..c763d15 100644 --- a/psc-integration-test/src/test/java/com/pinterest/psc/producer/TestMultiKafkaClusterBackends.java +++ b/psc-integration-test/src/test/java/com/pinterest/psc/producer/TestMultiKafkaClusterBackends.java @@ -98,7 +98,7 @@ public void tearDown() throws ExecutionException, InterruptedException { * @throws ProducerException */ @Test - public void testTransactionalProducersStates() throws ConfigurationException, ProducerException { + public void testTransactionalProducersStates() throws ConfigurationException, ProducerException, IOException { producerConfiguration.setProperty(PscConfiguration.PSC_PRODUCER_KEY_SERIALIZER, IntegerSerializer.class.getName()); producerConfiguration.setProperty(PscConfiguration.PSC_PRODUCER_VALUE_SERIALIZER, IntegerSerializer.class.getName()); producerConfiguration.setProperty(PscConfiguration.PSC_PRODUCER_TRANSACTIONAL_ID, "test-transactional-id");
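
Reviewer reference, a minimal usage sketch of the renamed source builder API this patch introduces (setClusterUri, setGroupId, setTopicPattern, setStartingOffsets, setValueOnlyDeserializer, build). The cluster URI, topic pattern, group id, and source/job names below are placeholders and not values taken from this change; the URI format in particular is an assumption.

import com.pinterest.flink.connector.psc.source.PscSource;
import com.pinterest.flink.connector.psc.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.regex.Pattern;

public class PscSourceUsageSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder cluster URI and topic pattern; real values depend on the PSC deployment.
        PscSource<String> source =
                PscSource.<String>builder()
                        .setClusterUri("plaintext:/my-cluster-uri")      // replaces setBootstrapServers
                        .setGroupId("my-consumer-group")                 // maps to PSC_CONSUMER_GROUP_ID
                        .setTopicPattern(Pattern.compile("my-topic-.*"))
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        // Unbounded by default; use setBounded(...) on the builder for a bounded run.
        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "psc-source");
        stream.print();
        env.execute("psc-source-usage-sketch");
    }
}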