Skip to content

Commit

Permalink
WIP source API's
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffxiang committed Aug 13, 2024
1 parent 1bdb150 commit 82342af
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,15 @@

package com.pinterest.flink.connector.psc.source.enumerator.initializer;

import com.pinterest.psc.common.TopicUriPartition;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;

/**
* An implementation of {@link org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer} which does not initialize anything.
* An implementation of {@link OffsetsInitializer} which does not initialize anything.
*
* <p>This class is used as the default stopping offsets initializer for unbounded Kafka sources.
*/
Expand All @@ -37,14 +35,14 @@ public class NoStoppingOffsetsInitializer implements OffsetsInitializer {
private static final long serialVersionUID = 4186323669290142732L;

@Override
public Map<TopicPartition, Long> getPartitionOffsets(
Collection<TopicPartition> partitions,
public Map<TopicUriPartition, Long> getPartitionOffsets(
Collection<TopicUriPartition> topicUriPartitions,
PartitionOffsetsRetriever partitionOffsetsRetriever) {
return Collections.emptyMap();
}

@Override
public OffsetResetStrategy getAutoOffsetResetStrategy() {
public String getAutoOffsetResetStrategy() {
throw new UnsupportedOperationException(
"The NoStoppingOffsetsInitializer does not have an OffsetResetStrategy. It should only be used "
+ "to end offset.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,19 @@

package com.pinterest.flink.connector.psc.source.enumerator.initializer;

import com.pinterest.flink.connector.psc.source.split.PscTopicUriPartitionSplit;
import com.pinterest.psc.common.MessageId;
import com.pinterest.psc.common.TopicUriPartition;
import com.pinterest.psc.config.PscConfiguration;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.SpecifiedOffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.TimestampOffsetsInitializer;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

import java.io.Serializable;
import java.util.Collection;
import java.util.Map;

/**
* An interface for users to specify the starting / stopping offset of a {@link
* KafkaPartitionSplit}.
* PscTopicUriPartitionSplit}.
*
* @see ReaderHandledOffsetsInitializer
* @see SpecifiedOffsetsInitializer
Expand All @@ -50,15 +44,15 @@ public interface OffsetsInitializer extends Serializable {
* starting offsets or stopping offsets of the Kafka partitions.
*
* <p>If the implementation returns a starting offset which causes {@code
* OffsetsOutOfRangeException} from Kafka. The {@link OffsetResetStrategy} provided by the
* OffsetsOutOfRangeException} from Kafka. The offsetResetStrategy provided by the
* {@link #getAutoOffsetResetStrategy()} will be used to reset the offset.
*
* @param partitions the Kafka partitions to get the starting offsets.
* @param partitionOffsetsRetriever a helper to retrieve information of the Kafka partitions.
* @return A mapping from Kafka partition to their offsets to start consuming from.
*/
Map<TopicPartition, Long> getPartitionOffsets(
Collection<TopicPartition> partitions,
Map<TopicUriPartition, Long> getPartitionOffsets(
Collection<TopicUriPartition> partitions,
PartitionOffsetsRetriever partitionOffsetsRetriever);

/**
Expand All @@ -67,10 +61,10 @@ Map<TopicPartition, Long> getPartitionOffsets(
* <p>The OffsetStrategy is only used when the offset initializer is used to initialize the
* starting offsets and the starting offsets is out of range.
*
* @return An {@link OffsetResetStrategy} to use if the initialized offsets are out of the
* @return An offsetResetStrategy to use if the initialized offsets are out of the
* range.
*/
OffsetResetStrategy getAutoOffsetResetStrategy();
String getAutoOffsetResetStrategy();

/**
* An interface that provides necessary information to the {@link OffsetsInitializer} to get the
Expand All @@ -79,23 +73,22 @@ Map<TopicPartition, Long> getPartitionOffsets(
interface PartitionOffsetsRetriever {

/**
* The group id should be the set for {@link KafkaSource KafkaSource} before invoking this
* The group id should be the set for {@link com.pinterest.flink.connector.psc.source.PscSource PscSource} before invoking this
* method. Otherwise an {@code IllegalStateException} will be thrown.
*
* @see KafkaAdminClient#listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)
* @throws IllegalStateException if the group id is not set for the {@code KafkaSource}.
*/
Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> partitions);
Map<TopicUriPartition, Long> committedOffsets(Collection<TopicUriPartition> partitions);

/** List end offsets for the specified partitions. */
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions);
Map<TopicUriPartition, Long> endOffsets(Collection<TopicUriPartition> partitions);

/** List beginning offsets for the specified partitions. */
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);
Map<TopicUriPartition, Long> beginningOffsets(Collection<TopicUriPartition> partitions);

/** List offsets matching a timestamp for the specified partitions. */
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
Map<TopicPartition, Long> timestampsToSearch);
Map<TopicUriPartition, MessageId> offsetsForTimes(
Map<TopicUriPartition, Long> timestampsToSearch);
}

// --------------- factory methods ---------------
Expand All @@ -107,21 +100,21 @@ Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
* @return an offset initializer which initialize the offsets to the committed offsets.
*/
static OffsetsInitializer committedOffsets() {
return committedOffsets(OffsetResetStrategy.NONE);
return committedOffsets(PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_NONE);
}

/**
* Get an {@link OffsetsInitializer} which initializes the offsets to the committed offsets. Use
* the given {@link OffsetResetStrategy} to initialize the offsets if the committed offsets does
* the given offsetResetStrategy to initialize the offsets if the committed offsets does
* not exist.
*
* @param offsetResetStrategy the offset reset strategy to use when the committed offsets do not
* exist.
* @return an {@link OffsetsInitializer} which initializes the offsets to the committed offsets.
*/
static OffsetsInitializer committedOffsets(OffsetResetStrategy offsetResetStrategy) {
static OffsetsInitializer committedOffsets(String offsetResetStrategy) {
return new ReaderHandledOffsetsInitializer(
KafkaPartitionSplit.COMMITTED_OFFSET, offsetResetStrategy);
PscTopicUriPartitionSplit.COMMITTED_OFFSET, offsetResetStrategy);
}

/**
Expand All @@ -132,7 +125,6 @@ static OffsetsInitializer committedOffsets(OffsetResetStrategy offsetResetStrate
* @param timestamp the timestamp (milliseconds) to start the consumption.
* @return an {@link OffsetsInitializer} which initializes the offsets based on the given
* timestamp.
* @see KafkaAdminClient#listOffsets(Map)
*/
static OffsetsInitializer timestamp(long timestamp) {
return new TimestampOffsetsInitializer(timestamp);
Expand All @@ -147,7 +139,7 @@ static OffsetsInitializer timestamp(long timestamp) {
*/
static OffsetsInitializer earliest() {
return new ReaderHandledOffsetsInitializer(
KafkaPartitionSplit.EARLIEST_OFFSET, OffsetResetStrategy.EARLIEST);
PscTopicUriPartitionSplit.EARLIEST_OFFSET, PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_EARLIEST);
}

/**
Expand All @@ -158,7 +150,7 @@ static OffsetsInitializer earliest() {
*/
static OffsetsInitializer latest() {
return new ReaderHandledOffsetsInitializer(
KafkaPartitionSplit.LATEST_OFFSET, OffsetResetStrategy.LATEST);
PscTopicUriPartitionSplit.LATEST_OFFSET, PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_LATEST);
}

/**
Expand All @@ -167,22 +159,22 @@ static OffsetsInitializer latest() {
* @param offsets the specified offsets for each partition.
* @return an {@link OffsetsInitializer} which initializes the offsets to the specified offsets.
*/
static OffsetsInitializer offsets(Map<TopicPartition, Long> offsets) {
return new SpecifiedOffsetsInitializer(offsets, OffsetResetStrategy.EARLIEST);
static OffsetsInitializer offsets(Map<TopicUriPartition, Long> offsets) {
return new SpecifiedOffsetsInitializer(offsets, PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_EARLIEST);
}

/**
* Get an {@link OffsetsInitializer} which initializes the offsets to the specified offsets. Use
* the given {@link OffsetResetStrategy} to initialize the offsets in case the specified offset
* the given offsetResetStrategy to initialize the offsets in case the specified offset
* is out of range.
*
* @param offsets the specified offsets for each partition.
* @param offsetResetStrategy the {@link OffsetResetStrategy} to use when the specified offset
* @param offsetResetStrategy the offsetResetStrategy to use when the specified offset
* is out of range.
* @return an {@link OffsetsInitializer} which initializes the offsets to the specified offsets.
*/
static OffsetsInitializer offsets(
Map<TopicPartition, Long> offsets, OffsetResetStrategy offsetResetStrategy) {
Map<TopicUriPartition, Long> offsets, String offsetResetStrategy) {
return new SpecifiedOffsetsInitializer(offsets, offsetResetStrategy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,21 @@
package com.pinterest.flink.connector.psc.source.enumerator.initializer;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import java.util.Properties;

/**
* Interface for validating {@link OffsetsInitializer} with properties from {@link
* org.apache.flink.connector.kafka.source.KafkaSource}.
* com.pinterest.flink.connector.psc.source.PscSource}.
*/
@Internal
public interface OffsetsInitializerValidator {

/**
* Validate offsets initializer with properties of Kafka source.
*
* @param kafkaSourceProperties Properties of Kafka source
* @param pscSourceProperties Properties of Kafka source
* @throws IllegalStateException if validation fails
*/
void validate(Properties kafkaSourceProperties) throws IllegalStateException;
void validate(Properties pscSourceProperties) throws IllegalStateException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@

package com.pinterest.flink.connector.psc.source.enumerator.initializer;

import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import com.pinterest.flink.connector.psc.source.split.PscTopicUriPartitionSplit;
import com.pinterest.psc.common.TopicUriPartition;
import com.pinterest.psc.config.PscConfiguration;

import java.util.Collection;
import java.util.HashMap;
Expand All @@ -42,43 +39,43 @@
class ReaderHandledOffsetsInitializer implements OffsetsInitializer, OffsetsInitializerValidator {
private static final long serialVersionUID = 172938052008787981L;
private final long startingOffset;
private final OffsetResetStrategy offsetResetStrategy;
private final String offsetResetStrategy;

/**
* The only valid value for startingOffset is following. {@link
* KafkaPartitionSplit#EARLIEST_OFFSET EARLIEST_OFFSET}, {@link
* KafkaPartitionSplit#LATEST_OFFSET LATEST_OFFSET}, {@link KafkaPartitionSplit#COMMITTED_OFFSET
* PscTopicUriPartitionSplit#EARLIEST_OFFSET EARLIEST_OFFSET}, {@link
* PscTopicUriPartitionSplit#LATEST_OFFSET LATEST_OFFSET}, {@link PscTopicUriPartitionSplit#COMMITTED_OFFSET
* COMMITTED_OFFSET}
*/
ReaderHandledOffsetsInitializer(long startingOffset, OffsetResetStrategy offsetResetStrategy) {
ReaderHandledOffsetsInitializer(long startingOffset, String offsetResetStrategy) {
this.startingOffset = startingOffset;
this.offsetResetStrategy = offsetResetStrategy;
}

@Override
public Map<TopicPartition, Long> getPartitionOffsets(
Collection<TopicPartition> partitions,
public Map<TopicUriPartition, Long> getPartitionOffsets(
Collection<TopicUriPartition> partitions,
PartitionOffsetsRetriever partitionOffsetsRetriever) {
Map<TopicPartition, Long> initialOffsets = new HashMap<>();
for (TopicPartition tp : partitions) {
Map<TopicUriPartition, Long> initialOffsets = new HashMap<>();
for (TopicUriPartition tp : partitions) {
initialOffsets.put(tp, startingOffset);
}
return initialOffsets;
}

@Override
public OffsetResetStrategy getAutoOffsetResetStrategy() {
public String getAutoOffsetResetStrategy() {
return offsetResetStrategy;
}

@Override
public void validate(Properties kafkaSourceProperties) {
if (startingOffset == KafkaPartitionSplit.COMMITTED_OFFSET) {
if (startingOffset == PscTopicUriPartitionSplit.COMMITTED_OFFSET) {
checkState(
kafkaSourceProperties.containsKey(ConsumerConfig.GROUP_ID_CONFIG),
kafkaSourceProperties.containsKey(PscConfiguration.PSC_CONSUMER_GROUP_ID),
String.format(
"Property %s is required when using committed offset for offsets initializer",
ConsumerConfig.GROUP_ID_CONFIG));
PscConfiguration.PSC_CONSUMER_GROUP_ID));
}
}
}
Loading

0 comments on commit 82342af

Please sign in to comment.