WIP finish Source and Sink tests
jeffxiang committed Oct 11, 2024
1 parent cd5a930 commit e873e57
Showing 10 changed files with 206 additions and 80 deletions.
@@ -7,7 +7,7 @@
import java.util.Properties;

public class PscFlinkConfiguration {
- public static final String CLUSTER_URI_CONFIG = "psc.producer.cluster.uri";
+ public static final String CLUSTER_URI_CONFIG = "psc.cluster.uri";

public static TopicUri validateAndGetBaseClusterUri(Properties properties) throws TopicUriSyntaxException {
if (!properties.containsKey(CLUSTER_URI_CONFIG)) {
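Dropping the "producer" segment suggests the cluster URI key is now shared by source and sink. Below is a minimal sketch of supplying it and resolving the base cluster URI, using only the constant and the validateAndGetBaseClusterUri signature visible above; the URI value is a placeholder, PscFlinkConfiguration is assumed to be in scope, and the import paths are assumed from elsewhere in this diff.

```java
import java.util.Properties;

import com.pinterest.psc.common.TopicUri;
import com.pinterest.psc.exception.startup.TopicUriSyntaxException;

public class ClusterUriConfigSketch {
    public static void main(String[] args) throws TopicUriSyntaxException {
        Properties props = new Properties();
        // Placeholder cluster URI; substitute the URI format your PSC deployment expects.
        props.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG,
                "plaintext:/rn:kafka:env:cloud_region::cluster:");
        TopicUri baseClusterUri = PscFlinkConfiguration.validateAndGetBaseClusterUri(props);
        System.out.println("Resolved base cluster URI: " + baseClusterUri);
    }
}
```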
@@ -91,7 +91,8 @@ protected void onSplitFinished(Map<String, PscTopicUriPartitionSplitState> finis
if (splitState.getCurrentOffset() >= 0) {
offsetsOfFinishedSplits.put(
splitState.getTopicUriPartition(),
- new MessageId(splitState.getTopicUriPartition(), splitState.getCurrentOffset()));
+ // last processed offset. split.getCurrentOffset() is the next offset to be processed
+ new MessageId(splitState.getTopicUriPartition(), splitState.getCurrentOffset() - 1));
}
});
}
@@ -115,7 +116,8 @@ public List<PscTopicUriPartitionSplit> snapshotState(long checkpointId) {
if (split.getStartingOffset() >= 0) {
offsetsMap.put(
split.getTopicUriPartition(),
- new MessageId(split.getTopicUriPartition(), split.getStartingOffset()));
+ // last processed offset. split.getStartingOffset() is the next offset to be processed
+ new MessageId(split.getTopicUriPartition(), split.getStartingOffset() - 1));
}
}
// Put offsets of all the finished splits.
@@ -162,7 +164,7 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
committedPartitions.forEach(
(tp, offset) ->
pscSourceReaderMetrics.recordCommittedOffset(
- tp, offset.getOffset()));
+ tp, offset.getOffset() + 1)); // offset.getOffset() is the last processed offset
offsetsOfFinishedSplits
.entrySet()
.removeIf(
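The three hunks above follow one convention: getCurrentOffset()/getStartingOffset() point at the next offset to be processed, the MessageId snapshotted for a split stores the last processed offset, and the committed-offset metric adds one back. A tiny worked example with made-up numbers:

```java
// Made-up values illustrating the offset bookkeeping above; not connector code.
long currentOffset = 10L;                     // next offset the reader will process
long snapshottedOffset = currentOffset - 1;   // stored in MessageId at snapshot / split-finish time (9)
long committedOffset = snapshottedOffset + 1; // reported via recordCommittedOffset on checkpoint complete (10)
assert committedOffset == currentOffset;
```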
@@ -178,7 +178,7 @@ void testCommitOffsetsWithoutAliveFetchers() throws Exception {
assertThat(committedOffsets).hasSize(1);
assertThat(committedOffsets.values())
.extracting(OffsetAndMetadata::offset)
- .allMatch(offset -> offset == NUM_RECORDS_PER_SPLIT + 1);
+ .allMatch(offset -> offset == NUM_RECORDS_PER_SPLIT);
}
}

@@ -249,7 +249,7 @@ void testOffsetCommitOnCheckpointComplete() throws Exception {
assertThat(committedOffsets).hasSize(numSplits);
assertThat(committedOffsets.values())
.extracting(OffsetAndMetadata::offset)
- .allMatch(offset -> offset == NUM_RECORDS_PER_SPLIT + 1);
+ .allMatch(offset -> offset == NUM_RECORDS_PER_SPLIT);
}
}

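Both updated expectations follow from that convention: a fully consumed split with NUM_RECORDS_PER_SPLIT records has a last processed offset of NUM_RECORDS_PER_SPLIT - 1, so the committed (next-to-read) offset is exactly NUM_RECORDS_PER_SPLIT rather than NUM_RECORDS_PER_SPLIT + 1.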
@@ -27,7 +27,7 @@ public void testSimpleConsumptionCount() throws Exception {

int numPartitions = parallelism;
String topicName = "test_simple_count";
- createTestTopic(topicName, numPartitions, 1);
+ createTestTopic(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topicName, numPartitions, 1);
pscTestEnvWithKafka.produceToKafka(topicName, recordsInEachPartition, numPartitions, "hello");

Properties readProps = new Properties();
@@ -191,7 +191,7 @@ public void testFlushAfterClosed() throws ProducerException, ConfigurationExcept
public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
String topic = "flink-kafka-producer-txn-coordinator-changed";
String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topic;
- createTestTopic(topic, 1, 2);
+ createTestTopic(topicUri, 1, 2);
PscProducer<String, String> pscProducer = new FlinkPscInternalProducer<>(extraProducerProperties);
try {
// <added> a simple transactional call to launch a backend producer

Large diffs are not rendered by default.

@@ -136,7 +136,7 @@ public void runAutoOffsetResetTest() throws Exception {

Properties tprops = new Properties();
tprops.setProperty("retention.ms", "250");
- pscTestEnvWithKafka.createTestTopic(topic, parallelism, 1, tprops);
+ pscTestEnvWithKafka.createTestTopic(topicUri, parallelism, 1, tprops);

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
@@ -234,7 +234,7 @@ public void runFailOnAutoOffsetResetNone() throws Exception {
final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topic;
final int parallelism = 1;

- pscTestEnvWithKafka.createTestTopic(topic, parallelism, 1);
+ pscTestEnvWithKafka.createTestTopic(topicUri, parallelism, 1);

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
@@ -272,7 +272,7 @@ public void runFailOnAutoOffsetResetNoneEager() throws Exception {
final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topic;
final int parallelism = 1;

- pscTestEnvWithKafka.createTestTopic(topic, parallelism, 1);
+ pscTestEnvWithKafka.createTestTopic(topicUri, parallelism, 1);

// ----------- add consumer ----------

@@ -31,6 +31,7 @@
import com.pinterest.psc.exception.consumer.ConsumerException;
import com.pinterest.psc.exception.producer.ProducerException;
import com.pinterest.psc.exception.startup.ConfigurationException;
+ import com.pinterest.psc.exception.startup.TopicUriSyntaxException;
import com.pinterest.psc.metrics.NullMetricsReporter;
import com.pinterest.psc.producer.PscProducer;
import com.pinterest.psc.producer.PscProducerMessage;
@@ -238,8 +239,18 @@ public void createTestTopic(String topicUriString,
NewTopic topicObj = new NewTopic(BaseTopicUri.validate(topicUriString).getTopic(), numberOfPartitions, (short) replicationFactor);
adminClient.createTopics(Collections.singleton(topicObj)).all().get();
} catch (Exception e) {
- e.printStackTrace();
- fail("Create test topic : " + topicUriString + " failed, " + e.getMessage());
+ // try to create it assuming that it's not a topicUriString
+ if (e instanceof TopicUriSyntaxException) {
+ LOG.warn("Trying to create assuming that {} is just the topicName", topicUriString);
+ try (AdminClient adminClient = AdminClient.create(getStandardKafkaProperties())) {
+ NewTopic topicObj = new NewTopic(topicUriString, numberOfPartitions, (short) replicationFactor);
+ adminClient.createTopics(Collections.singleton(topicObj)).all().get();
+ } catch (Exception e2) {
+ fail("Create test topic : " + topicUriString + " failed, " + e2.getMessage());
+ }
+ } else {
+ fail("Create test topic : " + topicUriString + " failed, " + e.getMessage());
+ }
}
}

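With the fallback above, the test environment's createTestTopic should accept either a full topic URI or a bare topic name. A rough usage sketch; the topic names, partition count, and replication factor are made up:

```java
// Illustrative only: both forms are expected to work after this change.
pscTestEnvWithKafka.createTestTopic(
        PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + "topic_a", 2, 1); // full topic URI
pscTestEnvWithKafka.createTestTopic("topic_b", 2, 1); // bare name, handled by the TopicUriSyntaxException fallback
```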
4 changes: 3 additions & 1 deletion psc/src/main/java/com/pinterest/psc/common/BaseTopicUri.java
@@ -4,6 +4,7 @@
import com.pinterest.psc.exception.startup.TopicUriSyntaxException;
import com.pinterest.psc.logging.PscLogger;

+ import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -12,7 +13,7 @@

// base topic URI only parses the URI down to topic RN; <protocol>:<separator>[topicRN]

- public class BaseTopicUri implements TopicUri {
+ public class BaseTopicUri implements TopicUri, Serializable {
private static final PscLogger logger = PscLogger.getLogger(BaseTopicUri.class);
private static final String DEFAULT_PROTOCOL = "plaintext";
private static final String[] SCHEMAS = {
@@ -23,6 +24,7 @@ public class BaseTopicUri implements TopicUri {
"((?<protocol>[a-zA-Z0-9-_.]+):%s)?(?<topicRn>[a-zA-Z0-9-_.:]+)", SEPARATOR
))
};
+ private static final long serialVersionUID = 6728709313941136158L;

private final String topicUriAsString;
protected final String protocol;
@@ -2,10 +2,13 @@

import com.pinterest.psc.exception.startup.TopicUriSyntaxException;

+ import java.io.Serializable;
+
/**
* A topic URI and partition pair to identify the most granular pub/sub concept plus the access protocol.
*/
- public class TopicUriPartition implements Comparable<TopicUriPartition> {
+ public class TopicUriPartition implements Comparable<TopicUriPartition>, Serializable {
+ private static final long serialVersionUID = -7784054113828809322L;
private final String topicUriStr;
private final int partition;
private TopicUri backendTopicUri;
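Marking BaseTopicUri and TopicUriPartition as Serializable matters here presumably because Flink needs to serialize split and offset state that contains them. A minimal round-trip sketch, assuming only the BaseTopicUri.validate call and getTopic accessor used earlier in this diff; the URI string is a placeholder:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import com.pinterest.psc.common.BaseTopicUri;
import com.pinterest.psc.common.TopicUri;

public class TopicUriSerializationSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder URI; adjust to the topic URI format of your environment.
        TopicUri original = BaseTopicUri.validate("plaintext:/rn:kafka:env:cloud_region::cluster:my_topic");

        // Java-serialize and deserialize, as Flink's checkpointing machinery may do with split state.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original); // possible now that BaseTopicUri implements Serializable
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            TopicUri copy = (TopicUri) in.readObject();
            System.out.println("Round-tripped topic: " + copy.getTopic());
        }
    }
}
```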
