diff --git a/aeron-client/src/main/c/concurrent/aeron_term_gap_scanner.h b/aeron-client/src/main/c/concurrent/aeron_term_gap_scanner.h index 1c2fd35401..d72902d560 100644 --- a/aeron-client/src/main/c/concurrent/aeron_term_gap_scanner.h +++ b/aeron-client/src/main/c/concurrent/aeron_term_gap_scanner.h @@ -23,8 +23,6 @@ typedef void (*aeron_term_gap_scanner_on_gap_detected_func_t)(void *clientd, int32_t term_id, int32_t term_offset, size_t length); -#define AERON_ALIGNED_HEADER_LENGTH (AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT)) - inline int32_t aeron_term_gap_scanner_scan_for_gap( const uint8_t *buffer, int32_t term_id, @@ -53,23 +51,21 @@ inline int32_t aeron_term_gap_scanner_scan_for_gap( const int32_t gap_begin_offset = offset; if (offset < limit_offset) { - const int32_t limit = limit_offset - AERON_ALIGNED_HEADER_LENGTH; - while (offset < limit) + offset += AERON_DATA_HEADER_LENGTH; + while (offset < limit_offset) { - offset += AERON_LOGBUFFER_FRAME_ALIGNMENT; - aeron_frame_header_t *hdr = (aeron_frame_header_t *)(buffer + offset); int32_t frame_length; AERON_GET_ACQUIRE(frame_length, hdr->frame_length); if (0 != frame_length) { - offset -= AERON_ALIGNED_HEADER_LENGTH; break; } + offset += AERON_DATA_HEADER_LENGTH; } - const size_t gap_length = (offset - gap_begin_offset) + AERON_ALIGNED_HEADER_LENGTH; + const size_t gap_length = offset - gap_begin_offset; on_gap_detected(clientd, term_id, gap_begin_offset, gap_length); } diff --git a/aeron-client/src/main/java/io/aeron/logbuffer/TermGapScanner.java b/aeron-client/src/main/java/io/aeron/logbuffer/TermGapScanner.java index 38a67d0656..23425c66d8 100644 --- a/aeron-client/src/main/java/io/aeron/logbuffer/TermGapScanner.java +++ b/aeron-client/src/main/java/io/aeron/logbuffer/TermGapScanner.java @@ -15,7 +15,6 @@ */ package io.aeron.logbuffer; -import org.agrona.BitUtil; import org.agrona.concurrent.UnsafeBuffer; import static io.aeron.logbuffer.FrameDescriptor.FRAME_ALIGNMENT; @@ -31,8 +30,6 @@ */ public class TermGapScanner { - private static final int ALIGNED_HEADER_LENGTH = BitUtil.align(HEADER_LENGTH, FRAME_ALIGNMENT); - /** * Handler for notifying of gaps in the log. */ @@ -82,19 +79,17 @@ public static int scanForGap( final int gapBeginOffset = offset; if (offset < limitOffset) { - final int limit = limitOffset - ALIGNED_HEADER_LENGTH; - while (offset < limit) + offset += HEADER_LENGTH; + while (offset < limitOffset) { - offset += FRAME_ALIGNMENT; - - if (0 != termBuffer.getIntVolatile(offset)) + if (0 != frameLengthVolatile(termBuffer, offset)) { - offset -= ALIGNED_HEADER_LENGTH; break; } + offset += HEADER_LENGTH; } - final int gapLength = (offset - gapBeginOffset) + ALIGNED_HEADER_LENGTH; + final int gapLength = offset - gapBeginOffset; handler.onGap(termId, gapBeginOffset, gapLength); } diff --git a/aeron-client/src/test/java/io/aeron/logbuffer/TermGapScannerTest.java b/aeron-client/src/test/java/io/aeron/logbuffer/TermGapScannerTest.java index f8e3c40a04..8b0e6adfd5 100644 --- a/aeron-client/src/test/java/io/aeron/logbuffer/TermGapScannerTest.java +++ b/aeron-client/src/test/java/io/aeron/logbuffer/TermGapScannerTest.java @@ -24,6 +24,7 @@ import static org.agrona.BitUtil.align; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.*; +import static org.mockito.Mockito.verifyNoMoreInteractions; class TermGapScannerTest { @@ -51,6 +52,7 @@ void shouldReportGapAtBeginningOfBuffer() assertEquals(0, TermGapScanner.scanForGap(termBuffer, TERM_ID, 0, highWaterMark, gapHandler)); verify(gapHandler).onGap(TERM_ID, 0, frameOffset); + verifyNoMoreInteractions(gapHandler); } @Test @@ -67,6 +69,7 @@ void shouldReportSingleGapWhenBufferNotFull() assertEquals(tail, TermGapScanner.scanForGap(termBuffer, TERM_ID, tail, highWaterMark, gapHandler)); verify(gapHandler).onGap(TERM_ID, tail, align(HEADER_LENGTH, FRAME_ALIGNMENT)); + verifyNoMoreInteractions(gapHandler); } @Test @@ -83,6 +86,7 @@ void shouldReportSingleGapWhenBufferIsFull() assertEquals(tail, TermGapScanner.scanForGap(termBuffer, TERM_ID, tail, highWaterMark, gapHandler)); verify(gapHandler).onGap(TERM_ID, tail, align(HEADER_LENGTH, FRAME_ALIGNMENT)); + verifyNoMoreInteractions(gapHandler); } @Test @@ -100,4 +104,37 @@ void shouldReportNoGapWhenHwmIsInPadding() verifyNoInteractions(gapHandler); } + + @Test + void shouldReportSingleHeaderGap() + { + final int offset = 8192 + 384; + when(termBuffer.getIntVolatile(offset)).thenReturn(0); + when(termBuffer.getIntVolatile(offset + HEADER_LENGTH)).thenReturn(128); + + assertEquals( + offset, TermGapScanner.scanForGap(termBuffer, TERM_ID, offset, LOG_BUFFER_CAPACITY, gapHandler)); + + verify(termBuffer).getIntVolatile(offset); + verify(termBuffer).getIntVolatile(offset + HEADER_LENGTH); + verify(gapHandler).onGap(TERM_ID, offset, HEADER_LENGTH); + verifyNoMoreInteractions(gapHandler, termBuffer); + } + + @Test + void shouldReportGapAtTheEndOfTheBuffer() + { + final int offset = LOG_BUFFER_CAPACITY - 128; + when(termBuffer.getIntVolatile(offset)).thenReturn(0); + + assertEquals( + offset, TermGapScanner.scanForGap(termBuffer, TERM_ID, offset, LOG_BUFFER_CAPACITY, gapHandler)); + + verify(termBuffer).getIntVolatile(offset); + verify(termBuffer).getIntVolatile(offset + HEADER_LENGTH); + verify(termBuffer).getIntVolatile(offset + 2 * HEADER_LENGTH); + verify(termBuffer).getIntVolatile(offset + 3 * HEADER_LENGTH); + verify(gapHandler).onGap(TERM_ID, offset, 128); + verifyNoMoreInteractions(gapHandler, termBuffer); + } } diff --git a/aeron-driver/src/main/c/aeron_loss_detector.h b/aeron-driver/src/main/c/aeron_loss_detector.h index cfdce2a887..58a60eb77a 100644 --- a/aeron-driver/src/main/c/aeron_loss_detector.h +++ b/aeron-driver/src/main/c/aeron_loss_detector.h @@ -96,7 +96,8 @@ inline void aeron_loss_detector_on_gap(void *clientd, int32_t term_id, int32_t t inline bool aeron_loss_detector_gaps_match(aeron_loss_detector_t *detector) { return detector->active_gap.term_id == detector->scanned_gap.term_id && - detector->active_gap.term_offset == detector->scanned_gap.term_offset; + detector->active_gap.term_offset == detector->scanned_gap.term_offset && + detector->active_gap.length == detector->scanned_gap.length; } inline void aeron_loss_detector_activate_gap(aeron_loss_detector_t *detector, int64_t now_ns) diff --git a/aeron-driver/src/main/java/io/aeron/driver/LossDetector.java b/aeron-driver/src/main/java/io/aeron/driver/LossDetector.java index da66826e23..60cc574975 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/LossDetector.java +++ b/aeron-driver/src/main/java/io/aeron/driver/LossDetector.java @@ -77,7 +77,7 @@ public long scan( final int initialTermId) { boolean lossFound = false; - int rebuildOffset = (int)rebuildPosition & termLengthMask; + int rebuildOffset = (int)(rebuildPosition & termLengthMask); if (rebuildPosition < hwmPosition) { @@ -85,13 +85,15 @@ public long scan( final int hwmTermCount = (int)(hwmPosition >>> positionBitsToShift); final int rebuildTermId = initialTermId + rebuildTermCount; - final int hwmTermOffset = (int)hwmPosition & termLengthMask; + final int hwmTermOffset = (int)(hwmPosition & termLengthMask); final int limitOffset = rebuildTermCount == hwmTermCount ? hwmTermOffset : termLengthMask + 1; rebuildOffset = scanForGap(termBuffer, rebuildTermId, rebuildOffset, limitOffset, this); if (rebuildOffset < limitOffset) { - if (scannedTermOffset != activeTermOffset || scannedTermId != activeTermId) + if (scannedTermOffset != activeTermOffset || + scannedTermId != activeTermId || + scannedLength != activeLength) { activateGap(nowNs); lossFound = true; diff --git a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java index 385c8ab781..0d694e8329 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java +++ b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java @@ -117,6 +117,7 @@ enum State private static final VarHandle END_SM_CHANGE_VH; private static final VarHandle BEGIN_LOSS_CHANGE_VH; private static final VarHandle END_LOSS_CHANGE_VH; + static { try @@ -561,7 +562,7 @@ int trackRebuild(final long nowNs) positionBitsToShift, initialTermId); - final int rebuildTermOffset = (int)rebuildPosition & termLengthMask; + final int rebuildTermOffset = (int)(rebuildPosition & termLengthMask); final long newRebuildPosition = (rebuildPosition - rebuildTermOffset) + rebuildOffset(scanOutcome); this.rebuildPosition.proposeMaxOrdered(newRebuildPosition); @@ -631,12 +632,12 @@ int insertPacket( { final long nowNs = cachedNanoClock.nanoTime(); timeOfLastPacketNs = nowNs; - trackConnection(transportIndex, srcAddress, nowNs); + final ImageConnection imageConnection = trackConnection(transportIndex, srcAddress, nowNs); if (isEndOfStream) { - imageConnections[transportIndex].eosPosition = packetPosition; - imageConnections[transportIndex].isEos = true; + imageConnection.eosPosition = packetPosition; + imageConnection.isEos = true; if (!this.isEndOfStream && isAllConnectedEos()) { @@ -1017,7 +1018,8 @@ private void cleanBufferTo(final long position) } } - private void trackConnection(final int transportIndex, final InetSocketAddress srcAddress, final long nowNs) + private ImageConnection trackConnection( + final int transportIndex, final InetSocketAddress srcAddress, final long nowNs) { imageConnections = ArrayUtil.ensureCapacity(imageConnections, transportIndex + 1); ImageConnection imageConnection = imageConnections[transportIndex]; @@ -1030,6 +1032,7 @@ private void trackConnection(final int transportIndex, final InetSocketAddress s imageConnection.timeOfLastActivityNs = nowNs; imageConnection.timeOfLastFrameNs = nowNs; + return imageConnection; } private boolean isAllConnectedEos() diff --git a/aeron-driver/src/test/c/aeron_loss_detector_test.cpp b/aeron-driver/src/test/c/aeron_loss_detector_test.cpp index ddc36c2388..35147b913b 100644 --- a/aeron-driver/src/test/c/aeron_loss_detector_test.cpp +++ b/aeron-driver/src/test/c/aeron_loss_detector_test.cpp @@ -25,7 +25,6 @@ extern "C" } #define CAPACITY (AERON_LOGBUFFER_TERM_MIN_LENGTH) -#define HEADER_LENGTH (AERON_DATA_HEADER_LENGTH) #define POSITION_BITS_TO_SHIFT (aeron_number_of_trailing_zeroes(CAPACITY)) #define MASK (CAPACITY - 1) @@ -59,11 +58,11 @@ class TermGapScannerTest : public testing::Test TEST_F(TermGapScannerTest, shouldReportGapAtBeginningOfBuffer) { - const int32_t frame_offset = AERON_ALIGN((HEADER_LENGTH * 3), AERON_LOGBUFFER_FRAME_ALIGNMENT); - const int32_t high_water_mark = frame_offset + AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT); + const int32_t frame_offset = AERON_ALIGN((AERON_DATA_HEADER_LENGTH * 3), AERON_LOGBUFFER_FRAME_ALIGNMENT); + const int32_t high_water_mark = frame_offset + AERON_DATA_HEADER_LENGTH; auto *hdr = (aeron_frame_header_t *)(m_ptr + frame_offset); - hdr->frame_length = HEADER_LENGTH; + hdr->frame_length = AERON_DATA_HEADER_LENGTH; bool on_gap_detected_called = false; m_on_gap_detected = @@ -83,19 +82,19 @@ TEST_F(TermGapScannerTest, shouldReportGapAtBeginningOfBuffer) TEST_F(TermGapScannerTest, shouldReportSingleGapWhenBufferNotFull) { - const int32_t tail = AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT); + const int32_t tail = AERON_DATA_HEADER_LENGTH; const int32_t high_water_mark = AERON_LOGBUFFER_FRAME_ALIGNMENT * 3; aeron_frame_header_t *hdr; - hdr = (aeron_frame_header_t *)(m_ptr + tail - (AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); - hdr->frame_length = HEADER_LENGTH; + hdr = (aeron_frame_header_t *)(m_ptr + tail - (AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); + hdr->frame_length = AERON_DATA_HEADER_LENGTH; hdr = (aeron_frame_header_t *)(m_ptr + tail); hdr->frame_length = 0; - hdr = (aeron_frame_header_t *)(m_ptr + high_water_mark - (AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); - hdr->frame_length = HEADER_LENGTH; + hdr = (aeron_frame_header_t *)(m_ptr + high_water_mark - (AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); + hdr->frame_length = AERON_DATA_HEADER_LENGTH; bool on_gap_detected_called = false; m_on_gap_detected = @@ -103,7 +102,7 @@ TEST_F(TermGapScannerTest, shouldReportSingleGapWhenBufferNotFull) { EXPECT_EQ(term_id, TERM_ID); EXPECT_EQ(term_offset, tail); - EXPECT_EQ(length, (size_t)AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT)); + EXPECT_EQ(length, (size_t)AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT)); on_gap_detected_called = true; }; @@ -115,19 +114,19 @@ TEST_F(TermGapScannerTest, shouldReportSingleGapWhenBufferNotFull) TEST_F(TermGapScannerTest, shouldReportSingleGapWhenBufferIsFull) { - const int32_t tail = CAPACITY - (AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT) * 2); + const int32_t tail = CAPACITY - (AERON_DATA_HEADER_LENGTH * 2); const int32_t high_water_mark = CAPACITY; aeron_frame_header_t *hdr; - hdr = (aeron_frame_header_t *)(m_ptr + tail - (AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); - hdr->frame_length = HEADER_LENGTH; + hdr = (aeron_frame_header_t *)(m_ptr + tail - (AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); + hdr->frame_length = AERON_DATA_HEADER_LENGTH; hdr = (aeron_frame_header_t *)(m_ptr + tail); hdr->frame_length = 0; - hdr = (aeron_frame_header_t *)(m_ptr + high_water_mark - (AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); - hdr->frame_length = HEADER_LENGTH; + hdr = (aeron_frame_header_t *)(m_ptr + high_water_mark - (AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))); + hdr->frame_length = AERON_DATA_HEADER_LENGTH; bool on_gap_detected_called = false; m_on_gap_detected = @@ -135,7 +134,7 @@ TEST_F(TermGapScannerTest, shouldReportSingleGapWhenBufferIsFull) { EXPECT_EQ(term_id, TERM_ID); EXPECT_EQ(term_offset, tail); - EXPECT_EQ(length, (size_t)AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT)); + EXPECT_EQ(length, (size_t)AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT)); on_gap_detected_called = true; }; @@ -147,16 +146,16 @@ TEST_F(TermGapScannerTest, shouldReportSingleGapWhenBufferIsFull) TEST_F(TermGapScannerTest, shouldReportNoGapWhenHwmIsInPadding) { - const int32_t padding_length = AERON_ALIGN(HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT) * 2; + const int32_t padding_length = AERON_DATA_HEADER_LENGTH * 2; const int32_t tail = CAPACITY - padding_length; - const int32_t high_water_mark = CAPACITY - padding_length + HEADER_LENGTH; + const int32_t high_water_mark = CAPACITY - padding_length + AERON_DATA_HEADER_LENGTH; aeron_frame_header_t *hdr; hdr = (aeron_frame_header_t *)(m_ptr + tail); hdr->frame_length = padding_length; - hdr = (aeron_frame_header_t *)(m_ptr + tail + HEADER_LENGTH); + hdr = (aeron_frame_header_t *)(m_ptr + tail + AERON_DATA_HEADER_LENGTH); hdr->frame_length = 0; bool on_gap_detected_called = false; @@ -173,7 +172,7 @@ TEST_F(TermGapScannerTest, shouldReportNoGapWhenHwmIsInPadding) } #define DATA_LENGTH (36) -#define MESSAGE_LENGTH (DATA_LENGTH + HEADER_LENGTH) +#define MESSAGE_LENGTH (DATA_LENGTH + AERON_DATA_HEADER_LENGTH) #define ALIGNED_FRAME_LENGTH (AERON_ALIGN(MESSAGE_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT)) class LossDetectorTest : public testing::Test @@ -684,3 +683,88 @@ TEST_F(LossDetectorTest, shouldHandleNonZeroInitialTermOffset) EXPECT_EQ(called, 1); EXPECT_TRUE(loss_found); } + +TEST_F(LossDetectorTest, shouldDetectChangesInTheGapLength) +{ + const int64_t rebuild_position = ALIGNED_FRAME_LENGTH * 3; + bool loss_found; + bool called = false; + + insert_frame(offset_of_message(2)); + insert_frame(offset_of_message(5)); + + ASSERT_EQ(feedback_delay_state_init(true), 0); + ASSERT_EQ(aeron_loss_detector_init( + &m_detector, &m_delay_generator_state, LossDetectorTest::on_gap_detected, this), 0); + + m_on_gap_detected = + [&](int32_t term_id, int32_t term_offset, size_t length) + { + EXPECT_EQ(term_id, TERM_ID); + EXPECT_EQ(term_offset, offset_of_message(3)); + EXPECT_EQ(length, 32); + called = true; + }; + + called = false; + ASSERT_EQ(aeron_loss_detector_scan( + &m_detector, &loss_found, m_ptr, rebuild_position, rebuild_position + 32, m_time, MASK, POSITION_BITS_TO_SHIFT, TERM_ID), + offset_of_message(3)); + EXPECT_TRUE(called); + EXPECT_TRUE(loss_found); + + m_on_gap_detected = + [&](int32_t term_id, int32_t term_offset, size_t length) + { + EXPECT_EQ(term_id, TERM_ID); + EXPECT_EQ(term_offset, offset_of_message(3)); + EXPECT_EQ(length, 64); + called = true; + }; + + called = false; + ASSERT_EQ(aeron_loss_detector_scan( + &m_detector, &loss_found, m_ptr, rebuild_position, rebuild_position + 64, m_time, MASK, POSITION_BITS_TO_SHIFT, TERM_ID), + offset_of_message(3)); + EXPECT_TRUE(called); + EXPECT_TRUE(loss_found); + + called = false; + ASSERT_EQ(aeron_loss_detector_scan( + &m_detector, &loss_found, m_ptr, rebuild_position, rebuild_position + 64, m_time, MASK, POSITION_BITS_TO_SHIFT, TERM_ID), + offset_of_message(3)); + EXPECT_FALSE(called); + EXPECT_FALSE(loss_found); + + m_on_gap_detected = + [&](int32_t term_id, int32_t term_offset, size_t length) + { + EXPECT_EQ(term_id, TERM_ID); + EXPECT_EQ(term_offset, offset_of_message(3)); + EXPECT_EQ(length, 32); + called = true; + }; + + called = false; + ASSERT_EQ(aeron_loss_detector_scan( + &m_detector, &loss_found, m_ptr, rebuild_position, rebuild_position + 32, m_time, MASK, POSITION_BITS_TO_SHIFT, TERM_ID), + offset_of_message(3)); + EXPECT_TRUE(called); + EXPECT_TRUE(loss_found); + + m_on_gap_detected = + [&](int32_t term_id, int32_t term_offset, size_t length) + { + EXPECT_EQ(term_id, TERM_ID); + EXPECT_EQ(term_offset, offset_of_message(3)); + EXPECT_EQ(length, ALIGNED_FRAME_LENGTH * 2); + called = true; + }; + + called = false; + ASSERT_EQ(aeron_loss_detector_scan( + &m_detector, &loss_found, m_ptr, rebuild_position, rebuild_position + CAPACITY, m_time, MASK, POSITION_BITS_TO_SHIFT, TERM_ID), + offset_of_message(3)); + EXPECT_TRUE(called); + EXPECT_TRUE(loss_found); +} diff --git a/aeron-driver/src/test/java/io/aeron/driver/LossDetectorTest.java b/aeron-driver/src/test/java/io/aeron/driver/LossDetectorTest.java index 85376d7ec7..de06c1a2c9 100644 --- a/aeron-driver/src/test/java/io/aeron/driver/LossDetectorTest.java +++ b/aeron-driver/src/test/java/io/aeron/driver/LossDetectorTest.java @@ -27,6 +27,9 @@ import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; +import static io.aeron.driver.LossDetector.lossFound; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.*; import static io.aeron.logbuffer.LogBufferDescriptor.TERM_MIN_LENGTH; import static io.aeron.logbuffer.LogBufferDescriptor.computePosition; @@ -67,7 +70,7 @@ class LossDetectorTest private final LossHandler lossHandler = mock(LossHandler.class); private LossDetector lossDetector = new LossDetector(DELAY_GENERATOR, lossHandler); - private long currentTime = 0; + private long currentTimeNs = 0; { dataHeader.wrap(rcvBuffer); @@ -79,9 +82,11 @@ void shouldNotSendNakWhenBufferIsEmpty() final long rebuildPosition = ACTIVE_TERM_POSITION; final long hwmPosition = ACTIVE_TERM_POSITION; - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(100); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(100); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verifyNoInteractions(lossHandler); } @@ -96,9 +101,11 @@ void shouldNotNakIfNoMissingData() insertDataFrame(offsetOfMessage(1)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verifyNoInteractions(lossHandler); } @@ -112,11 +119,13 @@ void shouldNakMissingData() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), gapLength()); + verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), ALIGNED_FRAME_LENGTH); } @Test @@ -128,13 +137,16 @@ void shouldRetransmitNakForMissingData() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(30); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(60); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(30); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(60); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler, atLeast(2)).onGapDetected(TERM_ID, offsetOfMessage(1), gapLength()); + verify(lossHandler, atLeast(2)).onGapDetected(TERM_ID, offsetOfMessage(1), ALIGNED_FRAME_LENGTH); } @Test @@ -146,13 +158,16 @@ void shouldStopNakOnReceivingData() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(20); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(20); insertDataFrame(offsetOfMessage(1)); rebuildPosition += (ALIGNED_FRAME_LENGTH * 3L); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(100); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(100); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verifyNoInteractions(lossHandler); } @@ -168,19 +183,23 @@ void shouldHandleMoreThan2Gaps() insertDataFrame(offsetOfMessage(4)); insertDataFrame(offsetOfMessage(6)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); insertDataFrame(offsetOfMessage(1)); rebuildPosition += (ALIGNED_FRAME_LENGTH * 3L); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(80); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(80); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); final InOrder inOrder = inOrder(lossHandler); - inOrder.verify(lossHandler, atLeast(1)).onGapDetected(TERM_ID, offsetOfMessage(1), gapLength()); - inOrder.verify(lossHandler, atLeast(1)).onGapDetected(TERM_ID, offsetOfMessage(3), gapLength()); - inOrder.verify(lossHandler, never()).onGapDetected(TERM_ID, offsetOfMessage(5), gapLength()); + inOrder.verify(lossHandler, atLeast(1)).onGapDetected(TERM_ID, offsetOfMessage(1), ALIGNED_FRAME_LENGTH); + inOrder.verify(lossHandler, atLeast(1)).onGapDetected(TERM_ID, offsetOfMessage(3), ALIGNED_FRAME_LENGTH); + inOrder.verify(lossHandler, never()).onGapDetected(TERM_ID, offsetOfMessage(5), ALIGNED_FRAME_LENGTH); } @Test @@ -192,20 +211,24 @@ void shouldReplaceOldNakWithNewNak() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(20); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(20); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); insertDataFrame(offsetOfMessage(4)); insertDataFrame(offsetOfMessage(1)); rebuildPosition += (ALIGNED_FRAME_LENGTH * 3L); hwmPosition = (ALIGNED_FRAME_LENGTH * 5L); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(100); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(100); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler, atLeast(1)).onGapDetected(TERM_ID, offsetOfMessage(3), gapLength()); + verify(lossHandler, atLeast(1)).onGapDetected(TERM_ID, offsetOfMessage(3), ALIGNED_FRAME_LENGTH); } @Test @@ -219,23 +242,27 @@ void shouldHandleLongerRetryDelay() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verifyNoInteractions(lossHandler); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), gapLength()); + verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), ALIGNED_FRAME_LENGTH); - currentTime = TimeUnit.MILLISECONDS.toNanos(80); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(80); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verifyNoMoreInteractions(lossHandler); - currentTime = TimeUnit.MILLISECONDS.toNanos(240); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(240); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler, times(2)).onGapDetected(TERM_ID, offsetOfMessage(1), gapLength()); + verify(lossHandler, times(2)).onGapDetected(TERM_ID, offsetOfMessage(1), ALIGNED_FRAME_LENGTH); } @Test @@ -247,7 +274,8 @@ void shouldNotNakImmediatelyByDefault() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verifyNoInteractions(lossHandler); } @@ -263,12 +291,15 @@ void shouldOnlySendNaksOnceOnMultipleScans() insertDataFrame(offsetOfMessage(0)); insertDataFrame(offsetOfMessage(2)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), gapLength()); + verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), ALIGNED_FRAME_LENGTH); } @Test @@ -282,9 +313,11 @@ void shouldHandleHwmGreaterThanCompletedBuffer() insertDataFrame(offsetOfMessage(0)); rebuildPosition += ALIGNED_FRAME_LENGTH; - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(1), TERM_BUFFER_LENGTH - (int)rebuildPosition); } @@ -300,14 +333,105 @@ void shouldHandleNonZeroInitialTermOffset() insertDataFrame(offsetOfMessage(2)); insertDataFrame(offsetOfMessage(4)); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - currentTime = TimeUnit.MILLISECONDS.toNanos(40); - lossDetector.scan(termBuffer, rebuildPosition, hwmPosition, currentTime, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(40); + lossDetector.scan( + termBuffer, rebuildPosition, hwmPosition, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID); - verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), gapLength()); + verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), ALIGNED_FRAME_LENGTH); verifyNoMoreInteractions(lossHandler); } + @Test + void shouldDetectChangesInTheGapLength() + { + lossDetector = getLossHandlerWithLongRetry(); + + final long rebuildPosition = ACTIVE_TERM_POSITION + (ALIGNED_FRAME_LENGTH * 3L); + + insertDataFrame(offsetOfMessage(2)); + insertDataFrame(offsetOfMessage(5)); + + assertTrue(lossFound(lossDetector.scan( + termBuffer, rebuildPosition, rebuildPosition + 32, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID))); + + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(100); + assertFalse(lossFound(lossDetector.scan( + termBuffer, rebuildPosition, rebuildPosition + 32, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID))); + + assertTrue(lossFound(lossDetector.scan( + termBuffer, rebuildPosition, rebuildPosition + 64, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID))); + + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(200); + assertFalse(lossFound(lossDetector.scan( + termBuffer, rebuildPosition, rebuildPosition + 64, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID))); + + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(300); + assertTrue(lossFound(lossDetector.scan( + termBuffer, + rebuildPosition, + rebuildPosition + ALIGNED_FRAME_LENGTH, + currentTimeNs, + MASK, + POSITION_BITS_TO_SHIFT, + TERM_ID))); + + assertTrue(lossFound(lossDetector.scan( + termBuffer, + rebuildPosition, + rebuildPosition + ALIGNED_FRAME_LENGTH * 2L, + currentTimeNs, + MASK, + POSITION_BITS_TO_SHIFT, + TERM_ID))); + + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(400); + assertFalse(lossFound(lossDetector.scan( + termBuffer, + rebuildPosition, + rebuildPosition + ALIGNED_FRAME_LENGTH * 2L, + currentTimeNs, + MASK, + POSITION_BITS_TO_SHIFT, + TERM_ID))); + + insertDataFrame(offsetOfMessage(4)); + assertTrue(lossFound(lossDetector.scan( + termBuffer, + rebuildPosition, + rebuildPosition + ALIGNED_FRAME_LENGTH * 2L, + currentTimeNs, + MASK, + POSITION_BITS_TO_SHIFT, + TERM_ID))); + + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(500); + assertFalse(lossFound(lossDetector.scan( + termBuffer, + rebuildPosition, + rebuildPosition + ALIGNED_FRAME_LENGTH * 2L, + currentTimeNs, + MASK, + POSITION_BITS_TO_SHIFT, + TERM_ID))); + + assertTrue(lossFound(lossDetector.scan( + termBuffer, rebuildPosition, rebuildPosition + 64, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID))); + + currentTimeNs = TimeUnit.MILLISECONDS.toNanos(600); + assertFalse(lossFound(lossDetector.scan( + termBuffer, rebuildPosition, rebuildPosition + 64, currentTimeNs, MASK, POSITION_BITS_TO_SHIFT, TERM_ID))); + + final InOrder inOrder = inOrder(lossHandler); + inOrder.verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), 32); + inOrder.verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), 64); + inOrder.verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), ALIGNED_FRAME_LENGTH * 2); + inOrder.verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), ALIGNED_FRAME_LENGTH); + inOrder.verify(lossHandler).onGapDetected(TERM_ID, offsetOfMessage(3), 64); + inOrder.verifyNoMoreInteractions(); + } + private LossDetector getLossHandlerWithLongRetry() { return new LossDetector(DELAY_GENERATOR_WITH_LONGER_RETRY, lossHandler); @@ -339,9 +463,4 @@ private int offsetOfMessage(final int index) { return index * ALIGNED_FRAME_LENGTH; } - - private int gapLength() - { - return ALIGNED_FRAME_LENGTH; - } } diff --git a/aeron-driver/src/test/java/io/aeron/driver/PublicationImageTest.java b/aeron-driver/src/test/java/io/aeron/driver/PublicationImageTest.java new file mode 100644 index 0000000000..c2cf747b54 --- /dev/null +++ b/aeron-driver/src/test/java/io/aeron/driver/PublicationImageTest.java @@ -0,0 +1,249 @@ +/* + * Copyright 2014-2025 Real Logic Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.aeron.driver; + +import io.aeron.ChannelUri; +import io.aeron.driver.buffer.RawLog; +import io.aeron.driver.media.ReceiveChannelEndpoint; +import io.aeron.driver.media.UdpChannel; +import io.aeron.driver.status.ReceiverHwm; +import io.aeron.driver.status.ReceiverPos; +import io.aeron.driver.status.SystemCounterDescriptor; +import io.aeron.driver.status.SystemCounters; +import io.aeron.logbuffer.FrameDescriptor; +import io.aeron.protocol.DataHeaderFlyweight; +import org.agrona.BitUtil; +import org.agrona.ExpandableArrayBuffer; +import org.agrona.concurrent.CachedEpochClock; +import org.agrona.concurrent.CachedNanoClock; +import org.agrona.concurrent.UnsafeBuffer; +import org.agrona.concurrent.status.AtomicCounter; +import org.agrona.concurrent.status.CountersManager; +import org.agrona.concurrent.status.Position; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import static io.aeron.logbuffer.LogBufferDescriptor.*; +import static io.aeron.protocol.DataHeaderFlyweight.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class PublicationImageTest +{ + private static final int TERM_LENGTH = 64 * 1024; + private static final int INITIAL_WINDOW_LENGTH = 128 * 1024; + private static final int MAX_WINDOW_LENGHT = 1024 * 1024; + private static final long CORRELATION_ID = 42; + private static final int TRANSPORT_INDEX = 3; + private static final int SESSION_ID = 888; + private static final int STREAM_ID = 101010; + private static final int INITIAL_TERM_ID = -444666; + private static final int ACTIVE_TERM_ID = INITIAL_TERM_ID + 111; + private static final int TERM_OFFSET = TERM_LENGTH - TERM_LENGTH / 4; + private static final short FLAGS = FrameDescriptor.UNFRAGMENTED; + private static final String SOURCE_IDENTITY = "aeron:udp?endpoint=localhost:5555"; + private final MediaDriver.Context ctx = new MediaDriver.Context(); + private final ReceiveChannelEndpoint receiveChannelEndpoint = mock(ReceiveChannelEndpoint.class); + private final InetSocketAddress controlAddress = mock(InetSocketAddress.class); + private final RawLog rawLog = mock(RawLog.class); + private final FeedbackDelayGenerator feedbackDelayGenerator = mock(FeedbackDelayGenerator.class); + private final CongestionControl congestionControl = mock(CongestionControl.class); + private final CachedEpochClock epochClock = new CachedEpochClock(); + private final CachedNanoClock nanoClock = new CachedNanoClock(); + private final UnsafeBuffer buffer = new UnsafeBuffer(new byte[1024]); + private final CountersManager countersManager = new CountersManager( + new UnsafeBuffer(ByteBuffer.allocateDirect(256 * 1024)), + new UnsafeBuffer(ByteBuffer.allocateDirect(64 * 1024)), + StandardCharsets.US_ASCII); + private final DataHeaderFlyweight headerFlyweight = new DataHeaderFlyweight(); + private Position hwmPosition; + private Position rcvPosition; + private PublicationImage image; + + @BeforeEach + void before() + { + epochClock.update(TimeUnit.HOURS.toMillis(1)); + nanoClock.update(TimeUnit.HOURS.toNanos(1)); + ctx + .receiverCachedNanoClock(nanoClock) + .nanoClock(nanoClock) + .epochClock(epochClock) + .imageLivenessTimeoutNs(TimeUnit.SECONDS.toNanos(10)) + .untetheredWindowLimitTimeoutNs(TimeUnit.SECONDS.toNanos(1)) + .untetheredRestingTimeoutNs(TimeUnit.SECONDS.toNanos(1)) + .statusMessageTimeoutNs(TimeUnit.MILLISECONDS.toNanos(150)) + .systemCounters(new SystemCounters(countersManager)); + + final String channel = "aeron:udp?endpoint=localhost:5555"; + final ChannelUri channelUri = ChannelUri.parse(channel); + final UdpChannel udpChannel = mock(UdpChannel.class); + when(udpChannel.channelUri()).thenReturn(channelUri); + when(receiveChannelEndpoint.subscriptionUdpChannel()).thenReturn(udpChannel); + + final SubscriptionLink subscriptionLink1 = mock(SubscriptionLink.class); + when(subscriptionLink1.isReliable()).thenReturn(true); + when(subscriptionLink1.isTether()).thenReturn(true); + final SubscriberPosition subscriberPosition1 = mock(SubscriberPosition.class); + when(subscriberPosition1.subscription()).thenReturn(subscriptionLink1); + final SubscriptionLink subscriptionLink2 = mock(SubscriptionLink.class); + when(subscriptionLink1.isReliable()).thenReturn(false); + when(subscriptionLink1.isTether()).thenReturn(false); + final SubscriberPosition subscriberPosition2 = mock(SubscriberPosition.class); + when(subscriberPosition2.subscription()).thenReturn(subscriptionLink2); + final ArrayList subscriberPositions = new ArrayList<>(); + subscriberPositions.add(subscriberPosition1); + subscriberPositions.add(subscriberPosition2); + + final UnsafeBuffer[] termBuffers = new UnsafeBuffer[PARTITION_COUNT]; + for (int i = 0; i < termBuffers.length; i++) + { + termBuffers[i] = new UnsafeBuffer(new byte[TERM_LENGTH]); + } + when(rawLog.termBuffers()).thenReturn(termBuffers); + when(rawLog.metaData()).thenReturn(new UnsafeBuffer(new byte[LOG_META_DATA_LENGTH])); + when(rawLog.termLength()).thenReturn(TERM_LENGTH); + + when(congestionControl.initialWindowLength()).thenReturn(INITIAL_WINDOW_LENGTH); + when(congestionControl.maxWindowLength()).thenReturn(MAX_WINDOW_LENGHT); + + final long registrationId = 73249234983274L; + final ExpandableArrayBuffer tempBuffer = new ExpandableArrayBuffer(); + hwmPosition = ReceiverHwm.allocate(tempBuffer, countersManager, registrationId, SESSION_ID, STREAM_ID, channel); + rcvPosition = ReceiverPos.allocate( + tempBuffer, countersManager, registrationId, SESSION_ID, STREAM_ID, channel); + + image = new PublicationImage( + CORRELATION_ID, + ctx, + receiveChannelEndpoint, + TRANSPORT_INDEX, + controlAddress, + SESSION_ID, + STREAM_ID, + INITIAL_TERM_ID, + ACTIVE_TERM_ID, + TERM_OFFSET, + FLAGS, + rawLog, + feedbackDelayGenerator, + subscriberPositions, + hwmPosition, + rcvPosition, + SOURCE_IDENTITY, + congestionControl); + + final long position = computePosition( + ACTIVE_TERM_ID, TERM_OFFSET, positionBitsToShift(TERM_LENGTH), INITIAL_TERM_ID); + assertEquals(position, hwmPosition.get()); + assertEquals(position, rcvPosition.get()); + + ThreadLocalRandom.current().nextBytes(buffer.byteArray()); + } + + @Test + void shouldAdvanceHighWaterMarkByPacketLengthWhenItContainsPaddingFrame() + { + final int totalLength = 512; + final int packetLength = 288; + final int termId = ACTIVE_TERM_ID; + final int termOffset = TERM_LENGTH - totalLength; + int offset = 0; + offset += writeFrame(offset, termOffset, termId, 65, BEGIN_AND_END_FLAGS, HDR_TYPE_DATA, 65); + offset += writeFrame(offset, termOffset + offset, termId, 96, BEGIN_AND_END_FLAGS, HDR_TYPE_DATA, 96); + offset += writeFrame(offset, termOffset + offset, termId, 224, BEGIN_AND_END_FLAGS, HDR_TYPE_PAD, 0x888AA888); + assertEquals(totalLength, offset); + final InetSocketAddress srcAddress = mock(InetSocketAddress.class); + + final int bytes = image.insertPacket(termId, termOffset, buffer, packetLength, TRANSPORT_INDEX, srcAddress); + + assertEquals(packetLength, bytes); + final int positionBitsToShift = positionBitsToShift(TERM_LENGTH); + final long packetPosition = computePosition(termId, termOffset, positionBitsToShift, INITIAL_TERM_ID); + assertEquals(packetPosition + packetLength, hwmPosition.get()); + final UnsafeBuffer activeTermBuffer = + rawLog.termBuffers()[indexByPosition(packetPosition, positionBitsToShift)]; + for (int i = 0; i < packetLength; i++) + { + assertEquals(buffer.getByte(i), activeTermBuffer.getByte(termOffset + i)); + } + for (int i = packetLength; i < totalLength; i++) + { + assertEquals(0, activeTermBuffer.getByte(termOffset + i)); + } + } + + @Test + void shouldAdvanceHighWaterMarkPositionOnHeartbeat() + { + final int termId = ACTIVE_TERM_ID; + final int termOffset = TERM_OFFSET + 1024; + writeFrame(0, termOffset, termId, 0, BEGIN_AND_END_FLAGS, HDR_TYPE_DATA, -1); + FrameDescriptor.frameLengthOrdered(buffer, 0, 0); + final InetSocketAddress srcAddress = mock(InetSocketAddress.class); + final int packetLength = HEADER_LENGTH; + final AtomicCounter heartBeatsCounter = ctx.systemCounters().get(SystemCounterDescriptor.HEARTBEATS_RECEIVED); + final long oldHeartBeatCount = heartBeatsCounter.getWeak(); + + final int bytes = image.insertPacket(termId, termOffset, buffer, packetLength, TRANSPORT_INDEX, srcAddress); + + assertEquals(packetLength, bytes); + final int positionBitsToShift = positionBitsToShift(TERM_LENGTH); + final long packetPosition = computePosition(termId, termOffset, positionBitsToShift, INITIAL_TERM_ID); + assertEquals(packetPosition, hwmPosition.get()); + assertEquals(oldHeartBeatCount + 1, heartBeatsCounter.getWeak()); + final UnsafeBuffer activeTermBuffer = + rawLog.termBuffers()[indexByPosition(packetPosition, positionBitsToShift)]; + for (int i = 0; i < packetLength; i++) + { + assertEquals(0, activeTermBuffer.getByte(termOffset + i)); + } + } + + private int writeFrame( + final int offset, + final int termOffset, + final int termId, + final int length, + final short flags, + final int type, + final int reservedValue) + { + final int frameLength = length + HEADER_LENGTH; + headerFlyweight.wrap(buffer, offset, frameLength); + headerFlyweight + .frameLength(frameLength) + .version(CURRENT_VERSION) + .flags(flags) + .headerType(type); + headerFlyweight + .termOffset(termOffset) + .sessionId(SESSION_ID) + .streamId(STREAM_ID) + .termId(termId) + .reservedValue(reservedValue); + + return BitUtil.align(frameLength, FrameDescriptor.FRAME_ALIGNMENT); + } +}