diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSource.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSource.java index 4ac8ba7..c53868b 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSource.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSource.java @@ -18,6 +18,7 @@ package com.pinterest.flink.connector.psc.source; +import com.pinterest.flink.connector.psc.PscFlinkConfiguration; import com.pinterest.flink.connector.psc.source.enumerator.PscSourceEnumState; import com.pinterest.flink.connector.psc.source.enumerator.PscSourceEnumStateSerializer; import com.pinterest.flink.connector.psc.source.enumerator.PscSourceEnumerator; @@ -34,6 +35,7 @@ import com.pinterest.psc.consumer.PscConsumerMessage; import com.pinterest.psc.exception.ClientException; import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.exception.startup.TopicUriSyntaxException; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; @@ -229,7 +231,13 @@ public SplitEnumerator restoreEnu @Internal @Override public SimpleVersionedSerializer getSplitSerializer() { - return new PscTopicUriPartitionSplitSerializer(); + PscTopicUriPartitionSplitSerializer serializer = new PscTopicUriPartitionSplitSerializer(); + try { + serializer.setClusterUri(PscFlinkConfiguration.validateAndGetBaseClusterUri(props).getTopicUriAsString()); + return serializer; + } catch (TopicUriSyntaxException e) { + throw new RuntimeException("Failed to set clusterUri for splitSerializer", e); + } } @Internal diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplitSerializer.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplitSerializer.java index cc10ece..d91630f 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplitSerializer.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/split/PscTopicUriPartitionSplitSerializer.java @@ -18,9 +18,13 @@ package com.pinterest.flink.connector.psc.source.split; +import com.pinterest.psc.common.BaseTopicUri; import com.pinterest.psc.common.TopicUriPartition; +import com.pinterest.psc.exception.startup.TopicUriSyntaxException; import org.apache.flink.annotation.Internal; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -36,13 +40,23 @@ public class PscTopicUriPartitionSplitSerializer implements SimpleVersionedSerializer { + private static final Logger LOG = LoggerFactory.getLogger(PscTopicUriPartitionSplitSerializer.class); private static final int CURRENT_VERSION = 0; + private String clusterUri = null; @Override public int getVersion() { return CURRENT_VERSION; } + public void setClusterUri(String clusterUri) { + LOG.info("Setting cluster URI: " + clusterUri); + if (clusterUri != null) { + throw new IllegalArgumentException("Cluster URI can only be set once."); + } + this.clusterUri = clusterUri; + } + @Override public byte[] serialize(PscTopicUriPartitionSplit split) throws IOException { try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -58,9 +72,24 @@ public byte[] serialize(PscTopicUriPartitionSplit split) throws IOException { @Override public PscTopicUriPartitionSplit deserialize(int version, byte[] serialized) throws IOException { + LOG.info("Deserializing split with version: " + version); try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(bais)) { String topicUri = in.readUTF(); + try { + // try to validate topicUri + BaseTopicUri.validate(topicUri); + } catch (TopicUriSyntaxException e) { + LOG.info("Detected a possible Flink-Kafka checkpoint with topic: " + topicUri); + // we are likely reading from a Flink-Kafka checkpoint here. To support recovering from Flink-Kafka + // checkpoints, we will assume that topicUri here is actually just the topic name, and prepend the + // cluster URI to it. + if (clusterUri == null) { + throw new IllegalStateException("Cluster URI not set. Cannot deserialize split."); + } + topicUri = clusterUri + topicUri; + LOG.info("Prepending cluster URI to topic so that topicUri is now: " + topicUri); + } int partition = in.readInt(); long offset = in.readLong(); long stoppingOffset = in.readLong();