Skip to content

Commit

Permalink
Loss detection fixes (#1729)
Browse files Browse the repository at this point in the history
* [Java] Update active loss gap upon length change. Gap length can increase when heartbeats advance the high-water mark position, or it can decrease if messages arrive out of order. In both cases we want to adjust the NAK range of the receiver.

* [Java] Update offset masking code to cast the result and not the position.

* [Java] Read high-water mark position after computing the rebuild position to ensure the widest possible loss detection scan window.

Loss detection code runs concurrently with the application threads that are consuming from the image, and also with the receiver thread that inserts new packets into the image and advances the high-water mark position counter. Reading that counter last ensures that we take into account all packets inserted up to this point.

* [Java] Simplify `sendData` path by assuming that the window is almost never zero. Also do not track padding bytes as the "sent bytes".

* [Java] Take into account a trailing padding frame when adjusting high-water mark on the receiver.

A single network packet might contain multiple frames (up to an MTU). When such a packet contains only `HDR_TYPE_DATA` frames, the packet length will correspond to the combined lengths of the frames it contains. However, if there is a trailing `HDR_TYPE_PAD` frame, then the packet length will not be equal to the sum of the frame sizes.

For example, consider a packet containing two 80-byte messages (whose aligned length is 128 bytes each) and a padding frame for 224 bytes: the length of the packet will be 288 bytes (256 + 32), but the "frame size" will be 512 bytes (256 + 256).

* [Java] Simplify term gap scanning logic by removing `ALIGNED_HEADER_LENGTH` and streamlining non-zero frame lookup.

* [Java] Add a test for `rcv-hwm` position update when a packet contains trailing padding frame + rename method that computes the target position offset.

* [Java] Increment `snd-bpe` if window is zero or too small to fit a frame of data.

* [Java] Do not take into account actual padding size as the `rcv-hwm` will advance on next heartbeat/data frame anyway.

* Revert "[Java] Increment `snd-bpe` if window is zero or too small to fit a frame of data."

This reverts commit 9b8229c.

* Revert "[Java] Simplify `sendData` path by assuming that the window is almost never zero. Also do not track padding bytes as the "sent bytes"."

This reverts commit 411f102.

* Revert "[Java] Read high-water mark position after computing the rebuild position to ensure the widest possible loss detection scan window."

This reverts commit 60e6047.

* [C] Simplify term gap scanning logic by removing `ALIGNED_HEADER_LENGTH` and streamlining non-zero frame lookup.

* [C] Update active loss gap upon length change. Gap length can increase when heartbeats advance the high-water mark position, or it can decrease if messages arrive out of order. In both cases we want to adjust the NAK range of the receiver.
  • Loading branch information
vyazelenko authored Jan 29, 2025
1 parent 05e80f1 commit 5db3ae0
Show file tree
Hide file tree
Showing 9 changed files with 598 additions and 112 deletions.
12 changes: 4 additions & 8 deletions aeron-client/src/main/c/concurrent/aeron_term_gap_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@

typedef void (*aeron_term_gap_scanner_on_gap_detected_func_t)(void *clientd, int32_t term_id, int32_t term_offset, size_t length);

#define AERON_ALIGNED_HEADER_LENGTH (AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT))

inline int32_t aeron_term_gap_scanner_scan_for_gap(
const uint8_t *buffer,
int32_t term_id,
Expand Down Expand Up @@ -53,23 +51,21 @@ inline int32_t aeron_term_gap_scanner_scan_for_gap(
const int32_t gap_begin_offset = offset;
if (offset < limit_offset)
{
const int32_t limit = limit_offset - AERON_ALIGNED_HEADER_LENGTH;
while (offset < limit)
offset += AERON_DATA_HEADER_LENGTH;
while (offset < limit_offset)
{
offset += AERON_LOGBUFFER_FRAME_ALIGNMENT;

aeron_frame_header_t *hdr = (aeron_frame_header_t *)(buffer + offset);
int32_t frame_length;
AERON_GET_ACQUIRE(frame_length, hdr->frame_length);

if (0 != frame_length)
{
offset -= AERON_ALIGNED_HEADER_LENGTH;
break;
}
offset += AERON_DATA_HEADER_LENGTH;
}

const size_t gap_length = (offset - gap_begin_offset) + AERON_ALIGNED_HEADER_LENGTH;
const size_t gap_length = offset - gap_begin_offset;
on_gap_detected(clientd, term_id, gap_begin_offset, gap_length);
}

Expand Down
15 changes: 5 additions & 10 deletions aeron-client/src/main/java/io/aeron/logbuffer/TermGapScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.aeron.logbuffer;

import org.agrona.BitUtil;
import org.agrona.concurrent.UnsafeBuffer;

import static io.aeron.logbuffer.FrameDescriptor.FRAME_ALIGNMENT;
Expand All @@ -31,8 +30,6 @@
*/
public class TermGapScanner
{
private static final int ALIGNED_HEADER_LENGTH = BitUtil.align(HEADER_LENGTH, FRAME_ALIGNMENT);

/**
* Handler for notifying of gaps in the log.
*/
Expand Down Expand Up @@ -82,19 +79,17 @@ public static int scanForGap(
final int gapBeginOffset = offset;
if (offset < limitOffset)
{
final int limit = limitOffset - ALIGNED_HEADER_LENGTH;
while (offset < limit)
offset += HEADER_LENGTH;
while (offset < limitOffset)
{
offset += FRAME_ALIGNMENT;

if (0 != termBuffer.getIntVolatile(offset))
if (0 != frameLengthVolatile(termBuffer, offset))
{
offset -= ALIGNED_HEADER_LENGTH;
break;
}
offset += HEADER_LENGTH;
}

final int gapLength = (offset - gapBeginOffset) + ALIGNED_HEADER_LENGTH;
final int gapLength = offset - gapBeginOffset;
handler.onGap(termId, gapBeginOffset, gapLength);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.agrona.BitUtil.align;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.verifyNoMoreInteractions;

class TermGapScannerTest
{
Expand Down Expand Up @@ -51,6 +52,7 @@ void shouldReportGapAtBeginningOfBuffer()
assertEquals(0, TermGapScanner.scanForGap(termBuffer, TERM_ID, 0, highWaterMark, gapHandler));

verify(gapHandler).onGap(TERM_ID, 0, frameOffset);
verifyNoMoreInteractions(gapHandler);
}

@Test
Expand All @@ -67,6 +69,7 @@ void shouldReportSingleGapWhenBufferNotFull()
assertEquals(tail, TermGapScanner.scanForGap(termBuffer, TERM_ID, tail, highWaterMark, gapHandler));

verify(gapHandler).onGap(TERM_ID, tail, align(HEADER_LENGTH, FRAME_ALIGNMENT));
verifyNoMoreInteractions(gapHandler);
}

@Test
Expand All @@ -83,6 +86,7 @@ void shouldReportSingleGapWhenBufferIsFull()
assertEquals(tail, TermGapScanner.scanForGap(termBuffer, TERM_ID, tail, highWaterMark, gapHandler));

verify(gapHandler).onGap(TERM_ID, tail, align(HEADER_LENGTH, FRAME_ALIGNMENT));
verifyNoMoreInteractions(gapHandler);
}

@Test
Expand All @@ -100,4 +104,37 @@ void shouldReportNoGapWhenHwmIsInPadding()

verifyNoInteractions(gapHandler);
}

@Test
void shouldReportSingleHeaderGap()
{
    // The slot at the scan start reads as zero and the very next header-sized slot
    // reads as a non-zero frame length, so the expected gap is exactly one header long.
    final int gapOffset = 8192 + 384;
    final int nextFrameOffset = gapOffset + HEADER_LENGTH;
    when(termBuffer.getIntVolatile(gapOffset)).thenReturn(0);
    when(termBuffer.getIntVolatile(nextFrameOffset)).thenReturn(128);

    final int scanResult = TermGapScanner.scanForGap(
        termBuffer, TERM_ID, gapOffset, LOG_BUFFER_CAPACITY, gapHandler);

    assertEquals(gapOffset, scanResult);

    // Exactly one gap notification and exactly two volatile reads are expected.
    verify(gapHandler).onGap(TERM_ID, gapOffset, HEADER_LENGTH);
    verify(termBuffer).getIntVolatile(gapOffset);
    verify(termBuffer).getIntVolatile(nextFrameOffset);
    verifyNoMoreInteractions(gapHandler, termBuffer);
}

@Test
void shouldReportGapAtTheEndOfTheBuffer()
{
    // Scan the last 128 bytes of the buffer with the limit set to the buffer capacity.
    // Only the read at the gap start is stubbed explicitly.
    final int expectedGapLength = 128;
    final int gapOffset = LOG_BUFFER_CAPACITY - expectedGapLength;
    when(termBuffer.getIntVolatile(gapOffset)).thenReturn(0);

    final int scanResult = TermGapScanner.scanForGap(
        termBuffer, TERM_ID, gapOffset, LOG_BUFFER_CAPACITY, gapHandler);

    assertEquals(gapOffset, scanResult);

    // One gap covering the whole scanned range, probed once per header-sized step.
    verify(gapHandler).onGap(TERM_ID, gapOffset, expectedGapLength);
    for (int step = 0; step < 4; step++)
    {
        verify(termBuffer).getIntVolatile(gapOffset + step * HEADER_LENGTH);
    }
    verifyNoMoreInteractions(gapHandler, termBuffer);
}
}
3 changes: 2 additions & 1 deletion aeron-driver/src/main/c/aeron_loss_detector.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ inline void aeron_loss_detector_on_gap(void *clientd, int32_t term_id, int32_t t
inline bool aeron_loss_detector_gaps_match(aeron_loss_detector_t *detector)
{
return detector->active_gap.term_id == detector->scanned_gap.term_id &&
detector->active_gap.term_offset == detector->scanned_gap.term_offset;
detector->active_gap.term_offset == detector->scanned_gap.term_offset &&
detector->active_gap.length == detector->scanned_gap.length;
}

inline void aeron_loss_detector_activate_gap(aeron_loss_detector_t *detector, int64_t now_ns)
Expand Down
8 changes: 5 additions & 3 deletions aeron-driver/src/main/java/io/aeron/driver/LossDetector.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,23 @@ public long scan(
final int initialTermId)
{
boolean lossFound = false;
int rebuildOffset = (int)rebuildPosition & termLengthMask;
int rebuildOffset = (int)(rebuildPosition & termLengthMask);

if (rebuildPosition < hwmPosition)
{
final int rebuildTermCount = (int)(rebuildPosition >>> positionBitsToShift);
final int hwmTermCount = (int)(hwmPosition >>> positionBitsToShift);

final int rebuildTermId = initialTermId + rebuildTermCount;
final int hwmTermOffset = (int)hwmPosition & termLengthMask;
final int hwmTermOffset = (int)(hwmPosition & termLengthMask);
final int limitOffset = rebuildTermCount == hwmTermCount ? hwmTermOffset : termLengthMask + 1;

rebuildOffset = scanForGap(termBuffer, rebuildTermId, rebuildOffset, limitOffset, this);
if (rebuildOffset < limitOffset)
{
if (scannedTermOffset != activeTermOffset || scannedTermId != activeTermId)
if (scannedTermOffset != activeTermOffset ||
scannedTermId != activeTermId ||
scannedLength != activeLength)
{
activateGap(nowNs);
lossFound = true;
Expand Down
13 changes: 8 additions & 5 deletions aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ enum State
private static final VarHandle END_SM_CHANGE_VH;
private static final VarHandle BEGIN_LOSS_CHANGE_VH;
private static final VarHandle END_LOSS_CHANGE_VH;

static
{
try
Expand Down Expand Up @@ -561,7 +562,7 @@ int trackRebuild(final long nowNs)
positionBitsToShift,
initialTermId);

final int rebuildTermOffset = (int)rebuildPosition & termLengthMask;
final int rebuildTermOffset = (int)(rebuildPosition & termLengthMask);
final long newRebuildPosition = (rebuildPosition - rebuildTermOffset) + rebuildOffset(scanOutcome);
this.rebuildPosition.proposeMaxOrdered(newRebuildPosition);

Expand Down Expand Up @@ -631,12 +632,12 @@ int insertPacket(
{
final long nowNs = cachedNanoClock.nanoTime();
timeOfLastPacketNs = nowNs;
trackConnection(transportIndex, srcAddress, nowNs);
final ImageConnection imageConnection = trackConnection(transportIndex, srcAddress, nowNs);

if (isEndOfStream)
{
imageConnections[transportIndex].eosPosition = packetPosition;
imageConnections[transportIndex].isEos = true;
imageConnection.eosPosition = packetPosition;
imageConnection.isEos = true;

if (!this.isEndOfStream && isAllConnectedEos())
{
Expand Down Expand Up @@ -1017,7 +1018,8 @@ private void cleanBufferTo(final long position)
}
}

private void trackConnection(final int transportIndex, final InetSocketAddress srcAddress, final long nowNs)
private ImageConnection trackConnection(
final int transportIndex, final InetSocketAddress srcAddress, final long nowNs)
{
imageConnections = ArrayUtil.ensureCapacity(imageConnections, transportIndex + 1);
ImageConnection imageConnection = imageConnections[transportIndex];
Expand All @@ -1030,6 +1032,7 @@ private void trackConnection(final int transportIndex, final InetSocketAddress s

imageConnection.timeOfLastActivityNs = nowNs;
imageConnection.timeOfLastFrameNs = nowNs;
return imageConnection;
}

private boolean isAllConnectedEos()
Expand Down
Loading

0 comments on commit 5db3ae0

Please sign in to comment.