Skip to content

Commit

Permalink
improve Vert.x event loop integration
Browse files Browse the repository at this point in the history
The `EventLoop` interface was simplified: the `isEventLoopThread()` method was
removed and instead, the `executor()` method is supposed to return `null`
if the current thread is not an event loop thread.

Further, the `VertxEventLoop` implementation considers all Vert.x threads
"event loops", even worker threads.

Finally, the `VertxExecutor` and `ThreadOffload` classes make bigger
effort to keep the execution on the original Vert.x context as remembered
at the beginning by `RememberEventLoop`.
  • Loading branch information
Ladicek committed Jan 6, 2025
1 parent 2d55dd0 commit 11ffb4e
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public <T> T remove(Class<T> clazz) {
return clazz.cast(data.remove(clazz));
}

public boolean has(Class<?> clazz) {
return data.containsKey(clazz);
}

public <T> T get(Class<T> clazz) {
return clazz.cast(data.get(clazz));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ public Future<V> apply(FaultToleranceContext<V> ctx) {

LOG.trace("RememberEventLoopExecutor started");
try {
if (eventLoop.isEventLoopThread()) {
ctx.set(Executor.class, eventLoop.executor());
Executor executor = eventLoop.executor();
if (executor != null) {
ctx.set(Executor.class, executor);
}

return delegate.apply(ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public ThreadOffload(FaultToleranceStrategy<V> delegate, Executor executor, bool
@Override
public Future<V> apply(FaultToleranceContext<V> ctx) {
// required for `@ApplyGuard`
if (!ctx.get(ThreadOffloadEnabled.class, defaultEnabled).value) {
boolean hasRememberedExecutor = ctx.has(Executor.class);
if (!hasRememberedExecutor && !ctx.get(ThreadOffloadEnabled.class, defaultEnabled).value) {
return delegate.apply(ctx);
}

Expand All @@ -37,13 +38,31 @@ public Future<V> apply(FaultToleranceContext<V> ctx) {
Executor executor = ctx.get(Executor.class, this.executor);

Completer<V> result = Completer.create();
executor.execute(() -> {
try {
delegate.apply(ctx).thenComplete(result);
} catch (Exception e) {
result.completeWithError(e);
}
});
if (hasRememberedExecutor) {
executor.execute(() -> {
try {
delegate.apply(ctx).then((value, error) -> {
executor.execute(() -> {
if (error == null) {
result.complete(value);
} else {
result.completeWithError(error);
}
});
});
} catch (Exception e) {
result.completeWithError(e);
}
});
} else {
executor.execute(() -> {
try {
delegate.apply(ctx).thenComplete(result);
} catch (Exception e) {
result.completeWithError(e);
}
});
}
return result.future();
} finally {
LOG.trace("ThreadOffload finished");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,9 @@
* Discovered using {@link ServiceLoader}. At most one implementation may be present on the classpath.
*/
public interface EventLoop {
/**
* Returns whether current thread is an event loop thread.
* <p>
* When this method returns {@code false}, calling {@link #executor()}
* doesn't make sense and throws {@link UnsupportedOperationException}.
*/
boolean isEventLoopThread();

/**
* Returns an {@link Executor} that runs tasks on the current thread's event loop.
* If this thread is not an event loop thread, returns {@code null}.
* <p>
* Pay attention to when you call this method. If you want to <em>later</em> use an executor
* for current thread's event loop, possibly even from a different thread, call this method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,8 @@ private NoEventLoop() {
// avoid instantiation
}

@Override
public boolean isEventLoopThread() {
return false;
}

@Override
public Executor executor() {
throw new UnsupportedOperationException();
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,12 @@
import io.vertx.core.Vertx;

public final class VertxEventLoop implements EventLoop {
@Override
public boolean isEventLoopThread() {
return Context.isOnEventLoopThread();
}

private void checkEventLoopThread() {
if (!isEventLoopThread()) {
throw new UnsupportedOperationException();
}
}

@Override
public Executor executor() {
checkEventLoopThread();
return new VertxExecutor(Vertx.currentContext());
if (Context.isOnVertxThread()) {
// all Vert.x threads are "event loops", even worker threads
return new VertxExecutor(Vertx.currentContext(), Context.isOnWorkerThread());
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,29 @@

import io.smallrye.faulttolerance.core.util.RunnableWrapper;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

final class VertxExecutor implements Executor {
private final Context vertxContext;
private final boolean offloadToWorkerThread;

VertxExecutor(Context vertxContext) {
VertxExecutor(Context vertxContext, boolean offloadToWorkerThread) {
this.vertxContext = vertxContext;
this.offloadToWorkerThread = offloadToWorkerThread;
}

@Override
public void execute(Runnable runnable) {
// fast path: if we're on the correct event loop thread already,
// we can run the task directly
if (Vertx.currentContext() == vertxContext) {
runnable.run();
return;
}

Runnable wrappedRunnable = RunnableWrapper.INSTANCE.wrap(runnable);

vertxContext.runOnContext(ignored -> {
wrappedRunnable.run();
});
if (vertxContext.isEventLoopContext() && offloadToWorkerThread) {
vertxContext.executeBlocking(() -> {
wrappedRunnable.run();
return null;
});
} else {
vertxContext.runOnContext(ignored -> {
wrappedRunnable.run();
});
}
}
}

0 comments on commit 11ffb4e

Please sign in to comment.