Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loss detection fixes #1729

Merged
merged 14 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
14 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions aeron-client/src/main/c/concurrent/aeron_term_gap_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@

typedef void (*aeron_term_gap_scanner_on_gap_detected_func_t)(void *clientd, int32_t term_id, int32_t term_offset, size_t length);

#define AERON_ALIGNED_HEADER_LENGTH (AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))

inline int32_t aeron_term_gap_scanner_scan_for_gap(
const uint8_t *buffer,
int32_t term_id,
Expand Down Expand Up @@ -53,23 +51,21 @@ inline int32_t aeron_term_gap_scanner_scan_for_gap(
const int32_t gap_begin_offset = offset;
if (offset < limit_offset)
{
const int32_t limit = limit_offset - AERON_ALIGNED_HEADER_LENGTH;
while (offset < limit)
offset += AERON_DATA_HEADER_LENGTH;
while (offset < limit_offset)
{
offset += AERON_LOGBUFFER_FRAME_ALIGNMENT;

aeron_frame_header_t *hdr = (aeron_frame_header_t *)(buffer + offset);
int32_t frame_length;
AERON_GET_ACQUIRE(frame_length, hdr->frame_length);

if (0 != frame_length)
{
offset -= AERON_ALIGNED_HEADER_LENGTH;
break;
}
offset += AERON_DATA_HEADER_LENGTH;
}

const size_t gap_length = (offset - gap_begin_offset) + AERON_ALIGNED_HEADER_LENGTH;
const size_t gap_length = offset - gap_begin_offset;
on_gap_detected(clientd, term_id, gap_begin_offset, gap_length);
}

Expand Down
15 changes: 5 additions & 10 deletions aeron-client/src/main/java/io/aeron/logbuffer/TermGapScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.aeron.logbuffer;

import org.agrona.BitUtil;
import org.agrona.concurrent.UnsafeBuffer;

import static io.aeron.logbuffer.FrameDescriptor.FRAME_ALIGNMENT;
Expand All @@ -31,8 +30,6 @@
*/
public class TermGapScanner
{
private static final int ALIGNED_HEADER_LENGTH = BitUtil.align(HEADER_LENGTH, FRAME_ALIGNMENT);

/**
* Handler for notifying of gaps in the log.
*/
Expand Down Expand Up @@ -82,19 +79,17 @@ public static int scanForGap(
final int gapBeginOffset = offset;
if (offset < limitOffset)
{
final int limit = limitOffset - ALIGNED_HEADER_LENGTH;
while (offset < limit)
offset += HEADER_LENGTH;
while (offset < limitOffset)
{
offset += FRAME_ALIGNMENT;

if (0 != termBuffer.getIntVolatile(offset))
if (0 != frameLengthVolatile(termBuffer, offset))
{
offset -= ALIGNED_HEADER_LENGTH;
break;
}
offset += HEADER_LENGTH;
}

final int gapLength = (offset - gapBeginOffset) + ALIGNED_HEADER_LENGTH;
final int gapLength = offset - gapBeginOffset;
handler.onGap(termId, gapBeginOffset, gapLength);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.agrona.BitUtil.align;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.verifyNoMoreInteractions;

class TermGapScannerTest
{
Expand Down Expand Up @@ -51,6 +52,7 @@ void shouldReportGapAtBeginningOfBuffer()
assertEquals(0, TermGapScanner.scanForGap(termBuffer, TERM_ID, 0, highWaterMark, gapHandler));

verify(gapHandler).onGap(TERM_ID, 0, frameOffset);
verifyNoMoreInteractions(gapHandler);
}

@Test
Expand All @@ -67,6 +69,7 @@ void shouldReportSingleGapWhenBufferNotFull()
assertEquals(tail, TermGapScanner.scanForGap(termBuffer, TERM_ID, tail, highWaterMark, gapHandler));

verify(gapHandler).onGap(TERM_ID, tail, align(HEADER_LENGTH, FRAME_ALIGNMENT));
verifyNoMoreInteractions(gapHandler);
}

@Test
Expand All @@ -83,6 +86,7 @@ void shouldReportSingleGapWhenBufferIsFull()
assertEquals(tail, TermGapScanner.scanForGap(termBuffer, TERM_ID, tail, highWaterMark, gapHandler));

verify(gapHandler).onGap(TERM_ID, tail, align(HEADER_LENGTH, FRAME_ALIGNMENT));
verifyNoMoreInteractions(gapHandler);
}

@Test
Expand All @@ -100,4 +104,37 @@ void shouldReportNoGapWhenHwmIsInPadding()

verifyNoInteractions(gapHandler);
}

@Test
void shouldReportSingleHeaderGap()
{
final int offset = 8192 + 384;
when(termBuffer.getIntVolatile(offset)).thenReturn(0);
when(termBuffer.getIntVolatile(offset + HEADER_LENGTH)).thenReturn(128);

assertEquals(
offset, TermGapScanner.scanForGap(termBuffer, TERM_ID, offset, LOG_BUFFER_CAPACITY, gapHandler));

verify(termBuffer).getIntVolatile(offset);
verify(termBuffer).getIntVolatile(offset + HEADER_LENGTH);
verify(gapHandler).onGap(TERM_ID, offset, HEADER_LENGTH);
verifyNoMoreInteractions(gapHandler, termBuffer);
}

@Test
void shouldReportGapAtTheEndOfTheBuffer()
{
final int offset = LOG_BUFFER_CAPACITY - 128;
when(termBuffer.getIntVolatile(offset)).thenReturn(0);

assertEquals(
offset, TermGapScanner.scanForGap(termBuffer, TERM_ID, offset, LOG_BUFFER_CAPACITY, gapHandler));

verify(termBuffer).getIntVolatile(offset);
verify(termBuffer).getIntVolatile(offset + HEADER_LENGTH);
verify(termBuffer).getIntVolatile(offset + 2 * HEADER_LENGTH);
verify(termBuffer).getIntVolatile(offset + 3 * HEADER_LENGTH);
verify(gapHandler).onGap(TERM_ID, offset, 128);
verifyNoMoreInteractions(gapHandler, termBuffer);
}
}
3 changes: 2 additions & 1 deletion aeron-driver/src/main/c/aeron_loss_detector.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ inline void aeron_loss_detector_on_gap(void *clientd, int32_t term_id, int32_t t
inline bool aeron_loss_detector_gaps_match(aeron_loss_detector_t *detector)
{
return detector->active_gap.term_id == detector->scanned_gap.term_id &&
detector->active_gap.term_offset == detector->scanned_gap.term_offset;
detector->active_gap.term_offset == detector->scanned_gap.term_offset &&
detector->active_gap.length == detector->scanned_gap.length;
}

inline void aeron_loss_detector_activate_gap(aeron_loss_detector_t *detector, int64_t now_ns)
Expand Down
8 changes: 5 additions & 3 deletions aeron-driver/src/main/java/io/aeron/driver/LossDetector.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,23 @@ public long scan(
final int initialTermId)
{
boolean lossFound = false;
int rebuildOffset = (int)rebuildPosition & termLengthMask;
int rebuildOffset = (int)(rebuildPosition & termLengthMask);

if (rebuildPosition < hwmPosition)
{
final int rebuildTermCount = (int)(rebuildPosition >>> positionBitsToShift);
final int hwmTermCount = (int)(hwmPosition >>> positionBitsToShift);

final int rebuildTermId = initialTermId + rebuildTermCount;
final int hwmTermOffset = (int)hwmPosition & termLengthMask;
final int hwmTermOffset = (int)(hwmPosition & termLengthMask);
final int limitOffset = rebuildTermCount == hwmTermCount ? hwmTermOffset : termLengthMask + 1;

rebuildOffset = scanForGap(termBuffer, rebuildTermId, rebuildOffset, limitOffset, this);
if (rebuildOffset < limitOffset)
{
if (scannedTermOffset != activeTermOffset || scannedTermId != activeTermId)
if (scannedTermOffset != activeTermOffset ||
scannedTermId != activeTermId ||
scannedLength != activeLength)
{
activateGap(nowNs);
lossFound = true;
Expand Down
13 changes: 8 additions & 5 deletions aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ enum State
private static final VarHandle END_SM_CHANGE_VH;
private static final VarHandle BEGIN_LOSS_CHANGE_VH;
private static final VarHandle END_LOSS_CHANGE_VH;

static
{
try
Expand Down Expand Up @@ -561,7 +562,7 @@ int trackRebuild(final long nowNs)
positionBitsToShift,
initialTermId);

final int rebuildTermOffset = (int)rebuildPosition & termLengthMask;
final int rebuildTermOffset = (int)(rebuildPosition & termLengthMask);
final long newRebuildPosition = (rebuildPosition - rebuildTermOffset) + rebuildOffset(scanOutcome);
this.rebuildPosition.proposeMaxOrdered(newRebuildPosition);

Expand Down Expand Up @@ -631,12 +632,12 @@ int insertPacket(
{
final long nowNs = cachedNanoClock.nanoTime();
timeOfLastPacketNs = nowNs;
trackConnection(transportIndex, srcAddress, nowNs);
final ImageConnection imageConnection = trackConnection(transportIndex, srcAddress, nowNs);

if (isEndOfStream)
{
imageConnections[transportIndex].eosPosition = packetPosition;
imageConnections[transportIndex].isEos = true;
imageConnection.eosPosition = packetPosition;
imageConnection.isEos = true;

if (!this.isEndOfStream && isAllConnectedEos())
{
Expand Down Expand Up @@ -1017,7 +1018,8 @@ private void cleanBufferTo(final long position)
}
}

private void trackConnection(final int transportIndex, final InetSocketAddress srcAddress, final long nowNs)
private ImageConnection trackConnection(
final int transportIndex, final InetSocketAddress srcAddress, final long nowNs)
{
imageConnections = ArrayUtil.ensureCapacity(imageConnections, transportIndex + 1);
ImageConnection imageConnection = imageConnections[transportIndex];
Expand All @@ -1030,6 +1032,7 @@ private void trackConnection(final int transportIndex, final InetSocketAddress s

imageConnection.timeOfLastActivityNs = nowNs;
imageConnection.timeOfLastFrameNs = nowNs;
return imageConnection;
}

private boolean isAllConnectedEos()
Expand Down
Loading