Skip to content

Commit

Permalink
[Java] Try to make Archive bounded replay test less intermittent.
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeb01 committed Oct 23, 2023
1 parent ab7530d commit 0cf0c44
Showing 1 changed file with 32 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@

import java.io.File;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static io.aeron.Aeron.NULL_VALUE;
import static io.aeron.archive.ArchiveSystemTests.*;
Expand Down Expand Up @@ -674,46 +676,54 @@ public void shouldRecordThenBoundReplayWithCounter()

final String channel = new ChannelUriStringBuilder(REPLAY_CHANNEL).sessionId((int)replaySessionId).build();

long subscriptionPosition = 0;
final MutableReference<Image> replayImage = new MutableReference<>();
final AtomicReference<Image> replayImage = new AtomicReference<>();
try (Subscription replaySubscription = aeron.addSubscription(
channel, REPLAY_STREAM_ID, replayImage::set, image -> {}))
{
boundingCounter.setOrdered(halfPosition);

final long halfPollDeadline = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < halfPollDeadline)
while (null == replayImage.get())
{
Tests.yieldingIdle("replay image did not become available");
}

final Supplier<String> halfErrorMessage =
() -> "replayImage.position(" + replayImage.get().position() + ") < halfPosition(" + halfPosition + ")";

while (replayImage.get().position() < halfPosition)
{
if (0 < replaySubscription.poll((buffer, offset, length, header) -> {}, 20))
if (0 == replaySubscription.poll((buffer, offset, length, header) -> {}, 20))
{
if (null != replayImage.get())
{
subscriptionPosition = replayImage.get().position();
}
Tests.yieldingIdle(halfErrorMessage);
}

assertThat(subscriptionPosition, Matchers.lessThanOrEqualTo(halfPosition));
}

assertThat(subscriptionPosition, Matchers.greaterThan(0L));
final long halfPollDeadline = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < halfPollDeadline)
{
replaySubscription.poll((buffer, offset, length, header) -> {}, 20);
assertThat(replayImage.get().position(), Matchers.lessThanOrEqualTo(halfPosition));
}

boundingCounter.setOrdered(tqPosition);

final long tqPollDeadline = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < tqPollDeadline)
final Supplier<String> tqErrorMessage =
() -> "replayImage.position(" + replayImage.get().position() + ") < tqPosition(" + tqPosition + ")";

while (replayImage.get().position() < tqPosition)
{
if (0 < replaySubscription.poll((buffer, offset, length, header) -> {}, 20))
if (0 == replaySubscription.poll((buffer, offset, length, header) -> {}, 20))
{
if (null != replayImage.get())
{
subscriptionPosition = replayImage.get().position();
}
Tests.yieldingIdle(tqErrorMessage);
}

assertThat(subscriptionPosition, Matchers.lessThanOrEqualTo(tqPosition));
}

assertThat(subscriptionPosition, Matchers.greaterThan(halfPosition));
final long tqPollDeadline = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < tqPollDeadline)
{
replaySubscription.poll((buffer, offset, length, header) -> {}, 20);
assertThat(replayImage.get().position(), Matchers.lessThanOrEqualTo(tqPosition));
}
}
}

Expand Down

0 comments on commit 0cf0c44

Please sign in to comment.