From fdadd2fbde85d1775922bda96b642de79bb7b426 Mon Sep 17 00:00:00 2001 From: Luca Venturi Date: Tue, 15 Aug 2023 22:40:50 +0200 Subject: [PATCH] Adds scheduleWithFixedRate() and fixes scheduleWithFixedDelay(). --- .../java/eu/lucaventuri/fibry/BaseActor.java | 16 ++- .../eu/lucaventuri/fibry/Stereotypes.java | 108 ++++++++++++------ .../eu/lucaventuri/fibry/TestAutoHealing.java | 5 +- 3 files changed, 93 insertions(+), 36 deletions(-) diff --git a/src/main/java/eu/lucaventuri/fibry/BaseActor.java b/src/main/java/eu/lucaventuri/fibry/BaseActor.java index c4466d3..45622a3 100644 --- a/src/main/java/eu/lucaventuri/fibry/BaseActor.java +++ b/src/main/java/eu/lucaventuri/fibry/BaseActor.java @@ -199,6 +199,19 @@ public void execAsync(Runnable worker) { ActorUtils.execAsync(queue, state -> worker.run()); } + public void execAsyncNoHealing(Runnable worker) { + var that = this; + + if (!isExiting()) { + ActorUtils.execAsync(queue, state -> { + // Allow shorter timeout than the scheduling time + HealRegistry.INSTANCE.remove(that, Thread.currentThread(), new AtomicBoolean()); + worker.run(); + } + ); + } + } + /** * Synchronously executes some logic in the actor. */ @@ -300,8 +313,7 @@ public final void processMessages() { e.getClass() == InterruptedException.class || e.getCause().getClass() == InterruptedException.class)) { Exceptions.logShort(() -> autoHealing.onInterruption.accept(e)); } - } - finally { + } finally { HealRegistry.INSTANCE.remove(this, curThread, threadShouldDie); } if (threadShouldDie.get()) { // Notification done in HealRegistry, earlier diff --git a/src/main/java/eu/lucaventuri/fibry/Stereotypes.java b/src/main/java/eu/lucaventuri/fibry/Stereotypes.java index 9c1e606..b64ba55 100644 --- a/src/main/java/eu/lucaventuri/fibry/Stereotypes.java +++ b/src/main/java/eu/lucaventuri/fibry/Stereotypes.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.*; @@ -565,24 +566,46 @@ public SinkActorSingleTask schedule(Runnable run, long scheduleMs, long ma * Creates an actor that runs a Runnable with fixed delay; it optionally supports autoHealing */ public SinkActorSingleTask scheduleWithFixedDelay(Runnable run, long initialDelay, long delay, TimeUnit timeUnit, ActorSystem.AutoHealingSettings autoHealing) { - Actor actor = (Actor) sinkHealing((Void) null, autoHealing); + Actor actor = createActorForScheduling(initialDelay, timeUnit, autoHealing); + AtomicReference runThenWait = new AtomicReference<>(); - // Deadlock prevention - actor.setCloseStrategy(Exitable.CloseStrategy.ASK_EXIT); + runThenWait.set(() -> { + // As we are inside the event loop of the actor, we can only call execAsync + actor.execAsync(run); + actor.execAsyncNoHealing(() -> { + // When this runs, run.run() has been executed + SystemUtils.sleep(timeUnit.toMillis(delay)); } + ); + actor.execAsync(runThenWait.get()); + }); - if (initialDelay > 0) - actor.execAsync(() -> SystemUtils.sleep(timeUnit.toMillis(initialDelay))); + actor.execAsync(runThenWait.get()); + return actor; + } + /** + * Creates an actor that runs a Runnable with fixed delay; it optionally supports autoHealing + */ + public SinkActorSingleTask scheduleWithFixedRate(Runnable run, long initialDelay, long delay, TimeUnit timeUnit, ActorSystem.AutoHealingSettings autoHealing) { + Actor actor = createActorForScheduling(initialDelay, timeUnit, autoHealing); AtomicReference runThenWait = new AtomicReference<>(); - AtomicBoolean dummyThreadShouldDie = new AtomicBoolean(); + AtomicLong nextRun = new AtomicLong(System.currentTimeMillis() + timeUnit.toMillis(delay)); runThenWait.set(() -> { - actor.execAsync(() -> { - // Allow shorter timeout than the scheduling time - HealRegistry.INSTANCE.remove(actor, Thread.currentThread(), dummyThreadShouldDie); - SystemUtils.sleep(timeUnit.toMillis(delay)); }); + // TODO: It will not work well if the execution time exceeds the delay (e.g. if a task scheduled once an hour uses more than one hour to run) + // As we are inside the event loop of the actor, we can only call execAsync actor.execAsync(run); + actor.execAsyncNoHealing(() -> { + // When this runs, run.run() has been executed + + long wait = nextRun.get() - System.currentTimeMillis(); + + // TODO: in case of overlap, should we skip? Should we have a policy for that? CHeck Java? + nextRun.set(nextRun.get() + timeUnit.toMillis(delay)); + if (wait > 0) + SystemUtils.sleep(timeUnit.toMillis(wait)); } + ); actor.execAsync(runThenWait.get()); }); @@ -591,6 +614,17 @@ public SinkActorSingleTask scheduleWithFixedDelay(Runnable run, long initi return actor; } + private Actor createActorForScheduling(long initialDelay, TimeUnit timeUnit, ActorSystem.AutoHealingSettings autoHealing) { + Actor actor = (Actor) sinkHealing((Void) null, autoHealing); + + // Deadlock prevention + actor.setCloseStrategy(Exitable.CloseStrategy.ASK_EXIT); + + if (initialDelay > 0) + actor.execAsync(() -> SystemUtils.sleep(timeUnit.toMillis(initialDelay))); + return actor; + } + /** * Creates an actor that can accept new TCP connections on the specified port and spawn a new actor for each connection. * This method will return after the server socket has been created. If the server socket cannot be created, it will throw an exception. @@ -894,16 +928,7 @@ public Actor, Void, Void> binaryDownloader() { public BaseActor rateLimited(Consumer actorLogic, int msBetweenCalls) { NamedStateActorCreator config = anonymous().initialState(null); - Consumer rateLimitedLogic = value -> { - long start = System.currentTimeMillis(); - - actorLogic.accept(value); - - long end = System.currentTimeMillis() - start; - - if (msBetweenCalls > (end - start)) - SystemUtils.sleep(msBetweenCalls - (end - start)); - }; + Consumer rateLimitedLogic = wrapWithRateLimiting(actorLogic, msBetweenCalls); return config.newActor(rateLimitedLogic); } @@ -911,19 +936,7 @@ public BaseActor rateLimited(Consumer actorLogic, int msBe public BaseActor rateLimitedReturn(Function actorLogic, int msBetweenCalls) { NamedStateActorCreator config = anonymous().initialState(null); - Function rateLimitedLogic = value -> { - long start = System.currentTimeMillis(); - - R result = actorLogic.apply(value); - - long end = System.currentTimeMillis(); - - if (msBetweenCalls > (end - start)) { - SystemUtils.sleep(msBetweenCalls - (end - start)); - } - - return result; - }; + Function rateLimitedLogic = wrapWithRateLimitingReturn(actorLogic, msBetweenCalls); return config.newActorWithReturn(rateLimitedLogic); } @@ -1152,6 +1165,35 @@ private NamedStrategyActorCreator named(String name) { } } + public static Consumer wrapWithRateLimiting(Consumer actorLogic, int msBetweenCalls) { + return value -> { + long start = System.currentTimeMillis(); + + actorLogic.accept(value); + + long end = System.currentTimeMillis() - start; + + if (msBetweenCalls > (end - start)) + SystemUtils.sleep(msBetweenCalls - (end - start)); + }; + } + + public static Function wrapWithRateLimitingReturn(Function actorLogic, int msBetweenCalls) { + return value -> { + long start = System.currentTimeMillis(); + + R result = actorLogic.apply(value); + + long end = System.currentTimeMillis(); + + if (msBetweenCalls > (end - start)) { + SystemUtils.sleep(msBetweenCalls - (end - start)); + } + + return result; + }; + } + private static void manageHttpClientResponse(Scheduler scheduler, boolean sendLastError, AtomicReference, Void, Void>> refActor, HttpUrlDownload download, HttpResponse response, Throwable ex) { if (ex != null) { if (download.numRetries > 0) diff --git a/src/test/java/eu/lucaventuri/fibry/TestAutoHealing.java b/src/test/java/eu/lucaventuri/fibry/TestAutoHealing.java index 18bdd23..f285c2c 100644 --- a/src/test/java/eu/lucaventuri/fibry/TestAutoHealing.java +++ b/src/test/java/eu/lucaventuri/fibry/TestAutoHealing.java @@ -186,8 +186,10 @@ public void testSchedulingHealing2() throws Exception { AtomicInteger count = new AtomicInteger(); AtomicInteger interruptions = new AtomicInteger(); AtomicInteger recreations = new AtomicInteger(); + long start = System.currentTimeMillis(); try (SinkActorSingleTask actor = Stereotypes.threads().scheduleWithFixedDelay(() -> { + System.out.println("Call " + calls.get() + "... " + (System.currentTimeMillis() - start)); calls.incrementAndGet(); if (count.get() < 3) { long ms = 2000 / calls.get(); @@ -195,6 +197,7 @@ public void testSchedulingHealing2() throws Exception { SystemUtils.sleepEnsure(ms); latch.countDown(); count.incrementAndGet(); + System.out.println("Done waiting " + ms); } }, 0, 1, TimeUnit.MILLISECONDS, new ActorSystem.AutoHealingSettings(1, 5, e -> interruptions.incrementAndGet(), recreations::incrementAndGet))) { latch.await(); @@ -208,7 +211,7 @@ public void testSchedulingHealing2() throws Exception { SystemUtils.sleep(1200); - Assert.assertEquals(4, calls.get()); + Assert.assertEquals(3, calls.get()); Assert.assertEquals(3, count.get()); Assert.assertEquals(0, interruptions.get()); Assert.assertEquals(1, recreations.get());