Skip to content

Commit

Permalink
[C] refactor response client/server code to separate out CLI code (#1576
Browse files Browse the repository at this point in the history
)

* [C] refactor response client/server code to separate out CLI code

* [C] fix null client issue, address (hopefully) snprintf issue

* [C] fix strncpy size
  • Loading branch information
nbradac authored Apr 5, 2024
1 parent 51e0009 commit df5ed87
Show file tree
Hide file tree
Showing 2 changed files with 467 additions and 278 deletions.
266 changes: 173 additions & 93 deletions aeron-samples/src/main/c/response/response_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,30 @@
#include "util/aeron_parse_util.h"
#include "util/aeron_strutil.h"
#include "aeron_agent.h"
#include "aeron_alloc.h"

#include "../samples_configuration.h"
#include "../sample_util.h"

typedef struct response_client_stct response_client_t;

int response_client_create(
response_client_t **response_clientp,
aeron_t *aeron,
aeron_fragment_handler_t delegate,
const char *request_channel,
int32_t request_stream_id,
const char *response_control_channel,
int32_t response_stream_id);

void response_client_delete(response_client_t *response_client);

int64_t response_client_offer(response_client_t *response_client, const uint8_t *buffer, size_t length);

bool response_client_is_connected(response_client_t *response_client);

int response_client_do_work(response_client_t *response_client);

const char usage_str[] =
"[-h][-v][-c request-uri][-d response-uri][-l linger][-m messages][-p prefix][-r response-stream-id][-s request-stream-id]\n"
" -h help\n"
Expand All @@ -49,7 +69,7 @@ const char usage_str[] =
" -p prefix aeron.dir location specified as prefix\n"
" -r response-stream-id response stream-id to use\n"
" -s request-stream-id request stream-id to use\n"
;
;

volatile bool running = true;

Expand Down Expand Up @@ -100,21 +120,15 @@ int main(int argc, char **argv)
const char *aeron_dir = NULL;
aeron_t *aeron = NULL;

aeron_async_add_subscription_t *async_add_sub = NULL;
aeron_subscription_t *subscription = NULL;
aeron_fragment_assembler_t *fragment_assembler = NULL;
const char *response_control_channel = DEFAULT_RESPONSE_CONTROL_CHANNEL;
int32_t response_stream_id = DEFAULT_RESPONSE_STREAM_ID;
int64_t subscriber_registration_id;

aeron_async_add_publication_t *async_add_pub = NULL;
aeron_publication_t *publication = NULL;
const char *request_channel = DEFAULT_REQUEST_CHANNEL;
int32_t request_stream_id = DEFAULT_REQUEST_STREAM_ID;

uint64_t linger_ns = DEFAULT_LINGER_TIMEOUT_MS * UINT64_C(1000) * UINT64_C(1000);
uint64_t messages = DEFAULT_NUMBER_OF_MESSAGES;

response_client_t *response_client = NULL;

