Skip to content

Commit

Permalink
fix the Vert.x event loop integration
Browse files Browse the repository at this point in the history
The Vert.x event loop integration used to assume that all Vert.x threads
always have a current context associated. That isn't necessarily true,
especially in case of worker threads, where a task can easily be submitted
to the worker thread pool as an ordinary thread pool, outside of a Vert.x
context. In that case, the thread cannot be treated as an event loop.

Additionally, this commit synchronizes the thread offload fallback function
with the thread offload strategy, so that if there's an event loop
remembered, the fallback is executed on it instead of the main executor.
  • Loading branch information
Ladicek committed Jan 9, 2025
1 parent 210eb68 commit 0a82020
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,35 @@ public ThreadOffloadFallbackFunction(Function<FailureContext, Future<T>> delegat

@Override
public Future<T> apply(FailureContext ctx) {
boolean hasRememberedExecutor = ctx.context.has(Executor.class);
Executor executor = ctx.context.get(Executor.class, this.executor);

Completer<T> result = Completer.create();
executor.execute(() -> {
try {
delegate.apply(ctx).thenComplete(result);
} catch (Exception e) {
result.completeWithError(e);
}
});
if (hasRememberedExecutor) {
executor.execute(() -> {
try {
delegate.apply(ctx).then((value, error) -> {
executor.execute(() -> {
if (error == null) {
result.complete(value);
} else {
result.completeWithError(error);
}
});
});
} catch (Exception e) {
result.completeWithError(e);
}
});
} else {
executor.execute(() -> {
try {
delegate.apply(ctx).thenComplete(result);
} catch (Exception e) {
result.completeWithError(e);
}
});
}
return result.future();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@ public final class VertxEventLoop implements EventLoop {
public Executor executor() {
if (Context.isOnVertxThread()) {
// all Vert.x threads are "event loops", even worker threads
return new VertxExecutor(Vertx.currentContext(), Context.isOnWorkerThread());
//
// beware that a Vert.x thread (especially worker) doesn't necessarily have to have a current context set,
// because a task can be submitted to it outside of a Vert.x context
Context context = Vertx.currentContext();
if (context != null) {
return new VertxExecutor(context, Context.isOnWorkerThread());
}
}
return null;
}
Expand Down

0 comments on commit 0a82020

Please sign in to comment.