diff --git a/src/test/java/com/uber/cadence/workflow/ManualActivityCompletionWorkflowTest.java b/src/test/java/com/uber/cadence/workflow/ManualActivityCompletionWorkflowTest.java index 9eb15b5cf..026a500c4 100644 --- a/src/test/java/com/uber/cadence/workflow/ManualActivityCompletionWorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/ManualActivityCompletionWorkflowTest.java @@ -17,7 +17,6 @@ package com.uber.cadence.workflow; -import com.google.common.base.Preconditions; import com.uber.cadence.WorkflowExecution; import com.uber.cadence.activity.Activity; import com.uber.cadence.activity.ActivityMethod; @@ -27,6 +26,7 @@ import com.uber.cadence.testUtils.CadenceTestRule; import java.time.Duration; import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import org.junit.Rule; import org.junit.Test; @@ -48,6 +48,9 @@ public interface ManualCompletionActivities { @ActivityMethod String asyncActivity(); + @ActivityMethod + void reset(); + @ActivityMethod void completeAsyncActivity(String result); @@ -68,53 +71,56 @@ public interface ManualCompletionActivities { } private class ManualCompletionActivitiesImpl implements ManualCompletionActivities { - private ActivityTask openTask; + private CompletableFuture openTask = new CompletableFuture<>(); @Override public synchronized String asyncActivity() { - openTask = Activity.getTask(); + openTask.complete(Activity.getTask()); Activity.doNotCompleteOnReturn(); return null; } + @Override + public synchronized void reset() { + openTask = new CompletableFuture<>(); + } + @Override public synchronized void completeAsyncActivity(String details) { - Preconditions.checkState(openTask != null); - getClient().complete(openTask.getTaskToken(), details); + getClient().complete(openTask.join().getTaskToken(), details); } @Override public synchronized void completeAsyncActivityById(String details) { - Preconditions.checkState(openTask != null); - getClient().complete(getCurrentWorkflow(), openTask.getActivityId(), details); + getClient().complete(getCurrentWorkflow(), openTask.join().getActivityId(), details); } @Override public synchronized void failAsyncActivity(String details) { - Preconditions.checkState(openTask != null); getClient() - .completeExceptionally(openTask.getTaskToken(), new ExceptionWithDetaills(details)); + .completeExceptionally( + openTask.join().getTaskToken(), new ExceptionWithDetaills(details)); } @Override public synchronized void failAsyncActivityById(String details) { - Preconditions.checkState(openTask != null); getClient() .completeExceptionally( - getCurrentWorkflow(), openTask.getActivityId(), new ExceptionWithDetaills(details)); + getCurrentWorkflow(), + openTask.join().getActivityId(), + new ExceptionWithDetaills(details)); } @Override public synchronized void cancelAsyncActivity(String details) { - Preconditions.checkState(openTask != null); - getClient().reportCancellation(openTask.getTaskToken(), details); + getClient().reportCancellation(openTask.join().getTaskToken(), details); } @Override public synchronized void cancelAsyncActivityById(String details) { - Preconditions.checkState(openTask != null); - getClient().reportCancellation(getCurrentWorkflow(), openTask.getActivityId(), details); + getClient() + .reportCancellation(getCurrentWorkflow(), openTask.join().getActivityId(), details); } private WorkflowExecution getCurrentWorkflow() { @@ -146,21 +152,29 @@ public void run() { expectSuccess("1", result); expectFailure(() -> activities.completeAsyncActivity("again")); + activities.reset(); + result = Async.function(activities::asyncActivity); activities.completeAsyncActivityById("2"); expectSuccess("2", result); expectFailure(() -> activities.completeAsyncActivityById("again")); + activities.reset(); + result = Async.function(activities::asyncActivity); activities.failAsyncActivity("3"); expectFailureWithDetails(result, "3"); expectFailure(() -> activities.failAsyncActivity("again")); + activities.reset(); + result = Async.function(activities::asyncActivity); activities.failAsyncActivityById("4"); expectFailureWithDetails(result, "4"); expectFailure(() -> activities.failAsyncActivityById("again")); + activities.reset(); + // Need to request cancellation, then the activity can respond with the cancel CompletablePromise completablePromise = Workflow.newPromise(); CancellationScope scope = @@ -178,6 +192,8 @@ public void run() { activities.cancelAsyncActivity("5"); expectCancelled(result); + activities.reset(); + // Need to request cancellation, then the activity can respond with the cancel CompletablePromise completablePromise2 = Workflow.newPromise(); scope =