diff --git a/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqPreFetchIteratorAdapter.java b/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqPreFetchIteratorAdapter.java new file mode 100644 index 0000000..1b1edfc --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqPreFetchIteratorAdapter.java @@ -0,0 +1,31 @@ +package com.pinterest.psc.consumer.memq; + +import com.pinterest.psc.common.CloseableIterator; + +import java.io.IOException; +import java.util.List; + +public class MemqPreFetchIteratorAdapter implements CloseableIterator { + + private final List messages; + private int index = 0; + + public MemqPreFetchIteratorAdapter(List messages) { + this.messages = messages; + } + + @Override + public void close() throws IOException { + messages.clear(); + } + + @Override + public boolean hasNext() { + return index < messages.size(); + } + + @Override + public T next() { + return messages.get(index++); + } +} diff --git a/psc/src/main/java/com/pinterest/psc/consumer/memq/PscMemqConsumer.java b/psc/src/main/java/com/pinterest/psc/consumer/memq/PscMemqConsumer.java index bcaf3f4..2f91978 100644 --- a/psc/src/main/java/com/pinterest/psc/consumer/memq/PscMemqConsumer.java +++ b/psc/src/main/java/com/pinterest/psc/consumer/memq/PscMemqConsumer.java @@ -33,10 +33,14 @@ import com.pinterest.psc.metrics.MetricName; import com.pinterest.psc.metrics.PscMetricRegistryManager; import com.pinterest.psc.metrics.PscMetrics; + +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.logging.Log; +import software.amazon.awssdk.core.exception.SdkClientException; import java.io.IOException; import java.time.Duration; @@ -273,7 +277,12 @@ public PscConsumerPollMessageIterator poll(Duration pollTimeout) throws Co CloseableIterator> memqLogMessageIterator; try { MutableInt count = new MutableInt(); - memqLogMessageIterator = new MemqIteratorAdapter<>(memqConsumer.poll(pollTimeout, count)); + if (pscConfigurationInternal.isAutoResolutionEnabled()) { + logger.info("Using pre-fetch with auto resolution"); + memqLogMessageIterator = preFetchPollResultIntoMemoryWithAutoResolution(memqConsumer.poll(pollTimeout, count)); + } else { + memqLogMessageIterator = new MemqIteratorAdapter<>(memqConsumer.poll(pollTimeout, count)); + } } catch (NoTopicsSubscribedException e) { throw new ConsumerException("[Memq] Consumer is not subscribed to any topic.", e); } catch (IOException e) { @@ -293,6 +302,52 @@ memqLogMessageIterator, backendTopicToTopicUri, getConsumerInterceptors(), initialSeekOffsets); } + private MemqPreFetchIteratorAdapter> preFetchPollResultIntoMemoryWithAutoResolution(com.pinterest.memq.commons.CloseableIterator> it) { + List> preFetched = new ArrayList<>(); + int count = 0; + while (it.hasNext()) { + try { + preFetched.add(it.next()); + count++; + } catch (SdkClientException e) { + logger.warn("Error while pre-fetching messages, " + + "resetting backend MemQ consumer and returning " + count + " messages in iterator for now", e); + try { + it.close(); + resetBackendClient(); + break; + } catch (ConsumerException | IOException ex) { + throw new RuntimeException("Failed to reset backend Memq consumer", ex); + } + } + } + return new MemqPreFetchIteratorAdapter<>(preFetched); + + } + + protected void resetBackendClient() throws ConsumerException { + super.resetBackendClient(); + executeBackendCallWithRetries(() -> { + try { + memqConsumer.close(); + } catch (IOException e) { + throw new RuntimeException("Failed to close Memq consumer instance.", e); + } + }); + + try { + memqConsumer = new MemqConsumer<>(properties); + } catch (Exception e) { + throw new RuntimeException("Unable to instantiate a Memq consumer instance.", e); + } + + if (!currentAssignment.isEmpty()) + assign(currentAssignment); + else if (!currentSubscription.isEmpty()) + subscribe(currentSubscription); + + } + private void handleMemqConsumerMetrics(MetricRegistry metricRegistry) { }