Skip to content

Commit

Permalink
Attempt to allow for Flink-Kafka checkpoint recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffxiang committed Jan 10, 2025
1 parent 4c5f16b commit ddc8172
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.pinterest.flink.connector.psc.source;

import com.pinterest.flink.connector.psc.PscFlinkConfiguration;
import com.pinterest.flink.connector.psc.source.enumerator.PscSourceEnumState;
import com.pinterest.flink.connector.psc.source.enumerator.PscSourceEnumStateSerializer;
import com.pinterest.flink.connector.psc.source.enumerator.PscSourceEnumerator;
Expand All @@ -34,6 +35,7 @@
import com.pinterest.psc.consumer.PscConsumerMessage;
import com.pinterest.psc.exception.ClientException;
import com.pinterest.psc.exception.startup.ConfigurationException;
import com.pinterest.psc.exception.startup.TopicUriSyntaxException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
Expand Down Expand Up @@ -229,7 +231,13 @@ public SplitEnumerator<PscTopicUriPartitionSplit, PscSourceEnumState> restoreEnu
/**
 * Creates the serializer used for {@link PscTopicUriPartitionSplit}s.
 *
 * <p>The serializer is handed the base cluster URI resolved from {@code props}; its
 * {@code deserialize} method prepends that URI to a stored topic string that fails topic URI
 * validation (e.g. when restoring from a Flink-Kafka checkpoint).
 *
 * @return a split serializer with its cluster URI set
 * @throws RuntimeException if resolving the cluster URI raises a
 *     {@link TopicUriSyntaxException}; the original exception is preserved as the cause
 */
@Internal
@Override
public SimpleVersionedSerializer<PscTopicUriPartitionSplit> getSplitSerializer() {
    // The previous unconditional "return new PscTopicUriPartitionSplitSerializer();" was
    // removed: it made the configuration below unreachable.
    PscTopicUriPartitionSplitSerializer serializer = new PscTopicUriPartitionSplitSerializer();
    try {
        serializer.setClusterUri(
                PscFlinkConfiguration.validateAndGetBaseClusterUri(props).getTopicUriAsString());
    } catch (TopicUriSyntaxException e) {
        throw new RuntimeException("Failed to set clusterUri for splitSerializer", e);
    }
    return serializer;
}

@Internal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@

package com.pinterest.flink.connector.psc.source.split;

import com.pinterest.psc.common.BaseTopicUri;
import com.pinterest.psc.common.TopicUriPartition;
import com.pinterest.psc.exception.startup.TopicUriSyntaxException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand All @@ -36,13 +40,23 @@
public class PscTopicUriPartitionSplitSerializer
implements SimpleVersionedSerializer<PscTopicUriPartitionSplit> {

// Class-scoped SLF4J logger.
private static final Logger LOG = LoggerFactory.getLogger(PscTopicUriPartitionSplitSerializer.class);
// Serialization format version reported by getVersion().
private static final int CURRENT_VERSION = 0;
// Cluster URI prefix assigned through setClusterUri(). deserialize() prepends it to a stored
// topic string that fails BaseTopicUri.validate(), and throws IllegalStateException if it is
// still null at that point.
private String clusterUri = null;

/**
 * Reports the version of the serialization format written by this serializer.
 *
 * @return the current format version, {@code CURRENT_VERSION}
 */
@Override
public int getVersion() {
    return CURRENT_VERSION;
}

/**
 * Sets the cluster URI that {@code deserialize} prepends to a stored topic string when that
 * string does not validate as a topic URI.
 *
 * <p>The value may only be assigned once per serializer instance.
 *
 * @param clusterUri the cluster URI prefix to use
 * @throws IllegalArgumentException if a cluster URI has already been set on this instance
 */
public void setClusterUri(String clusterUri) {
    // Guard on the field, not the parameter: checking the parameter rejected every non-null
    // value and made it impossible to ever configure a cluster URI.
    if (this.clusterUri != null) {
        throw new IllegalArgumentException("Cluster URI can only be set once.");
    }
    LOG.info("Setting cluster URI: {}", clusterUri);
    this.clusterUri = clusterUri;
}

@Override
public byte[] serialize(PscTopicUriPartitionSplit split) throws IOException {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
Expand All @@ -58,9 +72,24 @@ public byte[] serialize(PscTopicUriPartitionSplit split) throws IOException {

@Override
public PscTopicUriPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
LOG.info("Deserializing split with version: " + version);
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
String topicUri = in.readUTF();
try {
// try to validate topicUri
BaseTopicUri.validate(topicUri);
} catch (TopicUriSyntaxException e) {
LOG.info("Detected a possible Flink-Kafka checkpoint with topic: " + topicUri);
// we are likely reading from a Flink-Kafka checkpoint here. To support recovering from Flink-Kafka
// checkpoints, we will assume that topicUri here is actually just the topic name, and prepend the
// cluster URI to it.
if (clusterUri == null) {
throw new IllegalStateException("Cluster URI not set. Cannot deserialize split.");
}
topicUri = clusterUri + topicUri;
LOG.info("Prepending cluster URI to topic so that topicUri is now: " + topicUri);
}
int partition = in.readInt();
long offset = in.readLong();
long stoppingOffset = in.readLong();
Expand Down

0 comments on commit ddc8172

Please sign in to comment.