diff --git a/ts-reaktive-actors/src/main/java/com/tradeshift/reaktive/materialize/MaterializerActor.java b/ts-reaktive-actors/src/main/java/com/tradeshift/reaktive/materialize/MaterializerActor.java index 8c35ca8f..7040ecbf 100644 --- a/ts-reaktive-actors/src/main/java/com/tradeshift/reaktive/materialize/MaterializerActor.java +++ b/ts-reaktive-actors/src/main/java/com/tradeshift/reaktive/materialize/MaterializerActor.java @@ -11,6 +11,7 @@ import java.time.temporal.ChronoUnit; import java.util.UUID; import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -283,14 +284,27 @@ private void materializeEvents(UUID worker) { worker, workers.getTimestamp(worker).toEpochMilli(), endTimestamp.get()); recordOffsetMetric(); + // We end the stream exactly after the first timestamp we encounter + // after the set target end timestamp. + AtomicLong shouldEndAfter = new AtomicLong(Long.MAX_VALUE); + loadEvents(workers.getTimestamp(worker)) .takeWhile(e -> { long end = endTimestamp.get(); + long t = timestampOf(e).toEpochMilli(); if (end == -1) { + // no end timestamp has been set return true; - } else { - return timestampOf(e).toEpochMilli() < end; + } else if (t > shouldEndAfter.get()) { + return false; + } else if (t >= end) { + shouldEndAfter.set(t); + } + // Race condition for if we've updated endTimestamp after reaching shouldEndAfter + if (endTimestamp.get() > shouldEndAfter.get()) { + shouldEndAfter.set(Long.MAX_VALUE); } + return true; }) // get a Seq of where each Seq has the same timestamp, or emit buffer after [rollback] // (assuming no events with that timestamp after that) diff --git a/ts-reaktive-actors/src/test/java/com/tradeshift/reaktive/materialize/MaterializerActorSpec.java b/ts-reaktive-actors/src/test/java/com/tradeshift/reaktive/materialize/MaterializerActorSpec.java index 7ca69f3c..5b60819d 100644 --- a/ts-reaktive-actors/src/test/java/com/tradeshift/reaktive/materialize/MaterializerActorSpec.java +++ b/ts-reaktive-actors/src/test/java/com/tradeshift/reaktive/materialize/MaterializerActorSpec.java @@ -5,14 +5,18 @@ import static org.forgerock.cuppa.Cuppa.describe; import static org.forgerock.cuppa.Cuppa.it; import static org.forgerock.cuppa.Cuppa.when; +import static org.forgerock.cuppa.Cuppa.only; import java.time.Duration; import java.time.Instant; +import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import com.tradeshift.reaktive.materialize.MaterializerActor.CreateWorker; +import com.tradeshift.reaktive.materialize.MaterializerActor.Progress; import com.tradeshift.reaktive.testkit.SharedActorSystemSpec; +import static com.tradeshift.reaktive.testkit.Await.eventuallyDo; import com.typesafe.config.ConfigFactory; import org.forgerock.cuppa.junit.CuppaRunner; @@ -75,10 +79,20 @@ private void assertReceiveOutOfOrder(Iterable events) { it("runs concurrently if a second worker is started", () -> { ActorRef actor = system.actorOf(Props.create(TestActor.class, () -> new TestActor(Source.from(events), materialized.getRef()))); - actor.tell(new CreateWorker(Instant.ofEpochMilli(1000000 + (N/2) * 1000), none()), system.deadLetters()); + actor.tell(new CreateWorker(Instant.ofEpochMilli(1000000 + (N/3) * 1000 + 176), none()), system.deadLetters()); + actor.tell(new CreateWorker(Instant.ofEpochMilli(1000000 + (N*2/3) * 1000 + 176), none()), system.deadLetters()); assertReceiveOutOfOrder(events); + TestKit sender = new TestKit(system); + + eventuallyDo(() -> { + // Intermediate worker must have stopped. + sender.send(actor, MaterializerActor.QueryProgress.instance); + Progress p = sender.expectMsgClass(Progress.class); + assertThat(p.getWorkers()).hasSize(1); + }); + system.stop(actor); }); }); @@ -97,6 +111,32 @@ private void assertReceiveOutOfOrder(Iterable events) { system.stop(actor); }); + + it("runs concurrently if more workers are started", () -> { + ActorRef actor = system.actorOf(Props.create(TestActor.class, () -> + new TestActor(Source.from(events), materialized.getRef()))); + actor.tell(new CreateWorker(Instant.ofEpochMilli(1000000 + (N/3) * 1000 + 176), none()), system.deadLetters()); + actor.tell(new CreateWorker(Instant.ofEpochMilli(1000000 + (N*2/3) * 1000 + 176), none()), system.deadLetters()); + + assertReceiveOutOfOrder(events); + + TestKit sender = new TestKit(system); + + eventuallyDo(() -> { + // Intermediate worker must have stopped. + sender.send(actor, MaterializerActor.QueryProgress.instance); + Progress p = sender.expectMsgClass(Progress.class); + assertThat(p.getWorkers()).hasSize(1); + }); + + system.stop(actor); + }); + }); + + when("importing sparse events", () -> { + it("should complete a worker started in the past", () -> { + + }); }); }); }