From 5db3ae05be39f3b673d8c587be1010ef6b444b0f Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Wed, 29 Jan 2025 14:32:42 +0100 Subject: [PATCH] Loss detection fixes (#1729) * [Java] Update active loss gap upon length change. Gap length can increase when heartbeats advance the high-water mark position, or it can decrease if messages arrive out of order. In both cases we want to adjust the NAK range of the receiver. * [Java] Update offset masking code to cast the result and not the position. * [Java] Read high-water mark position after computing the rebuild position to ensure the widest possible loss detection scan window. Loss detection code is concurrent to the application threads that are consuming from the image but also to the receiver thread that inserts new packets into the image and advances the high-water mark position counter. Reading that counter last ensures that we take into account all inserted packets up to this point. * [Java] Simplify `sendData` path by assuming that the window is almost never zero. Also do not track padding bytes as the "sent bytes". * [Java] Take into account a trailing padding frame when adjusting high-water mark on the receiver. A single network packet might contain multiple frames (up to an MTU). When such packet contains only `HDR_TYPE_DATA` then packet length will correspond to the combined lengths of the frames it contains. However, if there is a trailing `HDR_TYPE_PAD` packet then the packet length will not be equal to the sum of the frame sizes. For example, given a packet containing two 80 byte messages (whose aligned length is 128 bytes each) and a padding for 224 bytes. The length of the packet will be 288 bytes (256 + 32) but the "frame size" will be 512 bytes (256 + 256). * [Java] Simplify term gap scanning logic by removing `ALIGNED_HEADER_LENGTH` and streamlining non-zero frame lookup. * [Java] Add a test for `rcv-hwm` position update when a packet contains trailing padding frame + rename method that computes the target position offset. * [Java] Increment `snd-bpe` if window is zero or too small to fit a frame of data. * [Java] Do not take into account actual padding size as the `rcv-hwm` will advance on next heartbeat/data frame anyway. * Revert "[Java] Increment `snd-bpe` if window is zero or too small to fit a frame of data." This reverts commit 9b8229ceeda8d48a3435c1a11cd2e617347aaf20. * Revert "[Java] Simplify `sendData` path by assuming that the window is almost never zero. Also do not track padding bytes as the "sent bytes"." This reverts commit 411f102a887869935b9147586712437523ff10e6. * Revert "[Java] Read high-water mark position after computing the rebuild position to ensure the widest possible loss detection scan window." This reverts commit 60e6047ecc1a999af922ab217dfee0f122c1a50b. * [C] Simplify term gap scanning logic by removing `ALIGNED_HEADER_LENGTH` and streamlining non-zero frame lookup. * [C] Update active loss gap upon length change. Gap length can increase when heartbeats advance the high-water mark position, or it can decrease if messages arrive out of order. In both cases we want to adjust the NAK range of the receiver. --- .../c/concurrent/aeron_term_gap_scanner.h | 12 +- .../io/aeron/logbuffer/TermGapScanner.java | 15 +- .../aeron/logbuffer/TermGapScannerTest.java | 37 +++ aeron-driver/src/main/c/aeron_loss_detector.h | 3 +- .../java/io/aeron/driver/LossDetector.java | 8 +- .../io/aeron/driver/PublicationImage.java | 13 +- .../src/test/c/aeron_loss_detector_test.cpp | 124 +++++++-- .../io/aeron/driver/LossDetectorTest.java | 249 +++++++++++++----- .../io/aeron/driver/PublicationImageTest.java | 249 ++++++++++++++++++ 9 files changed, 598 insertions(+), 112 deletions(-) create mode 100644 aeron-driver/src/test/java/io/aeron/driver/PublicationImageTest.java diff --git a/aeron-client/src/main/c/concurrent/aeron_term_gap_scanner.h b/aeron-client/src/main/c/concurrent/aeron_term_gap_scanner.h index 1c2fd35401..d72902d560 100644 --- a/aeron-client/src/main/c/concurrent/aeron_term_gap_scanner.h +++ b/aeron-client/src/main/c/concurrent/aeron_term_gap_scanner.h @@ -23,8 +23,6 @@ typedef void (*aeron_term_gap_scanner_on_gap_detected_func_t)(void *clientd, int32_t term_id, int32_t term_offset, size_t length); -#define AERON_ALIGNED_HEADER_LENGTH (AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT)) - inline int32_t aeron_term_gap_scanner_scan_for_gap( const uint8_t *buffer, int32_t term_id, @@ -53,23 +51,21 @@ inline int32_t aeron_term_gap_scanner_scan_for_gap( const int32_t gap_begin_offset = offset; if (offset < limit_offset) { - const int32_t limit = limit_offset - AERON_ALIGNED_HEADER_LENGTH; - while (offset < limit) + offset += AERON_DATA_HEADER_LENGTH; + while (offset < limit_offset) { - offset += AERON_LOGBUFFER_FRAME_ALIGNMENT; - aeron_frame_header_t *hdr = (aeron_frame_header_t *)(buffer + offset); int32_t frame_length; AERON_GET_ACQUIRE(frame_length, hdr->frame_length); if (0 != frame_length) { - offset -= AERON_ALIGNED_HEADER_LENGTH; break; } + offset += AERON_DATA_HEADER_LENGTH; } - const size_t gap_length = (offset - gap_begin_offset) + AERON_ALIGNED_HEADER_LENGTH; + const size_t gap_length = offset - gap_begin_offset; on_gap_detected(clientd, term_id, gap_begin_offset, gap_length); } diff --git a/aeron-client/src/main/java/io/aeron/logbuffer/TermGapScanner.java b/aeron-client/src/main/java/io/aeron/logbuffer/TermGapScanner.java index 38a67d0656..23425c66d8 100644 --- a/aeron-client/src/main/java/io/aeron/logbuffer/TermGapScanner.java +++ b/aeron-client/src/main/java/io/aeron/logbuffer/TermGapScanner.java @@ -15,7 +15,6 @@ */ package io.aeron.logbuffer; -import org.agrona.BitUtil; import org.agrona.concurrent.UnsafeBuffer; import static io.aeron.logbuffer.FrameDescriptor.FRAME_ALIGNMENT; @@ -31,8 +30,6 @@ */ public class TermGapScanner { - private static final int ALIGNED_HEADER_LENGTH = BitUtil.align(HEADER_LENGTH, FRAME_ALIGNMENT); - /** * Handler for notifying of gaps in the log. */ @@ -82,19 +79,17 @@ public static int scanForGap( final int gapBeginOffset = offset; if (offset < limitOffset) { - final int limit = limitOffset - ALIGNED_HEADER_LENGTH; - while (offset < limit) + offset += HEADER_LENGTH; + while (offset < limitOffset) { - offset += FRAME_ALIGNMENT; - - if (0 != termBuffer.getIntVolatile(offset)) + if (0 != frameLengthVolatile(termBuffer, offset)) { - offset -= ALIGNED_HEADER_LENGTH; break; } + offset += HEADER_LENGTH; } - final int gapLength = (offset - gapBeginOffset) + ALIGNED_HEADER_LENGTH; + final int gapLength = offset - gapBeginOffset; handler.onGap(termId, gapBeginOffset, gapLength); } diff --git a/aeron-client/src/test/java/io/aeron/logbuffer/TermGapScannerTest.java b/aeron-client/src/test/java/io/aeron/logbuffer/TermGapScannerTest.java index f8e3c40a04..8b0e6adfd5 100644 --- a/aeron-client/src/test/java/io/aeron/logbuffer/TermGapScannerTest.java +++ b/aeron-client/src/test/java/io/aeron/logbuffer/TermGapScannerTest.java @@ -24,6 +24,7 @@ import static org.agrona.BitUtil.align; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.*; +import static org.mockito.Mockito.verifyNoMoreInteractions; class TermGapScannerTest { @@ -51,6 +52,7 @@ void shouldReportGapAtBeginningOfBuffer() assertEquals(0, TermGapScanner.scanForGap(termBuffer, TERM_ID, 0, highWaterMark, gapHandler)); verify(gapHandler).onGap(TERM_ID, 0, frameOffset); + verifyNoMoreInteractions(gapHandler); } @Test @@ -67,6 +69,7 @@ void shouldReportSingleGapWhenBufferNotFull() assertEquals(tail, TermGapScanner.scanForGap(termBuffer, TERM_ID, tail, highWaterMark, gapHandler)); verify(gapHandler).onGap(TERM_ID, tail, align(HEADER_LENGTH, FRAME_ALIGNMENT)); + verifyNoMoreInteractions(gapHandler); } @Test @@ -83,6 +86,7 @@ void shouldReportSingleGapWhenBufferIsFull() assertEquals(tail, TermGapScanner.scanForGap(termBuffer, TERM_ID, tail, highWaterMark, gapHandler)); verify(gapHandler).onGap(TERM_ID, tail, align(HEADER_LENGTH, FRAME_ALIGNMENT)); + verifyNoMoreInteractions(gapHandler); } @Test @@ -100,4 +104,37 @@ void shouldReportNoGapWhenHwmIsInPadding() verifyNoInteractions(gapHandler); } + + @Test + void shouldReportSingleHeaderGap() + { + final int offset = 8192 + 384; + when(termBuffer.getIntVolatile(offset)).thenReturn(0); + when(termBuffer.getIntVolatile(offset + HEADER_LENGTH)).thenReturn(128); + + assertEquals( + offset, TermGapScanner.scanForGap(termBuffer, TERM_ID, offset, LOG_BUFFER_CAPACITY, gapHandler)); + + verify(termBuffer).getIntVolatile(offset); + verify(termBuffer).getIntVolatile(offset + HEADER_LENGTH); + verify(gapHandler).onGap(TERM_ID, offset, HEADER_LENGTH); + verifyNoMoreInteractions(gapHandler, termBuffer); + } + + @Test + void shouldReportGapAtTheEndOfTheBuffer() + { + final int offset = LOG_BUFFER_CAPACITY - 128; + when(termBuffer.getIntVolatile(offset)).thenReturn(0); + + assertEquals( + offset, TermGapScanner.scanForGap(termBuffer, TERM_ID, offset, LOG_BUFFER_CAPACITY, gapHandler)); + + verify(termBuffer).getIntVolatile(offset); + verify(termBuffer).getIntVolatile(offset + HEADER_LENGTH); + verify(termBuffer).getIntVolatile(offset + 2 * HEADER_LENGTH); + verify(termBuffer).getIntVolatile(offset + 3 * HEADER_LENGTH); + verify(gapHandler).onGap(TERM_ID, offset, 128); + verifyNoMoreInteractions(gapHandler, termBuffer); + } } diff --git a/aeron-driver/src/main/c/aeron_loss_detector.h b/aeron-driver/src/main/c/aeron_loss_detector.h index cfdce2a887..58a60eb77a 100644 --- a/aeron-driver/src/main/c/aeron_loss_detector.h +++ b/aeron-driver/src/main/c/aeron_loss_detector.h @@ -96,7 +96,8 @@ inline void aeron_loss_detector_on_gap(void *clientd, int32_t term_id, int32_t t inline bool aeron_loss_detector_gaps_match(aeron_loss_detector_t *detector) { return detector->active_gap.term_id == detector->scanned_gap.term_id && - detector->active_gap.term_offset == detector->scanned_gap.term_offset; + detector->active_gap.term_offset == detector->scanned_gap.term_offset && + detector->active_gap.length == detector->scanned_gap.length; } inline void aeron_loss_detector_activate_gap(aeron_loss_detector_t *detector, int64_t now_ns) diff --git a/aeron-driver/src/main/java/io/aeron/driver/LossDetector.java b/aeron-driver/src/main/java/io/aeron/driver/LossDetector.java index da66826e23..60cc574975 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/LossDetector.java +++ b/aeron-driver/src/main/java/io/aeron/driver/LossDetector.java @@ -77,7 +77,7 @@ public long scan( final int initialTermId) { boolean lossFound = false; - int rebuildOffset = (int)rebuildPosition & termLengthMask; + int rebuildOffset = (int)(rebuildPosition & termLengthMask); if (rebuildPosition < hwmPosition) { @@ -85,13 +85,15 @@ public long scan( final int hwmTermCount = (int)(hwmPosition >>> positionBitsToShift); final int rebuildTermId = initialTermId + rebuildTermCount; - final int hwmTermOffset = (int)hwmPosition & termLengthMask; + final int hwmTermOffset = (int)(hwmPosition & termLengthMask); final int limitOffset = rebuildTermCount == hwmTermCount ? hwmTermOffset : termLengthMask + 1; rebuildOffset = scanForGap(termBuffer, rebuildTermId, rebuildOffset, limitOffset, this); if (rebuildOffset < limitOffset) { - if (scannedTermOffset != activeTermOffset || scannedTermId != activeTermId) + if (scannedTermOffset != activeTermOffset || + scannedTermId != activeTermId || + scannedLength != activeLength) { activateGap(nowNs); lossFound = true; diff --git a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java index 385c8ab781..0d694e8329 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java +++ b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java @@ -117,6 +117,7 @@ enum State private static final VarHandle END_SM_CHANGE_VH; private static final VarHandle BEGIN_LOSS_CHANGE_VH; private static final VarHandle END_LOSS_CHANGE_VH; + static { try @@ -561,7 +562,7 @@ int trackRebuild(final long nowNs) positionBitsToShift, initialTermId); - final int rebuildTermOffset = (int)rebuildPosition & termLengthMask; + final int rebuildTermOffset = (int)(rebuildPosition & termLengthMask); final long newRebuildPosition = (rebuildPosition - rebuildTermOffset) + rebuildOffset(scanOutcome); this.rebuildPosition.proposeMaxOrdered(newRebuildPosition); @@ -631,12 +632,12 @@ int insertPacket( { final long nowNs = cachedNanoClock.nanoTime(); timeOfLastPacketNs = nowNs; - trackConnection(transportIndex, srcAddress, nowNs); + final ImageConnection imageConnection = trackConnection(transportIndex, srcAddress, nowNs); if (isEndOfStream) { - imageConnections[transportIndex].eosPosition = packetPosition; - imageConnections[transportIndex].isEos = true; + imageConnection.eosPosition = packetPosition; + imageConnection.isEos = true; if (!this.isEndOfStream && isAllConnectedEos()) { @@ -1017,7 +1018,8 @@ private void cleanBufferTo(final long position) } } - private void trackConnection(final int transportIndex, final InetSocketAddress srcAddress, final long nowNs) + private ImageConnection trackConnection( + final int transportIndex, final InetSocketAddress srcAddress, final long nowNs) { imageConnections = ArrayUtil.ensureCapacity(imageConnections, transportIndex + 1); ImageConnection imageConnection = imageConnections[transportIndex]; @@ -1030,6 +1032,7 @@ private void trackConnection(final int transportIndex, final InetSocketAddress s imageConnection.timeOfLastActivityNs = nowNs; imageConnection.timeOfLastFrameNs = nowNs; + return imageConnection; } private boolean isAllConnectedEos() diff --git a/aeron-driver/src/test/c/aeron_loss_detector_test.cpp b/aeron-driver/src/test/c/aeron_loss_detector_test.cpp index ddc36c2388..35147b913b 100644 --- a/aeron-driver/src/test/c/aeron_loss_detector_test.cpp +++ b/aeron-driver/src/test/c/aeron_loss_detector_test.cpp @@ -25,7 +25,6 @@ extern "C" } #define CAPACITY (AERON_LOGBUFFER_TERM_MIN_LENGTH) -#define HEADER_LENGTH (AERON_DATA_HEADER_LENGTH) #define POSITION_BITS_TO_SHIFT (aeron_number_of_trailing_zeroes(CAPACITY)) #define MASK (CAPACITY - 1) @@ -59,11 +58,11 @@ class TermGapScannerTest : public testing::Test TEST_F(TermGapScannerTest, shouldReportGapAtBeginningOfBuffer) { - const int32_t frame_offset = AERON_ALIGN((HEADER_LENGTH * 3), AERON_LOGBUFFER_FRAME_ALIGNMENT); - const int32_t high_water_mark = frame_offset + AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT); + const int32_t frame_offset = AERON_ALIGN((AERON_DATA_HEADER_LENGTH * 3), AERON_LOGBUFFER_FRAME_ALIGNMENT); + const int32_t high_water_mark = frame_offset + AERON_DATA_HEADER_LENGTH; auto *hdr = (aeron_frame_header_t *)(m_ptr + frame_offset); - hdr->frame_length = HEADER_LENGTH; + hdr->frame_length = AERON_DATA_HEADER_LENGTH; bool on_gap_detected_called = false; m_on_gap_detected = @@ -83,19 +82,19 @@ TEST_F(TermGapScannerTest, shouldReportGapAtBeginningOfBuffer) TEST_F(TermGapScannerTest, shouldReportSingleGapWhenBufferNotFull) { - const int32_t tail = AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT); + const int32_t tail = AERON_DATA_HEADER_LENGTH; const int32_t high_water_mark = AERON_LOGBUFFER_FRAME_ALIGNMENT * 3; aeron_frame_header_t *hdr; - hdr = (aeron_frame_header_t *)(m_ptr + tail - (AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); - hdr->frame_length = HEADER_LENGTH; + hdr = (aeron_frame_header_t *)(m_ptr + tail - (AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); + hdr->frame_length = AERON_DATA_HEADER_LENGTH; hdr = (aeron_frame_header_t *)(m_ptr + tail); hdr->frame_length = 0; - hdr = (aeron_frame_header_t *)(m_ptr + high_water_mark - (AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); - hdr->frame_length = HEADER_LENGTH; + hdr = (aeron_frame_header_t *)(m_ptr + high_water_mark - (AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); + hdr->frame_length = AERON_DATA_HEADER_LENGTH; bool on_gap_detected_called = false; m_on_gap_detected = @@ -103,7 +102,7 @@ TEST_F(TermGapScannerTest, shouldReportSingleGapWhenBufferNotFull) { EXPECT_EQ(term_id, TERM_ID); EXPECT_EQ(term_offset, tail); - EXPECT_EQ(length, (size_t)AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT)); + EXPECT_EQ(length, (size_t)AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT)); on_gap_detected_called = true; }; @@ -115,19 +114,19 @@ TEST_F(TermGapScannerTest, shouldReportSingleGapWhenBufferNotFull) TEST_F(TermGapScannerTest, shouldReportSingleGapWhenBufferIsFull) { - const int32_t tail = CAPACITY - (AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT) * 2); + const int32_t tail = CAPACITY - (AERON_DATA_HEADER_LENGTH * 2); const int32_t high_water_mark = CAPACITY; aeron_frame_header_t *hdr; - hdr = (aeron_frame_header_t *)(m_ptr + tail - (AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); - hdr->frame_length = HEADER_LENGTH; + hdr = (aeron_frame_header_t *)(m_ptr + tail - (AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); + hdr->frame_length = AERON_DATA_HEADER_LENGTH; hdr = (aeron_frame_header_t *)(m_ptr + tail); hdr->frame_length = 0; - hdr = (aeron_frame_header_t *)(m_ptr + high_water_mark - (AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); - hdr->frame_length = HEADER_LENGTH; + hdr = (aeron_frame_header_t *)(m_ptr + high_water_mark - (AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); + hdr->frame_length = AERON_DATA_HEADER_LENGTH; bool on_gap_detected_called = false; m_on_gap_detected = @@ -135,7 +134,7 @@ TEST_F(TermGapScannerTest, shouldReportSingleGapWhenBufferIsFull) { EXPECT_EQ(term_id, TERM_ID); EXPECT_EQ(term_offset, tail); - EXPECT_EQ(length, (size_t)AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT)); + EXPECT_EQ(length, (size_t)AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT)); on_gap_detected_called = true; }; @@ -147,16 +146,16 @@ TEST_F(TermGapScannerTest, shouldReportSingleGapWhenBufferIsFull) TEST_F(TermGapScannerTest, shouldReportNoGapWhenHwmIsInPadding) { - const int32_t padding_length = AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT) * 2; + const int32_t padding_length = AERON_DATA_HEADER_LENGTH * 2; const int32_t tail = CAPACITY - padding_length; - const int32_t high_water_mark = CAPACITY - padding_length + HEADER_LENGTH; + const int32_t high_water_mark = CAPACITY - padding_length + AERON_DATA_HEADER_LENGTH; aeron_frame_header_t *hdr; hdr = (aeron_frame_header_t *)(m_ptr + tail); hdr->frame_length = padding_length; - hdr = (aeron_frame_header_t *)(m_ptr + tail + HEADER_LENGTH); + hdr = (aeron_frame_header_t *)(m_ptr + tail + AERON_DATA_HEADER_LENGTH); hdr->frame_length = 0; bool on_gap_detected_called = false; @@ -173,7 +172,7 @@ TEST_F(TermGapScannerTest, shouldReportNoGapWhenHwmIsInPadding) } #define DATA_LENGTH (36) -#define MESSAGE_LENGTH (DATA_LENGTH + HEADER_LENGTH) +#define MESSAGE_LENGTH (DATA_LENGTH + AERON_DATA_HEADER_LENGTH) #define ALIGNED_FRAME_LENGTH (AERON_ALIGN(MESSAGE_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT)) class LossDetectorTest : public testing::Test @@ -684,3 +683,88 @@ TEST_F(LossDetectorTest, shouldHandleNonZeroInitialTermOffset) EXPECT_EQ(called, 1); EXPECT_TRUE(loss_found); } + +TEST_F(LossDetectorTest, shouldDetectChangesInTheGapLength) +{ + const int64_t rebuild_position = ALIGNED_FRAME_LENGTH * 3; + bool loss_found; + bool called = false; + + insert_frame(offset_of_message(2)); + insert_frame(offset_of_message(5)); + + ASSERT_EQ(feedback_delay_state_init(true), 0); + ASSERT_EQ(aeron_loss_detector_init( + &m_detector, &m_delay_generator_state, LossDetectorTest::on_gap_detected, this), 0); + + m_on_gap_detected = + [&](int32_t term_id, int32_t term_offset, size_t length) + { + EXPECT_EQ(term_id, TERM_ID); + EXPECT_EQ(term_offset, offset_of_message(3)); + EXPECT_EQ(length, 32); + called = true; + }; + + called = false; + ASSERT_EQ(aeron_loss_detector_scan( + &m_detector, &loss_found, m_ptr, rebuild_position, rebuild_position + 32, m_time, MASK, POSITION_BITS_TO_SHIFT, TERM_ID), + offset_of_message(3)); + EXPECT_TRUE(called); + EXPECT_TRUE(loss_found); + + m_on_gap_detected = + [&](int32_t term_id, int32_t term_offset, size_t length) + { + EXPECT_EQ(term_id, TERM_ID); + EXPECT_EQ(term_offset, offset_of_message(3)); + EXPECT_EQ(length, 64); + called = true; + }; + + called = false; + ASSERT_EQ(aeron_loss_detector_scan( + &m_detector, &loss_found, m_ptr, rebuild_position, rebuild_position + 64, m_time, MASK, POSITION_BITS_TO_SHIFT, TERM_ID), + offset_of_message(3)); + EXPECT_TRUE(called); + EXPECT_TRUE(loss_found); + + called = false; + ASSERT_EQ(aeron_loss_detector_scan( + &m_detector, &loss_found, m_ptr, rebuild_position, rebuild_position + 64, m_time, MASK, POSITION_BITS_TO_SHIFT, TERM_ID), + offset_of_message(3)); + EXPECT_FALSE(called); + EXPECT_FALSE(loss_found); + + m_on_gap_detected = + [&](int32_t term_id, int32_t term_offset, size_t length) + { + EXPECT_EQ(term_id, TERM_ID); + EXPECT_EQ(term_offset, offset_of_message(3)); + EXPECT_EQ(length, 32); + called = true; + }; + + called = false; + ASSERT_EQ(aeron_loss_detector_scan( + &m_detector, &loss_found, m_ptr, rebuild_position, rebuild_position + 32, m_time, MASK, POSITION_BITS_TO_SHIFT, TERM_ID), + offset_of_message(3)); + EXPECT_TRUE(called); + EXPECT_TRUE(loss_found); + + m_on_gap_detected = + [&](int32_t term_id, int32_t term_offset, size_t length) + { + EXPECT_EQ(term_id, TERM_ID); + EXPECT_EQ(term_offset, offset_of_message(3)); + EXPECT_EQ(length, ALIGNED_FRAME_LENGTH * 2); + called = true; + }; + + called = false; + ASSERT_EQ(aeron_loss_detector_scan( + &m_detector, &loss_found, m_ptr, rebuild_position, rebuild_position + CAPACITY, m_time, MASK, POSITION_BITS_TO_SHIFT, TERM_ID), + offset_of_message(3)); + EXPECT_TRUE(called); + EXPECT_TRUE(loss_found); +} diff --git a/aeron-driver/src/test/java/io/aeron/driver/LossDetectorTest.java b/aeron-driver/src/test/java/io/aeron/driver/LossDetectorTest.java index 85376d7ec7..de06c1a2c9 100644 --- a/aeron-driver/src/test/java/io/aeron/driver/LossDetectorTest.java +++ b/aeron-driver/src/test/java/io/aeron/driver/LossDetectorTest.java @@ -27,6 +27,9 @@ import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; +import static io.aeron.driver.LossDetector.lossFound; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.*; import static io.aeron.logbuffer.LogBufferDescriptor.TERM_MIN_LENGTH; import static io.aeron.logbuffer.LogBufferDescriptor.computePosition; @@ -67,7 +70,7 @@ class LossDetectorTest private final LossHandler lossHandler = mock(LossHandler.class); private LossDetector lossDetector = new LossDetector(DELAY_GENERATOR, lossHandler); - private long currentTime = 0; + private long currentTimeNs = 0; { dataHeader.wrap(rcvBuffer); @@ -79,9 +82,11 @@ void shouldNotSendNakWhenBufferIsEmpty() final long rebuildPosition = ACTIVE_TERM_POSITION; final long hwmPosition = ACTIVE_TERM_POSITION; - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(100); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(100); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verifyNoInteractions(lossHandler); } @@ -96,9 +101,11 @@ void shouldNotNakIfNoMissingData() insertDataFrame(offsetOfMessage(1)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verifyNoInteractions(lossHandler); } @@ -112,11 +119,13 @@ void shouldNakMissingData() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), gapLength()); + verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), ALIGNED_FRAME_LENGTH); } @Test @@ -128,13 +137,16 @@ void shouldRetransmitNakForMissingData() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(30); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(60); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(30); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(60); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler, atLeast(2)).onGapDetected(TERM_ID, offsetOfMessage(1), gapLength()); + verify(lossHandler, atLeast(2)).onGapDetected(TERM_ID, offsetOfMessage(1), ALIGNED_FRAME_LENGTH); } @Test @@ -146,13 +158,16 @@ void shouldStopNakOnReceivingData() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(20); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(20); insertDataFrame(offsetOfMessage(1)); rebuildPosition += (ALIGNED_FRAME_LENGTH * 3L); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(100); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(100); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verifyNoInteractions(lossHandler); } @@ -168,19 +183,23 @@ void shouldHandleMoreThan2Gaps() insertDataFrame(offsetOfMessage(4)); insertDataFrame(offsetOfMessage(6)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); insertDataFrame(offsetOfMessage(1)); rebuildPosition += (ALIGNED_FRAME_LENGTH * 3L); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(80); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(80); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); final InOrder inOrder = inOrder(lossHandler); - inOrder.verify(lossHandler, atLeast(1)).onGapDetected(TERM_ID, offsetOfMessage(1), gapLength()); - inOrder.verify(lossHandler, atLeast(1)).onGapDetected(TERM_ID, offsetOfMessage(3), gapLength()); - inOrder.verify(lossHandler, never()).onGapDetected(TERM_ID, offsetOfMessage(5), gapLength()); + inOrder.verify(lossHandler, atLeast(1)).onGapDetected(TERM_ID, offsetOfMessage(1), ALIGNED_FRAME_LENGTH); + inOrder.verify(lossHandler, atLeast(1)).onGapDetected(TERM_ID, offsetOfMessage(3), ALIGNED_FRAME_LENGTH); + inOrder.verify(lossHandler, never()).onGapDetected(TERM_ID, offsetOfMessage(5), ALIGNED_FRAME_LENGTH); } @Test @@ -192,20 +211,24 @@ void shouldReplaceOldNakWithNewNak() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(20); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(20); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); insertDataFrame(offsetOfMessage(4)); insertDataFrame(offsetOfMessage(1)); rebuildPosition += (ALIGNED_FRAME_LENGTH * 3L); hwmPosition = (ALIGNED_FRAME_LENGTH * 5L); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(100); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(100); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler, atLeast(1)).onGapDetected(TERM_ID, offsetOfMessage(3), gapLength()); + verify(lossHandler, atLeast(1)).onGapDetected(TERM_ID, offsetOfMessage(3), ALIGNED_FRAME_LENGTH); } @Test @@ -219,23 +242,27 @@ void shouldHandleLongerRetryDelay() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verifyNoInteractions(lossHandler); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), gapLength()); + verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), ALIGNED_FRAME_LENGTH); - currentTime = TimeUnit.MILLISECONDS.toNanos(80); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(80); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verifyNoMoreInteractions(lossHandler); - currentTime = TimeUnit.MILLISECONDS.toNanos(240); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(240); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler, times(2)).onGapDetected(TERM_ID, offsetOfMessage(1), gapLength()); + verify(lossHandler, times(2)).onGapDetected(TERM_ID, offsetOfMessage(1), ALIGNED_FRAME_LENGTH); } @Test @@ -247,7 +274,8 @@ void shouldNotNakImmediatelyByDefault() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verifyNoInteractions(lossHandler); } @@ -263,12 +291,15 @@ void shouldOnlySendNaksOnceOnMultipleScans() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), gapLength()); + verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), ALIGNED_FRAME_LENGTH); } @Test @@ -282,9 +313,11 @@ void shouldHandleHwmGreaterThanCompletedBuffer() insertDataFrame(offsetOfMessage(0)); rebuildPosition += ALIGNED_FRAME_LENGTH; - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), TERM_BUFFER_LENGTH - (int)rebuildPosition); } @@ -300,14 +333,105 @@ void shouldHandleNonZeroInitialTermOffset() insertDataFrame(offsetOfMessage(2)); insertDataFrame(offsetOfMessage(4)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), gapLength()); + verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), ALIGNED_FRAME_LENGTH); verifyNoMoreInteractions(lossHandler); } + @Test + void shouldDetectChangesInTheGapLength() + { + lossDetector = getLossHandlerWithLongRetry(); + + final long rebuildPosition = ACTIVE_TERM_POSITION + (ALIGNED_FRAME_LENGTH * 3L); + + insertDataFrame(offsetOfMessage(2)); + insertDataFrame(offsetOfMessage(5)); + + assertTrue(lossFound(lossDetector.scan( + termBuffer, rebuildPosition, rebuildPosition + 32, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID))); + + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(100); + assertFalse(lossFound(lossDetector.scan( + termBuffer, rebuildPosition, rebuildPosition + 32, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID))); + + assertTrue(lossFound(lossDetector.scan( + termBuffer, rebuildPosition, rebuildPosition + 64, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID))); + + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(200); + assertFalse(lossFound(lossDetector.scan( + termBuffer, rebuildPosition, rebuildPosition + 64, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID))); + + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(300); + assertTrue(lossFound(lossDetector.scan( + termBuffer, + rebuildPosition, + rebuildPosition + ALIGNED_FRAME_LENGTH, + currentTimeNs, + MASK, + POSITION_BITS_TO_SHIFT, + TERM_ID))); + + assertTrue(lossFound(lossDetector.scan( + termBuffer, + rebuildPosition, + rebuildPosition + ALIGNED_FRAME_LENGTH * 2L, + currentTimeNs, + MASK, + POSITION_BITS_TO_SHIFT, + TERM_ID))); + + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(400); + assertFalse(lossFound(lossDetector.scan( + termBuffer, + rebuildPosition, + rebuildPosition + ALIGNED_FRAME_LENGTH * 2L, + currentTimeNs, + MASK, + POSITION_BITS_TO_SHIFT, + TERM_ID))); + + insertDataFrame(offsetOfMessage(4)); + assertTrue(lossFound(lossDetector.scan( + termBuffer, + rebuildPosition, + rebuildPosition + ALIGNED_FRAME_LENGTH * 2L, + currentTimeNs, + MASK, + POSITION_BITS_TO_SHIFT, + TERM_ID))); + + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(500); + assertFalse(lossFound(lossDetector.scan( + termBuffer, + rebuildPosition, + rebuildPosition + ALIGNED_FRAME_LENGTH * 2L, + currentTimeNs, + MASK, + POSITION_BITS_TO_SHIFT, + TERM_ID))); + + assertTrue(lossFound(lossDetector.scan( + termBuffer, rebuildPosition, rebuildPosition + 64, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID))); + + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(600); + assertFalse(lossFound(lossDetector.scan( + termBuffer, rebuildPosition, rebuildPosition + 64, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID))); + + final InOrder inOrder = inOrder(lossHandler); + inOrder.verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), 32); + inOrder.verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), 64); + inOrder.verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), ALIGNED_FRAME_LENGTH * 2); + inOrder.verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), ALIGNED_FRAME_LENGTH); + inOrder.verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), 64); + inOrder.verifyNoMoreInteractions(); + } + private LossDetector getLossHandlerWithLongRetry() { return new LossDetector(DELAY_GENERATOR_WITH_LONGER_RETRY, lossHandler); @@ -339,9 +463,4 @@ private int offsetOfMessage(final int index) { return index * ALIGNED_FRAME_LENGTH; } - - private int gapLength() - { - return ALIGNED_FRAME_LENGTH; - } } diff --git a/aeron-driver/src/test/java/io/aeron/driver/PublicationImageTest.java b/aeron-driver/src/test/java/io/aeron/driver/PublicationImageTest.java new file mode 100644 index 0000000000..c2cf747b54 --- /dev/null +++ b/aeron-driver/src/test/java/io/aeron/driver/PublicationImageTest.java @@ -0,0 +1,249 @@ +/* + * Copyright 2014-2025 Real Logic Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.aeron.driver; + +import io.aeron.ChannelUri; +import io.aeron.driver.buffer.RawLog; +import io.aeron.driver.media.ReceiveChannelEndpoint; +import io.aeron.driver.media.UdpChannel; +import io.aeron.driver.status.ReceiverHwm; +import io.aeron.driver.status.ReceiverPos; +import io.aeron.driver.status.SystemCounterDescriptor; +import io.aeron.driver.status.SystemCounters; +import io.aeron.logbuffer.FrameDescriptor; +import io.aeron.protocol.DataHeaderFlyweight; +import org.agrona.BitUtil; +import org.agrona.ExpandableArrayBuffer; +import org.agrona.concurrent.CachedEpochClock; +import org.agrona.concurrent.CachedNanoClock; +import org.agrona.concurrent.UnsafeBuffer; +import org.agrona.concurrent.status.AtomicCounter; +import org.agrona.concurrent.status.CountersManager; +import org.agrona.concurrent.status.Position; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import static io.aeron.logbuffer.LogBufferDescriptor.*; +import static io.aeron.protocol.DataHeaderFlyweight.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class PublicationImageTest +{ + private static final int TERM_LENGTH = 64 * 1024; + private static final int INITIAL_WINDOW_LENGTH = 128 * 1024; + private static final int MAX_WINDOW_LENGHT = 1024 * 1024; + private static final long CORRELATION_ID = 42; + private static final int TRANSPORT_INDEX = 3; + private static final int SESSION_ID = 888; + private static final int STREAM_ID = 101010; + private static final int INITIAL_TERM_ID = -444666; + private static final int ACTIVE_TERM_ID = INITIAL_TERM_ID + 111; + private static final int TERM_OFFSET = TERM_LENGTH - TERM_LENGTH / 4; + private static final short FLAGS = FrameDescriptor.UNFRAGMENTED; + private static final String SOURCE_IDENTITY = "aeron:udp?endpoint=localhost:5555"; + private final MediaDriver.Context ctx = new MediaDriver.Context(); + private final ReceiveChannelEndpoint receiveChannelEndpoint = mock(ReceiveChannelEndpoint.class); + private final InetSocketAddress controlAddress = mock(InetSocketAddress.class); + private final RawLog rawLog = mock(RawLog.class); + private final FeedbackDelayGenerator feedbackDelayGenerator = mock(FeedbackDelayGenerator.class); + private final CongestionControl congestionControl = mock(CongestionControl.class); + private final CachedEpochClock epochClock = new CachedEpochClock(); + private final CachedNanoClock nanoClock = new CachedNanoClock(); + private final UnsafeBuffer buffer = new UnsafeBuffer(new byte[1024]); + private final CountersManager countersManager = new CountersManager( + new UnsafeBuffer(ByteBuffer.allocateDirect(256 * 1024)), + new UnsafeBuffer(ByteBuffer.allocateDirect(64 * 1024)), + StandardCharsets.US_ASCII); + private final DataHeaderFlyweight headerFlyweight = new DataHeaderFlyweight(); + private Position hwmPosition; + private Position rcvPosition; + private PublicationImage image; + + @BeforeEach + void before() + { + epochClock.update(TimeUnit.HOURS.toMillis(1)); + nanoClock.update(TimeUnit.HOURS.toNanos(1)); + ctx + .receiverCachedNanoClock(nanoClock) + .nanoClock(nanoClock) + .epochClock(epochClock) + .imageLivenessTimeoutNs(TimeUnit.SECONDS.toNanos(10)) + .untetheredWindowLimitTimeoutNs(TimeUnit.SECONDS.toNanos(1)) + .untetheredRestingTimeoutNs(TimeUnit.SECONDS.toNanos(1)) + .statusMessageTimeoutNs(TimeUnit.MILLISECONDS.toNanos(150)) + .systemCounters(new SystemCounters(countersManager)); + + final String channel = "aeron:udp?endpoint=localhost:5555"; + final ChannelUri channelUri = ChannelUri.parse(channel); + final UdpChannel udpChannel = mock(UdpChannel.class); + when(udpChannel.channelUri()).thenReturn(channelUri); + when(receiveChannelEndpoint.subscriptionUdpChannel()).thenReturn(udpChannel); + + final SubscriptionLink subscriptionLink1 = mock(SubscriptionLink.class); + when(subscriptionLink1.isReliable()).thenReturn(true); + when(subscriptionLink1.isTether()).thenReturn(true); + final SubscriberPosition subscriberPosition1 = mock(SubscriberPosition.class); + when(subscriberPosition1.subscription()).thenReturn(subscriptionLink1); + final SubscriptionLink subscriptionLink2 = mock(SubscriptionLink.class); + when(subscriptionLink1.isReliable()).thenReturn(false); + when(subscriptionLink1.isTether()).thenReturn(false); + final SubscriberPosition subscriberPosition2 = mock(SubscriberPosition.class); + when(subscriberPosition2.subscription()).thenReturn(subscriptionLink2); + final ArrayList subscriberPositions = new ArrayList<>(); + subscriberPositions.add(subscriberPosition1); + subscriberPositions.add(subscriberPosition2); + + final UnsafeBuffer[] termBuffers = new UnsafeBuffer[PARTITION_COUNT]; + for (int i = 0; i < termBuffers.length; i++) + { + termBuffers[i] = new UnsafeBuffer(new byte[TERM_LENGTH]); + } + when(rawLog.termBuffers()).thenReturn(termBuffers); + when(rawLog.metaData()).thenReturn(new UnsafeBuffer(new byte[LOG_META_DATA_LENGTH])); + when(rawLog.termLength()).thenReturn(TERM_LENGTH); + + when(congestionControl.initialWindowLength()).thenReturn(INITIAL_WINDOW_LENGTH); + when(congestionControl.maxWindowLength()).thenReturn(MAX_WINDOW_LENGHT); + + final long registrationId = 73249234983274L; + final ExpandableArrayBuffer tempBuffer = new ExpandableArrayBuffer(); + hwmPosition = ReceiverHwm.allocate(tempBuffer, countersManager, registrationId, SESSION_ID, STREAM_ID, channel); + rcvPosition = ReceiverPos.allocate( + tempBuffer, countersManager, registrationId, SESSION_ID, STREAM_ID, channel); + + image = new PublicationImage( + CORRELATION_ID, + ctx, + receiveChannelEndpoint, + TRANSPORT_INDEX, + controlAddress, + SESSION_ID, + STREAM_ID, + INITIAL_TERM_ID, + ACTIVE_TERM_ID, + TERM_OFFSET, + FLAGS, + rawLog, + feedbackDelayGenerator, + subscriberPositions, + hwmPosition, + rcvPosition, + SOURCE_IDENTITY, + congestionControl); + + final long position = computePosition( + ACTIVE_TERM_ID, TERM_OFFSET, positionBitsToShift(TERM_LENGTH), INITIAL_TERM_ID); + assertEquals(position, hwmPosition.get()); + assertEquals(position, rcvPosition.get()); + + ThreadLocalRandom.current().nextBytes(buffer.byteArray()); + } + + @Test + void shouldAdvanceHighWaterMarkByPacketLengthWhenItContainsPaddingFrame() + { + final int totalLength = 512; + final int packetLength = 288; + final int termId = ACTIVE_TERM_ID; + final int termOffset = TERM_LENGTH - totalLength; + int offset = 0; + offset += writeFrame(offset, termOffset, termId, 65, BEGIN_AND_END_FLAGS, HDR_TYPE_DATA, 65); + offset += writeFrame(offset, termOffset + offset, termId, 96, BEGIN_AND_END_FLAGS, HDR_TYPE_DATA, 96); + offset += writeFrame(offset, termOffset + offset, termId, 224, BEGIN_AND_END_FLAGS, HDR_TYPE_PAD, 0x888AA888); + assertEquals(totalLength, offset); + final InetSocketAddress srcAddress = mock(InetSocketAddress.class); + + final int bytes = image.insertPacket(termId, termOffset, buffer, packetLength, TRANSPORT_INDEX, srcAddress); + + assertEquals(packetLength, bytes); + final int positionBitsToShift = positionBitsToShift(TERM_LENGTH); + final long packetPosition = computePosition(termId, termOffset, positionBitsToShift, INITIAL_TERM_ID); + assertEquals(packetPosition + packetLength, hwmPosition.get()); + final UnsafeBuffer activeTermBuffer = + rawLog.termBuffers()[indexByPosition(packetPosition, positionBitsToShift)]; + for (int i = 0; i < packetLength; i++) + { + assertEquals(buffer.getByte(i), activeTermBuffer.getByte(termOffset + i)); + } + for (int i = packetLength; i < totalLength; i++) + { + assertEquals(0, activeTermBuffer.getByte(termOffset + i)); + } + } + + @Test + void shouldAdvanceHighWaterMarkPositionOnHeartbeat() + { + final int termId = ACTIVE_TERM_ID; + final int termOffset = TERM_OFFSET + 1024; + writeFrame(0, termOffset, termId, 0, BEGIN_AND_END_FLAGS, HDR_TYPE_DATA, -1); + FrameDescriptor.frameLengthOrdered(buffer, 0, 0); + final InetSocketAddress srcAddress = mock(InetSocketAddress.class); + final int packetLength = HEADER_LENGTH; + final AtomicCounter heartBeatsCounter = ctx.systemCounters().get(SystemCounterDescriptor.HEARTBEATS_RECEIVED); + final long oldHeartBeatCount = heartBeatsCounter.getWeak(); + + final int bytes = image.insertPacket(termId, termOffset, buffer, packetLength, TRANSPORT_INDEX, srcAddress); + + assertEquals(packetLength, bytes); + final int positionBitsToShift = positionBitsToShift(TERM_LENGTH); + final long packetPosition = computePosition(termId, termOffset, positionBitsToShift, INITIAL_TERM_ID); + assertEquals(packetPosition, hwmPosition.get()); + assertEquals(oldHeartBeatCount + 1, heartBeatsCounter.getWeak()); + final UnsafeBuffer activeTermBuffer = + rawLog.termBuffers()[indexByPosition(packetPosition, positionBitsToShift)]; + for (int i = 0; i < packetLength; i++) + { + assertEquals(0, activeTermBuffer.getByte(termOffset + i)); + } + } + + private int writeFrame( + final int offset, + final int termOffset, + final int termId, + final int length, + final short flags, + final int type, + final int reservedValue) + { + final int frameLength = length + HEADER_LENGTH; + headerFlyweight.wrap(buffer, offset, frameLength); + headerFlyweight + .frameLength(frameLength) + .version(CURRENT_VERSION) + .flags(flags) + .headerType(type); + headerFlyweight + .termOffset(termOffset) + .sessionId(SESSION_ID) + .streamId(STREAM_ID) + .termId(termId) + .reservedValue(reservedValue); + + return BitUtil.align(frameLength, FrameDescriptor.FRAME_ALIGNMENT); + } +}