Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove legacy retry stages and supporting classes #5784

Merged
merged 3 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncBeforeTransmissionExecutionInterceptorsStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage2;
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncSigningStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.CompressRequestStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.HttpChecksumStage;
Expand Down Expand Up @@ -205,8 +205,8 @@ public <OutputT> CompletableFuture<OutputT> execute(
.then(AsyncBeforeTransmissionExecutionInterceptorsStage::new)
.then(d -> new MakeAsyncHttpRequestStage<>(responseHandler, d))
.wrappedWith(AsyncApiCallAttemptMetricCollectionStage::new)
.wrappedWith((deps, wrapped) -> new AsyncRetryableStage2<>(responseHandler, deps,
wrapped))
.wrappedWith((deps, wrapped) -> new AsyncRetryableStage<>(responseHandler, deps,
wrapped))
.then(async(() -> new UnwrapResponseContainer<>()))
.then(async(() -> new AfterExecutionInterceptorsStage<>()))
.wrappedWith(AsyncExecutionFailureExceptionReportingStage::new)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import software.amazon.awssdk.core.internal.http.pipeline.stages.MergeCustomHeadersStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.MergeCustomQueryParamsStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.QueryParametersToBodyStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2;
import software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.SigningStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.UnwrapResponseContainer;
Expand Down Expand Up @@ -199,7 +199,7 @@ public <OutputT> OutputT execute(HttpResponseHandler<Response<OutputT>> response
.wrappedWith(ApiCallAttemptTimeoutTrackingStage::new)
.wrappedWith(TimeoutExceptionHandlingStage::new)
.wrappedWith((deps, wrapped) -> new ApiCallAttemptMetricCollectionStage<>(wrapped))
.wrappedWith(RetryableStage2::new)::build)
.wrappedWith(RetryableStage::new)::build)
.wrappedWith(StreamManagingStage::new)
.wrappedWith(ApiCallTimeoutTrackingStage::new)::build)
.wrappedWith((deps, wrapped) -> new ApiCallMetricCollectionStage<>(wrapped))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,19 @@

import java.io.IOException;
import java.time.Duration;
import java.util.OptionalDouble;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.core.Response;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.client.config.SdkClientOption;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
import software.amazon.awssdk.core.internal.http.TransformingAsyncResponseHandler;
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;
import software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper;
import software.amazon.awssdk.core.internal.retry.RateLimitingTokenBucket;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.utils.CompletableFutureUtils;

Expand All @@ -49,97 +46,54 @@ public final class AsyncRetryableStage<OutputT> implements RequestPipeline<SdkHt
private final RequestPipeline<SdkHttpFullRequest, CompletableFuture<Response<OutputT>>> requestPipeline;
private final ScheduledExecutorService scheduledExecutor;
private final HttpClientDependencies dependencies;
private final RateLimitingTokenBucket rateLimitingTokenBucket;

public AsyncRetryableStage(TransformingAsyncResponseHandler<Response<OutputT>> responseHandler,
HttpClientDependencies dependencies,
RequestPipeline<SdkHttpFullRequest, CompletableFuture<Response<OutputT>>> requestPipeline) {
this.responseHandler = responseHandler;
this.dependencies = dependencies;
this.scheduledExecutor = dependencies.clientConfiguration().option(SdkClientOption.SCHEDULED_EXECUTOR_SERVICE);
this.rateLimitingTokenBucket = new RateLimitingTokenBucket();
this.requestPipeline = requestPipeline;
}

@SdkTestInternalApi
public AsyncRetryableStage(TransformingAsyncResponseHandler<Response<OutputT>> responseHandler,
HttpClientDependencies dependencies,
RequestPipeline<SdkHttpFullRequest, CompletableFuture<Response<OutputT>>> requestPipeline,
RateLimitingTokenBucket rateLimitingTokenBucket) {
this.responseHandler = responseHandler;
this.dependencies = dependencies;
this.scheduledExecutor = dependencies.clientConfiguration().option(SdkClientOption.SCHEDULED_EXECUTOR_SERVICE);
this.requestPipeline = requestPipeline;
this.rateLimitingTokenBucket = rateLimitingTokenBucket;
}

@Override
public CompletableFuture<Response<OutputT>> execute(SdkHttpFullRequest request,
RequestExecutionContext context) throws Exception {
return new RetryingExecutor(request, context).execute();
}

