Perform retries on runTask/updateTask
jlorper committed Jan 31, 2025
1 parent 6808293 commit 6ade707
Showing 3 changed files with 126 additions and 90 deletions.
Changed file 1 of 3
@@ -4,7 +4,6 @@

import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;
import com.google.appengine.api.taskqueue.TransactionalTaskException;
@@ -36,10 +35,10 @@
import javax.inject.Provider;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.stream.Stream;

import static com.google.appengine.tools.mapreduce.RetryUtils.SYMBOLIC_FOREVER;
import static com.google.appengine.tools.mapreduce.impl.shardedjob.Status.StatusCode.*;
import static java.util.concurrent.Executors.callable;

@@ -79,28 +78,24 @@ public class ShardedJobRunner implements ShardedJobHandler {
// Each task also checks the job state entity to detect if the job has been
// aborted or deleted, and terminates if so.

// NOTE: no StopStrategy set, must be set by the caller prior to build
public static RetryerBuilder getRetryerBuilder() {
return RetryerBuilder.newBuilder()
.withWaitStrategy(RetryUtils.defaultWaitStrategy())
.retryIfException(RetryUtils.handleDatastoreExceptionRetry())
.retryIfExceptionOfType(ApiProxyException.class)
.retryIfExceptionOfType(ConcurrentModificationException.class) // don't think this is thrown by new datastore lib
.retryIfExceptionOfType(TransientFailureException.class)
.retryIfExceptionOfType(TransactionalTaskException.class)
.withRetryListener(RetryUtils.logRetry(log, ShardedJobRunner.class.getName()));
}
public static final RetryerBuilder RETRYER_UNDEFINED_STOP = RetryerBuilder.newBuilder()
.withWaitStrategy(RetryUtils.defaultWaitStrategy())
.retryIfException(RetryUtils.handleDatastoreExceptionRetry())
.retryIfExceptionOfType(ApiProxyException.class)
.retryIfExceptionOfType(ConcurrentModificationException.class) // don't think this is thrown by new datastore lib
.retryIfExceptionOfType(TransientFailureException.class)
.retryIfExceptionOfType(TransactionalTaskException.class)
.withRetryListener(RetryUtils.logRetry(log, ShardedJobRunner.class.getName()));

public static final RetryerBuilder AGGRESSIVE_RETRYER_UNDEFINED_STOP = RETRYER_UNDEFINED_STOP.retryIfException(e ->
!(e instanceof RequestTooLargeException
|| e instanceof ResponseTooLargeException
|| e instanceof ArgumentException
|| e instanceof DeadlineExceededException));

public static final RetryerBuilder FOREVER_RETRYER = RETRYER_UNDEFINED_STOP.withStopStrategy(StopStrategies.stopAfterAttempt(SYMBOLIC_FOREVER));
public static final RetryerBuilder FOREVER_AGGRESSIVE_RETRYER = AGGRESSIVE_RETRYER_UNDEFINED_STOP.withStopStrategy(StopStrategies.stopAfterAttempt(SYMBOLIC_FOREVER));

// NOTE: no StopStrategy set, must be set by the caller prior to build
public static RetryerBuilder getRetryerBuilderAggressive() {
return getRetryerBuilder()
.retryIfException(e ->
!(e instanceof RequestTooLargeException
|| e instanceof ResponseTooLargeException
|| e instanceof ArgumentException
|| e instanceof DeadlineExceededException))
.withRetryListener(RetryUtils.logRetry(log, ShardedJobRunner.class.getName()));
}
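For orientation: RetryExecutor itself is not touched by this diff. Judging from how it is called throughout the commit (a RetryerBuilder plus a Callable), it presumably finishes the builder and delegates to the guava-retrying Retryer. The sketch below only illustrates that assumption; the class name and the exception wrapping are invented here, not the project's actual implementation.

// Illustrative sketch only -- not the real RetryExecutor. Assumes the caller
// passes a builder that already carries a stop strategy (e.g. FOREVER_RETRYER).
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

public final class RetryExecutorSketch {
  @SuppressWarnings("unchecked")
  public static <T> T call(RetryerBuilder builder, Callable<T> callable) {
    Retryer<T> retryer = builder.build();
    try {
      return retryer.call(callable);
    } catch (ExecutionException | RetryException e) {
      // Checked retry-library exceptions are wrapped; the original cause is kept.
      throw new RuntimeException(e.getCause() != null ? e.getCause() : e);
    }
  }
}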

public <T extends IncrementalTask> List<IncrementalTaskState<T>> lookupTasks(
final ShardedJobRunId jobId, final int taskCount, final boolean lenient) {
@@ -211,7 +206,7 @@ public void completeShard(@NonNull final ShardedJobRunId jobId, @NonNull final I
PipelineService pipelineService = pipelineServiceProvider.get();

//below seems to FAIL bc of transaction connection - why!?!?
ShardedJobStateImpl<?> jobState = RetryExecutor.call(getRetryerBuilder().withStopStrategy(StopStrategies.stopAfterAttempt(RetryUtils.SYMBOLIC_FOREVER)), () -> {
ShardedJobStateImpl<?> jobState = RetryExecutor.call(FOREVER_RETRYER, () -> {
Transaction tx = getDatastore().newTransaction();
try {
ShardedJobStateImpl<?> jobState1 = lookupJobState(tx, jobId);
@@ -365,45 +360,56 @@ private <T extends IncrementalTask> boolean lockShard(Transaction tx,
@Override
public void runTask(final ShardedJobRunId jobId, final IncrementalTaskId taskId, final int sequenceNumber) {
//acquire lock (allows this process to START potentially long-running work of task itself)
Transaction lockAcquisition = getDatastore().newTransaction();

try {
final ShardedJobStateImpl<? extends IncrementalTask> jobState = lookupJobState(lockAcquisition, jobId);
RetryExecutor.<Void>call(FOREVER_RETRYER, () -> {
Transaction lockAcquisition = getDatastore().newTransaction();
try {
final ShardedJobStateImpl<? extends IncrementalTask> jobState = lookupJobState(lockAcquisition, jobId);

if (jobState == null) {
log.info(taskId + ": Job is gone, ignoring runTask call.");
return;
}
if (jobState == null) {
log.info(taskId + ": Job is gone, ignoring runTask call.");
return null;
}

//taskState represents attempt of executing a slice of a shard of a sharded job
IncrementalTaskState taskState =
getAndValidateTaskState(lockAcquisition, taskId, sequenceNumber, jobState);
if (taskState == null) {
// some sort of error code happened
//taskState represents attempt of executing a slice of a shard of a sharded job
IncrementalTaskState taskState =
getAndValidateTaskState(lockAcquisition, taskId, sequenceNumber, jobState);
if (taskState == null) {
// some sort of error code happened

// seems like getAndValidationTaskState has potential side-effects, which need to be committed
lockAcquisition.commit();
return null;
}

// seems like getAndValidationTaskState has potential side-effects, which need to be committed
lockAcquisition.commit();
return;
}
if (lockShard(lockAcquisition, taskState)) {
// committing here, which forces acquisition of lock ...
lockAcquisition.commit();

// actual task execution
runAndUpdateTask(jobState.getShardedJobId(), taskId, sequenceNumber, jobState, taskState);
} else {
log.warning("Failed to acquire the lock, Will reschedule task for: " + taskState.getJobId()
+ " on slice " + taskState.getSequenceNumber());
long eta = System.currentTimeMillis() + new Random().nextInt(5000) + 5000;
scheduleWorkerTask(jobState.getSettings(), taskState, eta);
}
} catch (ConcurrentModificationException ex) {
// don't believe this is possible with new datastore lib
throw new IllegalStateException("Concurrent modification exception should not happen here", ex);
} finally {
rollbackIfActive(lockAcquisition);
}
return null;
});

if (lockShard(lockAcquisition, taskState)) {
// committing here, which forces acquisition of lock ...
lockAcquisition.commit();

// actual task execution
runAndUpdateTask(jobState.getShardedJobId(), taskId, sequenceNumber, jobState, taskState);
} else {
log.warning("Failed to acquire the lock, Will reschedule task for: " + taskState.getJobId()
+ " on slice " + taskState.getSequenceNumber());
long eta = System.currentTimeMillis() + new Random().nextInt(5000) + 5000;
scheduleWorkerTask(jobState.getSettings(), taskState, eta);
}
} catch (ConcurrentModificationException ex) {
// don't believe this is possible with new datastore lib
throw new IllegalStateException("Concurrent modification exception should not happen here", ex);
} finally {
rollbackIfActive(lockAcquisition);
}
private enum RetryType {
SLICE,
SHARD,
JOB,
NONE
}
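The net effect of the runTask change above: the lock-acquisition transaction is now retried as a whole, with every attempt opening a fresh transaction and rolling it back if it did not commit. Condensed to its shape (doBookkeeping is a hypothetical placeholder standing in for lookupJobState, getAndValidateTaskState and lockShard):

// Condensed sketch of the retry-per-attempt transaction pattern used in runTask.
RetryExecutor.<Void>call(FOREVER_RETRYER, () -> {
  Transaction txn = getDatastore().newTransaction();
  try {
    doBookkeeping(txn); // hypothetical stand-in for the real per-attempt work
    txn.commit();
  } finally {
    rollbackIfActive(txn); // nothing to do when the commit above succeeded
  }
  return null;
});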

//actual incremental task execution ( run() method )
@@ -413,8 +419,10 @@ private <T extends IncrementalTask> void runAndUpdateTask(
final int sequenceNumber,
final ShardedJobStateImpl<T> jobState,
IncrementalTaskState<T> taskState) {
ShardRetryState<T> retryState = null;
Transaction postRunUpdate = null; //txn limited to 60s, so can't open this before run() call

RetryType retryType = RetryType.NONE;
Throwable t = null;
//txn limited to 60s, so can't open this before run() call
try {
String statusUrl = jobState.getSettings().getPipelineStatusUrl();
log.info("Running task " + taskId + " (job " + jobId + "), sequence number " + sequenceNumber
@@ -434,35 +442,60 @@ private <T extends IncrementalTask> void runAndUpdateTask(
// we want to obscure that has been retried??
// but this is how FW historically worked, so leaving it
taskState.clearRetryCount();

taskState.setMostRecentUpdateTime(Instant.now());
postRunUpdate = getDatastore().newTransaction();
} catch (ShardFailureException ex ) {
postRunUpdate = getDatastore().newTransaction();
retryState = handleShardFailure(postRunUpdate, jobState, taskState, ex);
retryType = RetryType.SHARD;
t = ex;
} catch (JobFailureException ex) {
postRunUpdate = getDatastore().newTransaction();
log.log(Level.WARNING,
"Shard " + taskState.getTaskId() + " triggered job failure", ex);
handleJobFailure(postRunUpdate, taskState, ex);
retryType = RetryType.JOB;
t = ex;
log.log(Level.WARNING, "Shard " + taskState.getTaskId() + " triggered job failure", ex);
} catch (RuntimeException ex) {
postRunUpdate = getDatastore().newTransaction();
retryState = handleSliceFailure(postRunUpdate, jobState, taskState, ex, false);
t = ex;
retryType = RetryType.SLICE;
} catch (Throwable ex) {
postRunUpdate = getDatastore().newTransaction();
t = ex;
retryType = RetryType.SLICE; // this was originally shard, but seems like a mistake
log.log(Level.WARNING, "Slice encountered an Error.");
retryState = handleShardFailure(postRunUpdate, jobState, taskState, new RuntimeException("Error", ex));
} finally {
try {
updateTask(postRunUpdate, jobState, taskState, retryState, true);
postRunUpdate.commit();
} catch (Throwable ex) {
log.severe("Failed to write end of slice for task: " + taskState.getTask());
// TODO(user): consider what to do here when this fail (though options are limited)
throw ex;
}
rollbackIfActive(postRunUpdate);

RetryType finalRetryType = retryType;
final RuntimeException toThrow;
if (t instanceof RuntimeException) {
toThrow = (RuntimeException) t;
} else {
toThrow = new RuntimeException(t);
}
RetryExecutor.call(FOREVER_RETRYER, () -> {
Transaction postRunUpdate = getDatastore().newTransaction();
try {
ShardRetryState<T> retryState = null;

switch (finalRetryType) {
case SLICE:
retryState = handleSliceFailure(postRunUpdate, jobState, taskState, toThrow, false);
break;
case SHARD:
retryState = handleShardFailure(postRunUpdate, jobState, taskState, toThrow);
break;
case JOB:
handleJobFailure(postRunUpdate, taskState, toThrow);
break;
}
updateTask(postRunUpdate, jobState, taskState, retryState, true);
postRunUpdate.commit();
} catch (Throwable ex) {
log.severe("Failed to write end of slice for task: " + taskState.getTask());
// TODO(user): consider what to do here when this fail (though options are limited)
throw ex;
} finally {
rollbackIfActive(postRunUpdate);
}
return null;
});

}

}
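Put differently, runAndUpdateTask still executes the potentially long run() with no transaction open (transactions are limited to 60 seconds, as the comment above notes), but it now only records what happened, a RetryType plus the thrown exception, and then replays the short bookkeeping transaction under FOREVER_RETRYER. Reduced to its essentials (task.run() and persistOutcome are hypothetical stand-ins for the real task invocation and for handleSliceFailure/handleShardFailure/handleJobFailure plus updateTask):

// Reduced sketch: run the long work outside any transaction, remember the
// outcome, then retry only the short bookkeeping transaction.
Throwable failure = null;
try {
  task.run();                        // potentially long; no transaction open here
} catch (Throwable t) {
  failure = t;
}
final Throwable recorded = failure;  // effectively final copy for the lambda
RetryExecutor.<Void>call(FOREVER_RETRYER, () -> {
  Transaction txn = getDatastore().newTransaction();
  try {
    persistOutcome(txn, recorded);   // hypothetical helper
    txn.commit();
  } finally {
    rollbackIfActive(txn);
  }
  return null;
});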

private <T extends IncrementalTask> ShardRetryState<T> handleSliceFailure(
@@ -532,9 +565,9 @@ private <T extends IncrementalTask> void updateTask(
taskState.getLockInfo().unlock();

@SuppressWarnings("rawtypes")
RetryerBuilder exceptionHandler = aggressiveRetry ? getRetryerBuilderAggressive() : getRetryerBuilder();
RetryerBuilder exceptionHandler = aggressiveRetry ? FOREVER_AGGRESSIVE_RETRYER : FOREVER_RETRYER;
// original code retries forever here?
RetryExecutor.call(exceptionHandler.withStopStrategy(StopStrategies.stopAfterAttempt(RetryUtils.SYMBOLIC_FOREVER)),
RetryExecutor.call(exceptionHandler,
callable(new Runnable() {
@Override
public void run() {
@@ -693,13 +726,16 @@ private void rollbackIfActive(Transaction tx) {
}

public void abortJob(ShardedJobRunId jobId) {
Transaction tx = datastore.newTransaction();
try {
changeJobStatus(tx, jobId, new Status(ABORTED));
tx.commit();
} finally {
rollbackIfActive(tx);
}
RetryExecutor.call(FOREVER_RETRYER, () -> {
Transaction tx = getDatastore().newTransaction();
try {
changeJobStatus(tx, jobId, new Status(ABORTED));
tx.commit();
} finally {
rollbackIfActive(tx);
}
return null;
});
}


@@ -718,7 +754,7 @@ public boolean cleanupJob(ShardedJobRunId jobId) {
}
final Key jobKey = ShardedJobStateImpl.ShardedJobSerializer.makeKey(datastore, jobId);

RetryExecutor.call(getRetryerBuilder().withStopStrategy(StopStrategies.stopAfterAttempt(RetryUtils.SYMBOLIC_FOREVER)), callable(() -> datastore.delete(jobKey)));
RetryExecutor.call(FOREVER_RETRYER, callable(() -> datastore.delete(jobKey)));
return true;
}
}
Changed file 2 of 3
@@ -51,7 +51,7 @@ public Value<Void> run() {
addParentKeyToList(tx, toDelete, ShardRetryState.Serializer.makeKey(datastore, taskId));
}
RetryExecutor.call(
ShardedJobRunner.getRetryerBuilder().withStopStrategy(StopStrategies.stopAfterAttempt(RetryUtils.SYMBOLIC_FOREVER)),
ShardedJobRunner.FOREVER_RETRYER,
callable(() -> tx.delete(toDelete.toArray(new Key[toDelete.size()]))));

tx.commit();
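For reference, the callable(...) wrapper used here is java.util.concurrent.Executors.callable (statically imported in ShardedJobRunner), which adapts a void Runnable into the Callable the retryer expects; a minimal usage sketch mirroring the call above:

// Executors.callable(Runnable) returns a Callable<Object> that runs the
// Runnable and yields null, so void datastore work fits RetryExecutor.call.
RetryExecutor.call(
    ShardedJobRunner.FOREVER_RETRYER,
    callable(() -> tx.delete(toDelete.toArray(new Key[toDelete.size()]))));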
Changed file 3 of 3
@@ -33,7 +33,7 @@ public Value<Void> run() {
Datastore datastore = datastoreOptions.getService();

RetryExecutor.call(
ShardedJobRunner.getRetryerBuilder().withStopStrategy(StopStrategies.stopAfterAttempt(RetryUtils.SYMBOLIC_FOREVER)),
ShardedJobRunner.FOREVER_RETRYER,
callable(() -> {
Transaction tx = datastore.newTransaction();
