From 0a820204ce4cd600238efbb8392cb0627eca33aa Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Thu, 9 Jan 2025 13:30:23 +0100 Subject: [PATCH] fix the Vert.x event loop integration The Vert.x event loop integration used to assume that all Vert.x threads always have a current context associated. That isn't necessarily true, especially in case of worker threads, where a task can easily be submitted to the worker thread pool as an ordinary thread pool, outside of a Vert.x context. In that case, the thread cannot be treated as an event loop. Additionally, this commit synchronizes the thread offload fallback function with the thread offload strategy, so that if there's an event loop remembered, the fallback is executed on it instead of the main executor. --- .../ThreadOffloadFallbackFunction.java | 35 +++++++++++++++---- .../faulttolerance/vertx/VertxEventLoop.java | 8 ++++- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/fallback/ThreadOffloadFallbackFunction.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/fallback/ThreadOffloadFallbackFunction.java index 41417c7a..136cf7cf 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/fallback/ThreadOffloadFallbackFunction.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/fallback/ThreadOffloadFallbackFunction.java @@ -18,14 +18,35 @@ public ThreadOffloadFallbackFunction(Function> delegat @Override public Future apply(FailureContext ctx) { + boolean hasRememberedExecutor = ctx.context.has(Executor.class); + Executor executor = ctx.context.get(Executor.class, this.executor); + Completer result = Completer.create(); - executor.execute(() -> { - try { - delegate.apply(ctx).thenComplete(result); - } catch (Exception e) { - result.completeWithError(e); - } - }); + if (hasRememberedExecutor) { + executor.execute(() -> { + try { + delegate.apply(ctx).then((value, error) -> { + executor.execute(() -> { + if (error == null) { + result.complete(value); + } else { + result.completeWithError(error); + } + }); + }); + } catch (Exception e) { + result.completeWithError(e); + } + }); + } else { + executor.execute(() -> { + try { + delegate.apply(ctx).thenComplete(result); + } catch (Exception e) { + result.completeWithError(e); + } + }); + } return result.future(); } } diff --git a/implementation/vertx/src/main/java/io/smallrye/faulttolerance/vertx/VertxEventLoop.java b/implementation/vertx/src/main/java/io/smallrye/faulttolerance/vertx/VertxEventLoop.java index 6cd5b3d4..eedd82f5 100644 --- a/implementation/vertx/src/main/java/io/smallrye/faulttolerance/vertx/VertxEventLoop.java +++ b/implementation/vertx/src/main/java/io/smallrye/faulttolerance/vertx/VertxEventLoop.java @@ -11,7 +11,13 @@ public final class VertxEventLoop implements EventLoop { public Executor executor() { if (Context.isOnVertxThread()) { // all Vert.x threads are "event loops", even worker threads - return new VertxExecutor(Vertx.currentContext(), Context.isOnWorkerThread()); + // + // beware that a Vert.x thread (especially worker) doesn't necessarily have to have a current context set, + // because a task can be submitted to it outside of a Vert.x context + Context context = Vertx.currentContext(); + if (context != null) { + return new VertxExecutor(context, Context.isOnWorkerThread()); + } } return null; }