diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java index c9887ff633b7f..b344f7ba4e0e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.SharedStateRegistryFactory; +import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedThrowable; @@ -45,12 +46,13 @@ import org.apache.flink.util.function.ThrowingConsumer; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ExecutorService; import java.util.function.Function; import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; @@ -63,6 +65,10 @@ */ class CheckpointResourcesCleanupRunnerTest { + @RegisterExtension + private static final TestExecutorExtension EXECUTOR_EXTENSION = + new TestExecutorExtension<>(java.util.concurrent.Executors::newCachedThreadPool); + private static final Time TIMEOUT_FOR_REQUESTS = Time.milliseconds(0); private static final ThrowingConsumer @@ -120,7 +126,7 @@ void testSuccessfulCloseAsyncAfterStart() throws Exception { final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder() .withCheckpointRecoveryFactory(checkpointRecoveryFactory) - .withExecutor(ForkJoinPool.commonPool()) + .withExecutor(EXECUTOR_EXTENSION.getExecutor()) .build(); testInstance.start(); @@ -169,7 +175,7 @@ void testCloseAsyncAfterStartAndErrorInCompletedCheckpointStoreShutdown() throws final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder() .withCheckpointRecoveryFactory(checkpointRecoveryFactory) - .withExecutor(ForkJoinPool.commonPool()) + .withExecutor(EXECUTOR_EXTENSION.getExecutor()) .build(); testInstance.start(); @@ -214,7 +220,7 @@ void testCloseAsyncAfterStartAndErrorInCheckpointIDCounterShutdown() throws Exce final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder() .withCheckpointRecoveryFactory(checkpointRecoveryFactory) - .withExecutor(ForkJoinPool.commonPool()) + .withExecutor(EXECUTOR_EXTENSION.getExecutor()) .build(); testInstance.start(); @@ -242,7 +248,7 @@ void testCloseAsyncAfterStartAndErrorInCheckpointIDCounterShutdown() throws Exce @Test void testCancellationBeforeStart() throws Exception { final CheckpointResourcesCleanupRunner testInstance = - new TestInstanceBuilder().withExecutor(ForkJoinPool.commonPool()).build(); + new TestInstanceBuilder().withExecutor(EXECUTOR_EXTENSION.getExecutor()).build(); assertThatFuture(testInstance.cancel(TIMEOUT_FOR_REQUESTS)) .eventuallyFailsWith(ExecutionException.class) @@ -262,7 +268,7 @@ void testCancellationAfterStart() throws Exception { final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder() .withCheckpointRecoveryFactory(checkpointRecoveryFactory) - .withExecutor(ForkJoinPool.commonPool()) + .withExecutor(EXECUTOR_EXTENSION.getExecutor()) .build(); AFTER_START.accept(testInstance); assertThatFuture(testInstance.cancel(TIMEOUT_FOR_REQUESTS)) @@ -278,7 +284,7 @@ void testCancellationAfterStart() throws Exception { @Test void testCancellationAfterClose() throws Exception { final CheckpointResourcesCleanupRunner testInstance = - new TestInstanceBuilder().withExecutor(ForkJoinPool.commonPool()).build(); + new TestInstanceBuilder().withExecutor(EXECUTOR_EXTENSION.getExecutor()).build(); AFTER_CLOSE.accept(testInstance); assertThatFuture(testInstance.cancel(TIMEOUT_FOR_REQUESTS)) .eventuallyFailsWith(ExecutionException.class)