From 6ade707272ead52c32bad768c7efccc91e3d61bb Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Date: Fri, 31 Jan 2025 14:03:04 -0800 Subject: [PATCH] Perform retries on runTask/updateTask --- .../impl/shardedjob/ShardedJobRunner.java | 212 ++++++++++-------- .../pipeline/DeleteShardsInfos.java | 2 +- .../pipeline/FinalizeShardsInfos.java | 2 +- 3 files changed, 126 insertions(+), 90 deletions(-) diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/ShardedJobRunner.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/ShardedJobRunner.java index deb09658..a8398b25 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/ShardedJobRunner.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/ShardedJobRunner.java @@ -4,7 +4,6 @@ import com.github.rholder.retry.RetryerBuilder; import com.github.rholder.retry.StopStrategies; -import com.github.rholder.retry.WaitStrategies; import com.google.appengine.api.taskqueue.QueueFactory; import com.google.appengine.api.taskqueue.TaskOptions; import com.google.appengine.api.taskqueue.TransactionalTaskException; @@ -36,10 +35,10 @@ import javax.inject.Provider; import java.time.Instant; import java.util.*; -import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.stream.Stream; +import static com.google.appengine.tools.mapreduce.RetryUtils.SYMBOLIC_FOREVER; import static com.google.appengine.tools.mapreduce.impl.shardedjob.Status.StatusCode.*; import static java.util.concurrent.Executors.callable; @@ -79,28 +78,24 @@ public class ShardedJobRunner implements ShardedJobHandler { // Each task also checks the job state entity to detect if the job has been // aborted or deleted, and terminates if so. - // NOTE: no StopStrategy set, must be set by the caller prior to build - public static RetryerBuilder getRetryerBuilder() { - return RetryerBuilder.newBuilder() - .withWaitStrategy(RetryUtils.defaultWaitStrategy()) - .retryIfException(RetryUtils.handleDatastoreExceptionRetry()) - .retryIfExceptionOfType(ApiProxyException.class) - .retryIfExceptionOfType(ConcurrentModificationException.class) // don't think this is thrown by new datastore lib - .retryIfExceptionOfType(TransientFailureException.class) - .retryIfExceptionOfType(TransactionalTaskException.class) - .withRetryListener(RetryUtils.logRetry(log, ShardedJobRunner.class.getName())); - } + public static final RetryerBuilder RETRYER_UNDEFINED_STOP = RetryerBuilder.newBuilder() + .withWaitStrategy(RetryUtils.defaultWaitStrategy()) + .retryIfException(RetryUtils.handleDatastoreExceptionRetry()) + .retryIfExceptionOfType(ApiProxyException.class) + .retryIfExceptionOfType(ConcurrentModificationException.class) // don't think this is thrown by new datastore lib + .retryIfExceptionOfType(TransientFailureException.class) + .retryIfExceptionOfType(TransactionalTaskException.class) + .withRetryListener(RetryUtils.logRetry(log, ShardedJobRunner.class.getName()));; + + public static final RetryerBuilder AGGRESSIVE_RETRYER_UNDEFINED_STOP = RETRYER_UNDEFINED_STOP.retryIfException(e -> + !(e instanceof RequestTooLargeException + || e instanceof ResponseTooLargeException + || e instanceof ArgumentException + || e instanceof DeadlineExceededException)); + + public static final RetryerBuilder FOREVER_RETRYER = RETRYER_UNDEFINED_STOP.withStopStrategy(StopStrategies.stopAfterAttempt(SYMBOLIC_FOREVER)); + public static final RetryerBuilder FOREVER_AGGRESSIVE_RETRYER = AGGRESSIVE_RETRYER_UNDEFINED_STOP.withStopStrategy(StopStrategies.stopAfterAttempt(SYMBOLIC_FOREVER)); - // NOTE: no StopStrategy set, must be set by the caller prior to build - public static RetryerBuilder getRetryerBuilderAggressive() { - return getRetryerBuilder() - .retryIfException(e -> - !(e instanceof RequestTooLargeException - || e instanceof ResponseTooLargeException - || e instanceof ArgumentException - || e instanceof DeadlineExceededException)) - .withRetryListener(RetryUtils.logRetry(log, ShardedJobRunner.class.getName())); - } public List> lookupTasks( final ShardedJobRunId jobId, final int taskCount, final boolean lenient) { @@ -211,7 +206,7 @@ public void completeShard(@NonNull final ShardedJobRunId jobId, @NonNull final I PipelineService pipelineService = pipelineServiceProvider.get(); //below seems to FAIL bc of transaction connection - why!?!? - ShardedJobStateImpl jobState = RetryExecutor.call(getRetryerBuilder().withStopStrategy(StopStrategies.stopAfterAttempt(RetryUtils.SYMBOLIC_FOREVER)), () -> { + ShardedJobStateImpl jobState = RetryExecutor.call(FOREVER_RETRYER, () -> { Transaction tx = getDatastore().newTransaction(); try { ShardedJobStateImpl jobState1 = lookupJobState(tx, jobId); @@ -365,45 +360,56 @@ private boolean lockShard(Transaction tx, @Override public void runTask(final ShardedJobRunId jobId, final IncrementalTaskId taskId, final int sequenceNumber) { //acquire lock (allows this process to START potentially long-running work of task itself) - Transaction lockAcquisition = getDatastore().newTransaction(); - try { - final ShardedJobStateImpl jobState = lookupJobState(lockAcquisition, jobId); + RetryExecutor.call(FOREVER_RETRYER, () -> { + Transaction lockAcquisition = getDatastore().newTransaction(); + try { + final ShardedJobStateImpl jobState = lookupJobState(lockAcquisition, jobId); - if (jobState == null) { - log.info(taskId + ": Job is gone, ignoring runTask call."); - return; - } + if (jobState == null) { + log.info(taskId + ": Job is gone, ignoring runTask call."); + return null; + } - //taskState represents attempt of executing a slice of a shard of a sharded job - IncrementalTaskState taskState = - getAndValidateTaskState(lockAcquisition, taskId, sequenceNumber, jobState); - if (taskState == null) { - // some sort of error code happened + //taskState represents attempt of executing a slice of a shard of a sharded job + IncrementalTaskState taskState = + getAndValidateTaskState(lockAcquisition, taskId, sequenceNumber, jobState); + if (taskState == null) { + // some sort of error code happened + + // seems like getAndValidationTaskState has potential side-effects, which need to be committed + lockAcquisition.commit(); + return null; + } - // seems like getAndValidationTaskState has potential side-effects, which need to be committed - lockAcquisition.commit(); - return; + if (lockShard(lockAcquisition, taskState)) { + // committing here, which forces acquisition of lock ... + lockAcquisition.commit(); + + // actual task execution + runAndUpdateTask(jobState.getShardedJobId(), taskId, sequenceNumber, jobState, taskState); + } else { + log.warning("Failed to acquire the lock, Will reschedule task for: " + taskState.getJobId() + + " on slice " + taskState.getSequenceNumber()); + long eta = System.currentTimeMillis() + new Random().nextInt(5000) + 5000; + scheduleWorkerTask(jobState.getSettings(), taskState, eta); + } + } catch (ConcurrentModificationException ex) { + // don't believe this is possible with new datastore lib + throw new IllegalStateException("Concurrent modification exception should not happen here", ex); + } finally { + rollbackIfActive(lockAcquisition); } + return null; + }); - if (lockShard(lockAcquisition, taskState)) { - // committing here, which forces acquisition of lock ... - lockAcquisition.commit(); + } - // actual task execution - runAndUpdateTask(jobState.getShardedJobId(), taskId, sequenceNumber, jobState, taskState); - } else { - log.warning("Failed to acquire the lock, Will reschedule task for: " + taskState.getJobId() - + " on slice " + taskState.getSequenceNumber()); - long eta = System.currentTimeMillis() + new Random().nextInt(5000) + 5000; - scheduleWorkerTask(jobState.getSettings(), taskState, eta); - } - } catch (ConcurrentModificationException ex) { - // don't believe this is possible with new datastore lib - throw new IllegalStateException("Concurrent modification exception should not happen here", ex); - } finally { - rollbackIfActive(lockAcquisition); - } + private enum RetryType { + SLICE, + SHARD, + JOB, + NONE } //actual incremental task execution ( run() method ) @@ -413,8 +419,10 @@ private void runAndUpdateTask( final int sequenceNumber, final ShardedJobStateImpl jobState, IncrementalTaskState taskState) { - ShardRetryState retryState = null; - Transaction postRunUpdate = null; //txn limited to 60s, so can't open this before run() call + + RetryType retryType = RetryType.NONE; + Throwable t = null; + //txn limited to 60s, so can't open this before run() call try { String statusUrl = jobState.getSettings().getPipelineStatusUrl(); log.info("Running task " + taskId + " (job " + jobId + "), sequence number " + sequenceNumber @@ -434,35 +442,60 @@ private void runAndUpdateTask( // we want to obscure that has been retried?? // but this is how FW historically worked, so leaving it taskState.clearRetryCount(); - taskState.setMostRecentUpdateTime(Instant.now()); - postRunUpdate = getDatastore().newTransaction(); } catch (ShardFailureException ex ) { - postRunUpdate = getDatastore().newTransaction(); - retryState = handleShardFailure(postRunUpdate, jobState, taskState, ex); + retryType = RetryType.SHARD; + t = ex; } catch (JobFailureException ex) { - postRunUpdate = getDatastore().newTransaction(); - log.log(Level.WARNING, - "Shard " + taskState.getTaskId() + " triggered job failure", ex); - handleJobFailure(postRunUpdate, taskState, ex); + retryType = RetryType.JOB; + t = ex; + log.log(Level.WARNING, "Shard " + taskState.getTaskId() + " triggered job failure", ex); } catch (RuntimeException ex) { - postRunUpdate = getDatastore().newTransaction(); - retryState = handleSliceFailure(postRunUpdate, jobState, taskState, ex, false); + t = ex; + retryType = RetryType.SLICE; } catch (Throwable ex) { - postRunUpdate = getDatastore().newTransaction(); + t = ex; + retryType = RetryType.SLICE; // this was originally shard, but seems like a mistake log.log(Level.WARNING, "Slice encountered an Error."); - retryState = handleShardFailure(postRunUpdate, jobState, taskState, new RuntimeException("Error", ex)); } finally { - try { - updateTask(postRunUpdate, jobState, taskState, retryState, true); - postRunUpdate.commit(); - } catch (Throwable ex) { - log.severe("Failed to write end of slice for task: " + taskState.getTask()); - // TODO(user): consider what to do here when this fail (though options are limited) - throw ex; + + RetryType finalRetryType = retryType; + final RuntimeException toThrow; + if (t instanceof RuntimeException) { + toThrow = (RuntimeException) t; + } else { + toThrow = new RuntimeException(t); } - rollbackIfActive(postRunUpdate); + RetryExecutor.call(FOREVER_RETRYER, () -> { + Transaction postRunUpdate = getDatastore().newTransaction(); + try { + ShardRetryState retryState = null; + + switch (finalRetryType) { + case SLICE: + retryState = handleSliceFailure(postRunUpdate, jobState, taskState, toThrow, false); + break; + case SHARD: + retryState = handleShardFailure(postRunUpdate, jobState, taskState, toThrow); + break; + case JOB: + handleJobFailure(postRunUpdate, taskState, toThrow); + break; + } + updateTask(postRunUpdate, jobState, taskState, retryState, true); + postRunUpdate.commit(); + } catch (Throwable ex) { + log.severe("Failed to write end of slice for task: " + taskState.getTask()); + // TODO(user): consider what to do here when this fail (though options are limited) + throw ex; + } finally { + rollbackIfActive(postRunUpdate); + } + return null; + }); + } + } private ShardRetryState handleSliceFailure( @@ -532,9 +565,9 @@ private void updateTask( taskState.getLockInfo().unlock(); @SuppressWarnings("rawtypes") - RetryerBuilder exceptionHandler = aggressiveRetry ? getRetryerBuilderAggressive() : getRetryerBuilder(); + RetryerBuilder exceptionHandler = aggressiveRetry ? FOREVER_AGGRESSIVE_RETRYER : FOREVER_RETRYER; // original code retries forever here? - RetryExecutor.call(exceptionHandler.withStopStrategy(StopStrategies.stopAfterAttempt(RetryUtils.SYMBOLIC_FOREVER)), + RetryExecutor.call(exceptionHandler, callable(new Runnable() { @Override public void run() { @@ -693,13 +726,16 @@ private void rollbackIfActive(Transaction tx) { } public void abortJob(ShardedJobRunId jobId) { - Transaction tx = datastore.newTransaction(); - try { - changeJobStatus(tx, jobId, new Status(ABORTED)); - tx.commit(); - } finally { - rollbackIfActive(tx); - } + RetryExecutor.call(FOREVER_RETRYER, () -> { + Transaction tx = getDatastore().newTransaction(); + try { + changeJobStatus(tx, jobId, new Status(ABORTED)); + tx.commit(); + } finally { + rollbackIfActive(tx); + } + return null; + }); } @@ -718,7 +754,7 @@ public boolean cleanupJob(ShardedJobRunId jobId) { } final Key jobKey = ShardedJobStateImpl.ShardedJobSerializer.makeKey(datastore, jobId); - RetryExecutor.call(getRetryerBuilder().withStopStrategy(StopStrategies.stopAfterAttempt(RetryUtils.SYMBOLIC_FOREVER)), callable(() -> datastore.delete(jobKey))); + RetryExecutor.call(FOREVER_RETRYER, callable(() -> datastore.delete(jobKey))); return true; } } diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardsInfos.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardsInfos.java index 92380cc5..c1780b05 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardsInfos.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/DeleteShardsInfos.java @@ -51,7 +51,7 @@ public Value run() { addParentKeyToList(tx, toDelete, ShardRetryState.Serializer.makeKey(datastore, taskId)); } RetryExecutor.call( - ShardedJobRunner.getRetryerBuilder().withStopStrategy(StopStrategies.stopAfterAttempt(RetryUtils.SYMBOLIC_FOREVER)), + ShardedJobRunner.FOREVER_RETRYER, callable(() -> tx.delete(toDelete.toArray(new Key[toDelete.size()])))); tx.commit(); diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardsInfos.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardsInfos.java index ee1ba3f5..ccd4cd07 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardsInfos.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/pipeline/FinalizeShardsInfos.java @@ -33,7 +33,7 @@ public Value run() { Datastore datastore = datastoreOptions.getService(); RetryExecutor.call( - ShardedJobRunner.getRetryerBuilder().withStopStrategy(StopStrategies.stopAfterAttempt(RetryUtils.SYMBOLIC_FOREVER)), + ShardedJobRunner.FOREVER_RETRYER, callable(() -> { Transaction tx = datastore.newTransaction();