private class RetryingExecutor {
private final class RetryingExecutor {
private final AsyncRequestBody originalRequestBody;
private final RequestExecutionContext context;
private final RetryableStageHelper retryableStageHelper;

private RetryingExecutor(SdkHttpFullRequest request, RequestExecutionContext context) {
this.originalRequestBody = context.requestProvider();
this.context = context;
this.retryableStageHelper = new RetryableStageHelper(request, context, rateLimitingTokenBucket, dependencies);
this.retryableStageHelper = new RetryableStageHelper(request, context, dependencies);
}

public CompletableFuture<Response<OutputT>> execute() throws Exception {
public CompletableFuture<Response<OutputT>> execute() {
CompletableFuture<Response<OutputT>> future = new CompletableFuture<>();
maybeAttemptExecute(future);
attemptFirstExecute(future);
return future;
}

public void maybeAttemptExecute(CompletableFuture<Response<OutputT>> future) {
retryableStageHelper.startingAttempt();

if (!retryableStageHelper.retryPolicyAllowsRetry()) {
future.completeExceptionally(retryableStageHelper.retryPolicyDisallowedRetryException());
return;
}

if (retryableStageHelper.getAttemptNumber() > 1) {
// We failed the last attempt, but will retry. The response handler wants to know when that happens.
responseHandler.onError(retryableStageHelper.getLastException());

// Reset the request provider to the original one before retries, in case it was modified downstream.
context.requestProvider(originalRequestBody);
}

Duration backoffDelay = retryableStageHelper.getBackoffDelay();

OptionalDouble tokenAcquireTimeSeconds = retryableStageHelper.getSendTokenNonBlocking();
if (!tokenAcquireTimeSeconds.isPresent()) {
String errorMessage = "Unable to acquire a send token immediately without waiting. This indicates that ADAPTIVE "
+ "retry mode is enabled, fast fail rate limiting is enabled, and that rate limiting is "
+ "engaged because of prior throttled requests. The request will not be executed.";
future.completeExceptionally(SdkClientException.create(errorMessage));
return;
}
long tokenAcquireTimeMillis = (long) (tokenAcquireTimeSeconds.getAsDouble() * 1000);

if (!backoffDelay.isZero()) {
public void attemptFirstExecute(CompletableFuture<Response<OutputT>> future) {
Duration backoffDelay = retryableStageHelper.acquireInitialToken();
if (backoffDelay.isZero()) {
attemptExecute(future);
} else {
retryableStageHelper.logBackingOff(backoffDelay);
}

long totalDelayMillis = backoffDelay.toMillis() + tokenAcquireTimeMillis;

if (totalDelayMillis > 0) {
long totalDelayMillis = backoffDelay.toMillis();
scheduledExecutor.schedule(() -> attemptExecute(future), totalDelayMillis, MILLISECONDS);
} else {
attemptExecute(future);
}
}

private void attemptExecute(CompletableFuture<Response<OutputT>> future) {
CompletableFuture<Response<OutputT>> responseFuture;
try {
retryableStageHelper.startingAttempt();
retryableStageHelper.logSendingRequest();
responseFuture = requestPipeline.execute(retryableStageHelper.requestToSend(), context);

Expand All @@ -164,23 +118,37 @@ private void attemptExecute(CompletableFuture<Response<OutputT>> future) {
}

retryableStageHelper.setLastResponse(response.httpResponse());

if (!response.isSuccess()) {
retryableStageHelper.adjustClockIfClockSkew(response);
maybeRetryExecute(future, response.exception());
return;
}

retryableStageHelper.updateClientSendingRateForSuccessResponse();

retryableStageHelper.attemptSucceeded();
retryableStageHelper.recordAttemptSucceeded();
future.complete(response);
});
}

public void maybeAttemptExecute(CompletableFuture<Response<OutputT>> future) {
Optional<Duration> delay = retryableStageHelper.tryRefreshToken(Duration.ZERO);
if (!delay.isPresent()) {
future.completeExceptionally(retryableStageHelper.retryPolicyDisallowedRetryException());
return;
}
// We failed the last attempt, but will retry. The response handler wants to know when that happens.
responseHandler.onError(retryableStageHelper.getLastException());

// Reset the request provider to the original one before retries, in case it was modified downstream.
context.requestProvider(originalRequestBody);

Duration backoffDelay = delay.get();
retryableStageHelper.logBackingOff(backoffDelay);
long totalDelayMillis = backoffDelay.toMillis();
scheduledExecutor.schedule(() -> attemptExecute(future), totalDelayMillis, MILLISECONDS);
}

private void maybeRetryExecute(CompletableFuture<Response<OutputT>> future, Exception exception) {
retryableStageHelper.setLastException(exception);
retryableStageHelper.updateClientSendingRateForErrorResponse();
maybeAttemptExecute(future);
}
}
Expand Down

This file was deleted.

Loading
Loading