Skip to content

Commit

Permalink
Merge pull request #32 from Worklytics/s192-better-ds-exception-retry
Browse files Browse the repository at this point in the history
s192 - Better handle wrapped DatastoreExceptions
  • Loading branch information
jlorper authored Jan 31, 2025
2 parents f2fd2da + 1635d95 commit 6808293
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.WaitStrategies;
import com.github.rholder.retry.WaitStrategy;
import com.google.cloud.datastore.DatastoreException;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -35,5 +40,15 @@ public <V> void onRetry(Attempt<V> attempt) {
};
}


public static Predicate<Throwable> handleDatastoreExceptionRetry() {
return t -> {
Iterator<DatastoreException> datastoreExceptionIterator = Iterables.filter(Throwables.getCausalChain(t), DatastoreException.class).iterator();
if (datastoreExceptionIterator.hasNext()) {
DatastoreException de = datastoreExceptionIterator.next();
return de.isRetryable() ||
(de.getMessage() != null && de.getMessage().toLowerCase().contains("retry the transaction"));
}
return false;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,7 @@ public class ShardedJobRunner implements ShardedJobHandler {
public static RetryerBuilder getRetryerBuilder() {
return RetryerBuilder.newBuilder()
.withWaitStrategy(RetryUtils.defaultWaitStrategy())
.retryIfException( e -> {
if (e instanceof RuntimeException) {
// contains a DatastoreException in the cause
Throwable cause = e.getCause();
if (cause instanceof DatastoreException) {
return ((DatastoreException) cause).isRetryable() || cause.getMessage().contains("Please retry the transaction");
}
}
return false;
})
.retryIfException(e -> {
if (e instanceof DatastoreException) {
return ((DatastoreException) e).isRetryable() || e.getMessage().contains("Please retry the transaction");
}
return false;
})
.retryIfException(RetryUtils.handleDatastoreExceptionRetry())
.retryIfExceptionOfType(ApiProxyException.class)
.retryIfExceptionOfType(ConcurrentModificationException.class) // don't think this is thrown by new datastore lib
.retryIfExceptionOfType(TransientFailureException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.google.appengine.tools.pipeline.impl.backend;

import com.github.rholder.retry.*;
import com.google.appengine.tools.mapreduce.RetryUtils;
import com.google.appengine.tools.pipeline.JobRunId;
import com.google.appengine.tools.pipeline.NoSuchObjectException;
import com.google.appengine.tools.pipeline.impl.QueueSettings;
Expand All @@ -27,7 +28,10 @@
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.datastore.*;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import com.google.datastore.v1.QueryResultBatch;
import lombok.Builder;
Expand Down Expand Up @@ -64,26 +68,39 @@ public class AppEngineBackEnd implements PipelineBackEnd, SerializationStrategy
public static final int RETRY_BACKOFF_MULTIPLIER = 300;
public static final int RETRY_MAX_BACKOFF_MS = 5000;

// TODO: RetryUtils is in mapreduce package, so duplicated to not mix on purpose
// TODO: consider moving to a shared package
// TODO: possibly we should inspect error code in more detail? see https://cloud.google.com/datastore/docs/concepts/errors#Error_Codes
@SuppressWarnings("DuplicatedCode")
public static Predicate<Throwable> handleDatastoreExceptionRetry() {
return t -> {
Iterator<DatastoreException> datastoreExceptionIterator = Iterables.filter(Throwables.getCausalChain(t), DatastoreException.class).iterator();
if (datastoreExceptionIterator.hasNext()) {
DatastoreException de = datastoreExceptionIterator.next();
return de.isRetryable() ||
(de.getMessage() != null && de.getMessage().toLowerCase().contains("retry the transaction"));
}
return false;
};
}

private <E> Retryer<E> withDefaults(RetryerBuilder<E> builder) {
return builder
.withWaitStrategy(
WaitStrategies.exponentialWait(RETRY_BACKOFF_MULTIPLIER, RETRY_MAX_BACKOFF_MS, TimeUnit.MILLISECONDS))
// TODO: possibly we should inspect error code in more detail? see https://cloud.google.com/datastore/docs/concepts/errors#Error_Codes
.retryIfException(e -> e instanceof DatastoreException && (((DatastoreException) e).isRetryable()))
.retryIfException(handleDatastoreExceptionRetry())
.retryIfExceptionOfType(IOException.class) //q: can this happen?
.withStopStrategy((Attempt failedAttempt) ->
failedAttempt.getAttemptNumber() >= MAX_RETRY_ATTEMPTS
//
//|| (failedAttempt.hasException() && (failedAttempt.getExceptionCause() instanceOf SomePersistentIOException)
)
.withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRY_ATTEMPTS))
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
String className = AppEngineBackEnd.class.getName();
if (attempt.hasException()) {
logger.log(Level.WARNING, "%s, Retry attempt: %d, wait: %d".formatted(className, attempt.getAttemptNumber(), attempt.getDelaySinceFirstAttempt()), attempt.getExceptionCause());
} else {
logger.log(Level.WARNING, "%s, Retry attempt: %d, wait: %d. No exception?".formatted(className, attempt.getAttemptNumber(), attempt.getDelaySinceFirstAttempt()));
if (attempt.getAttemptNumber() > 1 || attempt.hasException()) {
String className = AppEngineBackEnd.class.getName();
if (attempt.hasException()) {
log.log(Level.WARNING, "%s, Retry attempt: %d, wait: %d".formatted(className, attempt.getAttemptNumber(), attempt.getDelaySinceFirstAttempt()), attempt.getExceptionCause());
} else {
log.log(Level.WARNING, "%s, Retry attempt: %d, wait: %d. No exception?".formatted(className, attempt.getAttemptNumber(), attempt.getDelaySinceFirstAttempt()));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.google.appengine.tools.mapreduce;

import com.google.cloud.BaseServiceException;
import com.google.cloud.datastore.DatastoreException;
import com.google.common.collect.ImmutableSet;
import org.junit.jupiter.api.Test;

import java.util.function.IntSupplier;

import static org.junit.jupiter.api.Assertions.*;

class RetryUtilsTest {

@Test
void handleDatastoreExceptionRetry() {

// copied from DatastoreException.RETRYABLE_ERROR_CODES (private)
ImmutableSet<BaseServiceException.Error> retryableErrors = ImmutableSet.of(
new BaseServiceException.Error(10, "ABORTED", false),
new BaseServiceException.Error(4, "DEADLINE_EXCEEDED", false),
new BaseServiceException.Error(14, "UNAVAILABLE", true));
ImmutableSet<BaseServiceException.Error> nonRetryableErrors = ImmutableSet.of(
new BaseServiceException.Error(52222, "WHATEVER", false),
new BaseServiceException.Error(989898, "WHO_KNOWS", false));

for (BaseServiceException.Error error : retryableErrors) {
DatastoreException de = new DatastoreException(error.getCode(), "something broke", error.getReason(), true, null);
assertTrue(RetryUtils.handleDatastoreExceptionRetry().test(de));
}
for (BaseServiceException.Error error : nonRetryableErrors) {
DatastoreException de = new DatastoreException(error.getCode(), "something broke", error.getReason(), false, null);
assertFalse(RetryUtils.handleDatastoreExceptionRetry().test(de));
}

// Test with message containing "retry the transaction"
for (BaseServiceException.Error error : nonRetryableErrors) {
DatastoreException de = new DatastoreException(error.getCode(), "RETRY THE TRANSACTION", error.getReason(), false, null);
assertTrue(RetryUtils.handleDatastoreExceptionRetry().test(de));
}

// embedded DatastoreException
retryableErrors.forEach(error -> {
DatastoreException root = new DatastoreException(error.getCode(), "something broken", error.getReason(), true, null);
assertTrue(RetryUtils.handleDatastoreExceptionRetry().test(root));
Exception chain1 = new Exception("Test Exception 1", root);
Exception chain2 = new Exception("Test Exception 2", chain1);
assertTrue(RetryUtils.handleDatastoreExceptionRetry().test(chain2));
});

}
}

0 comments on commit 6808293

Please sign in to comment.