From 0ea7049336b472243ee72c9b0df724c168953fa4 Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Thu, 23 Jan 2025 11:49:11 +0100 Subject: [PATCH 01/14] [Java] Update active loss gap upon length change. Gap length can increase when heartbeats advance the high-water mark position, or it can decrease if messages arrive out of order. In both cases we want to adjust the NAK range of the receiver. --- .../java/io/aeron/driver/LossDetector.java | 4 +- .../io/aeron/driver/LossDetectorTest.java | 249 +++++++++++++----- 2 files changed, 187 insertions(+), 66 deletions(-) diff --git a/aeron-driver/src/main/java/io/aeron/driver/LossDetector.java b/aeron-driver/src/main/java/io/aeron/driver/LossDetector.java index da66826e23..9996bfb9bc 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/LossDetector.java +++ b/aeron-driver/src/main/java/io/aeron/driver/LossDetector.java @@ -91,7 +91,9 @@ public long scan( rebuildOffset = scanForGap(termBuffer, rebuildTermId, rebuildOffset, limitOffset, this); if (rebuildOffset < limitOffset) { - if (scannedTermOffset != activeTermOffset || scannedTermId != activeTermId) + if (scannedTermOffset != activeTermOffset || + scannedTermId != activeTermId || + scannedLength != activeLength) { activateGap(nowNs); lossFound = true; diff --git a/aeron-driver/src/test/java/io/aeron/driver/LossDetectorTest.java b/aeron-driver/src/test/java/io/aeron/driver/LossDetectorTest.java index 85376d7ec7..de06c1a2c9 100644 --- a/aeron-driver/src/test/java/io/aeron/driver/LossDetectorTest.java +++ b/aeron-driver/src/test/java/io/aeron/driver/LossDetectorTest.java @@ -27,6 +27,9 @@ import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; +import static io.aeron.driver.LossDetector.lossFound; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.*; import static io.aeron.logbuffer.LogBufferDescriptor.TERM_MIN_LENGTH; import static io.aeron.logbuffer.LogBufferDescriptor.computePosition; @@ -67,7 +70,7 @@ class LossDetectorTest private final LossHandler lossHandler = mock(LossHandler.class); private LossDetector lossDetector = new LossDetector(DELAY_GENERATOR, lossHandler); - private long currentTime = 0; + private long currentTimeNs = 0; { dataHeader.wrap(rcvBuffer); @@ -79,9 +82,11 @@ void shouldNotSendNakWhenBufferIsEmpty() final long rebuildPosition = ACTIVE_TERM_POSITION; final long hwmPosition = ACTIVE_TERM_POSITION; - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(100); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(100); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verifyNoInteractions(lossHandler); } @@ -96,9 +101,11 @@ void shouldNotNakIfNoMissingData() insertDataFrame(offsetOfMessage(1)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verifyNoInteractions(lossHandler); } @@ -112,11 +119,13 @@ void shouldNakMissingData() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), gapLength()); + verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), ALIGNED_FRAME_LENGTH); } @Test @@ -128,13 +137,16 @@ void shouldRetransmitNakForMissingData() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(30); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(60); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(30); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(60); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler, atLeast(2)).onGapDetected(TERM_ID, offsetOfMessage(1), gapLength()); + verify(lossHandler, atLeast(2)).onGapDetected(TERM_ID, offsetOfMessage(1), ALIGNED_FRAME_LENGTH); } @Test @@ -146,13 +158,16 @@ void shouldStopNakOnReceivingData() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(20); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(20); insertDataFrame(offsetOfMessage(1)); rebuildPosition += (ALIGNED_FRAME_LENGTH * 3L); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(100); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(100); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verifyNoInteractions(lossHandler); } @@ -168,19 +183,23 @@ void shouldHandleMoreThan2Gaps() insertDataFrame(offsetOfMessage(4)); insertDataFrame(offsetOfMessage(6)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); insertDataFrame(offsetOfMessage(1)); rebuildPosition += (ALIGNED_FRAME_LENGTH * 3L); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(80); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(80); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); final InOrder inOrder = inOrder(lossHandler); - inOrder.verify(lossHandler, atLeast(1)).onGapDetected(TERM_ID, offsetOfMessage(1), gapLength()); - inOrder.verify(lossHandler, atLeast(1)).onGapDetected(TERM_ID, offsetOfMessage(3), gapLength()); - inOrder.verify(lossHandler, never()).onGapDetected(TERM_ID, offsetOfMessage(5), gapLength()); + inOrder.verify(lossHandler, atLeast(1)).onGapDetected(TERM_ID, offsetOfMessage(1), ALIGNED_FRAME_LENGTH); + inOrder.verify(lossHandler, atLeast(1)).onGapDetected(TERM_ID, offsetOfMessage(3), ALIGNED_FRAME_LENGTH); + inOrder.verify(lossHandler, never()).onGapDetected(TERM_ID, offsetOfMessage(5), ALIGNED_FRAME_LENGTH); } @Test @@ -192,20 +211,24 @@ void shouldReplaceOldNakWithNewNak() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(20); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(20); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); insertDataFrame(offsetOfMessage(4)); insertDataFrame(offsetOfMessage(1)); rebuildPosition += (ALIGNED_FRAME_LENGTH * 3L); hwmPosition = (ALIGNED_FRAME_LENGTH * 5L); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(100); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(100); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler, atLeast(1)).onGapDetected(TERM_ID, offsetOfMessage(3), gapLength()); + verify(lossHandler, atLeast(1)).onGapDetected(TERM_ID, offsetOfMessage(3), ALIGNED_FRAME_LENGTH); } @Test @@ -219,23 +242,27 @@ void shouldHandleLongerRetryDelay() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verifyNoInteractions(lossHandler); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), gapLength()); + verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), ALIGNED_FRAME_LENGTH); - currentTime = TimeUnit.MILLISECONDS.toNanos(80); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(80); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verifyNoMoreInteractions(lossHandler); - currentTime = TimeUnit.MILLISECONDS.toNanos(240); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(240); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler, times(2)).onGapDetected(TERM_ID, offsetOfMessage(1), gapLength()); + verify(lossHandler, times(2)).onGapDetected(TERM_ID, offsetOfMessage(1), ALIGNED_FRAME_LENGTH); } @Test @@ -247,7 +274,8 @@ void shouldNotNakImmediatelyByDefault() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verifyNoInteractions(lossHandler); } @@ -263,12 +291,15 @@ void shouldOnlySendNaksOnceOnMultipleScans() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), gapLength()); + verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), ALIGNED_FRAME_LENGTH); } @Test @@ -282,9 +313,11 @@ void shouldHandleHwmGreaterThanCompletedBuffer() insertDataFrame(offsetOfMessage(0)); rebuildPosition += ALIGNED_FRAME_LENGTH; - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), TERM_BUFFER_LENGTH - (int)rebuildPosition); } @@ -300,14 +333,105 @@ void shouldHandleNonZeroInitialTermOffset() insertDataFrame(offsetOfMessage(2)); insertDataFrame(offsetOfMessage(4)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), gapLength()); + verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), ALIGNED_FRAME_LENGTH); verifyNoMoreInteractions(lossHandler); } + @Test + void shouldDetectChangesInTheGapLength() + { + lossDetector = getLossHandlerWithLongRetry(); + + final long rebuildPosition = ACTIVE_TERM_POSITION + (ALIGNED_FRAME_LENGTH * 3L); + + insertDataFrame(offsetOfMessage(2)); + insertDataFrame(offsetOfMessage(5)); + + assertTrue(lossFound(lossDetector.scan( + termBuffer, rebuildPosition, rebuildPosition + 32, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID))); + + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(100); + assertFalse(lossFound(lossDetector.scan( + termBuffer, rebuildPosition, rebuildPosition + 32, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID))); + + assertTrue(lossFound(lossDetector.scan( + termBuffer, rebuildPosition, rebuildPosition + 64, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID))); + + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(200); + assertFalse(lossFound(lossDetector.scan( + termBuffer, rebuildPosition, rebuildPosition + 64, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID))); + + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(300); + assertTrue(lossFound(lossDetector.scan( + termBuffer, + rebuildPosition, + rebuildPosition + ALIGNED_FRAME_LENGTH, + currentTimeNs, + MASK, + POSITION_BITS_TO_SHIFT, + TERM_ID))); + + assertTrue(lossFound(lossDetector.scan( + termBuffer, + rebuildPosition, + rebuildPosition + ALIGNED_FRAME_LENGTH * 2L, + currentTimeNs, + MASK, + POSITION_BITS_TO_SHIFT, + TERM_ID))); + + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(400); + assertFalse(lossFound(lossDetector.scan( + termBuffer, + rebuildPosition, + rebuildPosition + ALIGNED_FRAME_LENGTH * 2L, + currentTimeNs, + MASK, + POSITION_BITS_TO_SHIFT, + TERM_ID))); + + insertDataFrame(offsetOfMessage(4)); + assertTrue(lossFound(lossDetector.scan( + termBuffer, + rebuildPosition, + rebuildPosition + ALIGNED_FRAME_LENGTH * 2L, + currentTimeNs, + MASK, + POSITION_BITS_TO_SHIFT, + TERM_ID))); + + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(500); + assertFalse(lossFound(lossDetector.scan( + termBuffer, + rebuildPosition, + rebuildPosition + ALIGNED_FRAME_LENGTH * 2L, + currentTimeNs, + MASK, + POSITION_BITS_TO_SHIFT, + TERM_ID))); + + assertTrue(lossFound(lossDetector.scan( + termBuffer, rebuildPosition, rebuildPosition + 64, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID))); + + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(600); + assertFalse(lossFound(lossDetector.scan( + termBuffer, rebuildPosition, rebuildPosition + 64, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID))); + + final InOrder inOrder = inOrder(lossHandler); + inOrder.verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), 32); + inOrder.verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), 64); + inOrder.verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), ALIGNED_FRAME_LENGTH * 2); + inOrder.verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), ALIGNED_FRAME_LENGTH); + inOrder.verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), 64); + inOrder.verifyNoMoreInteractions(); + } + private LossDetector getLossHandlerWithLongRetry() { return new LossDetector(DELAY_GENERATOR_WITH_LONGER_RETRY, lossHandler); @@ -339,9 +463,4 @@ private int offsetOfMessage(final int index) { return index * ALIGNED_FRAME_LENGTH; } - - private int gapLength() - { - return ALIGNED_FRAME_LENGTH; - } } From 192989ed1017b963ca94bc429fd5b1a9529fff08 Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Thu, 23 Jan 2025 13:23:24 +0100 Subject: [PATCH 02/14] [Java] Update offset masking code to cast the result and not the position. --- aeron-driver/src/main/java/io/aeron/driver/LossDetector.java | 4 ++-- .../src/main/java/io/aeron/driver/PublicationImage.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/aeron-driver/src/main/java/io/aeron/driver/LossDetector.java b/aeron-driver/src/main/java/io/aeron/driver/LossDetector.java index 9996bfb9bc..60cc574975 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/LossDetector.java +++ b/aeron-driver/src/main/java/io/aeron/driver/LossDetector.java @@ -77,7 +77,7 @@ public long scan( final int initialTermId) { boolean lossFound = false; - int rebuildOffset = (int)rebuildPosition & termLengthMask; + int rebuildOffset = (int)(rebuildPosition & termLengthMask); if (rebuildPosition < hwmPosition) { @@ -85,7 +85,7 @@ public long scan( final int hwmTermCount = (int)(hwmPosition >>> positionBitsToShift); final int rebuildTermId = initialTermId + rebuildTermCount; - final int hwmTermOffset = (int)hwmPosition & termLengthMask; + final int hwmTermOffset = (int)(hwmPosition & termLengthMask); final int limitOffset = rebuildTermCount == hwmTermCount ? hwmTermOffset : termLengthMask + 1; rebuildOffset = scanForGap(termBuffer, rebuildTermId, rebuildOffset, limitOffset, this); diff --git a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java index 385c8ab781..b227178143 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java +++ b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java @@ -561,7 +561,7 @@ int trackRebuild(final long nowNs) positionBitsToShift, initialTermId); - final int rebuildTermOffset = (int)rebuildPosition & termLengthMask; + final int rebuildTermOffset = (int)(rebuildPosition & termLengthMask); final long newRebuildPosition = (rebuildPosition - rebuildTermOffset) + rebuildOffset(scanOutcome); this.rebuildPosition.proposeMaxOrdered(newRebuildPosition); From 60e6047ecc1a999af922ab217dfee0f122c1a50b Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Thu, 23 Jan 2025 13:32:28 +0100 Subject: [PATCH 03/14] [Java] Read high-water mark position after computing the rebuild position to ensure the widest possible loss detection scan window. Loss detection code is concurrent to the application threads that are consuming from the image but also to the receiver thread that inserts new packets into the image and advances the high-water mark position counter. Reading that counter last ensures that we take into account all inserted packets up to this point. --- .../src/main/java/io/aeron/driver/PublicationImage.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java index b227178143..1d0a777845 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java +++ b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java @@ -540,7 +540,6 @@ int trackRebuild(final long nowNs) if (isRebuilding) { - final long hwmPosition = this.hwmPosition.getVolatile(); long minSubscriberPosition = Long.MAX_VALUE; long maxSubscriberPosition = 0; @@ -552,6 +551,7 @@ int trackRebuild(final long nowNs) } final long rebuildPosition = Math.max(this.rebuildPosition.get(), maxSubscriberPosition); + final long hwmPosition = this.hwmPosition.getVolatile(); final long scanOutcome = lossDetector.scan( termBuffers[indexByPosition(rebuildPosition, positionBitsToShift)], rebuildPosition, From 411f102a887869935b9147586712437523ff10e6 Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Thu, 23 Jan 2025 14:33:29 +0100 Subject: [PATCH 04/14] [Java] Simplify `sendData` path by assuming that the window is almost never zero. Also do not track padding bytes as the "sent bytes". --- .../java/io/aeron/logbuffer/TermScanner.java | 3 +- .../io/aeron/driver/NetworkPublication.java | 46 +++++++------------ 2 files changed, 17 insertions(+), 32 deletions(-) diff --git a/aeron-client/src/main/java/io/aeron/logbuffer/TermScanner.java b/aeron-client/src/main/java/io/aeron/logbuffer/TermScanner.java index 4b6da23ac3..622334c575 100644 --- a/aeron-client/src/main/java/io/aeron/logbuffer/TermScanner.java +++ b/aeron-client/src/main/java/io/aeron/logbuffer/TermScanner.java @@ -46,7 +46,7 @@ public static long scanForAvailability( int available = 0; int padding = 0; - do + while (0 == padding && available < limit) { final int termOffset = offset + available; final int frameLength = frameLengthVolatile(termBuffer, termOffset); @@ -71,7 +71,6 @@ public static long scanForAvailability( break; } } - while (0 == padding && available < limit); return pack(padding, available); } diff --git a/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java b/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java index a6683ca556..4a0cd65055 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java +++ b/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java @@ -783,43 +783,29 @@ void updateHasReceivers(final long timeNs) private int sendData(final long nowNs, final long senderPosition, final int termOffset) { int bytesSent = 0; - final int availableWindow = (int)(senderLimit.get() - senderPosition); - if (availableWindow > 0) + + final int scanLimit = Math.min((int)(senderLimit.get() - senderPosition), mtuLength); + final int activeIndex = indexByPosition(senderPosition, positionBitsToShift); + final long scanOutcome = scanForAvailability(termBuffers[activeIndex], termOffset, scanLimit); + final int available = available(scanOutcome); + if (available > 0) { - final int scanLimit = Math.min(availableWindow, mtuLength); - final int activeIndex = indexByPosition(senderPosition, positionBitsToShift); + final ByteBuffer sendBuffer = sendBuffers[activeIndex]; + sendBuffer.limit(termOffset + available).position(termOffset); - final long scanOutcome = scanForAvailability(termBuffers[activeIndex], termOffset, scanLimit); - final int available = available(scanOutcome); - if (available > 0) + if (available == doSend(sendBuffer)) { - final ByteBuffer sendBuffer = sendBuffers[activeIndex]; - sendBuffer.limit(termOffset + available).position(termOffset); - - if (available == doSend(sendBuffer)) - { - timeOfLastDataOrHeartbeatNs = nowNs; - trackSenderLimits = true; - - bytesSent = available + padding(scanOutcome); - this.senderPosition.setOrdered(senderPosition + bytesSent); - } - else - { - shortSends.increment(); - } + bytesSent = available; + timeOfLastDataOrHeartbeatNs = nowNs; + trackSenderLimits = true; + this.senderPosition.setOrdered(senderPosition + available + padding(scanOutcome)); } - else if (available < 0) + else { - if (trackSenderLimits) - { - trackSenderLimits = false; - senderBpe.incrementOrdered(); - senderFlowControlLimits.incrementOrdered(); - } + shortSends.increment(); } } - else if (trackSenderLimits) + else if (available < 0 && trackSenderLimits) { trackSenderLimits = false; senderBpe.incrementOrdered(); From 18562a764c08561f074b3a28b5fc07e0170cf7cf Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Thu, 23 Jan 2025 23:10:00 +0100 Subject: [PATCH 05/14] [Java] Take into account a trailing padding frame when adjusting high-water mark on the receiver. A single network packet might contain multiple frames (up to an MTU). When such packet contains only `HDR_TYPE_DATA` then packet length will correspond to the combined lengths of the frames it contains. However, if there is a trailing `HDR_TYPE_PAD` packet then the packet length will not be equal to the sum of the frame sizes. For example, given a packet containing two 80 byte messages (whose aligned length is 128 bytes each) and a padding for 224 bytes. The length of the packet will be 288 bytes (256 + 32) but the "frame size" will be 512 bytes (256 + 256). --- .../io/aeron/driver/PublicationImage.java | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java index 1d0a777845..b83535ed99 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java +++ b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java @@ -21,11 +21,13 @@ import io.aeron.driver.media.ReceiveDestinationTransport; import io.aeron.driver.reports.LossReport; import io.aeron.driver.status.SystemCounters; +import io.aeron.logbuffer.FrameDescriptor; import io.aeron.logbuffer.LogBufferDescriptor; import io.aeron.logbuffer.TermRebuilder; import io.aeron.protocol.DataHeaderFlyweight; import io.aeron.protocol.RttMeasurementFlyweight; import io.aeron.protocol.StatusMessageFlyweight; +import org.agrona.BitUtil; import org.agrona.CloseHelper; import org.agrona.ErrorHandler; import org.agrona.SystemUtil; @@ -117,6 +119,7 @@ enum State private static final VarHandle END_SM_CHANGE_VH; private static final VarHandle BEGIN_LOSS_CHANGE_VH; private static final VarHandle END_LOSS_CHANGE_VH; + static { try @@ -618,7 +621,15 @@ int insertPacket( final boolean isHeartbeat = DataHeaderFlyweight.isHeartbeat(buffer, length); final long packetPosition = computePosition(termId, termOffset, positionBitsToShift, initialTermId); - final long proposedPosition = isHeartbeat ? packetPosition : packetPosition + length; + final long proposedPosition; + if (isHeartbeat) + { + proposedPosition = packetPosition; + } + else + { + proposedPosition = packetPosition + computeFullPacketLength(buffer, length); + } if (!isFlowControlOverRun(proposedPosition)) { @@ -952,6 +963,21 @@ void stopStatusMessagesIfNotActive() } } + private static int computeFullPacketLength(final UnsafeBuffer buffer, final int packetLength) + { + int offset = 0; + while (offset < packetLength) + { + final int frameLength = FrameDescriptor.frameLength(buffer, offset); + if (frameLength <= 0) + { + break; + } + offset += BitUtil.align(frameLength, FrameDescriptor.FRAME_ALIGNMENT); + } + return offset; + } + private void state(final State state) { this.state = state; From f33e0cac6391eba83299212ce8a10c6bf69d0fc5 Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Fri, 24 Jan 2025 12:06:37 +0100 Subject: [PATCH 06/14] [Java] Simplify term gap scanning logic by removing `ALIGNED_HEADER_LENGTH` and streamlining non-zero frame lookup. --- .../io/aeron/logbuffer/TermGapScanner.java | 15 +++----- .../aeron/logbuffer/TermGapScannerTest.java | 37 +++++++++++++++++++ 2 files changed, 42 insertions(+), 10 deletions(-) diff --git a/aeron-client/src/main/java/io/aeron/logbuffer/TermGapScanner.java b/aeron-client/src/main/java/io/aeron/logbuffer/TermGapScanner.java index 38a67d0656..23425c66d8 100644 --- a/aeron-client/src/main/java/io/aeron/logbuffer/TermGapScanner.java +++ b/aeron-client/src/main/java/io/aeron/logbuffer/TermGapScanner.java @@ -15,7 +15,6 @@ */ package io.aeron.logbuffer; -import org.agrona.BitUtil; import org.agrona.concurrent.UnsafeBuffer; import static io.aeron.logbuffer.FrameDescriptor.FRAME_ALIGNMENT; @@ -31,8 +30,6 @@ */ public class TermGapScanner { - private static final int ALIGNED_HEADER_LENGTH = BitUtil.align(HEADER_LENGTH, FRAME_ALIGNMENT); - /** * Handler for notifying of gaps in the log. */ @@ -82,19 +79,17 @@ public static int scanForGap( final int gapBeginOffset = offset; if (offset < limitOffset) { - final int limit = limitOffset - ALIGNED_HEADER_LENGTH; - while (offset < limit) + offset += HEADER_LENGTH; + while (offset < limitOffset) { - offset += FRAME_ALIGNMENT; - - if (0 != termBuffer.getIntVolatile(offset)) + if (0 != frameLengthVolatile(termBuffer, offset)) { - offset -= ALIGNED_HEADER_LENGTH; break; } + offset += HEADER_LENGTH; } - final int gapLength = (offset - gapBeginOffset) + ALIGNED_HEADER_LENGTH; + final int gapLength = offset - gapBeginOffset; handler.onGap(termId, gapBeginOffset, gapLength); } diff --git a/aeron-client/src/test/java/io/aeron/logbuffer/TermGapScannerTest.java b/aeron-client/src/test/java/io/aeron/logbuffer/TermGapScannerTest.java index f8e3c40a04..8b0e6adfd5 100644 --- a/aeron-client/src/test/java/io/aeron/logbuffer/TermGapScannerTest.java +++ b/aeron-client/src/test/java/io/aeron/logbuffer/TermGapScannerTest.java @@ -24,6 +24,7 @@ import static org.agrona.BitUtil.align; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.*; +import static org.mockito.Mockito.verifyNoMoreInteractions; class TermGapScannerTest { @@ -51,6 +52,7 @@ void shouldReportGapAtBeginningOfBuffer() assertEquals(0, TermGapScanner.scanForGap(termBuffer, TERM_ID, 0, highWaterMark, gapHandler)); verify(gapHandler).onGap(TERM_ID, 0, frameOffset); + verifyNoMoreInteractions(gapHandler); } @Test @@ -67,6 +69,7 @@ void shouldReportSingleGapWhenBufferNotFull() assertEquals(tail, TermGapScanner.scanForGap(termBuffer, TERM_ID, tail, highWaterMark, gapHandler)); verify(gapHandler).onGap(TERM_ID, tail, align(HEADER_LENGTH, FRAME_ALIGNMENT)); + verifyNoMoreInteractions(gapHandler); } @Test @@ -83,6 +86,7 @@ void shouldReportSingleGapWhenBufferIsFull() assertEquals(tail, TermGapScanner.scanForGap(termBuffer, TERM_ID, tail, highWaterMark, gapHandler)); verify(gapHandler).onGap(TERM_ID, tail, align(HEADER_LENGTH, FRAME_ALIGNMENT)); + verifyNoMoreInteractions(gapHandler); } @Test @@ -100,4 +104,37 @@ void shouldReportNoGapWhenHwmIsInPadding() verifyNoInteractions(gapHandler); } + + @Test + void shouldReportSingleHeaderGap() + { + final int offset = 8192 + 384; + when(termBuffer.getIntVolatile(offset)).thenReturn(0); + when(termBuffer.getIntVolatile(offset + HEADER_LENGTH)).thenReturn(128); + + assertEquals( + offset, TermGapScanner.scanForGap(termBuffer, TERM_ID, offset, LOG_BUFFER_CAPACITY, gapHandler)); + + verify(termBuffer).getIntVolatile(offset); + verify(termBuffer).getIntVolatile(offset + HEADER_LENGTH); + verify(gapHandler).onGap(TERM_ID, offset, HEADER_LENGTH); + verifyNoMoreInteractions(gapHandler, termBuffer); + } + + @Test + void shouldReportGapAtTheEndOfTheBuffer() + { + final int offset = LOG_BUFFER_CAPACITY - 128; + when(termBuffer.getIntVolatile(offset)).thenReturn(0); + + assertEquals( + offset, TermGapScanner.scanForGap(termBuffer, TERM_ID, offset, LOG_BUFFER_CAPACITY, gapHandler)); + + verify(termBuffer).getIntVolatile(offset); + verify(termBuffer).getIntVolatile(offset + HEADER_LENGTH); + verify(termBuffer).getIntVolatile(offset + 2 * HEADER_LENGTH); + verify(termBuffer).getIntVolatile(offset + 3 * HEADER_LENGTH); + verify(gapHandler).onGap(TERM_ID, offset, 128); + verifyNoMoreInteractions(gapHandler, termBuffer); + } } From a6c9d7d253e3372d6eeb7c2824dcae46a130f486 Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Mon, 27 Jan 2025 12:31:25 +0100 Subject: [PATCH 07/14] [Java] Add a test for `rcv-hwm` position update when a packet contains trailing padding frame + rename method that computes the target position offset. --- .../io/aeron/driver/PublicationImage.java | 14 +- .../io/aeron/driver/PublicationImageTest.java | 249 ++++++++++++++++++ 2 files changed, 257 insertions(+), 6 deletions(-) create mode 100644 aeron-driver/src/test/java/io/aeron/driver/PublicationImageTest.java diff --git a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java index b83535ed99..fb69ecce70 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java +++ b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java @@ -628,7 +628,7 @@ int insertPacket( } else { - proposedPosition = packetPosition + computeFullPacketLength(buffer, length); + proposedPosition = packetPosition + computeActualFrameLength(buffer, length); } if (!isFlowControlOverRun(proposedPosition)) @@ -642,12 +642,12 @@ int insertPacket( { final long nowNs = cachedNanoClock.nanoTime(); timeOfLastPacketNs = nowNs; - trackConnection(transportIndex, srcAddress, nowNs); + final ImageConnection imageConnection = trackConnection(transportIndex, srcAddress, nowNs); if (isEndOfStream) { - imageConnections[transportIndex].eosPosition = packetPosition; - imageConnections[transportIndex].isEos = true; + imageConnection.eosPosition = packetPosition; + imageConnection.isEos = true; if (!this.isEndOfStream && isAllConnectedEos()) { @@ -963,7 +963,7 @@ void stopStatusMessagesIfNotActive() } } - private static int computeFullPacketLength(final UnsafeBuffer buffer, final int packetLength) + private static int computeActualFrameLength(final UnsafeBuffer buffer, final int packetLength) { int offset = 0; while (offset < packetLength) @@ -1043,7 +1043,8 @@ private void cleanBufferTo(final long position) } } - private void trackConnection(final int transportIndex, final InetSocketAddress srcAddress, final long nowNs) + private ImageConnection trackConnection( + final int transportIndex, final InetSocketAddress srcAddress, final long nowNs) { imageConnections = ArrayUtil.ensureCapacity(imageConnections, transportIndex + 1); ImageConnection imageConnection = imageConnections[transportIndex]; @@ -1056,6 +1057,7 @@ private void trackConnection(final int transportIndex, final InetSocketAddress s imageConnection.timeOfLastActivityNs = nowNs; imageConnection.timeOfLastFrameNs = nowNs; + return imageConnection; } private boolean isAllConnectedEos() diff --git a/aeron-driver/src/test/java/io/aeron/driver/PublicationImageTest.java b/aeron-driver/src/test/java/io/aeron/driver/PublicationImageTest.java new file mode 100644 index 0000000000..fbdc1b2449 --- /dev/null +++ b/aeron-driver/src/test/java/io/aeron/driver/PublicationImageTest.java @@ -0,0 +1,249 @@ +/* + * Copyright 2014-2025 Real Logic Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.aeron.driver; + +import io.aeron.ChannelUri; +import io.aeron.driver.buffer.RawLog; +import io.aeron.driver.media.ReceiveChannelEndpoint; +import io.aeron.driver.media.UdpChannel; +import io.aeron.driver.status.ReceiverHwm; +import io.aeron.driver.status.ReceiverPos; +import io.aeron.driver.status.SystemCounterDescriptor; +import io.aeron.driver.status.SystemCounters; +import io.aeron.logbuffer.FrameDescriptor; +import io.aeron.protocol.DataHeaderFlyweight; +import org.agrona.BitUtil; +import org.agrona.ExpandableArrayBuffer; +import org.agrona.concurrent.CachedEpochClock; +import org.agrona.concurrent.CachedNanoClock; +import org.agrona.concurrent.UnsafeBuffer; +import org.agrona.concurrent.status.AtomicCounter; +import org.agrona.concurrent.status.CountersManager; +import org.agrona.concurrent.status.Position; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import static io.aeron.logbuffer.LogBufferDescriptor.*; +import static io.aeron.protocol.DataHeaderFlyweight.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class PublicationImageTest +{ + private static final int TERM_LENGTH = 64 * 1024; + private static final int INITIAL_WINDOW_LENGTH = 128 * 1024; + private static final int MAX_WINDOW_LENGHT = 1024 * 1024; + private static final long CORRELATION_ID = 42; + private static final int TRANSPORT_INDEX = 3; + private static final int SESSION_ID = 888; + private static final int STREAM_ID = 101010; + private static final int INITIAL_TERM_ID = -444666; + private static final int ACTIVE_TERM_ID = INITIAL_TERM_ID + 111; + private static final int TERM_OFFSET = TERM_LENGTH - TERM_LENGTH / 4; + private static final short FLAGS = FrameDescriptor.UNFRAGMENTED; + private static final String SOURCE_IDENTITY = "aeron:udp?endpoint=localhost:5555"; + private final MediaDriver.Context ctx = new MediaDriver.Context(); + private final ReceiveChannelEndpoint receiveChannelEndpoint = mock(ReceiveChannelEndpoint.class); + private final InetSocketAddress controlAddress = mock(InetSocketAddress.class); + private final RawLog rawLog = mock(RawLog.class); + private final FeedbackDelayGenerator feedbackDelayGenerator = mock(FeedbackDelayGenerator.class); + private final CongestionControl congestionControl = mock(CongestionControl.class); + private final CachedEpochClock epochClock = new CachedEpochClock(); + private final CachedNanoClock nanoClock = new CachedNanoClock(); + private final UnsafeBuffer buffer = new UnsafeBuffer(new byte[1024]); + private final CountersManager countersManager = new CountersManager( + new UnsafeBuffer(ByteBuffer.allocateDirect(256 * 1024)), + new UnsafeBuffer(ByteBuffer.allocateDirect(64 * 1024)), + StandardCharsets.US_ASCII); + private final DataHeaderFlyweight headerFlyweight = new DataHeaderFlyweight(); + private Position hwmPosition; + private Position rcvPosition; + private PublicationImage image; + + @BeforeEach + void before() + { + epochClock.update(TimeUnit.HOURS.toMillis(1)); + nanoClock.update(TimeUnit.HOURS.toNanos(1)); + ctx + .receiverCachedNanoClock(nanoClock) + .nanoClock(nanoClock) + .epochClock(epochClock) + .imageLivenessTimeoutNs(TimeUnit.SECONDS.toNanos(10)) + .untetheredWindowLimitTimeoutNs(TimeUnit.SECONDS.toNanos(1)) + .untetheredRestingTimeoutNs(TimeUnit.SECONDS.toNanos(1)) + .statusMessageTimeoutNs(TimeUnit.MILLISECONDS.toNanos(150)) + .systemCounters(new SystemCounters(countersManager)); + + final String channel = "aeron:udp?endpoint=localhost:5555"; + final ChannelUri channelUri = ChannelUri.parse(channel); + final UdpChannel udpChannel = mock(UdpChannel.class); + when(udpChannel.channelUri()).thenReturn(channelUri); + when(receiveChannelEndpoint.subscriptionUdpChannel()).thenReturn(udpChannel); + + final SubscriptionLink subscriptionLink1 = mock(SubscriptionLink.class); + when(subscriptionLink1.isReliable()).thenReturn(true); + when(subscriptionLink1.isTether()).thenReturn(true); + final SubscriberPosition subscriberPosition1 = mock(SubscriberPosition.class); + when(subscriberPosition1.subscription()).thenReturn(subscriptionLink1); + final SubscriptionLink subscriptionLink2 = mock(SubscriptionLink.class); + when(subscriptionLink1.isReliable()).thenReturn(false); + when(subscriptionLink1.isTether()).thenReturn(false); + final SubscriberPosition subscriberPosition2 = mock(SubscriberPosition.class); + when(subscriberPosition2.subscription()).thenReturn(subscriptionLink2); + final ArrayList subscriberPositions = new ArrayList<>(); + subscriberPositions.add(subscriberPosition1); + subscriberPositions.add(subscriberPosition2); + + final UnsafeBuffer[] termBuffers = new UnsafeBuffer[PARTITION_COUNT]; + for (int i = 0; i < termBuffers.length; i++) + { + termBuffers[i] = new UnsafeBuffer(new byte[TERM_LENGTH]); + } + when(rawLog.termBuffers()).thenReturn(termBuffers); + when(rawLog.metaData()).thenReturn(new UnsafeBuffer(new byte[LOG_META_DATA_LENGTH])); + when(rawLog.termLength()).thenReturn(TERM_LENGTH); + + when(congestionControl.initialWindowLength()).thenReturn(INITIAL_WINDOW_LENGTH); + when(congestionControl.maxWindowLength()).thenReturn(MAX_WINDOW_LENGHT); + + final long registrationId = 73249234983274L; + final ExpandableArrayBuffer tempBuffer = new ExpandableArrayBuffer(); + hwmPosition = ReceiverHwm.allocate(tempBuffer, countersManager, registrationId, SESSION_ID, STREAM_ID, channel); + rcvPosition = ReceiverPos.allocate( + tempBuffer, countersManager, registrationId, SESSION_ID, STREAM_ID, channel); + + image = new PublicationImage( + CORRELATION_ID, + ctx, + receiveChannelEndpoint, + TRANSPORT_INDEX, + controlAddress, + SESSION_ID, + STREAM_ID, + INITIAL_TERM_ID, + ACTIVE_TERM_ID, + TERM_OFFSET, + FLAGS, + rawLog, + feedbackDelayGenerator, + subscriberPositions, + hwmPosition, + rcvPosition, + SOURCE_IDENTITY, + congestionControl); + + final long position = computePosition( + ACTIVE_TERM_ID, TERM_OFFSET, positionBitsToShift(TERM_LENGTH), INITIAL_TERM_ID); + assertEquals(position, hwmPosition.get()); + assertEquals(position, rcvPosition.get()); + + ThreadLocalRandom.current().nextBytes(buffer.byteArray()); + } + + @Test + void shouldTakeIntoAccountTrailingPaddingFrameWhenIncrementingHighWaterMarkPosition() + { + final int totalLength = 512; + final int packetLength = 288; + final int termId = ACTIVE_TERM_ID; + final int termOffset = TERM_LENGTH - totalLength; + int offset = 0; + offset += writeFrame(offset, termOffset, termId, 65, BEGIN_AND_END_FLAGS, HDR_TYPE_DATA, 65); + offset += writeFrame(offset, termOffset + offset, termId, 96, BEGIN_AND_END_FLAGS, HDR_TYPE_DATA, 96); + offset += writeFrame(offset, termOffset + offset, termId, 224, BEGIN_AND_END_FLAGS, HDR_TYPE_PAD, 0x888AA888); + assertEquals(totalLength, offset); + final InetSocketAddress srcAddress = mock(InetSocketAddress.class); + + final int bytes = image.insertPacket(termId, termOffset, buffer, packetLength, TRANSPORT_INDEX, srcAddress); + + assertEquals(packetLength, bytes); + final int positionBitsToShift = positionBitsToShift(TERM_LENGTH); + final long packetPosition = computePosition(termId, termOffset, positionBitsToShift, INITIAL_TERM_ID); + assertEquals(packetPosition + totalLength, hwmPosition.get()); + final UnsafeBuffer activeTermBuffer = + rawLog.termBuffers()[indexByPosition(packetPosition, positionBitsToShift)]; + for (int i = 0; i < packetLength; i++) + { + assertEquals(buffer.getByte(i), activeTermBuffer.getByte(termOffset + i)); + } + for (int i = packetLength; i < totalLength; i++) + { + assertEquals(0, activeTermBuffer.getByte(termOffset + i)); + } + } + + @Test + void shouldAdvanceHighWaterMarkPositionOnHeartbeat() + { + final int termId = ACTIVE_TERM_ID; + final int termOffset = TERM_OFFSET + 1024; + writeFrame(0, termOffset, termId, 0, BEGIN_AND_END_FLAGS, HDR_TYPE_DATA, -1); + FrameDescriptor.frameLengthOrdered(buffer, 0, 0); + final InetSocketAddress srcAddress = mock(InetSocketAddress.class); + final int packetLength = HEADER_LENGTH; + final AtomicCounter heartBeatsCounter = ctx.systemCounters().get(SystemCounterDescriptor.HEARTBEATS_RECEIVED); + final long oldHeartBeatCount = heartBeatsCounter.getWeak(); + + final int bytes = image.insertPacket(termId, termOffset, buffer, packetLength, TRANSPORT_INDEX, srcAddress); + + assertEquals(packetLength, bytes); + final int positionBitsToShift = positionBitsToShift(TERM_LENGTH); + final long packetPosition = computePosition(termId, termOffset, positionBitsToShift, INITIAL_TERM_ID); + assertEquals(packetPosition, hwmPosition.get()); + assertEquals(oldHeartBeatCount + 1, heartBeatsCounter.getWeak()); + final UnsafeBuffer activeTermBuffer = + rawLog.termBuffers()[indexByPosition(packetPosition, positionBitsToShift)]; + for (int i = 0; i < packetLength; i++) + { + assertEquals(0, activeTermBuffer.getByte(termOffset + i)); + } + } + + private int writeFrame( + final int offset, + final int termOffset, + final int termId, + final int length, + final short flags, + final int type, + final int reservedValue) + { + final int frameLength = length + HEADER_LENGTH; + headerFlyweight.wrap(buffer, offset, frameLength); + headerFlyweight + .frameLength(frameLength) + .version(CURRENT_VERSION) + .flags(flags) + .headerType(type); + headerFlyweight + .termOffset(termOffset) + .sessionId(SESSION_ID) + .streamId(STREAM_ID) + .termId(termId) + .reservedValue(reservedValue); + + return BitUtil.align(frameLength, FrameDescriptor.FRAME_ALIGNMENT); + } +} From 9b8229ceeda8d48a3435c1a11cd2e617347aaf20 Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Tue, 28 Jan 2025 12:33:52 +0100 Subject: [PATCH 08/14] [Java] Increment `snd-bpe` if window is zero or too small to fit a frame of data. --- .../src/main/java/io/aeron/driver/NetworkPublication.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java b/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java index 4a0cd65055..5d82f21c4b 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java +++ b/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java @@ -805,7 +805,7 @@ private int sendData(final long nowNs, final long senderPosition, final int term shortSends.increment(); } } - else if (available < 0 && trackSenderLimits) + else if (trackSenderLimits) { trackSenderLimits = false; senderBpe.incrementOrdered(); From 1bf37d749f37305f03372a8b20b7acf0753d5e43 Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Tue, 28 Jan 2025 20:25:02 +0100 Subject: [PATCH 09/14] [Java] Do not take into account actual padding size as the `rcv-hwm` will advance on next heartbeat/data frame anyway. --- .../io/aeron/driver/PublicationImage.java | 27 +------------------ .../io/aeron/driver/PublicationImageTest.java | 4 +-- 2 files changed, 3 insertions(+), 28 deletions(-) diff --git a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java index fb69ecce70..f5b014824c 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java +++ b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java @@ -21,13 +21,11 @@ import io.aeron.driver.media.ReceiveDestinationTransport; import io.aeron.driver.reports.LossReport; import io.aeron.driver.status.SystemCounters; -import io.aeron.logbuffer.FrameDescriptor; import io.aeron.logbuffer.LogBufferDescriptor; import io.aeron.logbuffer.TermRebuilder; import io.aeron.protocol.DataHeaderFlyweight; import io.aeron.protocol.RttMeasurementFlyweight; import io.aeron.protocol.StatusMessageFlyweight; -import org.agrona.BitUtil; import org.agrona.CloseHelper; import org.agrona.ErrorHandler; import org.agrona.SystemUtil; @@ -621,15 +619,7 @@ int insertPacket( final boolean isHeartbeat = DataHeaderFlyweight.isHeartbeat(buffer, length); final long packetPosition = computePosition(termId, termOffset, positionBitsToShift, initialTermId); - final long proposedPosition; - if (isHeartbeat) - { - proposedPosition = packetPosition; - } - else - { - proposedPosition = packetPosition + computeActualFrameLength(buffer, length); - } + final long proposedPosition = isHeartbeat ? packetPosition : packetPosition + length; if (!isFlowControlOverRun(proposedPosition)) { @@ -963,21 +953,6 @@ void stopStatusMessagesIfNotActive() } } - private static int computeActualFrameLength(final UnsafeBuffer buffer, final int packetLength) - { - int offset = 0; - while (offset < packetLength) - { - final int frameLength = FrameDescriptor.frameLength(buffer, offset); - if (frameLength <= 0) - { - break; - } - offset += BitUtil.align(frameLength, FrameDescriptor.FRAME_ALIGNMENT); - } - return offset; - } - private void state(final State state) { this.state = state; diff --git a/aeron-driver/src/test/java/io/aeron/driver/PublicationImageTest.java b/aeron-driver/src/test/java/io/aeron/driver/PublicationImageTest.java index fbdc1b2449..c2cf747b54 100644 --- a/aeron-driver/src/test/java/io/aeron/driver/PublicationImageTest.java +++ b/aeron-driver/src/test/java/io/aeron/driver/PublicationImageTest.java @@ -163,7 +163,7 @@ void before() } @Test - void shouldTakeIntoAccountTrailingPaddingFrameWhenIncrementingHighWaterMarkPosition() + void shouldAdvanceHighWaterMarkByPacketLengthWhenItContainsPaddingFrame() { final int totalLength = 512; final int packetLength = 288; @@ -181,7 +181,7 @@ void shouldTakeIntoAccountTrailingPaddingFrameWhenIncrementingHighWaterMarkPosit assertEquals(packetLength, bytes); final int positionBitsToShift = positionBitsToShift(TERM_LENGTH); final long packetPosition = computePosition(termId, termOffset, positionBitsToShift, INITIAL_TERM_ID); - assertEquals(packetPosition + totalLength, hwmPosition.get()); + assertEquals(packetPosition + packetLength, hwmPosition.get()); final UnsafeBuffer activeTermBuffer = rawLog.termBuffers()[indexByPosition(packetPosition, positionBitsToShift)]; for (int i = 0; i < packetLength; i++) From 063059e08e8f99a19b4b2232c772773c464b14ce Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Wed, 29 Jan 2025 11:42:46 +0100 Subject: [PATCH 10/14] Revert "[Java] Increment `snd-bpe` if window is zero or too small to fit a frame of data." This reverts commit 9b8229ceeda8d48a3435c1a11cd2e617347aaf20. --- .../src/main/java/io/aeron/driver/NetworkPublication.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java b/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java index 5d82f21c4b..4a0cd65055 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java +++ b/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java @@ -805,7 +805,7 @@ private int sendData(final long nowNs, final long senderPosition, final int term shortSends.increment(); } } - else if (trackSenderLimits) + else if (available < 0 && trackSenderLimits) { trackSenderLimits = false; senderBpe.incrementOrdered(); From ea08a3a8a33da0dfbb6294bab55eeb8a060b3c96 Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Wed, 29 Jan 2025 11:42:59 +0100 Subject: [PATCH 11/14] Revert "[Java] Simplify `sendData` path by assuming that the window is almost never zero. Also do not track padding bytes as the "sent bytes"." This reverts commit 411f102a887869935b9147586712437523ff10e6. --- .../java/io/aeron/logbuffer/TermScanner.java | 3 +- .../io/aeron/driver/NetworkPublication.java | 46 ++++++++++++------- 2 files changed, 32 insertions(+), 17 deletions(-) diff --git a/aeron-client/src/main/java/io/aeron/logbuffer/TermScanner.java b/aeron-client/src/main/java/io/aeron/logbuffer/TermScanner.java index 622334c575..4b6da23ac3 100644 --- a/aeron-client/src/main/java/io/aeron/logbuffer/TermScanner.java +++ b/aeron-client/src/main/java/io/aeron/logbuffer/TermScanner.java @@ -46,7 +46,7 @@ public static long scanForAvailability( int available = 0; int padding = 0; - while (0 == padding && available < limit) + do { final int termOffset = offset + available; final int frameLength = frameLengthVolatile(termBuffer, termOffset); @@ -71,6 +71,7 @@ public static long scanForAvailability( break; } } + while (0 == padding && available < limit); return pack(padding, available); } diff --git a/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java b/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java index 4a0cd65055..a6683ca556 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java +++ b/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java @@ -783,29 +783,43 @@ void updateHasReceivers(final long timeNs) private int sendData(final long nowNs, final long senderPosition, final int termOffset) { int bytesSent = 0; - - final int scanLimit = Math.min((int)(senderLimit.get() - senderPosition), mtuLength); - final int activeIndex = indexByPosition(senderPosition, positionBitsToShift); - final long scanOutcome = scanForAvailability(termBuffers[activeIndex], termOffset, scanLimit); - final int available = available(scanOutcome); - if (available > 0) + final int availableWindow = (int)(senderLimit.get() - senderPosition); + if (availableWindow > 0) { - final ByteBuffer sendBuffer = sendBuffers[activeIndex]; - sendBuffer.limit(termOffset + available).position(termOffset); + final int scanLimit = Math.min(availableWindow, mtuLength); + final int activeIndex = indexByPosition(senderPosition, positionBitsToShift); - if (available == doSend(sendBuffer)) + final long scanOutcome = scanForAvailability(termBuffers[activeIndex], termOffset, scanLimit); + final int available = available(scanOutcome); + if (available > 0) { - bytesSent = available; - timeOfLastDataOrHeartbeatNs = nowNs; - trackSenderLimits = true; - this.senderPosition.setOrdered(senderPosition + available + padding(scanOutcome)); + final ByteBuffer sendBuffer = sendBuffers[activeIndex]; + sendBuffer.limit(termOffset + available).position(termOffset); + + if (available == doSend(sendBuffer)) + { + timeOfLastDataOrHeartbeatNs = nowNs; + trackSenderLimits = true; + + bytesSent = available + padding(scanOutcome); + this.senderPosition.setOrdered(senderPosition + bytesSent); + } + else + { + shortSends.increment(); + } } - else + else if (available < 0) { - shortSends.increment(); + if (trackSenderLimits) + { + trackSenderLimits = false; + senderBpe.incrementOrdered(); + senderFlowControlLimits.incrementOrdered(); + } } } - else if (available < 0 && trackSenderLimits) + else if (trackSenderLimits) { trackSenderLimits = false; senderBpe.incrementOrdered(); From 7f7f769ea2a33b12a95baab10c51fb88af12c5e2 Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Wed, 29 Jan 2025 11:46:30 +0100 Subject: [PATCH 12/14] Revert "[Java] Read high-water mark position after computing the rebuild position to ensure the widest possible loss detection scan window." This reverts commit 60e6047ecc1a999af922ab217dfee0f122c1a50b. --- .../src/main/java/io/aeron/driver/PublicationImage.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java index f5b014824c..0d694e8329 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java +++ b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java @@ -541,6 +541,7 @@ int trackRebuild(final long nowNs) if (isRebuilding) { + final long hwmPosition = this.hwmPosition.getVolatile(); long minSubscriberPosition = Long.MAX_VALUE; long maxSubscriberPosition = 0; @@ -552,7 +553,6 @@ int trackRebuild(final long nowNs) } final long rebuildPosition = Math.max(this.rebuildPosition.get(), maxSubscriberPosition); - final long hwmPosition = this.hwmPosition.getVolatile(); final long scanOutcome = lossDetector.scan( termBuffers[indexByPosition(rebuildPosition, positionBitsToShift)], rebuildPosition, From ff9186cc8efcb8684ca1dbfb64af980bba67bb30 Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Wed, 29 Jan 2025 12:06:05 +0100 Subject: [PATCH 13/14] [C] Simplify term gap scanning logic by removing `ALIGNED_HEADER_LENGTH` and streamlining non-zero frame lookup. --- .../c/concurrent/aeron_term_gap_scanner.h | 12 ++---- .../src/test/c/aeron_loss_detector_test.cpp | 39 +++++++++---------- 2 files changed, 23 insertions(+), 28 deletions(-) diff --git a/aeron-client/src/main/c/concurrent/aeron_term_gap_scanner.h b/aeron-client/src/main/c/concurrent/aeron_term_gap_scanner.h index 1c2fd35401..d72902d560 100644 --- a/aeron-client/src/main/c/concurrent/aeron_term_gap_scanner.h +++ b/aeron-client/src/main/c/concurrent/aeron_term_gap_scanner.h @@ -23,8 +23,6 @@ typedef void (*aeron_term_gap_scanner_on_gap_detected_func_t)(void *clientd, int32_t term_id, int32_t term_offset, size_t length); -#define AERON_ALIGNED_HEADER_LENGTH (AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT)) - inline int32_t aeron_term_gap_scanner_scan_for_gap( const uint8_t *buffer, int32_t term_id, @@ -53,23 +51,21 @@ inline int32_t aeron_term_gap_scanner_scan_for_gap( const int32_t gap_begin_offset = offset; if (offset < limit_offset) { - const int32_t limit = limit_offset - AERON_ALIGNED_HEADER_LENGTH; - while (offset < limit) + offset += AERON_DATA_HEADER_LENGTH; + while (offset < limit_offset) { - offset += AERON_LOGBUFFER_FRAME_ALIGNMENT; - aeron_frame_header_t *hdr = (aeron_frame_header_t *)(buffer + offset); int32_t frame_length; AERON_GET_ACQUIRE(frame_length, hdr->frame_length); if (0 != frame_length) { - offset -= AERON_ALIGNED_HEADER_LENGTH; break; } + offset += AERON_DATA_HEADER_LENGTH; } - const size_t gap_length = (offset - gap_begin_offset) + AERON_ALIGNED_HEADER_LENGTH; + const size_t gap_length = offset - gap_begin_offset; on_gap_detected(clientd, term_id, gap_begin_offset, gap_length); } diff --git a/aeron-driver/src/test/c/aeron_loss_detector_test.cpp b/aeron-driver/src/test/c/aeron_loss_detector_test.cpp index ddc36c2388..0c5a6ef24d 100644 --- a/aeron-driver/src/test/c/aeron_loss_detector_test.cpp +++ b/aeron-driver/src/test/c/aeron_loss_detector_test.cpp @@ -25,7 +25,6 @@ extern "C" } #define CAPACITY (AERON_LOGBUFFER_TERM_MIN_LENGTH) -#define HEADER_LENGTH (AERON_DATA_HEADER_LENGTH) #define POSITION_BITS_TO_SHIFT (aeron_number_of_trailing_zeroes(CAPACITY)) #define MASK (CAPACITY - 1) @@ -59,11 +58,11 @@ class TermGapScannerTest : public testing::Test TEST_F(TermGapScannerTest, shouldReportGapAtBeginningOfBuffer) { - const int32_t frame_offset = AERON_ALIGN((HEADER_LENGTH * 3), AERON_LOGBUFFER_FRAME_ALIGNMENT); - const int32_t high_water_mark = frame_offset + AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT); + const int32_t frame_offset = AERON_ALIGN((AERON_DATA_HEADER_LENGTH * 3), AERON_LOGBUFFER_FRAME_ALIGNMENT); + const int32_t high_water_mark = frame_offset + AERON_DATA_HEADER_LENGTH; auto *hdr = (aeron_frame_header_t *)(m_ptr + frame_offset); - hdr->frame_length = HEADER_LENGTH; + hdr->frame_length = AERON_DATA_HEADER_LENGTH; bool on_gap_detected_called = false; m_on_gap_detected = @@ -83,19 +82,19 @@ TEST_F(TermGapScannerTest, shouldReportGapAtBeginningOfBuffer) TEST_F(TermGapScannerTest, shouldReportSingleGapWhenBufferNotFull) { - const int32_t tail = AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT); + const int32_t tail = AERON_DATA_HEADER_LENGTH; const int32_t high_water_mark = AERON_LOGBUFFER_FRAME_ALIGNMENT * 3; aeron_frame_header_t *hdr; - hdr = (aeron_frame_header_t *)(m_ptr + tail - (AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); - hdr->frame_length = HEADER_LENGTH; + hdr = (aeron_frame_header_t *)(m_ptr + tail - (AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); + hdr->frame_length = AERON_DATA_HEADER_LENGTH; hdr = (aeron_frame_header_t *)(m_ptr + tail); hdr->frame_length = 0; - hdr = (aeron_frame_header_t *)(m_ptr + high_water_mark - (AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); - hdr->frame_length = HEADER_LENGTH; + hdr = (aeron_frame_header_t *)(m_ptr + high_water_mark - (AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); + hdr->frame_length = AERON_DATA_HEADER_LENGTH; bool on_gap_detected_called = false; m_on_gap_detected = @@ -103,7 +102,7 @@ TEST_F(TermGapScannerTest, shouldReportSingleGapWhenBufferNotFull) { EXPECT_EQ(term_id, TERM_ID); EXPECT_EQ(term_offset, tail); - EXPECT_EQ(length, (size_t)AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT)); + EXPECT_EQ(length, (size_t)AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT)); on_gap_detected_called = true; }; @@ -115,19 +114,19 @@ TEST_F(TermGapScannerTest, shouldReportSingleGapWhenBufferNotFull) TEST_F(TermGapScannerTest, shouldReportSingleGapWhenBufferIsFull) { - const int32_t tail = CAPACITY - (AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT) * 2); + const int32_t tail = CAPACITY - (AERON_DATA_HEADER_LENGTH * 2); const int32_t high_water_mark = CAPACITY; aeron_frame_header_t *hdr; - hdr = (aeron_frame_header_t *)(m_ptr + tail - (AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); - hdr->frame_length = HEADER_LENGTH; + hdr = (aeron_frame_header_t *)(m_ptr + tail - (AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); + hdr->frame_length = AERON_DATA_HEADER_LENGTH; hdr = (aeron_frame_header_t *)(m_ptr + tail); hdr->frame_length = 0; - hdr = (aeron_frame_header_t *)(m_ptr + high_water_mark - (AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); - hdr->frame_length = HEADER_LENGTH; + hdr = (aeron_frame_header_t *)(m_ptr + high_water_mark - (AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); + hdr->frame_length = AERON_DATA_HEADER_LENGTH; bool on_gap_detected_called = false; m_on_gap_detected = @@ -135,7 +134,7 @@ TEST_F(TermGapScannerTest, shouldReportSingleGapWhenBufferIsFull) { EXPECT_EQ(term_id, TERM_ID); EXPECT_EQ(term_offset, tail); - EXPECT_EQ(length, (size_t)AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT)); + EXPECT_EQ(length, (size_t)AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT)); on_gap_detected_called = true; }; @@ -147,16 +146,16 @@ TEST_F(TermGapScannerTest, shouldReportSingleGapWhenBufferIsFull) TEST_F(TermGapScannerTest, shouldReportNoGapWhenHwmIsInPadding) { - const int32_t padding_length = AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT) * 2; + const int32_t padding_length = AERON_DATA_HEADER_LENGTH * 2; const int32_t tail = CAPACITY - padding_length; - const int32_t high_water_mark = CAPACITY - padding_length + HEADER_LENGTH; + const int32_t high_water_mark = CAPACITY - padding_length + AERON_DATA_HEADER_LENGTH; aeron_frame_header_t *hdr; hdr = (aeron_frame_header_t *)(m_ptr + tail); hdr->frame_length = padding_length; - hdr = (aeron_frame_header_t *)(m_ptr + tail + HEADER_LENGTH); + hdr = (aeron_frame_header_t *)(m_ptr + tail + AERON_DATA_HEADER_LENGTH); hdr->frame_length = 0; bool on_gap_detected_called = false; @@ -173,7 +172,7 @@ TEST_F(TermGapScannerTest, shouldReportNoGapWhenHwmIsInPadding) } #define DATA_LENGTH (36) -#define MESSAGE_LENGTH (DATA_LENGTH + HEADER_LENGTH) +#define MESSAGE_LENGTH (DATA_LENGTH + AERON_DATA_HEADER_LENGTH) #define ALIGNED_FRAME_LENGTH (AERON_ALIGN(MESSAGE_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT)) class LossDetectorTest : public testing::Test From acaa061016fa8c40f13455f82ca6cc3f3a36749f Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Wed, 29 Jan 2025 12:28:07 +0100 Subject: [PATCH 14/14] [C] Update active loss gap upon length change. Gap length can increase when heartbeats advance the high-water mark position, or it can decrease if messages arrive out of order. In both cases we want to adjust the NAK range of the receiver. --- aeron-driver/src/main/c/aeron_loss_detector.h | 3 +- .../src/test/c/aeron_loss_detector_test.cpp | 85 +++++++++++++++++++ 2 files changed, 87 insertions(+), 1 deletion(-) diff --git a/aeron-driver/src/main/c/aeron_loss_detector.h b/aeron-driver/src/main/c/aeron_loss_detector.h index cfdce2a887..58a60eb77a 100644 --- a/aeron-driver/src/main/c/aeron_loss_detector.h +++ b/aeron-driver/src/main/c/aeron_loss_detector.h @@ -96,7 +96,8 @@ inline void aeron_loss_detector_on_gap(void *clientd, int32_t term_id, int32_t t inline bool aeron_loss_detector_gaps_match(aeron_loss_detector_t *detector) { return detector->active_gap.term_id == detector->scanned_gap.term_id && - detector->active_gap.term_offset == detector->scanned_gap.term_offset; + detector->active_gap.term_offset == detector->scanned_gap.term_offset && + detector->active_gap.length == detector->scanned_gap.length; } inline void aeron_loss_detector_activate_gap(aeron_loss_detector_t *detector, int64_t now_ns) diff --git a/aeron-driver/src/test/c/aeron_loss_detector_test.cpp b/aeron-driver/src/test/c/aeron_loss_detector_test.cpp index 0c5a6ef24d..35147b913b 100644 --- a/aeron-driver/src/test/c/aeron_loss_detector_test.cpp +++ b/aeron-driver/src/test/c/aeron_loss_detector_test.cpp @@ -683,3 +683,88 @@ TEST_F(LossDetectorTest, shouldHandleNonZeroInitialTermOffset) EXPECT_EQ(called, 1); EXPECT_TRUE(loss_found); } + +TEST_F(LossDetectorTest, shouldDetectChangesInTheGapLength) +{ + const int64_t rebuild_position = ALIGNED_FRAME_LENGTH * 3; + bool loss_found; + bool called = false; + + insert_frame(offset_of_message(2)); + insert_frame(offset_of_message(5)); + + ASSERT_EQ(feedback_delay_state_init(true), 0); + ASSERT_EQ(aeron_loss_detector_init( + &m_detector, &m_delay_generator_state, LossDetectorTest::on_gap_detected, this), 0); + + m_on_gap_detected = + [&](int32_t term_id, int32_t term_offset, size_t length) + { + EXPECT_EQ(term_id, TERM_ID); + EXPECT_EQ(term_offset, offset_of_message(3)); + EXPECT_EQ(length, 32); + called = true; + }; + + called = false; + ASSERT_EQ(aeron_loss_detector_scan( + &m_detector, &loss_found, m_ptr, rebuild_position, rebuild_position + 32, m_time, MASK, POSITION_BITS_TO_SHIFT, TERM_ID), + offset_of_message(3)); + EXPECT_TRUE(called); + EXPECT_TRUE(loss_found); + + m_on_gap_detected = + [&](int32_t term_id, int32_t term_offset, size_t length) + { + EXPECT_EQ(term_id, TERM_ID); + EXPECT_EQ(term_offset, offset_of_message(3)); + EXPECT_EQ(length, 64); + called = true; + }; + + called = false; + ASSERT_EQ(aeron_loss_detector_scan( + &m_detector, &loss_found, m_ptr, rebuild_position, rebuild_position + 64, m_time, MASK, POSITION_BITS_TO_SHIFT, TERM_ID), + offset_of_message(3)); + EXPECT_TRUE(called); + EXPECT_TRUE(loss_found); + + called = false; + ASSERT_EQ(aeron_loss_detector_scan( + &m_detector, &loss_found, m_ptr, rebuild_position, rebuild_position + 64, m_time, MASK, POSITION_BITS_TO_SHIFT, TERM_ID), + offset_of_message(3)); + EXPECT_FALSE(called); + EXPECT_FALSE(loss_found); + + m_on_gap_detected = + [&](int32_t term_id, int32_t term_offset, size_t length) + { + EXPECT_EQ(term_id, TERM_ID); + EXPECT_EQ(term_offset, offset_of_message(3)); + EXPECT_EQ(length, 32); + called = true; + }; + + called = false; + ASSERT_EQ(aeron_loss_detector_scan( + &m_detector, &loss_found, m_ptr, rebuild_position, rebuild_position + 32, m_time, MASK, POSITION_BITS_TO_SHIFT, TERM_ID), + offset_of_message(3)); + EXPECT_TRUE(called); + EXPECT_TRUE(loss_found); + + m_on_gap_detected = + [&](int32_t term_id, int32_t term_offset, size_t length) + { + EXPECT_EQ(term_id, TERM_ID); + EXPECT_EQ(term_offset, offset_of_message(3)); + EXPECT_EQ(length, ALIGNED_FRAME_LENGTH * 2); + called = true; + }; + + called = false; + ASSERT_EQ(aeron_loss_detector_scan( + &m_detector, &loss_found, m_ptr, rebuild_position, rebuild_position + CAPACITY, m_time, MASK, POSITION_BITS_TO_SHIFT, TERM_ID), + offset_of_message(3)); + EXPECT_TRUE(called); + EXPECT_TRUE(loss_found); +}