From 662794801ef8c251fef7717dd67d3d7a69539847 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Mon, 28 Oct 2024 11:54:42 -0400 Subject: [PATCH] Refactor to remove Kafka references --- .../connector/psc/PscFlinkConfiguration.java | 11 + .../flink/connector/psc/sink/PscSink.java | 14 +- .../connector/psc/sink/PscSinkBuilder.java | 6 +- .../psc/sink/TransactionAborter.java | 6 +- .../flink/connector/psc/source/PscSource.java | 15 +- .../psc/source/PscSourceBuilder.java | 91 ++-- .../psc/source/PscSourceOptions.java | 2 +- .../NoStoppingOffsetsInitializer.java | 2 +- .../initializer/OffsetsInitializer.java | 16 +- .../OffsetsInitializerValidator.java | 4 +- .../ReaderHandledOffsetsInitializer.java | 4 +- .../TimestampOffsetsInitializer.java | 2 +- .../enumerator/subscriber/PscSubscriber.java | 4 +- .../subscriber/PscTopicUriListSubscriber.java | 2 +- .../TopicNamePatternSubscriber.java | 2 +- .../psc/source/reader/PscSourceReader.java | 4 +- .../PscTopicUriPartitionSplitReader.java | 4 +- .../PscRecordDeserializationSchema.java | 8 +- .../fetcher/PscSourceFetcherManager.java | 4 +- .../split/PscTopicUriPartitionSplitState.java | 4 +- .../psc/table/PscConnectorOptions.java | 26 +- .../psc/table/PscConnectorOptionsUtil.java | 26 +- .../connectors/psc/table/PscDynamicSink.java | 24 +- .../psc/table/PscDynamicSource.java | 24 +- .../psc/table/PscDynamicTableFactory.java | 2 +- .../table/UpsertPscDynamicTableFactory.java | 17 +- ...cRecordSerializationSchemaBuilderTest.java | 10 +- .../psc/sink/PscSinkBuilderTest.java | 2 - .../connector/psc/sink/PscSinkITCase.java | 14 +- .../connector/psc/sink/PscTransactionLog.java | 8 +- .../psc/sink/PscTransactionLogITCase.java | 10 +- .../connector/psc/sink/PscWriterITCase.java | 12 +- .../psc/sink/testutils/PscDataReader.java | 2 +- .../testutils/PscSinkExternalContext.java | 2 +- .../connector/psc/source/PscSourceITCase.java | 6 +- .../psc/source/PscSourceLegacyITCase.java | 8 +- .../psc/source/PscSourceTestUtils.java | 6 +- .../PscSourceEnumStateSerializerTest.java | 4 +- .../metrics/PscSourceReaderMetricsTest.java | 4 +- .../source/reader/PscSourceReaderTest.java | 17 +- .../PscRecordDeserializationSchemaTest.java | 14 +- .../testutils/PscSourceExternalContext.java | 10 +- .../PscSourceExternalContextFactory.java | 1 - .../psc/testutils/PscSourceTestEnv.java | 5 - .../PscTopicUriPartitionDataWriter.java | 2 +- .../PscConsumerTestBaseWithKafkaAsPubSub.java | 4 +- .../streaming/connectors/psc/PscITCase.java | 4 +- .../psc/PscTableSourceSinkFactoryTest.java | 96 ---- .../PscTableSourceSinkFactoryTestBase.java | 477 ------------------ .../psc/table/PscChangelogTableITCase.java | 31 +- .../psc/table/PscDynamicTableFactoryTest.java | 53 +- .../connectors/psc/table/PscTableITCase.java | 4 +- .../psc/table/PscTableTestBase.java | 2 +- .../psc/table/PscTableTestUtils.java | 2 +- .../UpsertPscDynamicTableFactoryTest.java | 38 +- .../psc/table/UpsertPscTableITCase.java | 6 +- 56 files changed, 293 insertions(+), 885 deletions(-) delete mode 100644 psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTableSourceSinkFactoryTest.java delete mode 100644 psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTableSourceSinkFactoryTestBase.java diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/PscFlinkConfiguration.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/PscFlinkConfiguration.java index 636ef3b..ea2c18c 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/PscFlinkConfiguration.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/PscFlinkConfiguration.java @@ -7,6 +7,17 @@ import java.util.Properties; public class PscFlinkConfiguration { + + /** + * Configuration key for the cluster URI. This is required for the following client types that use the {@link com.pinterest.flink.connector.psc.source.PscSource} + * and {@link com.pinterest.flink.connector.psc.sink.PscSink} classes: + * + *
  • Transactional producer: The cluster URI specifies the cluster to which the producer will connect. Transactional + * * producers require a cluster URI to be specified because transaction lifecycles are managed by the backend + * * PubSub cluster, and it is generally not possible to have transactions span multiple clusters.
  • + *
  • Consumer: Unlike regular PscConsumers, consumers in {@link com.pinterest.flink.connector.psc.source.PscSource} require + * * clusterUri to connect to the cluster for metadata queries during their lifecycles.
  • + */ public static final String CLUSTER_URI_CONFIG = "psc.cluster.uri"; public static TopicUri validateAndGetBaseClusterUri(Properties properties) throws TopicUriSyntaxException { diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscSink.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscSink.java index 1cb2506..2361545 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscSink.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscSink.java @@ -37,22 +37,22 @@ * Flink Sink to produce data into a PSC topicUri. The sink supports all delivery guarantees * described by {@link DeliveryGuarantee}. *
  • {@link DeliveryGuarantee#NONE} does not provide any guarantees: messages may be lost in case - * of issues on the Kafka broker and messages may be duplicated in case of a Flink failure. + * of issues on the PubSub broker and messages may be duplicated in case of a Flink failure. *
  • {@link DeliveryGuarantee#AT_LEAST_ONCE} the sink will wait for all outstanding records in the - * PSC buffers to be acknowledged by the PSC producer on a checkpoint. No messages will be + * producer buffers to be acknowledged by the PSC producer on a checkpoint. No messages will be * lost in case of any issue with the brokers but messages may be duplicated when Flink * restarts. *
  • {@link DeliveryGuarantee#EXACTLY_ONCE}: Note that this mode is only supported by KafkaProducers in the backend. * * In this mode the PscSink will write all messages in - * a Kafka transaction that will be committed to Kafka on a checkpoint. Thus, if the consumer - * reads only committed data (see Kafka consumer config isolation.level), no duplicates will be + * a transaction that will be committed to PubSub on a checkpoint. Thus, if the consumer + * reads only committed data (see PSC consumer config isolation.level), no duplicates will be * seen in case of a Flink restart. However, this delays record writing effectively until a * checkpoint is written, so adjust the checkpoint duration accordingly. Please ensure that you - * use unique {@link #transactionalIdPrefix}s across your applications running on the same Kafka + * use unique {@link #transactionalIdPrefix}s across your applications running on the same PubSub * cluster such that multiple running jobs do not interfere in their transactions! Additionally, - * it is highly recommended to tweak Kafka transaction timeout (link) >> maximum checkpoint - * duration + maximum restart duration or data loss may happen when Kafka expires an uncommitted + * it is highly recommended to tweak transaction timeout (link) >> maximum checkpoint + * duration + maximum restart duration or data loss may happen when the PubSub cluster expires an uncommitted * transaction. * * @param type of the records written diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscSinkBuilder.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscSinkBuilder.java index d5f5467..c059bbc 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscSinkBuilder.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscSinkBuilder.java @@ -146,7 +146,7 @@ public PscSinkBuilder setRecordSerializer( * *

    It is mandatory to always set this value with {@link DeliveryGuarantee#EXACTLY_ONCE} to * prevent corrupted transactions if multiple jobs using the PscSink run against the same - * Kafka Cluster. The default prefix is {@link #transactionalIdPrefix}. + * PubSub Cluster. The default prefix is {@link #transactionalIdPrefix}. * *

    The size of the prefix is capped by {@link #MAXIMUM_PREFIX_BYTES} formatted with UTF-8. * @@ -162,7 +162,7 @@ public PscSinkBuilder setTransactionalIdPrefix(String transactionalIdPrefix) checkState( transactionalIdPrefix.getBytes(StandardCharsets.UTF_8).length <= MAXIMUM_PREFIX_BYTES, - "The configured prefix is too long and the resulting transactionalId might exceed Kafka's transactionalIds size."); + "The configured prefix is too long and the resulting transactionalId might exceed PSC's transactionalIds size."); return this; } @@ -173,7 +173,7 @@ private void sanityCheck() { if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) { checkState( transactionalIdPrefix != null, - "EXACTLY_ONCE delivery guarantee requires a transactionIdPrefix to be set to provide unique transaction names across multiple PscSinks writing to the same Kafka cluster."); + "EXACTLY_ONCE delivery guarantee requires a transactionIdPrefix to be set to provide unique transaction names across multiple PscSinks writing to the same PSC cluster."); } checkNotNull(recordSerializer, "recordSerializer"); } diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/TransactionAborter.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/TransactionAborter.java index a11e2e1..fd9e7b1 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/TransactionAborter.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/TransactionAborter.java @@ -32,8 +32,8 @@ * *

    Transactions are lingering if they are not tracked anywhere. For example, if a job is started * transactions are opened. A restart without checkpoint would not allow Flink to abort old - * transactions. Since Kafka's transactions are sequential, newly produced data does not become - * visible for read_committed consumers. However, Kafka has no API for querying open transactions, + * transactions. Since PSC's transactions are sequential, newly produced data does not become + * visible for read_committed consumers. However, PSC has no API for querying open transactions, * so they become lingering. * *

    Flink solves this by assuming consecutive transaction ids. On restart of checkpoint C on @@ -119,7 +119,7 @@ private int abortTransactionOfSubtask(String prefix, long startCheckpointId, int // This method will only cease to work if transaction log timeout = topic retention // and a user didn't restart the application for that period of time. Then the first // transactions would vanish from the topic while later transactions are still - // lingering until they are cleaned up by Kafka. Then the user has to wait until the + // lingering until they are cleaned up by PSC. Then the user has to wait until the // other transactions are timed out (which shouldn't take too long). break; } diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSource.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSource.java index ce75513..8b7dc06 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSource.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSource.java @@ -61,17 +61,16 @@ import java.util.function.Supplier; /** - * The Source implementation of Kafka. Please use a {@link PscSourceBuilder} to construct a {@link - * PscSource}. The following example shows how to create a KafkaSource emitting records of + * The Source implementation of PSC. Please use a {@link PscSourceBuilder} to construct a {@link + * PscSource}. The following example shows how to create a PscSource emitting records of * String type. * *

    {@code
    - * KafkaSource source = KafkaSource
    + * PscSource source = PscSource
      *     .builder()
    - *     .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
      *     .setGroupId("MyGroup")
    - *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
    - *     .setDeserializer(new TestingKafkaRecordDeserializationSchema())
    + *     .setTopicUris(Arrays.asList(TOPIC1, TOPIC2))
    + *     .setDeserializer(new TestingPscRecordDeserializationSchema())
      *     .setStartingOffsets(OffsetsInitializer.earliest())
      *     .build();
      * }
    @@ -112,9 +111,9 @@ public class PscSource } /** - * Get a kafkaSourceBuilder to build a {@link PscSource}. + * Get a PscSourceBuilder to build a {@link PscSource}. * - * @return a Kafka source builder. + * @return a PSC source builder. */ public static PscSourceBuilder builder() { return new PscSourceBuilder<>(); diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceBuilder.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceBuilder.java index 76aaec3..3215e13 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceBuilder.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceBuilder.java @@ -47,45 +47,45 @@ import static org.apache.flink.util.Preconditions.checkState; /** - * The @builder class for {@link org.apache.flink.connector.kafka.source.KafkaSource} to make it easier for the users to construct a {@link - * org.apache.flink.connector.kafka.source.KafkaSource}. + * The @builder class for {@link PscSource} to make it easier for the users to construct a {@link + * PscSource}. * - *

    The following example shows the minimum setup to create a KafkaSource that reads the String - * values from a Kafka topic. + *

    The following example shows the minimum setup to create a PscSource that reads the String + * values from a PSC topic. * *

    {@code
    - * KafkaSource source = KafkaSource
    + * PscSource source = PscSource
      *     .builder()
      *     .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
      *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
    - *     .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
    + *     .setDeserializer(PscRecordDeserializationSchema.valueOnly(StringDeserializer.class))
      *     .build();
      * }
    * *

    The bootstrap servers, topics/partitions to consume, and the record deserializer are required * fields that must be set. * - *

    To specify the starting offsets of the KafkaSource, one can call {@link + *

    To specify the starting offsets of the PscSource, one can call {@link * #setStartingOffsets(OffsetsInitializer)}. * - *

    By default the KafkaSource runs in an {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and never - * stops until the Flink job is canceled or fails. To let the KafkaSource run in {@link + *

    By default the PscSource runs in an {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and never + * stops until the Flink job is canceled or fails. To let the PscSource run in {@link * Boundedness#CONTINUOUS_UNBOUNDED} but stops at some given offsets, one can call {@link - * #setUnbounded(OffsetsInitializer)}. For example the following KafkaSource stops after it consumes + * #setUnbounded(OffsetsInitializer)}. For example the following PscSource stops after it consumes * up to the latest partition offsets at the point when the Flink started. * *

    {@code
    - * KafkaSource source = KafkaSource
    + * PscSource source = PscSource
      *     .builder()
      *     .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
      *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
    - *     .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
    + *     .setDeserializer(PscRecordDeserializationSchema.valueOnly(StringDeserializer.class))
      *     .setUnbounded(OffsetsInitializer.latest())
      *     .build();
      * }
    * *

    Check the Java docs of each individual methods to learn more about the settings to build a - * KafkaSource. + * PscSource. */ @PublicEvolving public class PscSourceBuilder { @@ -140,7 +140,7 @@ public PscSourceBuilder setGroupId(String groupId) { * * @param topicUris the list of topicRns to consume from. * @return this PscSourceBuilder. - * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection) + * @see com.pinterest.psc.consumer.PscConsumer#subscribe(Collection) */ public PscSourceBuilder setTopicUris(List topicUris) { ensureSubscriberIsNull("topics"); @@ -155,7 +155,7 @@ public PscSourceBuilder setTopicUris(List topicUris) { * * @param topicUris the list of topicRns to consume from. * @return this PscSourceBuilder. - * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection) + * @see com.pinterest.psc.consumer.PscConsumer#subscribe(Collection) */ public PscSourceBuilder setTopicUris(String... topicUris) { return setTopicUris(Arrays.asList(topicUris)); @@ -166,7 +166,6 @@ public PscSourceBuilder setTopicUris(String... topicUris) { * * @param topicUriPattern the pattern of the topic name to consume from. * @return this PscSourceBuilder. - * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Pattern) */ public PscSourceBuilder setTopicUriPattern(Pattern topicUriPattern) { ensureSubscriberIsNull("topicUri pattern"); @@ -178,8 +177,8 @@ public PscSourceBuilder setTopicUriPattern(Pattern topicUriPattern) { * Set a set of partitions to consume from. * * @param partitions the set of partitions to consume from. - * @return this KafkaSourceBuilder. - * @see org.apache.kafka.clients.consumer.KafkaConsumer#assign(Collection) + * @return this PscSourceBuilder. + * @see com.pinterest.psc.consumer.PscConsumer#assign(Collection) */ public PscSourceBuilder setPartitions(Set partitions) { ensureSubscriberIsNull("partitions"); @@ -188,7 +187,7 @@ public PscSourceBuilder setPartitions(Set partitions) { } /** - * Specify from which offsets the KafkaSource should start consume from by providing an {@link + * Specify from which offsets the PscSource should start consume from by providing an {@link * OffsetsInitializer}. * *

    The following {@link OffsetsInitializer}s are commonly used and provided out of the box. @@ -196,7 +195,7 @@ public PscSourceBuilder setPartitions(Set partitions) { * *

      *
    • {@link OffsetsInitializer#earliest()} - starting from the earliest offsets. This is - * also the default {@link OffsetsInitializer} of the KafkaSource for starting offsets. + * also the default {@link OffsetsInitializer} of the PscSource for starting offsets. *
    • {@link OffsetsInitializer#latest()} - starting from the latest offsets. *
    • {@link OffsetsInitializer#committedOffsets()} - starting from the committed offsets of * the consumer group. @@ -207,7 +206,7 @@ public PscSourceBuilder setPartitions(Set partitions) { *
    • {@link OffsetsInitializer#offsets(Map)} - starting from the specified offsets for each * partition. *
    • {@link OffsetsInitializer#timestamp(long)} - starting from the specified timestamp for - * each partition. Note that the guarantee here is that all the records in Kafka whose + * each partition. Note that the guarantee here is that all the records in the backend whose * {@link MessageId#getOffset()} is greater than * the given starting timestamp will be consumed. However, it is possible that some * consumer records whose timestamp is smaller than the given starting timestamp are also @@ -216,7 +215,7 @@ public PscSourceBuilder setPartitions(Set partitions) { * * @param startingOffsetsInitializer the {@link OffsetsInitializer} setting the starting offsets * for the Source. - * @return this KafkaSourceBuilder. + * @return this PscSourceBuilder. */ public PscSourceBuilder setStartingOffsets( OffsetsInitializer startingOffsetsInitializer) { @@ -226,7 +225,7 @@ public PscSourceBuilder setStartingOffsets( /** * By default the PscSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner - * and thus never stops until the Flink job fails or is canceled. To let the KafkaSource run as + * and thus never stops until the Flink job fails or is canceled. To let the PscSource run as * a streaming source but still stops at some point, one can set an {@link OffsetsInitializer} * to specify the stopping offsets for each partition. When all the partitions have reached * their stopping offsets, the PscSource will then exit. @@ -241,13 +240,13 @@ public PscSourceBuilder setStartingOffsets( * *
        *
      • {@link OffsetsInitializer#latest()} - stop at the latest offsets of the partitions when - * the KafkaSource starts to run. + * the PscSource starts to run. *
      • {@link OffsetsInitializer#committedOffsets()} - stops at the committed offsets of the * consumer group. *
      • {@link OffsetsInitializer#offsets(Map)} - stops at the specified offsets for each * partition. *
      • {@link OffsetsInitializer#timestamp(long)} - stops at the specified timestamp for each - * partition. The guarantee of setting the stopping timestamp is that no Kafka records + * partition. The guarantee of setting the stopping timestamp is that no records * whose {@link MessageId#getTimestamp()} is greater * than the given stopping timestamp will be consumed. However, it is possible that some * records whose timestamp is smaller than the specified stopping timestamp are not @@ -256,7 +255,7 @@ public PscSourceBuilder setStartingOffsets( * * @param stoppingOffsetsInitializer The {@link OffsetsInitializer} to specify the stopping * offset. - * @return this KafkaSourceBuilder. + * @return this PscSourceBuilder. * @see #setBounded(OffsetsInitializer) */ public PscSourceBuilder setUnbounded(OffsetsInitializer stoppingOffsetsInitializer) { @@ -267,7 +266,7 @@ public PscSourceBuilder setUnbounded(OffsetsInitializer stoppingOffsetsInit /** * By default the PscSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner - * and thus never stops until the Flink job fails or is canceled. To let the KafkaSource run in + * and thus never stops until the Flink job fails or is canceled. To let the PscSource run in * {@link Boundedness#BOUNDED} manner and stops at some point, one can set an {@link * OffsetsInitializer} to specify the stopping offsets for each partition. When all the * partitions have reached their stopping offsets, the PscSource will then exit. @@ -281,13 +280,13 @@ public PscSourceBuilder setUnbounded(OffsetsInitializer stoppingOffsetsInit * *
          *
        • {@link OffsetsInitializer#latest()} - stop at the latest offsets of the partitions when - * the KafkaSource starts to run. + * the PscSource starts to run. *
        • {@link OffsetsInitializer#committedOffsets()} - stops at the committed offsets of the * consumer group. *
        • {@link OffsetsInitializer#offsets(Map)} - stops at the specified offsets for each * partition. *
        • {@link OffsetsInitializer#timestamp(long)} - stops at the specified timestamp for each - * partition. The guarantee of setting the stopping timestamp is that no Kafka records + * partition. The guarantee of setting the stopping timestamp is that no records * whose {@link MessageId#getTimestamp()} is greater * than the given stopping timestamp will be consumed. However, it is possible that some * records whose timestamp is smaller than the specified stopping timestamp are not @@ -296,7 +295,7 @@ public PscSourceBuilder setUnbounded(OffsetsInitializer stoppingOffsetsInit * * @param stoppingOffsetsInitializer the {@link OffsetsInitializer} to specify the stopping * offsets. - * @return this KafkaSourceBuilder. + * @return this PscSourceBuilder. * @see #setUnbounded(OffsetsInitializer) */ public PscSourceBuilder setBounded(OffsetsInitializer stoppingOffsetsInitializer) { @@ -307,11 +306,11 @@ public PscSourceBuilder setBounded(OffsetsInitializer stoppingOffsetsInitia /** * Sets the {@link PscRecordDeserializationSchema deserializer} of the {@link - * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource. + * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for PscSource. * - * @param recordDeserializer the deserializer for Kafka {@link + * @param recordDeserializer the deserializer for PSC {@link * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}. - * @return this KafkaSourceBuilder. + * @return this PscSourceBuilder. */ public PscSourceBuilder setDeserializer( PscRecordDeserializationSchema recordDeserializer) { @@ -321,12 +320,12 @@ public PscSourceBuilder setDeserializer( /** * Sets the {@link PscRecordDeserializationSchema deserializer} of the {@link - * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource. The given + * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for PscSource. The given * {@link DeserializationSchema} will be used to deserialize the value of ConsumerRecord. The * other information (e.g. key) in a ConsumerRecord will be ignored. * * @param deserializationSchema the {@link DeserializationSchema} to use for deserialization. - * @return this KafkaSourceBuilder. + * @return this PscSourceBuilder. */ public PscSourceBuilder setValueOnlyDeserializer( DeserializationSchema deserializationSchema) { @@ -336,20 +335,20 @@ public PscSourceBuilder setValueOnlyDeserializer( } /** - * Sets the client id prefix of this KafkaSource. + * Sets the client id prefix of this PscSource. * - * @param prefix the client id prefix to use for this KafkaSource. - * @return this KafkaSourceBuilder. + * @param prefix the client id prefix to use for this PscSource. + * @return this PscSourceBuilder. */ public PscSourceBuilder setClientIdPrefix(String prefix) { return setProperty(PscSourceOptions.CLIENT_ID_PREFIX.key(), prefix); } /** - * Set an arbitrary property for the KafkaSource and KafkaConsumer. The valid keys can be found + * Set an arbitrary property for the PscSource and PscConsumer. The valid keys can be found * in {@link PscConfiguration} and {@link PscSourceOptions}. * - *

          Note that the following keys will be overridden by the builder when the KafkaSource is + *

          Note that the following keys will be overridden by the builder when the PscSource is * created. * *

            @@ -364,7 +363,7 @@ public PscSourceBuilder setClientIdPrefix(String prefix) { * * @param key the key of the property. * @param value the value of the property. - * @return this KafkaSourceBuilder. + * @return this PscSourceBuilder. */ public PscSourceBuilder setProperty(String key, String value) { props.setProperty(key, value); @@ -375,7 +374,7 @@ public PscSourceBuilder setProperty(String key, String value) { * Set arbitrary properties for the PscSource and PscConsumer. The valid keys can be found * in {@link PscConfiguration} and {@link PscSourceOptions}. * - *

            Note that the following keys will be overridden by the builder when the KafkaSource is + *

            Note that the following keys will be overridden by the builder when the PscSource is * created. * *

              @@ -390,8 +389,8 @@ public PscSourceBuilder setProperty(String key, String value) { * "group.id-RANDOM_LONG" if the client id prefix is not set. *
            * - * @param props the properties to set for the KafkaSource. - * @return this KafkaSourceBuilder. + * @param props the properties to set for the PscSource. + * @return this PscSourceBuilder. */ public PscSourceBuilder setProperties(Properties props) { this.props.putAll(props); @@ -399,9 +398,9 @@ public PscSourceBuilder setProperties(Properties props) { } /** - * Build the {@link org.apache.flink.connector.kafka.source.KafkaSource}. + * Build the {@link org.apache.flink.connector.kafka.source.PscSource}. * - * @return a KafkaSource with the settings made for this builder. + * @return a PscSource with the settings made for this builder. */ public PscSource build() { sanityCheck(); diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceOptions.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceOptions.java index 1742b9d..606edb8 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceOptions.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceOptions.java @@ -25,7 +25,7 @@ import java.util.Properties; import java.util.function.Function; -/** Configurations for KafkaSource. */ +/** Configurations for PscSource. */ @Internal public class PscSourceOptions { diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/NoStoppingOffsetsInitializer.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/NoStoppingOffsetsInitializer.java index 76e58ce..61cafef 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/NoStoppingOffsetsInitializer.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/NoStoppingOffsetsInitializer.java @@ -28,7 +28,7 @@ /** * An implementation of {@link OffsetsInitializer} which does not initialize anything. * - *

            This class is used as the default stopping offsets initializer for unbounded Kafka sources. + *

            This class is used as the default stopping offsets initializer for unbounded PSC sources. */ @Internal public class NoStoppingOffsetsInitializer implements OffsetsInitializer { diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializer.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializer.java index 803768c..8061bdf 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializer.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializer.java @@ -40,16 +40,16 @@ public interface OffsetsInitializer extends Serializable { /** - * Get the initial offsets for the given Kafka partitions. These offsets will be used as either - * starting offsets or stopping offsets of the Kafka partitions. + * Get the initial offsets for the given PSC partitions. These offsets will be used as either + * starting offsets or stopping offsets of the PSC partitions. * *

            If the implementation returns a starting offset which causes {@code - * OffsetsOutOfRangeException} from Kafka. The offsetResetStrategy provided by the + * OffsetsOutOfRangeException} from PSC. The offsetResetStrategy provided by the * {@link #getAutoOffsetResetStrategy()} will be used to reset the offset. * - * @param partitions the Kafka partitions to get the starting offsets. - * @param partitionOffsetsRetriever a helper to retrieve information of the Kafka partitions. - * @return A mapping from Kafka partition to their offsets to start consuming from. + * @param partitions the PSC partitions to get the starting offsets. + * @param partitionOffsetsRetriever a helper to retrieve information of the PSC partitions. + * @return A mapping from PSC partition to their offsets to start consuming from. */ Map getPartitionOffsets( Collection partitions, @@ -68,7 +68,7 @@ Map getPartitionOffsets( /** * An interface that provides necessary information to the {@link OffsetsInitializer} to get the - * initial offsets of the Kafka partitions. + * initial offsets of the PSC partitions. */ interface PartitionOffsetsRetriever { @@ -76,7 +76,7 @@ interface PartitionOffsetsRetriever { * The group id should be the set for {@link com.pinterest.flink.connector.psc.source.PscSource PscSource} before invoking this * method. Otherwise an {@code IllegalStateException} will be thrown. * - * @throws IllegalStateException if the group id is not set for the {@code KafkaSource}. + * @throws IllegalStateException if the group id is not set for the {@code PscSource}. */ Map committedOffsets(Collection partitions); diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializerValidator.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializerValidator.java index 1f1c0fe..b9b352c 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializerValidator.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializerValidator.java @@ -30,9 +30,9 @@ public interface OffsetsInitializerValidator { /** - * Validate offsets initializer with properties of Kafka source. + * Validate offsets initializer with properties of PSC source. * - * @param pscSourceProperties Properties of Kafka source + * @param pscSourceProperties Properties of PSC source * @throws IllegalStateException if validation fails */ void validate(Properties pscSourceProperties) throws IllegalStateException; diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java index 4b3f414..db68f5d 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java @@ -31,8 +31,8 @@ /** * A initializer that initialize the partitions to the earliest / latest / last-committed offsets. - * The offsets initialization are taken care of by the {@code KafkaPartitionSplitReader} instead of - * by the {@code KafkaSourceEnumerator}. + * The offsets initialization are taken care of by the {@code PscTopicUriPartitionSplitReader} instead of + * by the {@code PscSourceEnumerator}. * *

            Package private and should be instantiated via {@link org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer}. */ diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/TimestampOffsetsInitializer.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/TimestampOffsetsInitializer.java index 78ff3cd..43f3495 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/TimestampOffsetsInitializer.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/TimestampOffsetsInitializer.java @@ -47,7 +47,7 @@ public Map getPartitionOffsets( // First get the current end offsets of the partitions. This is going to be used // in case we cannot find a suitable offsets based on the timestamp, i.e. the message - // meeting the requirement of the timestamp have not been produced to Kafka yet, in + // meeting the requirement of the timestamp have not been produced to PSC yet, in // this case, we just use the latest offset. // We need to get the latest offsets before querying offsets by time to ensure that // no message is going to be missed. diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscSubscriber.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscSubscriber.java index 519bbea..da40f85 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscSubscriber.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscSubscriber.java @@ -29,7 +29,7 @@ import java.util.regex.Pattern; /** - * Kafka consumer allows a few different ways to consume from the topics, including: + * PSC consumer allows a few different ways to consume from the topics, including: * *

              *
            1. Subscribe from a collection of topics. @@ -37,7 +37,7 @@ *
            2. Assign specific partitions. *
            * - *

            The KafkaSubscriber provides a unified interface for the Kafka source to support all these + *

            The PscSubscriber provides a unified interface for the PSC source to support all these * three types of subscribing mode. */ @Internal diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscTopicUriListSubscriber.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscTopicUriListSubscriber.java index 5da1dfd..d9ef48b 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscTopicUriListSubscriber.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscTopicUriListSubscriber.java @@ -37,7 +37,7 @@ import static com.pinterest.flink.connector.psc.source.enumerator.subscriber.PscSubscriberUtils.getTopicRnMetadata; /** - * A subscriber to a fixed list of topics. The subscribed topics must have existed in the Kafka + * A subscriber to a fixed list of topics. The subscribed topics must have existed in the PSC * cluster, otherwise an exception will be thrown. */ class PscTopicUriListSubscriber implements PscSubscriber { diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/TopicNamePatternSubscriber.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/TopicNamePatternSubscriber.java index 945c4db..fe0086d 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/TopicNamePatternSubscriber.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/TopicNamePatternSubscriber.java @@ -48,7 +48,7 @@ class TopicNamePatternSubscriber implements PscSubscriber { @Override public Set getSubscribedTopicUriPartitions(PscMetadataClient metadataClient, TopicUri clusterUri) { - LOG.debug("Fetching descriptions for all topics on Kafka cluster"); + LOG.debug("Fetching descriptions for all topics on PubSub cluster"); final Map allTopicRnMetadata = getAllTopicRnMetadata(metadataClient, clusterUri); Set subscribedTopicUriPartitions = new HashSet<>(); diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReader.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReader.java index c3b7577..ef40de4 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReader.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReader.java @@ -49,7 +49,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -/** The source reader for Kafka partitions. */ +/** The source reader for PSC TopicUriPartitions. */ @Internal public class PscSourceReader extends SingleThreadMultiplexSourceReaderBase< @@ -80,7 +80,7 @@ public PscSourceReader( if (!commitOffsetsOnCheckpoint) { LOG.warn( "Offset commit on checkpoint is disabled. " - + "Consuming offset will not be reported back to Kafka cluster."); + + "Consuming offset will not be reported back to PubSub cluster."); } } diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java index c8e60bd..9187b06 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java @@ -389,7 +389,7 @@ private void removeEmptySplits() throws ConsumerException, ConfigurationExceptio emptyPartitions.stream() .map(PscTopicUriPartitionSplit::toSplitId) .collect(Collectors.toSet())); - // Un-assign partitions from Kafka consumer + // Un-assign partitions from PSC consumer unassignPartitions(emptyPartitions); } } @@ -464,7 +464,7 @@ private void maybeRegisterPscConsumerMetrics( } /** - * Catch {@link WakeupException} in Kafka consumer call and retry the invocation on exception. + * Catch {@link WakeupException} in PSC consumer call and retry the invocation on exception. * *

            This helper function handles a race condition as below: * diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/deserializer/PscRecordDeserializationSchema.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/deserializer/PscRecordDeserializationSchema.java index ac87b66..1317359 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/deserializer/PscRecordDeserializationSchema.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/deserializer/PscRecordDeserializationSchema.java @@ -32,7 +32,7 @@ import java.io.Serializable; import java.util.Map; -/** An interface for the deserialization of Kafka records. */ +/** An interface for the deserialization of PSC messages. */ @PublicEvolving public interface PscRecordDeserializationSchema extends Serializable, ResultTypeQueryable { @@ -97,7 +97,7 @@ static PscRecordDeserializationSchema valueOnly( } /** - * Wraps a Kafka {@link Deserializer} to a {@link PscRecordDeserializationSchema}. + * Wraps a PSC {@link Deserializer} to a {@link PscRecordDeserializationSchema}. * * @param valueDeserializerClass the deserializer class used to deserialize the value. * @param the value type. @@ -110,12 +110,12 @@ static PscRecordDeserializationSchema valueOnly( } /** - * Wraps a Kafka {@link Deserializer} to a {@link PscRecordDeserializationSchema}. + * Wraps a PSC {@link Deserializer} to a {@link PscRecordDeserializationSchema}. * * @param valueDeserializerClass the deserializer class used to deserialize the value. * @param config the configuration of the value deserializer. If the deserializer is an * implementation of {@code Configurable}, the configuring logic will be handled by {@link - * org.apache.kafka.common.Configurable#configure(Map)} with the given config, + * com.pinterest.psc.common.PscPlugin#configure(PscConfiguration)} with the given config, * otherwise {@link Deserializer#configure(com.pinterest.psc.config.PscConfiguration, boolean)} will be invoked. * @param the value type. * @param the type of the deserializer. diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/fetcher/PscSourceFetcherManager.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/fetcher/PscSourceFetcherManager.java index eb5a508..a758b2e 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/fetcher/PscSourceFetcherManager.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/fetcher/PscSourceFetcherManager.java @@ -42,8 +42,8 @@ import java.util.function.Supplier; /** - * The SplitFetcherManager for Kafka source. This class is needed to help commit the offsets to - * Kafka using the KafkaConsumer inside the {@link + * The SplitFetcherManager for PSC source. This class is needed to help commit the offsets to + * PSC using the PscConsumer inside the {@link * PscTopicUriPartitionSplitReader}. */ @Internal diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplitState.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplitState.java index b5398ba..5536baa 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplitState.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplitState.java @@ -43,9 +43,9 @@ public void setCurrentOffset(long currentOffset) { } /** - * Use the current offset as the starting offset to create a new KafkaPartitionSplit. + * Use the current offset as the starting offset to create a new PscTopicUriPartitionSplit. * - * @return a new KafkaPartitionSplit which uses the current offset as its starting offset. + * @return a new PscTopicUriPartitionSplit which uses the current offset as its starting offset. */ public PscTopicUriPartitionSplit toPscTopicUriPartitionSplit() { return new PscTopicUriPartitionSplit( diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java index 32718fa..fb5c5f6 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java @@ -109,7 +109,7 @@ public class PscConnectorOptions { public static final ConfigOption SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM; // -------------------------------------------------------------------------------------------- - // Kafka specific options + // Psc specific options // -------------------------------------------------------------------------------------------- public static final ConfigOption> TOPIC_URI = @@ -133,7 +133,7 @@ public class PscConnectorOptions { .stringType() .noDefaultValue() .withDescription( - "Required consumer group in Kafka consumer, no need for Kafka producer"); + "Required consumer group in PSC consumer, no need for PSC producer"); // -------------------------------------------------------------------------------------------- // Scan specific options @@ -143,7 +143,7 @@ public class PscConnectorOptions { ConfigOptions.key("scan.startup.mode") .enumType(ScanStartupMode.class) .defaultValue(ScanStartupMode.GROUP_OFFSETS) - .withDescription("Startup mode for Kafka consumer."); + .withDescription("Startup mode for PSC consumer."); public static final ConfigOption SCAN_STARTUP_SPECIFIC_OFFSETS = ConfigOptions.key("scan.startup.specific-offsets") @@ -164,7 +164,7 @@ public class PscConnectorOptions { .durationType() .noDefaultValue() .withDescription( - "Optional interval for consumer to discover dynamically created Kafka partitions periodically."); + "Optional interval for consumer to discover dynamically created backend partitions periodically."); // -------------------------------------------------------------------------------------------- // Sink specific options @@ -177,16 +177,16 @@ public class PscConnectorOptions { .withDescription( Description.builder() .text( - "Optional output partitioning from Flink's partitions into Kafka's partitions. Valid enumerations are") + "Optional output partitioning from Flink's partitions into PSC's backend partitions. Valid enumerations are") .list( text( - "'default' (use kafka default partitioner to partition records)"), + "'default' (use PSC default partitioner to partition records)"), text( - "'fixed' (each Flink partition ends up in at most one Kafka partition)"), + "'fixed' (each Flink partition ends up in at most one PubSub partition)"), text( - "'round-robin' (a Flink partition is distributed to Kafka partitions round-robin when 'key.fields' is not specified)"), + "'round-robin' (a Flink partition is distributed to PubSub partitions round-robin when 'key.fields' is not specified)"), text( - "custom class name (use custom FlinkKafkaPartitioner subclass)")) + "custom class name (use custom FlinkPscPartitioner subclass)")) .build()); // Disable this feature by default @@ -200,7 +200,7 @@ public class PscConnectorOptions { "The max size of buffered records before flushing. " + "When the sink receives many updates on the same key, " + "the buffer will retain the last records of the same key. " - + "This can help to reduce data shuffling and avoid possible tombstone messages to the Kafka topic.") + + "This can help to reduce data shuffling and avoid possible tombstone messages to the PubSub topic.") .linebreak() .text("Can be set to '0' to disable it.") .linebreak() @@ -241,7 +241,7 @@ public class PscConnectorOptions { .withDescription( "If the delivery guarantee is configured as " + DeliveryGuarantee.EXACTLY_ONCE - + " this value is used a prefix for the identifier of all opened Kafka transactions."); + + " this value is used a prefix for the identifier of all opened transactions."); // -------------------------------------------------------------------------------------------- // Enums @@ -253,14 +253,14 @@ public enum ValueFieldsStrategy { EXCEPT_KEY } - /** Startup mode for the Kafka consumer, see {@link #SCAN_STARTUP_MODE}. */ + /** Startup mode for the PSC consumer, see {@link #SCAN_STARTUP_MODE}. */ public enum ScanStartupMode implements DescribedEnum { EARLIEST_OFFSET("earliest-offset", text("Start from the earliest offset possible.")), LATEST_OFFSET("latest-offset", text("Start from the latest offset.")), GROUP_OFFSETS( "group-offsets", text( - "Start from committed offsets in ZooKeeper / Kafka brokers of a specific consumer group.")), + "Start from committed offsets in ZooKeeper / PubSub brokers of a specific consumer group.")), TIMESTAMP("timestamp", text("Start from user-supplied timestamp for each partition.")), SPECIFIC_OFFSETS( "specific-offsets", diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptionsUtil.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptionsUtil.java index 57f8dcc..08d67ec 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptionsUtil.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptionsUtil.java @@ -65,7 +65,7 @@ import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.VALUE_FORMAT; import static org.apache.flink.table.factories.FactoryUtil.FORMAT; -/** Utilities for {@link KafkaConnectorOptions}. */ +/** Utilities for {@link PscConnectorOptions}. */ @Internal class PscConnectorOptionsUtil { @@ -81,7 +81,7 @@ class PscConnectorOptionsUtil { public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed"; public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin"; - // Prefix for Kafka specific properties. + // Prefix for PSC specific properties. public static final String PROPERTIES_PREFIX = "properties."; // Other keywords. @@ -169,7 +169,7 @@ private static void validateScanStartupMode(ReadableConfig tableOptions) { } if (!isSingleTopicUri(tableOptions)) { throw new ValidationException( - "Currently Kafka source only supports specific offset for single topic."); + "Currently PSC source only supports specific offset for single topic."); } String specificOffsets = tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS); @@ -255,7 +255,7 @@ private static void buildSpecificOffsets( } /** - * Returns the {@link StartupMode} of Kafka Consumer by passed-in table-specific {@link + * Returns the {@link StartupMode} of PSC Consumer by passed-in table-specific {@link * com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.ScanStartupMode}. */ private static StartupMode fromOption(PscConnectorOptions.ScanStartupMode scanStartupMode) { @@ -278,19 +278,19 @@ private static StartupMode fromOption(PscConnectorOptions.ScanStartupMode scanSt } public static Properties getPscProperties(Map tableOptions) { - final Properties kafkaProperties = new Properties(); + final Properties pscProperties = new Properties(); - if (hasKafkaClientProperties(tableOptions)) { + if (hasPscClientProperties(tableOptions)) { tableOptions.keySet().stream() .filter(key -> key.startsWith(PROPERTIES_PREFIX)) .forEach( key -> { final String value = tableOptions.get(key); final String subKey = key.substring((PROPERTIES_PREFIX).length()); - kafkaProperties.put(subKey, value); + pscProperties.put(subKey, value); }); } - return kafkaProperties; + return pscProperties; } /** @@ -368,10 +368,10 @@ public static Map parseSpecificOffsets( } /** - * Decides if the table options contains Kafka client properties that start with prefix + * Decides if the table options contains PSC client properties that start with prefix * 'properties'. */ - private static boolean hasKafkaClientProperties(Map tableOptions) { + private static boolean hasPscClientProperties(Map tableOptions) { return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX)); } @@ -387,10 +387,10 @@ private static FlinkPscPartitioner initializePartitioner( name, FlinkPscPartitioner.class.getName())); } @SuppressWarnings("unchecked") - final FlinkPscPartitioner kafkaPartitioner = + final FlinkPscPartitioner pscPartitioner = InstantiationUtil.instantiate(name, FlinkPscPartitioner.class, classLoader); - return kafkaPartitioner; + return pscPartitioner; } catch (ClassNotFoundException | FlinkException e) { throw new ValidationException( String.format("Could not find and instantiate partitioner class '%s'", name), @@ -572,7 +572,7 @@ static void validateDeliveryGuarantee(ReadableConfig tableOptions) { // Inner classes // -------------------------------------------------------------------------------------------- - /** Kafka startup options. * */ + /** PSC startup options. * */ public static class StartupOptions { public StartupMode startupMode; public Map specificOffsets; diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSink.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSink.java index 518bc97..04a096e 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSink.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSink.java @@ -59,7 +59,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; -/** A version-agnostic Kafka {@link DynamicTableSink}. */ +/** A version-agnostic PSC {@link DynamicTableSink}. */ @Internal public class PscDynamicSink implements DynamicTableSink, SupportsWritingMetadata { @@ -82,10 +82,10 @@ public class PscDynamicSink implements DynamicTableSink, SupportsWritingMetadata /** Data type to configure the formats. */ protected final DataType physicalDataType; - /** Optional format for encoding keys to Kafka. */ + /** Optional format for encoding keys to PSC. */ protected final @Nullable EncodingFormat> keyEncodingFormat; - /** Format for encoding values to Kafka. */ + /** Format for encoding values to PSC. */ protected final EncodingFormat> valueEncodingFormat; /** Indices that determine the key fields and the source position in the consumed row. */ @@ -98,7 +98,7 @@ public class PscDynamicSink implements DynamicTableSink, SupportsWritingMetadata protected final @Nullable String keyPrefix; // -------------------------------------------------------------------------------------------- - // Kafka-specific attributes + // PSC-specific attributes // -------------------------------------------------------------------------------------------- /** The defined delivery guarantee. */ @@ -106,17 +106,17 @@ public class PscDynamicSink implements DynamicTableSink, SupportsWritingMetadata /** * If the {@link #deliveryGuarantee} is {@link DeliveryGuarantee#EXACTLY_ONCE} the value is the - * prefix for all ids of opened Kafka transactions. + * prefix for all ids of opened PubSub transactions. */ @Nullable private final String transactionalIdPrefix; - /** The Kafka topic to write to. */ + /** The PubSub topic to write to. */ protected final String topic; - /** Properties for the Kafka producer. */ + /** Properties for the PSC producer. */ protected final Properties properties; - /** Partitioner to select Kafka partition for each item. */ + /** Partitioner to select PubSub partition for each item. */ protected final @Nullable FlinkPscPartitioner partitioner; /** @@ -128,7 +128,7 @@ public class PscDynamicSink implements DynamicTableSink, SupportsWritingMetadata /** Sink buffer flush config which only supported in upsert mode now. */ protected final SinkBufferFlushMode flushMode; - /** Parallelism of the physical Kafka producer. * */ + /** Parallelism of the physical PubSub producer. * */ protected final @Nullable Integer parallelism; public PscDynamicSink( @@ -161,7 +161,7 @@ public PscDynamicSink( this.transactionalIdPrefix = transactionalIdPrefix; // Mutable attributes this.metadataKeys = Collections.emptyList(); - // Kafka-specific attributes + // PSC-specific attributes this.topic = checkNotNull(topic, "Topic must not be null."); this.properties = checkNotNull(properties, "Properties must not be null."); this.partitioner = partitioner; @@ -171,7 +171,7 @@ public PscDynamicSink( this.flushMode = checkNotNull(flushMode); if (flushMode.isEnabled() && !upsertMode) { throw new IllegalArgumentException( - "Sink buffer flush is only supported in upsert-kafka."); + "Sink buffer flush is only supported in upsert-psc."); } this.parallelism = parallelism; } @@ -275,7 +275,7 @@ public DynamicTableSink copy() { @Override public String asSummaryString() { - return "Kafka table sink"; + return "PSC table sink"; } @Override diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java index 55e7f75..528f772 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java @@ -70,7 +70,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; -/** A version-agnostic Kafka {@link ScanTableSource}. */ +/** A version-agnostic PSC {@link ScanTableSource}. */ @Internal public class PscDynamicSource implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown { @@ -99,10 +99,10 @@ public class PscDynamicSource /** Data type to configure the formats. */ protected final DataType physicalDataType; - /** Optional format for decoding keys from Kafka. */ + /** Optional format for decoding keys from PSC. */ protected final @Nullable DecodingFormat> keyDecodingFormat; - /** Format for decoding values from Kafka. */ + /** Format for decoding values from PSC. */ protected final DecodingFormat> valueDecodingFormat; /** Indices that determine the key fields and the target position in the produced row. */ @@ -115,16 +115,16 @@ public class PscDynamicSource protected final @Nullable String keyPrefix; // -------------------------------------------------------------------------------------------- - // Kafka-specific attributes + // PSC-specific attributes // -------------------------------------------------------------------------------------------- - /** The Kafka topics to consume. */ + /** The PSC topics to consume. */ protected final List topicUris; - /** The Kafka topic pattern to consume. */ + /** The PSC topic pattern to consume. */ protected final Pattern topicUriPattern; - /** Properties for the Kafka consumer. */ + /** Properties for the PSC consumer. */ protected final Properties properties; /** @@ -181,7 +181,7 @@ public PscDynamicSource( this.producedDataType = physicalDataType; this.metadataKeys = Collections.emptyList(); this.watermarkStrategy = null; - // Kafka-specific attributes + // PSC-specific attributes Preconditions.checkArgument( (topics != null && topicPattern == null) || (topics == null && topicPattern != null), @@ -215,7 +215,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) { final TypeInformation producedTypeInfo = context.createTypeInformation(producedDataType); - final PscSource kafkaSource = + final PscSource pscSource = createPscSource(keyDeserialization, valueDeserialization, producedTypeInfo); return new DataStreamScanProvider() { @@ -227,14 +227,14 @@ public DataStream produceDataStream( } DataStreamSource sourceStream = execEnv.fromSource( - kafkaSource, watermarkStrategy, "KafkaSource-" + tableIdentifier); + pscSource, watermarkStrategy, "PscSource-" + tableIdentifier); providerContext.generateUid(PSC_TRANSFORMATION).ifPresent(sourceStream::uid); return sourceStream; } @Override public boolean isBounded() { - return kafkaSource.getBoundedness() == Boundedness.BOUNDED; + return pscSource.getBoundedness() == Boundedness.BOUNDED; } }; } @@ -319,7 +319,7 @@ public DynamicTableSource copy() { @Override public String asSummaryString() { - return "Kafka table source"; + return "PSC table source"; } @Override diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java index 4f44fd5..d2fb294 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java @@ -340,7 +340,7 @@ private static void validatePKConstraints( .orElse(configuration.get(VALUE_FORMAT)); throw new ValidationException( String.format( - "The Kafka table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint" + "The PSC table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint" + " on the table, because it can't guarantee the semantic of primary key.", tableName.asSummaryString(), formatName)); } diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java index 501ef98..f51d06d 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java @@ -70,7 +70,7 @@ import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.getSourceTopicUriPattern; import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.getSourceTopicUris; -/** Upsert-Kafka factory. */ +/** Upsert-Psc factory. */ public class UpsertPscDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { @@ -176,7 +176,6 @@ public DynamicTableSink createDynamicTableSink(Context context) { SinkBufferFlushMode flushMode = new SinkBufferFlushMode(batchSize, batchInterval.toMillis()); - // use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}. // it will use hash partition if key is set else in round-robin behaviour. return new PscDynamicSink( context.getPhysicalRowDataType(), @@ -203,7 +202,7 @@ private Tuple2 createKeyValueProjections(ResolvedCatalogTable cata DataType physicalDataType = schema.toPhysicalRowDataType(); Configuration tableOptions = Configuration.fromMap(catalogTable.getOptions()); - // upsert-kafka will set key.fields to primary key fields by default + // upsert-psc will set key.fields to primary key fields by default tableOptions.set(KEY_FIELDS, keyFields); int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType); @@ -241,7 +240,7 @@ private static void validateTopic(ReadableConfig tableOptions) { List topic = tableOptions.get(TOPIC_URI); if (topic.size() > 1) { throw new ValidationException( - "The 'upsert-kafka' connector doesn't support topic list now. " + "The 'upsert-psc' connector doesn't support topic list now. " + "Please use single topic as the value of the parameter 'topic'."); } } @@ -252,7 +251,7 @@ private static void validateFormat( String identifier = tableOptions.get(KEY_FORMAT); throw new ValidationException( String.format( - "'upsert-kafka' connector doesn't support '%s' as key format, " + "'upsert-psc' connector doesn't support '%s' as key format, " + "because '%s' is not in insert-only mode.", identifier, identifier)); } @@ -260,7 +259,7 @@ private static void validateFormat( String identifier = tableOptions.get(VALUE_FORMAT); throw new ValidationException( String.format( - "'upsert-kafka' connector doesn't support '%s' as value format, " + "'upsert-psc' connector doesn't support '%s' as value format, " + "because '%s' is not in insert-only mode.", identifier, identifier)); } @@ -269,9 +268,9 @@ private static void validateFormat( private static void validatePKConstraints(int[] schema) { if (schema.length == 0) { throw new ValidationException( - "'upsert-kafka' tables require to define a PRIMARY KEY constraint. " - + "The PRIMARY KEY specifies which columns should be read from or write to the Kafka message key. " - + "The PRIMARY KEY also defines records in the 'upsert-kafka' table should update or delete on which keys."); + "'upsert-psc' tables require to define a PRIMARY KEY constraint. " + + "The PRIMARY KEY specifies which columns should be read from or write to the PubSub message key. " + + "The PRIMARY KEY also defines records in the 'upsert-psc' table should update or delete on which keys."); } } diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscRecordSerializationSchemaBuilderTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscRecordSerializationSchemaBuilderTest.java index 93a041c..6776162 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscRecordSerializationSchemaBuilderTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscRecordSerializationSchemaBuilderTest.java @@ -166,7 +166,7 @@ public void testSerializeRecordWithKey() { } @Test - public void testKafkaKeySerializerWrapperWithoutConfigurable() throws Exception { + public void testPscKeySerializerWrapperWithoutConfigurable() throws Exception { final Map config = ImmutableMap.of("simpleKey", "simpleValue"); final PscRecordSerializationSchema schema = PscRecordSerializationSchema.builder() @@ -183,7 +183,7 @@ public void testKafkaKeySerializerWrapperWithoutConfigurable() throws Exception } @Test - public void testKafkaValueSerializerWrapperWithoutConfigurable() throws Exception { + public void tesPscValueSerializerWrapperWithoutConfigurable() throws Exception { final Map config = ImmutableMap.of("simpleKey", "simpleValue"); final PscRecordSerializationSchema schema = PscRecordSerializationSchema.builder() @@ -197,7 +197,7 @@ public void testKafkaValueSerializerWrapperWithoutConfigurable() throws Exceptio } @Test - public void testSerializeRecordWithKafkaSerializer() throws Exception { + public void testSerializeRecordWithPscSerializer() throws Exception { final Map config = ImmutableMap.of("configKey", "configValue"); final PscRecordSerializationSchema schema = PscRecordSerializationSchema.builder() @@ -289,7 +289,7 @@ private static void assertOnlyOneSerializerAllowed( } /** - * Serializer based on Kafka's serialization stack. This is the special case that implements + * Serializer based on PSC's serialization stack. This is the special case that implements * {@link PscPlugin} * *

            This class must be public to make it instantiable by the tests. @@ -303,7 +303,7 @@ public void configure(Map configs, boolean isKey) { } /** - * Serializer based on Kafka's serialization stack. + * Serializer based on PSC's serialization stack. * *

            This class must be public to make it instantiable by the tests. */ diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkBuilderTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkBuilderTest.java index 3e61d0e..bd744c0 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkBuilderTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkBuilderTest.java @@ -20,7 +20,6 @@ import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.util.TestLoggerExtension; -import org.apache.kafka.clients.producer.ProducerConfig; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -35,7 +34,6 @@ public class PscSinkBuilderTest { @Test public void testBootstrapServerSettingWithProperties() { Properties testConf = new Properties(); - testConf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "testServer"); PscSinkBuilder builder = new PscSinkBuilder() .setPscProducerConfig(testConf) diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkITCase.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkITCase.java index d9eab6b..4660b1b 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkITCase.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkITCase.java @@ -210,18 +210,18 @@ class IntegrationTests extends SinkTestSuiteBase { } @Test - public void testWriteRecordsToKafkaWithAtLeastOnceGuarantee() throws Exception { - writeRecordsToKafka(DeliveryGuarantee.AT_LEAST_ONCE, emittedRecordsCount); + public void testWriteRecordsToPscWithAtLeastOnceGuarantee() throws Exception { + writeRecordsToPsc(DeliveryGuarantee.AT_LEAST_ONCE, emittedRecordsCount); } @Test - public void testWriteRecordsToKafkaWithNoneGuarantee() throws Exception { - writeRecordsToKafka(DeliveryGuarantee.NONE, emittedRecordsCount); + public void testWriteRecordsToPscWithNoneGuarantee() throws Exception { + writeRecordsToPsc(DeliveryGuarantee.NONE, emittedRecordsCount); } @Test - public void testWriteRecordsToKafkaWithExactlyOnceGuarantee() throws Exception { - writeRecordsToKafka(DeliveryGuarantee.EXACTLY_ONCE, emittedRecordsWithCheckpoint); + public void testWriteRecordsToPscWithExactlyOnceGuarantee() throws Exception { + writeRecordsToPsc(DeliveryGuarantee.EXACTLY_ONCE, emittedRecordsWithCheckpoint); } @Test @@ -390,7 +390,7 @@ private void testRecoveryWithAssertion( checkProducerLeak(); } - private void writeRecordsToKafka( + private void writeRecordsToPsc( DeliveryGuarantee deliveryGuarantee, SharedReference expectedRecords) throws Exception { final StreamExecutionEnvironment env = new LocalStreamEnvironment(); diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLog.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLog.java index b226a59..7212760 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLog.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLog.java @@ -49,15 +49,15 @@ class PscTransactionLog { private final String clusterUriStr; /** - * Constructor creating a KafkaTransactionLog. + * Constructor creating a PscTransactionLog. * - * @param kafkaConfig used to configure the {@link com.pinterest.psc.consumer.PscConsumer} to query the topic containing + * @param pscConfig used to configure the {@link com.pinterest.psc.consumer.PscConsumer} to query the topic containing * the transaction information */ - PscTransactionLog(String clusterUriStr, Properties kafkaConfig) { + PscTransactionLog(String clusterUriStr, Properties pscConfig) { this.clusterUriStr = checkNotNull(clusterUriStr); this.consumerConfig = new Properties(); - consumerConfig.putAll(checkNotNull(kafkaConfig, "kafkaConfig")); + consumerConfig.putAll(checkNotNull(pscConfig, "pscConfig")); consumerConfig.put(PscConfiguration.PSC_CONSUMER_KEY_DESERIALIZER, ByteArrayDeserializer.class.getName()); consumerConfig.put(PscConfiguration.PSC_CONSUMER_VALUE_DESERIALIZER, ByteArrayDeserializer.class.getName()); consumerConfig.put(PscConfiguration.PSC_CONSUMER_COMMIT_AUTO_ENABLED, false); diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLogITCase.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLogITCase.java index fd54f89..735b248 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLogITCase.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLogITCase.java @@ -53,11 +53,11 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; -/** Tests for {@link PscTransactionLog} to retrieve abortable Kafka transactions. */ +/** Tests for {@link PscTransactionLog} to retrieve abortable PSC transactions. */ public class PscTransactionLogITCase extends TestLogger { private static final Logger LOG = LoggerFactory.getLogger(PscSinkITCase.class); - private static final String TOPIC_URI_STR = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI.getTopicUriAsString() + "kafkaTransactionLogTest"; + private static final String TOPIC_URI_STR = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI.getTopicUriAsString() + "pscTransactionLogTest"; private static final String TRANSACTIONAL_ID_PREFIX = "psc-log"; @ClassRule @@ -174,18 +174,12 @@ private static PscProducer createProducer(String transactionalI private static Properties getPscClientConfiguration() { final Properties standardProps = new Properties(); -// standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers()); standardProps.put(PscConfiguration.PSC_CONSUMER_GROUP_ID, "flink-tests"); standardProps.put(PscConfiguration.PSC_CONSUMER_COMMIT_AUTO_ENABLED, false); standardProps.put(PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET, PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_EARLIEST); standardProps.put(PscConfiguration.PSC_CONSUMER_PARTITION_FETCH_MAX_BYTES, 256); standardProps.put(PscConfiguration.PSC_PRODUCER_CLIENT_ID, "flink-tests"); injectDiscoveryConfigs(standardProps, KAFKA_CONTAINER.getBootstrapServers(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI.getTopicUriAsString()); - -// standardProps.put("group.id", "flink-tests"); -// standardProps.put("enable.auto.commit", false); -// standardProps.put("auto.id.reset", "earliest"); -// standardProps.put("max.partition.fetch.bytes", 256); return standardProps; } } diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscWriterITCase.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscWriterITCase.java index 02efd29..c13528c 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscWriterITCase.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscWriterITCase.java @@ -82,7 +82,7 @@ import static org.apache.flink.util.DockerImageVersions.KAFKA; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for the standalone KafkaWriter. */ +/** Tests for the standalone PscWriter. */ @ExtendWith(TestLoggerExtension.class) public class PscWriterITCase { @@ -134,8 +134,8 @@ public void testRegisterMetrics(DeliveryGuarantee guarantee) throws Exception { @ParameterizedTest @EnumSource(DeliveryGuarantee.class) public void testNotRegisterMetrics(DeliveryGuarantee guarantee) throws Exception { - assertKafkaMetricNotPresent(guarantee, "flink.disable-metrics", "true"); - assertKafkaMetricNotPresent(guarantee, "register.producer.metrics", "false"); + assertPscMetricNotPresent(guarantee, "flink.disable-metrics", "true"); + assertPscMetricNotPresent(guarantee, "register.producer.metrics", "false"); } @Test @@ -185,7 +185,7 @@ public void testCurrentSendTimeMetric() throws Exception { writer.flush(false); } } catch (IOException | InterruptedException e) { - throw new RuntimeException("Failed writing Kafka record."); + throw new RuntimeException("Failed writing PSC record."); } }); Thread.sleep(500L); @@ -330,7 +330,7 @@ void usePoolForTransactional() throws Exception { .as("Expected different producer") .isTrue(); - // recycle first producer, KafkaCommitter would commit it and then return it + // recycle first producer, PscCommitter would commit it and then return it assertThat(writer.getProducerPool()).hasSize(0); firstProducer.commitTransaction(); committable.getProducer().get().close(); @@ -383,7 +383,7 @@ void testAbortOnClose() throws Exception { } } - private void assertKafkaMetricNotPresent( + private void assertPscMetricNotPresent( DeliveryGuarantee guarantee, String configKey, String configValue) throws Exception { final Properties config = getPscClientConfiguration(); config.put(configKey, configValue); diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscDataReader.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscDataReader.java index b2746b9..164aa0e 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscDataReader.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscDataReader.java @@ -33,7 +33,7 @@ import java.util.List; import java.util.Properties; -/** Kafka dataStream data reader. */ +/** PSC dataStream data reader. */ public class PscDataReader implements ExternalSystemDataReader { private final PscConsumer consumer; diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscSinkExternalContext.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscSinkExternalContext.java index 13ac24d..836b6de 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscSinkExternalContext.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscSinkExternalContext.java @@ -63,7 +63,7 @@ import static com.pinterest.flink.connector.psc.testutils.PscTestUtils.injectDiscoveryConfigs; import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE; -/** A Kafka external context that will create only one topic and use partitions in that topic. */ +/** A PSC external context that will create only one topic and use partitions in that topic. */ public class PscSinkExternalContext implements DataStreamSinkV2ExternalContext { private static final Logger LOG = LoggerFactory.getLogger(PscSinkExternalContext.class); diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceITCase.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceITCase.java index 9b6c7ee..2950052 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceITCase.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceITCase.java @@ -98,7 +98,7 @@ public class PscSourceITCase { @Nested @TestInstance(Lifecycle.PER_CLASS) - class KafkaSpecificTests { + class PscSpecificTests { @BeforeAll public void setup() throws Throwable { PscSourceTestEnv.setup(); @@ -130,7 +130,6 @@ public void testTimestamp(boolean enableObjectReuse) throws Throwable { putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX); PscSource source = PscSource.builder() -// .setBootstrapServers(PscSourceTestEnv.brokerConnectionStrings) .setGroupId("testTimestampAndWatermark") .setTopicUris(topicUri) .setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX) @@ -312,7 +311,6 @@ public void testPerPartitionWatermark() throws Throwable { putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX); PscSource source = PscSource.builder() -// .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings) .setTopicUris(watermarkTopic) .setGroupId("watermark-test") .setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX) @@ -354,7 +352,6 @@ public void testConsumingEmptyTopic() throws Throwable { putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX); PscSource source = PscSource.builder() -// .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings) .setTopicUris(emptyTopic) .setGroupId("empty-topic-test") .setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX) @@ -395,7 +392,6 @@ public void testConsumingTopicWithEmptyPartitions() throws Throwable { PscSource source = PscSource.builder() -// .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings) .setTopicUris(topicWithEmptyPartitions) .setGroupId("topic-with-empty-partition-test") .setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX) diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceLegacyITCase.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceLegacyITCase.java index 70dde42..cdff2bc 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceLegacyITCase.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceLegacyITCase.java @@ -145,13 +145,13 @@ public void testStartFromTimestamp() throws Exception { // --- offset committing --- @Test - public void testCommitOffsetsToKafka() throws Exception { - runCommitOffsetsToKafka(); + public void testCommitOffsetsToPsc() throws Exception { + runCommitOffsetsToPsc(); } @Test - public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception { - runAutoOffsetRetrievalAndCommitToKafka(); + public void testAutoOffsetRetrievalAndCommitToPsc() throws Exception { + runAutoOffsetRetrievalAndCommitToPsc(); } @Test diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceTestUtils.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceTestUtils.java index f21a26a..b594898 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceTestUtils.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceTestUtils.java @@ -30,9 +30,9 @@ public class PscSourceTestUtils { /** * Create {@link PscSourceReader} with a custom hook handling IDs of finished {@link - * org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit}. + * com.pinterest.flink.connector.psc.source.split.PscTopicUriPartitionSplit}. * - * @param pscSource Kafka source + * @param pscSource PSC source * @param sourceReaderContext Context for SourceReader * @param splitFinishedHook Hook for handling finished splits * @param Type of emitting records @@ -46,7 +46,7 @@ public static PscSourceReader createReaderWithFinishedSplitHook( pscSource.createReader(sourceReaderContext, splitFinishedHook)); } - /** Get configuration of KafkaSource. */ + /** Get configuration of PscSource. */ public static Configuration getPscSourceConfiguration(PscSource pscSource) { return pscSource.getConfiguration(); } diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/enumerator/PscSourceEnumStateSerializerTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/enumerator/PscSourceEnumStateSerializerTest.java index 3adf60c..a92db44 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/enumerator/PscSourceEnumStateSerializerTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/enumerator/PscSourceEnumStateSerializerTest.java @@ -63,12 +63,12 @@ public void testBackwardCompatibility() throws IOException { final Map> splitAssignments = toSplitAssignments(topicPartitions); - // Create bytes in the way of KafkaEnumStateSerializer version 0 doing serialization + // Create bytes in the way of PscEnumStateSerializer version 0 doing serialization final byte[] bytes = SerdeUtils.serializeSplitAssignments( splitAssignments, new PscTopicUriPartitionSplitSerializer()); - // Deserialize above bytes with KafkaEnumStateSerializer version 1 to check backward + // Deserialize above bytes with PscEnumStateSerializer version 1 to check backward // compatibility final PscSourceEnumState pscSourceEnumState = new PscSourceEnumStateSerializer().deserialize(0, bytes); diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/metrics/PscSourceReaderMetricsTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/metrics/PscSourceReaderMetricsTest.java index 0ffc72f..181e89d 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/metrics/PscSourceReaderMetricsTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/metrics/PscSourceReaderMetricsTest.java @@ -119,10 +119,10 @@ public void testNonTrackingTopicPartition() { @Test public void testFailedCommit() { MetricListener metricListener = new MetricListener(); - final PscSourceReaderMetrics kafkaSourceReaderMetrics = + final PscSourceReaderMetrics pscSourceReaderMetrics = new PscSourceReaderMetrics( InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup())); - kafkaSourceReaderMetrics.recordFailedCommit(); + pscSourceReaderMetrics.recordFailedCommit(); final Optional commitsFailedCounter = metricListener.getCounter( PscSourceReaderMetrics.PSC_SOURCE_READER_METRIC_GROUP, diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReaderTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReaderTest.java index 747ded3..aa2b4b1 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReaderTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReaderTest.java @@ -321,7 +321,7 @@ void testPscSourceMetrics() throws Exception { String.format( "Failed to poll %d records until timeout", NUM_RECORDS_PER_SPLIT * 2)); Thread.sleep(100); // Wait for the metric to be updated - // Metric "records-consumed-total" of KafkaConsumer should be NUM_RECORDS_PER_SPLIT + // Metric "records-consumed-total" of PscConsumer should be NUM_RECORDS_PER_SPLIT assertThat(getPscConsumerMetric("records-consumed-total", metricListener)) .isEqualTo(NUM_RECORDS_PER_SPLIT * 2); @@ -354,7 +354,7 @@ void testPscSourceMetrics() throws Exception { "Offsets are not committed successfully. Dangling offsets: %s", reader.getOffsetsToCommit())); - // Metric "commit-total" of KafkaConsumer should be greater than 0 + // Metric "commit-total" of PscConsumer should be greater than 0 // It's hard to know the exactly number of commit because of the retry MatcherAssert.assertThat( getPscConsumerMetric("commit-total", metricListener), @@ -393,7 +393,7 @@ void testAssigningEmptySplits() throws Exception { (PscSourceReader) createReader( Boundedness.BOUNDED, - "KafkaSourceReaderTestGroup", + "PscSourceReaderTestGroup", new TestingReaderContext(), splitFinishedHook)) { reader.addSplits(Arrays.asList(normalSplit, emptySplit)); @@ -448,7 +448,7 @@ void testAssigningEmptySplitOnly() throws Exception { @Override protected SourceReader createReader() throws Exception { - return createReader(Boundedness.BOUNDED, "KafkaSourceReaderTestGroup"); + return createReader(Boundedness.BOUNDED, "PscSourceReaderTestGroup"); } @Override @@ -523,9 +523,6 @@ private SourceReader createReader( PscRecordDeserializationSchema.valueOnly( IntegerDeserializer.class)) .setPartitions(Collections.singleton(new TopicUriPartition(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + "AnyTopic", 0))) -// .setProperty( -// ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, -// KafkaSourceTestEnv.brokerConnectionStrings) .setProperty(PscConfiguration.PSC_CONSUMER_COMMIT_AUTO_ENABLED, "false") .setProperties(props); if (boundedness == Boundedness.BOUNDED) { @@ -558,11 +555,11 @@ private void pollUntil( } private long getPscConsumerMetric(String name, MetricListener listener) { - final Optional> kafkaConsumerGauge = + final Optional> pscConsumerGauge = listener.getGauge( PSC_SOURCE_READER_METRIC_GROUP, PSC_CONSUMER_METRIC_GROUP, name); - assertThat(kafkaConsumerGauge).isPresent(); - return ((Double) kafkaConsumerGauge.get().getValue()).longValue(); + assertThat(pscConsumerGauge).isPresent(); + return ((Double) pscConsumerGauge.get().getValue()).longValue(); } private long getCurrentOffsetMetric(TopicUriPartition tp, MetricListener listener) { diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/deserializer/PscRecordDeserializationSchemaTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/deserializer/PscRecordDeserializationSchemaTest.java index 673a9c9..75b5e35 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/deserializer/PscRecordDeserializationSchemaTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/deserializer/PscRecordDeserializationSchemaTest.java @@ -62,7 +62,7 @@ public void setUp() { } @Test - public void testKafkaDeserializationSchemaWrapper() throws IOException, DeserializerException { + public void testPscDeserializationSchemaWrapper() throws IOException, DeserializerException { final PscConsumerMessage consumerRecord = getPscConsumerMessage(); PscRecordDeserializationSchema schema = PscRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)); @@ -80,7 +80,7 @@ public void testKafkaDeserializationSchemaWrapper() throws IOException, Deserial } @Test - public void testKafkaValueDeserializationSchemaWrapper() throws IOException, DeserializerException { + public void testPscValueDeserializationSchemaWrapper() throws IOException, DeserializerException { final PscConsumerMessage consumerRecord = getPscConsumerMessage(); PscRecordDeserializationSchema schema = PscRecordDeserializationSchema.valueOnly(new JsonNodeDeserializationSchema()); @@ -96,7 +96,7 @@ public void testKafkaValueDeserializationSchemaWrapper() throws IOException, Des } @Test - public void testKafkaValueDeserializerWrapper() throws Exception { + public void testPscValueDeserializerWrapper() throws Exception { final String topic = "Topic"; byte[] value = new StringSerializer().serialize(topic, "world"); final PscConsumerMessage consumerRecord = @@ -113,7 +113,7 @@ public void testKafkaValueDeserializerWrapper() throws Exception { } @Test - public void testKafkaValueDeserializerWrapperWithoutConfigurable() throws Exception { + public void testPscValueDeserializerWrapperWithoutConfigurable() throws Exception { final Map config = ImmutableMap.of("simpleKey", "simpleValue"); PscRecordDeserializationSchema schema = PscRecordDeserializationSchema.valueOnly(SimpleStringSerializer.class, config); @@ -124,7 +124,7 @@ public void testKafkaValueDeserializerWrapperWithoutConfigurable() throws Except } @Test - public void testKafkaValueDeserializerWrapperWithConfigurable() throws Exception { + public void testPscValueDeserializerWrapperWithConfigurable() throws Exception { final Map config = ImmutableMap.of("configKey", "configValue"); PscRecordDeserializationSchema schema = PscRecordDeserializationSchema.valueOnly( @@ -164,7 +164,7 @@ public void close() { } /** - * Serializer based on Kafka's serialization stack. This is the special case that implements + * Serializer based on PSC's serialization stack. This is the special case that implements * {@link PscPlugin} * *

            This class must be public to make it instantiable by the tests. @@ -178,7 +178,7 @@ public void configure(PscConfiguration pscConfig, boolean isKey) { } /** - * Serializer based on Kafka's serialization stack. + * Serializer based on PSC's serialization stack. * *

            This class must be public to make it instantiable by the tests. */ diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceExternalContext.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceExternalContext.java index a8aab90..6309872 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceExternalContext.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceExternalContext.java @@ -60,9 +60,9 @@ public class PscSourceExternalContext implements DataStreamSourceExternalContext { private static final Logger LOG = LoggerFactory.getLogger(PscSourceExternalContext.class); - private static final String TOPIC_NAME_PREFIX = "kafka-test-topic-"; + private static final String TOPIC_NAME_PREFIX = "psc-test-topic-"; private static final Pattern TOPIC_NAME_PATTERN = Pattern.compile(TOPIC_NAME_PREFIX + ".*"); - private static final String GROUP_ID_PREFIX = "kafka-source-external-context-"; + private static final String GROUP_ID_PREFIX = "psc-source-external-context-"; private static final int NUM_RECORDS_UPPER_BOUND = 500; private static final int NUM_RECORDS_LOWER_BOUND = 100; @@ -177,7 +177,7 @@ public void close() throws Exception { @Override public String toString() { - return "KafkaSource-" + splitMappingMode.toString(); + return "PscSource-" + splitMappingMode.toString(); } private String randomize(String prefix) { @@ -232,8 +232,6 @@ private PscTopicUriPartitionDataWriter scaleOutTopic(String topicName) throws Ex private Properties getPscProducerProperties(int producerId) { Properties pscProducerProperties = new Properties(); -// kafkaProducerProperties.setProperty( -// ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); pscProducerProperties.setProperty( PscConfiguration.PSC_PRODUCER_CLIENT_ID, String.join( @@ -249,7 +247,7 @@ private Properties getPscProducerProperties(int producerId) { return pscProducerProperties; } - /** Mode of mapping split to Kafka components. */ + /** Mode of mapping split to PSC components. */ public enum SplitMappingMode { /** Use a single-partitioned topic as a split. */ TOPIC, diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceExternalContextFactory.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceExternalContextFactory.java index efd7247..3215d63 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceExternalContextFactory.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceExternalContextFactory.java @@ -19,7 +19,6 @@ package com.pinterest.flink.connector.psc.testutils; import org.apache.flink.connector.testframe.external.ExternalContextFactory; - import org.testcontainers.containers.KafkaContainer; import java.net.URL; diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceTestEnv.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceTestEnv.java index c2512be..be6be07 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceTestEnv.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceTestEnv.java @@ -35,11 +35,7 @@ import com.pinterest.psc.serde.StringDeserializer; import com.pinterest.psc.serde.StringSerializer; import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions; import org.apache.kafka.clients.admin.RecordsToDelete; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import java.time.Duration; @@ -53,7 +49,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.function.Function; -import java.util.stream.Collectors; import static com.pinterest.flink.connector.psc.testutils.PscTestUtils.putDiscoveryProperties; import static org.junit.Assert.assertEquals; diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscTopicUriPartitionDataWriter.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscTopicUriPartitionDataWriter.java index 9b16f41..474e308 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscTopicUriPartitionDataWriter.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscTopicUriPartitionDataWriter.java @@ -31,7 +31,7 @@ import java.util.List; import java.util.Properties; -/** Source split data writer for writing test data into Kafka topic partitions. */ +/** Source split data writer for writing test data into PSC topicUriPartitions. */ public class PscTopicUriPartitionDataWriter implements ExternalSystemSplitDataWriter { private final PscProducer pscProducer; diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscConsumerTestBaseWithKafkaAsPubSub.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscConsumerTestBaseWithKafkaAsPubSub.java index 8a23555..464b2c7 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscConsumerTestBaseWithKafkaAsPubSub.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscConsumerTestBaseWithKafkaAsPubSub.java @@ -225,7 +225,7 @@ public void runFailOnNoBrokerTest() throws Exception { * Ensures that the committed offsets to Kafka are the offsets of "the next * record to process". */ - public void runCommitOffsetsToKafka() throws Exception { + public void runCommitOffsetsToPsc() throws Exception { // 3 partitions with 50 records each (0-49, so the expected commit offset of // each partition should be 50) final int parallelism = 3; @@ -312,7 +312,7 @@ public void run() { *

            * See FLINK-3440 as well */ - public void runAutoOffsetRetrievalAndCommitToKafka() throws Exception { + public void runAutoOffsetRetrievalAndCommitToPsc() throws Exception { // 3 partitions with 50 records each (0-49, so the expected commit offset of // each partition should be 50) final int parallelism = 3; diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscITCase.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscITCase.java index a3c3125..97765fe 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscITCase.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscITCase.java @@ -171,12 +171,12 @@ public void testStartFromTimestamp() throws Exception { @Test(timeout = 60000) public void testCommitOffsetsToKafka() throws Exception { - runCommitOffsetsToKafka(); + runCommitOffsetsToPsc(); } @Test(timeout = 60000) public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception { - runAutoOffsetRetrievalAndCommitToKafka(); + runAutoOffsetRetrievalAndCommitToPsc(); } @Test(timeout = 60000) diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTableSourceSinkFactoryTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTableSourceSinkFactoryTest.java deleted file mode 100644 index 1f116ab..0000000 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTableSourceSinkFactoryTest.java +++ /dev/null @@ -1,96 +0,0 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one or more -// * contributor license agreements. See the NOTICE file distributed with -// * this work for additional information regarding copyright ownership. -// * The ASF licenses this file to You under the Apache License, Version 2.0 -// * (the "License"); you may not use this file except in compliance with -// * the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -//package com.pinterest.flink.streaming.connectors.psc; -// -//import com.pinterest.flink.streaming.connectors.psc.config.StartupMode; -//import com.pinterest.flink.streaming.connectors.psc.internals.PscTopicUriPartition; -//import com.pinterest.flink.streaming.connectors.psc.partitioner.FlinkPscPartitioner; -//import com.pinterest.flink.table.descriptors.psc.PscValidator; -//import org.apache.flink.api.common.serialization.DeserializationSchema; -//import org.apache.flink.api.common.serialization.SerializationSchema; -//import org.apache.flink.table.api.TableSchema; -//import org.apache.flink.table.sources.RowtimeAttributeDescriptor; -//import org.apache.flink.types.Row; -// -//import java.util.List; -//import java.util.Map; -//import java.util.Optional; -//import java.util.Properties; -// -///** -// * Test for {@link PscTableSource} and {@link PscTableSink} created -// * by {@link PscTableSourceSinkFactory}. -// */ -//public class PscTableSourceSinkFactoryTest extends PscTableSourceSinkFactoryTestBase { -// -// @Override -// protected String getPscVersion() { -// return PscValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL; -// } -// -// @Override -// protected Class> getExpectedFlinkPscConsumer() { -// return (Class) FlinkPscConsumer.class; -// } -// -// @Override -// protected Class getExpectedFlinkPscProducer() { -// return FlinkPscProducer.class; -// } -// -// @Override -// protected PscTableSourceBase getExpectedPscTableSource( -// TableSchema schema, -// Optional proctimeAttribute, -// List rowtimeAttributeDescriptors, -// Map fieldMapping, -// String topicUri, -// Properties properties, -// DeserializationSchema deserializationSchema, -// StartupMode startupMode, -// Map specificStartupOffsets, -// long startupTimestamp) { -// -// return new PscTableSource( -// schema, -// proctimeAttribute, -// rowtimeAttributeDescriptors, -// Optional.of(fieldMapping), -// topicUri, -// properties, -// deserializationSchema, -// startupMode, -// specificStartupOffsets, -// startupTimestamp); -// } -// -// protected PscTableSinkBase getExpectedPscTableSink( -// TableSchema schema, -// String topicUri, -// Properties properties, -// Optional> partitioner, -// SerializationSchema serializationSchema) { -// -// return new PscTableSink( -// schema, -// topicUri, -// properties, -// partitioner, -// serializationSchema); -// } -//} diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTableSourceSinkFactoryTestBase.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTableSourceSinkFactoryTestBase.java deleted file mode 100644 index 38ae40a..0000000 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTableSourceSinkFactoryTestBase.java +++ /dev/null @@ -1,477 +0,0 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -//package com.pinterest.flink.streaming.connectors.psc; -// -//import com.pinterest.flink.streaming.connectors.psc.config.StartupMode; -//import com.pinterest.flink.streaming.connectors.psc.internals.PscTopicUriPartition; -//import com.pinterest.flink.streaming.connectors.psc.partitioner.FlinkFixedPartitioner; -//import com.pinterest.flink.streaming.connectors.psc.partitioner.FlinkPscPartitioner; -//import com.pinterest.flink.table.descriptors.psc.Psc; -//import com.pinterest.flink.table.descriptors.psc.PscValidator; -//import com.pinterest.psc.common.TopicUri; -//import com.pinterest.psc.config.PscConfiguration; -//import org.apache.flink.api.common.JobExecutionResult; -//import org.apache.flink.api.common.serialization.DeserializationSchema; -//import org.apache.flink.api.common.serialization.SerializationSchema; -//import org.apache.flink.api.common.typeinfo.TypeInformation; -//import org.apache.flink.api.dag.Transformation; -//import org.apache.flink.streaming.api.datastream.DataStream; -//import org.apache.flink.streaming.api.datastream.DataStreamSink; -//import org.apache.flink.streaming.api.datastream.DataStreamSource; -//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -//import org.apache.flink.streaming.api.functions.sink.SinkFunction; -//import org.apache.flink.streaming.api.functions.source.SourceFunction; -//import org.apache.flink.streaming.api.graph.StreamGraph; -//import org.apache.flink.table.api.DataTypes; -//import org.apache.flink.table.api.TableSchema; -//import org.apache.flink.table.descriptors.Rowtime; -//import org.apache.flink.table.descriptors.Schema; -//import org.apache.flink.table.descriptors.StreamTableDescriptorValidator; -//import org.apache.flink.table.descriptors.TestTableDescriptor; -//import org.apache.flink.table.factories.StreamTableSinkFactory; -//import org.apache.flink.table.factories.StreamTableSourceFactory; -//import org.apache.flink.table.factories.TableFactoryService; -//import org.apache.flink.table.factories.utils.TestDeserializationSchema; -//import org.apache.flink.table.factories.utils.TestSerializationSchema; -//import org.apache.flink.table.factories.utils.TestTableFormat; -//import org.apache.flink.table.sinks.TableSink; -//import org.apache.flink.table.sources.RowtimeAttributeDescriptor; -//import org.apache.flink.table.sources.TableSource; -//import org.apache.flink.table.sources.TableSourceValidation; -//import org.apache.flink.table.sources.tsextractors.ExistingField; -//import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps; -//import org.apache.flink.table.types.DataType; -//import org.apache.flink.types.Row; -//import org.apache.flink.util.TestLogger; -//import org.junit.Ignore; -//import org.junit.Test; -// -//import java.util.Collection; -//import java.util.Collections; -//import java.util.HashMap; -//import java.util.List; -//import java.util.Map; -//import java.util.Optional; -//import java.util.Properties; -// -//import static org.junit.Assert.assertEquals; -//import static org.junit.Assert.assertFalse; -//import static org.junit.Assert.assertTrue; -// -///** -// * Abstract test base for {@link PscTableSourceSinkFactoryBase}. -// */ -//public abstract class PscTableSourceSinkFactoryTestBase extends TestLogger { -// -// private static final String TOPIC_URI = -// TopicUri.DEFAULT_PROTOCOL + ":" + TopicUri.SEPARATOR + "rn:kafka:env:cloud_region::cluster:myTopic"; -// private static final int PARTITION_0 = 0; -// private static final long OFFSET_0 = 100L; -// private static final int PARTITION_1 = 1; -// private static final long OFFSET_1 = 123L; -// private static final String FRUIT_NAME = "fruit-name"; -// private static final String NAME = "name"; -// private static final String COUNT = "count"; -// private static final String TIME = "time"; -// private static final String EVENT_TIME = "event-time"; -// private static final String PROC_TIME = "proc-time"; -// private static final String WATERMARK_EXPRESSION = EVENT_TIME + " - INTERVAL '5' SECOND"; -// private static final DataType WATERMARK_DATATYPE = DataTypes.TIMESTAMP(3); -// private static final String COMPUTED_COLUMN_NAME = "computed-column"; -// private static final String COMPUTED_COLUMN_EXPRESSION = COUNT + " + 1.0"; -// private static final DataType COMPUTED_COLUMN_DATATYPE = DataTypes.DECIMAL(10, 3); -// -// private static final Properties PSC_CONFIGURATION = new Properties(); -// -// static { -// PSC_CONFIGURATION.setProperty(PscConfiguration.PSC_CONSUMER_GROUP_ID, "test"); -// } -// -// private static final Map OFFSETS = new HashMap<>(); -// -// static { -// OFFSETS.put(PARTITION_0, OFFSET_0); -// OFFSETS.put(PARTITION_1, OFFSET_1); -// } -// -// @Test -// @SuppressWarnings("unchecked") -// @Ignore("Table API not updated to 1.15 yet") -// public void testTableSource() { -// // prepare parameters for Kafka table source -// final TableSchema schema = TableSchema.builder() -// .field(FRUIT_NAME, DataTypes.STRING()) -// .field(COUNT, DataTypes.DECIMAL(38, 18)) -// .field(EVENT_TIME, DataTypes.TIMESTAMP(3)) -// .field(PROC_TIME, DataTypes.TIMESTAMP(3)) -// .build(); -// -// final List rowtimeAttributeDescriptors = Collections.singletonList( -// new RowtimeAttributeDescriptor(EVENT_TIME, new ExistingField(TIME), new AscendingTimestamps())); -// -// final Map fieldMapping = new HashMap<>(); -// fieldMapping.put(FRUIT_NAME, NAME); -// fieldMapping.put(NAME, NAME); -// fieldMapping.put(COUNT, COUNT); -// fieldMapping.put(TIME, TIME); -// -// final Map specificOffsets = new HashMap<>(); -// specificOffsets.put(new PscTopicUriPartition(TOPIC_URI, PARTITION_0), OFFSET_0); -// specificOffsets.put(new PscTopicUriPartition(TOPIC_URI, PARTITION_1), OFFSET_1); -// -// final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema( -// TableSchema.builder() -// .field(NAME, DataTypes.STRING()) -// .field(COUNT, DataTypes.DECIMAL(38, 18)) -// .field(TIME, DataTypes.TIMESTAMP(3)) -// .build().toRowType() -// ); -// -// final PscTableSourceBase expected = getExpectedPscTableSource( -// schema, -// Optional.of(PROC_TIME), -// rowtimeAttributeDescriptors, -// fieldMapping, -// TOPIC_URI, -// PSC_CONFIGURATION, -// deserializationSchema, -// StartupMode.SPECIFIC_OFFSETS, -// specificOffsets, -// 0L); -// -// TableSourceValidation.validateTableSource(expected, schema); -// -// // construct table source using descriptors and table source factory -// final Map propertiesMap = new HashMap<>(); -// propertiesMap.putAll(createPscSourceProperties()); -// propertiesMap.put("schema.watermark.0.rowtime", EVENT_TIME); -// propertiesMap.put("schema.watermark.0.strategy.expr", WATERMARK_EXPRESSION); -// propertiesMap.put("schema.watermark.0.strategy.data-type", WATERMARK_DATATYPE.toString()); -// propertiesMap.put("schema.4.name", COMPUTED_COLUMN_NAME); -// propertiesMap.put("schema.4.data-type", COMPUTED_COLUMN_DATATYPE.toString()); -// propertiesMap.put("schema.4.expr", COMPUTED_COLUMN_EXPRESSION); -// -// final TableSource actualSource = TableFactoryService.find(StreamTableSourceFactory.class, propertiesMap) -// .createStreamTableSource(propertiesMap); -// -// assertEquals(expected, actualSource); -// -// // test PSC consumer -// final PscTableSourceBase actualPscSource = (PscTableSourceBase) actualSource; -// final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock(); -// actualPscSource.getDataStream(mock); -// assertTrue(getExpectedFlinkPscConsumer().isAssignableFrom(mock.sourceFunction.getClass())); -// // Test commitOnCheckpoints flag should be true when set consumer group. -// assertTrue(((FlinkPscConsumerBase) mock.sourceFunction).getEnableCommitOnCheckpoints()); -// } -// -// @Test -// @Ignore("Table API not updated to 1.15 yet") -// public void testTableSourceCommitOnCheckpointsDisabled() { -// Map propertiesMap = new HashMap<>(); -// createPscSourceProperties().forEach((k, v) -> { -// if (!k.equals(PscValidator.CONNECTOR_PROPERTIES_GROUP_ID)) { -// propertiesMap.put(k, v); -// } -// }); -// final TableSource tableSource = TableFactoryService.find(StreamTableSourceFactory.class, propertiesMap) -// .createStreamTableSource(propertiesMap); -// final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock(); -// // Test commitOnCheckpoints flag should be false when do not set consumer group. -// ((PscTableSourceBase) tableSource).getDataStream(mock); -// assertTrue(mock.sourceFunction instanceof FlinkPscConsumerBase); -// assertFalse(((FlinkPscConsumerBase) mock.sourceFunction).getEnableCommitOnCheckpoints()); -// } -// -// @Test -// @SuppressWarnings("unchecked") -// @Ignore("Table API not updated to 1.15 yet") -// public void testTableSourceWithLegacyProperties() { -// // prepare parameters for Kafka table source -// final TableSchema schema = TableSchema.builder() -// .field(FRUIT_NAME, DataTypes.STRING()) -// .field(COUNT, DataTypes.DECIMAL(38, 18)) -// .field(EVENT_TIME, DataTypes.TIMESTAMP(3)) -// .field(PROC_TIME, DataTypes.TIMESTAMP(3)) -// .build(); -// -// final List rowtimeAttributeDescriptors = Collections.singletonList( -// new RowtimeAttributeDescriptor(EVENT_TIME, new ExistingField(TIME), new AscendingTimestamps())); -// -// final Map fieldMapping = new HashMap<>(); -// fieldMapping.put(FRUIT_NAME, NAME); -// fieldMapping.put(NAME, NAME); -// fieldMapping.put(COUNT, COUNT); -// fieldMapping.put(TIME, TIME); -// -// final Map specificOffsets = new HashMap<>(); -// specificOffsets.put(new PscTopicUriPartition(TOPIC_URI, PARTITION_0), OFFSET_0); -// specificOffsets.put(new PscTopicUriPartition(TOPIC_URI, PARTITION_1), OFFSET_1); -// -// final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema( -// TableSchema.builder() -// .field(NAME, DataTypes.STRING()) -// .field(COUNT, DataTypes.DECIMAL(38, 18)) -// .field(TIME, DataTypes.TIMESTAMP(3)) -// .build().toRowType() -// ); -// -// final PscTableSourceBase expected = getExpectedPscTableSource( -// schema, -// Optional.of(PROC_TIME), -// rowtimeAttributeDescriptors, -// fieldMapping, -// TOPIC_URI, -// PSC_CONFIGURATION, -// deserializationSchema, -// StartupMode.SPECIFIC_OFFSETS, -// specificOffsets, -// 0L); -// -// TableSourceValidation.validateTableSource(expected, schema); -// -// // construct table source using descriptors and table source factory -// final Map legacyPropertiesMap = new HashMap<>(); -// legacyPropertiesMap.putAll(createPscSourceProperties()); -// -// // use legacy properties -// legacyPropertiesMap.remove(PscValidator.CONNECTOR_SPECIFIC_OFFSETS); -// legacyPropertiesMap.remove(PscValidator.CONNECTOR_PROPERTIES_GROUP_ID); -// -// // keep compatible with a specified update-mode -// legacyPropertiesMap.put(StreamTableDescriptorValidator.UPDATE_MODE, "append"); -// -// // legacy properties for specific-offsets and properties -// legacyPropertiesMap.put(PscValidator.CONNECTOR_SPECIFIC_OFFSETS + ".0.partition", "0"); -// legacyPropertiesMap.put(PscValidator.CONNECTOR_SPECIFIC_OFFSETS + ".0.offset", "100"); -// legacyPropertiesMap.put(PscValidator.CONNECTOR_SPECIFIC_OFFSETS + ".1.partition", "1"); -// legacyPropertiesMap.put(PscValidator.CONNECTOR_SPECIFIC_OFFSETS + ".1.offset", "123"); -// legacyPropertiesMap.put(PscValidator.CONNECTOR_PROPERTIES + ".0.key", PscConfiguration.PSC_CONSUMER_GROUP_ID); -// legacyPropertiesMap.put(PscValidator.CONNECTOR_PROPERTIES + ".0.value", "test"); -// -// final TableSource actualSource = TableFactoryService.find(StreamTableSourceFactory.class, legacyPropertiesMap) -// .createStreamTableSource(legacyPropertiesMap); -// -// assertEquals(expected, actualSource); -// -// // test Kafka consumer -// final PscTableSourceBase actualKafkaSource = (PscTableSourceBase) actualSource; -// final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock(); -// actualKafkaSource.getDataStream(mock); -// assertTrue(getExpectedFlinkPscConsumer().isAssignableFrom(mock.sourceFunction.getClass())); -// } -// -// protected Map createPscSourceProperties() { -// return new TestTableDescriptor( -// new Psc() -// .version(getPscVersion()) -// .topicUri(TOPIC_URI) -// .properties(PSC_CONFIGURATION) -// .sinkPartitionerRoundRobin() // test if accepted although not needed -// .startFromSpecificOffsets(OFFSETS)) -// .withFormat(new TestTableFormat()) -// .withSchema( -// new Schema() -// .field(FRUIT_NAME, DataTypes.STRING()).from(NAME) -// .field(COUNT, DataTypes.DECIMAL(38, 18)) // no from so it must match with the input -// .field(EVENT_TIME, DataTypes.TIMESTAMP(3)).rowtime( -// new Rowtime().timestampsFromField(TIME).watermarksPeriodicAscending()) -// .field(PROC_TIME, DataTypes.TIMESTAMP(3)).proctime()) -// .toProperties(); -// } -// -// -// /** -// * This test can be unified with the corresponding source test once we have fixed FLINK-9870. -// */ -// @Test -// @Ignore("Table API not updated to 1.15 yet") -// public void testTableSink() { -// // prepare parameters for Kafka table sink -// final TableSchema schema = TableSchema.builder() -// .field(FRUIT_NAME, DataTypes.STRING()) -// .field(COUNT, DataTypes.DECIMAL(10, 4)) -// .field(EVENT_TIME, DataTypes.TIMESTAMP(3)) -// .build(); -// -// final PscTableSinkBase expected = getExpectedPscTableSink( -// schema, -// TOPIC_URI, -// PSC_CONFIGURATION, -// Optional.of(new FlinkFixedPartitioner<>()), -// new TestSerializationSchema(schema.toRowType())); -// -// // construct table sink using descriptors and table sink factory -// final Map propertiesMap = createPscSinkProperties(); -// final TableSink actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap) -// .createStreamTableSink(propertiesMap); -// -// assertEquals(expected, actualSink); -// -// // test Kafka producer -// final PscTableSinkBase actualPscSink = (PscTableSinkBase) actualSink; -// final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType()); -// actualPscSink.consumeDataStream(streamMock); -// assertTrue(getExpectedFlinkPscProducer().isAssignableFrom(streamMock.sinkFunction.getClass())); -// } -// -// @Test -// @Ignore("Table API not updated to 1.15 yet") -// public void testTableSinkWithLegacyProperties() { -// // prepare parameters for PSC table sink -// final TableSchema schema = TableSchema.builder() -// .field(FRUIT_NAME, DataTypes.STRING()) -// .field(COUNT, DataTypes.DECIMAL(10, 4)) -// .field(EVENT_TIME, DataTypes.TIMESTAMP(3)) -// .build(); -// -// final PscTableSinkBase expected = getExpectedPscTableSink( -// schema, -// TOPIC_URI, -// PSC_CONFIGURATION, -// Optional.of(new FlinkFixedPartitioner<>()), -// new TestSerializationSchema(schema.toRowType())); -// -// // construct table sink using descriptors and table sink factory -// final Map legacyPropertiesMap = new HashMap<>(); -// legacyPropertiesMap.putAll(createPscSinkProperties()); -// -// // use legacy properties -// legacyPropertiesMap.remove(PscValidator.CONNECTOR_SPECIFIC_OFFSETS); -// legacyPropertiesMap.remove(PscValidator.CONNECTOR_PROPERTIES_GROUP_ID); -// -// // keep compatible with a specified update-mode -// legacyPropertiesMap.put(StreamTableDescriptorValidator.UPDATE_MODE, "append"); -// -// // legacy properties for specific-offsets and properties -// legacyPropertiesMap.put(PscValidator.CONNECTOR_SPECIFIC_OFFSETS + ".0.partition", "0"); -// legacyPropertiesMap.put(PscValidator.CONNECTOR_SPECIFIC_OFFSETS + ".0.offset", "100"); -// legacyPropertiesMap.put(PscValidator.CONNECTOR_SPECIFIC_OFFSETS + ".1.partition", "1"); -// legacyPropertiesMap.put(PscValidator.CONNECTOR_SPECIFIC_OFFSETS + ".1.offset", "123"); -// legacyPropertiesMap.put(PscValidator.CONNECTOR_PROPERTIES + ".0.key", PscConfiguration.PSC_CONSUMER_GROUP_ID); -// legacyPropertiesMap.put(PscValidator.CONNECTOR_PROPERTIES + ".0.value", "test"); -// -// final TableSink actualSink = TableFactoryService.find(StreamTableSinkFactory.class, legacyPropertiesMap) -// .createStreamTableSink(legacyPropertiesMap); -// -// assertEquals(expected, actualSink); -// -// // test PSC producer -// final PscTableSinkBase actualKafkaSink = (PscTableSinkBase) actualSink; -// final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType()); -// actualKafkaSink.consumeDataStream(streamMock); -// assertTrue(getExpectedFlinkPscProducer().isAssignableFrom(streamMock.sinkFunction.getClass())); -// } -// -// protected Map createPscSinkProperties() { -// return new TestTableDescriptor( -// new Psc() -// .version(getPscVersion()) -// .topicUri(TOPIC_URI) -// .properties(PSC_CONFIGURATION) -// .sinkPartitionerFixed() -// .startFromSpecificOffsets(OFFSETS)) // test if they accepted although not needed -// .withFormat(new TestTableFormat()) -// .withSchema( -// new Schema() -// .field(FRUIT_NAME, DataTypes.STRING()) -// .field(COUNT, DataTypes.DECIMAL(10, 4)) -// .field(EVENT_TIME, DataTypes.TIMESTAMP(3))) -// .inAppendMode() -// .toProperties(); -// } -// -// private static class StreamExecutionEnvironmentMock extends StreamExecutionEnvironment { -// -// public SourceFunction sourceFunction; -// -// @Override -// public DataStreamSource addSource(SourceFunction sourceFunction) { -// this.sourceFunction = sourceFunction; -// return super.addSource(sourceFunction); -// } -// -// @Override -// public JobExecutionResult execute(StreamGraph streamGraph) throws Exception { -// throw new UnsupportedOperationException(); -// } -// } -// -// private static class DataStreamMock extends DataStream { -// -// public SinkFunction sinkFunction; -// -// public DataStreamMock(StreamExecutionEnvironment environment, TypeInformation outType) { -// super(environment, new TransformationMock("name", outType, 1)); -// } -// -// @Override -// public DataStreamSink addSink(SinkFunction sinkFunction) { -// this.sinkFunction = sinkFunction; -// return super.addSink(sinkFunction); -// } -// } -// -// private static class TransformationMock extends Transformation { -// -// public TransformationMock(String name, TypeInformation outputType, int parallelism) { -// super(name, outputType, parallelism); -// } -// -// @Override -// public List> getTransitivePredecessors() { -// return null; -// } -// -// @Override -// public List> getInputs() { -// return null; -// } -// -// } -// -// // -------------------------------------------------------------------------------------------- -// // For version-specific tests -// // -------------------------------------------------------------------------------------------- -// -// protected abstract String getPscVersion(); -// -// protected abstract Class> getExpectedFlinkPscConsumer(); -// -// protected abstract Class getExpectedFlinkPscProducer(); -// -// protected abstract PscTableSourceBase getExpectedPscTableSource( -// TableSchema schema, -// Optional proctimeAttribute, -// List rowtimeAttributeDescriptors, -// Map fieldMapping, -// String topicUri, -// Properties properties, -// DeserializationSchema deserializationSchema, -// StartupMode startupMode, -// Map specificStartupOffsets, -// long startupTimestampMillis); -// -// protected abstract PscTableSinkBase getExpectedPscTableSink( -// TableSchema schema, -// String topicUri, -// Properties properties, -// Optional> partitioner, -// SerializationSchema serializationSchema); -//} diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscChangelogTableITCase.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscChangelogTableITCase.java index 7f7f6f2..bd4e40c 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscChangelogTableITCase.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscChangelogTableITCase.java @@ -43,7 +43,7 @@ import static com.pinterest.flink.streaming.connectors.psc.table.PscTableTestUtils.readLines; import static com.pinterest.flink.streaming.connectors.psc.table.PscTableTestUtils.waitingExpectedResults; -/** IT cases for Kafka with changelog format for Table API & SQL. */ +/** IT cases for Psc with changelog format for Table API & SQL. */ public class PscChangelogTableITCase extends PscTableTestBase { @Before @@ -54,7 +54,7 @@ public void before() { } @Test - public void testKafkaDebeziumChangelogSource() throws Exception { + public void testPscDebeziumChangelogSource() throws Exception { final String topic = "changelog_topic"; final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topic; createTestTopic(topic, 1, 1); @@ -67,15 +67,15 @@ public void testKafkaDebeziumChangelogSource() throws Exception { tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L); tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE"); - // ---------- Write the Debezium json into Kafka ------------------- + // ---------- Write the Debezium json into Psc ------------------- List lines = readLines("debezium-data-schema-exclude.txt"); try { - writeRecordsToKafka(topicUri, lines); + writeRecordsToPsc(topicUri, lines); } catch (Exception e) { - throw new Exception("Failed to write debezium data to Kafka.", e); + throw new Exception("Failed to write debezium data to PSC.", e); } - // ---------- Produce an event time stream into Kafka ------------------- + // ---------- Produce an event time stream into PSC ------------------- String bootstraps = getBootstrapServers(); String sourceDDL = String.format( @@ -203,15 +203,15 @@ public void testPscCanalChangelogSource() throws Exception { tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L); tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE"); - // ---------- Write the Canal json into Kafka ------------------- + // ---------- Write the Canal json into PSC ------------------- List lines = readLines("canal-data.txt"); try { - writeRecordsToKafka(topicUri, lines); + writeRecordsToPsc(topicUri, lines); } catch (Exception e) { throw new Exception("Failed to write canal data to PSC.", e); } - // ---------- Produce an event time stream into Kafka ------------------- + // ---------- Produce an event time stream into PSC ------------------- String bootstraps = getBootstrapServers(); String sourceDDL = String.format( @@ -336,7 +336,7 @@ public void testPscCanalChangelogSource() throws Exception { } @Test - public void testKafkaMaxwellChangelogSource() throws Exception { + public void testPscMaxwellChangelogSource() throws Exception { final String topic = "changelog_maxwell"; final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topic; createTestTopic(topic, 1, 1); @@ -351,15 +351,15 @@ public void testKafkaMaxwellChangelogSource() throws Exception { tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L); tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE"); - // ---------- Write the Maxwell json into Kafka ------------------- + // ---------- Write the Maxwell json into PSC ------------------- List lines = readLines("maxwell-data.txt"); try { - writeRecordsToKafka(topicUri, lines); + writeRecordsToPsc(topicUri, lines); } catch (Exception e) { throw new Exception("Failed to write maxwell data to PSC.", e); } - // ---------- Produce an event time stream into Kafka ------------------- + // ---------- Produce an event time stream into PSC ------------------- String bootstraps = getBootstrapServers(); String sourceDDL = String.format( @@ -478,7 +478,7 @@ public void testKafkaMaxwellChangelogSource() throws Exception { deleteTestTopic(topic); } - private void writeRecordsToKafka(String topic, List lines) throws Exception { + private void writeRecordsToPsc(String topic, List lines) throws Exception { DataStreamSource stream = env.fromCollection(lines); SerializationSchema serSchema = new SimpleStringSchema(); FlinkPscPartitioner partitioner = new FlinkFixedPartitioner<>(); @@ -488,9 +488,6 @@ private void writeRecordsToKafka(String topic, List lines) throws Except producerProperties.setProperty("retries", "0"); stream.sinkTo( PscSink.builder() -// .setBootstrapServers( -// producerProperties.getProperty( -// ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) .setRecordSerializer( PscRecordSerializationSchema.builder() .setTopicUriString(topic) diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java index e42c052..e7f61e7 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java @@ -194,7 +194,7 @@ public class PscDynamicTableFactoryTest { @Test public void testTableSource() { final DynamicTableSource actualSource = createTableSource(SCHEMA, getBasicSourceOptions()); - final PscDynamicSource actualKafkaSource = (PscDynamicSource) actualSource; + final PscDynamicSource actualPscSource = (PscDynamicSource) actualSource; final Map specificOffsets = new HashMap<>(); specificOffsets.put(new PscTopicUriPartition(TOPIC_URI, PARTITION_0), OFFSET_0); @@ -204,7 +204,7 @@ public void testTableSource() { new DecodingFormatMock(",", true); // Test scan source equals - final PscDynamicSource expectedKafkaSource = + final PscDynamicSource expectedPscSource = createExpectedScanSource( SCHEMA_DATA_TYPE, null, @@ -218,10 +218,10 @@ public void testTableSource() { StartupMode.SPECIFIC_OFFSETS, specificOffsets, 0); - assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource); + assertThat(actualPscSource).isEqualTo(expectedPscSource); ScanTableSource.ScanRuntimeProvider provider = - actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + actualPscSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); assertPscSource(provider); } @@ -246,7 +246,7 @@ public void testTableSourceWithPattern() { new DecodingFormatMock(",", true); // Test scan source equals - final PscDynamicSource expectedKafkaSource = + final PscDynamicSource expectedPscSource = createExpectedScanSource( SCHEMA_DATA_TYPE, null, @@ -260,11 +260,11 @@ public void testTableSourceWithPattern() { StartupMode.EARLIEST, specificOffsets, 0); - final PscDynamicSource actualKafkaSource = (PscDynamicSource) actualSource; - assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource); + final PscDynamicSource actualPscSource = (PscDynamicSource) actualSource; + assertThat(actualPscSource).isEqualTo(expectedPscSource); ScanTableSource.ScanRuntimeProvider provider = - actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + actualPscSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); assertPscSource(provider); } @@ -272,9 +272,9 @@ public void testTableSourceWithPattern() { @Test public void testTableSourceWithKeyValue() { final DynamicTableSource actualSource = createTableSource(SCHEMA, getKeyValueOptions()); - final PscDynamicSource actualKafkaSource = (PscDynamicSource) actualSource; + final PscDynamicSource actualPscSource = (PscDynamicSource) actualSource; // initialize stateful testing formats - actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + actualPscSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); final DecodingFormatMock keyDecodingFormat = new DecodingFormatMock("#", false); keyDecodingFormat.producedDataType = @@ -287,7 +287,7 @@ public void testTableSourceWithKeyValue() { DataTypes.FIELD(TIME, DataTypes.TIMESTAMP(3))) .notNull(); - final PscDynamicSource expectedKafkaSource = + final PscDynamicSource expectedPscSource = createExpectedScanSource( SCHEMA_DATA_TYPE, keyDecodingFormat, @@ -302,7 +302,7 @@ public void testTableSourceWithKeyValue() { Collections.emptyMap(), 0); - assertThat(actualSource).isEqualTo(expectedKafkaSource); + assertThat(actualSource).isEqualTo(expectedPscSource); } @Test @@ -311,12 +311,12 @@ public void testTableSourceWithKeyValueAndMetadata() { options.put("value.test-format.readable-metadata", "metadata_1:INT, metadata_2:STRING"); final DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, options); - final PscDynamicSource actualKafkaSource = (PscDynamicSource) actualSource; + final PscDynamicSource actualPscSource = (PscDynamicSource) actualSource; // initialize stateful testing formats - actualKafkaSource.applyReadableMetadata( + actualPscSource.applyReadableMetadata( Arrays.asList("timestamp", "value.metadata_2"), SCHEMA_WITH_METADATA.toSourceRowDataType()); - actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + actualPscSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); final DecodingFormatMock expectedKeyFormat = new DecodingFormatMock( @@ -464,10 +464,10 @@ public void testTableSink() { "psc-sink"); assertThat(actualSink).isEqualTo(expectedSink); - // Test kafka producer. - final PscDynamicSink actualKafkaSink = (PscDynamicSink) actualSink; + // Test PSC producer. + final PscDynamicSink actualPscSink = (PscDynamicSink) actualSink; DynamicTableSink.SinkRuntimeProvider provider = - actualKafkaSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)); + actualPscSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)); assertThat(provider).isInstanceOf(SinkV2Provider.class); final SinkV2Provider sinkProvider = (SinkV2Provider) provider; final Sink sinkFunction = sinkProvider.createSink(); @@ -516,9 +516,9 @@ public void testTableSinkWithKeyValue() { options.put("sink.transactional-id-prefix", "psc-sink"); }); final DynamicTableSink actualSink = createTableSink(SCHEMA, modifiedOptions); - final PscDynamicSink actualKafkaSink = (PscDynamicSink) actualSink; + final PscDynamicSink actualPscSink = (PscDynamicSink) actualSink; // initialize stateful testing formats - actualKafkaSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)); + actualPscSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)); final EncodingFormatMock keyEncodingFormat = new EncodingFormatMock("#"); keyEncodingFormat.consumedDataType = @@ -921,7 +921,7 @@ public void testPrimaryKeyValidation() { .isThrownBy(() -> createTableSink(pkSchema, getBasicSinkOptions())) .havingRootCause() .withMessage( - "The Kafka table 'default.default.t1' with 'test-format' format" + "The PSC table 'default.default.t1' with 'test-format' format" + " doesn't support defining PRIMARY KEY constraint on the table, because it can't" + " guarantee the semantic of primary key."); @@ -929,7 +929,7 @@ public void testPrimaryKeyValidation() { .isThrownBy(() -> createTableSink(pkSchema, getKeyValueOptions())) .havingRootCause() .withMessage( - "The Kafka table 'default.default.t1' with 'test-format' format" + "The PSC table 'default.default.t1' with 'test-format' format" + " doesn't support defining PRIMARY KEY constraint on the table, because it can't" + " guarantee the semantic of primary key."); @@ -950,7 +950,7 @@ public void testPrimaryKeyValidation() { .isThrownBy(() -> createTableSource(pkSchema, getBasicSourceOptions())) .havingRootCause() .withMessage( - "The Kafka table 'default.default.t1' with 'test-format' format" + "The PSC table 'default.default.t1' with 'test-format' format" + " doesn't support defining PRIMARY KEY constraint on the table, because it can't" + " guarantee the semantic of primary key."); } @@ -1033,12 +1033,11 @@ private static Map getModifiedOptions( private static Map getBasicSourceOptions() { Map tableOptions = new HashMap<>(); - // Kafka specific options. + // PSC specific options. tableOptions.put("connector", PscDynamicTableFactory.IDENTIFIER); tableOptions.put("topic-uri", TOPIC_URI); tableOptions.put("properties.psc.consumer.group.id", "dummy"); tableOptions.put("properties.psc.cluster.uri", PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX); -// tableOptions.put("properties.bootstrap.servers", "dummy"); tableOptions.put("scan.startup.mode", "specific-offsets"); tableOptions.put("scan.startup.specific-offsets", PROPS_SCAN_OFFSETS); tableOptions.put("scan.topic-partition-discovery.interval", DISCOVERY_INTERVAL); @@ -1058,7 +1057,7 @@ private static Map getBasicSourceOptions() { private static Map getBasicSinkOptions() { Map tableOptions = new HashMap<>(); - // Kafka specific options. + // PSC specific options. tableOptions.put("connector", PscDynamicTableFactory.IDENTIFIER); tableOptions.put("topic-uri", TOPIC_URI); tableOptions.put("properties." + PscConfiguration.PSC_CONSUMER_GROUP_ID, "dummy"); @@ -1078,7 +1077,7 @@ private static Map getBasicSinkOptions() { private static Map getKeyValueOptions() { Map tableOptions = new HashMap<>(); - // Kafka specific options. + // PSC specific options. tableOptions.put("connector", PscDynamicTableFactory.IDENTIFIER); tableOptions.put("topic-uri", TOPIC_URI); tableOptions.put("properties." + PscConfiguration.PSC_CONSUMER_GROUP_ID, "dummy"); diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableITCase.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableITCase.java index 015ec84..32339a8 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableITCase.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableITCase.java @@ -97,7 +97,7 @@ public void testPscSourceSink() throws Exception { final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topic; createTestTopic(topic, 1, 1); - // ---------- Produce an event time stream into Kafka ------------------- + // ---------- Produce an event time stream into PSC ------------------- String groupId = getStandardProps().getProperty(PscConfiguration.PSC_CONSUMER_GROUP_ID); String bootstraps = getBootstrapServers(); @@ -218,7 +218,7 @@ public void testPscTableWithMultipleTopics() throws Exception { "%s_%s_%s", currency, format, UUID.randomUUID())) .collect(Collectors.toList()); List topicUris = topics.stream().map(t -> PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + t).collect(Collectors.toList()); - // Because kafka connector currently doesn't support write data into multiple topic + // Because psc connector currently doesn't support write data into multiple topic // together, // we have to create multiple sink tables. IntStream.range(0, 4) diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableTestBase.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableTestBase.java index b5e8629..01fc25a 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableTestBase.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableTestBase.java @@ -59,7 +59,7 @@ import static com.pinterest.flink.connector.psc.testutils.PscTestUtils.injectDiscoveryConfigs; -/** Base class for Kafka Table IT Cases. */ +/** Base class for PSC Table IT Cases. */ public abstract class PscTableTestBase extends AbstractTestBase { private static final Logger LOG = LoggerFactory.getLogger(PscTableTestBase.class); diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableTestUtils.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableTestUtils.java index 65cf430..d754c34 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableTestUtils.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableTestUtils.java @@ -45,7 +45,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -/** Utils for kafka table tests. */ +/** Utils for psc table tests. */ public class PscTableTestUtils { public static List collectRows(Table table, int expectedSize) throws Exception { final TableResult result = table.execute(); diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactoryTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactoryTest.java index da6fb29..44d2a52 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactoryTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactoryTest.java @@ -167,10 +167,10 @@ public void testTableSource() { UPSERT_PSC_SOURCE_PROPERTIES); assertEquals(actualSource, expectedSource); - final PscDynamicSource actualUpsertKafkaSource = (PscDynamicSource) actualSource; + final PscDynamicSource actualUpsertPscSource = (PscDynamicSource) actualSource; ScanTableSource.ScanRuntimeProvider provider = - actualUpsertKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); - assertKafkaSource(provider); + actualUpsertPscSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + assertPscSource(provider); } @Test @@ -193,12 +193,12 @@ public void testTableSink() { null); // Test sink format. - final PscDynamicSink actualUpsertKafkaSink = (PscDynamicSink) actualSink; + final PscDynamicSink actualUpsertPscSink = (PscDynamicSink) actualSink; assertEquals(expectedSink, actualSink); - // Test kafka producer. + // Test PSC producer. DynamicTableSink.SinkRuntimeProvider provider = - actualUpsertKafkaSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)); + actualUpsertPscSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)); assertThat(provider, instanceOf(SinkV2Provider.class)); final SinkV2Provider sinkFunctionProvider = (SinkV2Provider) provider; final Sink sink = sinkFunctionProvider.createSink(); @@ -234,12 +234,12 @@ public void testBufferedTableSink() { null); // Test sink format. - final PscDynamicSink actualUpsertKafkaSink = (PscDynamicSink) actualSink; + final PscDynamicSink actualUpsertPscSink = (PscDynamicSink) actualSink; assertEquals(expectedSink, actualSink); - // Test kafka producer. + // Test PSC producer. DynamicTableSink.SinkRuntimeProvider provider = - actualUpsertKafkaSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)); + actualUpsertPscSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)); assertThat(provider, instanceOf(DataStreamSinkProvider.class)); final DataStreamSinkProvider sinkProvider = (DataStreamSinkProvider) provider; final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -353,7 +353,7 @@ private void verifyEncoderSubject( String expectedValueSubject, String expectedKeySubject) { Map options = new HashMap<>(); - // Kafka specific options. + // PSC specific options. options.put("connector", UpsertPscDynamicTableFactory.IDENTIFIER); options.put("topic-uri", SINK_TOPIC_URI); options.put("properties." + PscConfiguration.PSC_CONSUMER_GROUP_ID, "dummy"); @@ -407,9 +407,9 @@ public void testCreateSourceTableWithoutPK() { thrown.expect( containsCause( new ValidationException( - "'upsert-kafka' tables require to define a PRIMARY KEY constraint. " - + "The PRIMARY KEY specifies which columns should be read from or write to the Kafka message key. " - + "The PRIMARY KEY also defines records in the 'upsert-kafka' table should update or delete on which keys."))); + "'upsert-psc' tables require to define a PRIMARY KEY constraint. " + + "The PRIMARY KEY specifies which columns should be read from or write to the PubSub message key. " + + "The PRIMARY KEY also defines records in the 'upsert-psc' table should update or delete on which keys."))); ResolvedSchema illegalSchema = ResolvedSchema.of( @@ -425,9 +425,9 @@ public void testCreateSinkTableWithoutPK() { thrown.expect( containsCause( new ValidationException( - "'upsert-kafka' tables require to define a PRIMARY KEY constraint. " - + "The PRIMARY KEY specifies which columns should be read from or write to the Kafka message key. " - + "The PRIMARY KEY also defines records in the 'upsert-kafka' table should update or delete on which keys."))); + "'upsert-psc' tables require to define a PRIMARY KEY constraint. " + + "The PRIMARY KEY specifies which columns should be read from or write to the PubSub message key. " + + "The PRIMARY KEY also defines records in the 'upsert-psc' table should update or delete on which keys."))); ResolvedSchema illegalSchema = ResolvedSchema.of( @@ -443,7 +443,7 @@ public void testSerWithCDCFormatAsValue() { containsCause( new ValidationException( String.format( - "'upsert-kafka' connector doesn't support '%s' as value format, " + "'upsert-psc' connector doesn't support '%s' as value format, " + "because '%s' is not in insert-only mode.", TestFormatFactory.IDENTIFIER, TestFormatFactory.IDENTIFIER)))); @@ -468,7 +468,7 @@ public void testDeserWithCDCFormatAsValue() { containsCause( new ValidationException( String.format( - "'upsert-kafka' connector doesn't support '%s' as value format, " + "'upsert-psc' connector doesn't support '%s' as value format, " + "because '%s' is not in insert-only mode.", TestFormatFactory.IDENTIFIER, TestFormatFactory.IDENTIFIER)))); @@ -651,7 +651,7 @@ private static PscDynamicSink createExpectedSink( null); } - private void assertKafkaSource(ScanTableSource.ScanRuntimeProvider provider) { + private void assertPscSource(ScanTableSource.ScanRuntimeProvider provider) { assertThat(provider, instanceOf(DataStreamScanProvider.class)); final DataStreamScanProvider dataStreamScanProvider = (DataStreamScanProvider) provider; final Transformation transformation = diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscTableITCase.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscTableITCase.java index 66fa86c..61938cf 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscTableITCase.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscTableITCase.java @@ -92,8 +92,8 @@ public void testTemporalJoin() throws Exception { // Kafka DefaultPartitioner's hash strategy is slightly different from Flink // KeyGroupStreamPartitioner, // which causes the records in the different Flink partitions are written into the same - // Kafka partition. - // When reading from the out-of-order Kafka partition, we need to set suitable watermark + // PubSub partition. + // When reading from the out-of-order PubSub partition, we need to set suitable watermark // interval to // tolerate the disorderliness. // For convenience, we just set the parallelism 1 to make all records are in the same Flink @@ -309,7 +309,7 @@ public void testSourceSinkWithKeyAndPartialValue() throws Exception { } @Test - public void testKafkaSourceSinkWithKeyAndFullValue() throws Exception { + public void testPscSourceSinkWithKeyAndFullValue() throws Exception { // we always use a different topic name for each parameterized topic, // in order to make sure the topic can be created. final String topic = "key_full_value_topic_" + format;