diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/PscFetcher.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/PscFetcher.java index 44301ed..e997871 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/PscFetcher.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/PscFetcher.java @@ -17,6 +17,7 @@ package com.pinterest.flink.streaming.connectors.psc.internals; +import com.pinterest.psc.common.TopicUri; import com.pinterest.psc.common.TopicUriPartition; import com.pinterest.psc.consumer.PscConsumerMessage; import com.pinterest.psc.consumer.PscConsumerPollMessageIterator; @@ -133,6 +134,8 @@ public void runFetchLoop() throws Exception { // kick off the actual PSC consumer consumerThread.start(); + Map> topicUriPartitionsMap = new HashMap<>(); + while (running) { // this blocks until we get the next records // it automatically re-throws exceptions encountered in the consumer thread @@ -140,16 +143,37 @@ public void runFetchLoop() throws Exception { try (PscConsumerPollMessageIterator records = handover.pollNext()) { while (records.hasNext()) { PscConsumerMessage record = records.next(); - PscTopicUriPartition - key = - new PscTopicUriPartition( - record.getMessageId().getTopicUriPartition().getTopicUri(), - record.getMessageId().getTopicUriPartition().getPartition()); + + TopicUri topicUri = record.getMessageId().getTopicUriPartition() + .getTopicUri(); + Integer partition = record.getMessageId().getTopicUriPartition() + .getPartition(); + + PscTopicUriPartition key; + + if (topicUriPartitionsMap.containsKey(topicUri)) { + if (topicUriPartitionsMap.get(topicUri).containsKey(partition)) { + key = topicUriPartitionsMap.get(topicUri).get(partition); + } else { + key = new PscTopicUriPartition(topicUri, partition); + topicUriPartitionsMap.get(topicUri).put(partition, key); + } + } else { + key = new PscTopicUriPartition(topicUri, partition); + Map partitionToKeyMap = new HashMap<>(); + partitionToKeyMap.put(partition, key); + topicUriPartitionsMap.put(topicUri, partitionToKeyMap); + } + PscTopicUriPartitionState partitionState = subscribedPartitionStates().get(key); if (partitionState != null) { topicUriPartitionConsumerRecordsHandler(record, partitionState); + } else { + LOG.warn( + "Found unknown topic and partition: {} {} while reading messages from iterator", + topicUri, partition); } } } diff --git a/psc/src/main/java/com/pinterest/psc/common/event/PscEvent.java b/psc/src/main/java/com/pinterest/psc/common/event/PscEvent.java index 0940cb5..a7964fe 100644 --- a/psc/src/main/java/com/pinterest/psc/common/event/PscEvent.java +++ b/psc/src/main/java/com/pinterest/psc/common/event/PscEvent.java @@ -5,6 +5,14 @@ import java.util.Map; +/** + * PscEvent framework enables event driven interaction between main loop and psc consumer thread. + * At this point it is used only in MemQ consumer case to inform memq consumer that the main loop encountered + * the end of a batch while traversing messages from iterator. + * + * This is an evolving framework. + */ + public class PscEvent { public final static String EVENT_HEADER = "__EVENT_HEADER";