Skip to content

Commit

Permalink
Add MemQ Flink consumer support
Browse files Browse the repository at this point in the history
  • Loading branch information
ArtemTetenkin committed Mar 7, 2024
1 parent 62bcf04 commit 2b27c68
Show file tree
Hide file tree
Showing 25 changed files with 860 additions and 159 deletions.
17 changes: 17 additions & 0 deletions psc-flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-netty</artifactId>
<version>4.1.70.Final-15.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
Expand Down Expand Up @@ -386,6 +391,18 @@

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.pinterest.flink.streaming.connectors.psc.internals;

import java.util.Collection;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
Expand Down Expand Up @@ -91,7 +92,7 @@ public abstract class AbstractFetcher<T, TUPH> {
/**
* All partitions (and their state) that this fetcher is subscribed to.
*/
private final List<PscTopicUriPartitionState<T, TUPH>> subscribedTopicUriPartitionStates;
private final Map<PscTopicUriPartition, PscTopicUriPartitionState<T, TUPH>> subscribedTopicUriPartitionStates;

/**
* Queue of partitions that are not yet assigned to any PSC clients for consuming.
Expand Down Expand Up @@ -184,27 +185,27 @@ protected AbstractFetcher(
userCodeClassLoader);

// check that all seed partition states have a defined offset
for (PscTopicUriPartitionState<?, ?> partitionState : subscribedTopicUriPartitionStates) {
for (PscTopicUriPartitionState<?, ?> partitionState : subscribedTopicUriPartitionStates.values()) {
if (!partitionState.isOffsetDefined()) {
throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets.");
}
}

// all seed partitions are not assigned yet, so should be added to the unassigned partitions queue
for (PscTopicUriPartitionState<T, TUPH> partition : subscribedTopicUriPartitionStates) {
for (PscTopicUriPartitionState<T, TUPH> partition : subscribedTopicUriPartitionStates.values()) {
unassignedTopicUriPartitionsQueue.add(partition);
}

// register metrics for the initial seed partitions
if (useMetrics) {
registerOffsetMetrics(consumerMetricGroup, subscribedTopicUriPartitionStates);
registerOffsetMetrics(consumerMetricGroup, subscribedTopicUriPartitionStates.values());
}

// if we have periodic watermarks, kick off the interval scheduler
if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {
PeriodicWatermarkEmitter<T, TUPH> periodicEmitter = new PeriodicWatermarkEmitter<>(
checkpointLock,
subscribedTopicUriPartitionStates,
subscribedTopicUriPartitionStates.values(),
watermarkOutputMultiplexer,
processingTimeProvider,
autoWatermarkInterval);
Expand All @@ -227,22 +228,22 @@ protected AbstractFetcher(
* @param newPartitions discovered partitions to add
*/
public void addDiscoveredPartitions(List<PscTopicUriPartition> newPartitions) throws IOException, ClassNotFoundException {
List<PscTopicUriPartitionState<T, TUPH>> newPartitionStates = createPartitionStateHolders(
Map<PscTopicUriPartition, PscTopicUriPartitionState<T, TUPH>> newPartitionStates = createPartitionStateHolders(
newPartitions,
PscTopicUriPartitionStateSentinel.EARLIEST_OFFSET,
timestampWatermarkMode,
watermarkStrategy,
userCodeClassLoader);

if (useMetrics) {
registerOffsetMetrics(consumerMetricGroup, newPartitionStates);
registerOffsetMetrics(consumerMetricGroup, newPartitionStates.values());
}

for (PscTopicUriPartitionState<T, TUPH> newPartitionState : newPartitionStates) {
for (Map.Entry<PscTopicUriPartition, PscTopicUriPartitionState<T, TUPH>> newPartitionStateEntry : newPartitionStates.entrySet()) {
// the ordering is crucial here; first register the state holder, then
// push it to the partitions queue to be read
subscribedTopicUriPartitionStates.add(newPartitionState);
unassignedTopicUriPartitionsQueue.add(newPartitionState);
subscribedTopicUriPartitionStates.put(newPartitionStateEntry.getKey(), newPartitionStateEntry.getValue());
unassignedTopicUriPartitionsQueue.add(newPartitionStateEntry.getValue());
}
}

Expand All @@ -255,7 +256,7 @@ public void addDiscoveredPartitions(List<PscTopicUriPartition> newPartitions) th
*
* @return All subscribed partitions.
*/
protected final List<PscTopicUriPartitionState<T, TUPH>> subscribedPartitionStates() {
protected final Map<PscTopicUriPartition, PscTopicUriPartitionState<T, TUPH>> subscribedPartitionStates() {
return subscribedTopicUriPartitionStates;
}

Expand Down Expand Up @@ -329,7 +330,7 @@ public HashMap<PscTopicUriPartition, Long> snapshotCurrentState() {
assert Thread.holdsLock(checkpointLock);

HashMap<PscTopicUriPartition, Long> state = new HashMap<>(subscribedTopicUriPartitionStates.size());
for (PscTopicUriPartitionState<T, TUPH> partition : subscribedTopicUriPartitionStates) {
for (PscTopicUriPartitionState<T, TUPH> partition : subscribedTopicUriPartitionStates.values()) {
state.put(partition.getPscTopicUriPartition(), partition.getOffset());
}
return state;
Expand Down Expand Up @@ -375,15 +376,15 @@ protected void emitRecordsWithTimestamps(
* Utility method that takes the topic partitions and creates the topic partition state
* holders, depending on the timestamp / watermark mode.
*/
private List<PscTopicUriPartitionState<T, TUPH>> createPartitionStateHolders(
Map<PscTopicUriPartition, Long> partitionsToInitialOffsets,
private Map<PscTopicUriPartition, PscTopicUriPartitionState<T, TUPH>> createPartitionStateHolders(
Map<PscTopicUriPartition, Long> partitionsToInitialOffsets,
int timestampWatermarkMode,
SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {

// CopyOnWrite as adding discovered partitions could happen in parallel
// while different threads iterate the partitions list
List<PscTopicUriPartitionState<T, TUPH>> partitionStates = new CopyOnWriteArrayList<>();
Map<PscTopicUriPartition, PscTopicUriPartitionState<T, TUPH>> partitionStates = new HashMap<>();

switch (timestampWatermarkMode) {
case NO_TIMESTAMPS_WATERMARKS: {
Expand All @@ -395,7 +396,7 @@ private List<PscTopicUriPartitionState<T, TUPH>> createPartitionStateHolders(
new PscTopicUriPartitionState<>(partitionEntry.getKey(), pscTopicUriPartitionHandle);
partitionState.setOffset(partitionEntry.getValue());

partitionStates.add(partitionState);
partitionStates.put(partitionEntry.getKey(), partitionState);;
}

return partitionStates;
Expand Down Expand Up @@ -427,7 +428,7 @@ private List<PscTopicUriPartitionState<T, TUPH>> createPartitionStateHolders(

partitionState.setOffset(partitionEntry.getValue());

partitionStates.add(partitionState);
partitionStates.put(partitionEntry.getKey(), partitionState);
}

return partitionStates;
Expand All @@ -443,7 +444,7 @@ private List<PscTopicUriPartitionState<T, TUPH>> createPartitionStateHolders(
* Shortcut variant of {@link #createPartitionStateHolders(Map, int, SerializedValue, ClassLoader)}
* that uses the same offset for all partitions when creating their state holders.
*/
private List<PscTopicUriPartitionState<T, TUPH>> createPartitionStateHolders(
private Map<PscTopicUriPartition, PscTopicUriPartitionState<T, TUPH>> createPartitionStateHolders(
List<PscTopicUriPartition> partitions,
long initialOffset,
int timestampWatermarkMode,
Expand Down Expand Up @@ -476,7 +477,7 @@ private List<PscTopicUriPartitionState<T, TUPH>> createPartitionStateHolders(
*/
private void registerOffsetMetrics(
MetricGroup consumerMetricGroup,
List<PscTopicUriPartitionState<T, TUPH>> partitionOffsetStates) {
Collection<PscTopicUriPartitionState<T, TUPH>> partitionOffsetStates) {

for (PscTopicUriPartitionState<T, TUPH> ktp : partitionOffsetStates) {
MetricGroup topicPartitionGroup = consumerMetricGroup
Expand Down Expand Up @@ -538,7 +539,7 @@ private static class PeriodicWatermarkEmitter<T, KPH> implements ProcessingTimeC

private final Object checkpointLock;

private final List<PscTopicUriPartitionState<T, KPH>> allPartitions;
private final Collection<PscTopicUriPartitionState<T, KPH>> allPartitions;

private final WatermarkOutputMultiplexer watermarkOutputMultiplexer;

Expand All @@ -550,7 +551,7 @@ private static class PeriodicWatermarkEmitter<T, KPH> implements ProcessingTimeC

PeriodicWatermarkEmitter(
Object checkpointLock,
List<PscTopicUriPartitionState<T, KPH>> allPartitions,
Collection<PscTopicUriPartitionState<T, KPH>> allPartitions,
WatermarkOutputMultiplexer watermarkOutputMultiplexer,
ProcessingTimeService timerService,
long autoWatermarkInterval) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@

package com.pinterest.flink.streaming.connectors.psc.internals;

import com.pinterest.psc.common.event.EventHandler;
import com.pinterest.psc.common.event.PscEvent;
import com.pinterest.psc.consumer.PscConsumerPollMessageIterator;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.ExceptionUtils;

Expand All @@ -45,14 +50,16 @@
*/
@ThreadSafe
@Internal
public final class Handover implements Closeable {
public final class Handover implements Closeable, EventHandler {

private final Object lock = new Object();

private PscConsumerPollMessageIterator<byte[], byte[]> next;
private Throwable error;
private boolean wakeupProducer;

private Queue<PscEvent> eventQueue = new ConcurrentLinkedQueue<>();

/**
* Polls the next element from the Handover, possibly blocking until the next element is
* available. This method behaves similar to polling from a blocking queue.
Expand Down Expand Up @@ -167,6 +174,13 @@ public void reportError(Throwable t) {
@Override
public void close() {
synchronized (lock) {
if (next != null) {
try {
next.close();
} catch (IOException ioe) {
// pass through; best effort close
}
}
next = null;
wakeupProducer = false;

Expand All @@ -189,6 +203,17 @@ public void wakeupProducer() {
}
}

public void handle(PscEvent pscEvent) {
eventQueue.offer(pscEvent);
if (!wakeupProducer) {
wakeupProducer();
}
}

public Queue<PscEvent> getEventQueue() {
return eventQueue;
}

// ------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.pinterest.psc.common.MessageId;
import com.pinterest.psc.common.TopicUriPartition;
import com.pinterest.psc.common.event.PscEvent;
import com.pinterest.psc.config.PscConfiguration;
import com.pinterest.psc.config.PscConfigurationInternal;
import com.pinterest.psc.config.PscConfigurationUtils;
Expand All @@ -30,9 +31,13 @@
import com.pinterest.psc.exception.consumer.ConsumerException;
import com.pinterest.psc.exception.consumer.WakeupException;
import com.pinterest.psc.exception.startup.ConfigurationException;
import com.pinterest.psc.interceptor.TypePreservingInterceptor;
import com.pinterest.psc.metrics.Metric;
import com.pinterest.psc.metrics.MetricName;
import com.pinterest.psc.metrics.PscMetricRegistryManager;
import java.io.IOException;
import java.util.Arrays;
import java.util.Queue;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down Expand Up @@ -190,6 +195,8 @@ public PscConsumerThread(Logger log,
this.running = true;
this.hasInitializedMetrics = false;
this.pscConfigurationInternal = PscConfigurationUtils.propertiesToPscConfigurationInternal(pscProperties, PscConfiguration.PSC_CLIENT_TYPE_CONSUMER);

initializeInterceptor();
}

// ------------------------------------------------------------------------
Expand All @@ -214,18 +221,18 @@ public void run() {
return;
}

// the latest bulk of records. May carry across the loop if the thread is woken
// up
// from blocking on the handover
PscConsumerPollMessageIterator<byte[], byte[]> records = null;

// from here on, the consumer is guaranteed to be closed properly
try {
// early exit check
if (!running) {
return;
}

// the latest bulk of records. May carry across the loop if the thread is woken
// up
// from blocking on the handover
PscConsumerPollMessageIterator<byte[], byte[]> records = null;

// reused variable to hold found unassigned new partitions.
// found partitions are not carried across loops using this variable;
// they are carried across via re-adding them to the unassigned partitions queue
Expand Down Expand Up @@ -294,6 +301,10 @@ public void run() {
handover.produce(records);
records = null;
} catch (Handover.WakeupException e) {
Queue<PscEvent> events = handover.getEventQueue();
while (!events.isEmpty()) {
consumer.onEvent(events.poll());
}
// fall through the loop
}
}
Expand All @@ -308,6 +319,14 @@ public void run() {
// make sure the handover is closed if it is not already closed or has an error
handover.close();

if (records != null) {
try {
records.close();
} catch (IOException ioe) {
// pass through; best effort close
}
}

// make sure the PscConsumer is closed
try {
consumer.close();
Expand Down Expand Up @@ -345,6 +364,16 @@ private void initializeMetrics() throws ClientException {
}
}

private void initializeInterceptor() {
TypePreservingInterceptor<byte[], byte[]> eventInterceptor = new PscFlinkConsumerEventInterceptor<>(handover);
Object interceptors = pscProperties.getProperty(PscConfiguration.PSC_CONSUMER_INTERCEPTORS_RAW_CLASSES);
if (interceptors == null) {
pscProperties.put(PscConfiguration.PSC_CONSUMER_INTERCEPTORS_RAW_CLASSES, eventInterceptor);
} else {
pscProperties.put(PscConfiguration.PSC_CONSUMER_INTERCEPTORS_RAW_CLASSES, Arrays.asList(interceptors, eventInterceptor));
}
}

private void initializePscMetrics() {
log.info("Initializing PSC metrics in PscConsumerThread from threadId {}", Thread.currentThread().getId());
if (pscMetricsInitialized == null)
Expand Down
Loading

0 comments on commit 2b27c68

Please sign in to comment.