Skip to content

Commit

Permalink
Add consistent query support (#530)
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius authored Aug 13, 2021
1 parent 93424e2 commit 6fd74c6
Show file tree
Hide file tree
Showing 14 changed files with 223 additions and 23 deletions.
4 changes: 2 additions & 2 deletions docker/buildkite/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ services:
- "8126:8126"

cadence:
image: ubercadence/server:latestRelease-auto-setup
image: ubercadence/server:master-auto-setup
deploy:
resources:
limits:
Expand Down Expand Up @@ -122,4 +122,4 @@ services:
- COVERALLS_REPO_TOKEN
volumes:
- "../../:/cadence-java-client"
- /usr/bin/buildkite-agent:/usr/bin/buildkite-agent
- /usr/bin/buildkite-agent:/usr/bin/buildkite-agent
2 changes: 1 addition & 1 deletion src/main/idls
Submodule idls updated from b1a9b9 to 7d2d22
9 changes: 9 additions & 0 deletions src/main/java/com/uber/cadence/client/WorkflowStub.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.uber.cadence.client;

import com.uber.cadence.QueryConsistencyLevel;
import com.uber.cadence.QueryRejectCondition;
import com.uber.cadence.WorkflowExecution;
import java.lang.reflect.InvocationHandler;
Expand Down Expand Up @@ -170,6 +171,14 @@ <R> R query(
QueryRejectCondition queryRejectCondition,
Object... args);

<R> R query(
String queryType,
Class<R> resultClass,
Type resultType,
QueryRejectCondition queryRejectCondition,
QueryConsistencyLevel queryConsistencyLevel,
Object... args);

/** Request cancellation. */
void cancel();

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/uber/cadence/internal/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class Version {
* support. This can be used for client capibility check, on Cadence server, for backward
* compatibility Format: MAJOR.MINOR.PATCH
*/
public static final String FEATURE_VERSION = "1.4.0";
public static final String FEATURE_VERSION = "1.5.0";

This comment has been minimized.

Copy link
@WellingR

WellingR Sep 30, 2021

I updated the cadence library version today and ran into the error message
"clientVersionNotSupportedError:ClientVersionNotSupportedError(featureVersion:1.5.0, clientImpl:uber-java, supportedVersions:<=1.4.0))"
It appears that we need to update our cadence server before we can use the latest java library.

It would have been nice if this was mentioned in the release notes

This comment has been minimized.

Copy link
@longquanzheng

longquanzheng Oct 1, 2021

Contributor

That’s a good call out. I updated the release note. We will make sure to add that in the future. Thanks! https://github.com/uber/cadence-java-client/releases/tag/v3.2.0


static {
// Load version from version.properties generated by Gradle into build/resources/main directory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ public QueryWorkflowResponse queryWorkflow(QueryWorkflowParameters queryParamete
query.setQueryType(queryParameters.getQueryType());
request.setQuery(query);
request.setQueryRejectCondition(queryParameters.getQueryRejectCondition());
request.setQueryConsistencyLevel(queryParameters.getQueryConsistencyLevel());
try {
QueryWorkflowResponse response =
RpcRetryer.retryWithResult(
Expand Down
13 changes: 12 additions & 1 deletion src/main/java/com/uber/cadence/internal/replay/Decider.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import com.uber.cadence.Decision;
import com.uber.cadence.PollForDecisionTaskResponse;
import com.uber.cadence.WorkflowQuery;
import com.uber.cadence.WorkflowQueryResult;
import java.util.List;
import java.util.Map;

public interface Decider {

Expand All @@ -32,17 +34,26 @@ public interface Decider {

class DecisionResult {
private final List<Decision> decisions;
private final Map<String, WorkflowQueryResult> queryResults;
private final boolean forceCreateNewDecisionTask;

public DecisionResult(List<Decision> decisions, boolean forceCreateNewDecisionTask) {
public DecisionResult(
List<Decision> decisions,
Map<String, WorkflowQueryResult> queryResults,
boolean forceCreateNewDecisionTask) {
this.decisions = decisions;
this.queryResults = queryResults;
this.forceCreateNewDecisionTask = forceCreateNewDecisionTask;
}

public List<Decision> getDecisions() {
return decisions;
}

public Map<String, WorkflowQueryResult> getQueryResults() {
return queryResults;
}

public boolean getForceCreateNewDecisionTask() {
return forceCreateNewDecisionTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.uber.cadence.internal.replay;

import com.uber.cadence.QueryConsistencyLevel;
import com.uber.cadence.QueryRejectCondition;
import java.nio.charset.StandardCharsets;

Expand All @@ -32,6 +33,8 @@ public class QueryWorkflowParameters implements Cloneable {

private QueryRejectCondition queryRejectCondition;

private QueryConsistencyLevel queryConsistencyLevel;

public QueryWorkflowParameters() {}

public byte[] getInput() {
Expand Down Expand Up @@ -100,6 +103,20 @@ public QueryWorkflowParameters withQueryRejectCondition(
return this;
}

public QueryConsistencyLevel getQueryConsistencyLevel() {
return queryConsistencyLevel;
}

public void setQueryConsistencyLevel(QueryConsistencyLevel queryConsistencyLevel) {
this.queryConsistencyLevel = queryConsistencyLevel;
}

public QueryWorkflowParameters withQueryConsistencyLevel(
QueryConsistencyLevel queryConsistencyLevel) {
this.queryConsistencyLevel = queryConsistencyLevel;
return this;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Expand All @@ -109,6 +126,7 @@ public String toString() {
sb.append("WorkflowId: " + workflowId + ", ");
sb.append("RunId: " + runId + ", ");
sb.append("QueryRejectCondition: " + queryRejectCondition + ", ");
sb.append("queryConsistencyLevel: " + queryConsistencyLevel + ", ");
sb.append("}");
return sb.toString();
}
Expand All @@ -120,6 +138,7 @@ public QueryWorkflowParameters copy() {
result.setQueryType(queryType);
result.setWorkflowId(workflowId);
result.setQueryRejectCondition(queryRejectCondition);
result.setQueryConsistencyLevel(queryConsistencyLevel);
return result;
}
}
35 changes: 33 additions & 2 deletions src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import com.uber.cadence.History;
import com.uber.cadence.HistoryEvent;
import com.uber.cadence.PollForDecisionTaskResponse;
import com.uber.cadence.QueryResultType;
import com.uber.cadence.TimerFiredEventAttributes;
import com.uber.cadence.WorkflowExecutionSignaledEventAttributes;
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
import com.uber.cadence.WorkflowQuery;
import com.uber.cadence.WorkflowQueryResult;
import com.uber.cadence.WorkflowType;
import com.uber.cadence.common.RetryOptions;
import com.uber.cadence.internal.common.OptionsUtils;
Expand All @@ -51,6 +53,7 @@
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
Expand All @@ -59,6 +62,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -387,13 +391,40 @@ private void handleWorkflowExecutionSignaled(HistoryEvent event) {
public DecisionResult decide(PollForDecisionTaskResponse decisionTask) throws Throwable {
lock.lock();
try {
boolean forceCreateNewDecisionTask = decideImpl(decisionTask, null);
return new DecisionResult(decisionsHelper.getDecisions(), forceCreateNewDecisionTask);
AtomicReference<Map<String, WorkflowQueryResult>> queryResults = new AtomicReference<>();
boolean forceCreateNewDecisionTask =
decideImpl(
decisionTask, () -> queryResults.set(getQueryResults(decisionTask.getQueries())));
return new DecisionResult(
decisionsHelper.getDecisions(), queryResults.get(), forceCreateNewDecisionTask);
} finally {
lock.unlock();
}
}

private Map<String, WorkflowQueryResult> getQueryResults(Map<String, WorkflowQuery> queries) {
if (queries == null) {
return null;
}

return queries
.entrySet()
.stream()
.collect(Collectors.toMap(q -> q.getKey(), q -> queryWorkflow(q.getValue())));
}

private WorkflowQueryResult queryWorkflow(WorkflowQuery query) {
try {
return new WorkflowQueryResult()
.setResultType(QueryResultType.ANSWERED)
.setAnswer(workflow.query(query));
} catch (Throwable e) {
return new WorkflowQueryResult()
.setResultType(QueryResultType.FAILED)
.setErrorMessage(e.getMessage());
}
}

// Returns boolean to indicate whether we need to force create new decision task for local
// activity heartbeating.
private boolean decideImpl(PollForDecisionTaskResponse decisionTask, Functions.Proc query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ private Result createCompletedRequest(
new RespondDecisionTaskCompletedRequest();
completedRequest.setTaskToken(decisionTask.getTaskToken());
completedRequest.setDecisions(result.getDecisions());
completedRequest.setQueryResults(result.getQueryResults());
completedRequest.setForceCreateNewDecisionTask(result.getForceCreateNewDecisionTask());

if (stickyTaskListName != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.uber.cadence.PollForActivityTaskResponse;
import com.uber.cadence.PollForDecisionTaskRequest;
import com.uber.cadence.PollForDecisionTaskResponse;
import com.uber.cadence.QueryConsistencyLevel;
import com.uber.cadence.QueryFailedError;
import com.uber.cadence.QueryRejectCondition;
import com.uber.cadence.QueryWorkflowRequest;
Expand Down Expand Up @@ -990,6 +991,18 @@ public <R> R query(
return next.query(queryType, resultClass, resultType, queryRejectCondition, args);
}

@Override
public <R> R query(
String queryType,
Class<R> resultClass,
Type resultType,
QueryRejectCondition queryRejectCondition,
QueryConsistencyLevel queryConsistencyLevel,
Object... args) {
return next.query(
queryType, resultClass, resultType, queryRejectCondition, queryConsistencyLevel, args);
}

@Override
public void cancel() {
next.cancel();
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.uber.cadence.EntityNotExistsError;
import com.uber.cadence.InternalServiceError;
import com.uber.cadence.QueryConsistencyLevel;
import com.uber.cadence.QueryFailedError;
import com.uber.cadence.QueryRejectCondition;
import com.uber.cadence.QueryWorkflowResponse;
Expand Down Expand Up @@ -429,12 +430,30 @@ public <R> R query(
Type resultType,
QueryRejectCondition queryRejectCondition,
Object... args) {
return query(
queryType,
resultClass,
resultType,
queryRejectCondition,
QueryConsistencyLevel.EVENTUAL,
args);
}

@Override
public <R> R query(
String queryType,
Class<R> resultClass,
Type resultType,
QueryRejectCondition queryRejectCondition,
QueryConsistencyLevel queryConsistencyLevel,
Object... args) {
checkStarted();
QueryWorkflowParameters p = new QueryWorkflowParameters();
p.setInput(dataConverter.toData(args));
p.setQueryType(queryType);
p.setWorkflowId(execution.get().getWorkflowId());
p.setQueryRejectCondition(queryRejectCondition);
p.setQueryConsistencyLevel(queryConsistencyLevel);

QueryWorkflowResponse result;
try {
Expand Down
Loading

0 comments on commit 6fd74c6

Please sign in to comment.