Add MemQ Flink consumer support (#35)
* Add MemQ Flink consumer support

* After CR1

* Remove unnecessary dependencies

* Update version to 3.1.4-SNAPSHOT

* Add memqConsumer reset every hour

* Remove memqConsumer reset every hour

* Try pre-fetch into memory for PscMemqConsumer to catch SdkClientException

* Remove unnecessary log

* Revert memq consumer reset logic

* Remove commented code

* Bump memq version to 0.2.21

* Change version to 3.1.4-RC1

* Replace unnecessary key objects creation with map of keys, add PscEvent doc

* Add javadoc

* Try to fix ClassCastException for MemqTopicUri

* Remove else if statement to ensure that validateTopicUriPartition is always called

* Update version to 3.2, other minor changes

* Update version to 3.2.0-RC1, other minor changes

* Update version to 3.2.0

---------

Co-authored-by: artem <[email protected]>
Co-authored-by: ArtemTetenkin <[email protected]>
3 people authored May 16, 2024
1 parent 1815e7a commit 6c36655
Showing 32 changed files with 884 additions and 178 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.pinterest.psc</groupId>
<artifactId>psc-java-oss</artifactId>
<version>3.1.3</version>
<version>3.2.0</version>
<packaging>pom</packaging>
<name>psc-java-oss</name>
<modules>
2 changes: 1 addition & 1 deletion psc-common/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>psc-java-oss</artifactId>
<groupId>com.pinterest.psc</groupId>
<version>3.1.3</version>
<version>3.2.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
6 changes: 3 additions & 3 deletions psc-examples/pom.xml
@@ -5,17 +5,17 @@
<parent>
<artifactId>psc-java-oss</artifactId>
<groupId>com.pinterest.psc</groupId>
<version>3.1.3</version>
<version>3.2.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>psc-examples</artifactId>
<version>3.1.3</version>
<version>3.2.0</version>
<name>psc-examples</name>

<properties>
<memq.version>0.2.20</memq.version>
<memq.version>0.2.21</memq.version>
</properties>

<dependencies>
4 changes: 2 additions & 2 deletions psc-flink-logging/pom.xml
@@ -5,13 +5,13 @@
<parent>
<artifactId>psc-java-oss</artifactId>
<groupId>com.pinterest.psc</groupId>
<version>3.1.3</version>
<version>3.2.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>psc-flink-logging</artifactId>
<version>3.1.3</version>
<version>3.2.0</version>

<dependencies>
<dependency>
2 changes: 1 addition & 1 deletion psc-flink/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>com.pinterest.psc</groupId>
<artifactId>psc-java-oss</artifactId>
<version>3.1.3</version>
<version>3.2.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>psc-flink</artifactId>
.../flink/streaming/connectors/psc/internals/AbstractFetcher.java
@@ -18,6 +18,8 @@

package com.pinterest.flink.streaming.connectors.psc.internals;

import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
@@ -37,7 +39,6 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

import static com.pinterest.flink.streaming.connectors.psc.internals.metrics.PscConsumerMetricConstants.COMMITTED_OFFSETS_METRICS_GAUGE;
@@ -91,7 +92,7 @@ public abstract class AbstractFetcher<T, TUPH> {
/**
* All partitions (and their state) that this fetcher is subscribed to.
*/
private final List<PscTopicUriPartitionState<T, TUPH>> subscribedTopicUriPartitionStates;
private final Map<PscTopicUriPartition, PscTopicUriPartitionState<T, TUPH>> subscribedTopicUriPartitionStates;

/**
* Queue of partitions that are not yet assigned to any PSC clients for consuming.
@@ -184,27 +185,27 @@ protected AbstractFetcher(
userCodeClassLoader);

// check that all seed partition states have a defined offset
for (PscTopicUriPartitionState<?, ?> partitionState : subscribedTopicUriPartitionStates) {
for (PscTopicUriPartitionState<?, ?> partitionState : subscribedTopicUriPartitionStates.values()) {
if (!partitionState.isOffsetDefined()) {
throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets.");
}
}

// all seed partitions are not assigned yet, so should be added to the unassigned partitions queue
for (PscTopicUriPartitionState<T, TUPH> partition : subscribedTopicUriPartitionStates) {
for (PscTopicUriPartitionState<T, TUPH> partition : subscribedTopicUriPartitionStates.values()) {
unassignedTopicUriPartitionsQueue.add(partition);
}

// register metrics for the initial seed partitions
if (useMetrics) {
registerOffsetMetrics(consumerMetricGroup, subscribedTopicUriPartitionStates);
registerOffsetMetrics(consumerMetricGroup, subscribedTopicUriPartitionStates.values());
}

// if we have periodic watermarks, kick off the interval scheduler
if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {
PeriodicWatermarkEmitter<T, TUPH> periodicEmitter = new PeriodicWatermarkEmitter<>(
checkpointLock,
subscribedTopicUriPartitionStates,
subscribedTopicUriPartitionStates.values(),
watermarkOutputMultiplexer,
processingTimeProvider,
autoWatermarkInterval);
@@ -227,22 +228,22 @@ protected AbstractFetcher(
* @param newPartitions discovered partitions to add
*/
public void addDiscoveredPartitions(List<PscTopicUriPartition> newPartitions) throws IOException, ClassNotFoundException {
List<PscTopicUriPartitionState<T, TUPH>> newPartitionStates = createPartitionStateHolders(
Map<PscTopicUriPartition, PscTopicUriPartitionState<T, TUPH>> newPartitionStates = createPartitionStateHolders(
newPartitions,
PscTopicUriPartitionStateSentinel.EARLIEST_OFFSET,
timestampWatermarkMode,
watermarkStrategy,
userCodeClassLoader);

if (useMetrics) {
registerOffsetMetrics(consumerMetricGroup, newPartitionStates);
registerOffsetMetrics(consumerMetricGroup, newPartitionStates.values());
}

for (PscTopicUriPartitionState<T, TUPH> newPartitionState : newPartitionStates) {
for (Map.Entry<PscTopicUriPartition, PscTopicUriPartitionState<T, TUPH>> newPartitionStateEntry : newPartitionStates.entrySet()) {
// the ordering is crucial here; first register the state holder, then
// push it to the partitions queue to be read
subscribedTopicUriPartitionStates.add(newPartitionState);
unassignedTopicUriPartitionsQueue.add(newPartitionState);
subscribedTopicUriPartitionStates.put(newPartitionStateEntry.getKey(), newPartitionStateEntry.getValue());
unassignedTopicUriPartitionsQueue.add(newPartitionStateEntry.getValue());
}
}

@@ -255,7 +256,7 @@ public void addDiscoveredPartitions(List<PscTopicUriPartition> newPartitions) th
*
* @return All subscribed partitions.
*/
protected final List<PscTopicUriPartitionState<T, TUPH>> subscribedPartitionStates() {
protected final Map<PscTopicUriPartition, PscTopicUriPartitionState<T, TUPH>> subscribedPartitionStates() {
return subscribedTopicUriPartitionStates;
}

Expand Down Expand Up @@ -329,7 +330,7 @@ public HashMap<PscTopicUriPartition, Long> snapshotCurrentState() {
assert Thread.holdsLock(checkpointLock);

HashMap<PscTopicUriPartition, Long> state = new HashMap<>(subscribedTopicUriPartitionStates.size());
for (PscTopicUriPartitionState<T, TUPH> partition : subscribedTopicUriPartitionStates) {
for (PscTopicUriPartitionState<T, TUPH> partition : subscribedTopicUriPartitionStates.values()) {
state.put(partition.getPscTopicUriPartition(), partition.getOffset());
}
return state;
@@ -375,15 +376,15 @@ protected void emitRecordsWithTimestamps(
* Utility method that takes the topic partitions and creates the topic partition state
* holders, depending on the timestamp / watermark mode.
*/
private List<PscTopicUriPartitionState<T, TUPH>> createPartitionStateHolders(
Map<PscTopicUriPartition, Long> partitionsToInitialOffsets,
private Map<PscTopicUriPartition, PscTopicUriPartitionState<T, TUPH>> createPartitionStateHolders(
Map<PscTopicUriPartition, Long> partitionsToInitialOffsets,
int timestampWatermarkMode,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {

// CopyOnWrite as adding discovered partitions could happen in parallel
// ConcurrentHashMap as adding discovered partitions could happen in parallel
// while different threads iterate the partitions list
List<PscTopicUriPartitionState<T, TUPH>> partitionStates = new CopyOnWriteArrayList<>();
Map<PscTopicUriPartition, PscTopicUriPartitionState<T, TUPH>> partitionStates = new ConcurrentHashMap<>();

switch (timestampWatermarkMode) {
case NO_TIMESTAMPS_WATERMARKS: {
@@ -395,7 +396,7 @@ private List<PscTopicUriPartitionState<T, TUPH>> createPartitionStateHolders(
new PscTopicUriPartitionState<>(partitionEntry.getKey(), pscTopicUriPartitionHandle);
partitionState.setOffset(partitionEntry.getValue());

partitionStates.add(partitionState);
partitionStates.put(partitionEntry.getKey(), partitionState);
}

return partitionStates;
@@ -427,7 +428,7 @@ private List<PscTopicUriPartitionState<T, TUPH>> createPartitionStateHolders(

partitionState.setOffset(partitionEntry.getValue());

partitionStates.add(partitionState);
partitionStates.put(partitionEntry.getKey(), partitionState);
}

return partitionStates;
@@ -443,7 +444,7 @@ private List<PscTopicUriPartitionState<T, TUPH>> createPartitionStateHolders(
* Shortcut variant of {@link #createPartitionStateHolders(Map, int, SerializedValue, ClassLoader)}
* that uses the same offset for all partitions when creating their state holders.
*/
private List<PscTopicUriPartitionState<T, TUPH>> createPartitionStateHolders(
private Map<PscTopicUriPartition, PscTopicUriPartitionState<T, TUPH>> createPartitionStateHolders(
List<PscTopicUriPartition> partitions,
long initialOffset,
int timestampWatermarkMode,
@@ -476,7 +477,7 @@ private List<PscTopicUriPartitionState<T, TUPH>> createPartitionStateHolders(
*/
private void registerOffsetMetrics(
MetricGroup consumerMetricGroup,
List<PscTopicUriPartitionState<T, TUPH>> partitionOffsetStates) {
Collection<PscTopicUriPartitionState<T, TUPH>> partitionOffsetStates) {

for (PscTopicUriPartitionState<T, TUPH> ktp : partitionOffsetStates) {
MetricGroup topicPartitionGroup = consumerMetricGroup
@@ -538,7 +539,7 @@ private static class PeriodicWatermarkEmitter<T, KPH> implements ProcessingTimeC

private final Object checkpointLock;

private final List<PscTopicUriPartitionState<T, KPH>> allPartitions;
private final Collection<PscTopicUriPartitionState<T, KPH>> allPartitions;

private final WatermarkOutputMultiplexer watermarkOutputMultiplexer;

@@ -550,7 +551,7 @@ private static class PeriodicWatermarkEmitter<T, KPH> implements ProcessingTimeC

PeriodicWatermarkEmitter(
Object checkpointLock,
List<PscTopicUriPartitionState<T, KPH>> allPartitions,
Collection<PscTopicUriPartitionState<T, KPH>> allPartitions,
WatermarkOutputMultiplexer watermarkOutputMultiplexer,
ProcessingTimeService timerService,
long autoWatermarkInterval) {
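The hunks above replace the fetcher's CopyOnWriteArrayList of partition states with a ConcurrentHashMap keyed by PscTopicUriPartition: newly discovered partitions can be registered while other threads iterate the existing states, and lookups no longer require a linear scan or freshly constructed key objects. The following is a minimal, self-contained sketch of that registry pattern under simplified assumptions; Partition and PartitionState are hypothetical stand-ins, not the PSC classes.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PartitionStateRegistry {

    // Stand-in key type; a record gives value-based equals/hashCode for map lookups.
    record Partition(String topicUri, int partition) {}

    // Stand-in for per-partition state (offset tracking only).
    static final class PartitionState {
        private volatile long offset;
        PartitionState(long offset) { this.offset = offset; }
        long offset() { return offset; }
        void advanceTo(long offset) { this.offset = offset; }
    }

    // ConcurrentHashMap: safe to put() from the discovery thread while another
    // thread iterates values() for snapshots, metrics, or watermark emission.
    private final Map<Partition, PartitionState> states = new ConcurrentHashMap<>();

    void addDiscovered(Partition partition, long initialOffset) {
        states.putIfAbsent(partition, new PartitionState(initialOffset));
    }

    // O(1) lookup by key instead of scanning a list for a matching partition.
    PartitionState stateOf(Partition partition) {
        return states.get(partition);
    }

    Map<Partition, Long> snapshotOffsets() {
        Map<Partition, Long> snapshot = new ConcurrentHashMap<>();
        states.forEach((p, s) -> snapshot.put(p, s.offset()));
        return snapshot;
    }
}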
.../flink/streaming/connectors/psc/internals/Handover.java
@@ -18,7 +18,12 @@

package com.pinterest.flink.streaming.connectors.psc.internals;

import com.pinterest.psc.common.event.EventHandler;
import com.pinterest.psc.common.event.PscEvent;
import com.pinterest.psc.consumer.PscConsumerPollMessageIterator;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.ExceptionUtils;

@@ -45,14 +50,16 @@
*/
@ThreadSafe
@Internal
public final class Handover implements Closeable {
public final class Handover implements Closeable, EventHandler {

private final Object lock = new Object();

private PscConsumerPollMessageIterator<byte[], byte[]> next;
private Throwable error;
private boolean wakeupProducer;

private Queue<PscEvent> eventQueue = new ConcurrentLinkedQueue<>();

/**
* Polls the next element from the Handover, possibly blocking until the next element is
* available. This method behaves similar to polling from a blocking queue.
@@ -167,6 +174,13 @@ public void reportError(Throwable t) {
@Override
public void close() {
synchronized (lock) {
if (next != null) {
try {
next.close();
} catch (IOException ioe) {
// pass through; best effort close
}
}
next = null;
wakeupProducer = false;

@@ -189,6 +203,17 @@ public void wakeupProducer() {
}
}

public void handle(PscEvent pscEvent) {
eventQueue.offer(pscEvent);
if (!wakeupProducer) {
wakeupProducer();
}
}

public Queue<PscEvent> getEventQueue() {
return eventQueue;
}

// ------------------------------------------------------------------------

/**
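With this change, Handover also acts as an EventHandler: handle(PscEvent) enqueues the event on a ConcurrentLinkedQueue and wakes the producer, so the fetcher thread can drain queued events between polls. Below is a rough sketch of that queue-and-wake pattern in isolation; Event and the wait/notify handshake are simplified placeholders, not the exact PSC or Flink types.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public final class EventHandover {

    // Stand-in for PscEvent.
    record Event(String name) {}

    private final Object lock = new Object();
    private final Queue<Event> eventQueue = new ConcurrentLinkedQueue<>();
    private boolean wakeup;

    // Called from the consumer's event callback; enqueues without blocking,
    // then wakes whoever is waiting on the handover.
    public void handle(Event event) {
        eventQueue.offer(event);
        synchronized (lock) {
            wakeup = true;
            lock.notifyAll();
        }
    }

    // Fetcher side: drain whatever events arrived since the last poll.
    public Queue<Event> eventQueue() {
        return eventQueue;
    }

    // Producer side: block until a wakeup is requested, then reset the flag.
    public void awaitWakeup() throws InterruptedException {
        synchronized (lock) {
            while (!wakeup) {
                lock.wait();
            }
            wakeup = false;
        }
    }
}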