Skip to content

Commit

Permalink
[C] Track snd-bpe when the available window is too small to send th…
Browse files Browse the repository at this point in the history
…e next available data fragment.
  • Loading branch information
vyazelenko committed Dec 22, 2023
1 parent 2315b08 commit a5ebbc5
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 48 deletions.
4 changes: 2 additions & 2 deletions aeron-client/src/main/c/concurrent/aeron_term_scanner.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@

#include "concurrent/aeron_term_scanner.h"

extern size_t aeron_term_scanner_scan_for_availability(
const uint8_t *buffer, size_t term_length_left, size_t max_length, size_t *padding);
extern int32_t aeron_term_scanner_scan_for_availability(
const uint8_t *buffer, int32_t term_length_left, int32_t max_length, int32_t *padding);
12 changes: 6 additions & 6 deletions aeron-client/src/main/c/concurrent/aeron_term_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
#include "util/aeron_bitutil.h"
#include "aeron_logbuffer_descriptor.h"

inline size_t aeron_term_scanner_scan_for_availability(
const uint8_t *buffer, size_t term_length_left, size_t max_length, size_t *padding)
inline int32_t aeron_term_scanner_scan_for_availability(
const uint8_t *buffer, int32_t term_length_left, int32_t max_length, int32_t *padding)
{
const size_t limit = max_length < term_length_left ? max_length : term_length_left;
size_t available = 0;
const int32_t limit = max_length < term_length_left ? max_length : term_length_left;
int32_t available = 0;
*padding = 0;

do
Expand All @@ -44,15 +44,15 @@ inline size_t aeron_term_scanner_scan_for_availability(
int32_t aligned_frame_length = AERON_ALIGN(frame_length, AERON_LOGBUFFER_FRAME_ALIGNMENT);
if (AERON_HDR_TYPE_PAD == frame_header->type)
{
*padding = aligned_frame_length - AERON_DATA_HEADER_LENGTH;
*padding = aligned_frame_length - (int32_t)AERON_DATA_HEADER_LENGTH;
aligned_frame_length = AERON_DATA_HEADER_LENGTH;
}

available += aligned_frame_length;

if (available > limit)
{
available -= aligned_frame_length;
available = aligned_frame_length == available ? -available : available - aligned_frame_length;
*padding = 0;
break;
}
Expand Down
37 changes: 24 additions & 13 deletions aeron-driver/src/main/c/aeron_network_publication.c
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ int aeron_network_publication_heartbeat_message_check(
int aeron_network_publication_send_data(
aeron_network_publication_t *publication, int64_t now_ns, int64_t snd_pos, int32_t term_offset)
{
const size_t term_length = (size_t)publication->term_length_mask + 1;
const int32_t term_length = publication->term_length_mask + 1;
const size_t max_vlen = publication->current_messages_per_send;
int result = 0, vlen = 0;
int64_t bytes_sent = 0;
Expand All @@ -414,14 +414,14 @@ int aeron_network_publication_send_data(

for (size_t i = 0; i < max_vlen && available_window > 0; i++)
{
size_t scan_limit = (size_t)available_window < publication->mtu_length ?
(size_t)available_window : publication->mtu_length;
int32_t scan_limit = available_window < (int32_t)publication->mtu_length ?
available_window : (int32_t)publication->mtu_length;
size_t active_index = aeron_logbuffer_index_by_position(snd_pos, publication->position_bits_to_shift);
size_t padding = 0;
int32_t padding = 0;

uint8_t *ptr = publication->mapped_raw_log.term_buffers[active_index].addr + term_offset;
const size_t term_length_left = term_length - (size_t)term_offset;
const size_t available = aeron_term_scanner_scan_for_availability(ptr, term_length_left, scan_limit, &padding);
const int32_t term_length_left = term_length - term_offset;
const int32_t available = aeron_term_scanner_scan_for_availability(ptr, term_length_left, scan_limit, &padding);

if (available > 0)
{
Expand All @@ -434,8 +434,18 @@ int aeron_network_publication_send_data(
term_offset += total_available;
highest_pos += total_available;
}
else if (available < 0)
{
if (publication->track_sender_limits)
{
aeron_counter_ordered_increment(publication->snd_bpe_counter.value_addr, 1);
aeron_counter_ordered_increment(publication->sender_flow_control_limits_counter, 1);
publication->track_sender_limits = false;
}
break;
}

if (available == 0 || term_length == (size_t)term_offset)
if (available == 0 || term_length == term_offset)
{
break;
}
Expand Down Expand Up @@ -538,7 +548,7 @@ int aeron_network_publication_resend(void *clientd, int32_t term_id, int32_t ter
int64_t sender_position = aeron_counter_get(publication->snd_pos_position.value_addr);
int64_t resend_position = aeron_logbuffer_compute_position(
term_id, term_offset, publication->position_bits_to_shift, publication->initial_term_id);
size_t term_length = (size_t)publication->term_length_mask + 1;
int32_t term_length = publication->term_length_mask + 1;
int64_t bottom_resend_window =
sender_position - (int64_t)(term_length / 2) - (int64_t)aeron_compute_max_message_length(term_length);
int result = 0;
Expand All @@ -561,11 +571,12 @@ int aeron_network_publication_resend(void *clientd, int32_t term_id, int32_t ter
offset += bytes_sent;

uint8_t *ptr = publication->mapped_raw_log.term_buffers[index].addr + offset;
size_t term_length_left = term_length - (size_t)offset;
size_t padding = 0;
size_t max_length = remaining_bytes < publication->mtu_length ? remaining_bytes : publication->mtu_length;
int32_t term_length_left = term_length - offset;
int32_t padding = 0;
int32_t max_length = remaining_bytes < publication->mtu_length ?
(int32_t)remaining_bytes : (int32_t)publication->mtu_length;

size_t available = aeron_term_scanner_scan_for_availability(ptr, term_length_left, max_length, &padding);
int32_t available = aeron_term_scanner_scan_for_availability(ptr, term_length_left, max_length, &padding);
if (available <= 0)
{
break;
Expand All @@ -591,7 +602,7 @@ int aeron_network_publication_resend(void *clientd, int32_t term_id, int32_t ter
break;
}

bytes_sent = (int32_t)(available + padding);
bytes_sent = available + padding;
remaining_bytes -= bytes_sent;
}
while (remaining_bytes > 0);
Expand Down
54 changes: 27 additions & 27 deletions aeron-driver/src/test/c/aeron_term_scanner_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,58 +40,58 @@ class TermScannerTest : public testing::Test
protected:
buffer_t m_buffer = {};
uint8_t *m_ptr = nullptr;
size_t m_padding = 0;
int32_t m_padding = 0;
};

TEST_F(TermScannerTest, shouldReturnZeroOnEmptyLog)
{
EXPECT_EQ(aeron_term_scanner_scan_for_availability(m_ptr, CAPACITY, MTU_LENGTH, &m_padding), 0u);
EXPECT_EQ(m_padding, 0u);
EXPECT_EQ(aeron_term_scanner_scan_for_availability(m_ptr, CAPACITY, MTU_LENGTH, &m_padding), 0);
EXPECT_EQ(m_padding, 0);
}

TEST_F(TermScannerTest, shouldScanSingleMessage)
{
int32_t frame_length = AERON_DATA_HEADER_LENGTH + 1;
auto aligned_frame_length = (size_t)AERON_ALIGN(frame_length, AERON_LOGBUFFER_FRAME_ALIGNMENT);
int32_t aligned_frame_length = AERON_ALIGN(frame_length, AERON_LOGBUFFER_FRAME_ALIGNMENT);
auto *data_header = (aeron_data_header_t *)m_ptr;

data_header->frame_header.frame_length = (int32_t)aligned_frame_length;
data_header->frame_header.frame_length = aligned_frame_length;
data_header->frame_header.type = AERON_HDR_TYPE_DATA;

EXPECT_EQ(aeron_term_scanner_scan_for_availability(m_ptr, CAPACITY, MTU_LENGTH, &m_padding), aligned_frame_length);
EXPECT_EQ(m_padding, 0u);
EXPECT_EQ(m_padding, 0);
}

TEST_F(TermScannerTest, shouldFailToScanMessageLargerThanMaxLength)
{
int32_t frame_length = AERON_DATA_HEADER_LENGTH + 1;
auto aligned_frame_length = (size_t)AERON_ALIGN(frame_length, AERON_LOGBUFFER_FRAME_ALIGNMENT);
size_t max_length = aligned_frame_length - 1;
int32_t aligned_frame_length = AERON_ALIGN(frame_length, AERON_LOGBUFFER_FRAME_ALIGNMENT);
int32_t max_length = aligned_frame_length - 1;
auto *data_header = (aeron_data_header_t *)m_ptr;

data_header->frame_header.frame_length = (int32_t)aligned_frame_length;
data_header->frame_header.frame_length = aligned_frame_length;
data_header->frame_header.type = AERON_HDR_TYPE_DATA;

EXPECT_EQ(aeron_term_scanner_scan_for_availability(m_ptr, CAPACITY, max_length, &m_padding), 0u);
EXPECT_EQ(m_padding, 0u);
EXPECT_EQ(aeron_term_scanner_scan_for_availability(m_ptr, CAPACITY, max_length, &m_padding), -aligned_frame_length);
EXPECT_EQ(m_padding, 0);
}

TEST_F(TermScannerTest, shouldScanTwoMessagesThatFitInSingleMtu)
{
int32_t frame_length = AERON_DATA_HEADER_LENGTH + 100;
auto aligned_frame_length = (size_t)AERON_ALIGN(frame_length, AERON_LOGBUFFER_FRAME_ALIGNMENT);
int32_t aligned_frame_length = AERON_ALIGN(frame_length, AERON_LOGBUFFER_FRAME_ALIGNMENT);
auto *data_header = (aeron_data_header_t *)m_ptr;

data_header->frame_header.frame_length = (int32_t)aligned_frame_length;
data_header->frame_header.frame_length = aligned_frame_length;
data_header->frame_header.type = AERON_HDR_TYPE_DATA;

data_header = (aeron_data_header_t *)(m_ptr + aligned_frame_length);
data_header->frame_header.frame_length = (int32_t)aligned_frame_length;
data_header->frame_header.frame_length = aligned_frame_length;
data_header->frame_header.type = AERON_HDR_TYPE_DATA;

EXPECT_EQ(aeron_term_scanner_scan_for_availability(
m_ptr, CAPACITY, MTU_LENGTH, &m_padding), 2 * aligned_frame_length);
EXPECT_EQ(m_padding, 0u);
EXPECT_EQ(m_padding, 0);
}

TEST_F(TermScannerTest, shouldScanTwoMessagesAndStopAtMtuBoundary)
Expand All @@ -108,8 +108,8 @@ TEST_F(TermScannerTest, shouldScanTwoMessagesAndStopAtMtuBoundary)
data_header->frame_header.type = AERON_HDR_TYPE_DATA;

EXPECT_EQ(aeron_term_scanner_scan_for_availability(
m_ptr, CAPACITY, MTU_LENGTH, &m_padding), (size_t)(frame_one_length + frame_two_length));
EXPECT_EQ(m_padding, 0u);
m_ptr, CAPACITY, MTU_LENGTH, &m_padding), frame_one_length + frame_two_length);
EXPECT_EQ(m_padding, 0);
}

TEST_F(TermScannerTest, shouldScanTwoMessagesAndStopAtSecondThatSpansMtu)
Expand All @@ -126,8 +126,8 @@ TEST_F(TermScannerTest, shouldScanTwoMessagesAndStopAtSecondThatSpansMtu)
data_header->frame_header.type = AERON_HDR_TYPE_DATA;

EXPECT_EQ(aeron_term_scanner_scan_for_availability(
m_ptr, CAPACITY, MTU_LENGTH, &m_padding), (size_t)frame_one_length);
EXPECT_EQ(m_padding, 0u);
m_ptr, CAPACITY, MTU_LENGTH, &m_padding), frame_one_length);
EXPECT_EQ(m_padding, 0);
}

TEST_F(TermScannerTest, shouldScanLastFrameInBuffer)
Expand All @@ -141,8 +141,8 @@ TEST_F(TermScannerTest, shouldScanLastFrameInBuffer)
data_header->frame_header.type = AERON_HDR_TYPE_DATA;

EXPECT_EQ(aeron_term_scanner_scan_for_availability(
m_ptr, aligned_frame_length, MTU_LENGTH, &m_padding), (size_t)aligned_frame_length);
EXPECT_EQ(m_padding, 0u);
m_ptr, aligned_frame_length, MTU_LENGTH, &m_padding), aligned_frame_length);
EXPECT_EQ(m_padding, 0);
}

