From 6fd74c613494421f4f3f2b646266b27f9e2fa2a9 Mon Sep 17 00:00:00 2001 From: Vytautas Date: Fri, 13 Aug 2021 23:43:10 +0300 Subject: [PATCH] Add consistent query support (#530) --- docker/buildkite/docker-compose.yaml | 4 +- src/main/idls | 2 +- .../com/uber/cadence/client/WorkflowStub.java | 9 +++ .../com/uber/cadence/internal/Version.java | 2 +- .../GenericWorkflowClientExternalImpl.java | 1 + .../uber/cadence/internal/replay/Decider.java | 13 +++- .../replay/QueryWorkflowParameters.java | 19 +++++ .../internal/replay/ReplayDecider.java | 35 ++++++++- .../replay/ReplayDecisionTaskHandler.java | 1 + .../sync/TestWorkflowEnvironmentInternal.java | 13 ++++ .../internal/sync/WorkflowStubImpl.java | 19 +++++ .../TestWorkflowMutableStateImpl.java | 72 +++++++++++++++---- .../sync/DeterministicRunnerTest.java | 2 +- .../uber/cadence/workflow/WorkflowTest.java | 54 ++++++++++++++ 14 files changed, 223 insertions(+), 23 deletions(-) diff --git a/docker/buildkite/docker-compose.yaml b/docker/buildkite/docker-compose.yaml index dced7add1..303f72c44 100644 --- a/docker/buildkite/docker-compose.yaml +++ b/docker/buildkite/docker-compose.yaml @@ -25,7 +25,7 @@ services: - "8126:8126" cadence: - image: ubercadence/server:latestRelease-auto-setup + image: ubercadence/server:master-auto-setup deploy: resources: limits: @@ -122,4 +122,4 @@ services: - COVERALLS_REPO_TOKEN volumes: - "../../:/cadence-java-client" - - /usr/bin/buildkite-agent:/usr/bin/buildkite-agent \ No newline at end of file + - /usr/bin/buildkite-agent:/usr/bin/buildkite-agent diff --git a/src/main/idls b/src/main/idls index b1a9b9ede..7d2d225f3 160000 --- a/src/main/idls +++ b/src/main/idls @@ -1 +1 @@ -Subproject commit b1a9b9ede5fbd29a5b67cb8e082f602488ce0446 +Subproject commit 7d2d225f3137d3105243ee5021b4d44565735d8a diff --git a/src/main/java/com/uber/cadence/client/WorkflowStub.java b/src/main/java/com/uber/cadence/client/WorkflowStub.java index 236b7bf4e..44933e714 100644 --- a/src/main/java/com/uber/cadence/client/WorkflowStub.java +++ b/src/main/java/com/uber/cadence/client/WorkflowStub.java @@ -17,6 +17,7 @@ package com.uber.cadence.client; +import com.uber.cadence.QueryConsistencyLevel; import com.uber.cadence.QueryRejectCondition; import com.uber.cadence.WorkflowExecution; import java.lang.reflect.InvocationHandler; @@ -170,6 +171,14 @@ R query( QueryRejectCondition queryRejectCondition, Object... args); + R query( + String queryType, + Class resultClass, + Type resultType, + QueryRejectCondition queryRejectCondition, + QueryConsistencyLevel queryConsistencyLevel, + Object... args); + /** Request cancellation. */ void cancel(); diff --git a/src/main/java/com/uber/cadence/internal/Version.java b/src/main/java/com/uber/cadence/internal/Version.java index 6770e6f0a..2cd7da195 100644 --- a/src/main/java/com/uber/cadence/internal/Version.java +++ b/src/main/java/com/uber/cadence/internal/Version.java @@ -43,7 +43,7 @@ public class Version { * support. This can be used for client capibility check, on Cadence server, for backward * compatibility Format: MAJOR.MINOR.PATCH */ - public static final String FEATURE_VERSION = "1.4.0"; + public static final String FEATURE_VERSION = "1.5.0"; static { // Load version from version.properties generated by Gradle into build/resources/main directory. diff --git a/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java b/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java index ebcfc67da..bbbc36279 100644 --- a/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java +++ b/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java @@ -418,6 +418,7 @@ public QueryWorkflowResponse queryWorkflow(QueryWorkflowParameters queryParamete query.setQueryType(queryParameters.getQueryType()); request.setQuery(query); request.setQueryRejectCondition(queryParameters.getQueryRejectCondition()); + request.setQueryConsistencyLevel(queryParameters.getQueryConsistencyLevel()); try { QueryWorkflowResponse response = RpcRetryer.retryWithResult( diff --git a/src/main/java/com/uber/cadence/internal/replay/Decider.java b/src/main/java/com/uber/cadence/internal/replay/Decider.java index f535d27dd..b44733d30 100644 --- a/src/main/java/com/uber/cadence/internal/replay/Decider.java +++ b/src/main/java/com/uber/cadence/internal/replay/Decider.java @@ -20,7 +20,9 @@ import com.uber.cadence.Decision; import com.uber.cadence.PollForDecisionTaskResponse; import com.uber.cadence.WorkflowQuery; +import com.uber.cadence.WorkflowQueryResult; import java.util.List; +import java.util.Map; public interface Decider { @@ -32,10 +34,15 @@ public interface Decider { class DecisionResult { private final List decisions; + private final Map queryResults; private final boolean forceCreateNewDecisionTask; - public DecisionResult(List decisions, boolean forceCreateNewDecisionTask) { + public DecisionResult( + List decisions, + Map queryResults, + boolean forceCreateNewDecisionTask) { this.decisions = decisions; + this.queryResults = queryResults; this.forceCreateNewDecisionTask = forceCreateNewDecisionTask; } @@ -43,6 +50,10 @@ public List getDecisions() { return decisions; } + public Map getQueryResults() { + return queryResults; + } + public boolean getForceCreateNewDecisionTask() { return forceCreateNewDecisionTask; } diff --git a/src/main/java/com/uber/cadence/internal/replay/QueryWorkflowParameters.java b/src/main/java/com/uber/cadence/internal/replay/QueryWorkflowParameters.java index 39c8023bc..041ae53fa 100644 --- a/src/main/java/com/uber/cadence/internal/replay/QueryWorkflowParameters.java +++ b/src/main/java/com/uber/cadence/internal/replay/QueryWorkflowParameters.java @@ -17,6 +17,7 @@ package com.uber.cadence.internal.replay; +import com.uber.cadence.QueryConsistencyLevel; import com.uber.cadence.QueryRejectCondition; import java.nio.charset.StandardCharsets; @@ -32,6 +33,8 @@ public class QueryWorkflowParameters implements Cloneable { private QueryRejectCondition queryRejectCondition; + private QueryConsistencyLevel queryConsistencyLevel; + public QueryWorkflowParameters() {} public byte[] getInput() { @@ -100,6 +103,20 @@ public QueryWorkflowParameters withQueryRejectCondition( return this; } + public QueryConsistencyLevel getQueryConsistencyLevel() { + return queryConsistencyLevel; + } + + public void setQueryConsistencyLevel(QueryConsistencyLevel queryConsistencyLevel) { + this.queryConsistencyLevel = queryConsistencyLevel; + } + + public QueryWorkflowParameters withQueryConsistencyLevel( + QueryConsistencyLevel queryConsistencyLevel) { + this.queryConsistencyLevel = queryConsistencyLevel; + return this; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -109,6 +126,7 @@ public String toString() { sb.append("WorkflowId: " + workflowId + ", "); sb.append("RunId: " + runId + ", "); sb.append("QueryRejectCondition: " + queryRejectCondition + ", "); + sb.append("queryConsistencyLevel: " + queryConsistencyLevel + ", "); sb.append("}"); return sb.toString(); } @@ -120,6 +138,7 @@ public QueryWorkflowParameters copy() { result.setQueryType(queryType); result.setWorkflowId(workflowId); result.setQueryRejectCondition(queryRejectCondition); + result.setQueryConsistencyLevel(queryConsistencyLevel); return result; } } diff --git a/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java b/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java index 34a805e9b..650322ba5 100644 --- a/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java +++ b/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java @@ -25,10 +25,12 @@ import com.uber.cadence.History; import com.uber.cadence.HistoryEvent; import com.uber.cadence.PollForDecisionTaskResponse; +import com.uber.cadence.QueryResultType; import com.uber.cadence.TimerFiredEventAttributes; import com.uber.cadence.WorkflowExecutionSignaledEventAttributes; import com.uber.cadence.WorkflowExecutionStartedEventAttributes; import com.uber.cadence.WorkflowQuery; +import com.uber.cadence.WorkflowQueryResult; import com.uber.cadence.WorkflowType; import com.uber.cadence.common.RetryOptions; import com.uber.cadence.internal.common.OptionsUtils; @@ -51,6 +53,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; @@ -59,6 +62,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -387,13 +391,40 @@ private void handleWorkflowExecutionSignaled(HistoryEvent event) { public DecisionResult decide(PollForDecisionTaskResponse decisionTask) throws Throwable { lock.lock(); try { - boolean forceCreateNewDecisionTask = decideImpl(decisionTask, null); - return new DecisionResult(decisionsHelper.getDecisions(), forceCreateNewDecisionTask); + AtomicReference> queryResults = new AtomicReference<>(); + boolean forceCreateNewDecisionTask = + decideImpl( + decisionTask, () -> queryResults.set(getQueryResults(decisionTask.getQueries()))); + return new DecisionResult( + decisionsHelper.getDecisions(), queryResults.get(), forceCreateNewDecisionTask); } finally { lock.unlock(); } } + private Map getQueryResults(Map queries) { + if (queries == null) { + return null; + } + + return queries + .entrySet() + .stream() + .collect(Collectors.toMap(q -> q.getKey(), q -> queryWorkflow(q.getValue()))); + } + + private WorkflowQueryResult queryWorkflow(WorkflowQuery query) { + try { + return new WorkflowQueryResult() + .setResultType(QueryResultType.ANSWERED) + .setAnswer(workflow.query(query)); + } catch (Throwable e) { + return new WorkflowQueryResult() + .setResultType(QueryResultType.FAILED) + .setErrorMessage(e.getMessage()); + } + } + // Returns boolean to indicate whether we need to force create new decision task for local // activity heartbeating. private boolean decideImpl(PollForDecisionTaskResponse decisionTask, Functions.Proc query) diff --git a/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java b/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java index c1c6d9b05..2ffaaab41 100644 --- a/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java +++ b/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java @@ -245,6 +245,7 @@ private Result createCompletedRequest( new RespondDecisionTaskCompletedRequest(); completedRequest.setTaskToken(decisionTask.getTaskToken()); completedRequest.setDecisions(result.getDecisions()); + completedRequest.setQueryResults(result.getQueryResults()); completedRequest.setForceCreateNewDecisionTask(result.getForceCreateNewDecisionTask()); if (stickyTaskListName != null) { diff --git a/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java b/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java index e5f5cf8a0..3bbea6549 100644 --- a/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java +++ b/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java @@ -50,6 +50,7 @@ import com.uber.cadence.PollForActivityTaskResponse; import com.uber.cadence.PollForDecisionTaskRequest; import com.uber.cadence.PollForDecisionTaskResponse; +import com.uber.cadence.QueryConsistencyLevel; import com.uber.cadence.QueryFailedError; import com.uber.cadence.QueryRejectCondition; import com.uber.cadence.QueryWorkflowRequest; @@ -990,6 +991,18 @@ public R query( return next.query(queryType, resultClass, resultType, queryRejectCondition, args); } + @Override + public R query( + String queryType, + Class resultClass, + Type resultType, + QueryRejectCondition queryRejectCondition, + QueryConsistencyLevel queryConsistencyLevel, + Object... args) { + return next.query( + queryType, resultClass, resultType, queryRejectCondition, queryConsistencyLevel, args); + } + @Override public void cancel() { next.cancel(); diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java index 978720a23..a25d8432a 100644 --- a/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java +++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java @@ -19,6 +19,7 @@ import com.uber.cadence.EntityNotExistsError; import com.uber.cadence.InternalServiceError; +import com.uber.cadence.QueryConsistencyLevel; import com.uber.cadence.QueryFailedError; import com.uber.cadence.QueryRejectCondition; import com.uber.cadence.QueryWorkflowResponse; @@ -429,12 +430,30 @@ public R query( Type resultType, QueryRejectCondition queryRejectCondition, Object... args) { + return query( + queryType, + resultClass, + resultType, + queryRejectCondition, + QueryConsistencyLevel.EVENTUAL, + args); + } + + @Override + public R query( + String queryType, + Class resultClass, + Type resultType, + QueryRejectCondition queryRejectCondition, + QueryConsistencyLevel queryConsistencyLevel, + Object... args) { checkStarted(); QueryWorkflowParameters p = new QueryWorkflowParameters(); p.setInput(dataConverter.toData(args)); p.setQueryType(queryType); p.setWorkflowId(execution.get().getWorkflowId()); p.setQueryRejectCondition(queryRejectCondition); + p.setQueryConsistencyLevel(queryConsistencyLevel); QueryWorkflowResponse result; try { diff --git a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java index 7fd8e6294..672c2433c 100644 --- a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java @@ -49,9 +49,11 @@ import com.uber.cadence.PollForActivityTaskResponse; import com.uber.cadence.PollForDecisionTaskRequest; import com.uber.cadence.PollForDecisionTaskResponse; +import com.uber.cadence.QueryConsistencyLevel; import com.uber.cadence.QueryFailedError; import com.uber.cadence.QueryRejectCondition; import com.uber.cadence.QueryRejected; +import com.uber.cadence.QueryResultType; import com.uber.cadence.QueryTaskCompletedType; import com.uber.cadence.QueryWorkflowRequest; import com.uber.cadence.QueryWorkflowResponse; @@ -88,6 +90,8 @@ import com.uber.cadence.WorkflowExecutionCloseStatus; import com.uber.cadence.WorkflowExecutionContinuedAsNewEventAttributes; import com.uber.cadence.WorkflowExecutionSignaledEventAttributes; +import com.uber.cadence.WorkflowQuery; +import com.uber.cadence.WorkflowQueryResult; import com.uber.cadence.internal.common.WorkflowExecutionUtils; import com.uber.cadence.internal.testservice.StateMachines.Action; import com.uber.cadence.internal.testservice.StateMachines.ActivityTaskData; @@ -159,6 +163,7 @@ void apply(RequestContext ctx) private final Map> queries = new ConcurrentHashMap<>(); private final Map queryRequests = new ConcurrentHashMap<>(); + private final Map pendingQueries = new ConcurrentHashMap<>(); private final Optional continuedExecutionRunId; public StickyExecutionAttributes stickyExecutionAttributes; @@ -230,6 +235,7 @@ private void update(boolean completeDecisionUpdate, UpdateProcedure updater, Str RequestContext ctx = new RequestContext(clock, this, nextEventId); updater.apply(ctx); + setPendingQueries(ctx); if (concurrentDecision && workflow.getState() != State.TIMED_OUT) { concurrentToDecision.add(ctx); ctx.fireCallbacks(0); @@ -250,6 +256,13 @@ private void update(boolean completeDecisionUpdate, UpdateProcedure updater, Str } } + private void setPendingQueries(RequestContext ctx) { + TestWorkflowStore.DecisionTask decisionTask = ctx.getDecisionTask(); + if (decisionTask != null) { + decisionTask.getTask().setQueries(new HashMap<>(pendingQueries)); + } + } + @Override public ExecutionId getExecutionId() { return executionId; @@ -317,6 +330,16 @@ public void completeDecisionTask(int historySize, RespondDecisionTaskCompletedRe List decisions = request.getDecisions(); completeDecisionUpdate( ctx -> { + if (request.getQueryResultsSize() > 0) { + request + .getQueryResults() + .forEach( + (queryId, queryResult) -> { + completeQuery(queryId, queryResult); + pendingQueries.remove(queryId); + }); + } + if (ctx.getInitialEventId() != historySize + 1) { throw new BadRequestError( "Expired decision: expectedHistorySize=" @@ -371,6 +394,19 @@ public void completeDecisionTask(int historySize, RespondDecisionTaskCompletedRe request.getStickyAttributes()); } + private void completeQuery(String queryId, WorkflowQueryResult queryResult) { + CompletableFuture future = queries.get(queryId); + if (future == null) { + throw new RuntimeException("Unknown query id: " + queryId); + } + if (queryResult.getResultType() == QueryResultType.ANSWERED) { + future.complete(new QueryWorkflowResponse().setQueryResult(queryResult.getAnswer())); + } else { + future.completeExceptionally( + new QueryFailedError().setMessage(queryResult.getErrorMessage())); + } + } + private boolean hasCompleteDecision(List decisions) { for (Decision d : decisions) { if (WorkflowExecutionUtils.isWorkflowExecutionCompleteDecision(d)) { @@ -1536,23 +1572,29 @@ public QueryWorkflowResponse query(QueryWorkflowRequest queryRequest) throws TEx } } - PollForDecisionTaskResponse task = - new PollForDecisionTaskResponse() - .setTaskToken(queryId.toBytes()) - .setWorkflowExecution(executionId.getExecution()) - .setWorkflowType(startRequest.getWorkflowType()) - .setQuery(queryRequest.getQuery()) - .setWorkflowExecutionTaskList(startRequest.getTaskList()); - TaskListId taskListId = - new TaskListId( - queryRequest.getDomain(), - stickyExecutionAttributes == null - ? startRequest.getTaskList().getName() - : stickyExecutionAttributes.getWorkerTaskList().getName()); CompletableFuture result = new CompletableFuture<>(); - queryRequests.put(queryId.getQueryId(), task); queries.put(queryId.getQueryId(), result); - store.sendQueryTask(executionId, taskListId, task); + + if (queryRequest.getQueryConsistencyLevel() == QueryConsistencyLevel.STRONG) { + pendingQueries.put(queryId.getQueryId(), queryRequest.getQuery()); + } else { + PollForDecisionTaskResponse task = + new PollForDecisionTaskResponse() + .setTaskToken(queryId.toBytes()) + .setWorkflowExecution(executionId.getExecution()) + .setWorkflowType(startRequest.getWorkflowType()) + .setQuery(queryRequest.getQuery()) + .setWorkflowExecutionTaskList(startRequest.getTaskList()); + TaskListId taskListId = + new TaskListId( + queryRequest.getDomain(), + stickyExecutionAttributes == null + ? startRequest.getTaskList().getName() + : stickyExecutionAttributes.getWorkerTaskList().getName()); + queryRequests.put(queryId.getQueryId(), task); + store.sendQueryTask(executionId, taskListId, task); + } + try { return result.get(); } catch (InterruptedException e) { diff --git a/src/test/java/com/uber/cadence/internal/sync/DeterministicRunnerTest.java b/src/test/java/com/uber/cadence/internal/sync/DeterministicRunnerTest.java index f45f0d7d6..34199fe3d 100644 --- a/src/test/java/com/uber/cadence/internal/sync/DeterministicRunnerTest.java +++ b/src/test/java/com/uber/cadence/internal/sync/DeterministicRunnerTest.java @@ -853,7 +853,7 @@ private static class DetermisiticRunnerContainerDecider implements Decider { @Override public DecisionResult decide(PollForDecisionTaskResponse decisionTask) throws Throwable { - return new DecisionResult(new ArrayList<>(), false); + return new DecisionResult(new ArrayList<>(), null, false); } @Override diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java index 7925d84e5..c54a8f408 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java @@ -38,6 +38,7 @@ import com.uber.cadence.GetWorkflowExecutionHistoryResponse; import com.uber.cadence.HistoryEvent; import com.uber.cadence.Memo; +import com.uber.cadence.QueryConsistencyLevel; import com.uber.cadence.QueryFailedError; import com.uber.cadence.QueryRejectCondition; import com.uber.cadence.SearchAttributes; @@ -2579,6 +2580,59 @@ public void testSignalWithStart() { workflowClient.newUntypedWorkflowStub(execution, Optional.empty()).getResult(String.class)); } + public static class TestConsistentQueryImpl implements QueryableWorkflow { + String state = "initial"; + + @Override + public String execute() { + TestActivities testActivities = + Workflow.newActivityStub(TestActivities.class, newActivityOptions2()); + + while (!state.equals("exit")) { + String oldState = state; + Workflow.await(() -> !Objects.equals(state, oldState)); + testActivities.activity(); + } + return ""; + } + + @Override + public String getState() { + return state; + } + + @Override + public void mySignal(String newState) { + log.info("TestConsistentQueryImpl.mySignal newState=" + newState); + state = newState; + } + } + + @Test + @Ignore("until version check is merged in server") + public void testConsistentQuery() throws Exception { + startWorkerFor(TestConsistentQueryImpl.class); + + String workflowType = QueryableWorkflow.class.getSimpleName() + "::execute"; + WorkflowOptions.Builder ob = newWorkflowOptionsBuilder(taskList); + WorkflowStub client = workflowClient.newUntypedWorkflowStub(workflowType, ob.build()); + + java.util.function.Function query = + (consistencyLevel) -> + client.query( + "QueryableWorkflow::getState", String.class, String.class, null, consistencyLevel); + + client.start(); + + client.signal("testSignal", "A"); + assertEquals("A", query.apply(QueryConsistencyLevel.STRONG)); + + client.signal("testSignal", "B"); + assertEquals("B", query.apply(QueryConsistencyLevel.STRONG)); + + client.signal("testSignal", "exit"); + } + public static class TestNoQueryWorkflowImpl implements QueryableWorkflow { CompletablePromise promise = Workflow.newPromise();