Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve event loop integration #1097

Merged
merged 3 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public <T> T remove(Class<T> clazz) {
return clazz.cast(data.remove(clazz));
}

public boolean has(Class<?> clazz) {
return data.containsKey(clazz);
}

public <T> T get(Class<T> clazz) {
return clazz.cast(data.get(clazz));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ public Future<V> apply(FaultToleranceContext<V> ctx) {

LOG.trace("RememberEventLoopExecutor started");
try {
if (eventLoop.isEventLoopThread()) {
ctx.set(Executor.class, eventLoop.executor());
Executor executor = eventLoop.executor();
if (executor != null) {
ctx.set(Executor.class, executor);
}

return delegate.apply(ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public ThreadOffload(FaultToleranceStrategy<V> delegate, Executor executor, bool
@Override
public Future<V> apply(FaultToleranceContext<V> ctx) {
// required for `@ApplyGuard`
if (!ctx.get(ThreadOffloadEnabled.class, defaultEnabled).value) {
boolean hasRememberedExecutor = ctx.has(Executor.class);
if (!hasRememberedExecutor && !ctx.get(ThreadOffloadEnabled.class, defaultEnabled).value) {
return delegate.apply(ctx);
}

Expand All @@ -37,13 +38,31 @@ public Future<V> apply(FaultToleranceContext<V> ctx) {
Executor executor = ctx.get(Executor.class, this.executor);

Completer<V> result = Completer.create();
executor.execute(() -> {
try {
delegate.apply(ctx).thenComplete(result);
} catch (Exception e) {
result.completeWithError(e);
}
});
if (hasRememberedExecutor) {
executor.execute(() -> {
try {
delegate.apply(ctx).then((value, error) -> {
executor.execute(() -> {
if (error == null) {
result.complete(value);
} else {
result.completeWithError(error);
}
});
});
} catch (Exception e) {
result.completeWithError(e);
}
});
} else {
executor.execute(() -> {
try {
delegate.apply(ctx).thenComplete(result);
} catch (Exception e) {
result.completeWithError(e);
}
});
}
return result.future();
} finally {
LOG.trace("ThreadOffload finished");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,9 @@
* Discovered using {@link ServiceLoader}. At most one implementation may be present on the classpath.
*/
public interface EventLoop {
/**
* Returns whether current thread is an event loop thread.
* <p>
* When this method returns {@code false}, calling {@link #executor()}
* doesn't make sense and throws {@link UnsupportedOperationException}.
*/
boolean isEventLoopThread();

/**
* Returns an {@link Executor} that runs tasks on the current thread's event loop.
* If the current thread is not an event loop thread, returns {@code null}.
* <p>
* Pay attention to when you call this method. If you want to <em>later</em> use an executor
* for current thread's event loop, possibly even from a different thread, call this method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,8 @@ private NoEventLoop() {
// avoid instantiation
}

@Override
public boolean isEventLoopThread() {
return false;
}

@Override
public Executor executor() {
throw new UnsupportedOperationException();
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import static io.smallrye.faulttolerance.core.util.Preconditions.check;
import static io.smallrye.faulttolerance.core.util.Preconditions.checkNotNull;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.Executor;

import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException;

Expand Down Expand Up @@ -39,26 +39,21 @@ public Future<V> apply(FaultToleranceContext<V> ctx) {
ctx.fireEvent(TimeoutEvents.Started.INSTANCE);

// must extract `FutureTimeoutNotification` early, because if retries are present,
// a different `FutureTimeoutNotification` may be present in the `InvocationContext`
// a different `FutureTimeoutNotification` may be present in the `FaultToleranceContext`
// by the time the timeout callback is invoked
FutureTimeoutNotification notification = ctx.remove(FutureTimeoutNotification.class);

AtomicBoolean completedWithTimeout = new AtomicBoolean(false);
Runnable onTimeout = () -> {
if (completedWithTimeout.compareAndSet(false, true)) {
LOG.debugf("%s invocation timed out (%d ms)", description, timeoutInMillis);
ctx.fireEvent(TimeoutEvents.Finished.TIMED_OUT);
TimeoutException timeout = new TimeoutException(description + " timed out");
if (notification != null) {
notification.accept(timeout);
}
result.completeWithError(timeout);
}
};

Thread executingThread = ctx.isSync() ? Thread.currentThread() : null;
TimeoutExecution execution = new TimeoutExecution(executingThread, timeoutInMillis, onTimeout);
TimerTask task = timer.schedule(execution.timeoutInMillis(), execution::timeoutAndInterrupt);
TimeoutExecution execution = new TimeoutExecution(executingThread, () -> {
LOG.debugf("%s invocation timed out (%d ms)", description, timeoutInMillis);
ctx.fireEvent(TimeoutEvents.Finished.TIMED_OUT);
TimeoutException timeout = new TimeoutException(description + " timed out");
if (notification != null) {
notification.accept(timeout);
}
result.completeWithError(timeout);
});
TimerTask task = timer.schedule(timeoutInMillis, execution::timeoutAndInterrupt, ctx.get(Executor.class));

Future<V> originalResult;
try {
Expand All @@ -81,7 +76,7 @@ public Future<V> apply(FaultToleranceContext<V> ctx) {
}

if (execution.hasTimedOut()) {
onTimeout.run();
// the "on timeout" callback is called by `execution::timeoutAndInterrupt` above
} else if (error == null) {
ctx.fireEvent(TimeoutEvents.Finished.NORMALLY);
result.complete(value);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.smallrye.faulttolerance.core.timeout;

import java.lang.invoke.ConstantBootstraps;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

Expand All @@ -8,14 +9,8 @@ final class TimeoutExecution {
private static final int STATE_FINISHED = 1;
private static final int STATE_TIMED_OUT = 2;

private static final VarHandle STATE;
static {
try {
STATE = MethodHandles.lookup().findVarHandle(TimeoutExecution.class, "state", int.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
private static final VarHandle STATE = ConstantBootstraps.fieldVarHandle(MethodHandles.lookup(),
"state", VarHandle.class, TimeoutExecution.class, int.class);

private volatile int state;

Expand All @@ -24,23 +19,12 @@ final class TimeoutExecution {
// can be null, if no action shall be performed upon timeout
private final Runnable timeoutAction;

private final long timeoutInMillis;

TimeoutExecution(Thread executingThread, long timeoutInMillis) {
this(executingThread, timeoutInMillis, null);
}

TimeoutExecution(Thread executingThread, long timeoutInMillis, Runnable timeoutAction) {
TimeoutExecution(Thread executingThread, Runnable timeoutAction) {
this.state = STATE_RUNNING;
this.executingThread = executingThread;
this.timeoutInMillis = timeoutInMillis;
this.timeoutAction = timeoutAction;
}

long timeoutInMillis() {
return timeoutInMillis;
}

boolean isRunning() {
return state == STATE_RUNNING;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ public boolean cancel() {

@Override
public TimerTask schedule(long delayInMillis, Runnable task, Executor executor) {
// not used in `Timeout` / `CompletionStageTimeout`
throw new UnsupportedOperationException();
// in the test, the `executor` is always `null`
return schedule(delayInMillis, task);
}

@Override
public int countScheduledTasks() {
// not used in `Timeout` / `CompletionStageTimeout`
// not used in `Timeout`
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,19 @@

public class TimeoutExecutionTest {
private TimeoutExecution execution;
private AtomicBoolean timedOut;

@BeforeEach
public void setUp() {
execution = new TimeoutExecution(Thread.currentThread(), 1000L);
execution = new TimeoutExecution(Thread.currentThread(), () -> timedOut.set(true));
timedOut = new AtomicBoolean(false);
}

@Test
public void initialState() {
assertThat(execution.isRunning()).isTrue();
}

@Test
public void timeoutValue() {
assertThat(execution.timeoutInMillis()).isEqualTo(1000L);
}

@Test
public void finish() {
AtomicBoolean flag = new AtomicBoolean(false);
Expand All @@ -38,6 +35,7 @@ public void timeout() {
execution.timeoutAndInterrupt();
assertThat(execution.hasTimedOut()).isTrue();
assertThat(Thread.interrupted()).isTrue(); // clear the current thread interruption status
assertThat(timedOut).isTrue();
}

@Test
Expand All @@ -49,6 +47,7 @@ public void timeoutAfterFinish() {
assertThat(execution.hasTimedOut()).isFalse();
assertThat(flag).isTrue();
assertThat(Thread.currentThread().isInterrupted()).isFalse();
assertThat(timedOut).isFalse();
}

@Test
Expand All @@ -60,5 +59,6 @@ public void finishAfterTimeout() {
assertThat(execution.hasTimedOut()).isTrue();
assertThat(flag).isFalse();
assertThat(Thread.interrupted()).isTrue(); // clear the current thread interruption status
assertThat(timedOut).isTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,12 @@
import io.vertx.core.Vertx;

public final class VertxEventLoop implements EventLoop {
@Override
public boolean isEventLoopThread() {
return Context.isOnEventLoopThread();
}

private void checkEventLoopThread() {
if (!isEventLoopThread()) {
throw new UnsupportedOperationException();
}
}

@Override
public Executor executor() {
checkEventLoopThread();
return new VertxExecutor(Vertx.currentContext());
if (Context.isOnVertxThread()) {
// all Vert.x threads are "event loops", even worker threads
return new VertxExecutor(Vertx.currentContext(), Context.isOnWorkerThread());
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,29 @@

import io.smallrye.faulttolerance.core.util.RunnableWrapper;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

final class VertxExecutor implements Executor {
private final Context vertxContext;
private final boolean offloadToWorkerThread;

VertxExecutor(Context vertxContext) {
VertxExecutor(Context vertxContext, boolean offloadToWorkerThread) {
this.vertxContext = vertxContext;
this.offloadToWorkerThread = offloadToWorkerThread;
}

@Override
public void execute(Runnable runnable) {
// fast path: if we're on the correct event loop thread already,
// we can run the task directly
if (Vertx.currentContext() == vertxContext) {
runnable.run();
return;
}

Runnable wrappedRunnable = RunnableWrapper.INSTANCE.wrap(runnable);

vertxContext.runOnContext(ignored -> {
wrappedRunnable.run();
});
if (vertxContext.isEventLoopContext() && offloadToWorkerThread) {
vertxContext.executeBlocking(() -> {
wrappedRunnable.run();
return null;
});
} else {
vertxContext.runOnContext(ignored -> {
wrappedRunnable.run();
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.smallrye.faulttolerance.vertx;

import java.util.Locale;
import java.util.Objects;

public final class ContextDescription {
public final ExecutionStyle executionStyle;
public final String contextClass;
public final String uuid;
public final String contextHash;

ContextDescription(ExecutionStyle executionStyle, String contextClass, String uuid, String contextHash) {
this.executionStyle = executionStyle;
this.contextClass = contextClass;
this.contextHash = contextHash;
this.uuid = uuid;
}

public boolean isDuplicatedContext() {
return "DuplicatedContext".equals(contextClass);
}

@Override
public boolean equals(Object o) {
if (!(o instanceof ContextDescription)) {
return false;
}
ContextDescription that = (ContextDescription) o;
return Objects.equals(executionStyle, that.executionStyle)
&& Objects.equals(contextClass, that.contextClass)
&& Objects.equals(uuid, that.uuid)
&& Objects.equals(contextHash, that.contextHash);
}

@Override
public int hashCode() {
return Objects.hash(executionStyle, contextClass, uuid, contextHash);
}

@Override
public String toString() {
return executionStyle.toString().toLowerCase(Locale.ROOT)
+ "|" + contextClass
+ "|" + uuid
+ "|" + contextHash;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.smallrye.faulttolerance.vertx;

public enum ExecutionStyle {
EVENT_LOOP,
WORKER,
UNKNOWN,
}
Loading
Loading