
Merge pull request #34 from Worklytics/s192-handle-out-of-sync-tasks
S192: handle out-of-sync task states
jlorper authored Feb 1, 2025
2 parents 6e49154 + c6cca6f commit ae9a1a1
Showing 4 changed files with 20 additions and 12 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/test-java.yml
@@ -6,6 +6,9 @@ on: [push]
 jobs:
   test:
     runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        package: [ "com.google.appengine.tools.cloudtasktest", "com.google.appengine.tools.mapreduce", "com.google.appengine.tools.pipeline" ]
     steps:
       - uses: actions/checkout@v4
       - uses: actions/setup-java@v4
@@ -16,9 +19,9 @@ jobs:
         run: |
           cd java/
           mvn clean compile
-      - name: Build with Maven
+      - name: Run Tests for Package ${{ matrix.package }}
         env:
           APPENGINE_MAPREDUCE_CI_SERVICE_ACCOUNT_KEY: ${{ secrets.APPENGINE_MAPREDUCE_CI_SERVICE_ACCOUNT_KEY }}
         run: |
           cd java/
-          mvn test
+          mvn test -Dtest=${{ matrix.package }}.**
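For reference, each matrix leg expands the Surefire filter to one package subtree, e.g.

    mvn test -Dtest=com.google.appengine.tools.mapreduce.**

so the three test packages run as three parallel CI jobs instead of a single serial mvn test.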
9 changes: 6 additions & 3 deletions RetryUtils.java
@@ -9,6 +9,7 @@
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 
+import java.time.Duration;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
@@ -21,7 +22,9 @@ public class RetryUtils {

   public static WaitStrategy defaultWaitStrategy() {
     return WaitStrategies.join(
-      WaitStrategies.exponentialWait(1_000,30_000, TimeUnit.MILLISECONDS)
+      WaitStrategies.randomWait(200, TimeUnit.MILLISECONDS),
+      // before we had 30s max, seems too high
+      WaitStrategies.exponentialWait(1_000, 10_000, TimeUnit.MILLISECONDS)
     );
   }
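For context, under guava-retrying semantics (which the WaitStrategies names here match), join() sums its component delays and exponentialWait() yields multiplier × 2^attempt capped at the maximum. A self-contained sketch of the resulting delays, illustrative only and not the library code:

    import java.util.concurrent.ThreadLocalRandom;

    // Approximates defaultWaitStrategy(): exponential backoff (1s multiplier,
    // 10s cap) plus up to 200ms of random jitter per attempt.
    class WaitStrategySketch {
        static long approxSleepMillis(int attemptNumber) {
            long exponential = Math.min(Math.round(1_000 * Math.pow(2, attemptNumber)), 10_000);
            long jitter = ThreadLocalRandom.current().nextLong(0, 201); // randomWait(200, MILLISECONDS)
            return exponential + jitter;
        }

        public static void main(String[] args) {
            // attempt 1 -> ~2s, 2 -> ~4s, 3 -> ~8s, 4+ -> capped near 10s
            for (int attempt = 1; attempt <= 5; attempt++) {
                System.out.printf("attempt %d: ~%d ms%n", attempt, approxSleepMillis(attempt));
            }
        }
    }

The added jitter helps de-synchronize concurrent retries; the lower 10s cap bounds the worst-case wait per attempt.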

@@ -31,9 +34,9 @@ public static RetryListener logRetry(final Logger log, String className) {
       public <V> void onRetry(Attempt<V> attempt) {
         if (attempt.getAttemptNumber() > 1 || attempt.hasException()) {
           if (attempt.hasException()) {
-            log.log(Level.WARNING, "%s, Retry attempt: %d, wait: %d".formatted(className, attempt.getAttemptNumber(), attempt.getDelaySinceFirstAttempt()), attempt.getExceptionCause());
+            log.log(Level.WARNING, "%s, Attempt #%d. Retrying...".formatted(className, attempt.getAttemptNumber()), attempt.getExceptionCause());
           } else {
-            log.log(Level.WARNING, "%s, Retry attempt: %d, wait: %d. No exception?".formatted(className, attempt.getAttemptNumber(), attempt.getDelaySinceFirstAttempt()));
+            log.log(Level.WARNING, "%s, Attempt #%d OK, wait: %s".formatted(className, attempt.getAttemptNumber(), Duration.ofMillis(attempt.getDelaySinceFirstAttempt())));
           }
         }
       }
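The reworked messages also render the cumulative wait as a java.time.Duration. A plain-JDK illustration (sample values invented):

    import java.time.Duration;

    class LogFormatExample {
        public static void main(String[] args) {
            // %s on a Duration prints the ISO-8601 form, e.g. PT12.345S
            System.out.println("%s, Attempt #%d OK, wait: %s"
                .formatted("RetryUtils", 3, Duration.ofMillis(12_345)));
            // prints: RetryUtils, Attempt #3 OK, wait: PT12.345S
        }
    }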
11 changes: 6 additions & 5 deletions ShardedJobRunner.java
@@ -83,7 +83,9 @@ private static RetryerBuilder baseRetryerBuilder() {
       .withWaitStrategy(RetryUtils.defaultWaitStrategy())
       .retryIfException(RetryUtils.handleDatastoreExceptionRetry())
       .retryIfExceptionOfType(ApiProxyException.class)
-      .retryIfExceptionOfType(ConcurrentModificationException.class) // don't think this is thrown by new datastore lib
+      // don't think this is thrown by new datastore lib
+      // thrown by us if the task state is from the past
+      .retryIfExceptionOfType(ConcurrentModificationException.class)
       .retryIfExceptionOfType(TransientFailureException.class)
       .retryIfExceptionOfType(TransactionalTaskException.class)
       .withRetryListener(RetryUtils.logRetry(log, ShardedJobRunner.class.getName()));
@@ -282,8 +284,10 @@ private <T extends IncrementalTask> IncrementalTaskState<T> getAndValidateTaskSt
       log.info(taskId + ": Task sequence number " + sequenceNumber + " already completed: "
           + taskState);
     } else {
-      //q : throw here??
       log.severe(taskId + " sequenceNumber=" + sequenceNumber + " : Task state is from the past: " + taskState);
+      // presumably we are reading an old state, maybe being updated concurrently?
+      // we should not proceed with this state; throw a ConcurrentModificationException to force a retry
+      throw new ConcurrentModificationException("Task state is from the past: " + taskState);
     }
     return null;
   }
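To make the new failure mode concrete, here is a self-contained toy model (all names invented, not the project's API): a read that observes a sequence number behind the expected one throws, and a bare retry loop, standing in for the retryer built by baseRetryerBuilder(), re-reads until the concurrent write becomes visible.

    import java.util.ConcurrentModificationException;
    import java.util.concurrent.atomic.AtomicInteger;

    class StaleStateRetrySketch {
        // The "datastore" lags one write behind, then catches up,
        // mimicking an eventually consistent read.
        static final AtomicInteger visibleSequence = new AtomicInteger(2);
        static final int expectedSequence = 3;

        static int readTaskState() {
            int seen = visibleSequence.getAndIncrement();
            if (seen < expectedSequence) {
                throw new ConcurrentModificationException("Task state is from the past: seq=" + seen);
            }
            return seen;
        }

        public static void main(String[] args) throws InterruptedException {
            for (int attempt = 1; ; attempt++) {
                try {
                    System.out.println("attempt " + attempt + ": read seq " + readTaskState());
                    break;
                } catch (ConcurrentModificationException e) {
                    System.out.println("attempt " + attempt + ": " + e.getMessage() + "; retrying");
                    Thread.sleep(100); // the real code waits per RetryUtils.defaultWaitStrategy()
                }
            }
        }
    }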
@@ -396,9 +400,6 @@ public void runTask(final ShardedJobRunId jobId, final IncrementalTaskId taskId,
         long eta = System.currentTimeMillis() + new Random().nextInt(5000) + 5000;
         scheduleWorkerTask(jobState.getSettings(), taskState, eta);
       }
-    } catch (ConcurrentModificationException ex) {
-      // don't believe this is possible with new datastore lib
-      throw new IllegalStateException("Concurrent modification exception should not happen here", ex);
     } finally {
       rollbackIfActive(lockAcquisition);
     }
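Removing this catch complements the change above: ConcurrentModificationException is now thrown deliberately for stale task state, and wrapping it in IllegalStateException here would have prevented the retry behavior that baseRetryerBuilder() configures via retryIfExceptionOfType(ConcurrentModificationException.class).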
5 changes: 3 additions & 2 deletions AppEngineBackEnd.java
@@ -41,6 +41,7 @@
 import lombok.extern.java.Log;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -97,9 +98,9 @@ public <V> void onRetry(Attempt<V> attempt) {
       if (attempt.getAttemptNumber() > 1 || attempt.hasException()) {
         String className = AppEngineBackEnd.class.getName();
         if (attempt.hasException()) {
-          log.log(Level.WARNING, "%s, Retry attempt: %d, wait: %d".formatted(className, attempt.getAttemptNumber(), attempt.getDelaySinceFirstAttempt()), attempt.getExceptionCause());
+          log.log(Level.WARNING, "%s, Attempt #%d. Retrying...".formatted(className, attempt.getAttemptNumber()), attempt.getExceptionCause());
         } else {
-          log.log(Level.WARNING, "%s, Retry attempt: %d, wait: %d. No exception?".formatted(className, attempt.getAttemptNumber(), attempt.getDelaySinceFirstAttempt()));
+          log.log(Level.WARNING, "%s, Attempt #%d OK, wait: %s".formatted(className, attempt.getAttemptNumber(), Duration.ofMillis(attempt.getDelaySinceFirstAttempt())));
         }
       }
     }
