From 1d23f3cb00373122892db0cabcc589ba633ada90 Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Wed, 27 Nov 2024 19:46:52 +0100 Subject: [PATCH] Revert "[hotfix][core] Generalizes FutureUtils#runAsync" This reverts commit e64b2fd36fe5990193877fb69a6b2d66944c1c45. --- .../apache/flink/util/concurrent/FutureUtils.java | 5 ++--- .../nonha/embedded/EmbeddedLeaderService.java | 11 +++++++---- .../leaderelection/DefaultLeaderElectionService.java | 12 +++++++++--- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java index e959a974bcf9d1..f20baa9f2cc6c1 100644 --- a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java @@ -24,7 +24,6 @@ import org.apache.flink.util.FatalExitExceptionHandler; import org.apache.flink.util.function.RunnableWithException; import org.apache.flink.util.function.SupplierWithException; -import org.apache.flink.util.function.ThrowingRunnable; import javax.annotation.Nullable; @@ -923,14 +922,14 @@ public static CompletableFuture supplyAsync( } /** - * Returns a future which is completed when {@code runnable} is finished. + * Returns a future which is completed when {@link RunnableWithException} is finished. * * @param runnable represents the task * @param executor to execute the runnable * @return Future which is completed when runnable is finished */ public static CompletableFuture runAsync( - ThrowingRunnable runnable, Executor executor) { + RunnableWithException runnable, Executor executor) { return CompletableFuture.runAsync( () -> { try { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java index a963b2e36285c4..4a5df85471f435 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java @@ -26,7 +26,6 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; -import org.apache.flink.util.concurrent.Executors; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.ThrowingRunnable; @@ -484,9 +483,13 @@ public void confirmLeadership(UUID leaderSessionID, String leaderAddress) { @Override public CompletableFuture runAsLeader( UUID leaderSessionId, ThrowingRunnable callback) { - return FutureUtils.runAsync( - () -> EmbeddedLeaderService.this.runAsLeader(this, leaderSessionId, callback), - Executors.directExecutor()); + try { + EmbeddedLeaderService.this.runAsLeader(this, leaderSessionId, callback); + } catch (LeadershipLostException e) { + return FutureUtils.completedExceptionally(e); + } + + return FutureUtils.completedVoidFuture(); } void shutdown(Exception cause) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java index 334be4514dc811..1c90ba099777fc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -367,14 +368,19 @@ CompletableFuture runAsLeader( String componentId, UUID leaderSessionId, ThrowingRunnable callback) { - return FutureUtils.runAsync( + return CompletableFuture.runAsync( () -> { synchronized (lock) { if (!hasLeadership(componentId, leaderSessionId)) { - throw new LeadershipLostException(leaderSessionId); + throw new CompletionException( + new LeadershipLostException(leaderSessionId)); } - callback.run(); + try { + callback.run(); + } catch (Throwable e) { + throw new CompletionException(e); + } } }, leadershipOperationExecutor);