Refactor to remove Kafka references
jeffxiang committed Oct 28, 2024
1 parent 29adc49 commit 6627948
Showing 56 changed files with 293 additions and 885 deletions.
@@ -7,6 +7,17 @@
import java.util.Properties;

public class PscFlinkConfiguration {

+/**
+ * Configuration key for the cluster URI. This is required for the following client types that use the {@link com.pinterest.flink.connector.psc.source.PscSource}
+ * and {@link com.pinterest.flink.connector.psc.sink.PscSink} classes:
+ *
+ * <li>Transactional producer: The cluster URI specifies the cluster to which the producer will connect. Transactional
+ *     producers require a cluster URI to be specified because transaction lifecycles are managed by the backend
+ *     PubSub cluster, and it is generally not possible to have transactions span multiple clusters.</li>
+ * <li>Consumer: Unlike regular PscConsumers, consumers in {@link com.pinterest.flink.connector.psc.source.PscSource} require
+ *     a cluster URI to connect to the cluster for metadata queries during their lifecycles.</li>
+ */
+public static final String CLUSTER_URI_CONFIG = "psc.cluster.uri";

public static TopicUri validateAndGetBaseClusterUri(Properties properties) throws TopicUriSyntaxException {
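For illustration, the new key and validateAndGetBaseClusterUri would be used together roughly as sketched below; the URI string is a made-up placeholder, since this commit does not show a concrete cluster URI format.

// Hypothetical usage sketch; the URI value below is a placeholder, not a real
// cluster address, and the exact PSC URI scheme is an assumption here.
Properties properties = new Properties();
properties.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, "plaintext:/rn:kafka:env:region::cluster:");
TopicUri clusterUri = PscFlinkConfiguration.validateAndGetBaseClusterUri(properties);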
@@ -37,22 +37,22 @@
* Flink Sink to produce data into a PSC topicUri. The sink supports all delivery guarantees
* described by {@link DeliveryGuarantee}.
* <li>{@link DeliveryGuarantee#NONE} does not provide any guarantees: messages may be lost in case
- * of issues on the Kafka broker and messages may be duplicated in case of a Flink failure.
+ * of issues on the PubSub broker and messages may be duplicated in case of a Flink failure.
* <li>{@link DeliveryGuarantee#AT_LEAST_ONCE} the sink will wait for all outstanding records in the
- * PSC buffers to be acknowledged by the PSC producer on a checkpoint. No messages will be
+ * producer buffers to be acknowledged by the PSC producer on a checkpoint. No messages will be
* lost in case of any issue with the brokers but messages may be duplicated when Flink
* restarts.
* <li>{@link DeliveryGuarantee#EXACTLY_ONCE}: Note that this mode is only supported by KafkaProducers in the backend.
*
* In this mode the PscSink will write all messages in
- * a Kafka transaction that will be committed to Kafka on a checkpoint. Thus, if the consumer
- * reads only committed data (see Kafka consumer config isolation.level), no duplicates will be
+ * a transaction that will be committed to PubSub on a checkpoint. Thus, if the consumer
+ * reads only committed data (see PSC consumer config isolation.level), no duplicates will be
* seen in case of a Flink restart. However, this delays record writing effectively until a
* checkpoint is written, so adjust the checkpoint duration accordingly. Please ensure that you
- * use unique {@link #transactionalIdPrefix}s across your applications running on the same Kafka
+ * use unique {@link #transactionalIdPrefix}s across your applications running on the same PubSub
* cluster such that multiple running jobs do not interfere in their transactions! Additionally,
- * it is highly recommended to tweak Kafka transaction timeout (link) >> maximum checkpoint
- * duration + maximum restart duration or data loss may happen when Kafka expires an uncommitted
+ * it is highly recommended to tweak transaction timeout (link) >> maximum checkpoint
+ * duration + maximum restart duration or data loss may happen when the PubSub cluster expires an uncommitted
* transaction.
*
* @param <IN> type of the records written
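Taken together, the EXACTLY_ONCE notes above correspond to a builder call roughly like the sketch below. setRecordSerializer and setTransactionalIdPrefix appear later in this diff; the builder entry point, setDeliveryGuarantee, and MyRecordSerializer are assumptions by analogy with Flink's Kafka sink, not confirmed by this commit.

// Hedged sketch, not taken from this commit.
PscSink<String> sink =
        PscSink.<String>builder()                                     // assumed entry point
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // assumed setter
                .setTransactionalIdPrefix("orders-job")               // must be unique per application on the cluster
                .setRecordSerializer(new MyRecordSerializer())        // hypothetical serializer implementation
                .build();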
@@ -146,7 +146,7 @@ public PscSinkBuilder<IN> setRecordSerializer(
*
* <p>It is mandatory to always set this value with {@link DeliveryGuarantee#EXACTLY_ONCE} to
* prevent corrupted transactions if multiple jobs using the PscSink run against the same
- * Kafka Cluster. The default prefix is {@link #transactionalIdPrefix}.
+ * PubSub Cluster. The default prefix is {@link #transactionalIdPrefix}.
*
* <p>The size of the prefix is capped by {@link #MAXIMUM_PREFIX_BYTES} formatted with UTF-8.
*
@@ -162,7 +162,7 @@ public PscSinkBuilder<IN> setTransactionalIdPrefix(String transactionalIdPrefix)
checkState(
transactionalIdPrefix.getBytes(StandardCharsets.UTF_8).length
<= MAXIMUM_PREFIX_BYTES,
"The configured prefix is too long and the resulting transactionalId might exceed Kafka's transactionalIds size.");
"The configured prefix is too long and the resulting transactionalId might exceed PSC's transactionalIds size.");
return this;
}

@@ -173,7 +173,7 @@ private void sanityCheck() {
if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
checkState(
transactionalIdPrefix != null,
"EXACTLY_ONCE delivery guarantee requires a transactionIdPrefix to be set to provide unique transaction names across multiple PscSinks writing to the same Kafka cluster.");
"EXACTLY_ONCE delivery guarantee requires a transactionIdPrefix to be set to provide unique transaction names across multiple PscSinks writing to the same PSC cluster.");
}
checkNotNull(recordSerializer, "recordSerializer");
}
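Note that the cap checked above is on the UTF-8 byte length, not the character count, so non-ASCII characters consume more of the budget. A small illustration with an arbitrary prefix:

String prefix = "zürich-pipeline";
// 15 characters, but 16 bytes in UTF-8, because 'ü' encodes to two bytes.
int utf8Length = prefix.getBytes(StandardCharsets.UTF_8).length;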
@@ -32,8 +32,8 @@
*
* <p>Transactions are lingering if they are not tracked anywhere. For example, if a job is started
* transactions are opened. A restart without checkpoint would not allow Flink to abort old
- * transactions. Since Kafka's transactions are sequential, newly produced data does not become
- * visible for read_committed consumers. However, Kafka has no API for querying open transactions,
+ * transactions. Since PSC's transactions are sequential, newly produced data does not become
+ * visible for read_committed consumers. However, PSC has no API for querying open transactions,
* so they become lingering.
*
* <p>Flink solves this by assuming consecutive transaction ids. On restart of checkpoint C on
@@ -119,7 +119,7 @@ private int abortTransactionOfSubtask(String prefix, long startCheckpointId, int
// This method will only cease to work if transaction log timeout = topic retention
// and a user didn't restart the application for that period of time. Then the first
// transactions would vanish from the topic while later transactions are still
- // lingering until they are cleaned up by Kafka. Then the user has to wait until the
+ // lingering until they are cleaned up by PSC. Then the user has to wait until the
// other transactions are timed out (which shouldn't take too long).
break;
}
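The consecutive-id assumption described above is what makes abortTransactionOfSubtask workable without a listing API: transaction ids can be enumerated and aborted one by one. A rough illustration of the naming scheme this implies follows; the exact field order and separator are assumptions, not taken from this commit.

// Assumed layout: prefix-subtaskId-checkpointId. Starting from startCheckpointId,
// an aborter can increment the checkpoint id, derive each transactional id, and
// abort it, stopping once it reaches an id that was never used.
static String transactionalId(String prefix, int subtaskId, long checkpointId) {
    return prefix + "-" + subtaskId + "-" + checkpointId;
}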
@@ -61,17 +61,16 @@
import java.util.function.Supplier;

/**
- * The Source implementation of Kafka. Please use a {@link PscSourceBuilder} to construct a {@link
- * PscSource}. The following example shows how to create a KafkaSource emitting records of <code>
+ * The Source implementation of PSC. Please use a {@link PscSourceBuilder} to construct a {@link
+ * PscSource}. The following example shows how to create a PscSource emitting records of <code>
* String</code> type.
*
* <pre>{@code
- * KafkaSource<String> source = KafkaSource
+ * PscSource<String> source = PscSource
* .<String>builder()
- *     .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
*     .setGroupId("MyGroup")
- *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
- *     .setDeserializer(new TestingKafkaRecordDeserializationSchema())
+ *     .setTopicUris(Arrays.asList(TOPIC1, TOPIC2))
+ *     .setDeserializer(new TestingPscRecordDeserializationSchema())
* .setStartingOffsets(OffsetsInitializer.earliest())
* .build();
* }</pre>
@@ -112,9 +111,9 @@ public class PscSource<OUT>
}

/**
- * Get a kafkaSourceBuilder to build a {@link PscSource}.
+ * Get a PscSourceBuilder to build a {@link PscSource}.
*
- * @return a Kafka source builder.
+ * @return a PSC source builder.
*/
public static <OUT> PscSourceBuilder<OUT> builder() {
return new PscSourceBuilder<>();
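For completeness, a source built as in the javadoc example above would typically be attached to a job through Flink's standard DataStream API; this usage sketch is not part of the commit.

// Standard Flink wiring; "PSC Source" is an arbitrary operator name.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "PSC Source");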