TEST_F(TermScannerTest, shouldScanLastMessageInBufferPlusPadding)
Expand All @@ -163,15 +163,15 @@ TEST_F(TermScannerTest, shouldScanLastMessageInBufferPlusPadding)
data_header->frame_header.type = AERON_HDR_TYPE_PAD;

EXPECT_EQ(aeron_term_scanner_scan_for_availability(
m_ptr, CAPACITY - offset, MTU_LENGTH, &m_padding), (size_t)(aligned_frame_length + AERON_DATA_HEADER_LENGTH));
EXPECT_EQ(m_padding, (size_t)(padding_frame_length - AERON_DATA_HEADER_LENGTH));
m_ptr, CAPACITY - offset, MTU_LENGTH, &m_padding), aligned_frame_length + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(m_padding, padding_frame_length - AERON_DATA_HEADER_LENGTH);
}

TEST_F(TermScannerTest, shouldScanLastMessageInBufferMinusPaddingLimitedByMtu)
{
int32_t aligned_frame_length = AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT);
int32_t offset = CAPACITY - (AERON_ALIGN((AERON_DATA_HEADER_LENGTH * 3), AERON_LOGBUFFER_FRAME_ALIGNMENT));
size_t mtu = (size_t)aligned_frame_length + 8u;
int32_t offset = CAPACITY - AERON_ALIGN((AERON_DATA_HEADER_LENGTH * 3), AERON_LOGBUFFER_FRAME_ALIGNMENT);
int32_t mtu = aligned_frame_length + 8;

m_ptr += offset;

Expand All @@ -185,6 +185,6 @@ TEST_F(TermScannerTest, shouldScanLastMessageInBufferMinusPaddingLimitedByMtu)
data_header->frame_header.type = AERON_HDR_TYPE_PAD;

EXPECT_EQ(aeron_term_scanner_scan_for_availability(
m_ptr, CAPACITY - offset, mtu, &m_padding), (size_t)aligned_frame_length);
EXPECT_EQ(m_padding, 0u);
m_ptr, CAPACITY - offset, mtu, &m_padding), aligned_frame_length);
EXPECT_EQ(m_padding, 0);
}

0 comments on commit a5ebbc5

Please sign in to comment.