Skip to content

Commit

Permalink
Adds scheduleWithFixedRate() and fixes scheduleWithFixedDelay().
Browse files Browse the repository at this point in the history
  • Loading branch information
lucav76 committed Aug 15, 2023
1 parent 747af80 commit fdadd2f
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 36 deletions.
16 changes: 14 additions & 2 deletions src/main/java/eu/lucaventuri/fibry/BaseActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,19 @@ public void execAsync(Runnable worker) {
ActorUtils.execAsync(queue, state -> worker.run());
}

public void execAsyncNoHealing(Runnable worker) {
var that = this;

if (!isExiting()) {
ActorUtils.execAsync(queue, state -> {
// Allow shorter timeout than the scheduling time
HealRegistry.INSTANCE.remove(that, Thread.currentThread(), new AtomicBoolean());
worker.run();
}
);
}
}

/**
* Synchronously executes some logic in the actor.
*/
Expand Down Expand Up @@ -300,8 +313,7 @@ public final void processMessages() {
e.getClass() == InterruptedException.class || e.getCause().getClass() == InterruptedException.class)) {
Exceptions.logShort(() -> autoHealing.onInterruption.accept(e));
}
}
finally {
} finally {
HealRegistry.INSTANCE.remove(this, curThread, threadShouldDie);
}
if (threadShouldDie.get()) { // Notification done in HealRegistry, earlier
Expand Down
108 changes: 75 additions & 33 deletions src/main/java/eu/lucaventuri/fibry/Stereotypes.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.*;

Expand Down Expand Up @@ -565,24 +566,46 @@ public SinkActorSingleTask<Void> schedule(Runnable run, long scheduleMs, long ma
* Creates an actor that runs a Runnable with fixed delay; it optionally supports autoHealing
*/
public SinkActorSingleTask<Void> scheduleWithFixedDelay(Runnable run, long initialDelay, long delay, TimeUnit timeUnit, ActorSystem.AutoHealingSettings autoHealing) {
Actor<Object, Void, Void> actor = (Actor<Object, Void, Void>) sinkHealing((Void) null, autoHealing);
Actor<Object, Void, Void> actor = createActorForScheduling(initialDelay, timeUnit, autoHealing);
AtomicReference<Runnable> runThenWait = new AtomicReference<>();

// Deadlock prevention
actor.setCloseStrategy(Exitable.CloseStrategy.ASK_EXIT);
runThenWait.set(() -> {
// As we are inside the event loop of the actor, we can only call execAsync
actor.execAsync(run);
actor.execAsyncNoHealing(() -> {
// When this runs, run.run() has been executed
SystemUtils.sleep(timeUnit.toMillis(delay)); }
);
actor.execAsync(runThenWait.get());
});

if (initialDelay > 0)
actor.execAsync(() -> SystemUtils.sleep(timeUnit.toMillis(initialDelay)));
actor.execAsync(runThenWait.get());

return actor;
}

/**
* Creates an actor that runs a Runnable with fixed delay; it optionally supports autoHealing
*/
public SinkActorSingleTask<Void> scheduleWithFixedRate(Runnable run, long initialDelay, long delay, TimeUnit timeUnit, ActorSystem.AutoHealingSettings autoHealing) {
Actor<Object, Void, Void> actor = createActorForScheduling(initialDelay, timeUnit, autoHealing);
AtomicReference<Runnable> runThenWait = new AtomicReference<>();
AtomicBoolean dummyThreadShouldDie = new AtomicBoolean();
AtomicLong nextRun = new AtomicLong(System.currentTimeMillis() + timeUnit.toMillis(delay));

runThenWait.set(() -> {
actor.execAsync(() -> {
// Allow shorter timeout than the scheduling time
HealRegistry.INSTANCE.remove(actor, Thread.currentThread(), dummyThreadShouldDie);
SystemUtils.sleep(timeUnit.toMillis(delay)); });
// TODO: It will not work well if the execution time exceeds the delay (e.g. if a task scheduled once an hour uses more than one hour to run)
// As we are inside the event loop of the actor, we can only call execAsync
actor.execAsync(run);
actor.execAsyncNoHealing(() -> {
// When this runs, run.run() has been executed

long wait = nextRun.get() - System.currentTimeMillis();

// TODO: in case of overlap, should we skip? Should we have a policy for that? CHeck Java?
nextRun.set(nextRun.get() + timeUnit.toMillis(delay));
if (wait > 0)
SystemUtils.sleep(timeUnit.toMillis(wait)); }
);
actor.execAsync(runThenWait.get());
});

Expand All @@ -591,6 +614,17 @@ public SinkActorSingleTask<Void> scheduleWithFixedDelay(Runnable run, long initi
return actor;
}

private Actor<Object, Void, Void> createActorForScheduling(long initialDelay, TimeUnit timeUnit, ActorSystem.AutoHealingSettings autoHealing) {
Actor<Object, Void, Void> actor = (Actor<Object, Void, Void>) sinkHealing((Void) null, autoHealing);

// Deadlock prevention
actor.setCloseStrategy(Exitable.CloseStrategy.ASK_EXIT);

if (initialDelay > 0)
actor.execAsync(() -> SystemUtils.sleep(timeUnit.toMillis(initialDelay)));
return actor;
}

/**
* Creates an actor that can accept new TCP connections on the specified port and spawn a new actor for each connection.
* This method will return after the server socket has been created. If the server socket cannot be created, it will throw an exception.
Expand Down Expand Up @@ -894,36 +928,15 @@ public Actor<HttpUrlDownload<byte[]>, Void, Void> binaryDownloader() {
public <T> BaseActor<T, Void, Void> rateLimited(Consumer<T> actorLogic, int msBetweenCalls) {
NamedStateActorCreator<Void> config = anonymous().initialState(null);

Consumer<T> rateLimitedLogic = value -> {
long start = System.currentTimeMillis();

actorLogic.accept(value);

long end = System.currentTimeMillis() - start;

if (msBetweenCalls > (end - start))
SystemUtils.sleep(msBetweenCalls - (end - start));
};
Consumer<T> rateLimitedLogic = wrapWithRateLimiting(actorLogic, msBetweenCalls);

return config.newActor(rateLimitedLogic);
}

public <T, R> BaseActor<T, R, Void> rateLimitedReturn(Function<T, R> actorLogic, int msBetweenCalls) {
NamedStateActorCreator<Void> config = anonymous().initialState(null);

Function<T, R> rateLimitedLogic = value -> {
long start = System.currentTimeMillis();

R result = actorLogic.apply(value);

long end = System.currentTimeMillis();

if (msBetweenCalls > (end - start)) {
SystemUtils.sleep(msBetweenCalls - (end - start));
}

return result;
};
Function<T, R> rateLimitedLogic = wrapWithRateLimitingReturn(actorLogic, msBetweenCalls);

return config.newActorWithReturn(rateLimitedLogic);
}
Expand Down Expand Up @@ -1152,6 +1165,35 @@ private NamedStrategyActorCreator named(String name) {
}
}

public static <T> Consumer<T> wrapWithRateLimiting(Consumer<T> actorLogic, int msBetweenCalls) {
return value -> {
long start = System.currentTimeMillis();

actorLogic.accept(value);

long end = System.currentTimeMillis() - start;

if (msBetweenCalls > (end - start))
SystemUtils.sleep(msBetweenCalls - (end - start));
};
}

public static <T, R> Function<T, R> wrapWithRateLimitingReturn(Function<T, R> actorLogic, int msBetweenCalls) {
return value -> {
long start = System.currentTimeMillis();

R result = actorLogic.apply(value);

long end = System.currentTimeMillis();

if (msBetweenCalls > (end - start)) {
SystemUtils.sleep(msBetweenCalls - (end - start));
}

return result;
};
}

private static <T> void manageHttpClientResponse(Scheduler scheduler, boolean sendLastError, AtomicReference<Actor<HttpUrlDownload<T>, Void, Void>> refActor, HttpUrlDownload<T> download, HttpResponse<T> response, Throwable ex) {
if (ex != null) {
if (download.numRetries > 0)
Expand Down
5 changes: 4 additions & 1 deletion src/test/java/eu/lucaventuri/fibry/TestAutoHealing.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,18 @@ public void testSchedulingHealing2() throws Exception {
AtomicInteger count = new AtomicInteger();
AtomicInteger interruptions = new AtomicInteger();
AtomicInteger recreations = new AtomicInteger();
long start = System.currentTimeMillis();

try (SinkActorSingleTask<Void> actor = Stereotypes.threads().scheduleWithFixedDelay(() -> {
System.out.println("Call " + calls.get() + "... " + (System.currentTimeMillis() - start));
calls.incrementAndGet();
if (count.get() < 3) {
long ms = 2000 / calls.get();
System.out.println("Waiting for " + ms);
SystemUtils.sleepEnsure(ms);
latch.countDown();
count.incrementAndGet();
System.out.println("Done waiting " + ms);
}
}, 0, 1, TimeUnit.MILLISECONDS, new ActorSystem.AutoHealingSettings(1, 5, e -> interruptions.incrementAndGet(), recreations::incrementAndGet))) {
latch.await();
Expand All @@ -208,7 +211,7 @@ public void testSchedulingHealing2() throws Exception {

SystemUtils.sleep(1200);

Assert.assertEquals(4, calls.get());
Assert.assertEquals(3, calls.get());
Assert.assertEquals(3, count.get());
Assert.assertEquals(0, interruptions.get());
Assert.assertEquals(1, recreations.get());
Expand Down

0 comments on commit fdadd2f

Please sign in to comment.