From 25a72e06ae8df65960c508c5c756c36bc876a0e0 Mon Sep 17 00:00:00 2001 From: ArtemTetenkin Date: Mon, 22 Apr 2024 17:35:36 -0700 Subject: [PATCH] Remove memqConsumer reset every hour --- .../psc/consumer/memq/PscMemqConsumer.java | 33 ------------------- 1 file changed, 33 deletions(-) diff --git a/psc/src/main/java/com/pinterest/psc/consumer/memq/PscMemqConsumer.java b/psc/src/main/java/com/pinterest/psc/consumer/memq/PscMemqConsumer.java index e296f7f..bcaf3f4 100644 --- a/psc/src/main/java/com/pinterest/psc/consumer/memq/PscMemqConsumer.java +++ b/psc/src/main/java/com/pinterest/psc/consumer/memq/PscMemqConsumer.java @@ -63,8 +63,6 @@ public class PscMemqConsumer extends PscBackendConsumer { private Properties properties; private TopicUri topicUri; - private long lastResetTime; - private final Map initialSeekOffsets = new ConcurrentHashMap<>(); public PscMemqConsumer() { @@ -101,7 +99,6 @@ public void initializeBackend(ServiceDiscoveryConfig discoveryConfig, } catch (Exception e) { throw new ConsumerException("Could not instantiate a Memq consumer instance.", e); } - lastResetTime = System.currentTimeMillis(); initializeMetricRegistry(memqConsumer); } @@ -266,42 +263,12 @@ public Set assignment() throws ConsumerException { }).collect(Collectors.toSet()); } - private void resetMemqClient() throws ConsumerException{ - super.resetBackendClient(); - - executeBackendCallWithRetries(() -> { - try { - memqConsumer.close(); - } catch (IOException e) { - throw new RuntimeException("Failed to close Memq consumer instance.", e); - } - }); - - try { - memqConsumer = new MemqConsumer<>(properties); - } - catch (Exception e){ - throw new ConsumerException("Unable to instantiate a Memq consumer instance.", e); - } - - if (!currentAssignment.isEmpty()) - assign(currentAssignment); - else if (!currentSubscription.isEmpty()) - subscribe(currentSubscription); - } - @Override public PscConsumerPollMessageIterator poll(Duration pollTimeout) throws ConsumerException, WakeupException { if (memqConsumer == null) throw new ConsumerException("[Memq] Consumer is not initialized prior to call to poll()."); - long now = System.currentTimeMillis(); - if (now - lastResetTime > 3600000) { - resetMemqClient(); - lastResetTime = now; - } - long startTs = System.currentTimeMillis(); CloseableIterator> memqLogMessageIterator; try {