From 82342af5bfa1fd82bbd1afcb2e308a050e3a61f6 Mon Sep 17 00:00:00 2001
From: Jeff Xiang <jxiang@pinterest.com>
Date: Tue, 13 Aug 2024 13:07:20 -0400
Subject: [PATCH] WIP source API's

---
 .../NoStoppingOffsetsInitializer.java         | 12 ++--
 .../initializer/OffsetsInitializer.java       | 62 ++++++++-----------
 .../OffsetsInitializerValidator.java          |  7 +--
 .../ReaderHandledOffsetsInitializer.java      | 33 +++++-----
 .../SpecifiedOffsetsInitializer.java          | 41 ++++++------
 .../TimestampOffsetsInitializer.java          | 21 +++----
 .../split/PscTopicUriPartitionSplit.java      | 40 ++++++------
 .../PscTopicUriPartitionSplitSerializer.java  | 21 +++----
 .../split/PscTopicUriPartitionSplitState.java | 11 ++--
 9 files changed, 114 insertions(+), 134 deletions(-)

diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/NoStoppingOffsetsInitializer.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
index 9bf0a88..76e58ce 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
@@ -18,17 +18,15 @@
 
 package com.pinterest.flink.connector.psc.source.enumerator.initializer;
 
+import com.pinterest.psc.common.TopicUriPartition;
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.common.TopicPartition;
 
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 
 /**
- * An implementation of {@link org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer} which does not initialize anything.
+ * An implementation of {@link OffsetsInitializer} which does not initialize anything.
  *
  * <p>This class is used as the default stopping offsets initializer for unbounded Kafka sources.
  */
@@ -37,14 +35,14 @@ public class NoStoppingOffsetsInitializer implements OffsetsInitializer {
     private static final long serialVersionUID = 4186323669290142732L;
 
     @Override
-    public Map<TopicPartition, Long> getPartitionOffsets(
-            Collection<TopicPartition> partitions,
+    public Map<TopicUriPartition, Long> getPartitionOffsets(
+            Collection<TopicUriPartition> topicUriPartitions,
             PartitionOffsetsRetriever partitionOffsetsRetriever) {
         return Collections.emptyMap();
     }
 
     @Override
-    public OffsetResetStrategy getAutoOffsetResetStrategy() {
+    public String getAutoOffsetResetStrategy() {
         throw new UnsupportedOperationException(
                 "The NoStoppingOffsetsInitializer does not have an OffsetResetStrategy. It should only be used "
                         + "to end offset.");
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializer.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializer.java
index ae8f712..46d464f 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializer.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializer.java
@@ -18,17 +18,11 @@
 
 package com.pinterest.flink.connector.psc.source.enumerator.initializer;
 
+import com.pinterest.flink.connector.psc.source.split.PscTopicUriPartitionSplit;
+import com.pinterest.psc.common.MessageId;
+import com.pinterest.psc.common.TopicUriPartition;
+import com.pinterest.psc.config.PscConfiguration;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.connector.kafka.source.KafkaSource;
-import org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer;
-import org.apache.flink.connector.kafka.source.enumerator.initializer.SpecifiedOffsetsInitializer;
-import org.apache.flink.connector.kafka.source.enumerator.initializer.TimestampOffsetsInitializer;
-import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.common.TopicPartition;
 
 import java.io.Serializable;
 import java.util.Collection;
@@ -36,7 +30,7 @@
 
 /**
  * An interface for users to specify the starting / stopping offset of a {@link
- * KafkaPartitionSplit}.
+ * PscTopicUriPartitionSplit}.
  *
  * @see ReaderHandledOffsetsInitializer
  * @see SpecifiedOffsetsInitializer
@@ -50,15 +44,15 @@ public interface OffsetsInitializer extends Serializable {
      * starting offsets or stopping offsets of the Kafka partitions.
      *
      * <p>If the implementation returns a starting offset which causes {@code
-     * OffsetsOutOfRangeException} from Kafka. The {@link OffsetResetStrategy} provided by the
+     * OffsetsOutOfRangeException} from Kafka. The offsetResetStrategy provided by the
      * {@link #getAutoOffsetResetStrategy()} will be used to reset the offset.
      *
      * @param partitions the Kafka partitions to get the starting offsets.
      * @param partitionOffsetsRetriever a helper to retrieve information of the Kafka partitions.
      * @return A mapping from Kafka partition to their offsets to start consuming from.
      */
-    Map<TopicPartition, Long> getPartitionOffsets(
-            Collection<TopicPartition> partitions,
+    Map<TopicUriPartition, Long> getPartitionOffsets(
+            Collection<TopicUriPartition> partitions,
             PartitionOffsetsRetriever partitionOffsetsRetriever);
 
     /**
@@ -67,10 +61,10 @@ Map<TopicPartition, Long> getPartitionOffsets(
      * <p>The OffsetStrategy is only used when the offset initializer is used to initialize the
      * starting offsets and the starting offsets is out of range.
      *
-     * @return An {@link OffsetResetStrategy} to use if the initialized offsets are out of the
+     * @return An offsetResetStrategy to use if the initialized offsets are out of the
      *     range.
      */
-    OffsetResetStrategy getAutoOffsetResetStrategy();
+    String getAutoOffsetResetStrategy();
 
     /**
      * An interface that provides necessary information to the {@link OffsetsInitializer} to get the
@@ -79,23 +73,22 @@ Map<TopicPartition, Long> getPartitionOffsets(
     interface PartitionOffsetsRetriever {
 
         /**
-         * The group id should be the set for {@link KafkaSource KafkaSource} before invoking this
+         * The group id should be the set for {@link com.pinterest.flink.connector.psc.source.PscSource PscSource} before invoking this
          * method. Otherwise an {@code IllegalStateException} will be thrown.
          *
-         * @see KafkaAdminClient#listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)
          * @throws IllegalStateException if the group id is not set for the {@code KafkaSource}.
          */
-        Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> partitions);
+        Map<TopicUriPartition, Long> committedOffsets(Collection<TopicUriPartition> partitions);
 
         /** List end offsets for the specified partitions. */
-        Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions);
+        Map<TopicUriPartition, Long> endOffsets(Collection<TopicUriPartition> partitions);
 
         /** List beginning offsets for the specified partitions. */
-        Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);
+        Map<TopicUriPartition, Long> beginningOffsets(Collection<TopicUriPartition> partitions);
 
         /** List offsets matching a timestamp for the specified partitions. */
-        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
-                Map<TopicPartition, Long> timestampsToSearch);
+        Map<TopicUriPartition, MessageId> offsetsForTimes(
+                Map<TopicUriPartition, Long> timestampsToSearch);
     }
 
     // --------------- factory methods ---------------
