From 0cf0c44f8b4635d6534152f16e34b7805a8bf383 Mon Sep 17 00:00:00 2001 From: Michael Barker Date: Tue, 24 Oct 2023 10:22:06 +1300 Subject: [PATCH] [Java] Try to make Archive bounded replay test less intermittent. --- .../io/aeron/archive/BasicArchiveTest.java | 54 +++++++++++-------- 1 file changed, 32 insertions(+), 22 deletions(-) diff --git a/aeron-system-tests/src/test/java/io/aeron/archive/BasicArchiveTest.java b/aeron-system-tests/src/test/java/io/aeron/archive/BasicArchiveTest.java index 49a3494762..ffe3792ad3 100644 --- a/aeron-system-tests/src/test/java/io/aeron/archive/BasicArchiveTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/archive/BasicArchiveTest.java @@ -39,6 +39,8 @@ import java.io.File; import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import static io.aeron.Aeron.NULL_VALUE; import static io.aeron.archive.ArchiveSystemTests.*; @@ -674,46 +676,54 @@ public void shouldRecordThenBoundReplayWithCounter() final String channel = new ChannelUriStringBuilder(REPLAY_CHANNEL).sessionId((int)replaySessionId).build(); - long subscriptionPosition = 0; - final MutableReference replayImage = new MutableReference<>(); + final AtomicReference replayImage = new AtomicReference<>(); try (Subscription replaySubscription = aeron.addSubscription( channel, REPLAY_STREAM_ID, replayImage::set, image -> {})) { boundingCounter.setOrdered(halfPosition); - final long halfPollDeadline = System.currentTimeMillis() + timeout; - while (System.currentTimeMillis() < halfPollDeadline) + while (null == replayImage.get()) + { + Tests.yieldingIdle("replay image did not become available"); + } + + final Supplier halfErrorMessage = + () -> "replayImage.position(" + replayImage.get().position() + ") < halfPosition(" + halfPosition + ")"; + + while (replayImage.get().position() < halfPosition) { - if (0 < replaySubscription.poll((buffer, offset, length, header) -> {}, 20)) + if (0 == replaySubscription.poll((buffer, offset, length, header) -> {}, 20)) { - if (null != replayImage.get()) - { - subscriptionPosition = replayImage.get().position(); - } + Tests.yieldingIdle(halfErrorMessage); } - - assertThat(subscriptionPosition, Matchers.lessThanOrEqualTo(halfPosition)); } - assertThat(subscriptionPosition, Matchers.greaterThan(0L)); + final long halfPollDeadline = System.currentTimeMillis() + timeout; + while (System.currentTimeMillis() < halfPollDeadline) + { + replaySubscription.poll((buffer, offset, length, header) -> {}, 20); + assertThat(replayImage.get().position(), Matchers.lessThanOrEqualTo(halfPosition)); + } boundingCounter.setOrdered(tqPosition); - final long tqPollDeadline = System.currentTimeMillis() + timeout; - while (System.currentTimeMillis() < tqPollDeadline) + final Supplier tqErrorMessage = + () -> "replayImage.position(" + replayImage.get().position() + ") < tqPosition(" + tqPosition + ")"; + + while (replayImage.get().position() < tqPosition) { - if (0 < replaySubscription.poll((buffer, offset, length, header) -> {}, 20)) + if (0 == replaySubscription.poll((buffer, offset, length, header) -> {}, 20)) { - if (null != replayImage.get()) - { - subscriptionPosition = replayImage.get().position(); - } + Tests.yieldingIdle(tqErrorMessage); } - - assertThat(subscriptionPosition, Matchers.lessThanOrEqualTo(tqPosition)); } - assertThat(subscriptionPosition, Matchers.greaterThan(halfPosition)); + final long tqPollDeadline = System.currentTimeMillis() + timeout; + while (System.currentTimeMillis() < tqPollDeadline) + { + replaySubscription.poll((buffer, offset, length, header) -> {}, 20); + assertThat(replayImage.get().position(), Matchers.lessThanOrEqualTo(tqPosition)); + } } }