while ((opt = getopt(argc, argv, "hvc:d:l:m:p:r:s:")) != -1)
{
switch (opt)
Expand Down Expand Up @@ -191,7 +205,6 @@ int main(int argc, char **argv)

signal(SIGINT, sigint_handler);


if (aeron_context_init(&context) < 0)
{
fprintf(stderr, "aeron_context_init: %s\n", aeron_errmsg());
Expand Down Expand Up @@ -219,94 +232,27 @@ int main(int argc, char **argv)
goto cleanup;
}

if (response_client_create(
&response_client,
aeron,
poll_handler,
request_channel,
request_stream_id,
response_control_channel,
response_stream_id) < 0)
{
char _response_channel_buf[AERON_MAX_PATH] = { 0 };

SNPRINTF(_response_channel_buf, sizeof(_response_channel_buf) - 1, "%s|control-mode=response", response_control_channel);

printf("Subscribing to response channel %s on Stream ID %" PRId32 "\n", _response_channel_buf, response_stream_id);

if (aeron_async_add_subscription(
&async_add_sub,
aeron,
_response_channel_buf,
response_stream_id,
print_available_image,
NULL,
print_unavailable_image,
NULL) < 0)
{
fprintf(stderr, "aeron_async_add_subscription: %s\n", aeron_errmsg());
goto cleanup;
}
}

while (NULL == subscription)
{
if (aeron_async_add_subscription_poll(&subscription, async_add_sub) < 0)
{
fprintf(stderr, "aeron_async_add_subscription_poll: %s\n", aeron_errmsg());
goto cleanup;
}

sched_yield();
}

printf("Subscription channel status %" PRIu64 "\n", aeron_subscription_channel_status(subscription));

if (aeron_fragment_assembler_create(&fragment_assembler, poll_handler, subscription) < 0)
{
fprintf(stderr, "aeron_fragment_assembler_create: %s\n", aeron_errmsg());
fprintf(stderr, "response_client_create: %s\n", aeron_errmsg());
goto cleanup;
}

{
aeron_subscription_constants_t constants;

if (aeron_subscription_constants(subscription, &constants) < 0)
{
fprintf(stderr, "aeron_subscription_constants: %s\n", aeron_errmsg());
goto cleanup;
}

subscriber_registration_id = constants.registration_id;
}

{
char _channel_buf[AERON_MAX_PATH] = { 0 };

SNPRINTF(_channel_buf, sizeof(_channel_buf) - 1, "%s|response-correlation-id=%" PRIi64, request_channel, subscriber_registration_id);

printf("Publishing to channel %s on Stream ID %" PRId32 "\n", _channel_buf, request_stream_id);

if (aeron_async_add_publication(&async_add_pub, aeron, _channel_buf, request_stream_id) < 0)
{
fprintf(stderr, "aeron_async_add_publication: %s\n", aeron_errmsg());
goto cleanup;
}
}

while (NULL == publication)
{
if (aeron_async_add_publication_poll(&publication, async_add_pub) < 0)
{
fprintf(stderr, "aeron_async_add_publication_poll: %s\n", aeron_errmsg());
goto cleanup;
}

sched_yield();
}

printf("Publication channel status %" PRIu64 "\n", aeron_publication_channel_status(publication));

for (size_t i = 0; i < messages && is_running(); i++)
{
message_len = SNPRINTF(small_message, sizeof(small_message) - 1, "Hello World! %" PRIu64, (uint64_t)i);
message = small_message;
printf("offering %" PRIu64 "/%" PRIu64 " - ", (uint64_t)i, (uint64_t)messages);
fflush(stdout);

int64_t result = aeron_publication_offer(publication, (const uint8_t *)message, message_len, NULL, NULL);
int64_t result = response_client_offer(response_client, (const uint8_t *)message, message_len);

if (result > 0)
{
Expand All @@ -333,15 +279,14 @@ int main(int argc, char **argv)
printf("Offer failed due to unknown reason %" PRId64 "\n", result);
}

if (!aeron_publication_is_connected(publication))
if (!response_client_is_connected(response_client))
{
printf("No active subscribers detected\n");
}

aeron_nano_sleep(1000ul * 1000ul * 1000ul);

int fragments_read = aeron_subscription_poll(
subscription, aeron_fragment_assembler_handler, fragment_assembler, DEFAULT_FRAGMENT_COUNT_LIMIT);
int fragments_read = response_client_do_work(response_client);

if (fragments_read < 0)
{
Expand All @@ -359,13 +304,148 @@ int main(int argc, char **argv)
}

cleanup:
aeron_subscription_close(subscription, NULL, NULL);
aeron_publication_close(publication, NULL, NULL);
response_client_delete(response_client);
aeron_close(aeron);
aeron_context_close(context);
aeron_fragment_assembler_delete(fragment_assembler);

return status;
}

extern bool is_running(void);

struct response_client_stct
{
aeron_t *aeron;
aeron_subscription_t *subscription;
aeron_fragment_assembler_t *fragment_assembler;
aeron_publication_t *publication;
};

int response_client_create(
response_client_t **response_clientp,
aeron_t *aeron,
aeron_fragment_handler_t delegate,
const char *request_channel,
int32_t request_stream_id,
const char *response_control_channel,
int32_t response_stream_id)
{
response_client_t *response_client;
aeron_async_add_subscription_t *async_add_sub;
aeron_async_add_publication_t *async_add_pub;
char _response_channel_buf[AERON_MAX_PATH] = { 0 };
char _channel_buf[AERON_MAX_PATH] = { 0 };
int64_t subscriber_registration_id;
aeron_subscription_constants_t constants;

aeron_alloc((void **)&response_client, sizeof(response_client_t));

SNPRINTF(_response_channel_buf, sizeof(_response_channel_buf) - 1, "%s|control-mode=response", response_control_channel);

printf("Subscribing to response channel %s on Stream ID %" PRId32 "\n", _response_channel_buf, response_stream_id);

if (aeron_async_add_subscription(
&async_add_sub,
aeron,
_response_channel_buf,
response_stream_id,
print_available_image,
NULL,
print_unavailable_image,
NULL) < 0)
{
fprintf(stderr, "aeron_async_add_subscription: %s\n", aeron_errmsg());
goto cleanup;
}

response_client->subscription = NULL;
while (NULL == response_client->subscription)
{
if (aeron_async_add_subscription_poll(&response_client->subscription, async_add_sub) < 0)
{
fprintf(stderr, "aeron_async_add_subscription_poll: %s\n", aeron_errmsg());
goto cleanup;
}

sched_yield();
}

printf("Subscription channel status %" PRIu64 "\n", aeron_subscription_channel_status(response_client->subscription));

if (aeron_fragment_assembler_create(&response_client->fragment_assembler, delegate, response_client->subscription) < 0)
{
fprintf(stderr, "aeron_fragment_assembler_create: %s\n", aeron_errmsg());
goto cleanup;
}

if (aeron_subscription_constants(response_client->subscription, &constants) < 0)
{
fprintf(stderr, "aeron_subscription_constants: %s\n", aeron_errmsg());
goto cleanup;
}

subscriber_registration_id = constants.registration_id;

SNPRINTF(_channel_buf, sizeof(_channel_buf) - 1, "%s|response-correlation-id=%" PRIi64, request_channel, subscriber_registration_id);

printf("Publishing to channel %s on Stream ID %" PRId32 "\n", _channel_buf, request_stream_id);

if (aeron_async_add_publication(&async_add_pub, aeron, _channel_buf, request_stream_id) < 0)
{
fprintf(stderr, "aeron_async_add_publication: %s\n", aeron_errmsg());
goto cleanup;
}

while (NULL == response_client->publication)
{
if (aeron_async_add_publication_poll(&response_client->publication, async_add_pub) < 0)
{
fprintf(stderr, "aeron_async_add_publication_poll: %s\n", aeron_errmsg());
goto cleanup;
}

sched_yield();
}

printf("Publication channel status %" PRIu64 "\n", aeron_publication_channel_status(response_client->publication));

*response_clientp = response_client;

return 0;

cleanup:
response_client_delete(response_client);

return -1;
}

void response_client_delete(response_client_t *response_client)
{
if (NULL != response_client)
{
aeron_subscription_close(response_client->subscription, NULL, NULL);
aeron_publication_close(response_client->publication, NULL, NULL);
aeron_fragment_assembler_delete(response_client->fragment_assembler);

aeron_free(response_client);
}
}

int64_t response_client_offer(response_client_t *response_client, const uint8_t *buffer, size_t length)
{
return aeron_publication_offer(response_client->publication, buffer, length, NULL, NULL);
}

bool response_client_is_connected(response_client_t *response_client)
{
return aeron_publication_is_connected(response_client->publication);
}

int response_client_do_work(response_client_t *response_client)
{
return aeron_subscription_poll(
response_client->subscription,
aeron_fragment_assembler_handler,
response_client->fragment_assembler,
DEFAULT_FRAGMENT_COUNT_LIMIT);
}
Loading

0 comments on commit df5ed87

Please sign in to comment.