Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lambda processor should retry for certain class of exceptions #5320

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,28 @@
import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.InvocationType;
import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions;
import org.opensearch.dataprepper.plugins.lambda.common.util.CustomLambdaRetryCondition;
import org.opensearch.dataprepper.plugins.lambda.utils.CountingHttpClient;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
import software.amazon.awssdk.services.lambda.model.TooManyRequestsException;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -94,9 +105,12 @@ private LambdaProcessor createObjectUnderTest(LambdaProcessorConfig processorCon

@BeforeEach
public void setup() {
lambdaRegion = System.getProperty("tests.lambda.processor.region");
functionName = System.getProperty("tests.lambda.processor.functionName");
role = System.getProperty("tests.lambda.processor.sts_role_arn");
// lambdaRegion = System.getProperty("tests.lambda.processor.region");
// functionName = System.getProperty("tests.lambda.processor.functionName");
// role = System.getProperty("tests.lambda.processor.sts_role_arn");
lambdaRegion = "us-west-2";
functionName = "lambdaNoReturn";
role = "arn:aws:iam::176893235612:role/osis-s3-opensearch-role";

pluginMetrics = mock(PluginMetrics.class);
pluginSetting = mock(PluginSetting.class);
Expand Down Expand Up @@ -373,4 +387,81 @@ private List<Record<Event>> createRecords(int numRecords) {
}
return records;
}

/*
* For this test, set concurrency limit to 1
*/
@Test
void testTooManyRequestsExceptionWithCustomRetryCondition() {
//Note lambda function for this test looks like this:
/*def lambda_handler(event, context):
# Simulate a slow operation so that
# if concurrency = 1, multiple parallel invocations
# will result in TooManyRequestsException for the second+ invocation.
time.sleep(10)
# Return a simple success response
return {
"statusCode": 200,
"body": "Hello from concurrency-limited Lambda!"
}
*/

// Wrap the default HTTP client to count requests
CountingHttpClient countingHttpClient = new CountingHttpClient(
NettyNioAsyncHttpClient.builder().build()
);

// Configure a custom retry policy with 3 retries and your custom condition
RetryPolicy retryPolicy = RetryPolicy.builder()
.numRetries(3)
.retryCondition(new CustomLambdaRetryCondition())
.build();

// Build the real Lambda client
LambdaAsyncClient client = LambdaAsyncClient.builder()
.overrideConfiguration(
ClientOverrideConfiguration.builder()
.retryPolicy(retryPolicy)
.build()
)
.region(Region.of(lambdaRegion))
.httpClient(countingHttpClient)
.build();

// Parallel invocations to force concurrency=1 to throw TooManyRequestsException
int parallelInvocations = 10;
CompletableFuture<?>[] futures = new CompletableFuture[parallelInvocations];
for (int i = 0; i < parallelInvocations; i++) {
InvokeRequest request = InvokeRequest.builder()
.functionName(functionName)
.build();

futures[i] = client.invoke(request);
}

// 5) Wait for all to complete
CompletableFuture.allOf(futures).join();

// 6) Check how many had TooManyRequestsException
long tooManyRequestsCount = Arrays.stream(futures)
.filter(f -> {
try {
f.join();
return false; // no error => no TMR
} catch (CompletionException e) {
return e.getCause() instanceof TooManyRequestsException;
}
})
.count();

// 7) Observe how many total network requests occurred (including SDK retries)
int totalRequests = countingHttpClient.getRequestCount();
System.out.println("Total network requests (including retries): " + totalRequests);

// Optionally: If you want to confirm the EXACT number,
// this might vary depending on how many parallel calls and how your TMR throttles them.
// For example, if all 5 calls are blocked, you might see 5*(numRetries + 1) in worst case.
assertTrue(totalRequests >= parallelInvocations,
"Should be at least one request per initial invocation, plus retries.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ private LambdaCommonHandler() {
}

public static boolean isSuccess(InvokeResponse response) {
if(response == null) {
return false;
}
int statusCode = response.statusCode();
return statusCode >= 200 && statusCode < 300;
}
Expand Down Expand Up @@ -65,12 +68,14 @@ private static List<Buffer> createBufferBatches(Collection<Record<Event>> record
if (ThresholdCheck.checkThresholdExceed(currentBufferPerBatch, maxEvents, maxBytes,
maxCollectionDuration)) {
batchedBuffers.add(currentBufferPerBatch);
currentBufferPerBatch.completeCodec();
currentBufferPerBatch = new InMemoryBuffer(keyName, outputCodecContext);
}
}

if (currentBufferPerBatch.getEventCount() > 0) {
batchedBuffers.add(currentBufferPerBatch);
currentBufferPerBatch.completeCodec();
}
return batchedBuffers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,7 @@ public interface Buffer {
Long getPayloadRequestSize();

Duration stopLatencyWatch();

void completeCodec();

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ public void addRecord(Record<Event> record) {
eventCount++;
}

public void completeCodec() {
if (eventCount > 0) {
try {
requestCodec.complete(this.byteArrayOutputStream);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

public List<Record<Event>> getRecords() {
return records;
}
Expand All @@ -98,12 +108,6 @@ public InvokeRequest getRequestPayload(String functionName, String invocationTyp
return null;
}

try {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any advantage in moving this to another public method? One disadvantage I see is that the caller needs to know and must follow that he should call completeCodec first before he call getRequestPayload method - right?

requestCodec.complete(this.byteArrayOutputStream);
} catch (IOException e) {
throw new RuntimeException(e);
}

SdkBytes payload = getPayload();
payloadRequestSize = payload.asByteArray().length;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.ClientOptions;
import org.opensearch.dataprepper.plugins.lambda.common.util.CustomLambdaRetryCondition;
import org.opensearch.dataprepper.plugins.metricpublisher.MicrometerMetricPublisher;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
Expand Down Expand Up @@ -48,13 +49,14 @@ private static ClientOverrideConfiguration createOverrideConfiguration(
.maxBackoffTime(clientOptions.getMaxBackoff())
.build();

final RetryPolicy retryPolicy = RetryPolicy.builder()
final RetryPolicy customRetryPolicy = RetryPolicy.builder()
.retryCondition(new CustomLambdaRetryCondition())
.numRetries(clientOptions.getMaxConnectionRetries())
.backoffStrategy(backoffStrategy)
.build();

return ClientOverrideConfiguration.builder()
.retryPolicy(retryPolicy)
.retryPolicy(customRetryPolicy)
.addMetricPublisher(new MicrometerMetricPublisher(awsSdkMetrics))
.apiCallTimeout(clientOptions.getApiCallTimeout())
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.opensearch.dataprepper.plugins.lambda.common.util;

import software.amazon.awssdk.core.retry.conditions.RetryCondition;
import software.amazon.awssdk.core.retry.RetryPolicyContext;

public class CustomLambdaRetryCondition implements RetryCondition {

@Override
public boolean shouldRetry(RetryPolicyContext context) {
Throwable exception = context.exception();
if (exception != null) {
return LambdaRetryStrategy.isRetryableException(exception);
}

return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package org.opensearch.dataprepper.plugins.lambda.common.util;

import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer;
import org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
import software.amazon.awssdk.services.lambda.model.TooManyRequestsException;
import software.amazon.awssdk.services.lambda.model.ServiceException;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.slf4j.Logger;

import static org.opensearch.dataprepper.plugins.lambda.common.LambdaCommonHandler.isSuccess;

/**
* Similar to BulkRetryStrategy in the OpenSearch sink.
* Categorizes AWS Lambda exceptions and status codes into
* retryable and non-retryable scenarios.
*/
public final class LambdaRetryStrategy {

private LambdaRetryStrategy() {
}

/**
* Possibly a set of “bad request” style errors which might fall
*/
private static final Set<Integer> BAD_REQUEST_ERRORS = new HashSet<>(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to use Set.of() method instead of HashSet with Arrays.asList within it.

Arrays.asList(
400, // Bad Request
422, // Unprocessable Entity
417, // Expectation Failed
406 // Not Acceptable
)
);

/**
* Status codes which may indicate a security or policy problem, so we don't retry.
*/
private static final Set<Integer> NOT_ALLOWED_ERRORS = new HashSet<>(
Arrays.asList(
401, // Unauthorized
403, // Forbidden
405 // Method Not Allowed
)
);

/**
* Examples of input or payload errors that are likely not retryable
* unless the pipeline itself corrects them.
*/
private static final Set<Integer> INVALID_INPUT_ERRORS = new HashSet<>(
Arrays.asList(
413, // Payload Too Large
414, // URI Too Long
416 // Range Not Satisfiable
)
);

/**
* Example of a “timeout” scenario. Lambda can return 429 for "Too Many Requests" or
* 408 (if applicable) for timeouts in some contexts.
* This can be considered retryable if you want to handle the throttling scenario.
*/
private static final Set<Integer> TIMEOUT_ERRORS = new HashSet<>(
Arrays.asList(
408, // Request Timeout
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this retryable category?

429 // Too Many Requests (often used as "throttling" for Lambda)
)
);

public static boolean isRetryable(final int statusCode) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not seeing any references to this method. is this unused (other than tests)? If it is unused, we can remove it. If we have to keep it, then I would recommend changing the name to isRetryableStatuscode to be inline with the other method isRetryableException

return TIMEOUT_ERRORS.contains(statusCode) || (statusCode >= 500 && statusCode < 600);
}

/*
* Note:isRetryable and isRetryableException should match
*/
public static boolean isRetryableException(final Throwable t) {
if (t instanceof TooManyRequestsException) {
// Throttling => often can retry with backoff
return true;
}
if (t instanceof ServiceException) {
// Usually indicates a 5xx => can retry
return true;
}
if (t instanceof SdkClientException) {
// Possibly network/connection error => can retry
return true;
}
return false;
}

/**
* Determines if this is definitely NOT retryable (client error or permanent failure).
*/
public static boolean isNonRetryable(final InvokeResponse response) {
if(response == null) return false;

int statusCode = response.statusCode();
return BAD_REQUEST_ERRORS.contains(statusCode)
|| NOT_ALLOWED_ERRORS.contains(statusCode)
|| INVALID_INPUT_ERRORS.contains(statusCode);
}

/**
* For convenience, you can create more fine-grained checks or
* direct set membership checks (e.g. isBadRequest(...), isTimeout(...)) if you want.
*/
public static boolean isTimeoutError(final InvokeResponse response) {
return TIMEOUT_ERRORS.contains(response.statusCode());
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -267,4 +267,4 @@ public boolean isReadyForShutdown() {
public void shutdown() {
}

}
}
Loading
Loading