Skip to content

Commit

Permalink
Add memo and search attributes to child workflow (#641)
Browse files Browse the repository at this point in the history
  • Loading branch information
spmistry authored Sep 30, 2021
1 parent 9898a41 commit 7e385ac
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 0 deletions.
11 changes: 11 additions & 0 deletions src/main/java/com/uber/cadence/internal/common/InternalUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.uber.cadence.History;
import com.uber.cadence.HistoryEvent;
import com.uber.cadence.HistoryEventFilterType;
import com.uber.cadence.Memo;
import com.uber.cadence.SearchAttributes;
import com.uber.cadence.TaskList;
import com.uber.cadence.TaskListKind;
Expand Down Expand Up @@ -142,6 +143,16 @@ public static Object getValueOrDefault(Object value, Class<?> valueClass) {
return Defaults.defaultValue(valueClass);
}

public static Memo convertMapToMemo(Map<String, Object> memo) {
DataConverter converter = JsonDataConverter.getInstance();
Map<String, ByteBuffer> mapOfByteBuffer = new HashMap<>();
memo.forEach(
(key, value) -> {
mapOfByteBuffer.put(key, ByteBuffer.wrap(converter.toData(value)));
});
return new Memo().setFields(mapOfByteBuffer);
}

public static SearchAttributes convertMapToSearchAttributes(
Map<String, Object> searchAttributes) {
DataConverter converter = JsonDataConverter.getInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public static final class Builder {

private String cronSchedule;

private Map<String, Object> memo;

private Map<String, Object> searchAttributes;

private Map<String, byte[]> context;

private ParentClosePolicy parentClosePolicy;
Expand Down Expand Up @@ -111,6 +115,16 @@ public Builder setCronSchedule(String cronSchedule) {
return this;
}

public Builder setMemo(Map<String, Object> memo) {
this.memo = memo;
return this;
}

public Builder setSearchAttributes(Map<String, Object> searchAttributes) {
this.searchAttributes = searchAttributes;
return this;
}

public Builder setContext(Map<String, byte[]> context) {
this.context = context;
return this;
Expand All @@ -134,6 +148,8 @@ public StartChildWorkflowExecutionParameters build() {
workflowIdReusePolicy,
retryParameters,
cronSchedule,
memo,
searchAttributes,
context,
parentClosePolicy);
}
Expand Down Expand Up @@ -161,6 +177,10 @@ public StartChildWorkflowExecutionParameters build() {

private final String cronSchedule;

private Map<String, Object> memo;

private Map<String, Object> searchAttributes;

private Map<String, byte[]> context;

private final ParentClosePolicy parentClosePolicy;
Expand All @@ -177,6 +197,8 @@ private StartChildWorkflowExecutionParameters(
WorkflowIdReusePolicy workflowIdReusePolicy,
RetryParameters retryParameters,
String cronSchedule,
Map<String, Object> memo,
Map<String, Object> searchAttributes,
Map<String, byte[]> context,
ParentClosePolicy parentClosePolicy) {
this.domain = domain;
Expand All @@ -190,6 +212,8 @@ private StartChildWorkflowExecutionParameters(
this.workflowIdReusePolicy = workflowIdReusePolicy;
this.retryParameters = retryParameters;
this.cronSchedule = cronSchedule;
this.memo = memo;
this.searchAttributes = searchAttributes;
this.context = context;
this.parentClosePolicy = parentClosePolicy;
}
Expand Down Expand Up @@ -238,6 +262,14 @@ public String getCronSchedule() {
return cronSchedule;
}

public Map<String, Object> getMemo() {
return memo;
}

public Map<String, Object> getSearchAttributes() {
return searchAttributes;
}

public Map<String, byte[]> getContext() {
return context;
}
Expand All @@ -262,6 +294,8 @@ public boolean equals(Object o) {
&& workflowIdReusePolicy == that.workflowIdReusePolicy
&& Objects.equals(retryParameters, that.retryParameters)
&& Objects.equals(cronSchedule, that.cronSchedule)
&& Objects.equals(memo, that.memo)
&& Objects.equals(searchAttributes, that.searchAttributes)
&& Objects.equals(context, that.context)
&& Objects.equals(parentClosePolicy, that.parentClosePolicy);
}
Expand All @@ -280,6 +314,8 @@ public int hashCode() {
workflowIdReusePolicy,
retryParameters,
cronSchedule,
memo,
searchAttributes,
context,
parentClosePolicy);
result = 31 * result + Arrays.hashCode(input);
Expand Down Expand Up @@ -315,6 +351,10 @@ public String toString() {
+ retryParameters
+ ", cronSchedule="
+ cronSchedule
+ ", memo="
+ memo
+ ", searchAttributes="
+ searchAttributes
+ ", context='"
+ context
+ ", parentClosePolicy="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.uber.cadence.TaskList;
import com.uber.cadence.WorkflowExecution;
import com.uber.cadence.WorkflowType;
import com.uber.cadence.internal.common.InternalUtils;
import com.uber.cadence.internal.common.RetryParameters;
import com.uber.cadence.workflow.ChildWorkflowTerminatedException;
import com.uber.cadence.workflow.ChildWorkflowTimedOutException;
Expand Down Expand Up @@ -153,6 +154,17 @@ Consumer<Exception> startChildWorkflow(
attributes.setParentClosePolicy(parentClosePolicy);
}

Map<String, Object> memoMap = parameters.getMemo();
if (memoMap != null) {
attributes.setMemo(InternalUtils.convertMapToMemo(memoMap));
}

Map<String, Object> searchAttributesMap = parameters.getSearchAttributes();
if (searchAttributesMap != null) {
attributes.setSearchAttributes(
InternalUtils.convertMapToSearchAttributes(searchAttributesMap));
}

long initiatedEventId = decisions.startChildWorkflowExecution(attributes);
final OpenChildWorkflowRequestInfo context =
new OpenChildWorkflowRequestInfo(executionCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,8 @@ private Promise<byte[]> executeChildWorkflow(
.setTaskStartToCloseTimeout(options.getTaskStartToCloseTimeout())
.setWorkflowId(options.getWorkflowId())
.setWorkflowIdReusePolicy(options.getWorkflowIdReusePolicy())
.setMemo(options.getMemo())
.setSearchAttributes(options.getSearchAttributes())
.setParentClosePolicy(options.getParentClosePolicy())
.build();
return WorkflowRetryerInternal.retryAsync(
Expand Down Expand Up @@ -412,6 +414,8 @@ private Promise<byte[]> executeChildWorkflowOnce(
.setWorkflowIdReusePolicy(options.getWorkflowIdReusePolicy())
.setRetryParameters(retryParameters)
.setCronSchedule(options.getCronSchedule())
.setMemo(options.getMemo())
.setSearchAttributes(options.getSearchAttributes())
.setContext(extractContextsAndConvertToBytes(propagators))
.setParentClosePolicy(options.getParentClosePolicy())
.build();
Expand Down

0 comments on commit 7e385ac

Please sign in to comment.