@@ -107,21 +100,21 @@ Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
      * @return an offset initializer which initialize the offsets to the committed offsets.
      */
     static OffsetsInitializer committedOffsets() {
-        return committedOffsets(OffsetResetStrategy.NONE);
+        return committedOffsets(PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_NONE);
     }
 
     /**
      * Get an {@link OffsetsInitializer} which initializes the offsets to the committed offsets. Use
-     * the given {@link OffsetResetStrategy} to initialize the offsets if the committed offsets does
+     * the given offsetResetStrategy to initialize the offsets if the committed offsets does
      * not exist.
      *
      * @param offsetResetStrategy the offset reset strategy to use when the committed offsets do not
      *     exist.
      * @return an {@link OffsetsInitializer} which initializes the offsets to the committed offsets.
      */
-    static OffsetsInitializer committedOffsets(OffsetResetStrategy offsetResetStrategy) {
+    static OffsetsInitializer committedOffsets(String offsetResetStrategy) {
         return new ReaderHandledOffsetsInitializer(
-                KafkaPartitionSplit.COMMITTED_OFFSET, offsetResetStrategy);
+                PscTopicUriPartitionSplit.COMMITTED_OFFSET, offsetResetStrategy);
     }
 
     /**
@@ -132,7 +125,6 @@ static OffsetsInitializer committedOffsets(OffsetResetStrategy offsetResetStrate
      * @param timestamp the timestamp (milliseconds) to start the consumption.
      * @return an {@link OffsetsInitializer} which initializes the offsets based on the given
      *     timestamp.
-     * @see KafkaAdminClient#listOffsets(Map)
      */
     static OffsetsInitializer timestamp(long timestamp) {
         return new TimestampOffsetsInitializer(timestamp);
@@ -147,7 +139,7 @@ static OffsetsInitializer timestamp(long timestamp) {
      */
     static OffsetsInitializer earliest() {
         return new ReaderHandledOffsetsInitializer(
-                KafkaPartitionSplit.EARLIEST_OFFSET, OffsetResetStrategy.EARLIEST);
+                PscTopicUriPartitionSplit.EARLIEST_OFFSET, PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_EARLIEST);
     }
 
     /**
@@ -158,7 +150,7 @@ static OffsetsInitializer earliest() {
      */
     static OffsetsInitializer latest() {
         return new ReaderHandledOffsetsInitializer(
-                KafkaPartitionSplit.LATEST_OFFSET, OffsetResetStrategy.LATEST);
+                PscTopicUriPartitionSplit.LATEST_OFFSET, PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_LATEST);
     }
 
     /**
@@ -167,22 +159,22 @@ static OffsetsInitializer latest() {
      * @param offsets the specified offsets for each partition.
      * @return an {@link OffsetsInitializer} which initializes the offsets to the specified offsets.
      */
