Skip to content

Commit

Permalink
Reconcile typed search attributes with schedules (#1848)
Browse files Browse the repository at this point in the history
Reconcile typed search attributes with schedules
  • Loading branch information
Quinn-With-Two-Ns authored Aug 28, 2023
1 parent c012357 commit 4298dad
Show file tree
Hide file tree
Showing 7 changed files with 360 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public <T> Builder setWorkflowType(Class<T> workflowInterface) {
* Set the workflow options to use when starting a workflow action.
*
* @note ID and TaskQueue are required. Some options like ID reuse policy, cron schedule, and
* start signal cannot be set or an error will occur.
* start signal cannot be set or an error will occur. Schedules requires the use of typed
* search attributes and untyped search attributes will be ignored.
*/
public Builder setOptions(WorkflowOptions options) {
this.options = options;
Expand All @@ -99,7 +100,10 @@ public Builder setArguments(Object... arguments) {

public ScheduleActionStartWorkflow build() {
return new ScheduleActionStartWorkflow(
workflowType, options, header == null ? Header.empty() : header, arguments);
workflowType,
options,
header == null ? Header.empty() : header,
arguments == null ? new EncodedValues() : arguments);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.client.schedules;

import io.temporal.api.common.v1.Payload;
import io.temporal.common.SearchAttributes;
import io.temporal.common.converter.DataConverter;
import java.lang.reflect.Type;
import java.util.List;
Expand All @@ -35,6 +36,7 @@ public final class ScheduleDescription {
private final ScheduleInfo info;
private final Schedule schedule;
private final Map<String, List<?>> searchAttributes;
private final SearchAttributes typedSearchAttributes;
private final Map<String, Payload> memo;
private final DataConverter dataConverter;

Expand All @@ -43,12 +45,14 @@ public ScheduleDescription(
ScheduleInfo info,
Schedule schedule,
Map<String, List<?>> searchAttributes,
SearchAttributes typedSearchAttributes,
Map<String, Payload> memo,
DataConverter dataConverter) {
this.id = id;
this.info = info;
this.schedule = schedule;
this.searchAttributes = searchAttributes;
this.typedSearchAttributes = typedSearchAttributes;
this.memo = memo;
this.dataConverter = dataConverter;
}
Expand Down Expand Up @@ -84,12 +88,23 @@ public ScheduleDescription(
* Gets the search attributes on the schedule.
*
* @return search attributes
* @deprecated use {@link ScheduleDescription#getTypedSearchAttributes} instead.
*/
@Nonnull
public Map<String, List<?>> getSearchAttributes() {
return searchAttributes;
}

/**
* Gets the search attributes on the schedule.
*
* @return search attributes
*/
@Nonnull
public SearchAttributes getTypedSearchAttributes() {
return typedSearchAttributes;
}

@Nullable
public <T> Object getMemo(String key, Class<T> valueClass) {
return getMemo(key, valueClass, valueClass);
Expand All @@ -113,12 +128,13 @@ public boolean equals(Object o) {
&& Objects.equals(info, that.info)
&& Objects.equals(schedule, that.schedule)
&& Objects.equals(searchAttributes, that.searchAttributes)
&& Objects.equals(typedSearchAttributes, that.typedSearchAttributes)
&& Objects.equals(memo, that.memo);
}

@Override
public int hashCode() {
return Objects.hash(id, info, schedule, searchAttributes, memo);
return Objects.hash(id, info, schedule, searchAttributes, typedSearchAttributes, memo);
}

@Override
Expand All @@ -133,6 +149,8 @@ public String toString() {
+ schedule
+ ", searchAttributes="
+ searchAttributes
+ ", typedSearchAttributes="
+ typedSearchAttributes
+ ", memo="
+ memo
+ '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package io.temporal.client.schedules;

import io.temporal.common.SearchAttributes;
import java.util.List;
import java.util.Map;

Expand All @@ -38,6 +39,7 @@ public static final class Builder {
private List<ScheduleBackfill> backfills;
private Map<String, Object> memo;
private Map<String, ?> searchAttributes;
private SearchAttributes typedSearchAttributes;

private Builder() {}

Expand All @@ -49,6 +51,7 @@ private Builder(ScheduleOptions options) {
this.backfills = options.backfills;
this.memo = options.memo;
this.searchAttributes = options.searchAttributes;
this.typedSearchAttributes = options.typedSearchAttributes;
}

/** Set if the schedule will be triggered immediately upon creation. */
Expand All @@ -69,31 +72,45 @@ public Builder setMemo(Map<String, Object> memo) {
return this;
}

/** Set the search attributes for the schedule. */
/**
* Set the search attributes for the schedule.
*
* @deprecated use {@link ScheduleOptions.Builder#setTypedSearchAttributes} instead.
*/
public Builder setSearchAttributes(Map<String, ?> searchAttributes) {
this.searchAttributes = searchAttributes;
return this;
}

/** Set the search attributes for the schedule. */
public Builder setTypedSearchAttributes(SearchAttributes searchAttributes) {
this.typedSearchAttributes = searchAttributes;
return this;
}

public ScheduleOptions build() {
return new ScheduleOptions(triggerImmediately, backfills, memo, searchAttributes);
return new ScheduleOptions(
triggerImmediately, backfills, memo, searchAttributes, typedSearchAttributes);
}
}

private final boolean triggerImmediately;
private final List<ScheduleBackfill> backfills;
private final Map<String, Object> memo;
private final Map<String, ?> searchAttributes;
private final SearchAttributes typedSearchAttributes;

private ScheduleOptions(
boolean triggerImmediately,
List<ScheduleBackfill> backfills,
Map<String, Object> memo,
Map<String, ?> searchAttributes) {
Map<String, ?> searchAttributes,
SearchAttributes typedSearchAttributes) {
this.triggerImmediately = triggerImmediately;
this.backfills = backfills;
this.memo = memo;
this.searchAttributes = searchAttributes;
this.typedSearchAttributes = typedSearchAttributes;
}

/**
Expand Down Expand Up @@ -127,8 +144,18 @@ public Map<String, Object> getMemo() {
* Get the search attributes for the schedule.
*
* @return search attributes for the schedule
* @deprecated use {@link ScheduleOptions#getTypedSearchAttributes()} instead.
*/
public Map<String, ?> getSearchAttributes() {
return searchAttributes;
}

/**
* Get the search attributes for the schedule.
*
* @return search attributes for the schedule
*/
public SearchAttributes getTypedSearchAttributes() {
return typedSearchAttributes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.common;

import com.google.common.reflect.TypeToken;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.enums.v1.IndexedValueType;
import java.lang.reflect.Type;
import java.time.OffsetDateTime;
Expand Down Expand Up @@ -73,6 +74,17 @@ public static SearchAttributeKey<List<String>> forKeywordList(String name) {
KEYWORD_LIST_REFLECT_TYPE);
}

/**
* Create a search attribute key for an untyped attribute type.
*
* <p>This should only be used when the server can return untyped search attributes, for example,
* when describing a schedule workflow action.
*/
public static SearchAttributeKey<Payload> forUntyped(String name) {
return new SearchAttributeKey<>(
name, IndexedValueType.INDEXED_VALUE_TYPE_UNSPECIFIED, Payload.class);
}

private final String name;
private final IndexedValueType valueType;
private final Class<? super T> valueClass;
Expand Down Expand Up @@ -115,11 +127,17 @@ public Type getValueReflectType() {

/** Create an update that sets a value for this key. */
public SearchAttributeUpdate<T> valueSet(@Nonnull T value) {
if (valueType == IndexedValueType.INDEXED_VALUE_TYPE_UNSPECIFIED) {
throw new IllegalStateException("untyped keys should not be used in workflows");
}
return SearchAttributeUpdate.valueSet(this, value);
}

/** Create an update that unsets a value for this key. */
public SearchAttributeUpdate<T> valueUnset() {
if (valueType == IndexedValueType.INDEXED_VALUE_TYPE_UNSPECIFIED) {
throw new IllegalStateException("untyped keys should not be used in workflows");
}
return SearchAttributeUpdate.valueUnset(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public RootScheduleClientInvoker(
}

@Override
@SuppressWarnings("deprecation")
public void createSchedule(CreateScheduleInput input) {

CreateScheduleRequest.Builder request =
Expand All @@ -75,8 +76,15 @@ public void createSchedule(CreateScheduleInput input) {

if (input.getOptions().getSearchAttributes() != null
&& !input.getOptions().getSearchAttributes().isEmpty()) {
if (input.getOptions().getTypedSearchAttributes() != null) {
throw new IllegalArgumentException(
"Cannot have search attributes and typed search attributes");
}
request.setSearchAttributes(
SearchAttributesUtil.encode(input.getOptions().getSearchAttributes()));
} else if (input.getOptions().getTypedSearchAttributes() != null) {
request.setSearchAttributes(
SearchAttributesUtil.encodeTyped(input.getOptions().getTypedSearchAttributes()));
}

if (input.getOptions().isTriggerImmediately()
Expand Down Expand Up @@ -194,6 +202,7 @@ public DescribeScheduleOutput describeSchedule(DescribeScheduleInput input) {
scheduleRequestHeader.protoToSchedule(response.getSchedule()),
Collections.unmodifiableMap(
SearchAttributesUtil.decode(response.getSearchAttributes())),
SearchAttributesUtil.decodeTyped(response.getSearchAttributes()),
response.getMemo().getFieldsMap(),
clientOptions.getDataConverter()));
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ final class SearchAttributePayloadConverter {
new SearchAttributePayloadConverter();

public Payload encodeTyped(SearchAttributeKey<?> key, @Nullable Object value) {
if (key.getValueType() == IndexedValueType.INDEXED_VALUE_TYPE_UNSPECIFIED) {
// If we don't have the type we should just leave the payload as is
return (Payload) value;
}
// We can encode as-is because we know it's strictly typed to expected key value. We
// accept a null value because updates for unset can be null.
return DefaultDataConverter.STANDARD_INSTANCE.toPayload(value).get().toBuilder()
Expand All @@ -62,6 +66,13 @@ public void decodeTyped(SearchAttributes.Builder builder, String name, @Nonnull
// Get key type
SearchAttributeKey key;
IndexedValueType indexType = getIndexType(payload.getMetadataMap().get(METADATA_TYPE_KEY));
if (indexType == null) {
// If the server didn't write the type metadata we
// don't know how to decode this search attribute
key = SearchAttributeKey.forUntyped(name);
builder.set(key, payload);
return;
}
switch (indexType) {
case INDEXED_VALUE_TYPE_TEXT:
key = SearchAttributeKey.forText(name);
Expand Down
Loading

0 comments on commit 4298dad

Please sign in to comment.