diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/AbstractFetcher.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/AbstractFetcher.java index c62491f..30d2175 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/AbstractFetcher.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/internals/AbstractFetcher.java @@ -19,6 +19,7 @@ package com.pinterest.flink.streaming.connectors.psc.internals; import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.eventtime.WatermarkOutput; import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer; @@ -38,7 +39,6 @@ import java.util.List; import java.util.Map; import java.util.Queue; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; import static com.pinterest.flink.streaming.connectors.psc.internals.metrics.PscConsumerMetricConstants.COMMITTED_OFFSETS_METRICS_GAUGE; @@ -382,9 +382,9 @@ private Map> createPart SerializedValue> watermarkStrategy, ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException { - // CopyOnWrite as adding discovered partitions could happen in parallel + // ConcurrentHashMap as adding discovered partitions could happen in parallel // while different threads iterate the partitions list - Map> partitionStates = new HashMap<>(); + Map> partitionStates = new ConcurrentHashMap<>(); switch (timestampWatermarkMode) { case NO_TIMESTAMPS_WATERMARKS: { diff --git a/psc/src/main/java/com/pinterest/psc/config/PscConfigurationReporter.java b/psc/src/main/java/com/pinterest/psc/config/PscConfigurationReporter.java index 536a7f3..5a0c393 100644 --- a/psc/src/main/java/com/pinterest/psc/config/PscConfigurationReporter.java +++ b/psc/src/main/java/com/pinterest/psc/config/PscConfigurationReporter.java @@ -83,8 +83,8 @@ public void report() { public static boolean isThisYou(PscConfiguration pscConfiguration) { return pscConfiguration != null && pscConfiguration.containsKey(PscConfiguration.PSC_PROJECT) && - pscConfiguration.getProperty(PscConfiguration.PSC_PROJECT).equals("psc") && + pscConfiguration.getString(PscConfiguration.PSC_PROJECT).equals("psc") && pscConfiguration.containsKey(PscConfiguration.PSC_PRODUCER_CLIENT_ID) && - ((String) pscConfiguration.getProperty(PscConfiguration.PSC_PRODUCER_CLIENT_ID)).contains(PSC_CONFIGURATION_REPORTER_CLIENT_ID); + pscConfiguration.getString(PscConfiguration.PSC_PRODUCER_CLIENT_ID).contains(PSC_CONFIGURATION_REPORTER_CLIENT_ID); } } diff --git a/psc/src/main/java/com/pinterest/psc/consumer/creation/PscMemqConsumerCreator.java b/psc/src/main/java/com/pinterest/psc/consumer/creation/PscMemqConsumerCreator.java index bc1aa59..5ec7898 100644 --- a/psc/src/main/java/com/pinterest/psc/consumer/creation/PscMemqConsumerCreator.java +++ b/psc/src/main/java/com/pinterest/psc/consumer/creation/PscMemqConsumerCreator.java @@ -27,8 +27,6 @@ public class PscMemqConsumerCreator extends PscBackendConsumerCreator> getConsumers(Environment environment, PscConfigurationInternal pscConfigurationInternal, @@ -180,8 +178,6 @@ public Set> getCommitConsumers(Environment environment, if (shouldWakeup) pscMemqConsumer.wakeup(); } - if (failed) - Thread.dumpStack(); pscMemqConsumer.setConsumerInterceptors(consumerInterceptors); if (!pscMemqConsumer.isNotificationSourceInitialized())