Finish code changes to source and sink APIs
jeffxiang committed Sep 13, 2024
1 parent 6ec696f · commit 8eac82f

Showing 6 changed files with 147 additions and 137 deletions.
com/pinterest/flink/connector/psc/source/PscSource.java

@@ -18,6 +18,22 @@
 
 package com.pinterest.flink.connector.psc.source;
 
+import com.pinterest.flink.connector.psc.source.enumerator.PscSourceEnumState;
+import com.pinterest.flink.connector.psc.source.enumerator.PscSourceEnumStateSerializer;
+import com.pinterest.flink.connector.psc.source.enumerator.PscSourceEnumerator;
+import com.pinterest.flink.connector.psc.source.enumerator.initializer.OffsetsInitializer;
+import com.pinterest.flink.connector.psc.source.enumerator.subscriber.PscSubscriber;
+import com.pinterest.flink.connector.psc.source.metrics.PscSourceReaderMetrics;
+import com.pinterest.flink.connector.psc.source.reader.PscRecordEmitter;
+import com.pinterest.flink.connector.psc.source.reader.PscSourceReader;
+import com.pinterest.flink.connector.psc.source.reader.PscTopicUriPartitionSplitReader;
+import com.pinterest.flink.connector.psc.source.reader.deserializer.PscRecordDeserializationSchema;
+import com.pinterest.flink.connector.psc.source.reader.fetcher.PscSourceFetcherManager;
+import com.pinterest.flink.connector.psc.source.split.PscTopicUriPartitionSplit;
+import com.pinterest.flink.connector.psc.source.split.PscTopicUriPartitionSplitSerializer;
+import com.pinterest.psc.consumer.PscConsumerMessage;
+import com.pinterest.psc.exception.ClientException;
+import com.pinterest.psc.exception.startup.ConfigurationException;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
@@ -33,24 +49,9 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
-import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
-import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer;
-import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator;
-import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
-import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
-import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
-import org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader;
-import org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter;
-import org.apache.flink.connector.kafka.source.reader.KafkaSourceReader;
-import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
-import org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager;
-import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
-import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.util.UserCodeClassLoader;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -60,7 +61,7 @@
 import java.util.function.Supplier;
 
 /**
- * The Source implementation of Kafka. Please use a {@link KafkaSourceBuilder} to construct a {@link
+ * The Source implementation of Kafka. Please use a {@link PscSourceBuilder} to construct a {@link
  * PscSource}. The following example shows how to create a KafkaSource emitting records of <code>
  * String</code> type.
  *
@@ -75,32 +76,32 @@
  * .build();
  * }</pre>
  *
- * <p>See {@link KafkaSourceBuilder} for more details.
+ * <p>See {@link PscSourceBuilder} for more details.
  *
  * @param <OUT> the output type of the source.
  */
 @PublicEvolving
 public class PscSource<OUT>
-        implements Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>,
+        implements Source<OUT, PscTopicUriPartitionSplit, PscSourceEnumState>,
                 ResultTypeQueryable<OUT> {
     private static final long serialVersionUID = -8755372893283732098L;
     // Users can choose only one of the following ways to specify the topics to consume from.
-    private final KafkaSubscriber subscriber;
+    private final PscSubscriber subscriber;
     // Users can specify the starting / stopping offset initializer.
     private final OffsetsInitializer startingOffsetsInitializer;
     private final OffsetsInitializer stoppingOffsetsInitializer;
     // Boundedness
     private final Boundedness boundedness;
-    private final KafkaRecordDeserializationSchema<OUT> deserializationSchema;
+    private final PscRecordDeserializationSchema<OUT> deserializationSchema;
     // The configurations.
     private final Properties props;
 
     PscSource(
-            KafkaSubscriber subscriber,
+            PscSubscriber subscriber,
             OffsetsInitializer startingOffsetsInitializer,
             @Nullable OffsetsInitializer stoppingOffsetsInitializer,
             Boundedness boundedness,
-            KafkaRecordDeserializationSchema<OUT> deserializationSchema,
+            PscRecordDeserializationSchema<OUT> deserializationSchema,
             Properties props) {
         this.subscriber = subscriber;
         this.startingOffsetsInitializer = startingOffsetsInitializer;
@@ -115,8 +116,8 @@ public class PscSource<OUT>
      *
      * @return a Kafka source builder.
      */
-    public static <OUT> KafkaSourceBuilder<OUT> builder() {
-        return new KafkaSourceBuilder<>();
+    public static <OUT> PscSourceBuilder<OUT> builder() {
+        return new PscSourceBuilder<>();
     }
 
     @Override
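For orientation, here is a minimal usage sketch of the new builder entry point referenced in the class Javadoc above. It is not part of this commit: the setter names are assumptions modeled on Flink's KafkaSourceBuilder, which PscSourceBuilder mirrors, and the topic URI is a placeholder.

import com.pinterest.flink.connector.psc.source.PscSource;
import com.pinterest.flink.connector.psc.source.reader.deserializer.PscRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PscSourceExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical setters, assumed to mirror KafkaSourceBuilder's API.
        PscSource<String> source =
                PscSource.<String>builder()
                        .setTopicUris("plaintext:/rn:kafka:env:cloud_region::cluster:my-topic") // placeholder URI format
                        .setDeserializer(PscRecordDeserializationSchema.valueOnly(new SimpleStringSchema())) // assumed factory
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "PSC Source").print();
        env.execute("psc-source-example");
    }
}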
@@ -126,16 +127,16 @@ public Boundedness getBoundedness() {
 
     @Internal
     @Override
-    public SourceReader<OUT, KafkaPartitionSplit> createReader(SourceReaderContext readerContext)
+    public SourceReader<OUT, PscTopicUriPartitionSplit> createReader(SourceReaderContext readerContext)
             throws Exception {
         return createReader(readerContext, (ignore) -> {});
     }
 
     @VisibleForTesting
-    SourceReader<OUT, KafkaPartitionSplit> createReader(
+    SourceReader<OUT, PscTopicUriPartitionSplit> createReader(
             SourceReaderContext readerContext, Consumer<Collection<String>> splitFinishedHook)
             throws Exception {
-        FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<PscConsumerMessage<byte[], byte[]>>>
                 elementsQueue = new FutureCompletingBlockingQueue<>();
         deserializationSchema.open(
                 new DeserializationSchema.InitializationContext() {
@@ -149,16 +150,22 @@ public UserCodeClassLoader getUserCodeClassLoader() {
                         return readerContext.getUserCodeClassLoader();
                     }
                 });
-        final KafkaSourceReaderMetrics kafkaSourceReaderMetrics =
-                new KafkaSourceReaderMetrics(readerContext.metricGroup());
-
-        Supplier<KafkaPartitionSplitReader> splitReaderSupplier =
-                () -> new KafkaPartitionSplitReader(props, readerContext, kafkaSourceReaderMetrics);
-        KafkaRecordEmitter<OUT> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);
+        final PscSourceReaderMetrics kafkaSourceReaderMetrics =
+                new PscSourceReaderMetrics(readerContext.metricGroup());
+
+        Supplier<PscTopicUriPartitionSplitReader> splitReaderSupplier =
+                () -> {
+                    try {
+                        return new PscTopicUriPartitionSplitReader(props, readerContext, kafkaSourceReaderMetrics);
+                    } catch (ConfigurationException | ClientException e) {
+                        throw new RuntimeException("Failed to create new PscTopicUriPartitionSplitReader", e);
+                    }
+                };
+        PscRecordEmitter<OUT> recordEmitter = new PscRecordEmitter<>(deserializationSchema);
 
-        return new KafkaSourceReader<>(
+        return new PscSourceReader<>(
                 elementsQueue,
-                new KafkaSourceFetcherManager(
+                new PscSourceFetcherManager(
                         elementsQueue, splitReaderSupplier::get, splitFinishedHook),
                 recordEmitter,
                 toConfiguration(props),
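The reworked supplier in the hunk above reflects a general Java constraint rather than anything PSC-specific: Supplier.get() declares no checked exceptions, so the ConfigurationException and ClientException thrown by the new PscTopicUriPartitionSplitReader constructor must be caught inside the lambda and rethrown unchecked. A self-contained sketch of the pattern, with illustrative names only:

import java.util.function.Supplier;

class ThrowingResource {
    // Stand-in for a constructor that declares checked exceptions.
    ThrowingResource() throws Exception {}
}

class SupplierWrapping {
    static Supplier<ThrowingResource> resourceSupplier() {
        return () -> {
            try {
                return new ThrowingResource();
            } catch (Exception e) {
                // Supplier.get() cannot throw checked exceptions; rethrow
                // unchecked and keep the original failure as the cause.
                throw new RuntimeException("Failed to create resource", e);
            }
        };
    }
}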
@@ -168,9 +175,9 @@ public UserCodeClassLoader getUserCodeClassLoader() {
 
     @Internal
     @Override
-    public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> createEnumerator(
-            SplitEnumeratorContext<KafkaPartitionSplit> enumContext) {
-        return new KafkaSourceEnumerator(
+    public SplitEnumerator<PscTopicUriPartitionSplit, PscSourceEnumState> createEnumerator(
+            SplitEnumeratorContext<PscTopicUriPartitionSplit> enumContext) {
+        return new PscSourceEnumerator(
                 subscriber,
                 startingOffsetsInitializer,
                 stoppingOffsetsInitializer,
@@ -181,11 +188,11 @@ public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> createEnumerator(
 
     @Internal
     @Override
-    public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> restoreEnumerator(
-            SplitEnumeratorContext<KafkaPartitionSplit> enumContext,
-            KafkaSourceEnumState checkpoint)
+    public SplitEnumerator<PscTopicUriPartitionSplit, PscSourceEnumState> restoreEnumerator(
+            SplitEnumeratorContext<PscTopicUriPartitionSplit> enumContext,
+            PscSourceEnumState checkpoint)
             throws IOException {
-        return new KafkaSourceEnumerator(
+        return new PscSourceEnumerator(
                 subscriber,
                 startingOffsetsInitializer,
                 stoppingOffsetsInitializer,
@@ -197,14 +204,14 @@ public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> restoreEnumerator(
 
     @Internal
     @Override
-    public SimpleVersionedSerializer<KafkaPartitionSplit> getSplitSerializer() {
-        return new KafkaPartitionSplitSerializer();
+    public SimpleVersionedSerializer<PscTopicUriPartitionSplit> getSplitSerializer() {
+        return new PscTopicUriPartitionSplitSerializer();
     }
 
     @Internal
     @Override
-    public SimpleVersionedSerializer<KafkaSourceEnumState> getEnumeratorCheckpointSerializer() {
-        return new KafkaSourceEnumStateSerializer();
+    public SimpleVersionedSerializer<PscSourceEnumState> getEnumeratorCheckpointSerializer() {
+        return new PscSourceEnumStateSerializer();
     }
 
     @Override
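The two serializer overrides at the end of the diff are the hooks Flink uses to persist splits and enumerator state in checkpoints. A minimal sketch of that round-trip, assuming source and split instances obtained elsewhere (the commit does not show PscTopicUriPartitionSplit's constructor):

import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;

class SerializerRoundTrip {
    // Sketch of the checkpoint round-trip through the split serializer;
    // `source` and `split` are assumed to already exist.
    static PscTopicUriPartitionSplit roundTrip(PscSource<String> source, PscTopicUriPartitionSplit split)
            throws IOException {
        SimpleVersionedSerializer<PscTopicUriPartitionSplit> serializer = source.getSplitSerializer();
        byte[] bytes = serializer.serialize(split);                    // written into checkpoint state
        return serializer.deserialize(serializer.getVersion(), bytes); // restored on recovery
    }
}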