Skip to content

Commit

Permalink
After CR1
Browse files Browse the repository at this point in the history
  • Loading branch information
ArtemTetenkin committed Mar 11, 2024
1 parent 2b27c68 commit 170843f
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.pinterest.flink.streaming.connectors.psc.internals;

import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
Expand All @@ -38,7 +39,6 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

import static com.pinterest.flink.streaming.connectors.psc.internals.metrics.PscConsumerMetricConstants.COMMITTED_OFFSETS_METRICS_GAUGE;
Expand Down Expand Up @@ -382,9 +382,9 @@ private Map<PscTopicUriPartition, PscTopicUriPartitionState<T, TUPH>> createPart
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {

// CopyOnWrite as adding discovered partitions could happen in parallel
// ConcurrentHashMap as adding discovered partitions could happen in parallel
// while different threads iterate the partitions list
Map<PscTopicUriPartition, PscTopicUriPartitionState<T, TUPH>> partitionStates = new HashMap<>();
Map<PscTopicUriPartition, PscTopicUriPartitionState<T, TUPH>> partitionStates = new ConcurrentHashMap<>();

switch (timestampWatermarkMode) {
case NO_TIMESTAMPS_WATERMARKS: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ public void report() {
public static boolean isThisYou(PscConfiguration pscConfiguration) {
return pscConfiguration != null &&
pscConfiguration.containsKey(PscConfiguration.PSC_PROJECT) &&
pscConfiguration.getProperty(PscConfiguration.PSC_PROJECT).equals("psc") &&
pscConfiguration.getString(PscConfiguration.PSC_PROJECT).equals("psc") &&
pscConfiguration.containsKey(PscConfiguration.PSC_PRODUCER_CLIENT_ID) &&
((String) pscConfiguration.getProperty(PscConfiguration.PSC_PRODUCER_CLIENT_ID)).contains(PSC_CONFIGURATION_REPORTER_CLIENT_ID);
pscConfiguration.getString(PscConfiguration.PSC_PRODUCER_CLIENT_ID).contains(PSC_CONFIGURATION_REPORTER_CLIENT_ID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ public class PscMemqConsumerCreator<K, V> extends PscBackendConsumerCreator<K, V

private static final PscLogger logger = PscLogger.getLogger(PscMemqConsumerCreator.class);

public static volatile boolean failed = false;

@Override
public Set<PscBackendConsumer<K, V>> getConsumers(Environment environment,
PscConfigurationInternal pscConfigurationInternal,
Expand Down Expand Up @@ -180,8 +178,6 @@ public Set<PscBackendConsumer<K, V>> getCommitConsumers(Environment environment,
if (shouldWakeup)
pscMemqConsumer.wakeup();
}
if (failed)
Thread.dumpStack();

pscMemqConsumer.setConsumerInterceptors(consumerInterceptors);
if (!pscMemqConsumer.isNotificationSourceInitialized())
Expand Down

0 comments on commit 170843f

Please sign in to comment.