Skip to content

Commit

Permalink
[FLINK-35435][Connectors/base] Add timeout Configuration to Async Sink (
Browse files Browse the repository at this point in the history
  • Loading branch information
vahmed-hamdy authored Jun 10, 2024
1 parent 74b100b commit c9def98
Show file tree
Hide file tree
Showing 8 changed files with 544 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@

import java.io.Serializable;

import static org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration.DEFAULT_FAIL_ON_TIMEOUT;
import static org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration.DEFAULT_REQUEST_TIMEOUT_MS;

/**
* A generic sink for destinations that provide an async client to persist data.
*
Expand Down Expand Up @@ -54,6 +57,8 @@ public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
private final long maxBatchSizeInBytes;
private final long maxTimeInBufferMS;
private final long maxRecordSizeInBytes;
private final long requestTimeoutMS;
private final boolean failOnTimeout;

protected AsyncSinkBase(
ElementConverter<InputT, RequestEntryT> elementConverter,
Expand All @@ -63,6 +68,28 @@ protected AsyncSinkBase(
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes) {
this(
elementConverter,
maxBatchSize,
maxInFlightRequests,
maxBufferedRequests,
maxBatchSizeInBytes,
maxTimeInBufferMS,
maxRecordSizeInBytes,
DEFAULT_REQUEST_TIMEOUT_MS,
DEFAULT_FAIL_ON_TIMEOUT);
}

protected AsyncSinkBase(
ElementConverter<InputT, RequestEntryT> elementConverter,
int maxBatchSize,
int maxInFlightRequests,
int maxBufferedRequests,
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
long requestTimeoutMS,
boolean failOnTimeout) {
this.elementConverter =
Preconditions.checkNotNull(
elementConverter,
Expand All @@ -73,6 +100,8 @@ protected AsyncSinkBase(
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
this.maxTimeInBufferMS = maxTimeInBufferMS;
this.maxRecordSizeInBytes = maxRecordSizeInBytes;
this.requestTimeoutMS = requestTimeoutMS;
this.failOnTimeout = failOnTimeout;
}

protected ElementConverter<InputT, RequestEntryT> getElementConverter() {
Expand Down Expand Up @@ -102,4 +131,12 @@ protected long getMaxTimeInBufferMS() {
/** Returns the {@code maxRecordSizeInBytes} value this sink was constructed with. */
protected long getMaxRecordSizeInBytes() {
    return this.maxRecordSizeInBytes;
}

/**
 * Returns the request timeout, in milliseconds, this sink was constructed with. When the
 * constructor without timeout arguments is used, this is {@code DEFAULT_REQUEST_TIMEOUT_MS}.
 */
protected long getRequestTimeoutMS() {
    return this.requestTimeoutMS;
}

/**
 * Returns the {@code failOnTimeout} flag this sink was constructed with. When the constructor
 * without timeout arguments is used, this is {@code DEFAULT_FAIL_ON_TIMEOUT}.
 */
protected boolean getFailOnTimeout() {
    return this.failOnTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
import java.util.Deque;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -85,6 +88,9 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
private final long maxTimeInBufferMS;
private final long maxRecordSizeInBytes;

private final long requestTimeoutMS;
private final boolean failOnTimeout;

/**
* The ElementConverter provides a mapping between for the elements of a stream to request
* entries that can be sent to the destination.
Expand Down Expand Up @@ -181,15 +187,88 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
* <p>During checkpointing, the sink needs to ensure that there are no outstanding in-flight
* requests.
*
* <p>This method is deprecated in favor of {@code submitRequestEntries(List<RequestEntryT>
* requestEntries, ResultHandler<RequestEntryT> resultHandler)}.
*
* @param requestEntries a set of request entries that should be sent to the destination
* @param requestToRetry the {@code accept} method should be called on this Consumer once the
* processing of the {@code requestEntries} are complete. Any entries that encountered
* difficulties in persisting should be re-queued through {@code requestToRetry} by
* including that element in the collection of {@code RequestEntryT}s passed to the {@code
* accept} method. All other elements are assumed to have been successfully persisted.
*/
protected abstract void submitRequestEntries(
List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestToRetry);
@Deprecated
protected void submitRequestEntries(
        List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestToRetry) {
    // Default body for writers that do not override this overload: it always fails, so a
    // writer must override either this method or the ResultHandler-based one.
    final String message =
            "This method is deprecated. Please override the method that accepts a ResultHandler.";
    throw new UnsupportedOperationException(message);
}

/**
* This method specifies how to persist buffered request entries into the destination. It is
* implemented when support for a new destination is added.
*
* <p>The method is invoked with a set of request entries according to the buffering hints (and
* the valid limits of the destination). The logic then needs to create and execute the request
* asynchronously against the destination (ideally by batching together multiple request entries
* to increase efficiency). The logic also needs to identify individual request entries that
* were not persisted successfully and resubmit them through the {@code resultHandler}'s
* {@code retryForEntries} method.
*
* <p>From a threading perspective, the mailbox thread will call this method and initiate the
* asynchronous request to persist the {@code requestEntries}. NOTE: The client must support
* asynchronous requests and the method called to persist the records must asynchronously
* execute and return a future with the results of that request. A thread from the destination
* client thread pool should complete the request and trigger the {@code resultHandler} to
* complete the processing of the request entries. The {@code resultHandler} actions will run on
* the mailbox thread.
*
* <p>An example implementation of this method is included:
*
* <pre>{@code
* @Override
* protected void submitRequestEntries
* (List<RequestEntryT> records, ResultHandler<RequestEntryT> resultHandler) {
* CompletableFuture<Response> future = destinationClient.putRecords(records);
* future.whenComplete(
* (response, error) -> {
* if(error != null && isFatal(error)){
* resultHandler.completeExceptionally(error);
* }else if(error != null){
* List<RequestEntryT> retryableFailedRecords = getRetryableFailed(response);
* resultHandler.retryForEntries(retryableFailedRecords);
* }else{
* resultHandler.complete();
* }
* }
* );
* }
*
* }</pre>
*
* <p>During checkpointing, the sink needs to ensure that there are no outstanding in-flight
* requests.
*
* @param requestEntries a set of request entries that should be sent to the destination
* @param resultHandler the {@code complete} method should be called on this ResultHandler once
* the processing of the {@code requestEntries} are complete. Any entries that encountered
* difficulties in persisting should be re-queued through {@code retryForEntries} by
* including that element in the collection of {@code RequestEntryT}s passed to the {@code
* retryForEntries} method. All other elements are assumed to have been successfully
* persisted. In case of encountering fatal exceptions, the {@code completeExceptionally}
* method should be called.
*/
protected void submitRequestEntries(
        List<RequestEntryT> requestEntries, ResultHandler<RequestEntryT> resultHandler) {
    // Adapter for the Consumer-based overload: an empty retry list means the whole batch
    // succeeded, anything else is handed back to the handler for re-queueing.
    final Consumer<List<RequestEntryT>> legacyCallback =
            entriesToRetry -> {
                if (!entriesToRetry.isEmpty()) {
                    resultHandler.retryForEntries(entriesToRetry);
                    return;
                }
                resultHandler.complete();
            };
    submitRequestEntries(requestEntries, legacyCallback);
}

/**
* This method allows the getting of the size of a {@code RequestEntryT} in bytes. The size in
Expand Down Expand Up @@ -292,6 +371,8 @@ public AsyncSinkWriter(
this.maxTimeInBufferMS = configuration.getMaxTimeInBufferMS();
this.maxRecordSizeInBytes = configuration.getMaxRecordSizeInBytes();
this.rateLimitingStrategy = configuration.getRateLimitingStrategy();
this.requestTimeoutMS = configuration.getRequestTimeoutMS();
this.failOnTimeout = configuration.isFailOnTimeout();

this.inFlightRequestsCount = 0;
this.bufferedRequestEntriesTotalSizeInBytes = 0;
Expand Down Expand Up @@ -383,23 +464,16 @@ private void flush() throws InterruptedException {
}

List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
if (batch.size() == 0) {
if (batch.isEmpty()) {
return;
}

int batchSize = requestInfo.getBatchSize();
long requestTimestamp = System.currentTimeMillis();
Consumer<List<RequestEntryT>> requestToRetry =
failedRequestEntries ->
mailboxExecutor.execute(
() ->
completeRequest(
failedRequestEntries, batchSize, requestTimestamp),
"Mark in-flight request as completed and requeue %d request entries",
failedRequestEntries.size());

rateLimitingStrategy.registerInFlightRequest(requestInfo);
inFlightRequestsCount++;
submitRequestEntries(batch, requestToRetry);
submitRequestEntries(
batch, new AsyncSinkWriterResultHandler(requestTimestamp, batch, requestInfo));
}

private int getNextBatchSize() {
Expand Down Expand Up @@ -533,4 +607,78 @@ private int getNextBatchSizeLimit() {
/** Returns the consumer to which this writer reports fatal exceptions. */
protected Consumer<Exception> getFatalExceptionCons() {
    return this.fatalExceptionCons;
}

/**
 * An implementation of {@link ResultHandler} that supports timeout.
 *
 * <p>On construction a timer is registered for {@code requestTimeoutMS} after the current
 * processing time. Whichever of {@link #complete()}, {@link #completeExceptionally(Exception)},
 * {@link #retryForEntries(List)} or {@link #timeout()} first wins the {@code isCompleted}
 * compare-and-set finishes the request; every later call is ignored. The resulting action is
 * always handed to the {@code mailboxExecutor} instead of being run on the calling thread.
 */
private class AsyncSinkWriterResultHandler implements ResultHandler<RequestEntryT> {
    private final ScheduledFuture<?> scheduledFuture;
    private final long requestTimestamp;
    private final int batchSize;
    private final AtomicBoolean isCompleted = new AtomicBoolean(false);
    private final List<RequestEntryT> batchEntries;

    /**
     * Creates the handler for one in-flight request and starts its timeout timer.
     *
     * @param requestTimestamp timestamp taken when the request was submitted; passed on to
     *     {@code completeRequest}
     * @param batchEntries the entries of the request; re-queued as a whole if the request times
     *     out and {@code failOnTimeout} is false
     * @param requestInfo source of the batch size reported to {@code completeRequest}
     */
    public AsyncSinkWriterResultHandler(
            long requestTimestamp, List<RequestEntryT> batchEntries, RequestInfo requestInfo) {
        this.requestTimestamp = requestTimestamp;
        this.batchEntries = batchEntries;
        this.batchSize = requestInfo.getBatchSize();
        // Register the timer last: the callback captures `this`, so every other field should
        // already be assigned before the handler becomes reachable from the time service.
        this.scheduledFuture =
                timeService.registerTimer(
                        timeService.getCurrentProcessingTime() + requestTimeoutMS,
                        instant -> this.timeout());
    }

    /** Cancels the timeout timer and completes the request with no entries to retry. */
    @Override
    public void complete() {
        if (isCompleted.compareAndSet(false, true)) {
            scheduledFuture.cancel(false);
            mailboxExecutor.execute(
                    () -> completeRequest(Collections.emptyList(), batchSize, requestTimestamp),
                    "Mark in-flight request as completed successfully for batch with size %d",
                    batchSize);
        }
    }

    /**
     * Cancels the timeout timer and forwards {@code e} to the fatal exception consumer.
     *
     * @param e the fatal exception to report
     */
    @Override
    public void completeExceptionally(Exception e) {
        if (isCompleted.compareAndSet(false, true)) {
            scheduledFuture.cancel(false);
            mailboxExecutor.execute(
                    () -> getFatalExceptionCons().accept(e),
                    "Mark in-flight request as failed with fatal exception %s",
                    e.getMessage());
        }
    }

    /**
     * Cancels the timeout timer and completes the request, handing the given entries back for
     * retry.
     *
     * @param requestEntriesToRetry the entries that were not persisted and must be re-queued
     */
    @Override
    public void retryForEntries(List<RequestEntryT> requestEntriesToRetry) {
        if (isCompleted.compareAndSet(false, true)) {
            scheduledFuture.cancel(false);
            mailboxExecutor.execute(
                    () -> completeRequest(requestEntriesToRetry, batchSize, requestTimestamp),
                    "Mark in-flight request as completed with %d failed request entries",
                    requestEntriesToRetry.size());
        }
    }

    /**
     * Timer callback. If the request has not been completed yet, either reports a {@link
     * TimeoutException} as fatal (when {@code failOnTimeout} is set) or re-queues the whole
     * batch for retry.
     */
    public void timeout() {
        if (isCompleted.compareAndSet(false, true)) {
            mailboxExecutor.execute(
                    () -> {
                        if (failOnTimeout) {
                            getFatalExceptionCons()
                                    .accept(
                                            new TimeoutException(
                                                    "Request timed out after "
                                                            + requestTimeoutMS
                                                            + "ms with failOnTimeout set to true."));
                        } else {
                            // Retry the request on timeout
                            completeRequest(batchEntries, batchSize, requestTimestamp);
                        }
                    },
                    // %d, not %l: 'l' is not a java.util.Formatter conversion, so rendering
                    // the description would throw UnknownFormatConversionException.
                    "Mark in-flight request as completed with timeout after %d",
                    requestTimeoutMS);
        }
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.base.sink.writer;

import org.apache.flink.annotation.PublicEvolving;

import java.util.List;

/**
* Interface to handle the result of a {@link AsyncSinkWriter} request.
*
* <p>An instance is passed to {@code AsyncSinkWriter#submitRequestEntries} together with the
* request entries. The implementation of that method reports the outcome of the request by
* calling one of the methods below once processing of the entries is complete.
*
* <p>NOTE(review): the handler that {@link AsyncSinkWriter} itself creates acts only on the first
* call it receives and ignores later ones; other implementations may behave differently.
*
* @param <T> Type of the request entries.
*/
@PublicEvolving
public interface ResultHandler<T> {

/**
* Mark the in-flight request as completed successfully. No request entries are re-queued.
*/
void complete();

/**
* Fail job with fatal exception.
*
* @param e the fatal exception that was encountered while processing the request
*/
void completeExceptionally(Exception e);

/**
* Requeue Failed entries to retry. Entries of the request that are not part of the given list
* are assumed to have been persisted successfully.
*
* @param requestEntriesToRetry the request entries that could not be persisted and should be
*     re-queued
*/
void retryForEntries(List<T> requestEntriesToRetry);
}
Loading

0 comments on commit c9def98

Please sign in to comment.