Skip to content

Commit

Permalink
Fix identity for sticky worker (#659)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Nov 23, 2021
1 parent 317135c commit 216a287
Showing 1 changed file with 20 additions and 19 deletions.
39 changes: 20 additions & 19 deletions src/main/java/com/uber/cadence/worker/WorkerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import com.uber.cadence.internal.worker.WorkflowPollTaskFactory;
import com.uber.m3.tally.Scope;
import com.uber.m3.util.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
Expand All @@ -45,8 +48,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Maintains worker creation and lifecycle. */
public final class WorkerFactory {
Expand All @@ -63,7 +64,7 @@ public static WorkerFactory newInstance(
private final List<Worker> workers = new ArrayList<>();
private final WorkflowClient workflowClient;
// Guarantee uniqueness for stickyTaskListName when multiple factories
private final UUID id = UUID.randomUUID();
private final UUID stickyTasklistRandomId = UUID.randomUUID();
private final ThreadPoolExecutor workflowThreadPool;
private final AtomicInteger workflowThreadCounter = new AtomicInteger();
private final WorkerFactoryOptions factoryOptions;
Expand Down Expand Up @@ -122,20 +123,20 @@ public WorkerFactory(WorkflowClient workflowClient, WorkerFactoryOptions factory
dispatcher = new PollDecisionTaskDispatcher(workflowClient.getService());
stickyPoller =
new Poller<>(
id.toString(),
new WorkflowPollTaskFactory(
workflowClient.getService(),
workflowClient.getOptions().getDomain(),
getStickyTaskListName(),
stickyScope,
id.toString())
.get(),
dispatcher,
PollerOptions.newBuilder()
.setPollThreadNamePrefix(POLL_THREAD_NAME)
.setPollThreadCount(this.factoryOptions.getStickyPollerCount())
.build(),
stickyScope);
workflowClient.getOptions().getIdentity(),
new WorkflowPollTaskFactory(
workflowClient.getService(),
workflowClient.getOptions().getDomain(),
getStickyTaskListName(),
stickyScope,
workflowClient.getOptions().getIdentity())
.get(),
dispatcher,
PollerOptions.newBuilder()
.setPollThreadNamePrefix(POLL_THREAD_NAME)
.setPollThreadCount(this.factoryOptions.getStickyPollerCount())
.build(),
stickyScope);
}

/**
Expand Down Expand Up @@ -336,8 +337,8 @@ private String getHostName() {
@VisibleForTesting
String getStickyTaskListName() {
return this.factoryOptions.isDisableStickyExecution()
? null
: String.format("%s:%s:%s", STICKY_TASK_LIST_PREFIX, getHostName(), id);
? null
: String.format("%s:%s:%s", STICKY_TASK_LIST_PREFIX, getHostName(), stickyTasklistRandomId);
}

public synchronized void suspendPolling() {
Expand Down

0 comments on commit 216a287

Please sign in to comment.