-
Notifications
You must be signed in to change notification settings - Fork 213
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
lambda processor should retry for certain class of exceptions #5320
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,5 +36,7 @@ public interface Buffer { | |
Long getPayloadRequestSize(); | ||
|
||
Duration stopLatencyWatch(); | ||
|
||
void completeCodec(); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package org.opensearch.dataprepper.plugins.lambda.common.util; | ||
|
||
import software.amazon.awssdk.core.retry.conditions.RetryCondition; | ||
import software.amazon.awssdk.core.retry.RetryPolicyContext; | ||
|
||
public class CustomLambdaRetryCondition implements RetryCondition { | ||
|
||
@Override | ||
public boolean shouldRetry(RetryPolicyContext context) { | ||
Throwable exception = context.exception(); | ||
if (exception != null) { | ||
return LambdaRetryStrategy.isRetryableException(exception); | ||
} | ||
|
||
return false; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
package org.opensearch.dataprepper.plugins.lambda.common.util; | ||
|
||
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer; | ||
import org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig; | ||
import software.amazon.awssdk.core.exception.SdkClientException; | ||
import software.amazon.awssdk.services.lambda.LambdaAsyncClient; | ||
import software.amazon.awssdk.services.lambda.model.InvokeRequest; | ||
import software.amazon.awssdk.services.lambda.model.InvokeResponse; | ||
import software.amazon.awssdk.services.lambda.model.TooManyRequestsException; | ||
import software.amazon.awssdk.services.lambda.model.ServiceException; | ||
|
||
import java.time.Duration; | ||
import java.util.Arrays; | ||
import java.util.HashSet; | ||
import java.util.Set; | ||
import org.slf4j.Logger; | ||
|
||
import static org.opensearch.dataprepper.plugins.lambda.common.LambdaCommonHandler.isSuccess; | ||
|
||
/** | ||
* Similar to BulkRetryStrategy in the OpenSearch sink. | ||
* Categorizes AWS Lambda exceptions and status codes into | ||
* retryable and non-retryable scenarios. | ||
*/ | ||
public final class LambdaRetryStrategy { | ||
|
||
private LambdaRetryStrategy() { | ||
} | ||
|
||
/** | ||
* Possibly a set of “bad request” style errors which might fall | ||
*/ | ||
private static final Set<Integer> BAD_REQUEST_ERRORS = new HashSet<>( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. better to use |
||
Arrays.asList( | ||
400, // Bad Request | ||
422, // Unprocessable Entity | ||
417, // Expectation Failed | ||
406 // Not Acceptable | ||
) | ||
); | ||
|
||
/** | ||
* Status codes which may indicate a security or policy problem, so we don't retry. | ||
*/ | ||
private static final Set<Integer> NOT_ALLOWED_ERRORS = new HashSet<>( | ||
Arrays.asList( | ||
401, // Unauthorized | ||
403, // Forbidden | ||
405 // Method Not Allowed | ||
) | ||
); | ||
|
||
/** | ||
* Examples of input or payload errors that are likely not retryable | ||
* unless the pipeline itself corrects them. | ||
*/ | ||
private static final Set<Integer> INVALID_INPUT_ERRORS = new HashSet<>( | ||
Arrays.asList( | ||
413, // Payload Too Large | ||
414, // URI Too Long | ||
416 // Range Not Satisfiable | ||
) | ||
); | ||
|
||
/** | ||
* Example of a “timeout” scenario. Lambda can return 429 for "Too Many Requests" or | ||
* 408 (if applicable) for timeouts in some contexts. | ||
* This can be considered retryable if you want to handle the throttling scenario. | ||
*/ | ||
private static final Set<Integer> TIMEOUT_ERRORS = new HashSet<>( | ||
Arrays.asList( | ||
408, // Request Timeout | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this retryable category? |
||
429 // Too Many Requests (often used as "throttling" for Lambda) | ||
) | ||
); | ||
|
||
public static boolean isRetryable(final int statusCode) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not seeing any references to this method. is this unused (other than tests)? If it is unused, we can remove it. If we have to keep it, then I would recommend changing the name to |
||
return TIMEOUT_ERRORS.contains(statusCode) || (statusCode >= 500 && statusCode < 600); | ||
} | ||
|
||
/* | ||
* Note:isRetryable and isRetryableException should match | ||
*/ | ||
public static boolean isRetryableException(final Throwable t) { | ||
if (t instanceof TooManyRequestsException) { | ||
// Throttling => often can retry with backoff | ||
return true; | ||
} | ||
if (t instanceof ServiceException) { | ||
// Usually indicates a 5xx => can retry | ||
return true; | ||
} | ||
if (t instanceof SdkClientException) { | ||
// Possibly network/connection error => can retry | ||
return true; | ||
} | ||
return false; | ||
} | ||
|
||
/** | ||
* Determines if this is definitely NOT retryable (client error or permanent failure). | ||
*/ | ||
public static boolean isNonRetryable(final InvokeResponse response) { | ||
if(response == null) return false; | ||
|
||
int statusCode = response.statusCode(); | ||
return BAD_REQUEST_ERRORS.contains(statusCode) | ||
|| NOT_ALLOWED_ERRORS.contains(statusCode) | ||
|| INVALID_INPUT_ERRORS.contains(statusCode); | ||
} | ||
|
||
/** | ||
* For convenience, you can create more fine-grained checks or | ||
* direct set membership checks (e.g. isBadRequest(...), isTimeout(...)) if you want. | ||
*/ | ||
public static boolean isTimeoutError(final InvokeResponse response) { | ||
return TIMEOUT_ERRORS.contains(response.statusCode()); | ||
} | ||
|
||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -267,4 +267,4 @@ public boolean isReadyForShutdown() { | |
public void shutdown() { | ||
} | ||
|
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any advantage in moving this to another public method? One disadvantage I see is that the caller needs to know and must follow that he should call
completeCodec
first before he callgetRequestPayload
method - right?