diff --git a/aeron-client/src/main/c/concurrent/aeron_term_scanner.c b/aeron-client/src/main/c/concurrent/aeron_term_scanner.c index a20b4956ac..c682230aa2 100644 --- a/aeron-client/src/main/c/concurrent/aeron_term_scanner.c +++ b/aeron-client/src/main/c/concurrent/aeron_term_scanner.c @@ -16,5 +16,5 @@ #include "concurrent/aeron_term_scanner.h" -extern size_t aeron_term_scanner_scan_for_availability( - const uint8_t *buffer, size_t term_length_left, size_t max_length, size_t *padding); +extern int32_t aeron_term_scanner_scan_for_availability( + const uint8_t *buffer, int32_t term_length_left, int32_t max_length, int32_t *padding); diff --git a/aeron-client/src/main/c/concurrent/aeron_term_scanner.h b/aeron-client/src/main/c/concurrent/aeron_term_scanner.h index 0753919685..eb507aa074 100644 --- a/aeron-client/src/main/c/concurrent/aeron_term_scanner.h +++ b/aeron-client/src/main/c/concurrent/aeron_term_scanner.h @@ -22,11 +22,11 @@ #include "util/aeron_bitutil.h" #include "aeron_logbuffer_descriptor.h" -inline size_t aeron_term_scanner_scan_for_availability( - const uint8_t *buffer, size_t term_length_left, size_t max_length, size_t *padding) +inline int32_t aeron_term_scanner_scan_for_availability( + const uint8_t *buffer, int32_t term_length_left, int32_t max_length, int32_t *padding) { - const size_t limit = max_length < term_length_left ? max_length : term_length_left; - size_t available = 0; + const int32_t limit = max_length < term_length_left ? max_length : term_length_left; + int32_t available = 0; *padding = 0; do @@ -44,7 +44,7 @@ inline size_t aeron_term_scanner_scan_for_availability( int32_t aligned_frame_length = AERON_ALIGN(frame_length, AERON_LOGBUFFER_FRAME_ALIGNMENT); if (AERON_HDR_TYPE_PAD == frame_header->type) { - *padding = aligned_frame_length - AERON_DATA_HEADER_LENGTH; + *padding = aligned_frame_length - (int32_t)AERON_DATA_HEADER_LENGTH; aligned_frame_length = AERON_DATA_HEADER_LENGTH; } @@ -52,7 +52,7 @@ inline size_t aeron_term_scanner_scan_for_availability( if (available > limit) { - available -= aligned_frame_length; + available = aligned_frame_length == available ? -available : available - aligned_frame_length; *padding = 0; break; } diff --git a/aeron-driver/src/main/c/aeron_network_publication.c b/aeron-driver/src/main/c/aeron_network_publication.c index 8d3b788ae4..b92357ee21 100644 --- a/aeron-driver/src/main/c/aeron_network_publication.c +++ b/aeron-driver/src/main/c/aeron_network_publication.c @@ -404,7 +404,7 @@ int aeron_network_publication_heartbeat_message_check( int aeron_network_publication_send_data( aeron_network_publication_t *publication, int64_t now_ns, int64_t snd_pos, int32_t term_offset) { - const size_t term_length = (size_t)publication->term_length_mask + 1; + const int32_t term_length = publication->term_length_mask + 1; const size_t max_vlen = publication->current_messages_per_send; int result = 0, vlen = 0; int64_t bytes_sent = 0; @@ -414,14 +414,14 @@ int aeron_network_publication_send_data( for (size_t i = 0; i < max_vlen && available_window > 0; i++) { - size_t scan_limit = (size_t)available_window < publication->mtu_length ? - (size_t)available_window : publication->mtu_length; + int32_t scan_limit = available_window < (int32_t)publication->mtu_length ? + available_window : (int32_t)publication->mtu_length; size_t active_index = aeron_logbuffer_index_by_position(snd_pos, publication->position_bits_to_shift); - size_t padding = 0; + int32_t padding = 0; uint8_t *ptr = publication->mapped_raw_log.term_buffers[active_index].addr + term_offset; - const size_t term_length_left = term_length - (size_t)term_offset; - const size_t available = aeron_term_scanner_scan_for_availability(ptr, term_length_left, scan_limit, &padding); + const int32_t term_length_left = term_length - term_offset; + const int32_t available = aeron_term_scanner_scan_for_availability(ptr, term_length_left, scan_limit, &padding); if (available > 0) { @@ -434,8 +434,18 @@ int aeron_network_publication_send_data( term_offset += total_available; highest_pos += total_available; } + else if (available < 0) + { + if (publication->track_sender_limits) + { + aeron_counter_ordered_increment(publication->snd_bpe_counter.value_addr, 1); + aeron_counter_ordered_increment(publication->sender_flow_control_limits_counter, 1); + publication->track_sender_limits = false; + } + break; + } - if (available == 0 || term_length == (size_t)term_offset) + if (available == 0 || term_length == term_offset) { break; } @@ -538,7 +548,7 @@ int aeron_network_publication_resend(void *clientd, int32_t term_id, int32_t ter int64_t sender_position = aeron_counter_get(publication->snd_pos_position.value_addr); int64_t resend_position = aeron_logbuffer_compute_position( term_id, term_offset, publication->position_bits_to_shift, publication->initial_term_id); - size_t term_length = (size_t)publication->term_length_mask + 1; + int32_t term_length = publication->term_length_mask + 1; int64_t bottom_resend_window = sender_position - (int64_t)(term_length / 2) - (int64_t)aeron_compute_max_message_length(term_length); int result = 0; @@ -561,11 +571,12 @@ int aeron_network_publication_resend(void *clientd, int32_t term_id, int32_t ter offset += bytes_sent; uint8_t *ptr = publication->mapped_raw_log.term_buffers[index].addr + offset; - size_t term_length_left = term_length - (size_t)offset; - size_t padding = 0; - size_t max_length = remaining_bytes < publication->mtu_length ? remaining_bytes : publication->mtu_length; + int32_t term_length_left = term_length - offset; + int32_t padding = 0; + int32_t max_length = remaining_bytes < publication->mtu_length ? + (int32_t)remaining_bytes : (int32_t)publication->mtu_length; - size_t available = aeron_term_scanner_scan_for_availability(ptr, term_length_left, max_length, &padding); + int32_t available = aeron_term_scanner_scan_for_availability(ptr, term_length_left, max_length, &padding); if (available <= 0) { break; @@ -591,7 +602,7 @@ int aeron_network_publication_resend(void *clientd, int32_t term_id, int32_t ter break; } - bytes_sent = (int32_t)(available + padding); + bytes_sent = available + padding; remaining_bytes -= bytes_sent; } while (remaining_bytes > 0); diff --git a/aeron-driver/src/test/c/aeron_term_scanner_test.cpp b/aeron-driver/src/test/c/aeron_term_scanner_test.cpp index 53a40d38fa..2d571463ee 100644 --- a/aeron-driver/src/test/c/aeron_term_scanner_test.cpp +++ b/aeron-driver/src/test/c/aeron_term_scanner_test.cpp @@ -40,58 +40,58 @@ class TermScannerTest : public testing::Test protected: buffer_t m_buffer = {}; uint8_t *m_ptr = nullptr; - size_t m_padding = 0; + int32_t m_padding = 0; }; TEST_F(TermScannerTest, shouldReturnZeroOnEmptyLog) { - EXPECT_EQ(aeron_term_scanner_scan_for_availability(m_ptr, CAPACITY, MTU_LENGTH, &m_padding), 0u); - EXPECT_EQ(m_padding, 0u); + EXPECT_EQ(aeron_term_scanner_scan_for_availability(m_ptr, CAPACITY, MTU_LENGTH, &m_padding), 0); + EXPECT_EQ(m_padding, 0); } TEST_F(TermScannerTest, shouldScanSingleMessage) { int32_t frame_length = AERON_DATA_HEADER_LENGTH + 1; - auto aligned_frame_length = (size_t)AERON_ALIGN(frame_length, AERON_LOGBUFFER_FRAME_ALIGNMENT); + int32_t aligned_frame_length = AERON_ALIGN(frame_length, AERON_LOGBUFFER_FRAME_ALIGNMENT); auto *data_header = (aeron_data_header_t *)m_ptr; - data_header->frame_header.frame_length = (int32_t)aligned_frame_length; + data_header->frame_header.frame_length = aligned_frame_length; data_header->frame_header.type = AERON_HDR_TYPE_DATA; EXPECT_EQ(aeron_term_scanner_scan_for_availability(m_ptr, CAPACITY, MTU_LENGTH, &m_padding), aligned_frame_length); - EXPECT_EQ(m_padding, 0u); + EXPECT_EQ(m_padding, 0); } TEST_F(TermScannerTest, shouldFailToScanMessageLargerThanMaxLength) { int32_t frame_length = AERON_DATA_HEADER_LENGTH + 1; - auto aligned_frame_length = (size_t)AERON_ALIGN(frame_length, AERON_LOGBUFFER_FRAME_ALIGNMENT); - size_t max_length = aligned_frame_length - 1; + int32_t aligned_frame_length = AERON_ALIGN(frame_length, AERON_LOGBUFFER_FRAME_ALIGNMENT); + int32_t max_length = aligned_frame_length - 1; auto *data_header = (aeron_data_header_t *)m_ptr; - data_header->frame_header.frame_length = (int32_t)aligned_frame_length; + data_header->frame_header.frame_length = aligned_frame_length; data_header->frame_header.type = AERON_HDR_TYPE_DATA; - EXPECT_EQ(aeron_term_scanner_scan_for_availability(m_ptr, CAPACITY, max_length, &m_padding), 0u); - EXPECT_EQ(m_padding, 0u); + EXPECT_EQ(aeron_term_scanner_scan_for_availability(m_ptr, CAPACITY, max_length, &m_padding), -aligned_frame_length); + EXPECT_EQ(m_padding, 0); } TEST_F(TermScannerTest, shouldScanTwoMessagesThatFitInSingleMtu) { int32_t frame_length = AERON_DATA_HEADER_LENGTH + 100; - auto aligned_frame_length = (size_t)AERON_ALIGN(frame_length, AERON_LOGBUFFER_FRAME_ALIGNMENT); + int32_t aligned_frame_length = AERON_ALIGN(frame_length, AERON_LOGBUFFER_FRAME_ALIGNMENT); auto *data_header = (aeron_data_header_t *)m_ptr; - data_header->frame_header.frame_length = (int32_t)aligned_frame_length; + data_header->frame_header.frame_length = aligned_frame_length; data_header->frame_header.type = AERON_HDR_TYPE_DATA; data_header = (aeron_data_header_t *)(m_ptr + aligned_frame_length); - data_header->frame_header.frame_length = (int32_t)aligned_frame_length; + data_header->frame_header.frame_length = aligned_frame_length; data_header->frame_header.type = AERON_HDR_TYPE_DATA; EXPECT_EQ(aeron_term_scanner_scan_for_availability( m_ptr, CAPACITY, MTU_LENGTH, &m_padding), 2 * aligned_frame_length); - EXPECT_EQ(m_padding, 0u); + EXPECT_EQ(m_padding, 0); } TEST_F(TermScannerTest, shouldScanTwoMessagesAndStopAtMtuBoundary) @@ -108,8 +108,8 @@ TEST_F(TermScannerTest, shouldScanTwoMessagesAndStopAtMtuBoundary) data_header->frame_header.type = AERON_HDR_TYPE_DATA; EXPECT_EQ(aeron_term_scanner_scan_for_availability( - m_ptr, CAPACITY, MTU_LENGTH, &m_padding), (size_t)(frame_one_length + frame_two_length)); - EXPECT_EQ(m_padding, 0u); + m_ptr, CAPACITY, MTU_LENGTH, &m_padding), frame_one_length + frame_two_length); + EXPECT_EQ(m_padding, 0); } TEST_F(TermScannerTest, shouldScanTwoMessagesAndStopAtSecondThatSpansMtu) @@ -126,8 +126,8 @@ TEST_F(TermScannerTest, shouldScanTwoMessagesAndStopAtSecondThatSpansMtu) data_header->frame_header.type = AERON_HDR_TYPE_DATA; EXPECT_EQ(aeron_term_scanner_scan_for_availability( - m_ptr, CAPACITY, MTU_LENGTH, &m_padding), (size_t)frame_one_length); - EXPECT_EQ(m_padding, 0u); + m_ptr, CAPACITY, MTU_LENGTH, &m_padding), frame_one_length); + EXPECT_EQ(m_padding, 0); } TEST_F(TermScannerTest, shouldScanLastFrameInBuffer) @@ -141,8 +141,8 @@ TEST_F(TermScannerTest, shouldScanLastFrameInBuffer) data_header->frame_header.type = AERON_HDR_TYPE_DATA; EXPECT_EQ(aeron_term_scanner_scan_for_availability( - m_ptr, aligned_frame_length, MTU_LENGTH, &m_padding), (size_t)aligned_frame_length); - EXPECT_EQ(m_padding, 0u); + m_ptr, aligned_frame_length, MTU_LENGTH, &m_padding), aligned_frame_length); + EXPECT_EQ(m_padding, 0); } TEST_F(TermScannerTest, shouldScanLastMessageInBufferPlusPadding) @@ -163,15 +163,15 @@ TEST_F(TermScannerTest, shouldScanLastMessageInBufferPlusPadding) data_header->frame_header.type = AERON_HDR_TYPE_PAD; EXPECT_EQ(aeron_term_scanner_scan_for_availability( - m_ptr, CAPACITY - offset, MTU_LENGTH, &m_padding), (size_t)(aligned_frame_length + AERON_DATA_HEADER_LENGTH)); - EXPECT_EQ(m_padding, (size_t)(padding_frame_length - AERON_DATA_HEADER_LENGTH)); + m_ptr, CAPACITY - offset, MTU_LENGTH, &m_padding), aligned_frame_length + AERON_DATA_HEADER_LENGTH); + EXPECT_EQ(m_padding, padding_frame_length - AERON_DATA_HEADER_LENGTH); } TEST_F(TermScannerTest, shouldScanLastMessageInBufferMinusPaddingLimitedByMtu) { int32_t aligned_frame_length = AERON_ALIGN(AERON_DATA_HEADER_LENGTH, AERON_LOGBUFFER_FRAME_ALIGNMENT); - int32_t offset = CAPACITY - (AERON_ALIGN((AERON_DATA_HEADER_LENGTH * 3), AERON_LOGBUFFER_FRAME_ALIGNMENT)); - size_t mtu = (size_t)aligned_frame_length + 8u; + int32_t offset = CAPACITY - AERON_ALIGN((AERON_DATA_HEADER_LENGTH * 3), AERON_LOGBUFFER_FRAME_ALIGNMENT); + int32_t mtu = aligned_frame_length + 8; m_ptr += offset; @@ -185,6 +185,6 @@ TEST_F(TermScannerTest, shouldScanLastMessageInBufferMinusPaddingLimitedByMtu) data_header->frame_header.type = AERON_HDR_TYPE_PAD; EXPECT_EQ(aeron_term_scanner_scan_for_availability( - m_ptr, CAPACITY - offset, mtu, &m_padding), (size_t)aligned_frame_length); - EXPECT_EQ(m_padding, 0u); + m_ptr, CAPACITY - offset, mtu, &m_padding), aligned_frame_length); + EXPECT_EQ(m_padding, 0); }