-    static OffsetsInitializer offsets(Map<TopicPartition, Long> offsets) {
-        return new SpecifiedOffsetsInitializer(offsets, OffsetResetStrategy.EARLIEST);
+    static OffsetsInitializer offsets(Map<TopicUriPartition, Long> offsets) {
+        return new SpecifiedOffsetsInitializer(offsets, PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_EARLIEST);
     }
 
     /**
      * Get an {@link OffsetsInitializer} which initializes the offsets to the specified offsets. Use
-     * the given {@link OffsetResetStrategy} to initialize the offsets in case the specified offset
+     * the given offsetResetStrategy to initialize the offsets in case the specified offset
      * is out of range.
      *
      * @param offsets the specified offsets for each partition.
-     * @param offsetResetStrategy the {@link OffsetResetStrategy} to use when the specified offset
+     * @param offsetResetStrategy the offsetResetStrategy to use when the specified offset
      *     is out of range.
      * @return an {@link OffsetsInitializer} which initializes the offsets to the specified offsets.
      */
     static OffsetsInitializer offsets(
-            Map<TopicPartition, Long> offsets, OffsetResetStrategy offsetResetStrategy) {
+            Map<TopicUriPartition, Long> offsets, String offsetResetStrategy) {
         return new SpecifiedOffsetsInitializer(offsets, offsetResetStrategy);
     }
 }
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializerValidator.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializerValidator.java
index 2920d77..1f1c0fe 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializerValidator.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializerValidator.java
@@ -19,13 +19,12 @@
 package com.pinterest.flink.connector.psc.source.enumerator.initializer;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 
 import java.util.Properties;
 
 /**
  * Interface for validating {@link OffsetsInitializer} with properties from {@link
- * org.apache.flink.connector.kafka.source.KafkaSource}.
+ * com.pinterest.flink.connector.psc.source.PscSource}.
  */
 @Internal
 public interface OffsetsInitializerValidator {
@@ -33,8 +32,8 @@ public interface OffsetsInitializerValidator {
     /**
      * Validate offsets initializer with properties of Kafka source.
      *
-     * @param kafkaSourceProperties Properties of Kafka source
+     * @param pscSourceProperties Properties of Kafka source
      * @throws IllegalStateException if validation fails
      */
-    void validate(Properties kafkaSourceProperties) throws IllegalStateException;
+    void validate(Properties pscSourceProperties) throws IllegalStateException;
 }
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
index e290202..4b3f414 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
@@ -18,12 +18,9 @@
 
 package com.pinterest.flink.connector.psc.source.enumerator.initializer;
 
-import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
-import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator;
-import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.common.TopicPartition;
+import com.pinterest.flink.connector.psc.source.split.PscTopicUriPartitionSplit;
+import com.pinterest.psc.common.TopicUriPartition;
+import com.pinterest.psc.config.PscConfiguration;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -42,43 +39,43 @@
 class ReaderHandledOffsetsInitializer implements OffsetsInitializer, OffsetsInitializerValidator {
     private static final long serialVersionUID = 172938052008787981L;
     private final long startingOffset;
-    private final OffsetResetStrategy offsetResetStrategy;
+    private final String offsetResetStrategy;
 
     /**
      * The only valid value for startingOffset is following. {@link
-     * KafkaPartitionSplit#EARLIEST_OFFSET EARLIEST_OFFSET}, {@link
-     * KafkaPartitionSplit#LATEST_OFFSET LATEST_OFFSET}, {@link KafkaPartitionSplit#COMMITTED_OFFSET
+     * PscTopicUriPartitionSplit#EARLIEST_OFFSET EARLIEST_OFFSET}, {@link
+     * PscTopicUriPartitionSplit#LATEST_OFFSET LATEST_OFFSET}, {@link PscTopicUriPartitionSplit#COMMITTED_OFFSET
      * COMMITTED_OFFSET}
      */
-    ReaderHandledOffsetsInitializer(long startingOffset, OffsetResetStrategy offsetResetStrategy) {
+    ReaderHandledOffsetsInitializer(long startingOffset, String offsetResetStrategy) {
         this.startingOffset = startingOffset;
         this.offsetResetStrategy = offsetResetStrategy;
     }
 
     @Override
-    public Map<TopicPartition, Long> getPartitionOffsets(
-            Collection<TopicPartition> partitions,
+    public Map<TopicUriPartition, Long> getPartitionOffsets(
+            Collection<TopicUriPartition> partitions,
             PartitionOffsetsRetriever partitionOffsetsRetriever) {
-        Map<TopicPartition, Long> initialOffsets = new HashMap<>();
-        for (TopicPartition tp : partitions) {
+        Map<TopicUriPartition, Long> initialOffsets = new HashMap<>();
+        for (TopicUriPartition tp : partitions) {
             initialOffsets.put(tp, startingOffset);
         }
         return initialOffsets;
     }
 
     @Override
-    public OffsetResetStrategy getAutoOffsetResetStrategy() {
+    public String getAutoOffsetResetStrategy() {
         return offsetResetStrategy;
     }
 
     @Override
     public void validate(Properties kafkaSourceProperties) {
-        if (startingOffset == KafkaPartitionSplit.COMMITTED_OFFSET) {
+        if (startingOffset == PscTopicUriPartitionSplit.COMMITTED_OFFSET) {
             checkState(
-                    kafkaSourceProperties.containsKey(ConsumerConfig.GROUP_ID_CONFIG),
+                    kafkaSourceProperties.containsKey(PscConfiguration.PSC_CONSUMER_GROUP_ID),
                     String.format(
                             "Property %s is required when using committed offset for offsets initializer",
-                            ConsumerConfig.GROUP_ID_CONFIG));
+                            PscConfiguration.PSC_CONSUMER_GROUP_ID));
         }
     }
 }
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/SpecifiedOffsetsInitializer.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/SpecifiedOffsetsInitializer.java
index cbba063..450b17f 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/SpecifiedOffsetsInitializer.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/SpecifiedOffsetsInitializer.java
@@ -18,12 +18,9 @@
 
 package com.pinterest.flink.connector.psc.source.enumerator.initializer;
 
-import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
-import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator;
-import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.common.TopicPartition;
+import com.pinterest.flink.connector.psc.source.split.PscTopicUriPartitionSplit;
+import com.pinterest.psc.common.TopicUriPartition;
+import com.pinterest.psc.config.PscConfiguration;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -43,22 +40,22 @@
  */
 class SpecifiedOffsetsInitializer implements OffsetsInitializer, OffsetsInitializerValidator {
     private static final long serialVersionUID = 1649702397250402877L;
-    private final Map<TopicPartition, Long> initialOffsets;
-    private final OffsetResetStrategy offsetResetStrategy;
+    private final Map<TopicUriPartition, Long> initialOffsets;
+    private final String offsetResetStrategy;
 
     SpecifiedOffsetsInitializer(
-            Map<TopicPartition, Long> initialOffsets, OffsetResetStrategy offsetResetStrategy) {
+            Map<TopicUriPartition, Long> initialOffsets, String offsetResetStrategy) {
         this.initialOffsets = Collections.unmodifiableMap(initialOffsets);
         this.offsetResetStrategy = offsetResetStrategy;
     }
 
     @Override
-    public Map<TopicPartition, Long> getPartitionOffsets(
-            Collection<TopicPartition> partitions,
+    public Map<TopicUriPartition, Long> getPartitionOffsets(
+            Collection<TopicUriPartition> partitions,
             PartitionOffsetsRetriever partitionOffsetsRetriever) {
-        Map<TopicPartition, Long> offsets = new HashMap<>();
-        List<TopicPartition> toLookup = new ArrayList<>();
-        for (TopicPartition tp : partitions) {
+        Map<TopicUriPartition, Long> offsets = new HashMap<>();
+        List<TopicUriPartition> toLookup = new ArrayList<>();
+        for (TopicUriPartition tp : partitions) {
             Long offset = initialOffsets.get(tp);
             if (offset == null) {
                 toLookup.add(tp);
@@ -68,16 +65,16 @@ public Map<TopicPartition, Long> getPartitionOffsets(
         }
         if (!toLookup.isEmpty()) {
             // First check the committed offsets.
-            Map<TopicPartition, Long> committedOffsets =
+            Map<TopicUriPartition, Long> committedOffsets =
                     partitionOffsetsRetriever.committedOffsets(toLookup);
             offsets.putAll(committedOffsets);
             toLookup.removeAll(committedOffsets.keySet());
 
             switch (offsetResetStrategy) {
-                case EARLIEST:
+                case (PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_EARLIEST):
                     offsets.putAll(partitionOffsetsRetriever.beginningOffsets(toLookup));
                     break;
-                case LATEST:
+                case (PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_LATEST):
                     offsets.putAll(partitionOffsetsRetriever.endOffsets(toLookup));
                     break;
                 default:
@@ -89,20 +86,20 @@ public Map<TopicPartition, Long> getPartitionOffsets(
     }
 
     @Override
-    public OffsetResetStrategy getAutoOffsetResetStrategy() {
+    public String getAutoOffsetResetStrategy() {
         return offsetResetStrategy;
     }
 
     @Override
-    public void validate(Properties kafkaSourceProperties) {
+    public void validate(Properties pscSourceProperties) {
         initialOffsets.forEach(
                 (tp, offset) -> {
-                    if (offset == KafkaPartitionSplit.COMMITTED_OFFSET) {
+                    if (offset == PscTopicUriPartitionSplit.COMMITTED_OFFSET) {
                         checkState(
-                                kafkaSourceProperties.containsKey(ConsumerConfig.GROUP_ID_CONFIG),
+                                pscSourceProperties.containsKey(PscConfiguration.PSC_CONSUMER_GROUP_ID),
                                 String.format(
                                         "Property %s is required because partition %s is initialized with committed offset",
-                                        ConsumerConfig.GROUP_ID_CONFIG, tp));
+                                        PscConfiguration.PSC_CONSUMER_GROUP_ID, tp));
                     }
                 });
     }
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/TimestampOffsetsInitializer.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/TimestampOffsetsInitializer.java
index c13047f..7d1cc83 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/TimestampOffsetsInitializer.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/TimestampOffsetsInitializer.java
@@ -18,9 +18,8 @@
 
 package com.pinterest.flink.connector.psc.source.enumerator.initializer;
 
-import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.common.TopicPartition;
+import com.pinterest.psc.common.TopicUriPartition;
+import com.pinterest.psc.config.PscConfiguration;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -40,11 +39,11 @@ class TimestampOffsetsInitializer implements OffsetsInitializer {
     }
 
     @Override
-    public Map<TopicPartition, Long> getPartitionOffsets(
-            Collection<TopicPartition> partitions,
+    public Map<TopicUriPartition, Long> getPartitionOffsets(
+            Collection<TopicUriPartition> partitions,
             PartitionOffsetsRetriever partitionOffsetsRetriever) {
-        Map<TopicPartition, Long> startingTimestamps = new HashMap<>();
-        Map<TopicPartition, Long> initialOffsets = new HashMap<>();
+        Map<TopicUriPartition, Long> startingTimestamps = new HashMap<>();
+        Map<TopicUriPartition, Long> initialOffsets = new HashMap<>();
 
         // First get the current end offsets of the partitions. This is going to be used
         // in case we cannot find a suitable offsets based on the timestamp, i.e. the message
@@ -52,14 +51,14 @@ public Map<TopicPartition, Long> getPartitionOffsets(
         // this case, we just use the latest offset.
         // We need to get the latest offsets before querying offsets by time to ensure that
         // no message is going to be missed.
-        Map<TopicPartition, Long> endOffsets = partitionOffsetsRetriever.endOffsets(partitions);
+        Map<TopicUriPartition, Long> endOffsets = partitionOffsetsRetriever.endOffsets(partitions);
         partitions.forEach(tp -> startingTimestamps.put(tp, startingTimestamp));
         partitionOffsetsRetriever
                 .offsetsForTimes(startingTimestamps)
                 .forEach(
                         (tp, offsetMetadata) -> {
                             if (offsetMetadata != null) {
-                                initialOffsets.put(tp, offsetMetadata.offset());
+                                initialOffsets.put(tp, offsetMetadata.getOffset());
                             } else {
                                 // The timestamp does not exist in the partition yet, we will just
                                 // consume from the latest.
@@ -70,7 +69,7 @@ public Map<TopicPartition, Long> getPartitionOffsets(
     }
 
     @Override
-    public OffsetResetStrategy getAutoOffsetResetStrategy() {
-        return OffsetResetStrategy.NONE;
+    public String getAutoOffsetResetStrategy() {
+        return PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_NONE;
     }
 }
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplit.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplit.java
index 9f20022..159e763 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplit.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplit.java
@@ -18,10 +18,10 @@
 
 package com.pinterest.flink.connector.psc.source.split;
 
+import com.pinterest.psc.common.TopicUriPartition;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.kafka.common.TopicPartition;
 
 import java.util.Arrays;
 import java.util.HashSet;
@@ -29,7 +29,7 @@
 import java.util.Optional;
 import java.util.Set;
 
-/** A {@link SourceSplit} for a Kafka partition. */
+/** A {@link SourceSplit} for a PSC topicUriPartition. */
 @Internal
 public class PscTopicUriPartitionSplit implements SourceSplit {
     public static final long NO_STOPPING_OFFSET = Long.MIN_VALUE;
@@ -46,31 +46,31 @@ public class PscTopicUriPartitionSplit implements SourceSplit {
     public static final Set<Long> VALID_STOPPING_OFFSET_MARKERS =
             new HashSet<>(Arrays.asList(LATEST_OFFSET, COMMITTED_OFFSET, NO_STOPPING_OFFSET));
 
-    private final TopicPartition tp;
+    private final TopicUriPartition tup;
     private final long startingOffset;
     private final long stoppingOffset;
 
-    public PscTopicUriPartitionSplit(TopicPartition tp, long startingOffset) {
-        this(tp, startingOffset, NO_STOPPING_OFFSET);
+    public PscTopicUriPartitionSplit(TopicUriPartition tup, long startingOffset) {
+        this(tup, startingOffset, NO_STOPPING_OFFSET);
     }
 
-    public PscTopicUriPartitionSplit(TopicPartition tp, long startingOffset, long stoppingOffset) {
-        verifyInitialOffset(tp, startingOffset, stoppingOffset);
-        this.tp = tp;
+    public PscTopicUriPartitionSplit(TopicUriPartition tup, long startingOffset, long stoppingOffset) {
+        verifyInitialOffset(tup, startingOffset, stoppingOffset);
+        this.tup = tup;
         this.startingOffset = startingOffset;
         this.stoppingOffset = stoppingOffset;
     }
 
-    public String getTopic() {
-        return tp.topic();
+    public String getTopicUri() {
+        return tup.getTopicUriAsString();
     }
 
     public int getPartition() {
-        return tp.partition();
+        return tup.getPartition();
     }
 
-    public TopicPartition getTopicPartition() {
-        return tp;
+    public TopicUriPartition getTopicPartition() {
+        return tup;
     }
 
     public long getStartingOffset() {
@@ -87,19 +87,19 @@ public Optional<Long> getStoppingOffset() {
 
     @Override
     public String splitId() {
-        return toSplitId(tp);
+        return toSplitId(tup);
     }
 
     @Override
     public String toString() {
         return String.format(
                 "[Partition: %s, StartingOffset: %d, StoppingOffset: %d]",
-                tp, startingOffset, stoppingOffset);
+                tup, startingOffset, stoppingOffset);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(tp, startingOffset, stoppingOffset);
+        return Objects.hash(tup, startingOffset, stoppingOffset);
     }
 
     @Override
@@ -108,19 +108,19 @@ public boolean equals(Object obj) {
             return false;
         }
         PscTopicUriPartitionSplit other = (PscTopicUriPartitionSplit) obj;
-        return tp.equals(other.tp)
+        return tup.equals(other.tup)
                 && startingOffset == other.startingOffset
                 && stoppingOffset == other.stoppingOffset;
     }
 
-    public static String toSplitId(TopicPartition tp) {
-        return tp.toString();
+    public static String toSplitId(TopicUriPartition tup) {
+        return tup.toString();
     }
 
     // ------------ private methods ---------------
 
     private static void verifyInitialOffset(
-            TopicPartition tp, Long startingOffset, long stoppingOffset) {
+            TopicUriPartition tp, Long startingOffset, long stoppingOffset) {
         if (startingOffset == null) {
             throw new FlinkRuntimeException(
                     "Cannot initialize starting offset for partition " + tp);
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplitSerializer.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplitSerializer.java
index b1bb205..cc10ece 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplitSerializer.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplitSerializer.java
@@ -18,10 +18,9 @@
 
 package com.pinterest.flink.connector.psc.source.split;
 
+import com.pinterest.psc.common.TopicUriPartition;
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.kafka.common.TopicPartition;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -31,11 +30,11 @@
 
 /**
  * The {@link SimpleVersionedSerializer serializer} for {@link
- * org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit}.
+ * PscTopicUriPartitionSplit}.
  */
 @Internal
 public class PscTopicUriPartitionSplitSerializer
-        implements SimpleVersionedSerializer<org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit> {
+        implements SimpleVersionedSerializer<PscTopicUriPartitionSplit> {
 
     private static final int CURRENT_VERSION = 0;
 
@@ -45,28 +44,28 @@ public int getVersion() {
     }
 
     @Override
-    public byte[] serialize(org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit split) throws IOException {
+    public byte[] serialize(PscTopicUriPartitionSplit split) throws IOException {
         try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 DataOutputStream out = new DataOutputStream(baos)) {
-            out.writeUTF(split.getTopic());
+            out.writeUTF(split.getTopicUri());
             out.writeInt(split.getPartition());
             out.writeLong(split.getStartingOffset());
-            out.writeLong(split.getStoppingOffset().orElse(org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit.NO_STOPPING_OFFSET));
+            out.writeLong(split.getStoppingOffset().orElse(PscTopicUriPartitionSplit.NO_STOPPING_OFFSET));
             out.flush();
             return baos.toByteArray();
         }
     }
 
     @Override
-    public org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
+    public PscTopicUriPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
         try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                 DataInputStream in = new DataInputStream(bais)) {
-            String topic = in.readUTF();
+            String topicUri = in.readUTF();
             int partition = in.readInt();
             long offset = in.readLong();
             long stoppingOffset = in.readLong();
-            return new KafkaPartitionSplit(
-                    new TopicPartition(topic, partition), offset, stoppingOffset);
+            return new PscTopicUriPartitionSplit(
+                    new TopicUriPartition(topicUri, partition), offset, stoppingOffset);
         }
     }
 }
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplitState.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplitState.java
index ad7d4f5..df0e621 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplitState.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplitState.java
@@ -19,15 +19,14 @@
 package com.pinterest.flink.connector.psc.source.split;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 
-/** This class extends KafkaPartitionSplit to track a mutable current offset. */
+/** This class extends PscTopicUriPartitionSplit to track a mutable current offset. */
 @Internal
-public class PscTopicUriPartitionSplitState extends org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit {
+public class PscTopicUriPartitionSplitState extends PscTopicUriPartitionSplit {
 
     private long currentOffset;
 
-    public PscTopicUriPartitionSplitState(org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit partitionSplit) {
+    public PscTopicUriPartitionSplitState(PscTopicUriPartitionSplit partitionSplit) {
         super(
                 partitionSplit.getTopicPartition(),
                 partitionSplit.getStartingOffset(),
@@ -48,8 +47,8 @@ public void setCurrentOffset(long currentOffset) {
      *
      * @return a new KafkaPartitionSplit which uses the current offset as its starting offset.
      */
-    public org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit toKafkaPartitionSplit() {
-        return new KafkaPartitionSplit(
+    public PscTopicUriPartitionSplit toPscTopicUriPartitionSplit() {
+        return new PscTopicUriPartitionSplit(
                 getTopicPartition(),
                 getCurrentOffset(),
                 getStoppingOffset().orElse(NO_STOPPING_OFFSET));