Skip to content

Commit

Permalink
Replace unnecessary key objects creation with map of keys, add PscEve…
Browse files Browse the repository at this point in the history
…nt doc
  • Loading branch information
ArtemTetenkin committed May 3, 2024
1 parent 60f26a0 commit adb0228
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.pinterest.flink.streaming.connectors.psc.internals;

import com.pinterest.psc.common.TopicUri;
import com.pinterest.psc.common.TopicUriPartition;
import com.pinterest.psc.consumer.PscConsumerMessage;
import com.pinterest.psc.consumer.PscConsumerPollMessageIterator;
Expand Down Expand Up @@ -133,23 +134,46 @@ public void runFetchLoop() throws Exception {
// kick off the actual PSC consumer
consumerThread.start();

Map<TopicUri, Map<Integer, PscTopicUriPartition>> topicUriPartitionsMap = new HashMap<>();

while (running) {
// this blocks until we get the next records
// it automatically re-throws exceptions encountered in the consumer thread

try (PscConsumerPollMessageIterator<byte[], byte[]> records = handover.pollNext()) {
while (records.hasNext()) {
PscConsumerMessage<byte[], byte[]> record = records.next();
PscTopicUriPartition
key =
new PscTopicUriPartition(
record.getMessageId().getTopicUriPartition().getTopicUri(),
record.getMessageId().getTopicUriPartition().getPartition());

TopicUri topicUri = record.getMessageId().getTopicUriPartition()
.getTopicUri();
Integer partition = record.getMessageId().getTopicUriPartition()
.getPartition();

PscTopicUriPartition key;

if (topicUriPartitionsMap.containsKey(topicUri)) {
if (topicUriPartitionsMap.get(topicUri).containsKey(partition)) {
key = topicUriPartitionsMap.get(topicUri).get(partition);
} else {
key = new PscTopicUriPartition(topicUri, partition);
topicUriPartitionsMap.get(topicUri).put(partition, key);
}
} else {
key = new PscTopicUriPartition(topicUri, partition);
Map<Integer, PscTopicUriPartition> partitionToKeyMap = new HashMap<>();
partitionToKeyMap.put(partition, key);
topicUriPartitionsMap.put(topicUri, partitionToKeyMap);
}

PscTopicUriPartitionState<T, TopicUriPartition>
partitionState =
subscribedPartitionStates().get(key);
if (partitionState != null) {
topicUriPartitionConsumerRecordsHandler(record, partitionState);
} else {
LOG.warn(
"Found unknown topic and partition: {} {} while reading messages from iterator",
topicUri, partition);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@

import java.util.Map;

/**
* PscEvent framework enables event driven interaction between main loop and psc consumer thread.
* At this point it is used only in MemQ consumer case to inform memq consumer that the main loop encountered
* the end of a batch while traversing messages from iterator.
*
* This is an evolving framework.
*/

public class PscEvent {
public final static String EVENT_HEADER = "__EVENT_HEADER";

Expand Down

0 comments on commit adb0228

Please sign in to comment.