builder() {
return new PscSourceBuilder<>();
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceBuilder.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceBuilder.java
index 76aaec3..3215e13 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceBuilder.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceBuilder.java
@@ -47,45 +47,45 @@
import static org.apache.flink.util.Preconditions.checkState;
/**
- * The @builder class for {@link org.apache.flink.connector.kafka.source.KafkaSource} to make it easier for the users to construct a {@link
- * org.apache.flink.connector.kafka.source.KafkaSource}.
+ * The @builder class for {@link PscSource} to make it easier for the users to construct a {@link
+ * PscSource}.
*
- * The following example shows the minimum setup to create a KafkaSource that reads the String
- * values from a Kafka topic.
+ * The following example shows the minimum setup to create a PscSource that reads the String
+ * values from a PSC topic.
*
* {@code
- * KafkaSource source = KafkaSource
+ * PscSource source = PscSource
* .builder()
* .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
* .setTopics(Arrays.asList(TOPIC1, TOPIC2))
- * .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+ * .setDeserializer(PscRecordDeserializationSchema.valueOnly(StringDeserializer.class))
* .build();
* }
*
* The bootstrap servers, topics/partitions to consume, and the record deserializer are required
* fields that must be set.
*
- * To specify the starting offsets of the KafkaSource, one can call {@link
+ * To specify the starting offsets of the PscSource, one can call {@link
* #setStartingOffsets(OffsetsInitializer)}.
*
- * By default the KafkaSource runs in an {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and never
- * stops until the Flink job is canceled or fails. To let the KafkaSource run in {@link
+ * By default the PscSource runs in an {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and never
+ * stops until the Flink job is canceled or fails. To let the PscSource run in {@link
* Boundedness#CONTINUOUS_UNBOUNDED} but stops at some given offsets, one can call {@link
- * #setUnbounded(OffsetsInitializer)}. For example the following KafkaSource stops after it consumes
+ * #setUnbounded(OffsetsInitializer)}. For example the following PscSource stops after it consumes
* up to the latest partition offsets at the point when the Flink started.
*
* {@code
- * KafkaSource source = KafkaSource
+ * PscSource source = PscSource
* .builder()
* .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
* .setTopics(Arrays.asList(TOPIC1, TOPIC2))
- * .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+ * .setDeserializer(PscRecordDeserializationSchema.valueOnly(StringDeserializer.class))
* .setUnbounded(OffsetsInitializer.latest())
* .build();
* }
*
* Check the Java docs of each individual methods to learn more about the settings to build a
- * KafkaSource.
+ * PscSource.
*/
@PublicEvolving
public class PscSourceBuilder<OUT> {
@@ -140,7 +140,7 @@ public PscSourceBuilder setGroupId(String groupId) {
*
* @param topicUris the list of topicRns to consume from.
* @return this PscSourceBuilder.
- * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection)
+ * @see com.pinterest.psc.consumer.PscConsumer#subscribe(Collection)
*/
public PscSourceBuilder setTopicUris(List topicUris) {
ensureSubscriberIsNull("topics");
@@ -155,7 +155,7 @@ public PscSourceBuilder setTopicUris(List topicUris) {
*
* @param topicUris the list of topicRns to consume from.
* @return this PscSourceBuilder.
- * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection)
+ * @see com.pinterest.psc.consumer.PscConsumer#subscribe(Collection)
*/
public PscSourceBuilder setTopicUris(String... topicUris) {
return setTopicUris(Arrays.asList(topicUris));
@@ -166,7 +166,6 @@ public PscSourceBuilder setTopicUris(String... topicUris) {
*
* @param topicUriPattern the pattern of the topic name to consume from.
* @return this PscSourceBuilder.
- * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Pattern)
*/
public PscSourceBuilder setTopicUriPattern(Pattern topicUriPattern) {
ensureSubscriberIsNull("topicUri pattern");
@@ -178,8 +177,8 @@ public PscSourceBuilder setTopicUriPattern(Pattern topicUriPattern) {
* Set a set of partitions to consume from.
*
* @param partitions the set of partitions to consume from.
- * @return this KafkaSourceBuilder.
- * @see org.apache.kafka.clients.consumer.KafkaConsumer#assign(Collection)
+ * @return this PscSourceBuilder.
+ * @see com.pinterest.psc.consumer.PscConsumer#assign(Collection)
*/
public PscSourceBuilder setPartitions(Set partitions) {
ensureSubscriberIsNull("partitions");
@@ -188,7 +187,7 @@ public PscSourceBuilder setPartitions(Set partitions) {
}
/**
- * Specify from which offsets the KafkaSource should start consume from by providing an {@link
+ * Specify from which offsets the PscSource should start consume from by providing an {@link
* OffsetsInitializer}.
*
* The following {@link OffsetsInitializer}s are commonly used and provided out of the box.
@@ -196,7 +195,7 @@ public PscSourceBuilder setPartitions(Set partitions) {
*
*
* - {@link OffsetsInitializer#earliest()} - starting from the earliest offsets. This is
- * also the default {@link OffsetsInitializer} of the KafkaSource for starting offsets.
+ * also the default {@link OffsetsInitializer} of the PscSource for starting offsets.
* - {@link OffsetsInitializer#latest()} - starting from the latest offsets.
* - {@link OffsetsInitializer#committedOffsets()} - starting from the committed offsets of
* the consumer group.
@@ -207,7 +206,7 @@ public PscSourceBuilder setPartitions(Set partitions) {
* - {@link OffsetsInitializer#offsets(Map)} - starting from the specified offsets for each
* partition.
* - {@link OffsetsInitializer#timestamp(long)} - starting from the specified timestamp for
- * each partition. Note that the guarantee here is that all the records in Kafka whose
+ * each partition. Note that the guarantee here is that all the records in the backend whose
* {@link MessageId#getOffset()} is greater than
* the given starting timestamp will be consumed. However, it is possible that some
* consumer records whose timestamp is smaller than the given starting timestamp are also
@@ -216,7 +215,7 @@ public PscSourceBuilder setPartitions(Set partitions) {
*
* @param startingOffsetsInitializer the {@link OffsetsInitializer} setting the starting offsets
* for the Source.
- * @return this KafkaSourceBuilder.
+ * @return this PscSourceBuilder.
*/
public PscSourceBuilder setStartingOffsets(
OffsetsInitializer startingOffsetsInitializer) {
@@ -226,7 +225,7 @@ public PscSourceBuilder setStartingOffsets(
/**
* By default the PscSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
- * and thus never stops until the Flink job fails or is canceled. To let the KafkaSource run as
+ * and thus never stops until the Flink job fails or is canceled. To let the PscSource run as
* a streaming source but still stops at some point, one can set an {@link OffsetsInitializer}
* to specify the stopping offsets for each partition. When all the partitions have reached
* their stopping offsets, the PscSource will then exit.
@@ -241,13 +240,13 @@ public PscSourceBuilder setStartingOffsets(
*
*
* - {@link OffsetsInitializer#latest()} - stop at the latest offsets of the partitions when
- * the KafkaSource starts to run.
+ * the PscSource starts to run.
* - {@link OffsetsInitializer#committedOffsets()} - stops at the committed offsets of the
* consumer group.
* - {@link OffsetsInitializer#offsets(Map)} - stops at the specified offsets for each
* partition.
* - {@link OffsetsInitializer#timestamp(long)} - stops at the specified timestamp for each
- * partition. The guarantee of setting the stopping timestamp is that no Kafka records
+ * partition. The guarantee of setting the stopping timestamp is that no records
* whose {@link MessageId#getTimestamp()} is greater
* than the given stopping timestamp will be consumed. However, it is possible that some
* records whose timestamp is smaller than the specified stopping timestamp are not
@@ -256,7 +255,7 @@ public PscSourceBuilder setStartingOffsets(
*
* @param stoppingOffsetsInitializer The {@link OffsetsInitializer} to specify the stopping
* offset.
- * @return this KafkaSourceBuilder.
+ * @return this PscSourceBuilder.
* @see #setBounded(OffsetsInitializer)
*/
public PscSourceBuilder setUnbounded(OffsetsInitializer stoppingOffsetsInitializer) {
@@ -267,7 +266,7 @@ public PscSourceBuilder setUnbounded(OffsetsInitializer stoppingOffsetsInit
/**
* By default the PscSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
- * and thus never stops until the Flink job fails or is canceled. To let the KafkaSource run in
+ * and thus never stops until the Flink job fails or is canceled. To let the PscSource run in
* {@link Boundedness#BOUNDED} manner and stops at some point, one can set an {@link
* OffsetsInitializer} to specify the stopping offsets for each partition. When all the
* partitions have reached their stopping offsets, the PscSource will then exit.
@@ -281,13 +280,13 @@ public PscSourceBuilder setUnbounded(OffsetsInitializer stoppingOffsetsInit
*
*
* - {@link OffsetsInitializer#latest()} - stop at the latest offsets of the partitions when
- * the KafkaSource starts to run.
+ * the PscSource starts to run.
* - {@link OffsetsInitializer#committedOffsets()} - stops at the committed offsets of the
* consumer group.
* - {@link OffsetsInitializer#offsets(Map)} - stops at the specified offsets for each
* partition.
* - {@link OffsetsInitializer#timestamp(long)} - stops at the specified timestamp for each
- * partition. The guarantee of setting the stopping timestamp is that no Kafka records
+ * partition. The guarantee of setting the stopping timestamp is that no records
* whose {@link MessageId#getTimestamp()} is greater
* than the given stopping timestamp will be consumed. However, it is possible that some
* records whose timestamp is smaller than the specified stopping timestamp are not
@@ -296,7 +295,7 @@ public PscSourceBuilder setUnbounded(OffsetsInitializer stoppingOffsetsInit
*
* @param stoppingOffsetsInitializer the {@link OffsetsInitializer} to specify the stopping
* offsets.
- * @return this KafkaSourceBuilder.
+ * @return this PscSourceBuilder.
* @see #setUnbounded(OffsetsInitializer)
*/
public PscSourceBuilder setBounded(OffsetsInitializer stoppingOffsetsInitializer) {
@@ -307,11 +306,11 @@ public PscSourceBuilder setBounded(OffsetsInitializer stoppingOffsetsInitia
/**
* Sets the {@link PscRecordDeserializationSchema deserializer} of the {@link
- * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource.
+ * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for PscSource.
*
- * @param recordDeserializer the deserializer for Kafka {@link
+ * @param recordDeserializer the deserializer for PSC {@link
* org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}.
- * @return this KafkaSourceBuilder.
+ * @return this PscSourceBuilder.
*/
public PscSourceBuilder setDeserializer(
PscRecordDeserializationSchema recordDeserializer) {
@@ -321,12 +320,12 @@ public PscSourceBuilder setDeserializer(
/**
* Sets the {@link PscRecordDeserializationSchema deserializer} of the {@link
- * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource. The given
+ * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for PscSource. The given
* {@link DeserializationSchema} will be used to deserialize the value of ConsumerRecord. The
* other information (e.g. key) in a ConsumerRecord will be ignored.
*
* @param deserializationSchema the {@link DeserializationSchema} to use for deserialization.
- * @return this KafkaSourceBuilder.
+ * @return this PscSourceBuilder.
*/
public PscSourceBuilder setValueOnlyDeserializer(
DeserializationSchema deserializationSchema) {
@@ -336,20 +335,20 @@ public PscSourceBuilder setValueOnlyDeserializer(
}
/**
- * Sets the client id prefix of this KafkaSource.
+ * Sets the client id prefix of this PscSource.
*
- * @param prefix the client id prefix to use for this KafkaSource.
- * @return this KafkaSourceBuilder.
+ * @param prefix the client id prefix to use for this PscSource.
+ * @return this PscSourceBuilder.
*/
public PscSourceBuilder setClientIdPrefix(String prefix) {
return setProperty(PscSourceOptions.CLIENT_ID_PREFIX.key(), prefix);
}
/**
- * Set an arbitrary property for the KafkaSource and KafkaConsumer. The valid keys can be found
+ * Set an arbitrary property for the PscSource and PscConsumer. The valid keys can be found
* in {@link PscConfiguration} and {@link PscSourceOptions}.
*
- * Note that the following keys will be overridden by the builder when the KafkaSource is
+ * Note that the following keys will be overridden by the builder when the PscSource is
* created.
*
*
@@ -364,7 +363,7 @@ public PscSourceBuilder setClientIdPrefix(String prefix) {
*
* @param key the key of the property.
* @param value the value of the property.
- * @return this KafkaSourceBuilder.
+ * @return this PscSourceBuilder.
*/
public PscSourceBuilder setProperty(String key, String value) {
props.setProperty(key, value);
@@ -375,7 +374,7 @@ public PscSourceBuilder setProperty(String key, String value) {
* Set arbitrary properties for the PscSource and PscConsumer. The valid keys can be found
* in {@link PscConfiguration} and {@link PscSourceOptions}.
*
- * Note that the following keys will be overridden by the builder when the KafkaSource is
+ * Note that the following keys will be overridden by the builder when the PscSource is
* created.
*
*
@@ -390,8 +389,8 @@ public PscSourceBuilder setProperty(String key, String value) {
* "group.id-RANDOM_LONG" if the client id prefix is not set.
*
*
- * @param props the properties to set for the KafkaSource.
- * @return this KafkaSourceBuilder.
+ * @param props the properties to set for the PscSource.
+ * @return this PscSourceBuilder.
*/
public PscSourceBuilder setProperties(Properties props) {
this.props.putAll(props);
@@ -399,9 +398,9 @@ public PscSourceBuilder setProperties(Properties props) {
}
/**
- * Build the {@link org.apache.flink.connector.kafka.source.KafkaSource}.
+ * Build the {@link PscSource}.
*
- * @return a KafkaSource with the settings made for this builder.
+ * @return a PscSource with the settings made for this builder.
*/
public PscSource<OUT> build() {
sanityCheck();
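Taken together, the builder methods in the hunk above compose roughly as in the following sketch, which reads one topic URI from the earliest offsets up to the latest offsets visible at startup (a bounded backfill). This is only a sketch: the generic shape of the builder is assumed to mirror the upstream KafkaSource, and the topic URI and group id are placeholders supplied by the caller.

    import com.pinterest.flink.connector.psc.source.PscSource;
    import com.pinterest.flink.connector.psc.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;

    public class BoundedBackfillExample {
        public static void main(String[] args) {
            String topicUri = args[0]; // placeholder PSC topic URI supplied by the caller

            PscSource<String> source =
                    PscSource.<String>builder()
                            .setTopicUris(topicUri)
                            .setGroupId("backfill-job")
                            // Start from the earliest offsets ...
                            .setStartingOffsets(OffsetsInitializer.earliest())
                            // ... and stop at the latest offsets seen when the job starts,
                            // which switches the source to Boundedness#BOUNDED.
                            .setBounded(OffsetsInitializer.latest())
                            // Deserialize only the value bytes as UTF-8 strings.
                            .setValueOnlyDeserializer(new SimpleStringSchema())
                            .build();

            System.out.println(source.getBoundedness());
        }
    }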
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceOptions.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceOptions.java
index 1742b9d..606edb8 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceOptions.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceOptions.java
@@ -25,7 +25,7 @@
import java.util.Properties;
import java.util.function.Function;
-/** Configurations for KafkaSource. */
+/** Configurations for PscSource. */
@Internal
public class PscSourceOptions {
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/NoStoppingOffsetsInitializer.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
index 76e58ce..61cafef 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
@@ -28,7 +28,7 @@
/**
* An implementation of {@link OffsetsInitializer} which does not initialize anything.
*
- * This class is used as the default stopping offsets initializer for unbounded Kafka sources.
+ * This class is used as the default stopping offsets initializer for unbounded PSC sources.
*/
@Internal
public class NoStoppingOffsetsInitializer implements OffsetsInitializer {
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializer.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializer.java
index 803768c..8061bdf 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializer.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializer.java
@@ -40,16 +40,16 @@
public interface OffsetsInitializer extends Serializable {
/**
- * Get the initial offsets for the given Kafka partitions. These offsets will be used as either
- * starting offsets or stopping offsets of the Kafka partitions.
+ * Get the initial offsets for the given PSC partitions. These offsets will be used as either
+ * starting offsets or stopping offsets of the PSC partitions.
*
* If the implementation returns a starting offset which causes {@code
- * OffsetsOutOfRangeException} from Kafka. The offsetResetStrategy provided by the
+ * OffsetsOutOfRangeException} from PSC. The offsetResetStrategy provided by the
* {@link #getAutoOffsetResetStrategy()} will be used to reset the offset.
*
- * @param partitions the Kafka partitions to get the starting offsets.
- * @param partitionOffsetsRetriever a helper to retrieve information of the Kafka partitions.
- * @return A mapping from Kafka partition to their offsets to start consuming from.
+ * @param partitions the PSC partitions to get the starting offsets.
+ * @param partitionOffsetsRetriever a helper to retrieve information of the PSC partitions.
+ * @return A mapping from PSC partition to their offsets to start consuming from.
*/
Map<TopicUriPartition, Long> getPartitionOffsets(
Collection<TopicUriPartition> partitions,
@@ -68,7 +68,7 @@ Map getPartitionOffsets(
/**
* An interface that provides necessary information to the {@link OffsetsInitializer} to get the
- * initial offsets of the Kafka partitions.
+ * initial offsets of the PSC partitions.
*/
interface PartitionOffsetsRetriever {
@@ -76,7 +76,7 @@ interface PartitionOffsetsRetriever {
* The group id should be the set for {@link com.pinterest.flink.connector.psc.source.PscSource PscSource} before invoking this
* method. Otherwise an {@code IllegalStateException} will be thrown.
*
- * @throws IllegalStateException if the group id is not set for the {@code KafkaSource}.
+ * @throws IllegalStateException if the group id is not set for the {@code PscSource}.
*/
Map<TopicUriPartition, Long> committedOffsets(Collection<TopicUriPartition> partitions);
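As a rough illustration of the getPartitionOffsets contract documented above, the helper below builds the kind of map such an initializer returns: one starting offset per assigned partition. It deliberately does not implement the full interface, since the reset-strategy half of the contract is not shown in this hunk, and the TopicUriPartition type is assumed from the partition/split classes elsewhere in the patch.

    import com.pinterest.psc.common.TopicUriPartition;

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    /** Illustrative helper: the shape of the map an OffsetsInitializer returns. */
    final class FixedStartingOffsets {

        private FixedStartingOffsets() {}

        /** Assigns the same fixed starting offset to every partition handed to the initializer. */
        static Map<TopicUriPartition, Long> startEverywhereAt(
                long startingOffset, Collection<TopicUriPartition> partitions) {
            Map<TopicUriPartition, Long> offsets = new HashMap<>(partitions.size());
            for (TopicUriPartition partition : partitions) {
                offsets.put(partition, startingOffset);
            }
            return offsets;
        }
    }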
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializerValidator.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializerValidator.java
index 1f1c0fe..b9b352c 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializerValidator.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializerValidator.java
@@ -30,9 +30,9 @@
public interface OffsetsInitializerValidator {
/**
- * Validate offsets initializer with properties of Kafka source.
+ * Validate offsets initializer with properties of PSC source.
*
- * @param pscSourceProperties Properties of Kafka source
+ * @param pscSourceProperties Properties of PSC source
* @throws IllegalStateException if validation fails
*/
void validate(Properties pscSourceProperties) throws IllegalStateException;
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
index 4b3f414..db68f5d 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
@@ -31,8 +31,8 @@
/**
* A initializer that initialize the partitions to the earliest / latest / last-committed offsets.
- * The offsets initialization are taken care of by the {@code KafkaPartitionSplitReader} instead of
- * by the {@code KafkaSourceEnumerator}.
+ * The offsets initialization are taken care of by the {@code PscTopicUriPartitionSplitReader} instead of
+ * by the {@code PscSourceEnumerator}.
*
* Package private and should be instantiated via {@link org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer}.
*/
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/TimestampOffsetsInitializer.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/TimestampOffsetsInitializer.java
index 78ff3cd..43f3495 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/TimestampOffsetsInitializer.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/TimestampOffsetsInitializer.java
@@ -47,7 +47,7 @@ public Map getPartitionOffsets(
// First get the current end offsets of the partitions. This is going to be used
// in case we cannot find a suitable offsets based on the timestamp, i.e. the message
- // meeting the requirement of the timestamp have not been produced to Kafka yet, in
+ // meeting the requirement of the timestamp have not been produced to PSC yet, in
// this case, we just use the latest offset.
// We need to get the latest offsets before querying offsets by time to ensure that
// no message is going to be missed.
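The comment above boils down to a simple merge rule: prefer the offset found for the timestamp, and fall back to the end offset captured beforehand when no matching message exists yet. Expressed on plain maps (the partition type is left generic, since the concrete types are not part of this hunk), the rule is just:

    import java.util.HashMap;
    import java.util.Map;

    final class TimestampFallback {

        private TimestampFallback() {}

        /** For each partition, use the timestamp-based offset if one exists, else the end offset. */
        static <P> Map<P, Long> resolve(Map<P, Long> endOffsets, Map<P, Long> offsetsForTimestamp) {
            Map<P, Long> resolved = new HashMap<>(endOffsets.size());
            for (Map.Entry<P, Long> entry : endOffsets.entrySet()) {
                Long byTime = offsetsForTimestamp.get(entry.getKey());
                // No message at or after the timestamp has been produced yet:
                // fall back to the latest (end) offset so nothing is missed later.
                resolved.put(entry.getKey(), byTime != null ? byTime : entry.getValue());
            }
            return resolved;
        }
    }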
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscSubscriber.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscSubscriber.java
index 519bbea..da40f85 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscSubscriber.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscSubscriber.java
@@ -29,7 +29,7 @@
import java.util.regex.Pattern;
/**
- * Kafka consumer allows a few different ways to consume from the topics, including:
+ * PSC consumer allows a few different ways to consume from the topics, including:
*
*
* - Subscribe from a collection of topics.
@@ -37,7 +37,7 @@
* - Assign specific partitions.
*
*
- * The KafkaSubscriber provides a unified interface for the Kafka source to support all these
+ * The PscSubscriber provides a unified interface for the PSC source to support all these
* three types of subscribing mode.
*/
@Internal
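The three subscription modes listed above map directly onto the builder methods changed earlier in this patch; only one of them may be set per builder (the others must stay unset, per the ensureSubscriberIsNull checks). A sketch, with the builder's type parameter assumed and topic URIs as placeholders:

    import com.pinterest.flink.connector.psc.source.PscSourceBuilder;

    import java.util.Arrays;
    import java.util.regex.Pattern;

    final class SubscriptionModes {

        private SubscriptionModes() {}

        static void byTopicList(PscSourceBuilder<String> builder, String topicUriA, String topicUriB) {
            // 1. Subscribe to a fixed collection of topic URIs.
            builder.setTopicUris(Arrays.asList(topicUriA, topicUriB));
        }

        static void byPattern(PscSourceBuilder<String> builder) {
            // 2. Subscribe to every topic URI matching a pattern.
            builder.setTopicUriPattern(Pattern.compile(".*-events"));
        }

        // 3. Assigning specific partitions uses setPartitions(Set<...>), whose element type
        //    is not shown in this excerpt, so it is omitted here.
    }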
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscTopicUriListSubscriber.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscTopicUriListSubscriber.java
index 5da1dfd..d9ef48b 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscTopicUriListSubscriber.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscTopicUriListSubscriber.java
@@ -37,7 +37,7 @@
import static com.pinterest.flink.connector.psc.source.enumerator.subscriber.PscSubscriberUtils.getTopicRnMetadata;
/**
- * A subscriber to a fixed list of topics. The subscribed topics must have existed in the Kafka
+ * A subscriber to a fixed list of topics. The subscribed topics must have existed in the PSC
* cluster, otherwise an exception will be thrown.
*/
class PscTopicUriListSubscriber implements PscSubscriber {
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/TopicNamePatternSubscriber.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/TopicNamePatternSubscriber.java
index 945c4db..fe0086d 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/TopicNamePatternSubscriber.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/TopicNamePatternSubscriber.java
@@ -48,7 +48,7 @@ class TopicNamePatternSubscriber implements PscSubscriber {
@Override
public Set getSubscribedTopicUriPartitions(PscMetadataClient metadataClient, TopicUri clusterUri) {
- LOG.debug("Fetching descriptions for all topics on Kafka cluster");
+ LOG.debug("Fetching descriptions for all topics on PubSub cluster");
final Map allTopicRnMetadata = getAllTopicRnMetadata(metadataClient, clusterUri);
Set subscribedTopicUriPartitions = new HashSet<>();
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReader.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReader.java
index c3b7577..ef40de4 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReader.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReader.java
@@ -49,7 +49,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-/** The source reader for Kafka partitions. */
+/** The source reader for PSC TopicUriPartitions. */
@Internal
public class PscSourceReader
extends SingleThreadMultiplexSourceReaderBase<
@@ -80,7 +80,7 @@ public PscSourceReader(
if (!commitOffsetsOnCheckpoint) {
LOG.warn(
"Offset commit on checkpoint is disabled. "
- + "Consuming offset will not be reported back to Kafka cluster.");
+ + "Consuming offset will not be reported back to PubSub cluster.");
}
}
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java
index c8e60bd..9187b06 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java
@@ -389,7 +389,7 @@ private void removeEmptySplits() throws ConsumerException, ConfigurationExceptio
emptyPartitions.stream()
.map(PscTopicUriPartitionSplit::toSplitId)
.collect(Collectors.toSet()));
- // Un-assign partitions from Kafka consumer
+ // Un-assign partitions from PSC consumer
unassignPartitions(emptyPartitions);
}
}
@@ -464,7 +464,7 @@ private void maybeRegisterPscConsumerMetrics(
}
/**
- * Catch {@link WakeupException} in Kafka consumer call and retry the invocation on exception.
+ * Catch {@link WakeupException} in PSC consumer call and retry the invocation on exception.
*
* This helper function handles a race condition as below:
*
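The helper described in the Javadoc above (retry a consumer call once when a concurrent wakeup interrupts it) can be sketched generically. The real split reader catches the consumer's wakeup exception type, which this excerpt does not show; RuntimeException is used below only to keep the sketch self-contained.

    import java.util.function.Supplier;

    final class RetryOnWakeup {

        private RetryOnWakeup() {}

        /** Runs a consumer call and retries it once if a stale wakeup interrupts it. */
        static <T> T retryOnWakeup(Supplier<T> consumerCall) {
            try {
                return consumerCall.get();
            } catch (RuntimeException wakeup) {
                // The wakeup was meant for a previous blocking call that already returned,
                // so the current call is safe to retry once.
                return consumerCall.get();
            }
        }
    }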
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/deserializer/PscRecordDeserializationSchema.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/deserializer/PscRecordDeserializationSchema.java
index ac87b66..1317359 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/deserializer/PscRecordDeserializationSchema.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/deserializer/PscRecordDeserializationSchema.java
@@ -32,7 +32,7 @@
import java.io.Serializable;
import java.util.Map;
-/** An interface for the deserialization of Kafka records. */
+/** An interface for the deserialization of PSC messages. */
@PublicEvolving
public interface PscRecordDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
@@ -97,7 +97,7 @@ static PscRecordDeserializationSchema valueOnly(
}
/**
- * Wraps a Kafka {@link Deserializer} to a {@link PscRecordDeserializationSchema}.
+ * Wraps a PSC {@link Deserializer} to a {@link PscRecordDeserializationSchema}.
*
* @param valueDeserializerClass the deserializer class used to deserialize the value.
* @param <V> the value type.
@@ -110,12 +110,12 @@ static PscRecordDeserializationSchema valueOnly(
}
/**
- * Wraps a Kafka {@link Deserializer} to a {@link PscRecordDeserializationSchema}.
+ * Wraps a PSC {@link Deserializer} to a {@link PscRecordDeserializationSchema}.
*
* @param valueDeserializerClass the deserializer class used to deserialize the value.
* @param config the configuration of the value deserializer. If the deserializer is an
* implementation of {@code Configurable}, the configuring logic will be handled by {@link
- * org.apache.kafka.common.Configurable#configure(Map)} with the given config,
+ * com.pinterest.psc.common.PscPlugin#configure(PscConfiguration)} with the given config,
* otherwise {@link Deserializer#configure(com.pinterest.psc.config.PscConfiguration, boolean)} will be invoked.
* @param <V> the value type.
* @param <D> the type of the deserializer.
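For completeness, the two valueOnly(...) wrappers discussed above would be used roughly as follows. The StringDeserializer import path and the config-map key/value types are assumptions (the hunk only shows the Javadoc), and the config keys are placeholders.

    import com.pinterest.flink.connector.psc.source.reader.deserializer.PscRecordDeserializationSchema;
    import com.pinterest.psc.serde.StringDeserializer;

    import java.util.Collections;
    import java.util.Map;

    final class ValueOnlyExamples {

        private ValueOnlyExamples() {}

        static PscRecordDeserializationSchema<String> plain() {
            // Wrap a deserializer class; only the value of each message is deserialized.
            return PscRecordDeserializationSchema.valueOnly(StringDeserializer.class);
        }

        static PscRecordDeserializationSchema<String> withConfig() {
            // The overload taking a config map configures the deserializer before use.
            Map<String, String> config = Collections.singletonMap("some.key", "some.value");
            return PscRecordDeserializationSchema.valueOnly(StringDeserializer.class, config);
        }
    }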
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/fetcher/PscSourceFetcherManager.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/fetcher/PscSourceFetcherManager.java
index eb5a508..a758b2e 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/fetcher/PscSourceFetcherManager.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/fetcher/PscSourceFetcherManager.java
@@ -42,8 +42,8 @@
import java.util.function.Supplier;
/**
- * The SplitFetcherManager for Kafka source. This class is needed to help commit the offsets to
- * Kafka using the KafkaConsumer inside the {@link
+ * The SplitFetcherManager for PSC source. This class is needed to help commit the offsets to
+ * PSC using the PscConsumer inside the {@link
* PscTopicUriPartitionSplitReader}.
*/
@Internal
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplitState.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplitState.java
index b5398ba..5536baa 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplitState.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplitState.java
@@ -43,9 +43,9 @@ public void setCurrentOffset(long currentOffset) {
}
/**
- * Use the current offset as the starting offset to create a new KafkaPartitionSplit.
+ * Use the current offset as the starting offset to create a new PscTopicUriPartitionSplit.
*
- * @return a new KafkaPartitionSplit which uses the current offset as its starting offset.
+ * @return a new PscTopicUriPartitionSplit which uses the current offset as its starting offset.
*/
public PscTopicUriPartitionSplit toPscTopicUriPartitionSplit() {
return new PscTopicUriPartitionSplit(
diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java
index 32718fa..fb5c5f6 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java
@@ -109,7 +109,7 @@ public class PscConnectorOptions {
public static final ConfigOption SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
// --------------------------------------------------------------------------------------------
- // Kafka specific options
+ // PSC-specific options
// --------------------------------------------------------------------------------------------
public static final ConfigOption<List<String>> TOPIC_URI =
@@ -133,7 +133,7 @@ public class PscConnectorOptions {
.stringType()
.noDefaultValue()
.withDescription(
- "Required consumer group in Kafka consumer, no need for Kafka producer");
+ "Required consumer group in PSC consumer, no need for PSC producer");
// --------------------------------------------------------------------------------------------
// Scan specific options
@@ -143,7 +143,7 @@ public class PscConnectorOptions {
ConfigOptions.key("scan.startup.mode")
.enumType(ScanStartupMode.class)
.defaultValue(ScanStartupMode.GROUP_OFFSETS)
- .withDescription("Startup mode for Kafka consumer.");
+ .withDescription("Startup mode for PSC consumer.");
public static final ConfigOption SCAN_STARTUP_SPECIFIC_OFFSETS =
ConfigOptions.key("scan.startup.specific-offsets")
@@ -164,7 +164,7 @@ public class PscConnectorOptions {
.durationType()
.noDefaultValue()
.withDescription(
- "Optional interval for consumer to discover dynamically created Kafka partitions periodically.");
+ "Optional interval for consumer to discover dynamically created backend partitions periodically.");
// --------------------------------------------------------------------------------------------
// Sink specific options
@@ -177,16 +177,16 @@ public class PscConnectorOptions {
.withDescription(
Description.builder()
.text(
- "Optional output partitioning from Flink's partitions into Kafka's partitions. Valid enumerations are")
+ "Optional output partitioning from Flink's partitions into PSC's backend partitions. Valid enumerations are")
.list(
text(
- "'default' (use kafka default partitioner to partition records)"),
+ "'default' (use PSC default partitioner to partition records)"),
text(
- "'fixed' (each Flink partition ends up in at most one Kafka partition)"),
+ "'fixed' (each Flink partition ends up in at most one PubSub partition)"),
text(
- "'round-robin' (a Flink partition is distributed to Kafka partitions round-robin when 'key.fields' is not specified)"),
+ "'round-robin' (a Flink partition is distributed to PubSub partitions round-robin when 'key.fields' is not specified)"),
text(
- "custom class name (use custom FlinkKafkaPartitioner subclass)"))
+ "custom class name (use custom FlinkPscPartitioner subclass)"))
.build());
// Disable this feature by default
@@ -200,7 +200,7 @@ public class PscConnectorOptions {
"The max size of buffered records before flushing. "
+ "When the sink receives many updates on the same key, "
+ "the buffer will retain the last records of the same key. "
- + "This can help to reduce data shuffling and avoid possible tombstone messages to the Kafka topic.")
+ + "This can help to reduce data shuffling and avoid possible tombstone messages to the PubSub topic.")
.linebreak()
.text("Can be set to '0' to disable it.")
.linebreak()
@@ -241,7 +241,7 @@ public class PscConnectorOptions {
.withDescription(
"If the delivery guarantee is configured as "
+ DeliveryGuarantee.EXACTLY_ONCE
- + " this value is used a prefix for the identifier of all opened Kafka transactions.");
+ + " this value is used a prefix for the identifier of all opened transactions.");
// --------------------------------------------------------------------------------------------
// Enums
@@ -253,14 +253,14 @@ public enum ValueFieldsStrategy {
EXCEPT_KEY
}
- /** Startup mode for the Kafka consumer, see {@link #SCAN_STARTUP_MODE}. */
+ /** Startup mode for the PSC consumer, see {@link #SCAN_STARTUP_MODE}. */
public enum ScanStartupMode implements DescribedEnum {
EARLIEST_OFFSET("earliest-offset", text("Start from the earliest offset possible.")),
LATEST_OFFSET("latest-offset", text("Start from the latest offset.")),
GROUP_OFFSETS(
"group-offsets",
text(
- "Start from committed offsets in ZooKeeper / Kafka brokers of a specific consumer group.")),
+ "Start from committed offsets in ZooKeeper / PubSub brokers of a specific consumer group.")),
TIMESTAMP("timestamp", text("Start from user-supplied timestamp for each partition.")),
SPECIFIC_OFFSETS(
"specific-offsets",
diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptionsUtil.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptionsUtil.java
index 57f8dcc..08d67ec 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptionsUtil.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptionsUtil.java
@@ -65,7 +65,7 @@
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.VALUE_FORMAT;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
-/** Utilities for {@link KafkaConnectorOptions}. */
+/** Utilities for {@link PscConnectorOptions}. */
@Internal
class PscConnectorOptionsUtil {
@@ -81,7 +81,7 @@ class PscConnectorOptionsUtil {
public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";
- // Prefix for Kafka specific properties.
+ // Prefix for PSC specific properties.
public static final String PROPERTIES_PREFIX = "properties.";
// Other keywords.
@@ -169,7 +169,7 @@ private static void validateScanStartupMode(ReadableConfig tableOptions) {
}
if (!isSingleTopicUri(tableOptions)) {
throw new ValidationException(
- "Currently Kafka source only supports specific offset for single topic.");
+ "Currently PSC source only supports specific offset for single topic.");
}
String specificOffsets =
tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
@@ -255,7 +255,7 @@ private static void buildSpecificOffsets(
}
/**
- * Returns the {@link StartupMode} of Kafka Consumer by passed-in table-specific {@link
+ * Returns the {@link StartupMode} of PSC Consumer by passed-in table-specific {@link
* com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.ScanStartupMode}.
*/
private static StartupMode fromOption(PscConnectorOptions.ScanStartupMode scanStartupMode) {
@@ -278,19 +278,19 @@ private static StartupMode fromOption(PscConnectorOptions.ScanStartupMode scanSt
}
public static Properties getPscProperties(Map tableOptions) {
- final Properties kafkaProperties = new Properties();
+ final Properties pscProperties = new Properties();
- if (hasKafkaClientProperties(tableOptions)) {
+ if (hasPscClientProperties(tableOptions)) {
tableOptions.keySet().stream()
.filter(key -> key.startsWith(PROPERTIES_PREFIX))
.forEach(
key -> {
final String value = tableOptions.get(key);
final String subKey = key.substring((PROPERTIES_PREFIX).length());
- kafkaProperties.put(subKey, value);
+ pscProperties.put(subKey, value);
});
}
- return kafkaProperties;
+ return pscProperties;
}
/**
@@ -368,10 +368,10 @@ public static Map parseSpecificOffsets(
}
/**
- * Decides if the table options contains Kafka client properties that start with prefix
+ * Decides if the table options contains PSC client properties that start with prefix
* 'properties'.
*/
- private static boolean hasKafkaClientProperties(Map tableOptions) {
+ private static boolean hasPscClientProperties(Map tableOptions) {
return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
}
@@ -387,10 +387,10 @@ private static FlinkPscPartitioner initializePartitioner(
name, FlinkPscPartitioner.class.getName()));
}
@SuppressWarnings("unchecked")
- final FlinkPscPartitioner kafkaPartitioner =
+ final FlinkPscPartitioner pscPartitioner =
InstantiationUtil.instantiate(name, FlinkPscPartitioner.class, classLoader);
- return kafkaPartitioner;
+ return pscPartitioner;
} catch (ClassNotFoundException | FlinkException e) {
throw new ValidationException(
String.format("Could not find and instantiate partitioner class '%s'", name),
@@ -572,7 +572,7 @@ static void validateDeliveryGuarantee(ReadableConfig tableOptions) {
// Inner classes
// --------------------------------------------------------------------------------------------
- /** Kafka startup options. * */
+ /** PSC startup options. * */
public static class StartupOptions {
public StartupMode startupMode;
public Map specificOffsets;
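To make the 'properties.' prefix handling above concrete: any table option key beginning with that prefix is copied into the client Properties with the prefix stripped, exactly as getPscProperties does. A minimal standalone sketch of that mapping, with placeholder keys:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    final class PropertiesPrefixDemo {

        private static final String PROPERTIES_PREFIX = "properties.";

        public static void main(String[] args) {
            Map<String, String> tableOptions = new HashMap<>();
            tableOptions.put("properties.client.id", "my-client"); // forwarded, prefix stripped
            tableOptions.put("scan.startup.mode", "earliest-offset"); // not forwarded

            Properties clientProps = new Properties();
            tableOptions.forEach(
                    (key, value) -> {
                        if (key.startsWith(PROPERTIES_PREFIX)) {
                            clientProps.put(key.substring(PROPERTIES_PREFIX.length()), value);
                        }
                    });

            // Prints: {client.id=my-client}
            System.out.println(clientProps);
        }
    }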
diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSink.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSink.java
index 518bc97..04a096e 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSink.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSink.java
@@ -59,7 +59,7 @@
import static org.apache.flink.util.Preconditions.checkNotNull;
-/** A version-agnostic Kafka {@link DynamicTableSink}. */
+/** A version-agnostic PSC {@link DynamicTableSink}. */
@Internal
public class PscDynamicSink implements DynamicTableSink, SupportsWritingMetadata {
@@ -82,10 +82,10 @@ public class PscDynamicSink implements DynamicTableSink, SupportsWritingMetadata
/** Data type to configure the formats. */
protected final DataType physicalDataType;
- /** Optional format for encoding keys to Kafka. */
+ /** Optional format for encoding keys to PSC. */
protected final @Nullable EncodingFormat> keyEncodingFormat;
- /** Format for encoding values to Kafka. */
+ /** Format for encoding values to PSC. */
protected final EncodingFormat> valueEncodingFormat;
/** Indices that determine the key fields and the source position in the consumed row. */
@@ -98,7 +98,7 @@ public class PscDynamicSink implements DynamicTableSink, SupportsWritingMetadata
protected final @Nullable String keyPrefix;
// --------------------------------------------------------------------------------------------
- // Kafka-specific attributes
+ // PSC-specific attributes
// --------------------------------------------------------------------------------------------
/** The defined delivery guarantee. */
@@ -106,17 +106,17 @@ public class PscDynamicSink implements DynamicTableSink, SupportsWritingMetadata
/**
* If the {@link #deliveryGuarantee} is {@link DeliveryGuarantee#EXACTLY_ONCE} the value is the
- * prefix for all ids of opened Kafka transactions.
+ * prefix for all ids of opened PubSub transactions.
*/
@Nullable private final String transactionalIdPrefix;
- /** The Kafka topic to write to. */
+ /** The PubSub topic to write to. */
protected final String topic;
- /** Properties for the Kafka producer. */
+ /** Properties for the PSC producer. */
protected final Properties properties;
- /** Partitioner to select Kafka partition for each item. */
+ /** Partitioner to select PubSub partition for each item. */
protected final @Nullable FlinkPscPartitioner partitioner;
/**
@@ -128,7 +128,7 @@ public class PscDynamicSink implements DynamicTableSink, SupportsWritingMetadata
/** Sink buffer flush config which only supported in upsert mode now. */
protected final SinkBufferFlushMode flushMode;
- /** Parallelism of the physical Kafka producer. * */
+ /** Parallelism of the physical PubSub producer. * */
protected final @Nullable Integer parallelism;
public PscDynamicSink(
@@ -161,7 +161,7 @@ public PscDynamicSink(
this.transactionalIdPrefix = transactionalIdPrefix;
// Mutable attributes
this.metadataKeys = Collections.emptyList();
- // Kafka-specific attributes
+ // PSC-specific attributes
this.topic = checkNotNull(topic, "Topic must not be null.");
this.properties = checkNotNull(properties, "Properties must not be null.");
this.partitioner = partitioner;
@@ -171,7 +171,7 @@ public PscDynamicSink(
this.flushMode = checkNotNull(flushMode);
if (flushMode.isEnabled() && !upsertMode) {
throw new IllegalArgumentException(
- "Sink buffer flush is only supported in upsert-kafka.");
+ "Sink buffer flush is only supported in upsert-psc.");
}
this.parallelism = parallelism;
}
@@ -275,7 +275,7 @@ public DynamicTableSink copy() {
@Override
public String asSummaryString() {
- return "Kafka table sink";
+ return "PSC table sink";
}
@Override
diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java
index 55e7f75..528f772 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java
@@ -70,7 +70,7 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;
-/** A version-agnostic Kafka {@link ScanTableSource}. */
+/** A version-agnostic PSC {@link ScanTableSource}. */
@Internal
public class PscDynamicSource
implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown {
@@ -99,10 +99,10 @@ public class PscDynamicSource
/** Data type to configure the formats. */
protected final DataType physicalDataType;
- /** Optional format for decoding keys from Kafka. */
+ /** Optional format for decoding keys from PSC. */
protected final @Nullable DecodingFormat> keyDecodingFormat;
- /** Format for decoding values from Kafka. */
+ /** Format for decoding values from PSC. */
protected final DecodingFormat> valueDecodingFormat;
/** Indices that determine the key fields and the target position in the produced row. */
@@ -115,16 +115,16 @@ public class PscDynamicSource
protected final @Nullable String keyPrefix;
// --------------------------------------------------------------------------------------------
- // Kafka-specific attributes
+ // PSC-specific attributes
// --------------------------------------------------------------------------------------------
- /** The Kafka topics to consume. */
+ /** The PSC topics to consume. */
protected final List topicUris;
- /** The Kafka topic pattern to consume. */
+ /** The PSC topic pattern to consume. */
protected final Pattern topicUriPattern;
- /** Properties for the Kafka consumer. */
+ /** Properties for the PSC consumer. */
protected final Properties properties;
/**
@@ -181,7 +181,7 @@ public PscDynamicSource(
this.producedDataType = physicalDataType;
this.metadataKeys = Collections.emptyList();
this.watermarkStrategy = null;
- // Kafka-specific attributes
+ // PSC-specific attributes
Preconditions.checkArgument(
(topics != null && topicPattern == null)
|| (topics == null && topicPattern != null),
@@ -215,7 +215,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
final TypeInformation producedTypeInfo =
context.createTypeInformation(producedDataType);
- final PscSource kafkaSource =
+ final PscSource pscSource =
createPscSource(keyDeserialization, valueDeserialization, producedTypeInfo);
return new DataStreamScanProvider() {
@@ -227,14 +227,14 @@ public DataStream produceDataStream(
}
DataStreamSource sourceStream =
execEnv.fromSource(
- kafkaSource, watermarkStrategy, "KafkaSource-" + tableIdentifier);
+ pscSource, watermarkStrategy, "PscSource-" + tableIdentifier);
providerContext.generateUid(PSC_TRANSFORMATION).ifPresent(sourceStream::uid);
return sourceStream;
}
@Override
public boolean isBounded() {
- return kafkaSource.getBoundedness() == Boundedness.BOUNDED;
+ return pscSource.getBoundedness() == Boundedness.BOUNDED;
}
};
}
@@ -319,7 +319,7 @@ public DynamicTableSource copy() {
@Override
public String asSummaryString() {
- return "Kafka table source";
+ return "PSC table source";
}
@Override
diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java
index 4f44fd5..d2fb294 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java
@@ -340,7 +340,7 @@ private static void validatePKConstraints(
.orElse(configuration.get(VALUE_FORMAT));
throw new ValidationException(
String.format(
- "The Kafka table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
+ "The PSC table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
+ " on the table, because it can't guarantee the semantic of primary key.",
tableName.asSummaryString(), formatName));
}
diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java
index 501ef98..f51d06d 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactory.java
@@ -70,7 +70,7 @@
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.getSourceTopicUriPattern;
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptionsUtil.getSourceTopicUris;
-/** Upsert-Kafka factory. */
+/** Upsert-Psc factory. */
public class UpsertPscDynamicTableFactory
implements DynamicTableSourceFactory, DynamicTableSinkFactory {
@@ -176,7 +176,6 @@ public DynamicTableSink createDynamicTableSink(Context context) {
SinkBufferFlushMode flushMode =
new SinkBufferFlushMode(batchSize, batchInterval.toMillis());
- // use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
// it will use hash partition if key is set else in round-robin behaviour.
return new PscDynamicSink(
context.getPhysicalRowDataType(),
@@ -203,7 +202,7 @@ private Tuple2 createKeyValueProjections(ResolvedCatalogTable cata
DataType physicalDataType = schema.toPhysicalRowDataType();
Configuration tableOptions = Configuration.fromMap(catalogTable.getOptions());
- // upsert-kafka will set key.fields to primary key fields by default
+ // upsert-psc will set key.fields to primary key fields by default
tableOptions.set(KEY_FIELDS, keyFields);
int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
@@ -241,7 +240,7 @@ private static void validateTopic(ReadableConfig tableOptions) {
List topic = tableOptions.get(TOPIC_URI);
if (topic.size() > 1) {
throw new ValidationException(
- "The 'upsert-kafka' connector doesn't support topic list now. "
+ "The 'upsert-psc' connector doesn't support topic list now. "
+ "Please use single topic as the value of the parameter 'topic'.");
}
}
@@ -252,7 +251,7 @@ private static void validateFormat(
String identifier = tableOptions.get(KEY_FORMAT);
throw new ValidationException(
String.format(
- "'upsert-kafka' connector doesn't support '%s' as key format, "
+ "'upsert-psc' connector doesn't support '%s' as key format, "
+ "because '%s' is not in insert-only mode.",
identifier, identifier));
}
@@ -260,7 +259,7 @@ private static void validateFormat(
String identifier = tableOptions.get(VALUE_FORMAT);
throw new ValidationException(
String.format(
- "'upsert-kafka' connector doesn't support '%s' as value format, "
+ "'upsert-psc' connector doesn't support '%s' as value format, "
+ "because '%s' is not in insert-only mode.",
identifier, identifier));
}
@@ -269,9 +268,9 @@ private static void validateFormat(
private static void validatePKConstraints(int[] schema) {
if (schema.length == 0) {
throw new ValidationException(
- "'upsert-kafka' tables require to define a PRIMARY KEY constraint. "
- + "The PRIMARY KEY specifies which columns should be read from or write to the Kafka message key. "
- + "The PRIMARY KEY also defines records in the 'upsert-kafka' table should update or delete on which keys.");
+ "'upsert-psc' tables require to define a PRIMARY KEY constraint. "
+ + "The PRIMARY KEY specifies which columns should be read from or write to the PubSub message key. "
+ + "The PRIMARY KEY also defines records in the 'upsert-psc' table should update or delete on which keys.");
}
}
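Pulling the validation rules above together, a table definition for this connector must declare a PRIMARY KEY, name exactly one topic, and use insert-only key and value formats. The sketch below shows the shape of such a DDL; the 'upsert-psc' identifier comes from the error messages in this hunk, but the exact option keys ('topic', the format options) are assumptions rather than values taken from this patch.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class UpsertPscTableExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

            // Option keys below are illustrative; check the connector's option classes for the
            // exact names. The PRIMARY KEY decides which columns form the PubSub message key.
            tEnv.executeSql(
                    "CREATE TABLE page_view_counts (\n"
                            + "  page_id STRING,\n"
                            + "  view_count BIGINT,\n"
                            + "  PRIMARY KEY (page_id) NOT ENFORCED\n"
                            + ") WITH (\n"
                            + "  'connector' = 'upsert-psc',\n"
                            + "  'topic' = '<single topic URI>',\n"
                            + "  'key.format' = 'json',\n"
                            + "  'value.format' = 'json'\n"
                            + ")");
        }
    }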
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscRecordSerializationSchemaBuilderTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscRecordSerializationSchemaBuilderTest.java
index 93a041c..6776162 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscRecordSerializationSchemaBuilderTest.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscRecordSerializationSchemaBuilderTest.java
@@ -166,7 +166,7 @@ public void testSerializeRecordWithKey() {
}
@Test
- public void testKafkaKeySerializerWrapperWithoutConfigurable() throws Exception {
+ public void testPscKeySerializerWrapperWithoutConfigurable() throws Exception {
final Map config = ImmutableMap.of("simpleKey", "simpleValue");
final PscRecordSerializationSchema schema =
PscRecordSerializationSchema.builder()
@@ -183,7 +183,7 @@ public void testKafkaKeySerializerWrapperWithoutConfigurable() throws Exception
}
@Test
- public void testKafkaValueSerializerWrapperWithoutConfigurable() throws Exception {
+ public void testPscValueSerializerWrapperWithoutConfigurable() throws Exception {
final Map config = ImmutableMap.of("simpleKey", "simpleValue");
final PscRecordSerializationSchema schema =
PscRecordSerializationSchema.builder()
@@ -197,7 +197,7 @@ public void testKafkaValueSerializerWrapperWithoutConfigurable() throws Exceptio
}
@Test
- public void testSerializeRecordWithKafkaSerializer() throws Exception {
+ public void testSerializeRecordWithPscSerializer() throws Exception {
final Map config = ImmutableMap.of("configKey", "configValue");
final PscRecordSerializationSchema schema =
PscRecordSerializationSchema.builder()
@@ -289,7 +289,7 @@ private static void assertOnlyOneSerializerAllowed(
}
/**
- * Serializer based on Kafka's serialization stack. This is the special case that implements
+ * Serializer based on PSC's serialization stack. This is the special case that implements
* {@link PscPlugin}
*
* This class must be public to make it instantiable by the tests.
@@ -303,7 +303,7 @@ public void configure(Map configs, boolean isKey) {
}
/**
- * Serializer based on Kafka's serialization stack.
+ * Serializer based on PSC's serialization stack.
*
* This class must be public to make it instantiable by the tests.
*/
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkBuilderTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkBuilderTest.java
index 3e61d0e..bd744c0 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkBuilderTest.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkBuilderTest.java
@@ -20,7 +20,6 @@
import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.util.TestLoggerExtension;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -35,7 +34,6 @@ public class PscSinkBuilderTest {
@Test
public void testBootstrapServerSettingWithProperties() {
Properties testConf = new Properties();
- testConf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "testServer");
PscSinkBuilder builder =
new PscSinkBuilder()
.setPscProducerConfig(testConf)
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkITCase.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkITCase.java
index d9eab6b..4660b1b 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkITCase.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkITCase.java
@@ -210,18 +210,18 @@ class IntegrationTests extends SinkTestSuiteBase {
}
@Test
- public void testWriteRecordsToKafkaWithAtLeastOnceGuarantee() throws Exception {
- writeRecordsToKafka(DeliveryGuarantee.AT_LEAST_ONCE, emittedRecordsCount);
+ public void testWriteRecordsToPscWithAtLeastOnceGuarantee() throws Exception {
+ writeRecordsToPsc(DeliveryGuarantee.AT_LEAST_ONCE, emittedRecordsCount);
}
@Test
- public void testWriteRecordsToKafkaWithNoneGuarantee() throws Exception {
- writeRecordsToKafka(DeliveryGuarantee.NONE, emittedRecordsCount);
+ public void testWriteRecordsToPscWithNoneGuarantee() throws Exception {
+ writeRecordsToPsc(DeliveryGuarantee.NONE, emittedRecordsCount);
}
@Test
- public void testWriteRecordsToKafkaWithExactlyOnceGuarantee() throws Exception {
- writeRecordsToKafka(DeliveryGuarantee.EXACTLY_ONCE, emittedRecordsWithCheckpoint);
+ public void testWriteRecordsToPscWithExactlyOnceGuarantee() throws Exception {
+ writeRecordsToPsc(DeliveryGuarantee.EXACTLY_ONCE, emittedRecordsWithCheckpoint);
}
@Test
@@ -390,7 +390,7 @@ private void testRecoveryWithAssertion(
checkProducerLeak();
}
- private void writeRecordsToKafka(
+ private void writeRecordsToPsc(
DeliveryGuarantee deliveryGuarantee, SharedReference expectedRecords)
throws Exception {
final StreamExecutionEnvironment env = new LocalStreamEnvironment();
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLog.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLog.java
index b226a59..7212760 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLog.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLog.java
@@ -49,15 +49,15 @@ class PscTransactionLog {
private final String clusterUriStr;
/**
- * Constructor creating a KafkaTransactionLog.
+ * Constructor creating a PscTransactionLog.
*
- * @param kafkaConfig used to configure the {@link com.pinterest.psc.consumer.PscConsumer} to query the topic containing
+ * @param pscConfig used to configure the {@link com.pinterest.psc.consumer.PscConsumer} to query the topic containing
* the transaction information
*/
- PscTransactionLog(String clusterUriStr, Properties kafkaConfig) {
+ PscTransactionLog(String clusterUriStr, Properties pscConfig) {
this.clusterUriStr = checkNotNull(clusterUriStr);
this.consumerConfig = new Properties();
- consumerConfig.putAll(checkNotNull(kafkaConfig, "kafkaConfig"));
+ consumerConfig.putAll(checkNotNull(pscConfig, "pscConfig"));
consumerConfig.put(PscConfiguration.PSC_CONSUMER_KEY_DESERIALIZER, ByteArrayDeserializer.class.getName());
consumerConfig.put(PscConfiguration.PSC_CONSUMER_VALUE_DESERIALIZER, ByteArrayDeserializer.class.getName());
consumerConfig.put(PscConfiguration.PSC_CONSUMER_COMMIT_AUTO_ENABLED, false);
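Per the renamed parameter's Javadoc, callers hand over only the cluster URI and a base PSC configuration; the deserializers and the auto-commit override are applied internally as shown above. A rough usage sketch from a test in the same package (the class is package-private); passing the shared test cluster URI constant here is an assumption based on how this patch builds its topic URIs:

    Properties pscConfig = new Properties();
    // group id / discovery settings would typically be added here via the test-utils helpers
    PscTransactionLog transactionLog = new PscTransactionLog(
            PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI.getTopicUriAsString(),
            pscConfig);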
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLogITCase.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLogITCase.java
index fd54f89..735b248 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLogITCase.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLogITCase.java
@@ -53,11 +53,11 @@
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
-/** Tests for {@link PscTransactionLog} to retrieve abortable Kafka transactions. */
+/** Tests for {@link PscTransactionLog} to retrieve abortable PSC transactions. */
public class PscTransactionLogITCase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(PscSinkITCase.class);
- private static final String TOPIC_URI_STR = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI.getTopicUriAsString() + "kafkaTransactionLogTest";
+ private static final String TOPIC_URI_STR = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI.getTopicUriAsString() + "pscTransactionLogTest";
private static final String TRANSACTIONAL_ID_PREFIX = "psc-log";
@ClassRule
@@ -174,18 +174,12 @@ private static PscProducer createProducer(String transactionalI
private static Properties getPscClientConfiguration() {
final Properties standardProps = new Properties();
-// standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
standardProps.put(PscConfiguration.PSC_CONSUMER_GROUP_ID, "flink-tests");
standardProps.put(PscConfiguration.PSC_CONSUMER_COMMIT_AUTO_ENABLED, false);
standardProps.put(PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET, PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_EARLIEST);
standardProps.put(PscConfiguration.PSC_CONSUMER_PARTITION_FETCH_MAX_BYTES, 256);
standardProps.put(PscConfiguration.PSC_PRODUCER_CLIENT_ID, "flink-tests");
injectDiscoveryConfigs(standardProps, KAFKA_CONTAINER.getBootstrapServers(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI.getTopicUriAsString());
-
-// standardProps.put("group.id", "flink-tests");
-// standardProps.put("enable.auto.commit", false);
-// standardProps.put("auto.id.reset", "earliest");
-// standardProps.put("max.partition.fetch.bytes", 256);
return standardProps;
}
}
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscWriterITCase.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscWriterITCase.java
index 02efd29..c13528c 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscWriterITCase.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscWriterITCase.java
@@ -82,7 +82,7 @@
import static org.apache.flink.util.DockerImageVersions.KAFKA;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for the standalone KafkaWriter. */
+/** Tests for the standalone PscWriter. */
@ExtendWith(TestLoggerExtension.class)
public class PscWriterITCase {
@@ -134,8 +134,8 @@ public void testRegisterMetrics(DeliveryGuarantee guarantee) throws Exception {
@ParameterizedTest
@EnumSource(DeliveryGuarantee.class)
public void testNotRegisterMetrics(DeliveryGuarantee guarantee) throws Exception {
- assertKafkaMetricNotPresent(guarantee, "flink.disable-metrics", "true");
- assertKafkaMetricNotPresent(guarantee, "register.producer.metrics", "false");
+ assertPscMetricNotPresent(guarantee, "flink.disable-metrics", "true");
+ assertPscMetricNotPresent(guarantee, "register.producer.metrics", "false");
}
@Test
@@ -185,7 +185,7 @@ public void testCurrentSendTimeMetric() throws Exception {
writer.flush(false);
}
} catch (IOException | InterruptedException e) {
- throw new RuntimeException("Failed writing Kafka record.");
+ throw new RuntimeException("Failed writing PSC record.");
}
});
Thread.sleep(500L);
@@ -330,7 +330,7 @@ void usePoolForTransactional() throws Exception {
.as("Expected different producer")
.isTrue();
- // recycle first producer, KafkaCommitter would commit it and then return it
+ // recycle first producer, PscCommitter would commit it and then return it
assertThat(writer.getProducerPool()).hasSize(0);
firstProducer.commitTransaction();
committable.getProducer().get().close();
@@ -383,7 +383,7 @@ void testAbortOnClose() throws Exception {
}
}
- private void assertKafkaMetricNotPresent(
+ private void assertPscMetricNotPresent(
DeliveryGuarantee guarantee, String configKey, String configValue) throws Exception {
final Properties config = getPscClientConfiguration();
config.put(configKey, configValue);
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscDataReader.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscDataReader.java
index b2746b9..164aa0e 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscDataReader.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscDataReader.java
@@ -33,7 +33,7 @@
import java.util.List;
import java.util.Properties;
-/** Kafka dataStream data reader. */
+/** PSC dataStream data reader. */
public class PscDataReader implements ExternalSystemDataReader {
private final PscConsumer consumer;
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscSinkExternalContext.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscSinkExternalContext.java
index 13ac24d..836b6de 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscSinkExternalContext.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscSinkExternalContext.java
@@ -63,7 +63,7 @@
import static com.pinterest.flink.connector.psc.testutils.PscTestUtils.injectDiscoveryConfigs;
import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
-/** A Kafka external context that will create only one topic and use partitions in that topic. */
+/** A PSC external context that will create only one topic and use partitions in that topic. */
public class PscSinkExternalContext implements DataStreamSinkV2ExternalContext {
private static final Logger LOG = LoggerFactory.getLogger(PscSinkExternalContext.class);
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceITCase.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceITCase.java
index 9b6c7ee..2950052 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceITCase.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceITCase.java
@@ -98,7 +98,7 @@ public class PscSourceITCase {
@Nested
@TestInstance(Lifecycle.PER_CLASS)
- class KafkaSpecificTests {
+ class PscSpecificTests {
@BeforeAll
public void setup() throws Throwable {
PscSourceTestEnv.setup();
@@ -130,7 +130,6 @@ public void testTimestamp(boolean enableObjectReuse) throws Throwable {
putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
PscSource source =
PscSource.builder()
-// .setBootstrapServers(PscSourceTestEnv.brokerConnectionStrings)
.setGroupId("testTimestampAndWatermark")
.setTopicUris(topicUri)
.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
@@ -312,7 +311,6 @@ public void testPerPartitionWatermark() throws Throwable {
putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
PscSource source =
PscSource.builder()
-// .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
.setTopicUris(watermarkTopic)
.setGroupId("watermark-test")
.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
@@ -354,7 +352,6 @@ public void testConsumingEmptyTopic() throws Throwable {
putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
PscSource source =
PscSource.builder()
-// .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
.setTopicUris(emptyTopic)
.setGroupId("empty-topic-test")
.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
@@ -395,7 +392,6 @@ public void testConsumingTopicWithEmptyPartitions() throws Throwable {
PscSource source =
PscSource.builder()
-// .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
.setTopicUris(topicWithEmptyPartitions)
.setGroupId("topic-with-empty-partition-test")
.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
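The pattern repeated across these hunks: rather than setBootstrapServers, the source is pointed at a PSC cluster URI and broker discovery is injected into plain consumer properties. A condensed sketch using the same helpers and constants as this file (the group id and topicUri are placeholders):

    Properties props = new Properties();
    putDiscoveryProperties(props,
            PscSourceTestEnv.getBrokerConnectionStrings(),
            PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
    PscSource<Integer> source = PscSource.<Integer>builder()
            .setGroupId("some-test-group")
            .setTopicUris(topicUri)
            .setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG,
                    PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
            .setDeserializer(PscRecordDeserializationSchema.valueOnly(IntegerDeserializer.class))
            .setProperties(props)
            .build();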
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceLegacyITCase.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceLegacyITCase.java
index 70dde42..cdff2bc 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceLegacyITCase.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceLegacyITCase.java
@@ -145,13 +145,13 @@ public void testStartFromTimestamp() throws Exception {
// --- offset committing ---
@Test
- public void testCommitOffsetsToKafka() throws Exception {
- runCommitOffsetsToKafka();
+ public void testCommitOffsetsToPsc() throws Exception {
+ runCommitOffsetsToPsc();
}
@Test
- public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
- runAutoOffsetRetrievalAndCommitToKafka();
+ public void testAutoOffsetRetrievalAndCommitToPsc() throws Exception {
+ runAutoOffsetRetrievalAndCommitToPsc();
}
@Test
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceTestUtils.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceTestUtils.java
index f21a26a..b594898 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceTestUtils.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceTestUtils.java
@@ -30,9 +30,9 @@ public class PscSourceTestUtils {
/**
* Create {@link PscSourceReader} with a custom hook handling IDs of finished {@link
- * org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit}.
+ * com.pinterest.flink.connector.psc.source.split.PscTopicUriPartitionSplit}.
*
- * @param pscSource Kafka source
+ * @param pscSource PSC source
* @param sourceReaderContext Context for SourceReader
* @param splitFinishedHook Hook for handling finished splits
* @param Type of emitting records
@@ -46,7 +46,7 @@ public static PscSourceReader createReaderWithFinishedSplitHook(
pscSource.createReader(sourceReaderContext, splitFinishedHook));
}
- /** Get configuration of KafkaSource. */
+ /** Get configuration of PscSource. */
public static Configuration getPscSourceConfiguration(PscSource> pscSource) {
return pscSource.getConfiguration();
}
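For the finished-split hook documented above, usage might look roughly as follows; the hook type is assumed to be a Consumer of the finished split IDs (mirroring the Kafka counterpart), and pscSource stands in for an already-built source:

    Consumer<Collection<String>> splitFinishedHook =
            finishedSplitIds -> System.out.println("finished splits: " + finishedSplitIds);
    PscSourceReader<Integer> reader =
            PscSourceTestUtils.createReaderWithFinishedSplitHook(
                    pscSource, new TestingReaderContext(), splitFinishedHook);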
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/enumerator/PscSourceEnumStateSerializerTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/enumerator/PscSourceEnumStateSerializerTest.java
index 3adf60c..a92db44 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/enumerator/PscSourceEnumStateSerializerTest.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/enumerator/PscSourceEnumStateSerializerTest.java
@@ -63,12 +63,12 @@ public void testBackwardCompatibility() throws IOException {
final Map> splitAssignments =
toSplitAssignments(topicPartitions);
- // Create bytes in the way of KafkaEnumStateSerializer version 0 doing serialization
+ // Create bytes in the way of PscEnumStateSerializer version 0 doing serialization
final byte[] bytes =
SerdeUtils.serializeSplitAssignments(
splitAssignments, new PscTopicUriPartitionSplitSerializer());
- // Deserialize above bytes with KafkaEnumStateSerializer version 1 to check backward
+ // Deserialize above bytes with PscEnumStateSerializer version 1 to check backward
// compatibility
final PscSourceEnumState pscSourceEnumState =
new PscSourceEnumStateSerializer().deserialize(0, bytes);
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/metrics/PscSourceReaderMetricsTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/metrics/PscSourceReaderMetricsTest.java
index 0ffc72f..181e89d 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/metrics/PscSourceReaderMetricsTest.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/metrics/PscSourceReaderMetricsTest.java
@@ -119,10 +119,10 @@ public void testNonTrackingTopicPartition() {
@Test
public void testFailedCommit() {
MetricListener metricListener = new MetricListener();
- final PscSourceReaderMetrics kafkaSourceReaderMetrics =
+ final PscSourceReaderMetrics pscSourceReaderMetrics =
new PscSourceReaderMetrics(
InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup()));
- kafkaSourceReaderMetrics.recordFailedCommit();
+ pscSourceReaderMetrics.recordFailedCommit();
final Optional commitsFailedCounter =
metricListener.getCounter(
PscSourceReaderMetrics.PSC_SOURCE_READER_METRIC_GROUP,
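The failed-commit path exercised here needs nothing more than a mocked metric group: InternalSourceReaderMetricGroup.mock(...) routes the counter into the MetricListener so it can be asserted without a running job. The wiring boils down to:

    MetricListener metricListener = new MetricListener();
    PscSourceReaderMetrics metrics = new PscSourceReaderMetrics(
            InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup()));
    // increments the commits-failed counter under the PSC source reader metric group
    metrics.recordFailedCommit();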
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReaderTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReaderTest.java
index 747ded3..aa2b4b1 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReaderTest.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReaderTest.java
@@ -321,7 +321,7 @@ void testPscSourceMetrics() throws Exception {
String.format(
"Failed to poll %d records until timeout", NUM_RECORDS_PER_SPLIT * 2));
Thread.sleep(100); // Wait for the metric to be updated
- // Metric "records-consumed-total" of KafkaConsumer should be NUM_RECORDS_PER_SPLIT
+ // Metric "records-consumed-total" of PscConsumer should be NUM_RECORDS_PER_SPLIT
assertThat(getPscConsumerMetric("records-consumed-total", metricListener))
.isEqualTo(NUM_RECORDS_PER_SPLIT * 2);
@@ -354,7 +354,7 @@ void testPscSourceMetrics() throws Exception {
"Offsets are not committed successfully. Dangling offsets: %s",
reader.getOffsetsToCommit()));
- // Metric "commit-total" of KafkaConsumer should be greater than 0
+ // Metric "commit-total" of PscConsumer should be greater than 0
// It's hard to know the exactly number of commit because of the retry
MatcherAssert.assertThat(
getPscConsumerMetric("commit-total", metricListener),
@@ -393,7 +393,7 @@ void testAssigningEmptySplits() throws Exception {
(PscSourceReader)
createReader(
Boundedness.BOUNDED,
- "KafkaSourceReaderTestGroup",
+ "PscSourceReaderTestGroup",
new TestingReaderContext(),
splitFinishedHook)) {
reader.addSplits(Arrays.asList(normalSplit, emptySplit));
@@ -448,7 +448,7 @@ void testAssigningEmptySplitOnly() throws Exception {
@Override
protected SourceReader createReader() throws Exception {
- return createReader(Boundedness.BOUNDED, "KafkaSourceReaderTestGroup");
+ return createReader(Boundedness.BOUNDED, "PscSourceReaderTestGroup");
}
@Override
@@ -523,9 +523,6 @@ private SourceReader createReader(
PscRecordDeserializationSchema.valueOnly(
IntegerDeserializer.class))
.setPartitions(Collections.singleton(new TopicUriPartition(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + "AnyTopic", 0)))
-// .setProperty(
-// ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-// KafkaSourceTestEnv.brokerConnectionStrings)
.setProperty(PscConfiguration.PSC_CONSUMER_COMMIT_AUTO_ENABLED, "false")
.setProperties(props);
if (boundedness == Boundedness.BOUNDED) {
@@ -558,11 +555,11 @@ private void pollUntil(
}
private long getPscConsumerMetric(String name, MetricListener listener) {
- final Optional> kafkaConsumerGauge =
+ final Optional> pscConsumerGauge =
listener.getGauge(
PSC_SOURCE_READER_METRIC_GROUP, PSC_CONSUMER_METRIC_GROUP, name);
- assertThat(kafkaConsumerGauge).isPresent();
- return ((Double) kafkaConsumerGauge.get().getValue()).longValue();
+ assertThat(pscConsumerGauge).isPresent();
+ return ((Double) pscConsumerGauge.get().getValue()).longValue();
}
private long getCurrentOffsetMetric(TopicUriPartition tp, MetricListener listener) {
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/deserializer/PscRecordDeserializationSchemaTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/deserializer/PscRecordDeserializationSchemaTest.java
index 673a9c9..75b5e35 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/deserializer/PscRecordDeserializationSchemaTest.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/deserializer/PscRecordDeserializationSchemaTest.java
@@ -62,7 +62,7 @@ public void setUp() {
}
@Test
- public void testKafkaDeserializationSchemaWrapper() throws IOException, DeserializerException {
+ public void testPscDeserializationSchemaWrapper() throws IOException, DeserializerException {
final PscConsumerMessage consumerRecord = getPscConsumerMessage();
PscRecordDeserializationSchema schema =
PscRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true));
@@ -80,7 +80,7 @@ public void testKafkaDeserializationSchemaWrapper() throws IOException, Deserial
}
@Test
- public void testKafkaValueDeserializationSchemaWrapper() throws IOException, DeserializerException {
+ public void testPscValueDeserializationSchemaWrapper() throws IOException, DeserializerException {
final PscConsumerMessage consumerRecord = getPscConsumerMessage();
PscRecordDeserializationSchema schema =
PscRecordDeserializationSchema.valueOnly(new JsonNodeDeserializationSchema());
@@ -96,7 +96,7 @@ public void testKafkaValueDeserializationSchemaWrapper() throws IOException, Des
}
@Test
- public void testKafkaValueDeserializerWrapper() throws Exception {
+ public void testPscValueDeserializerWrapper() throws Exception {
final String topic = "Topic";
byte[] value = new StringSerializer().serialize(topic, "world");
final PscConsumerMessage consumerRecord =
@@ -113,7 +113,7 @@ public void testKafkaValueDeserializerWrapper() throws Exception {
}
@Test
- public void testKafkaValueDeserializerWrapperWithoutConfigurable() throws Exception {
+ public void testPscValueDeserializerWrapperWithoutConfigurable() throws Exception {
final Map config = ImmutableMap.of("simpleKey", "simpleValue");
PscRecordDeserializationSchema schema =
PscRecordDeserializationSchema.valueOnly(SimpleStringSerializer.class, config);
@@ -124,7 +124,7 @@ public void testKafkaValueDeserializerWrapperWithoutConfigurable() throws Except
}
@Test
- public void testKafkaValueDeserializerWrapperWithConfigurable() throws Exception {
+ public void testPscValueDeserializerWrapperWithConfigurable() throws Exception {
final Map config = ImmutableMap.of("configKey", "configValue");
PscRecordDeserializationSchema schema =
PscRecordDeserializationSchema.valueOnly(
@@ -164,7 +164,7 @@ public void close() {
}
/**
- * Serializer based on Kafka's serialization stack. This is the special case that implements
+ * Serializer based on PSC's serialization stack. This is the special case that implements
* {@link PscPlugin}
*
* This class must be public to make it instantiable by the tests.
@@ -178,7 +178,7 @@ public void configure(PscConfiguration pscConfig, boolean isKey) {
}
/**
- * Serializer based on Kafka's serialization stack.
+ * Serializer based on PSC's serialization stack.
*
* This class must be public to make it instantiable by the tests.
*/
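Taken together, the renamed tests cover three wrapper flavours of PscRecordDeserializationSchema; stripped of the collector plumbing, constructing them looks roughly like this (types are the ones this test already uses, plus the PSC StringDeserializer from the serde package):

    // Flink key/value DeserializationSchema wrapper
    PscRecordDeserializationSchema<ObjectNode> kvSchema =
            PscRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true));
    // value-only wrapper around a Flink DeserializationSchema
    PscRecordDeserializationSchema<ObjectNode> valueSchema =
            PscRecordDeserializationSchema.valueOnly(new JsonNodeDeserializationSchema());
    // value-only wrapper around a PSC Deserializer class, optionally with a config map
    PscRecordDeserializationSchema<String> rawValueSchema =
            PscRecordDeserializationSchema.valueOnly(StringDeserializer.class);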
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceExternalContext.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceExternalContext.java
index a8aab90..6309872 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceExternalContext.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceExternalContext.java
@@ -60,9 +60,9 @@
public class PscSourceExternalContext implements DataStreamSourceExternalContext {
private static final Logger LOG = LoggerFactory.getLogger(PscSourceExternalContext.class);
- private static final String TOPIC_NAME_PREFIX = "kafka-test-topic-";
+ private static final String TOPIC_NAME_PREFIX = "psc-test-topic-";
private static final Pattern TOPIC_NAME_PATTERN = Pattern.compile(TOPIC_NAME_PREFIX + ".*");
- private static final String GROUP_ID_PREFIX = "kafka-source-external-context-";
+ private static final String GROUP_ID_PREFIX = "psc-source-external-context-";
private static final int NUM_RECORDS_UPPER_BOUND = 500;
private static final int NUM_RECORDS_LOWER_BOUND = 100;
@@ -177,7 +177,7 @@ public void close() throws Exception {
@Override
public String toString() {
- return "KafkaSource-" + splitMappingMode.toString();
+ return "PscSource-" + splitMappingMode.toString();
}
private String randomize(String prefix) {
@@ -232,8 +232,6 @@ private PscTopicUriPartitionDataWriter scaleOutTopic(String topicName) throws Ex
private Properties getPscProducerProperties(int producerId) {
Properties pscProducerProperties = new Properties();
-// kafkaProducerProperties.setProperty(
-// ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
pscProducerProperties.setProperty(
PscConfiguration.PSC_PRODUCER_CLIENT_ID,
String.join(
@@ -249,7 +247,7 @@ private Properties getPscProducerProperties(int producerId) {
return pscProducerProperties;
}
- /** Mode of mapping split to Kafka components. */
+ /** Mode of mapping split to PSC components. */
public enum SplitMappingMode {
/** Use a single-partitioned topic as a split. */
TOPIC,
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceExternalContextFactory.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceExternalContextFactory.java
index efd7247..3215d63 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceExternalContextFactory.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceExternalContextFactory.java
@@ -19,7 +19,6 @@
package com.pinterest.flink.connector.psc.testutils;
import org.apache.flink.connector.testframe.external.ExternalContextFactory;
-
import org.testcontainers.containers.KafkaContainer;
import java.net.URL;
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceTestEnv.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceTestEnv.java
index c2512be..be6be07 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceTestEnv.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceTestEnv.java
@@ -35,11 +35,7 @@
import com.pinterest.psc.serde.StringDeserializer;
import com.pinterest.psc.serde.StringSerializer;
import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.RecordsToDelete;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
@@ -53,7 +49,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
-import java.util.stream.Collectors;
import static com.pinterest.flink.connector.psc.testutils.PscTestUtils.putDiscoveryProperties;
import static org.junit.Assert.assertEquals;
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscTopicUriPartitionDataWriter.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscTopicUriPartitionDataWriter.java
index 9b16f41..474e308 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscTopicUriPartitionDataWriter.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscTopicUriPartitionDataWriter.java
@@ -31,7 +31,7 @@
import java.util.List;
import java.util.Properties;
-/** Source split data writer for writing test data into Kafka topic partitions. */
+/** Source split data writer for writing test data into PSC topicUriPartitions. */
public class PscTopicUriPartitionDataWriter implements ExternalSystemSplitDataWriter {
private final PscProducer pscProducer;
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscConsumerTestBaseWithKafkaAsPubSub.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscConsumerTestBaseWithKafkaAsPubSub.java
index 8a23555..464b2c7 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscConsumerTestBaseWithKafkaAsPubSub.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscConsumerTestBaseWithKafkaAsPubSub.java
@@ -225,7 +225,7 @@ public void runFailOnNoBrokerTest() throws Exception {
* Ensures that the committed offsets to Kafka are the offsets of "the next
* record to process".
*/
- public void runCommitOffsetsToKafka() throws Exception {
+ public void runCommitOffsetsToPsc() throws Exception {
// 3 partitions with 50 records each (0-49, so the expected commit offset of
// each partition should be 50)
final int parallelism = 3;
@@ -312,7 +312,7 @@ public void run() {
*
* See FLINK-3440 as well
*/
- public void runAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+ public void runAutoOffsetRetrievalAndCommitToPsc() throws Exception {
// 3 partitions with 50 records each (0-49, so the expected commit offset of
// each partition should be 50)
final int parallelism = 3;
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscITCase.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscITCase.java
index a3c3125..97765fe 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscITCase.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscITCase.java
@@ -171,12 +171,12 @@ public void testStartFromTimestamp() throws Exception {
@Test(timeout = 60000)
public void testCommitOffsetsToKafka() throws Exception {
- runCommitOffsetsToKafka();
+ runCommitOffsetsToPsc();
}
@Test(timeout = 60000)
public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
- runAutoOffsetRetrievalAndCommitToKafka();
+ runAutoOffsetRetrievalAndCommitToPsc();
}
@Test(timeout = 60000)
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTableSourceSinkFactoryTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTableSourceSinkFactoryTest.java
deleted file mode 100644
index 1f116ab..0000000
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTableSourceSinkFactoryTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements. See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package com.pinterest.flink.streaming.connectors.psc;
-//
-//import com.pinterest.flink.streaming.connectors.psc.config.StartupMode;
-//import com.pinterest.flink.streaming.connectors.psc.internals.PscTopicUriPartition;
-//import com.pinterest.flink.streaming.connectors.psc.partitioner.FlinkPscPartitioner;
-//import com.pinterest.flink.table.descriptors.psc.PscValidator;
-//import org.apache.flink.api.common.serialization.DeserializationSchema;
-//import org.apache.flink.api.common.serialization.SerializationSchema;
-//import org.apache.flink.table.api.TableSchema;
-//import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-//import org.apache.flink.types.Row;
-//
-//import java.util.List;
-//import java.util.Map;
-//import java.util.Optional;
-//import java.util.Properties;
-//
-///**
-// * Test for {@link PscTableSource} and {@link PscTableSink} created
-// * by {@link PscTableSourceSinkFactory}.
-// */
-//public class PscTableSourceSinkFactoryTest extends PscTableSourceSinkFactoryTestBase {
-//
-// @Override
-// protected String getPscVersion() {
-// return PscValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL;
-// }
-//
-// @Override
-// protected Class> getExpectedFlinkPscConsumer() {
-// return (Class) FlinkPscConsumer.class;
-// }
-//
-// @Override
-// protected Class> getExpectedFlinkPscProducer() {
-// return FlinkPscProducer.class;
-// }
-//
-// @Override
-// protected PscTableSourceBase getExpectedPscTableSource(
-// TableSchema schema,
-// Optional proctimeAttribute,
-// List rowtimeAttributeDescriptors,
-// Map fieldMapping,
-// String topicUri,
-// Properties properties,
-// DeserializationSchema deserializationSchema,
-// StartupMode startupMode,
-// Map specificStartupOffsets,
-// long startupTimestamp) {
-//
-// return new PscTableSource(
-// schema,
-// proctimeAttribute,
-// rowtimeAttributeDescriptors,
-// Optional.of(fieldMapping),
-// topicUri,
-// properties,
-// deserializationSchema,
-// startupMode,
-// specificStartupOffsets,
-// startupTimestamp);
-// }
-//
-// protected PscTableSinkBase getExpectedPscTableSink(
-// TableSchema schema,
-// String topicUri,
-// Properties properties,
-// Optional> partitioner,
-// SerializationSchema serializationSchema) {
-//
-// return new PscTableSink(
-// schema,
-// topicUri,
-// properties,
-// partitioner,
-// serializationSchema);
-// }
-//}
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTableSourceSinkFactoryTestBase.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTableSourceSinkFactoryTestBase.java
deleted file mode 100644
index 38ae40a..0000000
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTableSourceSinkFactoryTestBase.java
+++ /dev/null
@@ -1,477 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package com.pinterest.flink.streaming.connectors.psc;
-//
-//import com.pinterest.flink.streaming.connectors.psc.config.StartupMode;
-//import com.pinterest.flink.streaming.connectors.psc.internals.PscTopicUriPartition;
-//import com.pinterest.flink.streaming.connectors.psc.partitioner.FlinkFixedPartitioner;
-//import com.pinterest.flink.streaming.connectors.psc.partitioner.FlinkPscPartitioner;
-//import com.pinterest.flink.table.descriptors.psc.Psc;
-//import com.pinterest.flink.table.descriptors.psc.PscValidator;
-//import com.pinterest.psc.common.TopicUri;
-//import com.pinterest.psc.config.PscConfiguration;
-//import org.apache.flink.api.common.JobExecutionResult;
-//import org.apache.flink.api.common.serialization.DeserializationSchema;
-//import org.apache.flink.api.common.serialization.SerializationSchema;
-//import org.apache.flink.api.common.typeinfo.TypeInformation;
-//import org.apache.flink.api.dag.Transformation;
-//import org.apache.flink.streaming.api.datastream.DataStream;
-//import org.apache.flink.streaming.api.datastream.DataStreamSink;
-//import org.apache.flink.streaming.api.datastream.DataStreamSource;
-//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-//import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-//import org.apache.flink.streaming.api.functions.source.SourceFunction;
-//import org.apache.flink.streaming.api.graph.StreamGraph;
-//import org.apache.flink.table.api.DataTypes;
-//import org.apache.flink.table.api.TableSchema;
-//import org.apache.flink.table.descriptors.Rowtime;
-//import org.apache.flink.table.descriptors.Schema;
-//import org.apache.flink.table.descriptors.StreamTableDescriptorValidator;
-//import org.apache.flink.table.descriptors.TestTableDescriptor;
-//import org.apache.flink.table.factories.StreamTableSinkFactory;
-//import org.apache.flink.table.factories.StreamTableSourceFactory;
-//import org.apache.flink.table.factories.TableFactoryService;
-//import org.apache.flink.table.factories.utils.TestDeserializationSchema;
-//import org.apache.flink.table.factories.utils.TestSerializationSchema;
-//import org.apache.flink.table.factories.utils.TestTableFormat;
-//import org.apache.flink.table.sinks.TableSink;
-//import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-//import org.apache.flink.table.sources.TableSource;
-//import org.apache.flink.table.sources.TableSourceValidation;
-//import org.apache.flink.table.sources.tsextractors.ExistingField;
-//import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
-//import org.apache.flink.table.types.DataType;
-//import org.apache.flink.types.Row;
-//import org.apache.flink.util.TestLogger;
-//import org.junit.Ignore;
-//import org.junit.Test;
-//
-//import java.util.Collection;
-//import java.util.Collections;
-//import java.util.HashMap;
-//import java.util.List;
-//import java.util.Map;
-//import java.util.Optional;
-//import java.util.Properties;
-//
-//import static org.junit.Assert.assertEquals;
-//import static org.junit.Assert.assertFalse;
-//import static org.junit.Assert.assertTrue;
-//
-///**
-// * Abstract test base for {@link PscTableSourceSinkFactoryBase}.
-// */
-//public abstract class PscTableSourceSinkFactoryTestBase extends TestLogger {
-//
-// private static final String TOPIC_URI =
-// TopicUri.DEFAULT_PROTOCOL + ":" + TopicUri.SEPARATOR + "rn:kafka:env:cloud_region::cluster:myTopic";
-// private static final int PARTITION_0 = 0;
-// private static final long OFFSET_0 = 100L;
-// private static final int PARTITION_1 = 1;
-// private static final long OFFSET_1 = 123L;
-// private static final String FRUIT_NAME = "fruit-name";
-// private static final String NAME = "name";
-// private static final String COUNT = "count";
-// private static final String TIME = "time";
-// private static final String EVENT_TIME = "event-time";
-// private static final String PROC_TIME = "proc-time";
-// private static final String WATERMARK_EXPRESSION = EVENT_TIME + " - INTERVAL '5' SECOND";
-// private static final DataType WATERMARK_DATATYPE = DataTypes.TIMESTAMP(3);
-// private static final String COMPUTED_COLUMN_NAME = "computed-column";
-// private static final String COMPUTED_COLUMN_EXPRESSION = COUNT + " + 1.0";
-// private static final DataType COMPUTED_COLUMN_DATATYPE = DataTypes.DECIMAL(10, 3);
-//
-// private static final Properties PSC_CONFIGURATION = new Properties();
-//
-// static {
-// PSC_CONFIGURATION.setProperty(PscConfiguration.PSC_CONSUMER_GROUP_ID, "test");
-// }
-//
-// private static final Map OFFSETS = new HashMap<>();
-//
-// static {
-// OFFSETS.put(PARTITION_0, OFFSET_0);
-// OFFSETS.put(PARTITION_1, OFFSET_1);
-// }
-//
-// @Test
-// @SuppressWarnings("unchecked")
-// @Ignore("Table API not updated to 1.15 yet")
-// public void testTableSource() {
-// // prepare parameters for Kafka table source
-// final TableSchema schema = TableSchema.builder()
-// .field(FRUIT_NAME, DataTypes.STRING())
-// .field(COUNT, DataTypes.DECIMAL(38, 18))
-// .field(EVENT_TIME, DataTypes.TIMESTAMP(3))
-// .field(PROC_TIME, DataTypes.TIMESTAMP(3))
-// .build();
-//
-// final List rowtimeAttributeDescriptors = Collections.singletonList(
-// new RowtimeAttributeDescriptor(EVENT_TIME, new ExistingField(TIME), new AscendingTimestamps()));
-//
-// final Map fieldMapping = new HashMap<>();
-// fieldMapping.put(FRUIT_NAME, NAME);
-// fieldMapping.put(NAME, NAME);
-// fieldMapping.put(COUNT, COUNT);
-// fieldMapping.put(TIME, TIME);
-//
-// final Map specificOffsets = new HashMap<>();
-// specificOffsets.put(new PscTopicUriPartition(TOPIC_URI, PARTITION_0), OFFSET_0);
-// specificOffsets.put(new PscTopicUriPartition(TOPIC_URI, PARTITION_1), OFFSET_1);
-//
-// final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema(
-// TableSchema.builder()
-// .field(NAME, DataTypes.STRING())
-// .field(COUNT, DataTypes.DECIMAL(38, 18))
-// .field(TIME, DataTypes.TIMESTAMP(3))
-// .build().toRowType()
-// );
-//
-// final PscTableSourceBase expected = getExpectedPscTableSource(
-// schema,
-// Optional.of(PROC_TIME),
-// rowtimeAttributeDescriptors,
-// fieldMapping,
-// TOPIC_URI,
-// PSC_CONFIGURATION,
-// deserializationSchema,
-// StartupMode.SPECIFIC_OFFSETS,
-// specificOffsets,
-// 0L);
-//
-// TableSourceValidation.validateTableSource(expected, schema);
-//
-// // construct table source using descriptors and table source factory
-// final Map propertiesMap = new HashMap<>();
-// propertiesMap.putAll(createPscSourceProperties());
-// propertiesMap.put("schema.watermark.0.rowtime", EVENT_TIME);
-// propertiesMap.put("schema.watermark.0.strategy.expr", WATERMARK_EXPRESSION);
-// propertiesMap.put("schema.watermark.0.strategy.data-type", WATERMARK_DATATYPE.toString());
-// propertiesMap.put("schema.4.name", COMPUTED_COLUMN_NAME);
-// propertiesMap.put("schema.4.data-type", COMPUTED_COLUMN_DATATYPE.toString());
-// propertiesMap.put("schema.4.expr", COMPUTED_COLUMN_EXPRESSION);
-//
-// final TableSource> actualSource = TableFactoryService.find(StreamTableSourceFactory.class, propertiesMap)
-// .createStreamTableSource(propertiesMap);
-//
-// assertEquals(expected, actualSource);
-//
-// // test PSC consumer
-// final PscTableSourceBase actualPscSource = (PscTableSourceBase) actualSource;
-// final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock();
-// actualPscSource.getDataStream(mock);
-// assertTrue(getExpectedFlinkPscConsumer().isAssignableFrom(mock.sourceFunction.getClass()));
-// // Test commitOnCheckpoints flag should be true when set consumer group.
-// assertTrue(((FlinkPscConsumerBase) mock.sourceFunction).getEnableCommitOnCheckpoints());
-// }
-//
-// @Test
-// @Ignore("Table API not updated to 1.15 yet")
-// public void testTableSourceCommitOnCheckpointsDisabled() {
-// Map propertiesMap = new HashMap<>();
-// createPscSourceProperties().forEach((k, v) -> {
-// if (!k.equals(PscValidator.CONNECTOR_PROPERTIES_GROUP_ID)) {
-// propertiesMap.put(k, v);
-// }
-// });
-// final TableSource> tableSource = TableFactoryService.find(StreamTableSourceFactory.class, propertiesMap)
-// .createStreamTableSource(propertiesMap);
-// final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock();
-// // Test commitOnCheckpoints flag should be false when do not set consumer group.
-// ((PscTableSourceBase) tableSource).getDataStream(mock);
-// assertTrue(mock.sourceFunction instanceof FlinkPscConsumerBase);
-// assertFalse(((FlinkPscConsumerBase) mock.sourceFunction).getEnableCommitOnCheckpoints());
-// }
-//
-// @Test
-// @SuppressWarnings("unchecked")
-// @Ignore("Table API not updated to 1.15 yet")
-// public void testTableSourceWithLegacyProperties() {
-// // prepare parameters for Kafka table source
-// final TableSchema schema = TableSchema.builder()
-// .field(FRUIT_NAME, DataTypes.STRING())
-// .field(COUNT, DataTypes.DECIMAL(38, 18))
-// .field(EVENT_TIME, DataTypes.TIMESTAMP(3))
-// .field(PROC_TIME, DataTypes.TIMESTAMP(3))
-// .build();
-//
-// final List rowtimeAttributeDescriptors = Collections.singletonList(
-// new RowtimeAttributeDescriptor(EVENT_TIME, new ExistingField(TIME), new AscendingTimestamps()));
-//
-// final Map fieldMapping = new HashMap<>();
-// fieldMapping.put(FRUIT_NAME, NAME);
-// fieldMapping.put(NAME, NAME);
-// fieldMapping.put(COUNT, COUNT);
-// fieldMapping.put(TIME, TIME);
-//
-// final Map specificOffsets = new HashMap<>();
-// specificOffsets.put(new PscTopicUriPartition(TOPIC_URI, PARTITION_0), OFFSET_0);
-// specificOffsets.put(new PscTopicUriPartition(TOPIC_URI, PARTITION_1), OFFSET_1);
-//
-// final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema(
-// TableSchema.builder()
-// .field(NAME, DataTypes.STRING())
-// .field(COUNT, DataTypes.DECIMAL(38, 18))
-// .field(TIME, DataTypes.TIMESTAMP(3))
-// .build().toRowType()
-// );
-//
-// final PscTableSourceBase expected = getExpectedPscTableSource(
-// schema,
-// Optional.of(PROC_TIME),
-// rowtimeAttributeDescriptors,
-// fieldMapping,
-// TOPIC_URI,
-// PSC_CONFIGURATION,
-// deserializationSchema,
-// StartupMode.SPECIFIC_OFFSETS,
-// specificOffsets,
-// 0L);
-//
-// TableSourceValidation.validateTableSource(expected, schema);
-//
-// // construct table source using descriptors and table source factory
-// final Map legacyPropertiesMap = new HashMap<>();
-// legacyPropertiesMap.putAll(createPscSourceProperties());
-//
-// // use legacy properties
-// legacyPropertiesMap.remove(PscValidator.CONNECTOR_SPECIFIC_OFFSETS);
-// legacyPropertiesMap.remove(PscValidator.CONNECTOR_PROPERTIES_GROUP_ID);
-//
-// // keep compatible with a specified update-mode
-// legacyPropertiesMap.put(StreamTableDescriptorValidator.UPDATE_MODE, "append");
-//
-// // legacy properties for specific-offsets and properties
-// legacyPropertiesMap.put(PscValidator.CONNECTOR_SPECIFIC_OFFSETS + ".0.partition", "0");
-// legacyPropertiesMap.put(PscValidator.CONNECTOR_SPECIFIC_OFFSETS + ".0.offset", "100");
-// legacyPropertiesMap.put(PscValidator.CONNECTOR_SPECIFIC_OFFSETS + ".1.partition", "1");
-// legacyPropertiesMap.put(PscValidator.CONNECTOR_SPECIFIC_OFFSETS + ".1.offset", "123");
-// legacyPropertiesMap.put(PscValidator.CONNECTOR_PROPERTIES + ".0.key", PscConfiguration.PSC_CONSUMER_GROUP_ID);
-// legacyPropertiesMap.put(PscValidator.CONNECTOR_PROPERTIES + ".0.value", "test");
-//
-// final TableSource> actualSource = TableFactoryService.find(StreamTableSourceFactory.class, legacyPropertiesMap)
-// .createStreamTableSource(legacyPropertiesMap);
-//
-// assertEquals(expected, actualSource);
-//
-// // test Kafka consumer
-// final PscTableSourceBase actualKafkaSource = (PscTableSourceBase) actualSource;
-// final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock();
-// actualKafkaSource.getDataStream(mock);
-// assertTrue(getExpectedFlinkPscConsumer().isAssignableFrom(mock.sourceFunction.getClass()));
-// }
-//
-// protected Map createPscSourceProperties() {
-// return new TestTableDescriptor(
-// new Psc()
-// .version(getPscVersion())
-// .topicUri(TOPIC_URI)
-// .properties(PSC_CONFIGURATION)
-// .sinkPartitionerRoundRobin() // test if accepted although not needed
-// .startFromSpecificOffsets(OFFSETS))
-// .withFormat(new TestTableFormat())
-// .withSchema(
-// new Schema()
-// .field(FRUIT_NAME, DataTypes.STRING()).from(NAME)
-// .field(COUNT, DataTypes.DECIMAL(38, 18)) // no from so it must match with the input
-// .field(EVENT_TIME, DataTypes.TIMESTAMP(3)).rowtime(
-// new Rowtime().timestampsFromField(TIME).watermarksPeriodicAscending())
-// .field(PROC_TIME, DataTypes.TIMESTAMP(3)).proctime())
-// .toProperties();
-// }
-//
-//
-// /**
-// * This test can be unified with the corresponding source test once we have fixed FLINK-9870.
-// */
-// @Test
-// @Ignore("Table API not updated to 1.15 yet")
-// public void testTableSink() {
-// // prepare parameters for Kafka table sink
-// final TableSchema schema = TableSchema.builder()
-// .field(FRUIT_NAME, DataTypes.STRING())
-// .field(COUNT, DataTypes.DECIMAL(10, 4))
-// .field(EVENT_TIME, DataTypes.TIMESTAMP(3))
-// .build();
-//
-// final PscTableSinkBase expected = getExpectedPscTableSink(
-// schema,
-// TOPIC_URI,
-// PSC_CONFIGURATION,
-// Optional.of(new FlinkFixedPartitioner<>()),
-// new TestSerializationSchema(schema.toRowType()));
-//
-// // construct table sink using descriptors and table sink factory
-// final Map propertiesMap = createPscSinkProperties();
-// final TableSink> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
-// .createStreamTableSink(propertiesMap);
-//
-// assertEquals(expected, actualSink);
-//
-// // test Kafka producer
-// final PscTableSinkBase actualPscSink = (PscTableSinkBase) actualSink;
-// final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType());
-// actualPscSink.consumeDataStream(streamMock);
-// assertTrue(getExpectedFlinkPscProducer().isAssignableFrom(streamMock.sinkFunction.getClass()));
-// }
-//
-// @Test
-// @Ignore("Table API not updated to 1.15 yet")
-// public void testTableSinkWithLegacyProperties() {
-// // prepare parameters for PSC table sink
-// final TableSchema schema = TableSchema.builder()
-// .field(FRUIT_NAME, DataTypes.STRING())
-// .field(COUNT, DataTypes.DECIMAL(10, 4))
-// .field(EVENT_TIME, DataTypes.TIMESTAMP(3))
-// .build();
-//
-// final PscTableSinkBase expected = getExpectedPscTableSink(
-// schema,
-// TOPIC_URI,
-// PSC_CONFIGURATION,
-// Optional.of(new FlinkFixedPartitioner<>()),
-// new TestSerializationSchema(schema.toRowType()));
-//
-// // construct table sink using descriptors and table sink factory
-// final Map legacyPropertiesMap = new HashMap<>();
-// legacyPropertiesMap.putAll(createPscSinkProperties());
-//
-// // use legacy properties
-// legacyPropertiesMap.remove(PscValidator.CONNECTOR_SPECIFIC_OFFSETS);
-// legacyPropertiesMap.remove(PscValidator.CONNECTOR_PROPERTIES_GROUP_ID);
-//
-// // keep compatible with a specified update-mode
-// legacyPropertiesMap.put(StreamTableDescriptorValidator.UPDATE_MODE, "append");
-//
-// // legacy properties for specific-offsets and properties
-// legacyPropertiesMap.put(PscValidator.CONNECTOR_SPECIFIC_OFFSETS + ".0.partition", "0");
-// legacyPropertiesMap.put(PscValidator.CONNECTOR_SPECIFIC_OFFSETS + ".0.offset", "100");
-// legacyPropertiesMap.put(PscValidator.CONNECTOR_SPECIFIC_OFFSETS + ".1.partition", "1");
-// legacyPropertiesMap.put(PscValidator.CONNECTOR_SPECIFIC_OFFSETS + ".1.offset", "123");
-// legacyPropertiesMap.put(PscValidator.CONNECTOR_PROPERTIES + ".0.key", PscConfiguration.PSC_CONSUMER_GROUP_ID);
-// legacyPropertiesMap.put(PscValidator.CONNECTOR_PROPERTIES + ".0.value", "test");
-//
-// final TableSink> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, legacyPropertiesMap)
-// .createStreamTableSink(legacyPropertiesMap);
-//
-// assertEquals(expected, actualSink);
-//
-// // test PSC producer
-// final PscTableSinkBase actualKafkaSink = (PscTableSinkBase) actualSink;
-// final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType());
-// actualKafkaSink.consumeDataStream(streamMock);
-// assertTrue(getExpectedFlinkPscProducer().isAssignableFrom(streamMock.sinkFunction.getClass()));
-// }
-//
-// protected Map createPscSinkProperties() {
-// return new TestTableDescriptor(
-// new Psc()
-// .version(getPscVersion())
-// .topicUri(TOPIC_URI)
-// .properties(PSC_CONFIGURATION)
-// .sinkPartitionerFixed()
-// .startFromSpecificOffsets(OFFSETS)) // test if they accepted although not needed
-// .withFormat(new TestTableFormat())
-// .withSchema(
-// new Schema()
-// .field(FRUIT_NAME, DataTypes.STRING())
-// .field(COUNT, DataTypes.DECIMAL(10, 4))
-// .field(EVENT_TIME, DataTypes.TIMESTAMP(3)))
-// .inAppendMode()
-// .toProperties();
-// }
-//
-// private static class StreamExecutionEnvironmentMock extends StreamExecutionEnvironment {
-//
-// public SourceFunction> sourceFunction;
-//
-// @Override
-// public DataStreamSource addSource(SourceFunction sourceFunction) {
-// this.sourceFunction = sourceFunction;
-// return super.addSource(sourceFunction);
-// }
-//
-// @Override
-// public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
-// throw new UnsupportedOperationException();
-// }
-// }
-//
-// private static class DataStreamMock extends DataStream {
-//
-// public SinkFunction> sinkFunction;
-//
-// public DataStreamMock(StreamExecutionEnvironment environment, TypeInformation outType) {
-// super(environment, new TransformationMock("name", outType, 1));
-// }
-//
-// @Override
-// public DataStreamSink addSink(SinkFunction sinkFunction) {
-// this.sinkFunction = sinkFunction;
-// return super.addSink(sinkFunction);
-// }
-// }
-//
-// private static class TransformationMock extends Transformation {
-//
-// public TransformationMock(String name, TypeInformation outputType, int parallelism) {
-// super(name, outputType, parallelism);
-// }
-//
-// @Override
-// public List> getTransitivePredecessors() {
-// return null;
-// }
-//
-// @Override
-// public List> getInputs() {
-// return null;
-// }
-//
-// }
-//
-// // --------------------------------------------------------------------------------------------
-// // For version-specific tests
-// // --------------------------------------------------------------------------------------------
-//
-// protected abstract String getPscVersion();
-//
-// protected abstract Class> getExpectedFlinkPscConsumer();
-//
-// protected abstract Class> getExpectedFlinkPscProducer();
-//
-// protected abstract PscTableSourceBase getExpectedPscTableSource(
-// TableSchema schema,
-// Optional proctimeAttribute,
-// List rowtimeAttributeDescriptors,
-// Map fieldMapping,
-// String topicUri,
-// Properties properties,
-// DeserializationSchema deserializationSchema,
-// StartupMode startupMode,
-// Map specificStartupOffsets,
-// long startupTimestampMillis);
-//
-// protected abstract PscTableSinkBase getExpectedPscTableSink(
-// TableSchema schema,
-// String topicUri,
-// Properties properties,
-// Optional> partitioner,
-// SerializationSchema serializationSchema);
-//}
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscChangelogTableITCase.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscChangelogTableITCase.java
index 7f7f6f2..bd4e40c 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscChangelogTableITCase.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscChangelogTableITCase.java
@@ -43,7 +43,7 @@
import static com.pinterest.flink.streaming.connectors.psc.table.PscTableTestUtils.readLines;
import static com.pinterest.flink.streaming.connectors.psc.table.PscTableTestUtils.waitingExpectedResults;
-/** IT cases for Kafka with changelog format for Table API & SQL. */
+/** IT cases for PSC with changelog format for Table API & SQL. */
public class PscChangelogTableITCase extends PscTableTestBase {
@Before
@@ -54,7 +54,7 @@ public void before() {
}
@Test
- public void testKafkaDebeziumChangelogSource() throws Exception {
+ public void testPscDebeziumChangelogSource() throws Exception {
final String topic = "changelog_topic";
final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topic;
createTestTopic(topic, 1, 1);
@@ -67,15 +67,15 @@ public void testKafkaDebeziumChangelogSource() throws Exception {
tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
- // ---------- Write the Debezium json into Kafka -------------------
+ // ---------- Write the Debezium json into PSC -------------------
List lines = readLines("debezium-data-schema-exclude.txt");
try {
- writeRecordsToKafka(topicUri, lines);
+ writeRecordsToPsc(topicUri, lines);
} catch (Exception e) {
- throw new Exception("Failed to write debezium data to Kafka.", e);
+ throw new Exception("Failed to write debezium data to PSC.", e);
}
- // ---------- Produce an event time stream into Kafka -------------------
+ // ---------- Produce an event time stream into PSC -------------------
String bootstraps = getBootstrapServers();
String sourceDDL =
String.format(
@@ -203,15 +203,15 @@ public void testPscCanalChangelogSource() throws Exception {
tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
- // ---------- Write the Canal json into Kafka -------------------
+ // ---------- Write the Canal json into PSC -------------------
List lines = readLines("canal-data.txt");
try {
- writeRecordsToKafka(topicUri, lines);
+ writeRecordsToPsc(topicUri, lines);
} catch (Exception e) {
throw new Exception("Failed to write canal data to PSC.", e);
}
- // ---------- Produce an event time stream into Kafka -------------------
+ // ---------- Produce an event time stream into PSC -------------------
String bootstraps = getBootstrapServers();
String sourceDDL =
String.format(
@@ -336,7 +336,7 @@ public void testPscCanalChangelogSource() throws Exception {
}
@Test
- public void testKafkaMaxwellChangelogSource() throws Exception {
+ public void testPscMaxwellChangelogSource() throws Exception {
final String topic = "changelog_maxwell";
final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topic;
createTestTopic(topic, 1, 1);
@@ -351,15 +351,15 @@ public void testKafkaMaxwellChangelogSource() throws Exception {
tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
- // ---------- Write the Maxwell json into Kafka -------------------
+ // ---------- Write the Maxwell json into PSC -------------------
List lines = readLines("maxwell-data.txt");
try {
- writeRecordsToKafka(topicUri, lines);
+ writeRecordsToPsc(topicUri, lines);
} catch (Exception e) {
throw new Exception("Failed to write maxwell data to PSC.", e);
}
- // ---------- Produce an event time stream into Kafka -------------------
+ // ---------- Produce an event time stream into PSC -------------------
String bootstraps = getBootstrapServers();
String sourceDDL =
String.format(
@@ -478,7 +478,7 @@ public void testKafkaMaxwellChangelogSource() throws Exception {
deleteTestTopic(topic);
}
- private void writeRecordsToKafka(String topic, List lines) throws Exception {
+ private void writeRecordsToPsc(String topic, List lines) throws Exception {
DataStreamSource stream = env.fromCollection(lines);
SerializationSchema serSchema = new SimpleStringSchema();
FlinkPscPartitioner partitioner = new FlinkFixedPartitioner<>();
@@ -488,9 +488,6 @@ private void writeRecordsToKafka(String topic, List lines) throws Except
producerProperties.setProperty("retries", "0");
stream.sinkTo(
PscSink.builder()
-// .setBootstrapServers(
-// producerProperties.getProperty(
-// ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
.setRecordSerializer(
PscRecordSerializationSchema.builder()
.setTopicUriString(topic)
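For orientation, the renamed helper roughly takes the following overall shape. Only setRecordSerializer and setTopicUriString are visible in the hunk above; the remaining builder calls (setValueSerializationSchema, setPartitioner) are assumptions modeled on Flink's KafkaSink builder, the producer Properties shown in the hunk are omitted, and the commented-out setBootstrapServers call was presumably dropped because PSC addresses the backing cluster through the topic URI:

    private void writeRecordsToPsc(String topicUri, List<String> lines) throws Exception {
        DataStreamSource<String> stream = env.fromCollection(lines);
        SerializationSchema<String> serSchema = new SimpleStringSchema();
        FlinkPscPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
        stream.sinkTo(
                PscSink.<String>builder()
                        .setRecordSerializer(
                                PscRecordSerializationSchema.builder()
                                        .setTopicUriString(topicUri)
                                        // assumed to mirror KafkaRecordSerializationSchema:
                                        .setValueSerializationSchema(serSchema)
                                        .setPartitioner(partitioner)
                                        .build())
                        .build());
        env.execute("Write records to " + topicUri); // assumed: run the job to flush the records
    }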
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java
index e42c052..e7f61e7 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java
@@ -194,7 +194,7 @@ public class PscDynamicTableFactoryTest {
@Test
public void testTableSource() {
final DynamicTableSource actualSource = createTableSource(SCHEMA, getBasicSourceOptions());
- final PscDynamicSource actualKafkaSource = (PscDynamicSource) actualSource;
+ final PscDynamicSource actualPscSource = (PscDynamicSource) actualSource;
final Map specificOffsets = new HashMap<>();
specificOffsets.put(new PscTopicUriPartition(TOPIC_URI, PARTITION_0), OFFSET_0);
@@ -204,7 +204,7 @@ public void testTableSource() {
new DecodingFormatMock(",", true);
// Test scan source equals
- final PscDynamicSource expectedKafkaSource =
+ final PscDynamicSource expectedPscSource =
createExpectedScanSource(
SCHEMA_DATA_TYPE,
null,
@@ -218,10 +218,10 @@ public void testTableSource() {
StartupMode.SPECIFIC_OFFSETS,
specificOffsets,
0);
- assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
+ assertThat(actualPscSource).isEqualTo(expectedPscSource);
ScanTableSource.ScanRuntimeProvider provider =
- actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+ actualPscSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
assertPscSource(provider);
}
@@ -246,7 +246,7 @@ public void testTableSourceWithPattern() {
new DecodingFormatMock(",", true);
// Test scan source equals
- final PscDynamicSource expectedKafkaSource =
+ final PscDynamicSource expectedPscSource =
createExpectedScanSource(
SCHEMA_DATA_TYPE,
null,
@@ -260,11 +260,11 @@ public void testTableSourceWithPattern() {
StartupMode.EARLIEST,
specificOffsets,
0);
- final PscDynamicSource actualKafkaSource = (PscDynamicSource) actualSource;
- assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
+ final PscDynamicSource actualPscSource = (PscDynamicSource) actualSource;
+ assertThat(actualPscSource).isEqualTo(expectedPscSource);
ScanTableSource.ScanRuntimeProvider provider =
- actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+ actualPscSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
assertPscSource(provider);
}
@@ -272,9 +272,9 @@ public void testTableSourceWithPattern() {
@Test
public void testTableSourceWithKeyValue() {
final DynamicTableSource actualSource = createTableSource(SCHEMA, getKeyValueOptions());
- final PscDynamicSource actualKafkaSource = (PscDynamicSource) actualSource;
+ final PscDynamicSource actualPscSource = (PscDynamicSource) actualSource;
// initialize stateful testing formats
- actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+ actualPscSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
final DecodingFormatMock keyDecodingFormat = new DecodingFormatMock("#", false);
keyDecodingFormat.producedDataType =
@@ -287,7 +287,7 @@ public void testTableSourceWithKeyValue() {
DataTypes.FIELD(TIME, DataTypes.TIMESTAMP(3)))
.notNull();
- final PscDynamicSource expectedKafkaSource =
+ final PscDynamicSource expectedPscSource =
createExpectedScanSource(
SCHEMA_DATA_TYPE,
keyDecodingFormat,
@@ -302,7 +302,7 @@ public void testTableSourceWithKeyValue() {
Collections.emptyMap(),
0);
- assertThat(actualSource).isEqualTo(expectedKafkaSource);
+ assertThat(actualSource).isEqualTo(expectedPscSource);
}
@Test
@@ -311,12 +311,12 @@ public void testTableSourceWithKeyValueAndMetadata() {
options.put("value.test-format.readable-metadata", "metadata_1:INT, metadata_2:STRING");
final DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, options);
- final PscDynamicSource actualKafkaSource = (PscDynamicSource) actualSource;
+ final PscDynamicSource actualPscSource = (PscDynamicSource) actualSource;
// initialize stateful testing formats
- actualKafkaSource.applyReadableMetadata(
+ actualPscSource.applyReadableMetadata(
Arrays.asList("timestamp", "value.metadata_2"),
SCHEMA_WITH_METADATA.toSourceRowDataType());
- actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+ actualPscSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
final DecodingFormatMock expectedKeyFormat =
new DecodingFormatMock(
@@ -464,10 +464,10 @@ public void testTableSink() {
"psc-sink");
assertThat(actualSink).isEqualTo(expectedSink);
- // Test kafka producer.
- final PscDynamicSink actualKafkaSink = (PscDynamicSink) actualSink;
+ // Test PSC producer.
+ final PscDynamicSink actualPscSink = (PscDynamicSink) actualSink;
DynamicTableSink.SinkRuntimeProvider provider =
- actualKafkaSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
+ actualPscSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
assertThat(provider).isInstanceOf(SinkV2Provider.class);
final SinkV2Provider sinkProvider = (SinkV2Provider) provider;
final Sink sinkFunction = sinkProvider.createSink();
@@ -516,9 +516,9 @@ public void testTableSinkWithKeyValue() {
options.put("sink.transactional-id-prefix", "psc-sink");
});
final DynamicTableSink actualSink = createTableSink(SCHEMA, modifiedOptions);
- final PscDynamicSink actualKafkaSink = (PscDynamicSink) actualSink;
+ final PscDynamicSink actualPscSink = (PscDynamicSink) actualSink;
// initialize stateful testing formats
- actualKafkaSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
+ actualPscSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
final EncodingFormatMock keyEncodingFormat = new EncodingFormatMock("#");
keyEncodingFormat.consumedDataType =
@@ -921,7 +921,7 @@ public void testPrimaryKeyValidation() {
.isThrownBy(() -> createTableSink(pkSchema, getBasicSinkOptions()))
.havingRootCause()
.withMessage(
- "The Kafka table 'default.default.t1' with 'test-format' format"
+ "The PSC table 'default.default.t1' with 'test-format' format"
+ " doesn't support defining PRIMARY KEY constraint on the table, because it can't"
+ " guarantee the semantic of primary key.");
@@ -929,7 +929,7 @@ public void testPrimaryKeyValidation() {
.isThrownBy(() -> createTableSink(pkSchema, getKeyValueOptions()))
.havingRootCause()
.withMessage(
- "The Kafka table 'default.default.t1' with 'test-format' format"
+ "The PSC table 'default.default.t1' with 'test-format' format"
+ " doesn't support defining PRIMARY KEY constraint on the table, because it can't"
+ " guarantee the semantic of primary key.");
@@ -950,7 +950,7 @@ public void testPrimaryKeyValidation() {
.isThrownBy(() -> createTableSource(pkSchema, getBasicSourceOptions()))
.havingRootCause()
.withMessage(
- "The Kafka table 'default.default.t1' with 'test-format' format"
+ "The PSC table 'default.default.t1' with 'test-format' format"
+ " doesn't support defining PRIMARY KEY constraint on the table, because it can't"
+ " guarantee the semantic of primary key.");
}
@@ -1033,12 +1033,11 @@ private static Map getModifiedOptions(
private static Map getBasicSourceOptions() {
Map tableOptions = new HashMap<>();
- // Kafka specific options.
+ // PSC-specific options.
tableOptions.put("connector", PscDynamicTableFactory.IDENTIFIER);
tableOptions.put("topic-uri", TOPIC_URI);
tableOptions.put("properties.psc.consumer.group.id", "dummy");
tableOptions.put("properties.psc.cluster.uri", PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
-// tableOptions.put("properties.bootstrap.servers", "dummy");
tableOptions.put("scan.startup.mode", "specific-offsets");
tableOptions.put("scan.startup.specific-offsets", PROPS_SCAN_OFFSETS);
tableOptions.put("scan.topic-partition-discovery.interval", DISCOVERY_INTERVAL);
@@ -1058,7 +1057,7 @@ private static Map getBasicSourceOptions() {
private static Map getBasicSinkOptions() {
Map tableOptions = new HashMap<>();
- // Kafka specific options.
+ // PSC-specific options.
tableOptions.put("connector", PscDynamicTableFactory.IDENTIFIER);
tableOptions.put("topic-uri", TOPIC_URI);
tableOptions.put("properties." + PscConfiguration.PSC_CONSUMER_GROUP_ID, "dummy");
@@ -1078,7 +1077,7 @@ private static Map getBasicSinkOptions() {
private static Map getKeyValueOptions() {
Map tableOptions = new HashMap<>();
- // Kafka specific options.
+ // PSC-specific options.
tableOptions.put("connector", PscDynamicTableFactory.IDENTIFIER);
tableOptions.put("topic-uri", TOPIC_URI);
tableOptions.put("properties." + PscConfiguration.PSC_CONSUMER_GROUP_ID, "dummy");
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableITCase.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableITCase.java
index 015ec84..32339a8 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableITCase.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableITCase.java
@@ -97,7 +97,7 @@ public void testPscSourceSink() throws Exception {
final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topic;
createTestTopic(topic, 1, 1);
- // ---------- Produce an event time stream into Kafka -------------------
+ // ---------- Produce an event time stream into PSC -------------------
String groupId = getStandardProps().getProperty(PscConfiguration.PSC_CONSUMER_GROUP_ID);
String bootstraps = getBootstrapServers();
@@ -218,7 +218,7 @@ public void testPscTableWithMultipleTopics() throws Exception {
"%s_%s_%s", currency, format, UUID.randomUUID()))
.collect(Collectors.toList());
List topicUris = topics.stream().map(t -> PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + t).collect(Collectors.toList());
- // Because kafka connector currently doesn't support write data into multiple topic
+ // Because the PSC connector currently doesn't support writing data into multiple topics
// together,
// we have to create multiple sink tables.
IntStream.range(0, 4)
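The multi-table workaround referenced in that comment looks roughly like the following; the table name, schema columns, and use of tEnv/format here are illustrative assumptions, not lifted from the test:

    // Hypothetical sketch: register one sink table per topic URI in a loop.
    for (int i = 0; i < topicUris.size(); i++) {
        tEnv.executeSql(String.format(
                "CREATE TABLE sink_%d (currency STRING, rate DECIMAL(10, 2)) WITH ("
                        + " 'connector' = '%s',"
                        + " 'topic-uri' = '%s',"
                        + " 'format' = '%s')",
                i, PscDynamicTableFactory.IDENTIFIER, topicUris.get(i), format));
    }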
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableTestBase.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableTestBase.java
index b5e8629..01fc25a 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableTestBase.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableTestBase.java
@@ -59,7 +59,7 @@
import static com.pinterest.flink.connector.psc.testutils.PscTestUtils.injectDiscoveryConfigs;
-/** Base class for Kafka Table IT Cases. */
+/** Base class for PSC Table IT Cases. */
public abstract class PscTableTestBase extends AbstractTestBase {
private static final Logger LOG = LoggerFactory.getLogger(PscTableTestBase.class);
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableTestUtils.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableTestUtils.java
index 65cf430..d754c34 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableTestUtils.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableTestUtils.java
@@ -45,7 +45,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
-/** Utils for kafka table tests. */
+/** Utils for PSC table tests. */
public class PscTableTestUtils {
public static List collectRows(Table table, int expectedSize) throws Exception {
final TableResult result = table.execute();
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactoryTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactoryTest.java
index da6fb29..44d2a52 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactoryTest.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactoryTest.java
@@ -167,10 +167,10 @@ public void testTableSource() {
UPSERT_PSC_SOURCE_PROPERTIES);
assertEquals(actualSource, expectedSource);
- final PscDynamicSource actualUpsertKafkaSource = (PscDynamicSource) actualSource;
+ final PscDynamicSource actualUpsertPscSource = (PscDynamicSource) actualSource;
ScanTableSource.ScanRuntimeProvider provider =
- actualUpsertKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
- assertKafkaSource(provider);
+ actualUpsertPscSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+ assertPscSource(provider);
}
@Test
@@ -193,12 +193,12 @@ public void testTableSink() {
null);
// Test sink format.
- final PscDynamicSink actualUpsertKafkaSink = (PscDynamicSink) actualSink;
+ final PscDynamicSink actualUpsertPscSink = (PscDynamicSink) actualSink;
assertEquals(expectedSink, actualSink);
- // Test kafka producer.
+ // Test PSC producer.
DynamicTableSink.SinkRuntimeProvider provider =
- actualUpsertKafkaSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
+ actualUpsertPscSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
assertThat(provider, instanceOf(SinkV2Provider.class));
final SinkV2Provider sinkFunctionProvider = (SinkV2Provider) provider;
final Sink sink = sinkFunctionProvider.createSink();
@@ -234,12 +234,12 @@ public void testBufferedTableSink() {
null);
// Test sink format.
- final PscDynamicSink actualUpsertKafkaSink = (PscDynamicSink) actualSink;
+ final PscDynamicSink actualUpsertPscSink = (PscDynamicSink) actualSink;
assertEquals(expectedSink, actualSink);
- // Test kafka producer.
+ // Test PSC producer.
DynamicTableSink.SinkRuntimeProvider provider =
- actualUpsertKafkaSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
+ actualUpsertPscSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
assertThat(provider, instanceOf(DataStreamSinkProvider.class));
final DataStreamSinkProvider sinkProvider = (DataStreamSinkProvider) provider;
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -353,7 +353,7 @@ private void verifyEncoderSubject(
String expectedValueSubject,
String expectedKeySubject) {
Map options = new HashMap<>();
- // Kafka specific options.
+ // PSC-specific options.
options.put("connector", UpsertPscDynamicTableFactory.IDENTIFIER);
options.put("topic-uri", SINK_TOPIC_URI);
options.put("properties." + PscConfiguration.PSC_CONSUMER_GROUP_ID, "dummy");
@@ -407,9 +407,9 @@ public void testCreateSourceTableWithoutPK() {
thrown.expect(
containsCause(
new ValidationException(
- "'upsert-kafka' tables require to define a PRIMARY KEY constraint. "
- + "The PRIMARY KEY specifies which columns should be read from or write to the Kafka message key. "
- + "The PRIMARY KEY also defines records in the 'upsert-kafka' table should update or delete on which keys.")));
+ "'upsert-psc' tables require to define a PRIMARY KEY constraint. "
+ + "The PRIMARY KEY specifies which columns should be read from or write to the PubSub message key. "
+ + "The PRIMARY KEY also defines records in the 'upsert-psc' table should update or delete on which keys.")));
ResolvedSchema illegalSchema =
ResolvedSchema.of(
@@ -425,9 +425,9 @@ public void testCreateSinkTableWithoutPK() {
thrown.expect(
containsCause(
new ValidationException(
- "'upsert-kafka' tables require to define a PRIMARY KEY constraint. "
- + "The PRIMARY KEY specifies which columns should be read from or write to the Kafka message key. "
- + "The PRIMARY KEY also defines records in the 'upsert-kafka' table should update or delete on which keys.")));
+ "'upsert-psc' tables require to define a PRIMARY KEY constraint. "
+ + "The PRIMARY KEY specifies which columns should be read from or write to the PubSub message key. "
+ + "The PRIMARY KEY also defines records in the 'upsert-psc' table should update or delete on which keys.")));
ResolvedSchema illegalSchema =
ResolvedSchema.of(
@@ -443,7 +443,7 @@ public void testSerWithCDCFormatAsValue() {
containsCause(
new ValidationException(
String.format(
- "'upsert-kafka' connector doesn't support '%s' as value format, "
+ "'upsert-psc' connector doesn't support '%s' as value format, "
+ "because '%s' is not in insert-only mode.",
TestFormatFactory.IDENTIFIER,
TestFormatFactory.IDENTIFIER))));
@@ -468,7 +468,7 @@ public void testDeserWithCDCFormatAsValue() {
containsCause(
new ValidationException(
String.format(
- "'upsert-kafka' connector doesn't support '%s' as value format, "
+ "'upsert-psc' connector doesn't support '%s' as value format, "
+ "because '%s' is not in insert-only mode.",
TestFormatFactory.IDENTIFIER,
TestFormatFactory.IDENTIFIER))));
@@ -651,7 +651,7 @@ private static PscDynamicSink createExpectedSink(
null);
}
- private void assertKafkaSource(ScanTableSource.ScanRuntimeProvider provider) {
+ private void assertPscSource(ScanTableSource.ScanRuntimeProvider provider) {
assertThat(provider, instanceOf(DataStreamScanProvider.class));
final DataStreamScanProvider dataStreamScanProvider = (DataStreamScanProvider) provider;
final Transformation transformation =
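The reworded validation messages above spell out the contract: an 'upsert-psc' table must declare a PRIMARY KEY, and that key is what is written to (and read back from) the PubSub message key. A minimal DDL that would satisfy it might look as follows; the 'upsert-psc' identifier is taken from the new messages, while the key/value format options and the elided topic URI are placeholders:

    String validUpsertDDL =
            "CREATE TABLE upsert_sink (\n"
                    + "  currency STRING,\n"
                    + "  rate DECIMAL(10, 2),\n"
                    + "  PRIMARY KEY (currency) NOT ENFORCED\n" // required by the validation above
                    + ") WITH (\n"
                    + "  'connector' = 'upsert-psc',\n"
                    + "  'topic-uri' = '...',\n"                // placeholder, e.g. SINK_TOPIC_URI
                    + "  'key.format' = 'json',\n"              // assumed to mirror upsert-kafka
                    + "  'value.format' = 'json'\n"
                    + ")";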
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscTableITCase.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscTableITCase.java
index 66fa86c..61938cf 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscTableITCase.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscTableITCase.java
@@ -92,8 +92,8 @@ public void testTemporalJoin() throws Exception {
// Kafka DefaultPartitioner's hash strategy is slightly different from Flink
// KeyGroupStreamPartitioner,
// which causes the records in the different Flink partitions are written into the same
- // Kafka partition.
- // When reading from the out-of-order Kafka partition, we need to set suitable watermark
+ // PubSub partition.
+ // When reading from the out-of-order PubSub partition, we need to set a suitable watermark
// interval to
// tolerate the disorderliness.
// For convenience, we just set the parallelism 1 to make all records are in the same Flink
@@ -309,7 +309,7 @@ public void testSourceSinkWithKeyAndPartialValue() throws Exception {
}
@Test
- public void testKafkaSourceSinkWithKeyAndFullValue() throws Exception {
+ public void testPscSourceSinkWithKeyAndFullValue() throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic = "key_full_value_topic_" + format;