From 2b27c6879446c4b7619369af3ae9470adeb3f0d3 Mon Sep 17 00:00:00 2001 From: artem Date: Thu, 7 Mar 2024 14:16:03 -0800 Subject: [PATCH] Add MemQ Flink consumer support --- psc-flink/pom.xml | 17 + .../psc/internals/AbstractFetcher.java | 43 +-- .../connectors/psc/internals/Handover.java | 27 +- .../psc/internals/PscConsumerThread.java | 39 +- .../connectors/psc/internals/PscFetcher.java | 38 +- .../PscFlinkConsumerEventInterceptor.java | 29 ++ .../psc/internals/PscShuffleFetcher.java | 8 +- .../psc/internals/AbstractFetcherTest.java | 7 +- .../AbstractFetcherWatermarksTest.java | 62 ++- .../consumer/memq/TestPscMemqConsumer.java | 6 +- .../psc/integration/TestAutoRemediation.java | 2 + .../psc/common/PscBackendClient.java | 9 + .../psc/common/event/EventHandler.java | 5 + .../pinterest/psc/common/event/PscEvent.java | 39 ++ .../psc/config/PscConfigurationReporter.java | 4 +- .../pinterest/psc/consumer/PscConsumer.java | 23 +- .../creation/PscBackendConsumerCreator.java | 55 +++ .../creation/PscKafkaConsumerCreator.java | 16 + .../creation/PscMemqConsumerCreator.java | 73 +++- .../psc/consumer/memq/MemqMessageId.java | 22 +- .../psc/consumer/memq/MemqMetricsHandler.java | 17 +- .../psc/consumer/memq/MemqOffset.java | 64 ++++ .../MemqToPscMessageIteratorConverter.java | 52 ++- .../psc/consumer/memq/PscMemqConsumer.java | 354 +++++++++++++++--- .../TestPscConsumerInterceptor.java | 8 +- 25 files changed, 860 insertions(+), 159 deletions(-) create mode 100644 psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/PscFlinkConsumerEventInterceptor.java create mode 100644 psc/src/main/java/com/pinterest/psc/common/event/EventHandler.java create mode 100644 psc/src/main/java/com/pinterest/psc/common/event/PscEvent.java create mode 100644 psc/src/main/java/com/pinterest/psc/consumer/memq/MemqOffset.java diff --git a/psc-flink/pom.xml b/psc-flink/pom.xml index 7f4c9b7..3797d34 100644 --- a/psc-flink/pom.xml +++ b/psc-flink/pom.xml @@ -23,6 +23,11 @@ + + org.apache.flink + flink-shaded-netty + 4.1.70.Final-15.0 + org.apache.flink flink-streaming-java @@ -386,6 +391,18 @@ + + org.apache.maven.plugins + maven-jar-plugin + 3.2.2 + + + + test-jar + + + + org.apache.maven.plugins maven-source-plugin diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/AbstractFetcher.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/AbstractFetcher.java index 1e64c7a..c62491f 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/AbstractFetcher.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/AbstractFetcher.java @@ -18,6 +18,7 @@ package com.pinterest.flink.streaming.connectors.psc.internals; +import java.util.Collection; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.eventtime.WatermarkOutput; import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer; @@ -91,7 +92,7 @@ public abstract class AbstractFetcher { /** * All partitions (and their state) that this fetcher is subscribed to. */ - private final List> subscribedTopicUriPartitionStates; + private final Map> subscribedTopicUriPartitionStates; /** * Queue of partitions that are not yet assigned to any PSC clients for consuming. @@ -184,27 +185,27 @@ protected AbstractFetcher( userCodeClassLoader); // check that all seed partition states have a defined offset - for (PscTopicUriPartitionState partitionState : subscribedTopicUriPartitionStates) { + for (PscTopicUriPartitionState partitionState : subscribedTopicUriPartitionStates.values()) { if (!partitionState.isOffsetDefined()) { throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets."); } } // all seed partitions are not assigned yet, so should be added to the unassigned partitions queue - for (PscTopicUriPartitionState partition : subscribedTopicUriPartitionStates) { + for (PscTopicUriPartitionState partition : subscribedTopicUriPartitionStates.values()) { unassignedTopicUriPartitionsQueue.add(partition); } // register metrics for the initial seed partitions if (useMetrics) { - registerOffsetMetrics(consumerMetricGroup, subscribedTopicUriPartitionStates); + registerOffsetMetrics(consumerMetricGroup, subscribedTopicUriPartitionStates.values()); } // if we have periodic watermarks, kick off the interval scheduler if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) { PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter<>( checkpointLock, - subscribedTopicUriPartitionStates, + subscribedTopicUriPartitionStates.values(), watermarkOutputMultiplexer, processingTimeProvider, autoWatermarkInterval); @@ -227,7 +228,7 @@ protected AbstractFetcher( * @param newPartitions discovered partitions to add */ public void addDiscoveredPartitions(List newPartitions) throws IOException, ClassNotFoundException { - List> newPartitionStates = createPartitionStateHolders( + Map> newPartitionStates = createPartitionStateHolders( newPartitions, PscTopicUriPartitionStateSentinel.EARLIEST_OFFSET, timestampWatermarkMode, @@ -235,14 +236,14 @@ public void addDiscoveredPartitions(List newPartitions) th userCodeClassLoader); if (useMetrics) { - registerOffsetMetrics(consumerMetricGroup, newPartitionStates); + registerOffsetMetrics(consumerMetricGroup, newPartitionStates.values()); } - for (PscTopicUriPartitionState newPartitionState : newPartitionStates) { + for (Map.Entry> newPartitionStateEntry : newPartitionStates.entrySet()) { // the ordering is crucial here; first register the state holder, then // push it to the partitions queue to be read - subscribedTopicUriPartitionStates.add(newPartitionState); - unassignedTopicUriPartitionsQueue.add(newPartitionState); + subscribedTopicUriPartitionStates.put(newPartitionStateEntry.getKey(), newPartitionStateEntry.getValue()); + unassignedTopicUriPartitionsQueue.add(newPartitionStateEntry.getValue()); } } @@ -255,7 +256,7 @@ public void addDiscoveredPartitions(List newPartitions) th * * @return All subscribed partitions. */ - protected final List> subscribedPartitionStates() { + protected final Map> subscribedPartitionStates() { return subscribedTopicUriPartitionStates; } @@ -329,7 +330,7 @@ public HashMap snapshotCurrentState() { assert Thread.holdsLock(checkpointLock); HashMap state = new HashMap<>(subscribedTopicUriPartitionStates.size()); - for (PscTopicUriPartitionState partition : subscribedTopicUriPartitionStates) { + for (PscTopicUriPartitionState partition : subscribedTopicUriPartitionStates.values()) { state.put(partition.getPscTopicUriPartition(), partition.getOffset()); } return state; @@ -375,15 +376,15 @@ protected void emitRecordsWithTimestamps( * Utility method that takes the topic partitions and creates the topic partition state * holders, depending on the timestamp / watermark mode. */ - private List> createPartitionStateHolders( - Map partitionsToInitialOffsets, + private Map> createPartitionStateHolders( + Map partitionsToInitialOffsets, int timestampWatermarkMode, SerializedValue> watermarkStrategy, ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException { // CopyOnWrite as adding discovered partitions could happen in parallel // while different threads iterate the partitions list - List> partitionStates = new CopyOnWriteArrayList<>(); + Map> partitionStates = new HashMap<>(); switch (timestampWatermarkMode) { case NO_TIMESTAMPS_WATERMARKS: { @@ -395,7 +396,7 @@ private List> createPartitionStateHolders( new PscTopicUriPartitionState<>(partitionEntry.getKey(), pscTopicUriPartitionHandle); partitionState.setOffset(partitionEntry.getValue()); - partitionStates.add(partitionState); + partitionStates.put(partitionEntry.getKey(), partitionState);; } return partitionStates; @@ -427,7 +428,7 @@ private List> createPartitionStateHolders( partitionState.setOffset(partitionEntry.getValue()); - partitionStates.add(partitionState); + partitionStates.put(partitionEntry.getKey(), partitionState); } return partitionStates; @@ -443,7 +444,7 @@ private List> createPartitionStateHolders( * Shortcut variant of {@link #createPartitionStateHolders(Map, int, SerializedValue, ClassLoader)} * that uses the same offset for all partitions when creating their state holders. */ - private List> createPartitionStateHolders( + private Map> createPartitionStateHolders( List partitions, long initialOffset, int timestampWatermarkMode, @@ -476,7 +477,7 @@ private List> createPartitionStateHolders( */ private void registerOffsetMetrics( MetricGroup consumerMetricGroup, - List> partitionOffsetStates) { + Collection> partitionOffsetStates) { for (PscTopicUriPartitionState ktp : partitionOffsetStates) { MetricGroup topicPartitionGroup = consumerMetricGroup @@ -538,7 +539,7 @@ private static class PeriodicWatermarkEmitter implements ProcessingTimeC private final Object checkpointLock; - private final List> allPartitions; + private final Collection> allPartitions; private final WatermarkOutputMultiplexer watermarkOutputMultiplexer; @@ -550,7 +551,7 @@ private static class PeriodicWatermarkEmitter implements ProcessingTimeC PeriodicWatermarkEmitter( Object checkpointLock, - List> allPartitions, + Collection> allPartitions, WatermarkOutputMultiplexer watermarkOutputMultiplexer, ProcessingTimeService timerService, long autoWatermarkInterval) { diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/Handover.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/Handover.java index ce7d974..665a10f 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/Handover.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/Handover.java @@ -18,7 +18,12 @@ package com.pinterest.flink.streaming.connectors.psc.internals; +import com.pinterest.psc.common.event.EventHandler; +import com.pinterest.psc.common.event.PscEvent; import com.pinterest.psc.consumer.PscConsumerPollMessageIterator; +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.flink.annotation.Internal; import org.apache.flink.util.ExceptionUtils; @@ -45,7 +50,7 @@ */ @ThreadSafe @Internal -public final class Handover implements Closeable { +public final class Handover implements Closeable, EventHandler { private final Object lock = new Object(); @@ -53,6 +58,8 @@ public final class Handover implements Closeable { private Throwable error; private boolean wakeupProducer; + private Queue eventQueue = new ConcurrentLinkedQueue<>(); + /** * Polls the next element from the Handover, possibly blocking until the next element is * available. This method behaves similar to polling from a blocking queue. @@ -167,6 +174,13 @@ public void reportError(Throwable t) { @Override public void close() { synchronized (lock) { + if (next != null) { + try { + next.close(); + } catch (IOException ioe) { + // pass through; best effort close + } + } next = null; wakeupProducer = false; @@ -189,6 +203,17 @@ public void wakeupProducer() { } } + public void handle(PscEvent pscEvent) { + eventQueue.offer(pscEvent); + if (!wakeupProducer) { + wakeupProducer(); + } + } + + public Queue getEventQueue() { + return eventQueue; + } + // ------------------------------------------------------------------------ /** diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/PscConsumerThread.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/PscConsumerThread.java index 95151a1..6d9a77e 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/PscConsumerThread.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/PscConsumerThread.java @@ -20,6 +20,7 @@ import com.pinterest.psc.common.MessageId; import com.pinterest.psc.common.TopicUriPartition; +import com.pinterest.psc.common.event.PscEvent; import com.pinterest.psc.config.PscConfiguration; import com.pinterest.psc.config.PscConfigurationInternal; import com.pinterest.psc.config.PscConfigurationUtils; @@ -30,9 +31,13 @@ import com.pinterest.psc.exception.consumer.ConsumerException; import com.pinterest.psc.exception.consumer.WakeupException; import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.interceptor.TypePreservingInterceptor; import com.pinterest.psc.metrics.Metric; import com.pinterest.psc.metrics.MetricName; import com.pinterest.psc.metrics.PscMetricRegistryManager; +import java.io.IOException; +import java.util.Arrays; +import java.util.Queue; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple2; @@ -190,6 +195,8 @@ public PscConsumerThread(Logger log, this.running = true; this.hasInitializedMetrics = false; this.pscConfigurationInternal = PscConfigurationUtils.propertiesToPscConfigurationInternal(pscProperties, PscConfiguration.PSC_CLIENT_TYPE_CONSUMER); + + initializeInterceptor(); } // ------------------------------------------------------------------------ @@ -214,6 +221,11 @@ public void run() { return; } + // the latest bulk of records. May carry across the loop if the thread is woken + // up + // from blocking on the handover + PscConsumerPollMessageIterator records = null; + // from here on, the consumer is guaranteed to be closed properly try { // early exit check @@ -221,11 +233,6 @@ public void run() { return; } - // the latest bulk of records. May carry across the loop if the thread is woken - // up - // from blocking on the handover - PscConsumerPollMessageIterator records = null; - // reused variable to hold found unassigned new partitions. // found partitions are not carried across loops using this variable; // they are carried across via re-adding them to the unassigned partitions queue @@ -294,6 +301,10 @@ public void run() { handover.produce(records); records = null; } catch (Handover.WakeupException e) { + Queue events = handover.getEventQueue(); + while (!events.isEmpty()) { + consumer.onEvent(events.poll()); + } // fall through the loop } } @@ -308,6 +319,14 @@ public void run() { // make sure the handover is closed if it is not already closed or has an error handover.close(); + if (records != null) { + try { + records.close(); + } catch (IOException ioe) { + // pass through; best effort close + } + } + // make sure the PscConsumer is closed try { consumer.close(); @@ -345,6 +364,16 @@ private void initializeMetrics() throws ClientException { } } + private void initializeInterceptor() { + TypePreservingInterceptor eventInterceptor = new PscFlinkConsumerEventInterceptor<>(handover); + Object interceptors = pscProperties.getProperty(PscConfiguration.PSC_CONSUMER_INTERCEPTORS_RAW_CLASSES); + if (interceptors == null) { + pscProperties.put(PscConfiguration.PSC_CONSUMER_INTERCEPTORS_RAW_CLASSES, eventInterceptor); + } else { + pscProperties.put(PscConfiguration.PSC_CONSUMER_INTERCEPTORS_RAW_CLASSES, Arrays.asList(interceptors, eventInterceptor)); + } + } + private void initializePscMetrics() { log.info("Initializing PSC metrics in PscConsumerThread from threadId {}", Thread.currentThread().getId()); if (pscMetricsInitialized == null) diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/PscFetcher.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/PscFetcher.java index b9d9637..a28fb9c 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/PscFetcher.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/PscFetcher.java @@ -20,6 +20,7 @@ import com.pinterest.psc.common.TopicUriPartition; import com.pinterest.psc.consumer.PscConsumerMessage; import com.pinterest.psc.consumer.PscConsumerPollMessageIterator; +import java.util.Collection; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.metrics.MetricGroup; @@ -135,16 +136,33 @@ public void runFetchLoop() throws Exception { while (running) { // this blocks until we get the next records // it automatically re-throws exceptions encountered in the consumer thread - final PscConsumerPollMessageIterator records = handover.pollNext(); + try (PscConsumerPollMessageIterator records = handover.pollNext()) { + while (records.hasNext()) { + PscConsumerMessage record = records.next(); + PscTopicUriPartition + key = + new PscTopicUriPartition( + record.getMessageId().getTopicUriPartition().getTopicUri(), + record.getMessageId().getTopicUriPartition().getPartition()); + PscTopicUriPartitionState + partitionState = + subscribedPartitionStates().get(key); + if (partitionState != null) { + topicUriPartitionConsumerRecordsHandler(record, partitionState); + } + } + } + + // retired code; doesn't work for memq due to no support of `iteratorFor` method // get the records for each topic partition - for (PscTopicUriPartitionState partition : subscribedPartitionStates()) { + /* for (PscTopicUriPartitionState partition : subscribedPartitionStates()) { Iterator> partitionRecords = records.iteratorFor(partition.getPscTopicUriPartitionHandle()); topicUriPartitionConsumerRecordsHandler(partitionRecords, partition); - } + }*/ } } finally { // this signals the consumer thread that no more work is to be done @@ -177,11 +195,11 @@ protected String getFetcherName() { } protected void topicUriPartitionConsumerRecordsHandler( - Iterator> topicUriPartitionMessages, + PscConsumerMessage record, PscTopicUriPartitionState pscTopicUriPartitionState) throws Exception { - while (topicUriPartitionMessages.hasNext()) { - PscConsumerMessage record = topicUriPartitionMessages.next(); +// while (topicUriPartitionMessages.hasNext()) { +// PscConsumerMessage record = topicUriPartitionMessages.next(); deserializer.deserialize(record, pscCollector); // emit the actual records. this also updates offset state atomically and emits @@ -195,9 +213,9 @@ protected void topicUriPartitionConsumerRecordsHandler( if (pscCollector.isEndOfStreamSignalled()) { // end of stream signaled running = false; - break; +// break; } - } +// } } // ------------------------------------------------------------------------ @@ -214,14 +232,14 @@ protected void doCommitInternalOffsets( Map topicUriPartitionOffsets, @Nonnull PscCommitCallback commitCallback) throws Exception { - List> pscTopicUriPartitionStates = subscribedPartitionStates(); + Collection> pscTopicUriPartitionStates = subscribedPartitionStates().values(); Map offsetsToCommit = new HashMap<>(pscTopicUriPartitionStates.size()); for (PscTopicUriPartitionState pscTopicUriPartitionState : pscTopicUriPartitionStates) { Long lastProcessedOffset = topicUriPartitionOffsets.get(pscTopicUriPartitionState.getPscTopicUriPartition()); if (lastProcessedOffset != null) { - checkState(lastProcessedOffset >= 0, "Illegal offset value to commit"); + checkState(!PscTopicUriPartitionStateSentinel.isSentinel(lastProcessedOffset), "Illegal offset value to commit"); // committed offsets through the PscConsumer need to be the last processed offset. // PSC will translate that to the proper offset to commit per backend. diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/PscFlinkConsumerEventInterceptor.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/PscFlinkConsumerEventInterceptor.java new file mode 100644 index 0000000..3e16b08 --- /dev/null +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/PscFlinkConsumerEventInterceptor.java @@ -0,0 +1,29 @@ +package com.pinterest.flink.streaming.connectors.psc.internals; + +import com.pinterest.psc.common.MessageId; +import com.pinterest.psc.common.event.EventHandler; +import com.pinterest.psc.common.event.PscEvent; +import com.pinterest.psc.consumer.PscConsumerMessage; +import com.pinterest.psc.interceptor.TypePreservingInterceptor; + +import java.util.Collections; + +public class PscFlinkConsumerEventInterceptor extends TypePreservingInterceptor { + private final EventHandler eventHandler; + + public PscFlinkConsumerEventInterceptor(EventHandler eventHandler) { + this.eventHandler = eventHandler; + } + + @Override + public PscConsumerMessage onConsume(PscConsumerMessage message) { + if (message.getHeaders().containsKey(PscEvent.EVENT_HEADER)) { + String eventType = new String(message.getHeader(PscEvent.EVENT_HEADER)); + MessageId messageId = message.getMessageId(); + PscEvent event = new PscEvent(messageId.getTopicUriPartition().getTopicUri(), messageId.getTopicUriPartition(), eventType, Collections.emptyMap()); + eventHandler.handle(event); + message.getHeaders().remove(PscEvent.EVENT_HEADER); + } + return super.onConsume(message); + } +} diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/PscShuffleFetcher.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/PscShuffleFetcher.java index c7d1848..0760d76 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/PscShuffleFetcher.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/PscShuffleFetcher.java @@ -106,11 +106,11 @@ protected String getFetcherName() { @Override protected void topicUriPartitionConsumerRecordsHandler( - Iterator> topicUriPartitionMessages, + PscConsumerMessage record, PscTopicUriPartitionState pscTopicUriPartitionState) throws Exception { - while (topicUriPartitionMessages.hasNext()) { - PscConsumerMessage record = topicUriPartitionMessages.next(); + // while (topicUriPartitionMessages.hasNext()) { + // PscConsumerMessage record = topicUriPartitionMessages.next(); final PscShuffleElement element = pscShuffleElementDeserializer.deserialize(record); // TODO: Do we need to check the end of stream if reaching the end watermark @@ -137,7 +137,7 @@ protected void topicUriPartitionConsumerRecordsHandler( Optional newWatermark = watermarkHandler.checkAndGetNewWatermark(watermark); newWatermark.ifPresent(sourceContext::emitWatermark); } - } +// } } /** diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/internals/AbstractFetcherTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/internals/AbstractFetcherTest.java index fe332b7..bb4f0ac 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/internals/AbstractFetcherTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/internals/AbstractFetcherTest.java @@ -89,7 +89,8 @@ public void onException(Throwable cause) { @Test public void testSkipCorruptedRecord() throws Exception { Map originalPartitions = new HashMap<>(); - originalPartitions.put(new PscTopicUriPartition(TOPIC_URI, 1), PscTopicUriPartitionStateSentinel.LATEST_OFFSET); + PscTopicUriPartition ptup = new PscTopicUriPartition(TOPIC_URI, 1); + originalPartitions.put(ptup, PscTopicUriPartitionStateSentinel.LATEST_OFFSET); TestSourceContext sourceContext = new TestSourceContext<>(); @@ -100,7 +101,7 @@ public void testSkipCorruptedRecord() throws Exception { new TestProcessingTimeService(), 0); - final PscTopicUriPartitionState partitionStateHolder = fetcher.subscribedPartitionStates().get(0); + final PscTopicUriPartitionState partitionStateHolder = fetcher.subscribedPartitionStates().get(ptup); emitRecord(fetcher, 1L, partitionStateHolder, 1L); emitRecord(fetcher, 2L, partitionStateHolder, 2L); @@ -211,7 +212,7 @@ private static final class TestFetcher extends AbstractFetcher { @Override public void runFetchLoop() throws Exception { if (fetchLoopWaitLatch != null) { - for (PscTopicUriPartitionState ignored : subscribedPartitionStates()) { + for (PscTopicUriPartitionState ignored : subscribedPartitionStates().values()) { fetchLoopWaitLatch.trigger(); stateIterationBlockLatch.await(); } diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/internals/AbstractFetcherWatermarksTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/internals/AbstractFetcherWatermarksTest.java index d4aafc9..0e36a5a 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/internals/AbstractFetcherWatermarksTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/internals/AbstractFetcherWatermarksTest.java @@ -84,15 +84,12 @@ public static Collection> getParams() { @Test public void testPeriodicWatermarks() throws Exception { Map originalPartitions = new HashMap<>(); - originalPartitions.put( - new PscTopicUriPartition(TOPIC_URI, 7), - PscTopicUriPartitionStateSentinel.LATEST_OFFSET); - originalPartitions.put( - new PscTopicUriPartition(TOPIC_URI, 13), - PscTopicUriPartitionStateSentinel.LATEST_OFFSET); - originalPartitions.put( - new PscTopicUriPartition(TOPIC_URI, 21), - PscTopicUriPartitionStateSentinel.LATEST_OFFSET); + PscTopicUriPartition ptup7 = new PscTopicUriPartition(TOPIC_URI, 7); + originalPartitions.put(ptup7, PscTopicUriPartitionStateSentinel.LATEST_OFFSET); + PscTopicUriPartition ptup13 = new PscTopicUriPartition(TOPIC_URI, 13); + originalPartitions.put(ptup13, PscTopicUriPartitionStateSentinel.LATEST_OFFSET); + PscTopicUriPartition ptup21 = new PscTopicUriPartition(TOPIC_URI, 21); + originalPartitions.put(ptup21, PscTopicUriPartitionStateSentinel.LATEST_OFFSET); TestSourceContext sourceContext = new TestSourceContext<>(); @@ -106,11 +103,11 @@ public void testPeriodicWatermarks() throws Exception { 10); final PscTopicUriPartitionState part1 = - fetcher.subscribedPartitionStates().get(0); + fetcher.subscribedPartitionStates().get(ptup7); final PscTopicUriPartitionState part2 = - fetcher.subscribedPartitionStates().get(1); + fetcher.subscribedPartitionStates().get(ptup13); final PscTopicUriPartitionState part3 = - fetcher.subscribedPartitionStates().get(2); + fetcher.subscribedPartitionStates().get(ptup21); // elements generate a watermark if the timestamp is a multiple of three @@ -169,9 +166,8 @@ public void testPeriodicWatermarks() throws Exception { @Test public void testSkipCorruptedRecordWithPeriodicWatermarks() throws Exception { Map originalPartitions = new HashMap<>(); - originalPartitions.put( - new PscTopicUriPartition(TOPIC_URI, 1), - PscTopicUriPartitionStateSentinel.LATEST_OFFSET); + PscTopicUriPartition ptup = new PscTopicUriPartition(TOPIC_URI, 1); + originalPartitions.put(ptup, PscTopicUriPartitionStateSentinel.LATEST_OFFSET); TestSourceContext sourceContext = new TestSourceContext<>(); @@ -185,7 +181,7 @@ public void testSkipCorruptedRecordWithPeriodicWatermarks() throws Exception { 10); final PscTopicUriPartitionState partitionStateHolder = - fetcher.subscribedPartitionStates().get(0); + fetcher.subscribedPartitionStates().get(ptup); // elements generate a watermark if the timestamp is a multiple of three emitRecord(fetcher, 1L, partitionStateHolder, 1L); @@ -239,9 +235,9 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma // counter-test that when the fetcher does actually have partitions, // when the periodic watermark emitter fires again, a watermark really is emitted - fetcher.addDiscoveredPartitions(Collections.singletonList( - new PscTopicUriPartition(TOPIC_URI, 0))); - emitRecord(fetcher, 100L, fetcher.subscribedPartitionStates().get(0), 3L); + PscTopicUriPartition ptup = new PscTopicUriPartition(TOPIC_URI, 0); + fetcher.addDiscoveredPartitions(Collections.singletonList(ptup)); + emitRecord(fetcher, 100L, fetcher.subscribedPartitionStates().get(ptup), 3L);; processingTimeProvider.setCurrentTime(20); assertEquals(100, sourceContext.getLatestWatermark().getTimestamp()); } @@ -255,9 +251,8 @@ public static class PunctuatedWatermarksSuite { @Test public void testSkipCorruptedRecordWithPunctuatedWatermarks() throws Exception { Map originalPartitions = new HashMap<>(); - originalPartitions.put( - new PscTopicUriPartition(TOPIC_URI, 1), - PscTopicUriPartitionStateSentinel.LATEST_OFFSET); + PscTopicUriPartition ptup = new PscTopicUriPartition(TOPIC_URI, 1); + originalPartitions.put(ptup, PscTopicUriPartitionStateSentinel.LATEST_OFFSET); TestSourceContext sourceContext = new TestSourceContext<>(); @@ -274,7 +269,7 @@ public void testSkipCorruptedRecordWithPunctuatedWatermarks() throws Exception { 0); final PscTopicUriPartitionState partitionStateHolder = - fetcher.subscribedPartitionStates().get(0); + fetcher.subscribedPartitionStates().get(ptup); // elements generate a watermark if the timestamp is a multiple of three emitRecord(fetcher, 1L, partitionStateHolder, 1L); @@ -300,15 +295,12 @@ public void testSkipCorruptedRecordWithPunctuatedWatermarks() throws Exception { @Test public void testPunctuatedWatermarks() throws Exception { Map originalPartitions = new HashMap<>(); - originalPartitions.put( - new PscTopicUriPartition(TOPIC_URI, 7), - PscTopicUriPartitionStateSentinel.LATEST_OFFSET); - originalPartitions.put( - new PscTopicUriPartition(TOPIC_URI, 13), - PscTopicUriPartitionStateSentinel.LATEST_OFFSET); - originalPartitions.put( - new PscTopicUriPartition(TOPIC_URI, 21), - PscTopicUriPartitionStateSentinel.LATEST_OFFSET); + PscTopicUriPartition ptup7 = new PscTopicUriPartition(TOPIC_URI, 7); + originalPartitions.put(ptup7, PscTopicUriPartitionStateSentinel.LATEST_OFFSET); + PscTopicUriPartition ptup13 = new PscTopicUriPartition(TOPIC_URI, 13); + originalPartitions.put(ptup13, PscTopicUriPartitionStateSentinel.LATEST_OFFSET); + PscTopicUriPartition ptup21 = new PscTopicUriPartition(TOPIC_URI, 21); + originalPartitions.put(ptup21, PscTopicUriPartitionStateSentinel.LATEST_OFFSET); TestSourceContext sourceContext = new TestSourceContext<>(); @@ -325,11 +317,11 @@ public void testPunctuatedWatermarks() throws Exception { 0); final PscTopicUriPartitionState part1 = - fetcher.subscribedPartitionStates().get(0); + fetcher.subscribedPartitionStates().get(ptup7); final PscTopicUriPartitionState part2 = - fetcher.subscribedPartitionStates().get(1); + fetcher.subscribedPartitionStates().get(ptup13); final PscTopicUriPartitionState part3 = - fetcher.subscribedPartitionStates().get(2); + fetcher.subscribedPartitionStates().get(ptup21); // elements generate a watermark if the timestamp is a multiple of three diff --git a/psc-integration-test/src/test/java/com/pinterest/psc/consumer/memq/TestPscMemqConsumer.java b/psc-integration-test/src/test/java/com/pinterest/psc/consumer/memq/TestPscMemqConsumer.java index 9199393..c62154c 100644 --- a/psc-integration-test/src/test/java/com/pinterest/psc/consumer/memq/TestPscMemqConsumer.java +++ b/psc-integration-test/src/test/java/com/pinterest/psc/consumer/memq/TestPscMemqConsumer.java @@ -304,7 +304,7 @@ void commitErrorScenarios() throws Exception { // topic not in subscription MemqTopicUri memqUri = MemqTopicUri.validate(TopicUri.validate(testMemqTopic1)); Set memqMessageIds = Collections - .singleton(new MemqMessageId(new TopicUriPartition(testKafkaTopic, 0), 0)); + .singleton(new MemqMessageId(new TopicUriPartition(testKafkaTopic, 0), 0, false)); assertThrows(ConsumerException.class, () -> pscMemqConsumer.commitSync(memqMessageIds)); // invalid message id @@ -320,9 +320,9 @@ void commit() throws Exception { "test_" + System.currentTimeMillis()); MemqTopicUri memqUri = MemqTopicUri.validate(TopicUri.validate(testMemqTopic1)); MessageId memqMessageId1 = new MemqMessageId( - TestUtils.getFinalizedTopicUriPartition(memqUri, 0), 0); + TestUtils.getFinalizedTopicUriPartition(memqUri, 0), 0, false); MessageId memqMessageId2 = new MemqMessageId( - TestUtils.getFinalizedTopicUriPartition(memqUri, 0), 10); + TestUtils.getFinalizedTopicUriPartition(memqUri, 0), new MemqOffset(10, 0).toLong(), false); pscMemqConsumer.subscribe(Sets.newHashSet(memqUri)); MemqConsumer memqConsumer = Mockito.mock(MemqConsumer.class); diff --git a/psc-integration-test/src/test/java/com/pinterest/psc/integration/TestAutoRemediation.java b/psc-integration-test/src/test/java/com/pinterest/psc/integration/TestAutoRemediation.java index 802d94d..fcd62f0 100644 --- a/psc-integration-test/src/test/java/com/pinterest/psc/integration/TestAutoRemediation.java +++ b/psc-integration-test/src/test/java/com/pinterest/psc/integration/TestAutoRemediation.java @@ -17,6 +17,7 @@ import com.pinterest.psc.logging.PscLogger; import com.pinterest.psc.metrics.PscMetricRegistryManager; import com.pinterest.psc.metrics.PscMetrics; +import com.pinterest.psc.metrics.PscMetricsUtil; import com.pinterest.psc.producer.PscProducer; import com.pinterest.psc.producer.PscProducerMessage; import com.pinterest.psc.producer.PscProducerUtils; @@ -131,6 +132,7 @@ public void setup() throws IOException, InterruptedException, ConfigurationExcep PscTestUtils.createTopicAndVerify(sharedKafkaTestResource, topic1, partitions1); PscTestUtils.createTopicAndVerify(sharedKafkaTestResource, topic2, partitions2); + PscMetricsUtil.cleanup(PscMetricRegistryManager.getInstance()); } /** diff --git a/psc/src/main/java/com/pinterest/psc/common/PscBackendClient.java b/psc/src/main/java/com/pinterest/psc/common/PscBackendClient.java index 905d109..a0e25e6 100644 --- a/psc/src/main/java/com/pinterest/psc/common/PscBackendClient.java +++ b/psc/src/main/java/com/pinterest/psc/common/PscBackendClient.java @@ -1,5 +1,6 @@ package com.pinterest.psc.common; +import com.pinterest.psc.common.event.PscEvent; import com.pinterest.psc.config.PscConfiguration; import com.pinterest.psc.exception.ClientException; import com.pinterest.psc.logging.PscLogger; @@ -72,4 +73,12 @@ protected void resetBackendClient() throws ClientException { } * Implements and global retry action for backend producers and consumers APIs */ protected void retryBackendClient() { } + + /** + * Allows event-based logic to be executed by the backend producer/consumer + * @param event an PscEvent that the consumer may handle + */ + public void onEvent(PscEvent event) { + + } } diff --git a/psc/src/main/java/com/pinterest/psc/common/event/EventHandler.java b/psc/src/main/java/com/pinterest/psc/common/event/EventHandler.java new file mode 100644 index 0000000..ecb127f --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/common/event/EventHandler.java @@ -0,0 +1,5 @@ +package com.pinterest.psc.common.event; + +public interface EventHandler { + void handle(PscEvent pscEvent); +} diff --git a/psc/src/main/java/com/pinterest/psc/common/event/PscEvent.java b/psc/src/main/java/com/pinterest/psc/common/event/PscEvent.java new file mode 100644 index 0000000..0940cb5 --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/common/event/PscEvent.java @@ -0,0 +1,39 @@ +package com.pinterest.psc.common.event; + +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.common.TopicUriPartition; + +import java.util.Map; + +public class PscEvent { + public final static String EVENT_HEADER = "__EVENT_HEADER"; + + private final TopicUri topicUri; + private final TopicUriPartition topicUriPartition; + private final String type; + private final Map context; + + public PscEvent(TopicUri topicUri, TopicUriPartition topicUriPartition, String type, + Map context) { + this.topicUri = topicUri; + this.topicUriPartition = topicUriPartition; + this.type = type; + this.context = context; + } + + public TopicUri getTopicUri() { + return topicUri; + } + + public TopicUriPartition getTopicUriPartition() { + return topicUriPartition; + } + + public String getType() { + return type; + } + + public Map getContext() { + return context; + } +} diff --git a/psc/src/main/java/com/pinterest/psc/config/PscConfigurationReporter.java b/psc/src/main/java/com/pinterest/psc/config/PscConfigurationReporter.java index 5a0c393..536a7f3 100644 --- a/psc/src/main/java/com/pinterest/psc/config/PscConfigurationReporter.java +++ b/psc/src/main/java/com/pinterest/psc/config/PscConfigurationReporter.java @@ -83,8 +83,8 @@ public void report() { public static boolean isThisYou(PscConfiguration pscConfiguration) { return pscConfiguration != null && pscConfiguration.containsKey(PscConfiguration.PSC_PROJECT) && - pscConfiguration.getString(PscConfiguration.PSC_PROJECT).equals("psc") && + pscConfiguration.getProperty(PscConfiguration.PSC_PROJECT).equals("psc") && pscConfiguration.containsKey(PscConfiguration.PSC_PRODUCER_CLIENT_ID) && - pscConfiguration.getString(PscConfiguration.PSC_PRODUCER_CLIENT_ID).contains(PSC_CONFIGURATION_REPORTER_CLIENT_ID); + ((String) pscConfiguration.getProperty(PscConfiguration.PSC_PRODUCER_CLIENT_ID)).contains(PSC_CONFIGURATION_REPORTER_CLIENT_ID); } } diff --git a/psc/src/main/java/com/pinterest/psc/consumer/PscConsumer.java b/psc/src/main/java/com/pinterest/psc/consumer/PscConsumer.java index 0bb81bd..d24ba39 100644 --- a/psc/src/main/java/com/pinterest/psc/consumer/PscConsumer.java +++ b/psc/src/main/java/com/pinterest/psc/consumer/PscConsumer.java @@ -8,6 +8,7 @@ import com.pinterest.psc.common.PscCommon; import com.pinterest.psc.common.TopicUri; import com.pinterest.psc.common.TopicUriPartition; +import com.pinterest.psc.common.event.PscEvent; import com.pinterest.psc.config.PscConfiguration; import com.pinterest.psc.config.PscConfigurationInternal; import com.pinterest.psc.consumer.creation.PscBackendConsumerCreator; @@ -829,7 +830,7 @@ public void commitAsync(Collection messageIds, OffsetCommitCallback o validateMessageIds(messageIds); Map, Set> backendConsumers = - getNoAssignmentBackendConsumers(messageIds, false); + getCommitBackendConsumers(messageIds, false); for (Map.Entry, Set> entry : backendConsumers.entrySet()) { entry.getKey().commitAsync(entry.getValue(), new OffsetCommitCallback() { @Override @@ -879,7 +880,7 @@ public void commitSync(Collection messageIds) throws ConsumerExceptio validateMessageIds(messageIds); Map, Set> backendConsumers = - getNoAssignmentBackendConsumers(messageIds, true); + getCommitBackendConsumers(messageIds, false); for (Map.Entry, Set> entry : backendConsumers.entrySet()) { try { entry.getKey().commitSync(entry.getValue()); @@ -913,7 +914,7 @@ public void commitSync(MessageId messageId) throws ConsumerException, WakeupExce commitSync(Collections.singleton(messageId)); } - private Map, Set> getNoAssignmentBackendConsumers(Collection messageIds, boolean canWakeup) throws ConfigurationException, ConsumerException { + private Map, Set> getCommitBackendConsumers(Collection messageIds, boolean canWakeup) throws ConfigurationException, ConsumerException { // dispatch messageIds to creators based on the backend Map creator = creatorManager.getBackendCreators(); Map> backendToMessageIds = new HashMap<>(); @@ -927,13 +928,12 @@ private Map, Set> getNoAssignmentBackendCons for (Map.Entry> entry : backendToMessageIds.entrySet()) { if (creator.containsKey(entry.getKey())) { for (MessageId messageId : entry.getValue()) { - PscBackendConsumer backendConsumer = creator.get(entry.getKey()).getAssignmentConsumer( + PscBackendConsumer backendConsumer = creator.get(entry.getKey()).getCommitConsumer( environment, pscConfigurationInternal, consumerInterceptors, messageId.getTopicUriPartition(), - canWakeup && wakeups.get() >= 0, - false + canWakeup && wakeups.get() >= 0 ); backendConsumerToMessageIds.computeIfAbsent(backendConsumer, b -> new HashSet<>()) @@ -1772,12 +1772,11 @@ public Map getMessageIdByTimestamp(Map> entry : backendToTopicUriPartitionTimestampMap.entrySet()) { if (creator.containsKey(entry.getKey())) { for (Map.Entry entry2 : entry.getValue().entrySet()) { - PscBackendConsumer backendConsumer = creator.get(entry.getKey()).getAssignmentConsumer( + PscBackendConsumer backendConsumer = creator.get(entry.getKey()).getCommitConsumer( environment, pscConfigurationInternal, consumerInterceptors, entry2.getKey(), - false, false ); @@ -1920,6 +1919,14 @@ else if (messageId.getTopicUriPartition().getTopicUri() == null) } } + public void onEvent(PscEvent event) { + if (subscribed.get() && event.getTopicUri() != null) { + subscriptionMap.get(event.getTopicUri()).onEvent(event); + } else if (assigned.get() && event.getTopicUriPartition() != null) { + assignmentMap.get(event.getTopicUriPartition()).onEvent(event); + } + } + @VisibleForTesting protected Map getBackendConsumerConfigurationPerTopicUri() { if (assigned.get()) diff --git a/psc/src/main/java/com/pinterest/psc/consumer/creation/PscBackendConsumerCreator.java b/psc/src/main/java/com/pinterest/psc/consumer/creation/PscBackendConsumerCreator.java index d2ab609..e44b80f 100644 --- a/psc/src/main/java/com/pinterest/psc/consumer/creation/PscBackendConsumerCreator.java +++ b/psc/src/main/java/com/pinterest/psc/consumer/creation/PscBackendConsumerCreator.java @@ -216,6 +216,61 @@ public PscBackendConsumer getAssignmentConsumer( return pscBackendConsumers.iterator().next(); } + /** + * Creates/updates the backend consumer(s) and returns a set of active consumers + * The creator is responsible of reusing resources + * + * @param environment the Environment of the current PscConsumer instance + * @param pscConfigurationInternal the PscConfiguration of the current initialization of the consumer + * @param consumerInterceptors the consumer interceptors for consumer APIs + * @param topicUriPartitions {@link TopicUriPartition}s that this backend should handle + * @param shouldWakeup whether the retrieved consumers should be woken up + * @return a Set of PscBackendConsumers that are active and should not be removed/unsubscribed + * @throws ConsumerException thrown if extracting a backend consumer fails due to invalid topic URI, or lack of + * an applicable service discovery method, or ... + * @throws ConfigurationException thrown if service discovery fails based on the provided configuration + */ + public abstract Set> getCommitConsumers( + Environment environment, + PscConfigurationInternal pscConfigurationInternal, + ConsumerInterceptors consumerInterceptors, + Set topicUriPartitions, + boolean shouldWakeup + ) throws ConsumerException, ConfigurationException; + + /** + * Create/updates consumers suitable for performing commits for this backend + * @param environment the Environment of the current PscConsumer instance + * @param pscConfigurationInternal the PscConfiguration of the current initialization of the consumer + * @param consumerInterceptors the consumer interceptors for consumer APIs + * @param topicUriPartition {@link TopicUriPartition} that this backend should handle + * @param shouldWakeup whether the retrieved consumers should be woken up + * @return a Set of PscBackendConsumers that are active and should not be removed/unsubscribed + * @throws ConsumerException thrown if extracting a backend consumer fails due to invalid topic URI, or lack of + * an applicable service discovery method, or ... + * @throws ConfigurationException thrown if service discovery fails based on the provided configuration + */ + public PscBackendConsumer getCommitConsumer( + Environment environment, + PscConfigurationInternal pscConfigurationInternal, + ConsumerInterceptors consumerInterceptors, + TopicUriPartition topicUriPartition, + boolean shouldWakeup + ) throws ConsumerException, ConfigurationException { + Set> pscBackendConsumers = getCommitConsumers( + environment, + pscConfigurationInternal, + consumerInterceptors, + Collections.singleton(topicUriPartition), + shouldWakeup + ); + + if (pscBackendConsumers.isEmpty()) + throw new ConsumerException("No consumer was found for topic URI partition" + topicUriPartition); + + return pscBackendConsumers.iterator().next(); + } + public abstract TopicUri validateBackendTopicUri(TopicUri baseTopicUri) throws TopicUriSyntaxException; /** diff --git a/psc/src/main/java/com/pinterest/psc/consumer/creation/PscKafkaConsumerCreator.java b/psc/src/main/java/com/pinterest/psc/consumer/creation/PscKafkaConsumerCreator.java index 29403a5..b859b47 100644 --- a/psc/src/main/java/com/pinterest/psc/consumer/creation/PscKafkaConsumerCreator.java +++ b/psc/src/main/java/com/pinterest/psc/consumer/creation/PscKafkaConsumerCreator.java @@ -158,6 +158,22 @@ public Set> getAssignmentConsumers( return consumers; } + @Override + public Set> getCommitConsumers(Environment environment, + PscConfigurationInternal pscConfigurationInternal, + ConsumerInterceptors consumerInterceptors, + Set topicUriPartitions, + boolean shouldWakeup) + throws ConsumerException, ConfigurationException { + return getAssignmentConsumers( + environment, + pscConfigurationInternal, + consumerInterceptors, + topicUriPartitions, + shouldWakeup, + false); + } + @Override public TopicUri validateBackendTopicUri(TopicUri baseTopicUri) throws TopicUriSyntaxException { return KafkaTopicUri.validate(baseTopicUri); diff --git a/psc/src/main/java/com/pinterest/psc/consumer/creation/PscMemqConsumerCreator.java b/psc/src/main/java/com/pinterest/psc/consumer/creation/PscMemqConsumerCreator.java index ce4d391..bc1aa59 100644 --- a/psc/src/main/java/com/pinterest/psc/consumer/creation/PscMemqConsumerCreator.java +++ b/psc/src/main/java/com/pinterest/psc/consumer/creation/PscMemqConsumerCreator.java @@ -16,6 +16,7 @@ import com.pinterest.psc.exception.startup.TopicUriSyntaxException; import com.pinterest.psc.interceptor.ConsumerInterceptors; +import com.pinterest.psc.logging.PscLogger; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -24,6 +25,10 @@ @PscConsumerCreatorPlugin(backend = "memq") public class PscMemqConsumerCreator extends PscBackendConsumerCreator { + private static final PscLogger logger = PscLogger.getLogger(PscMemqConsumerCreator.class); + + public static volatile boolean failed = false; + @Override public Set> getConsumers(Environment environment, PscConfigurationInternal pscConfigurationInternal, @@ -56,14 +61,14 @@ public Set> getConsumers(Environment environment, } for (Map.Entry> entry : clusterTopics.entrySet()) { - PscMemqConsumer pscMemqConsumer = (PscMemqConsumer) clusterConsumerCache.get(entry.getKey()); - TopicUri sampleMemeTopicUri = entry.getValue().iterator().next(); + PscMemqConsumer pscMemqConsumer = (PscMemqConsumer) clusterConsumerCache.get(entry.getKey()); + TopicUri sampleMemqTopicUri = entry.getValue().iterator().next(); if (pscMemqConsumer == null) { ServiceDiscoveryConfig discoveryConfig; discoveryConfig = ServiceDiscoveryManager.getServiceDiscoveryConfig(environment, - pscConfigurationInternal.getDiscoveryConfiguration(), sampleMemeTopicUri); + pscConfigurationInternal.getDiscoveryConfiguration(), sampleMemqTopicUri); pscMemqConsumer = new PscMemqConsumer<>(); - pscMemqConsumer.initialize(pscConfigurationInternal, discoveryConfig, sampleMemeTopicUri); + pscMemqConsumer.initialize(pscConfigurationInternal, discoveryConfig, sampleMemqTopicUri); clusterConsumerCache.put(entry.getKey(), pscMemqConsumer); if (shouldWakeup) pscMemqConsumer.wakeup(); @@ -89,6 +94,7 @@ public Set> getConsumers(Environment environment, return consumers; } + @Override public Set> getAssignmentConsumers(Environment environment, PscConfigurationInternal pscConfigurationInternal, @@ -106,7 +112,7 @@ public Set> getAssignmentConsumers(Environment environm } for (Map.Entry> entry : clusterPartitions.entrySet()) { - PscMemqConsumer pscMemqConsumer = (PscMemqConsumer) clusterConsumerCache.get(entry.getKey()); + PscMemqConsumer pscMemqConsumer = (PscMemqConsumer) clusterConsumerCache.get(entry.getKey()); TopicUri sampleMemqTopicUri = entry.getValue().iterator().next().getTopicUri(); if (pscMemqConsumer == null) { ServiceDiscoveryConfig discoveryConfig; @@ -141,8 +147,65 @@ public Set> getAssignmentConsumers(Environment environm return consumers; } + @Override + public Set> getCommitConsumers(Environment environment, + PscConfigurationInternal pscConfigurationInternal, + ConsumerInterceptors consumerInterceptors, + Set topicUriPartitions, + boolean shouldWakeup) + throws ConsumerException, ConfigurationException { + Set> consumers = new HashSet<>(); + Map> clusterPartitions = new HashMap<>(); + + for (TopicUriPartition topicUriPartition : topicUriPartitions) { + MemqTopicUri memqTopicUri = (MemqTopicUri) topicUriPartition.getTopicUri(); + clusterPartitions.computeIfAbsent(memqTopicUri.getTopicUriPrefix(), p -> new HashSet<>()) + .add(topicUriPartition); + } + + for (Map.Entry> entry : clusterPartitions.entrySet()) { + PscMemqConsumer pscMemqConsumer = (PscMemqConsumer) clusterConsumerCache.get(entry.getKey()); + TopicUri sampleMemqTopicUri = entry.getValue().iterator().next().getTopicUri(); + if (pscMemqConsumer == null) { + ServiceDiscoveryConfig discoveryConfig; + try { + discoveryConfig = ServiceDiscoveryManager.getServiceDiscoveryConfig(environment, + pscConfigurationInternal.getDiscoveryConfiguration(), sampleMemqTopicUri); + } catch (Exception e) { + throw new ConsumerException(e); + } + pscMemqConsumer = new PscMemqConsumer<>(); + pscMemqConsumer.initialize(pscConfigurationInternal, discoveryConfig, sampleMemqTopicUri); + clusterConsumerCache.put(entry.getKey(), pscMemqConsumer); + if (shouldWakeup) + pscMemqConsumer.wakeup(); + } + if (failed) + Thread.dumpStack(); + + pscMemqConsumer.setConsumerInterceptors(consumerInterceptors); + if (!pscMemqConsumer.isNotificationSourceInitialized()) + pscMemqConsumer.assign(entry.getValue()); + consumers.add(pscMemqConsumer); + } + + return consumers; + } + @Override public TopicUri validateBackendTopicUri(TopicUri baseTopicUri) throws TopicUriSyntaxException { return MemqTopicUri.validate(baseTopicUri); } + + @Override + public void reset() { + for (PscBackendConsumer pscBackendConsumer : clusterConsumerCache.values()) { + try { + pscBackendConsumer.close(); + } catch (Exception e) { + logger.warn("Could not close backend MemQ consumer."); + } + } + clusterConsumerCache.clear(); + } } diff --git a/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqMessageId.java b/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqMessageId.java index e48b93d..340d781 100644 --- a/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqMessageId.java +++ b/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqMessageId.java @@ -7,19 +7,35 @@ public class MemqMessageId extends MessageId { private static final long serialVersionUID = 7724281495471377020L; - public MemqMessageId(TopicUriPartition topicUriPartition, long offset, long timestamp, int serializedKeySizeBytes, int serializedValueSizeBytes) { + private final boolean endOfBatch; + + public MemqMessageId(TopicUriPartition topicUriPartition, long offset, long timestamp, int serializedKeySizeBytes, int serializedValueSizeBytes, boolean endOfBatch) { super(topicUriPartition, offset, timestamp, serializedKeySizeBytes, serializedValueSizeBytes); + this.endOfBatch = endOfBatch; } - public MemqMessageId(TopicUriPartition topicUriPartition, long offset, long timestamp) { + public MemqMessageId(TopicUriPartition topicUriPartition, long offset, long timestamp, boolean endOfBatch) { super(topicUriPartition, offset, timestamp); + this.endOfBatch = endOfBatch; } - public MemqMessageId(TopicUriPartition topicUriPartition, long offset) { + public MemqMessageId(TopicUriPartition topicUriPartition, long offset, boolean endOfBatch) { super(topicUriPartition, offset); + this.endOfBatch = endOfBatch; } public MemqMessageId(MessageId messageId) { super(messageId.getTopicUriPartition(), messageId.getOffset(), messageId.getTimestamp()); + this.endOfBatch = false; + } + + public boolean isEndOfBatch() { + return endOfBatch; + } + + public String toString() { + return timestamp == -1 ? + String.format("[topicUri-partition: %s, offset: %d, isEndOfBatch: %b]", topicUriPartition, offset, endOfBatch) : + String.format("[topicUri-partition: %s, offset: %d, timestamp: %d, isEndOfBatch: %b]", topicUriPartition, offset, timestamp, endOfBatch); } } diff --git a/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqMetricsHandler.java b/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqMetricsHandler.java index 08fe115..512a0d5 100644 --- a/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqMetricsHandler.java +++ b/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqMetricsHandler.java @@ -7,6 +7,7 @@ import com.codahale.metrics.Metric; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; +import com.pinterest.memq.commons.storage.s3.AbstractS3StorageHandler; import com.pinterest.psc.common.TopicUri; import com.pinterest.psc.config.PscConfigurationInternal; import com.pinterest.psc.metrics.PscMetricRegistryManager; @@ -14,6 +15,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class MemqMetricsHandler { @@ -21,12 +23,15 @@ public class MemqMetricsHandler { private static final String MEMQ_HEAP_MEMORY_USED_METRIC = "netty.heap.memory.used"; private static final PscLogger logger = PscLogger.getLogger(MemqMetricsHandler.class); - private final static Map memqConsumerMetricsMap = new HashMap<>(); + private final static Map memqConsumerMetricsMap = new ConcurrentHashMap<>(); + private final static Map memqConsumerMetricsPrefixMap = new HashMap<>(); static { + memqConsumerMetricsPrefixMap.put(AbstractS3StorageHandler.OBJECT_FETCH_ERROR_KEY, "consumer.fetch.error"); + memqConsumerMetricsMap.put("memqConsumer.messagesProcessedCounter", "consumer.processed.messages"); memqConsumerMetricsMap.put("memqConsumer.bytesProcessedCounter", "consumer.processed.bytes"); - memqConsumerMetricsMap.put("memq.consumer.objectFetchLatencyMs", "consumer.fetch.latency.ms"); + memqConsumerMetricsMap.put("objectFetchLatencyMs", "consumer.fetch.latency.ms"); memqConsumerMetricsMap.put("iterator.exception", "consumer.iterator.processing.error"); memqConsumerMetricsMap.put("loading.exception", "consumer.iterator.load.error"); memqConsumerMetricsMap.put(MEMQ_DIRECT_MEMORY_USED_METRIC, MEMQ_DIRECT_MEMORY_USED_METRIC); @@ -75,4 +80,12 @@ private static long getMetricValue(Metric metric) { logger.warn("[Memq] Could not process metric of type {}", metric.getClass().getName()); return -1; } + + public static void addMetricNameConversion(String memqMetricName) { + for (Map.Entry prefixEntry : memqConsumerMetricsPrefixMap.entrySet()) { + if (memqMetricName.startsWith(prefixEntry.getKey())) { + memqConsumerMetricsMap.putIfAbsent(memqMetricName, memqMetricName.replace(prefixEntry.getKey(), prefixEntry.getValue())); + } + } + } } diff --git a/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqOffset.java b/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqOffset.java new file mode 100644 index 0000000..da30ba5 --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqOffset.java @@ -0,0 +1,64 @@ +package com.pinterest.psc.consumer.memq; + + +/* PSC offsets for memq messages: + * |-1 bit----|--------- 45 bit ---------|------18 bit ------| + * | reserved | batch | message | + * | | offset | offset | + * |----------|--------------------------|-------------------| + * + * e.g. 0xffff_e000_0000_1000L -> batch offset 262143, message offset 4096 + * + * Note: + * This has to be structured this way since PscConsumerThread will perform increments on the offsets, + * which should be equivalent to adding to message offsets + */ +public class MemqOffset { + private long batchOffset; + private long messageOffset; + + private final static int BATCH_OFFSET_BIT_LENGTH = 45; + private final static int MESSAGE_OFFSET_BIT_LENGTH = 64 - BATCH_OFFSET_BIT_LENGTH; + private final static long BATCH_OFFSET_MASK = (1L << BATCH_OFFSET_BIT_LENGTH) - 1; + private final static int MESSAGE_OFFSET_MASK = (1 << MESSAGE_OFFSET_BIT_LENGTH) - 1; + + public MemqOffset(long batchOffset, long messageOffset) { + this.batchOffset = batchOffset; + this.messageOffset = messageOffset; + } + + public long getBatchOffset() { + return batchOffset; + } + + public void setBatchOffset(long batchOffset) { + this.batchOffset = batchOffset; + } + + public long getMessageOffset() { + return messageOffset; + } + + public void setMessageOffset(long messageOffset) { + this.messageOffset = messageOffset; + } + + public static MemqOffset convertPscOffsetToMemqOffset(long pscOffset) { + long batchOffset = (pscOffset >>> MESSAGE_OFFSET_BIT_LENGTH) & BATCH_OFFSET_MASK; + int messageOffset = (int) (pscOffset & MESSAGE_OFFSET_MASK); + return new MemqOffset(batchOffset, messageOffset); + } + + public long toLong() { + return messageOffset | (batchOffset << MESSAGE_OFFSET_BIT_LENGTH); + } + + @Override + public String toString() { + return "MemqOffset{" + + "batchOffset=" + batchOffset + + ", messageOffset=" + messageOffset + + '}'; + } + +} \ No newline at end of file diff --git a/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqToPscMessageIteratorConverter.java b/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqToPscMessageIteratorConverter.java index f996982..38bc3bf 100644 --- a/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqToPscMessageIteratorConverter.java +++ b/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqToPscMessageIteratorConverter.java @@ -1,11 +1,13 @@ package com.pinterest.psc.consumer.memq; +import com.google.common.collect.Iterators; import com.pinterest.memq.commons.MemqLogMessage; import com.pinterest.psc.common.BaseTopicUri; import com.pinterest.psc.common.CloseableIterator; import com.pinterest.psc.common.PscMessage; import com.pinterest.psc.common.TopicUri; import com.pinterest.psc.common.TopicUriPartition; +import com.pinterest.psc.common.event.PscEvent; import com.pinterest.psc.consumer.PscConsumerMessage; import com.pinterest.psc.consumer.PscConsumerPollMessageIterator; import com.pinterest.psc.consumer.ToPscMessageIteratorConverter; @@ -14,6 +16,7 @@ import com.pinterest.psc.logging.PscLogger; import java.io.IOException; +import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -22,23 +25,45 @@ public class MemqToPscMessageIteratorConverter extends ToPscMessageIterato private static final PscLogger logger = PscLogger.getLogger(MemqToPscMessageIteratorConverter.class); private final String topicName; private final CloseableIterator> memqConsumerRecordIterator; + private Iterator> filteredIterator; private final Map memqTopicToTopicUri; + private final Map initialSeekOffsets; public MemqToPscMessageIteratorConverter( String topicName, CloseableIterator> memqConsumerRecordIterator, Map memqTopicToTopicUri, - ConsumerInterceptors consumerInterceptors + ConsumerInterceptors consumerInterceptors, + Map initialSeekOffsets ) { super(consumerInterceptors); this.topicName = topicName; this.memqConsumerRecordIterator = memqConsumerRecordIterator; this.memqTopicToTopicUri = memqTopicToTopicUri; + this.initialSeekOffsets = initialSeekOffsets; + this.filteredIterator = memqConsumerRecordIterator; + if (!this.initialSeekOffsets.isEmpty()) { + this.filteredIterator = Iterators.filter(this.filteredIterator, (m) -> { + MemqOffset offset = this.initialSeekOffsets.get(m.getNotificationPartitionId()); + if ( + offset != null && ( + offset.getBatchOffset() > m.getNotificationPartitionOffset() || + (offset.getBatchOffset() == m.getNotificationPartitionOffset() && offset.getMessageOffset() > m.getMessageOffsetInBatch()) + ) + ) { + return false; + } + if ((offset != null)) { + initialSeekOffsets.remove(m.getNotificationPartitionId()); + } + return true; + }); + } } @Override public boolean hasNext() { - return memqConsumerRecordIterator.hasNext(); + return filteredIterator.hasNext(); } @Override @@ -53,7 +78,7 @@ public Set getTopicUriPartitions() { @Override protected PscConsumerMessage getNextBackendMessage() { - MemqLogMessage memqConsumerRecord = memqConsumerRecordIterator.next(); + MemqLogMessage memqConsumerRecord = filteredIterator.next(); return convertMemqConsumerRecordToPscConsumerMessage(memqConsumerRecord); } @@ -61,25 +86,34 @@ private PscConsumerMessage convertMemqConsumerRecordToPscConsume byte[] key = memqConsumerRecord.getKey(); byte[] value = memqConsumerRecord.getValue(); + TopicUri topicUri = memqTopicToTopicUri.get(topicName); + TopicUriPartition topicUriPartition = new TopicUriPartition( - memqTopicToTopicUri.get(topicName).getTopicUriAsString(), - memqConsumerRecord.getNotificationPartitionId()); - BaseTopicUri.finalizeTopicUriPartition(topicUriPartition, memqTopicToTopicUri.get(topicName)); + topicUri.getTopicUriAsString(), + memqConsumerRecord.getNotificationPartitionId()); + BaseTopicUri.finalizeTopicUriPartition(topicUriPartition, topicUri); + + MemqOffset memqOffset = new MemqOffset(memqConsumerRecord.getNotificationPartitionOffset(), memqConsumerRecord.getMessageOffsetInBatch()); MemqMessageId messageId = new MemqMessageId( topicUriPartition, - memqConsumerRecord.getNotificationPartitionOffset(), + memqOffset.toLong(), memqConsumerRecord.getWriteTimestamp(), memqConsumerRecord.getKey() == null ? -1 : memqConsumerRecord.getKey().length, - memqConsumerRecord.getValue() == null ? -1 : memqConsumerRecord.getValue().length + memqConsumerRecord.getValue() == null ? -1 : memqConsumerRecord.getValue().length, + memqConsumerRecord.isEndOfBatch() ); PscConsumerMessage pscConsumerMessage = new PscConsumerMessage<>( messageId, key, value, memqConsumerRecord.getWriteTimestamp() ); + if (memqConsumerRecord.isEndOfBatch()) { + pscConsumerMessage.setHeader(PscEvent.EVENT_HEADER, PscMemqConsumer.END_OF_BATCH_EVENT.getBytes()); + } + Map headers = memqConsumerRecord.getHeaders(); if (headers != null) { - headers.forEach((key1, value1) -> pscConsumerMessage.setHeader(key1, value1)); + headers.forEach(pscConsumerMessage::setHeader); } IntegerSerializer integerSerializer = new IntegerSerializer(); diff --git a/psc/src/main/java/com/pinterest/psc/consumer/memq/PscMemqConsumer.java b/psc/src/main/java/com/pinterest/psc/consumer/memq/PscMemqConsumer.java index edd1b3b..bcaf3f4 100644 --- a/psc/src/main/java/com/pinterest/psc/consumer/memq/PscMemqConsumer.java +++ b/psc/src/main/java/com/pinterest/psc/consumer/memq/PscMemqConsumer.java @@ -1,6 +1,12 @@ package com.pinterest.psc.consumer.memq; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.MetricRegistryListener; +import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.pinterest.memq.client.commons.ConsumerConfigs; import com.pinterest.memq.client.commons.serde.ByteArrayDeserializer; @@ -13,6 +19,7 @@ import com.pinterest.psc.common.ServiceDiscoveryConfig; import com.pinterest.psc.common.TopicUri; import com.pinterest.psc.common.TopicUriPartition; +import com.pinterest.psc.common.event.PscEvent; import com.pinterest.psc.config.PscConfiguration; import com.pinterest.psc.config.PscConsumerToMemqConsumerConfigConverter; import com.pinterest.psc.consumer.ConsumerRebalanceListener; @@ -26,6 +33,8 @@ import com.pinterest.psc.metrics.MetricName; import com.pinterest.psc.metrics.PscMetricRegistryManager; import com.pinterest.psc.metrics.PscMetrics; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.logging.Log; @@ -43,6 +52,8 @@ public class PscMemqConsumer extends PscBackendConsumer { + public static final String END_OF_BATCH_EVENT = "end_of_batch"; + private static final PscLogger logger = PscLogger.getLogger(PscMemqConsumer.class); @VisibleForTesting protected MemqConsumer memqConsumer; @@ -52,6 +63,8 @@ public class PscMemqConsumer extends PscBackendConsumer { private Properties properties; private TopicUri topicUri; + private final Map initialSeekOffsets = new ConcurrentHashMap<>(); + public PscMemqConsumer() { } @@ -66,7 +79,7 @@ public String[] getTopicsUri() { @Override public void initializeBackend(ServiceDiscoveryConfig discoveryConfig, TopicUri topicUri) throws ConsumerException { - this.topicUri = topicUri; + this.topicUri = new MemqTopicUri(topicUri); properties = new PscConsumerToMemqConsumerConfigConverter().convert(pscConfigurationInternal, topicUri); properties.setProperty(ConsumerConfigs.BOOTSTRAP_SERVERS, discoveryConfig.getConnect()); properties.setProperty(ConsumerConfigs.KEY_DESERIALIZER_CLASS_KEY, @@ -86,6 +99,37 @@ public void initializeBackend(ServiceDiscoveryConfig discoveryConfig, } catch (Exception e) { throw new ConsumerException("Could not instantiate a Memq consumer instance.", e); } + initializeMetricRegistry(memqConsumer); + } + + private void initializeMetricRegistry(MemqConsumer memqConsumer) { + + memqConsumer.getMetricRegistry().addListener(new MetricRegistryListener.Base() { + @Override + public void onGaugeAdded(String name, Gauge gauge) { + MemqMetricsHandler.addMetricNameConversion(name); + } + + @Override + public void onCounterAdded(String name, Counter counter) { + MemqMetricsHandler.addMetricNameConversion(name); + } + + @Override + public void onHistogramAdded(String name, Histogram histogram) { + MemqMetricsHandler.addMetricNameConversion(name); + } + + @Override + public void onMeterAdded(String name, Meter meter) { + MemqMetricsHandler.addMetricNameConversion(name); + } + + @Override + public void onTimerAdded(String name, Timer timer) { + MemqMetricsHandler.addMetricNameConversion(name); + } + }); } @Override @@ -166,21 +210,29 @@ public void assign(Set topicUriPartitions) throws ConsumerExc "[Memq] Consumer assign() is not supported when consumer is already subscribed to topics."); } + String memqTopic; + Set topicUriSet = topicUriPartitions.stream().map(TopicUriPartition::getTopicUri).collect(Collectors.toSet()); + if (topicUriSet.size() > 1) { + throw new ConsumerException("[Memq] Consumer can only support subscribing/assigning to one topic"); + } else { + memqTopic = topicUriSet.iterator().next().getTopic(); + } + if (topicUriPartitions.isEmpty()) { unsubscribe(); return; } - backendTopicToTopicUri.clear(); - - for (TopicUriPartition topicUriPartition : topicUriPartitions) + for (TopicUriPartition topicUriPartition : topicUriPartitions) { backendTopicToTopicUri.put(topicUriPartition.getTopicUri().getTopic(), - topicUriPartition.getTopicUri()); + topicUriPartition.getTopicUri()); + } // update the assignment list with the new topic uris currentAssignment.retainAll(topicUriPartitions); currentAssignment.addAll(topicUriPartitions); try { + memqConsumer.subscribe(memqTopic); memqConsumer.assign(topicUriPartitions.stream().map(TopicUriPartition::getPartition) .collect(Collectors.toList())); } catch (Exception exception) { @@ -204,8 +256,11 @@ public Set assignment() throws ConsumerException { throw new ConsumerException( "[Memq] Consumer is not initialized prior to call to assignment()."); Set assignment = memqConsumer.assignment(); - return assignment.stream().map(a -> new TopicUriPartition(topicUri.getTopicUriAsString(), a)) - .collect(Collectors.toSet()); + return assignment.stream().map(a -> { + TopicUriPartition tup = new TopicUriPartition(topicUri.getTopicUriAsString(), a); + BaseTopicUri.finalizeTopicUriPartition(tup, topicUri); + return tup; + }).collect(Collectors.toSet()); } @Override @@ -234,7 +289,8 @@ public PscConsumerPollMessageIterator poll(Duration pollTimeout) throws Co } return new MemqToPscMessageIteratorConverter<>(memqConsumer.getTopicName(), - memqLogMessageIterator, backendTopicToTopicUri, getConsumerInterceptors()); + memqLogMessageIterator, backendTopicToTopicUri, getConsumerInterceptors(), + initialSeekOffsets); } private void handleMemqConsumerMetrics(MetricRegistry metricRegistry) { @@ -247,30 +303,93 @@ public void seek(MessageId messageId) throws ConsumerException { @Override public void seek(Set messageIds) throws ConsumerException { + if (memqConsumer == null) + throw new ConsumerException("[Memq] Consumer is not initialized prior to call to seek()."); Set memqMessageIds = messageIds.stream().map(this::getMemqMessageId).collect(Collectors.toSet()); seekToOffset( memqMessageIds.stream() .collect(Collectors.toMap( MessageId::getTopicUriPartition, - messageId -> 1 + messageId.getOffset() + this::getNextOffsetFromMessageId )) ); } + private long getNextOffsetFromMessageId(MemqMessageId memqMessageId) { + MemqOffset memqOffset = MemqOffset.convertPscOffsetToMemqOffset(memqMessageId.getOffset()); + if (memqMessageId.isEndOfBatch()) { + return memqOffset.getBatchOffset() + 1; + } else { + memqOffset.setMessageOffset(memqOffset.getMessageOffset() + 1); + return memqOffset.toLong(); + } + } + @Override public void seekToTimestamp(TopicUri topicUri, long timestamp) throws ConsumerException { - throw new ConsumerException("[Memq] Consumer seek by timestamp is not supported."); + if (memqConsumer == null) + throw new ConsumerException("[Memq] Consumer is not initialized prior to call to seekToTimestamp()."); + + if (!currentSubscription.contains(topicUri)) { + throw new ConsumerException("[Memq] Consumer is not subscribed to topic URI " + topicUri + " passed to seekToTimestamp()."); + } + + Set assignedPartitions; + try { + assignedPartitions = memqConsumer.waitForAssignment(); + } catch (NoTopicsSubscribedException e) { + throw new ConsumerException(e); + } + + Map timestamps = new HashMap<>(); + + for (Integer partition : assignedPartitions) { + timestamps.put(partition, timestamp); + } + + Map offsets = executeBackendCallWithRetriesAndReturn( + () -> memqConsumer.offsetsOfTimestamps(timestamps) + ); + + executeBackendCallWithRetries(() -> memqConsumer.seek(offsets)); } @Override public void seekToTimestamp(Map seekPositions) throws ConsumerException { - throw new ConsumerException("[Memq] Consumer seek by timestamp is not supported."); + if (memqConsumer == null) + throw new ConsumerException("[Memq] Consumer is not initialized prior to call to seekToTimestamp()."); + + if (!currentSubscription.contains(topicUri)) { + throw new ConsumerException("[Memq] Consumer is not subscribed to topic URI " + topicUri + " passed to seekToTimestamp()."); + } + + Map timestamps = new HashMap<>(); + + for (Map.Entry entry : seekPositions.entrySet()) { + if (!isCurrentTopicPartition(entry.getKey())) { + throw new ConsumerException("[Memq] Consumer is not subscribed to TopicUriPartition" + entry.getKey()); + } + timestamps.put(entry.getKey().getPartition(), entry.getValue()); + } + + try { + memqConsumer.waitForAssignment(); + } catch (NoTopicsSubscribedException e) { + throw new ConsumerException(e); + } + + + Map offsets = executeBackendCallWithRetriesAndReturn( + () -> memqConsumer.offsetsOfTimestamps(timestamps) + ); + + executeBackendCallWithRetries(() -> memqConsumer.seek(offsets)); } @Override public void seekToOffset(Map seekPositions) throws ConsumerException { if (memqConsumer == null) - throw new ConsumerException("[Memq] Consumer is not initialized prior to call to seek()."); + throw new ConsumerException("[Memq] Consumer is not initialized prior to call to seekToOffset()."); for (TopicUriPartition topicUriPartition : seekPositions.keySet()) { if (!(topicUriPartition.getTopicUri() instanceof MemqTopicUri)) { @@ -281,10 +400,9 @@ public void seekToOffset(Map seekPositions) throws Cons backendTopicToTopicUri.put(topicUriPartition.getTopicUri().getTopic(), topicUriPartition.getTopicUri()); - if (!this.currentSubscription.contains(topicUriPartition.getTopicUri()) - && !this.currentAssignment.contains(topicUriPartition)) { + if (!isCurrentTopicPartition(topicUriPartition)) { throw new ConsumerException(String.format( - "[Memq] Consumer is not subscribed to the topicUri '%s' or" + "[Memq] Consumer is not subscribed to the topicUri '%s' or " + "assigned the TopicUriPartition '%s' passed to seek().", topicUriPartition.getTopicUri(), topicUriPartition)); } @@ -296,16 +414,24 @@ public void seekToOffset(Map seekPositions) throws Cons throw new ConsumerException(e); } - memqConsumer.seek( - seekPositions.entrySet().stream() - .collect(Collectors.toMap( - entry -> entry.getKey().getPartition(), - Map.Entry::getValue - ) - ) + seekToMemqOffset( + seekPositions.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> MemqOffset.convertPscOffsetToMemqOffset(v.getValue()))) ); } + private void seekToMemqOffset(Map seekPositions) throws ConsumerException { + Map batchOffsetMap = new HashMap<>(); + for (Map.Entry entry : seekPositions.entrySet()) { + MemqOffset memqOffset = entry.getValue(); + if (memqOffset.getMessageOffset() != 0) { + initialSeekOffsets.put(entry.getKey().getPartition(), memqOffset); + } + batchOffsetMap.put(entry.getKey().getPartition(), memqOffset.getBatchOffset()); + } + + memqConsumer.seek(batchOffsetMap); + } + @Override public void commitAsync(OffsetCommitCallback offsetCommitCallback) throws ConsumerException { throw new ConsumerException("[Memq] Consumer async commit is not supported."); @@ -314,14 +440,60 @@ public void commitAsync(OffsetCommitCallback offsetCommitCallback) throws Consum @Override public void commitAsync(Set messageIds, OffsetCommitCallback offsetCommitCallback) throws ConsumerException { - throw new ConsumerException("[Memq] Consumer async commit is not supported."); + if (memqConsumer == null) + throw new ConsumerException("[Memq] Consumer is not initialized prior to call to commitAsync()."); + + Map maxOffsets = new HashMap<>(); + for (MessageId messageId : messageIds) { + TopicUri topicUri = messageId.getTopicUriPartition().getTopicUri(); + if (!(topicUri instanceof MemqTopicUri)) { + throw new ConsumerException( + "[Memq] Message id with illegal topic URI " + topicUri + " was passed in to commitAsync()."); + } + if (!this.topicUri.equals(topicUri)) { + throw new ConsumerException("[Memq] Consumer is not associated with topic URI " + topicUri + + " passed in to commitAsync()."); + } + + MemqMessageId memqMessageId = getMemqMessageId(messageId); + + int partition = memqMessageId.getTopicUriPartition().getPartition(); + long offset = memqMessageId.getOffset(); + MemqOffset memqOffset = MemqOffset.convertPscOffsetToMemqOffset(offset); + long kafkaOffset = memqMessageId.isEndOfBatch() ? memqOffset.getBatchOffset() + 1: memqOffset.getBatchOffset(); + maxOffsets.compute(partition, + (key, val) -> (val == null) ? kafkaOffset : (kafkaOffset > val) ? kafkaOffset : val); + } + + if (offsetCommitCallback != null) { + executeBackendCallWithRetries(() -> { + memqConsumer.commitOffsetAsync(maxOffsets, ((offsets, exception) -> { + if (exception != null) { + exception.printStackTrace(); + } + Map pscOffsets = new HashMap<>(offsets.size()); + offsets.forEach(((partition, offset) -> { + TopicUriPartition topicUriPartition = new TopicUriPartition( + topicUri.getTopicUriAsString(), partition); + BaseTopicUri.finalizeTopicUriPartition(topicUriPartition, topicUri); + pscOffsets.put(topicUriPartition, new MemqMessageId(topicUriPartition, offset, true)); + })); + offsetCommitCallback.onCompletion(pscOffsets, exception); + })); + }); + } else { + memqConsumer.commitOffsetAsync(maxOffsets, (offsets, exception) -> {}); + } } @Override public Set commitSync() throws ConsumerException { if (memqConsumer == null) throw new ConsumerException("[Memq] Consumer is not initialized prior to call to commitSync()."); - + if (memqConsumer.getNotificationSource() == null) { + throw new ConsumerException( + "[Memq] Consumer notificationSource is not initialized prior to call to commitSync()."); + } memqConsumer.commitOffset(); //TODO: extract latest committed offsets from MemQ consumer when the API is available. This is currently unavailable return new HashSet<>(); @@ -339,8 +511,8 @@ public void commitSync(Set messageIds) throws ConsumerException { throw new ConsumerException( "[Memq] Message id with illegal topic URI " + topicUri + " was passed in to commitSync()."); } - if (!subscription().contains(topicUri)) { - throw new ConsumerException("[Memq] Consumer is not subscribed to topic URI " + topicUri + if (!this.topicUri.equals(topicUri)) { + throw new ConsumerException("[Memq] Consumer is not associated with topic URI " + topicUri + " passed in to commitSync()."); } @@ -348,36 +520,57 @@ public void commitSync(Set messageIds) throws ConsumerException { int partition = memqMessageId.getTopicUriPartition().getPartition(); long offset = memqMessageId.getOffset(); + MemqOffset memqOffset = MemqOffset.convertPscOffsetToMemqOffset(offset); + long kafkaOffset = memqMessageId.isEndOfBatch() ? memqOffset.getBatchOffset() + 1: memqOffset.getBatchOffset(); maxOffsets.compute(partition, - (key, val) -> (val == null) ? offset : (offset > val) ? offset : val); + (key, val) -> (val == null) ? kafkaOffset : (kafkaOffset > val) ? kafkaOffset : val); } memqConsumer.commitOffset(maxOffsets); } @Override public void seekToBeginning(Set topicUriPartitions) throws ConsumerException { - throw new ConsumerException("[Memq] Consumer seek to beginning is not supported."); + if (memqConsumer == null) + throw new ConsumerException("[Memq] Consumer is not initialized prior to call to seekToBeginning()."); + Set partitions = new HashSet<>(); + for (TopicUriPartition tup : topicUriPartitions) { + if (!isCurrentTopicPartition(tup)) { + throw new ConsumerException("[Memq] Cannot seek on non-associated topic partition " + tup); + } + partitions.add(tup.getPartition()); + } + memqConsumer.seekToBeginning(partitions); } @Override public void seekToEnd(Set topicUriPartitions) throws ConsumerException { - throw new ConsumerException("[Memq] Consumer seek to beginning is not supported."); + if (memqConsumer == null) + throw new ConsumerException("[Memq] Consumer is not initialized prior to call to seekToEnd()."); + Set partitions = new HashSet<>(); + for (TopicUriPartition tup : topicUriPartitions) { + if (!isCurrentTopicPartition(tup)) { + throw new ConsumerException("[Memq] Cannot seek on non-associated topic partition " + tup); + } + partitions.add(tup.getPartition()); + } + memqConsumer.seekToEnd(partitions); } @Override + // Uses a short-living consumer to support metadata calls without subscribing public Set getPartitions(TopicUri topicUri) throws ConsumerException { - if (memqConsumer == null) - throw new ConsumerException( - "[Memq] Consumer is not initialized prior to call to getPartitions()."); + try (MemqConsumer consumer = getMetadataConsumer(topicUri)) { + MemqTopicUri memqTopicUri = (MemqTopicUri) topicUri; - MemqTopicUri memqTopicUri = (MemqTopicUri) topicUri; - - return memqConsumer.getPartition().stream().map(partition -> { - TopicUriPartition topicUriPartition = new TopicUriPartition( + return consumer.getPartition().stream().map(partition -> { + TopicUriPartition topicUriPartition = new TopicUriPartition( memqTopicUri.getTopicUriAsString(), partition); - BaseTopicUri.finalizeTopicUriPartition(topicUriPartition, memqTopicUri); - return topicUriPartition; - }).collect(Collectors.toSet()); + BaseTopicUri.finalizeTopicUriPartition(topicUriPartition, memqTopicUri); + return topicUriPartition; + }).collect(Collectors.toSet()); + } catch (IOException ioe) { + throw new ConsumerException("[Memq] Failed to close metadata consumer for " + topicUri, ioe); + } } @Override @@ -410,13 +603,19 @@ public void close(Duration timeout) throws ConsumerException { @Override public MessageId committed(TopicUriPartition topicUriPartition) throws ConsumerException { - throw new ConsumerException("[Memq] Consumer committed is not supported."); + if (memqConsumer == null) + throw new ConsumerException("[Memq] Consumer is not initialized prior to call to committed()."); + if (isCurrentTopicPartition(topicUriPartition)) { + long committedOffset = memqConsumer.committed(topicUriPartition.getPartition()); + return committedOffset == -1L ? null : new MemqMessageId(topicUriPartition, new MemqOffset(committedOffset, 0).toLong(), true); + } + throw new ConsumerException("[Memq] Failed to retrieve committed offsets for unassociated TopicUriPartition: " + topicUriPartition); } @Override public Map startOffsets(Set topicUriPartitions) throws ConsumerException { if (memqConsumer == null) - throw new ConsumerException("[Memq] Consumer is not initialized prior to call to getEarliestOffsets()."); + throw new ConsumerException("[Memq] Consumer is not initialized prior to call to startOffsets()."); Map partitionToTopicUriPartition = topicUriPartitions.stream() .collect(Collectors.toMap(TopicUriPartition::getPartition, Function.identity())); @@ -443,12 +642,45 @@ public Map endOffsets(Set topicUriPa @Override public long position(TopicUriPartition topicUriPartition) throws ConsumerException { - throw new ConsumerException("[Memq] Consumer position is not supported."); + if (memqConsumer == null) + throw new ConsumerException("[Memq] Consumer is not initialized prior to call to position()."); + if (isCurrentTopicPartition(topicUriPartition)) { + return new MemqOffset(memqConsumer.position(topicUriPartition.getPartition()), 0).toLong(); + } + throw new ConsumerException("[Memq] Consumer is not associated with " + topicUriPartition); } @Override public Map getMessageIdByTimestamp(Map timestampByTopicUriPartition) throws ConsumerException { - throw new ConsumerException("[Memq] Consumer getMessageIdByTimestamp is not supported."); + if (memqConsumer == null) + throw new ConsumerException("[Memq] Consumer is not initialized prior to call to getMessageIdByTimestamp()."); + + TopicUri topicUri = null; + Map partitionToTimestampMap = new HashMap<>(); + Map partitionToTopicUriPartitionMap = new HashMap<>(); + for (Map.Entry entry : timestampByTopicUriPartition.entrySet()) { + if (topicUri == null) topicUri = entry.getKey().getTopicUri(); + else if (topicUri != entry.getKey().getTopicUri()) { + throw new ConsumerException("[Memq] Consumer cannot handle multiple topics in getMessageIdByTimestamp()"); + } + partitionToTopicUriPartitionMap.put(entry.getKey().getPartition(), entry.getKey()); + partitionToTimestampMap.put(entry.getKey().getPartition(), entry.getValue()); + } + + Map offsetByTopicPartition = + executeBackendCallWithRetriesAndReturn( + () -> memqConsumer.offsetsOfTimestamps(partitionToTimestampMap) + ); + + Map messageIdByTopicUriPartition = new HashMap<>(); + offsetByTopicPartition.forEach(((partition, offsetAndTimestamp) -> + messageIdByTopicUriPartition.put( + partitionToTopicUriPartitionMap.get(partition), + offsetAndTimestamp == null ? null : new MemqMessageId(partitionToTopicUriPartitionMap.get(partition), new MemqOffset(offsetAndTimestamp, 0).toLong(), false) + ) + )); + + return messageIdByTopicUriPartition; } @VisibleForTesting @@ -465,7 +697,22 @@ public PscConfiguration getConfiguration() { @Override public Map metrics() throws ConsumerException { - throw new ConsumerException("[Memq] Consumer metrics is not supported."); + return Collections.emptyMap(); + } + + @Override + public void onEvent(PscEvent event) { + if (event.getType().equals(END_OF_BATCH_EVENT)) { + try { + memqConsumer.commitOffsetAsync(Collections.emptyMap(), (o, e) -> {}); + } catch (Exception e) { + // do nothing if this happens since it is best effort + } + } + } + + public boolean isNotificationSourceInitialized() { + return memqConsumer.getNotificationSource() != null; } private MemqMessageId getMemqMessageId(MessageId messageId) { @@ -473,4 +720,23 @@ private MemqMessageId getMemqMessageId(MessageId messageId) { (MemqMessageId) messageId : new MemqMessageId(messageId); } + + private boolean isCurrentTopicPartition(TopicUriPartition topicUriPartition) { + return this.currentSubscription.contains(topicUriPartition.getTopicUri()) || this.currentAssignment.contains(topicUriPartition); + } + + private MemqConsumer getMetadataConsumer(TopicUri topicUri) throws ConsumerException { + try { + Properties tmpProps = new Properties(properties); + tmpProps.setProperty(ConsumerConfigs.CLIENT_ID, tmpProps.getProperty(ConsumerConfigs.CLIENT_ID) + "_metadata"); + tmpProps.setProperty(ConsumerConfigs.GROUP_ID, topicUri.getTopic() + "_metadata_cg_" + ThreadLocalRandom.current().nextInt()); + MemqConsumer consumer = new MemqConsumer<>(tmpProps); + consumer.subscribe(topicUri.getTopic()); + return consumer; + } catch (InstantiationError e) { + throw new ConsumerException(e); + } catch (Exception e) { + throw new ConsumerException("Could not instantiate a Memq consumer instance.", e); + } + } } diff --git a/psc/src/test/java/com/pinterest/psc/interceptor/TestPscConsumerInterceptor.java b/psc/src/test/java/com/pinterest/psc/interceptor/TestPscConsumerInterceptor.java index bbf5192..7f34684 100644 --- a/psc/src/test/java/com/pinterest/psc/interceptor/TestPscConsumerInterceptor.java +++ b/psc/src/test/java/com/pinterest/psc/interceptor/TestPscConsumerInterceptor.java @@ -106,7 +106,7 @@ void testSimpleInterceptors() throws Exception { verifyConsumerPollResult(pscConsumer.poll(Duration.ofMillis(defaultPollTimeoutMs)), messagesCp); assertEquals(valuesList.get(0).length, interceptor.onConsumeCounter); - when(creator.getAssignmentConsumer(any(), any(), any(), any(), anyBoolean(), anyBoolean())) + when(creator.getCommitConsumer(any(), any(), any(), any(), anyBoolean())) .thenReturn(backendConsumer); pscConsumer.commitSync(Collections.singleton(messagesCp2.next().getMessageId())); assertEquals(1, interceptor.onCommitCounter); @@ -158,7 +158,7 @@ void testInterceptorChain() throws Exception { assertEquals(valuesList.get(0).length, interceptor1.onConsumeCounter); assertEquals(valuesList.get(0).length, interceptor2.onConsumeCounter); - when(creator.getAssignmentConsumer(any(), any(), any(), any(), anyBoolean(), anyBoolean())) + when(creator.getCommitConsumer(any(), any(), any(), any(), anyBoolean())) .thenReturn(backendConsumer); pscConsumer.commitSync(Collections.singleton(messagesCp2.next().getMessageId())); assertEquals(1, interceptor1.onCommitCounter); @@ -207,7 +207,7 @@ void testSingleInterceptorError() throws Exception { pscConsumer.subscribe(Collections.singleton(testTopic1)); verifyConsumerPollResult(pscConsumer.poll(Duration.ofMillis(defaultPollTimeoutMs)), messagesCp); - when(creator.getAssignmentConsumer(any(), any(), any(), any(), anyBoolean(), anyBoolean())) + when(creator.getCommitConsumer(any(), any(), any(), any(), anyBoolean())) .thenReturn(backendConsumer); pscConsumer.commitSync(Collections.singleton(messagesCp2.next().getMessageId())); assertEquals(1, interceptor.onCommitCounter); @@ -261,7 +261,7 @@ void testInterceptorChainError() throws Exception { assertEquals(valuesList.get(0).length, goodInterceptor.onConsumeCounter); // two list are being traversed for comparison - when(creator.getAssignmentConsumer(any(), any(), any(), any(), anyBoolean(), anyBoolean())) + when(creator.getCommitConsumer(any(), any(), any(), any(), anyBoolean())) .thenReturn(backendConsumer); pscConsumer.commitSync(Collections.singleton(messagesCp2.next().getMessageId())); assertEquals(1, badInterceptor.onCommitCounter);