Skip to content

Commit

Permalink
Showing 7 changed files with 95 additions and 102 deletions.
42 changes: 21 additions & 21 deletions aeron-client/src/main/c/aeron_fragment_assembler.c
Original file line number Diff line number Diff line change
@@ -146,31 +146,31 @@ int aeron_image_fragment_assembler_delete(aeron_image_fragment_assembler_t *asse
}

void aeron_image_fragment_assembler_handler(
void *clientd, const uint8_t *buffer, size_t offset, size_t length, aeron_header_t *header)
void *clientd, const uint8_t *buffer, size_t length, aeron_header_t *header)
{
aeron_image_fragment_assembler_t *assembler = (aeron_image_fragment_assembler_t *)clientd;
aeron_buffer_builder_t *buffer_builder = assembler->buffer_builder;
uint8_t flags = header->frame->frame_header.flags;

if ((flags & AERON_DATA_HEADER_UNFRAGMENTED) == AERON_DATA_HEADER_UNFRAGMENTED)
{
assembler->delegate(assembler->delegate_clientd, buffer, offset, length, header);
assembler->delegate(assembler->delegate_clientd, buffer, length, header);
}
else
{
if (flags & AERON_DATA_HEADER_BEGIN_FLAG)
{
aeron_buffer_builder_reset(buffer_builder);
aeron_buffer_builder_append(buffer_builder, buffer, offset, length);
aeron_buffer_builder_append(buffer_builder, buffer, length);
}
else if (buffer_builder->limit > 0)
{
aeron_buffer_builder_append(buffer_builder, buffer, offset, length);
aeron_buffer_builder_append(buffer_builder, buffer, length);

if (flags & AERON_DATA_HEADER_END_FLAG)
{
assembler->delegate(
assembler->delegate_clientd, buffer_builder->buffer, 0, buffer_builder->limit, header);
assembler->delegate_clientd, buffer_builder->buffer, buffer_builder->limit, header);
aeron_buffer_builder_reset(buffer_builder);
}
}
@@ -214,7 +214,7 @@ int aeron_image_controlled_fragment_assembler_delete(aeron_image_controlled_frag
}

aeron_controlled_fragment_handler_action_t aeron_controlled_image_fragment_assembler_handler(
void *clientd, const uint8_t *buffer, size_t offset, size_t length, aeron_header_t *header)
void *clientd, const uint8_t *buffer, size_t length, aeron_header_t *header)
{
aeron_image_controlled_fragment_assembler_t *assembler = (aeron_image_controlled_fragment_assembler_t *)clientd;
aeron_buffer_builder_t *buffer_builder = assembler->buffer_builder;
@@ -223,26 +223,26 @@ aeron_controlled_fragment_handler_action_t aeron_controlled_image_fragment_assem

if ((flags & AERON_DATA_HEADER_UNFRAGMENTED) == AERON_DATA_HEADER_UNFRAGMENTED)
{
action = assembler->delegate(assembler->delegate_clientd, buffer, offset, length, header);
action = assembler->delegate(assembler->delegate_clientd, buffer, length, header);
}
else
{
if (flags & AERON_DATA_HEADER_BEGIN_FLAG)
{
aeron_buffer_builder_reset(buffer_builder);
aeron_buffer_builder_append(buffer_builder, buffer, offset, length);
aeron_buffer_builder_append(buffer_builder, buffer, length);
}
else
{
size_t limit = buffer_builder->limit;
if (limit > 0)
{
aeron_buffer_builder_append(buffer_builder, buffer, offset, length);
aeron_buffer_builder_append(buffer_builder, buffer, length);

if (flags & AERON_DATA_HEADER_END_FLAG)
{
action = assembler->delegate(
assembler->delegate_clientd, buffer_builder->buffer, 0, buffer_builder->limit, header);
assembler->delegate_clientd, buffer_builder->buffer, buffer_builder->limit, header);

if (AERON_ACTION_ABORT == action)
{
@@ -302,14 +302,14 @@ int aeron_fragment_assembler_delete(aeron_fragment_assembler_t *assembler)
}

void aeron_fragment_assembler_handler(
void *clientd, const uint8_t *buffer, size_t offset, size_t length, aeron_header_t *header)
void *clientd, const uint8_t *buffer, size_t length, aeron_header_t *header)
{
aeron_fragment_assembler_t *assembler = (aeron_fragment_assembler_t *)clientd;
uint8_t flags = header->frame->frame_header.flags;

if ((flags & AERON_DATA_HEADER_UNFRAGMENTED) == AERON_DATA_HEADER_UNFRAGMENTED)
{
assembler->delegate(assembler->delegate_clientd, buffer, offset, length, header);
assembler->delegate(assembler->delegate_clientd, buffer, length, header);
}
else
{
@@ -329,16 +329,16 @@ void aeron_fragment_assembler_handler(
}

aeron_buffer_builder_reset(buffer_builder);
aeron_buffer_builder_append(buffer_builder, buffer, offset, length);
aeron_buffer_builder_append(buffer_builder, buffer, length);
}
else if (buffer_builder && buffer_builder->limit > 0)
{
aeron_buffer_builder_append(buffer_builder, buffer, offset, length);
aeron_buffer_builder_append(buffer_builder, buffer, length);

if (flags & AERON_DATA_HEADER_END_FLAG)
{
assembler->delegate(
assembler->delegate_clientd, buffer_builder->buffer, 0, buffer_builder->limit, header);
assembler->delegate_clientd, buffer_builder->buffer, buffer_builder->limit, header);
aeron_buffer_builder_reset(buffer_builder);
}
}
@@ -387,15 +387,15 @@ int aeron_controlled_fragment_assembler_delete(aeron_controlled_fragment_assembl
}

aeron_controlled_fragment_handler_action_t aeron_controlled_fragment_assembler_handler(
void *clientd, const uint8_t *buffer, size_t offset, size_t length, aeron_header_t *header)
void *clientd, const uint8_t *buffer, size_t length, aeron_header_t *header)
{
aeron_controlled_fragment_assembler_t *assembler = (aeron_controlled_fragment_assembler_t *)clientd;
uint8_t flags = header->frame->frame_header.flags;
aeron_controlled_fragment_handler_action_t action = AERON_ACTION_CONTINUE;

if ((flags & AERON_DATA_HEADER_UNFRAGMENTED) == AERON_DATA_HEADER_UNFRAGMENTED)
{
action = assembler->delegate(assembler->delegate_clientd, buffer, offset, length, header);
action = assembler->delegate(assembler->delegate_clientd, buffer, length, header);
}
else
{
@@ -415,19 +415,19 @@ aeron_controlled_fragment_handler_action_t aeron_controlled_fragment_assembler_h
}

aeron_buffer_builder_reset(buffer_builder);
aeron_buffer_builder_append(buffer_builder, buffer, offset, length);
aeron_buffer_builder_append(buffer_builder, buffer, length);
}
else if (NULL != buffer_builder)
{
size_t limit = buffer_builder->limit;
if (limit > 0)
{
aeron_buffer_builder_append(buffer_builder, buffer, offset, length);
aeron_buffer_builder_append(buffer_builder, buffer, length);

if (flags & AERON_DATA_HEADER_END_FLAG)
{
action = assembler->delegate(
assembler->delegate_clientd, buffer_builder->buffer, 0, buffer_builder->limit, header);
assembler->delegate_clientd, buffer_builder->buffer, buffer_builder->limit, header);

if (AERON_ACTION_ABORT == action)
{
@@ -447,4 +447,4 @@ aeron_controlled_fragment_handler_action_t aeron_controlled_fragment_assembler_h

extern void aeron_buffer_builder_reset(aeron_buffer_builder_t *buffer_builder);
extern int aeron_buffer_builder_append(
aeron_buffer_builder_t *buffer_builder, const uint8_t *buffer, size_t offset, size_t length);
aeron_buffer_builder_t *buffer_builder, const uint8_t *buffer, size_t length);
4 changes: 2 additions & 2 deletions aeron-client/src/main/c/aeron_fragment_assembler.h
Original file line number Diff line number Diff line change
@@ -73,14 +73,14 @@ inline void aeron_buffer_builder_reset(aeron_buffer_builder_t *buffer_builder)
}

inline int aeron_buffer_builder_append(
aeron_buffer_builder_t *buffer_builder, const uint8_t *buffer, size_t offset, size_t length)
aeron_buffer_builder_t *buffer_builder, const uint8_t *buffer, size_t length)
{
if (aeron_buffer_builder_ensure_capacity(buffer_builder, length) < 0)
{
return -1;
}

memcpy(buffer_builder->buffer + buffer_builder->limit, buffer + offset, length);
memcpy(buffer_builder->buffer + buffer_builder->limit, buffer, length);
buffer_builder->limit += length;
return 0;
}
18 changes: 6 additions & 12 deletions aeron-client/src/main/c/aeron_image.c
Original file line number Diff line number Diff line change
@@ -206,8 +206,7 @@ int aeron_image_poll(aeron_image_t *image, aeron_fragment_handler_t handler, voi

handler(
clientd,
term_buffer,
frame_offset + AERON_DATA_HEADER_LENGTH,
term_buffer + frame_offset + AERON_DATA_HEADER_LENGTH,
frame_length - AERON_DATA_HEADER_LENGTH,
&header);
++fragments_read;
@@ -274,8 +273,7 @@ int aeron_image_controlled_poll(
aeron_controlled_fragment_handler_action_t action =
handler(
clientd,
term_buffer,
frame_offset + AERON_DATA_HEADER_LENGTH,
term_buffer + frame_offset + AERON_DATA_HEADER_LENGTH,
frame_length - AERON_DATA_HEADER_LENGTH,
&header);

@@ -357,8 +355,7 @@ int aeron_image_bounded_poll(

handler(
clientd,
term_buffer,
frame_offset + AERON_DATA_HEADER_LENGTH,
term_buffer + frame_offset + AERON_DATA_HEADER_LENGTH,
frame_length - AERON_DATA_HEADER_LENGTH,
&header);
++fragments_read;
@@ -428,8 +425,7 @@ int aeron_image_bounded_controlled_poll(
aeron_controlled_fragment_handler_action_t action =
handler(
clientd,
term_buffer,
frame_offset + AERON_DATA_HEADER_LENGTH,
term_buffer + frame_offset + AERON_DATA_HEADER_LENGTH,
frame_length - AERON_DATA_HEADER_LENGTH,
&header);

@@ -531,8 +527,7 @@ int64_t aeron_image_controlled_peek(
aeron_controlled_fragment_handler_action_t action =
handler(
clientd,
term_buffer,
frame_offset + AERON_DATA_HEADER_LENGTH,
term_buffer + frame_offset + AERON_DATA_HEADER_LENGTH,
frame_length - AERON_DATA_HEADER_LENGTH,
&header);

@@ -627,8 +622,7 @@ int aeron_image_block_poll(

handler(
clientd,
term_buffer,
offset,
term_buffer + offset,
length,
image->session_id,
term_id);
20 changes: 7 additions & 13 deletions aeron-client/src/main/c/aeronc.h
Original file line number Diff line number Diff line change
@@ -783,12 +783,11 @@ int aeron_exclusive_publication_close(aeron_exclusive_publication_t *publication
*
* @param clientd passed to the poll function.
* @param buffer containing the data.
* @param offset at which the data begins.
* @param length of the data in bytes.
* @param header representing the meta data for the data.
*/
typedef void (*aeron_fragment_handler_t)(
void *clientd, const uint8_t *buffer, size_t offset, size_t length, aeron_header_t *header);
void *clientd, const uint8_t *buffer, size_t length, aeron_header_t *header);

typedef enum aeron_controlled_fragment_handler_action_en
{
@@ -825,13 +824,12 @@ aeron_controlled_fragment_handler_action_t;
*
* @param clientd passed to the controlled poll function.
* @param buffer containing the data.
* @param offset at which the data begins.
* @param length of the data in bytes.
* @param header representing the meta data for the data.
* @return The action to be taken with regard to the stream position after the callback.
*/
typedef aeron_controlled_fragment_handler_action_t (*aeron_controlled_fragment_handler_t)(
void *clientd, const uint8_t *buffer, size_t offset, size_t length, aeron_header_t *header);
void *clientd, const uint8_t *buffer, size_t length, aeron_header_t *header);

/**
* Callback for handling a block of messages being read from a log.
@@ -844,7 +842,7 @@ typedef aeron_controlled_fragment_handler_action_t (*aeron_controlled_fragment_h
* @param term_id of the stream containing this block of message fragments.
*/
typedef void (*aeron_block_handler_t)(
void *clientd, const uint8_t *buffer, size_t offset, size_t length, int32_t session_id, int32_t term_id);
void *clientd, const uint8_t *buffer, size_t length, int32_t session_id, int32_t term_id);

/**
* Poll the images under the subscription for available message fragments.
@@ -1177,12 +1175,11 @@ int aeron_image_fragment_assembler_delete(aeron_image_fragment_assembler_t *asse
*
* @param clientd passed in the poll call (must be a aeron_image_fragment_assembler_t)
* @param buffer containing the data.
* @param offset at which the data begins.
* @param length of the data in bytes.
* @param header representing the meta data for the data.
*/
void aeron_image_fragment_assembler_handler(
void *clientd, const uint8_t *buffer, size_t offset, size_t length, aeron_header_t *header);
void *clientd, const uint8_t *buffer, size_t length, aeron_header_t *header);

/**
* Create an image controlled fragment assembler for use with a single image.
@@ -1210,13 +1207,12 @@ int aeron_image_controlled_fragment_assembler_delete(aeron_image_controlled_frag
*
* @param clientd passed in the poll call (must be a aeron_image_controlled_fragment_assembler_t)
* @param buffer containing the data.
* @param offset at which the data begins.
* @param length of the data in bytes.
* @param header representing the meta data for the data.
* @return The action to be taken with regard to the stream position after the callback.
*/
aeron_controlled_fragment_handler_action_t aeron_controlled_image_fragment_assembler_handler(
void *clientd, const uint8_t *buffer, size_t offset, size_t length, aeron_header_t *header);
void *clientd, const uint8_t *buffer, size_t length, aeron_header_t *header);

/**
* Create a fragment assembler for use with a subscription.
@@ -1244,12 +1240,11 @@ int aeron_fragment_assembler_delete(aeron_fragment_assembler_t *assembler);
*
* @param clientd passed in the poll call (must be a aeron_fragment_assembler_t)
* @param buffer containing the data.
* @param offset at which the data begins.
* @param length of the data in bytes.
* @param header representing the meta data for the data.
*/
void aeron_fragment_assembler_handler(
void *clientd, const uint8_t *buffer, size_t offset, size_t length, aeron_header_t *header);
void *clientd, const uint8_t *buffer, size_t length, aeron_header_t *header);

/**
* Create a controlled fragment assembler for use with a subscription.
@@ -1277,13 +1272,12 @@ int aeron_controlled_fragment_assembler_delete(aeron_controlled_fragment_assembl
*
* @param clientd passed in the poll call (must be a aeron_controlled_fragment_assembler_t)
* @param buffer containing the data.
* @param offset at which the data begins.
* @param length of the data in bytes.
* @param header representing the meta data for the data.
* @return The action to be taken with regard to the stream position after the callback.
*/
aeron_controlled_fragment_handler_action_t aeron_controlled_fragment_assembler_handler(
void *clientd, const uint8_t *buffer, size_t offset, size_t length, aeron_header_t *header);
void *clientd, const uint8_t *buffer, size_t length, aeron_header_t *header);

/*
* Counter functions
95 changes: 50 additions & 45 deletions aeron-client/src/test/c/aeron_image_test.cpp
Original file line number Diff line number Diff line change
@@ -115,24 +115,24 @@ class ImageTest : public testing::Test
}

static void fragment_handler(
void *clientd, const uint8_t *buffer, size_t offset, size_t length, aeron_header_t *header)
void *clientd, const uint8_t *buffer, size_t length, aeron_header_t *header)
{
auto image = reinterpret_cast<ImageTest *>(clientd);

if (image->m_handler)
{
image->m_handler(buffer, offset, length, header);
image->m_handler(buffer, length, header);
}
}

static aeron_controlled_fragment_handler_action_t controlled_fragment_handler(
void *clientd, const uint8_t *buffer, size_t offset, size_t length, aeron_header_t *header)
void *clientd, const uint8_t *buffer, size_t length, aeron_header_t *header)
{
auto image = reinterpret_cast<ImageTest *>(clientd);

if (image->m_controlled_handler)
{
return image->m_controlled_handler(buffer, offset, length, header);
return image->m_controlled_handler(buffer, length, header);
}

return AERON_ACTION_CONTINUE;
@@ -167,6 +167,11 @@ class ImageTest : public testing::Test
m_image, controlled_fragment_handler, this, max_position, fragment_limit);
}

const uint8_t *termBuffer(size_t index)
{
return m_image->log_buffer->mapped_raw_log.term_buffers[index].addr;
}

protected:
int64_t m_correlationId = 0;
int64_t m_sub_pos = 0;
@@ -176,8 +181,8 @@ class ImageTest : public testing::Test

size_t m_position_bits_to_shift;

std::function<void(const uint8_t *, size_t, size_t, aeron_header_t *)> m_handler = nullptr;
std::function<aeron_controlled_fragment_handler_action_t(const uint8_t *, size_t, size_t, aeron_header_t *)>
std::function<void(const uint8_t *, size_t, aeron_header_t *)> m_handler = nullptr;
std::function<aeron_controlled_fragment_handler_action_t(const uint8_t *, size_t, aeron_header_t *)>
m_controlled_handler = nullptr;

aeron_image_t *m_image;
@@ -194,9 +199,9 @@ TEST_F(ImageTest, shouldReadFirstMessage)

appendMessage(m_sub_pos, messageLength);

auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *buffer, size_t length, aeron_header_t *header)
{
EXPECT_EQ(offset, m_sub_pos + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(buffer, termBuffer(0) + m_sub_pos + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(length, messageLength);
EXPECT_EQ(header->frame->frame_header.type, AERON_HDR_TYPE_DATA);
};
@@ -209,7 +214,7 @@ TEST_F(ImageTest, shouldNotReadPastTail)
{
createImage();

auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *, size_t length, aeron_header_t *header)
{
FAIL() << "should not be called";
};
@@ -229,7 +234,7 @@ TEST_F(ImageTest, shouldReadOneLimitedMessage)
appendMessage(m_sub_pos, messageLength);
appendMessage(m_sub_pos + alignedMessageLength, messageLength);

auto null_handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto null_handler = [&](const uint8_t *, size_t length, aeron_header_t *header)
{
};

@@ -248,7 +253,7 @@ TEST_F(ImageTest, shouldReadMultipleMessages)
appendMessage(m_sub_pos + alignedMessageLength, messageLength);
size_t handlerCallCount = 0;

auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *, size_t length, aeron_header_t *header)
{
handlerCallCount++;
};
@@ -270,9 +275,9 @@ TEST_F(ImageTest, shouldReadLastMessage)

appendMessage(m_sub_pos, messageLength);

auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *buffer, size_t length, aeron_header_t *header)
{
EXPECT_EQ(offset, m_sub_pos + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(buffer, termBuffer(0) + m_sub_pos + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(length, messageLength);
};

@@ -293,7 +298,7 @@ TEST_F(ImageTest, shouldNotReadLastMessageWhenPadding)
// this will append padding instead of the message as it will trip over the end.
appendMessage(m_sub_pos, messageLength + AERON_DATA_HEADER_LENGTH);

auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *, size_t length, aeron_header_t *header)
{
FAIL() << "should not be called";
};
@@ -338,7 +343,7 @@ TEST_F(ImageTest, shouldReportCorrectPositionOnReception)

appendMessage(m_sub_pos, messageLength);

auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *, size_t length, aeron_header_t *header)
{
};

@@ -361,9 +366,9 @@ TEST_F(ImageTest, shouldReportCorrectPositionOnReceptionWithNonZeroPositionInIni

appendMessage(m_sub_pos, messageLength);

auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *buffer, size_t length, aeron_header_t *header)
{
EXPECT_EQ(offset, initialTermOffset + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(buffer, termBuffer(0) + initialTermOffset + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(length, messageLength);
};

@@ -387,9 +392,9 @@ TEST_F(ImageTest, shouldReportCorrectPositionOnReceptionWithNonZeroPositionInNon

appendMessage(m_sub_pos, messageLength);

auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *buffer, size_t length, aeron_header_t *header)
{
EXPECT_EQ(offset, initialTermOffset + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(buffer, termBuffer(1) + initialTermOffset + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(length, messageLength);
};

@@ -402,7 +407,7 @@ TEST_F(ImageTest, shouldPollNoFragmentsToControlledFragmentHandler)
createImage();

bool called = false;
auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *, size_t length, aeron_header_t *header)
-> aeron_controlled_fragment_handler_action_t
{
called = true;
@@ -424,10 +429,10 @@ TEST_F(ImageTest, shouldPollOneFragmentToControlledFragmentHandlerOnContinue)

appendMessage(m_sub_pos, messageLength);

auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *buffer, size_t length, aeron_header_t *header)
-> aeron_controlled_fragment_handler_action_t
{
EXPECT_EQ(offset, m_sub_pos + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(buffer, termBuffer(0) + m_sub_pos + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(length, messageLength);
EXPECT_EQ(header->frame->frame_header.type, AERON_HDR_TYPE_DATA);
return AERON_ACTION_CONTINUE;
@@ -449,7 +454,7 @@ TEST_F(ImageTest, shouldNotPollOneFragmentToControlledFragmentHandlerOnAbort)

appendMessage(m_sub_pos, messageLength);

auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *, size_t length, aeron_header_t *header)
-> aeron_controlled_fragment_handler_action_t
{
return AERON_ACTION_ABORT;
@@ -470,10 +475,10 @@ TEST_F(ImageTest, shouldPollOneFragmentToControlledFragmentHandlerOnBreak)
appendMessage(m_sub_pos, messageLength);
appendMessage(m_sub_pos + alignedMessageLength, messageLength);

auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *buffer, size_t length, aeron_header_t *header)
-> aeron_controlled_fragment_handler_action_t
{
EXPECT_EQ(offset, AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(buffer, termBuffer(0) + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(length, messageLength);
EXPECT_EQ(header->frame->frame_header.type, AERON_HDR_TYPE_DATA);
return AERON_ACTION_BREAK;
@@ -495,7 +500,7 @@ TEST_F(ImageTest, shouldPollFragmentsToControlledFragmentHandlerOnCommit)
appendMessage(m_sub_pos, messageLength);
appendMessage(m_sub_pos + alignedMessageLength, messageLength);

auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *buffer, size_t length, aeron_header_t *header)
-> aeron_controlled_fragment_handler_action_t
{
fragmentCount++;
@@ -504,14 +509,14 @@ TEST_F(ImageTest, shouldPollFragmentsToControlledFragmentHandlerOnCommit)
{
EXPECT_EQ(m_sub_pos, 0);

EXPECT_EQ(offset, AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(buffer, termBuffer(0) + AERON_DATA_HEADER_LENGTH);
}
else if (2 == fragmentCount)
{
// testing current position here after first message commit
EXPECT_EQ(m_sub_pos, alignedMessageLength);

EXPECT_EQ(offset, alignedMessageLength + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(buffer, termBuffer(0) + alignedMessageLength + AERON_DATA_HEADER_LENGTH);
}

EXPECT_EQ(length, messageLength);
@@ -539,7 +544,7 @@ TEST_F(ImageTest, shouldUpdatePositionToEndOfCommittedFragmentOnCommit)
appendMessage(initialPosition + alignedMessageLength, messageLength);
appendMessage(initialPosition + (2 * alignedMessageLength), messageLength);

auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *buffer, size_t length, aeron_header_t *header)
-> aeron_controlled_fragment_handler_action_t
{
fragmentCount++;
@@ -550,23 +555,23 @@ TEST_F(ImageTest, shouldUpdatePositionToEndOfCommittedFragmentOnCommit)
{
EXPECT_EQ(m_sub_pos, initialPosition);

EXPECT_EQ(offset, AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(buffer, termBuffer(0) + AERON_DATA_HEADER_LENGTH);
return AERON_ACTION_CONTINUE;
}
else if (2 == fragmentCount)
{
// testing current position here after first message continue
EXPECT_EQ(m_sub_pos, initialPosition);

EXPECT_EQ(offset, alignedMessageLength + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(buffer, termBuffer(0) + alignedMessageLength + AERON_DATA_HEADER_LENGTH);
return AERON_ACTION_COMMIT;
}
else if (3 == fragmentCount)
{
// testing current position here after second message commit
EXPECT_EQ(m_sub_pos, initialPosition + (2 * alignedMessageLength));

EXPECT_EQ(offset, (2 * alignedMessageLength) + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(buffer, termBuffer(0) + (2 * alignedMessageLength) + AERON_DATA_HEADER_LENGTH);
return AERON_ACTION_CONTINUE;
}

@@ -593,7 +598,7 @@ TEST_F(ImageTest, shouldPollFragmentsToControlledFragmentHandlerOnContinue)
appendMessage(initialPosition, messageLength);
appendMessage(initialPosition + alignedMessageLength, messageLength);

auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *buffer, size_t length, aeron_header_t *header)
-> aeron_controlled_fragment_handler_action_t
{
fragmentCount++;
@@ -602,14 +607,14 @@ TEST_F(ImageTest, shouldPollFragmentsToControlledFragmentHandlerOnContinue)
{
EXPECT_EQ(m_sub_pos, initialPosition);

EXPECT_EQ(offset, AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(buffer, termBuffer(0) + AERON_DATA_HEADER_LENGTH);
}
else if (2 == fragmentCount)
{
// testing current position here after first message continue
EXPECT_EQ(m_sub_pos, initialPosition);

EXPECT_EQ(offset, alignedMessageLength + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(buffer, termBuffer(0) + alignedMessageLength + AERON_DATA_HEADER_LENGTH);
}

EXPECT_EQ(length, messageLength);
@@ -637,7 +642,7 @@ TEST_F(ImageTest, shouldPollNoFragmentsToBoundedControlledFragmentHandlerWithMax
appendMessage(initialPosition, messageLength);
appendMessage(initialPosition + alignedMessageLength, messageLength);

auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *, size_t length, aeron_header_t *header)
-> aeron_controlled_fragment_handler_action_t
{
called = true;
@@ -665,7 +670,7 @@ TEST_F(ImageTest, shouldPollFragmentsToBoundedControlledFragmentHandlerWithIniti
appendMessage(initialPosition, messageLength);
appendMessage(initialPosition + alignedMessageLength, messageLength);

auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *, size_t length, aeron_header_t *header)
-> aeron_controlled_fragment_handler_action_t
{
return AERON_ACTION_CONTINUE;
@@ -691,7 +696,7 @@ TEST_F(ImageTest, shouldPollFragmentsToBoundedControlledFragmentHandlerWithMaxPo
appendMessage(initialPosition, messageLength);
appendMessage(initialPosition + alignedMessageLength, messageLength);

auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *, size_t length, aeron_header_t *header)
-> aeron_controlled_fragment_handler_action_t
{
return AERON_ACTION_CONTINUE;
@@ -717,7 +722,7 @@ TEST_F(ImageTest, shouldPollFragmentsToBoundedFragmentHandlerWithMaxPositionBefo
appendMessage(initialPosition, messageLength);
appendMessage(initialPosition + alignedMessageLength, messageLength);

auto null_handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto null_handler = [&](const uint8_t *, size_t length, aeron_header_t *header)
{
};

@@ -742,10 +747,10 @@ TEST_F(ImageTest, shouldPollFragmentsToBoundedControlledFragmentHandlerWithMaxPo
appendMessage(initialPosition, messageLength);
appendMessage(initialPosition + alignedMessageLength, messageLength * 2); // will insert padding

auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *buffer, size_t length, aeron_header_t *header)
-> aeron_controlled_fragment_handler_action_t
{
EXPECT_EQ(offset, initialOffset + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(buffer, termBuffer(0) + initialOffset + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(length, messageLength);
return AERON_ACTION_CONTINUE;
};
@@ -771,10 +776,10 @@ TEST_F(ImageTest, shouldPollFragmentsToBoundedControlledFragmentHandlerWithMaxPo
appendMessage(initialPosition, messageLength);
appendMessage(initialPosition + alignedMessageLength, messageLength * 2); // will insert padding

auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *buffer, size_t length, aeron_header_t *header)
-> aeron_controlled_fragment_handler_action_t
{
EXPECT_EQ(offset, initialOffset + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(buffer, termBuffer(0) + initialOffset + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(length, messageLength);
return AERON_ACTION_CONTINUE;
};
@@ -800,9 +805,9 @@ TEST_F(ImageTest, shouldPollFragmentsToBoundedFragmentHandlerWithMaxPositionAbov
appendMessage(initialPosition, messageLength);
appendMessage(initialPosition + alignedMessageLength, messageLength * 2); // will insert padding

auto handler = [&](const uint8_t *, size_t offset, size_t length, aeron_header_t *header)
auto handler = [&](const uint8_t *buffer, size_t length, aeron_header_t *header)
{
EXPECT_EQ(offset, initialOffset + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(buffer, termBuffer(0) + initialOffset + AERON_DATA_HEADER_LENGTH);
EXPECT_EQ(length, messageLength);
};

2 changes: 1 addition & 1 deletion aeron-client/src/test/c/aeron_subscription_test.cpp
Original file line number Diff line number Diff line change
@@ -108,7 +108,7 @@ class SubscriptionTest : public testing::Test
}

static void null_fragment_handler(
void *clientd, const uint8_t *buffer, size_t offset, size_t length, aeron_header_t *header)
void *clientd, const uint8_t *buffer, size_t length, aeron_header_t *header)
{
}

16 changes: 8 additions & 8 deletions aeron-driver/src/test/c/aeron_c_system_test.cpp
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ using namespace aeron;
class CSystemTest : public testing::Test
{
public:
using poll_handler_t = std::function<void(const uint8_t *, size_t, size_t, aeron_header_t *)>;
using poll_handler_t = std::function<void(const uint8_t *, size_t, aeron_header_t *)>;
using image_handler_t = std::function<void(aeron_image_t *)>;

CSystemTest()
@@ -151,11 +151,11 @@ class CSystemTest : public testing::Test
}

static void poll_handler(
void *clientd, const uint8_t *buffer, size_t offset, size_t length, aeron_header_t *header)
void *clientd, const uint8_t *buffer, size_t length, aeron_header_t *header)
{
auto test = reinterpret_cast<CSystemTest *>(clientd);

test->m_poll_handler(buffer, offset, length, header);
test->m_poll_handler(buffer, length, header);
}

int poll(aeron_subscription_t *subscription, poll_handler_t &handler, int fragment_limit)
@@ -303,7 +303,7 @@ TEST_F(CSystemTest, shouldOfferAndPollOneMessage)

int poll_result;
bool called = false;
poll_handler_t handler = [&](const uint8_t *buffer, size_t offset, size_t length, aeron_header_t *header)
poll_handler_t handler = [&](const uint8_t *buffer, size_t length, aeron_header_t *header)
{
EXPECT_EQ(length, strlen(message));
called = true;
@@ -347,7 +347,7 @@ TEST_F(CSystemTest, shouldOfferAndPollThreeTermsOfMessages)

int poll_result;
bool called = false;
poll_handler_t handler = [&](const uint8_t *buffer, size_t offset, size_t length, aeron_header_t *header)
poll_handler_t handler = [&](const uint8_t *buffer, size_t length, aeron_header_t *header)
{
EXPECT_EQ(length, sizeof(message));
called = true;
@@ -398,7 +398,7 @@ TEST_F(CSystemTest, shouldAllowImageToGoUnavailableWithNoPollAfter)

int poll_result;
bool called = false;
poll_handler_t handler = [&](const uint8_t *buffer, size_t offset, size_t length, aeron_header_t *header)
poll_handler_t handler = [&](const uint8_t *buffer, size_t length, aeron_header_t *header)
{
EXPECT_EQ(length, sizeof(message));
called = true;
@@ -455,7 +455,7 @@ TEST_F(CSystemTest, shouldAllowImageToGoUnavailableWithPollAfter)

int poll_result;
bool called = false;
poll_handler_t handler = [&](const uint8_t *buffer, size_t offset, size_t length, aeron_header_t *header)
poll_handler_t handler = [&](const uint8_t *buffer, size_t length, aeron_header_t *header)
{
EXPECT_EQ(length, sizeof(message));
called = true;
@@ -476,7 +476,7 @@ TEST_F(CSystemTest, shouldAllowImageToGoUnavailableWithPollAfter)
std::this_thread::yield();
}

poll_handler_t handler = [&](const uint8_t *buffer, size_t offset, size_t length, aeron_header_t *header)
poll_handler_t handler = [&](const uint8_t *buffer, size_t length, aeron_header_t *header)
{
};

0 comments on commit 2d55823

Please sign in to comment.