Skip to content

Commit

Permalink
Fix shutdown behavior of unstarted LA slot queue (#2196)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored Aug 20, 2024
1 parent 0c135d6 commit 4eaeb9e
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@
import io.temporal.worker.tuning.LocalActivitySlotInfo;
import io.temporal.worker.tuning.SlotPermit;
import io.temporal.workflow.Functions;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LocalActivitySlotSupplierQueue {
class LocalActivitySlotSupplierQueue implements Shutdownable {
static final class QueuedLARequest {
final boolean isRetry;
final SlotReservationData data;
Expand All @@ -47,10 +45,11 @@ static final class QueuedLARequest {
private final Semaphore newExecutionsBackpressureSemaphore;
private final TrackingSlotSupplier<LocalActivitySlotInfo> slotSupplier;
private final Functions.Proc1<LocalActivityAttemptTask> afterReservedCallback;
private final Thread queueThread;
private final ExecutorService queueThreadService;
private static final Logger log =
LoggerFactory.getLogger(LocalActivitySlotSupplierQueue.class.getName());
private volatile boolean running = true;
private volatile boolean wasEverStarted = false;

LocalActivitySlotSupplierQueue(
TrackingSlotSupplier<LocalActivitySlotInfo> slotSupplier,
Expand All @@ -73,13 +72,13 @@ static final class QueuedLARequest {
return 0;
});
this.slotSupplier = slotSupplier;
this.queueThread = new Thread(this::processQueue, "LocalActivitySlotSupplierQueue");
this.queueThread.start();
this.queueThreadService =
Executors.newSingleThreadExecutor(r -> new Thread(r, "LocalActivitySlotSupplierQueue"));
}

private void processQueue() {
try {
while (running) {
while (running || !requestQueue.isEmpty()) {
QueuedLARequest request = requestQueue.take();
SlotPermit slotPermit;
try {
Expand All @@ -102,9 +101,9 @@ private void processQueue() {
}
}

void shutdown() {
running = false;
queueThread.interrupt();
void start() {
wasEverStarted = true;
this.queueThreadService.submit(this::processQueue);
}

boolean waitOnBackpressure(@Nullable Long acceptanceTimeoutMs) throws InterruptedException {
Expand Down Expand Up @@ -134,4 +133,40 @@ void submitAttempt(SlotReservationData data, boolean isRetry, LocalActivityAttem
newExecutionsBackpressureSemaphore.release();
}
}

@Override
public boolean isShutdown() {
return queueThreadService.isShutdown();
}

@Override
public boolean isTerminated() {
return queueThreadService.isTerminated();
}

@Override
public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
running = false;
if (requestQueue.isEmpty()) {
// Just interrupt the thread, so that if we're waiting on blocking take the thread will
// be interrupted and exit. Otherwise the loop will exit once the queue is empty.
queueThreadService.shutdownNow();
}

return interruptTasks
? shutdownManager.shutdownExecutorNowUntimed(
queueThreadService, "LocalActivitySlotSupplierQueue")
: shutdownManager.shutdownExecutorUntimed(
queueThreadService, "LocalActivitySlotSupplierQueue");
}

@Override
public void awaitTermination(long timeout, TimeUnit unit) {
if (!wasEverStarted) {
// Not entirely clear why this is necessary, but await termination will hang the whole
// timeout duration if no task was ever submitted.
return;
}
ShutdownManager.awaitTermination(queueThreadService, unit.toMillis(timeout));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ public boolean start() {
false);

this.workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);
this.slotQueue.start();
return true;
} else {
return false;
Expand All @@ -698,9 +699,9 @@ public boolean start() {
@Override
public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
if (activityAttemptTaskExecutor != null && !activityAttemptTaskExecutor.isShutdown()) {
slotQueue.shutdown();
return activityAttemptTaskExecutor
return slotQueue
.shutdown(shutdownManager, interruptTasks)
.thenCompose(r -> activityAttemptTaskExecutor.shutdown(shutdownManager, interruptTasks))
.thenCompose(
r ->
shutdownManager.shutdownExecutor(
Expand All @@ -717,21 +718,24 @@ public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean

@Override
public void awaitTermination(long timeout, TimeUnit unit) {
slotQueue.shutdown();
long timeoutMillis = unit.toMillis(timeout);
ShutdownManager.awaitTermination(scheduledExecutor, timeoutMillis);
long remainingTimeout = ShutdownManager.awaitTermination(scheduledExecutor, timeoutMillis);
ShutdownManager.awaitTermination(slotQueue, remainingTimeout);
}

@Override
public boolean isShutdown() {
return activityAttemptTaskExecutor != null && activityAttemptTaskExecutor.isShutdown();
return activityAttemptTaskExecutor != null
&& activityAttemptTaskExecutor.isShutdown()
&& slotQueue.isShutdown();
}

@Override
public boolean isTerminated() {
return activityAttemptTaskExecutor != null
&& activityAttemptTaskExecutor.isTerminated()
&& scheduledExecutor.isTerminated();
&& scheduledExecutor.isTerminated()
&& slotQueue.isTerminated();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.worker;

import static org.junit.Assert.assertTrue;

import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.time.Instant;
import java.util.Set;
import org.junit.Rule;
import org.junit.Test;

public class LocalActivityWorkerNoneRegisteredNotStartedTest {

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(NothingWorkflowImpl.class)
.setWorkerOptions(WorkerOptions.newBuilder().setLocalActivityWorkerOnly(true).build())
// Don't start the worker
.setDoNotStart(true)
.build();

@Test
public void canShutDownProperlyWhenNotStarted() {
// Shut down the (never started) worker
Instant shutdownTime = Instant.now();
testWorkflowRule.getTestEnvironment().getWorkerFactory().shutdown();
testWorkflowRule.getWorker().awaitTermination(2, java.util.concurrent.TimeUnit.SECONDS);
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
for (Thread thread : threadSet) {
if (thread.getName().contains("LocalActivitySlotSupplierQueue")) {
throw new RuntimeException("Thread should be terminated");
}
}
Duration elapsed = Duration.between(shutdownTime, Instant.now());
// Shutdown should not have taken long
assertTrue(elapsed.getSeconds() < 2);
}

@WorkflowInterface
public interface NothingWorkflow {
@WorkflowMethod
void execute();
}

public static class NothingWorkflowImpl implements NothingWorkflow {
@Override
public void execute() {
Workflow.sleep(500);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.worker;

import static org.junit.Assert.assertTrue;

import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import io.temporal.workflow.shared.TestActivities;
import java.time.Duration;
import java.time.Instant;
import java.util.Set;
import org.junit.Rule;
import org.junit.Test;

public class LocalActivityWorkerNotStartedTest {

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(NothingWorkflowImpl.class)
.setActivityImplementations(new NothingActivityImpl())
.setWorkerOptions(WorkerOptions.newBuilder().setLocalActivityWorkerOnly(true).build())
// Don't start the worker
.setDoNotStart(true)
.build();

@Test
public void canShutDownProperlyWhenNotStarted() {
// Shut down the (never started) worker
Instant shutdownTime = Instant.now();
testWorkflowRule.getTestEnvironment().getWorkerFactory().shutdown();
testWorkflowRule.getWorker().awaitTermination(2, java.util.concurrent.TimeUnit.SECONDS);
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
for (Thread thread : threadSet) {
if (thread.getName().contains("LocalActivitySlotSupplierQueue")) {
throw new RuntimeException("Thread should be terminated");
}
}
Duration elapsed = Duration.between(shutdownTime, Instant.now());
// Shutdown should not have taken long
assertTrue(elapsed.getSeconds() < 2);
}

@WorkflowInterface
public interface NothingWorkflow {
@WorkflowMethod
void execute();
}

public static class NothingWorkflowImpl implements NothingWorkflow {
@Override
public void execute() {
Workflow.sleep(500);
}
}

public static class NothingActivityImpl implements TestActivities.NoArgsActivity {
@Override
public void execute() {}
}
}

0 comments on commit 4eaeb9e

Please sign in to comment.