diff --git a/aeron-samples/src/main/c/response/response_client.c b/aeron-samples/src/main/c/response/response_client.c index ac0f5f2ab1..c82ef4475c 100644 --- a/aeron-samples/src/main/c/response/response_client.c +++ b/aeron-samples/src/main/c/response/response_client.c @@ -34,10 +34,30 @@ #include "util/aeron_parse_util.h" #include "util/aeron_strutil.h" #include "aeron_agent.h" +#include "aeron_alloc.h" #include "../samples_configuration.h" #include "../sample_util.h" +typedef struct response_client_stct response_client_t; + +int response_client_create( + response_client_t **response_clientp, + aeron_t *aeron, + aeron_fragment_handler_t delegate, + const char *request_channel, + int32_t request_stream_id, + const char *response_control_channel, + int32_t response_stream_id); + +void response_client_delete(response_client_t *response_client); + +int64_t response_client_offer(response_client_t *response_client, const uint8_t *buffer, size_t length); + +bool response_client_is_connected(response_client_t *response_client); + +int response_client_do_work(response_client_t *response_client); + const char usage_str[] = "[-h][-v][-c request-uri][-d response-uri][-l linger][-m messages][-p prefix][-r response-stream-id][-s request-stream-id]\n" " -h help\n" @@ -49,7 +69,7 @@ const char usage_str[] = " -p prefix aeron.dir location specified as prefix\n" " -r response-stream-id response stream-id to use\n" " -s request-stream-id request stream-id to use\n" - ; +; volatile bool running = true; @@ -100,21 +120,15 @@ int main(int argc, char **argv) const char *aeron_dir = NULL; aeron_t *aeron = NULL; - aeron_async_add_subscription_t *async_add_sub = NULL; - aeron_subscription_t *subscription = NULL; - aeron_fragment_assembler_t *fragment_assembler = NULL; const char *response_control_channel = DEFAULT_RESPONSE_CONTROL_CHANNEL; int32_t response_stream_id = DEFAULT_RESPONSE_STREAM_ID; - int64_t subscriber_registration_id; - - aeron_async_add_publication_t *async_add_pub = NULL; - aeron_publication_t *publication = NULL; const char *request_channel = DEFAULT_REQUEST_CHANNEL; int32_t request_stream_id = DEFAULT_REQUEST_STREAM_ID; - uint64_t linger_ns = DEFAULT_LINGER_TIMEOUT_MS * UINT64_C(1000) * UINT64_C(1000); uint64_t messages = DEFAULT_NUMBER_OF_MESSAGES; + response_client_t *response_client = NULL; + while ((opt = getopt(argc, argv, "hvc:d:l:m:p:r:s:")) != -1) { switch (opt) @@ -191,7 +205,6 @@ int main(int argc, char **argv) signal(SIGINT, sigint_handler); - if (aeron_context_init(&context) < 0) { fprintf(stderr, "aeron_context_init: %s\n", aeron_errmsg()); @@ -219,86 +232,19 @@ int main(int argc, char **argv) goto cleanup; } + if (response_client_create( + &response_client, + aeron, + poll_handler, + request_channel, + request_stream_id, + response_control_channel, + response_stream_id) < 0) { - char _response_channel_buf[AERON_MAX_PATH] = { 0 }; - - SNPRINTF(_response_channel_buf, sizeof(_response_channel_buf) - 1, "%s|control-mode=response", response_control_channel); - - printf("Subscribing to response channel %s on Stream ID %" PRId32 "\n", _response_channel_buf, response_stream_id); - - if (aeron_async_add_subscription( - &async_add_sub, - aeron, - _response_channel_buf, - response_stream_id, - print_available_image, - NULL, - print_unavailable_image, - NULL) < 0) - { - fprintf(stderr, "aeron_async_add_subscription: %s\n", aeron_errmsg()); - goto cleanup; - } - } - - while (NULL == subscription) - { - if (aeron_async_add_subscription_poll(&subscription, async_add_sub) < 0) - { - fprintf(stderr, "aeron_async_add_subscription_poll: %s\n", aeron_errmsg()); - goto cleanup; - } - - sched_yield(); - } - - printf("Subscription channel status %" PRIu64 "\n", aeron_subscription_channel_status(subscription)); - - if (aeron_fragment_assembler_create(&fragment_assembler, poll_handler, subscription) < 0) - { - fprintf(stderr, "aeron_fragment_assembler_create: %s\n", aeron_errmsg()); + fprintf(stderr, "response_client_create: %s\n", aeron_errmsg()); goto cleanup; } - { - aeron_subscription_constants_t constants; - - if (aeron_subscription_constants(subscription, &constants) < 0) - { - fprintf(stderr, "aeron_subscription_constants: %s\n", aeron_errmsg()); - goto cleanup; - } - - subscriber_registration_id = constants.registration_id; - } - - { - char _channel_buf[AERON_MAX_PATH] = { 0 }; - - SNPRINTF(_channel_buf, sizeof(_channel_buf) - 1, "%s|response-correlation-id=%" PRIi64, request_channel, subscriber_registration_id); - - printf("Publishing to channel %s on Stream ID %" PRId32 "\n", _channel_buf, request_stream_id); - - if (aeron_async_add_publication(&async_add_pub, aeron, _channel_buf, request_stream_id) < 0) - { - fprintf(stderr, "aeron_async_add_publication: %s\n", aeron_errmsg()); - goto cleanup; - } - } - - while (NULL == publication) - { - if (aeron_async_add_publication_poll(&publication, async_add_pub) < 0) - { - fprintf(stderr, "aeron_async_add_publication_poll: %s\n", aeron_errmsg()); - goto cleanup; - } - - sched_yield(); - } - - printf("Publication channel status %" PRIu64 "\n", aeron_publication_channel_status(publication)); - for (size_t i = 0; i < messages && is_running(); i++) { message_len = SNPRINTF(small_message, sizeof(small_message) - 1, "Hello World! %" PRIu64, (uint64_t)i); @@ -306,7 +252,7 @@ int main(int argc, char **argv) printf("offering %" PRIu64 "/%" PRIu64 " - ", (uint64_t)i, (uint64_t)messages); fflush(stdout); - int64_t result = aeron_publication_offer(publication, (const uint8_t *)message, message_len, NULL, NULL); + int64_t result = response_client_offer(response_client, (const uint8_t *)message, message_len); if (result > 0) { @@ -333,15 +279,14 @@ int main(int argc, char **argv) printf("Offer failed due to unknown reason %" PRId64 "\n", result); } - if (!aeron_publication_is_connected(publication)) + if (!response_client_is_connected(response_client)) { printf("No active subscribers detected\n"); } aeron_nano_sleep(1000ul * 1000ul * 1000ul); - int fragments_read = aeron_subscription_poll( - subscription, aeron_fragment_assembler_handler, fragment_assembler, DEFAULT_FRAGMENT_COUNT_LIMIT); + int fragments_read = response_client_do_work(response_client); if (fragments_read < 0) { @@ -359,13 +304,148 @@ int main(int argc, char **argv) } cleanup: - aeron_subscription_close(subscription, NULL, NULL); - aeron_publication_close(publication, NULL, NULL); + response_client_delete(response_client); aeron_close(aeron); aeron_context_close(context); - aeron_fragment_assembler_delete(fragment_assembler); return status; } extern bool is_running(void); + +struct response_client_stct +{ + aeron_t *aeron; + aeron_subscription_t *subscription; + aeron_fragment_assembler_t *fragment_assembler; + aeron_publication_t *publication; +}; + +int response_client_create( + response_client_t **response_clientp, + aeron_t *aeron, + aeron_fragment_handler_t delegate, + const char *request_channel, + int32_t request_stream_id, + const char *response_control_channel, + int32_t response_stream_id) +{ + response_client_t *response_client; + aeron_async_add_subscription_t *async_add_sub; + aeron_async_add_publication_t *async_add_pub; + char _response_channel_buf[AERON_MAX_PATH] = { 0 }; + char _channel_buf[AERON_MAX_PATH] = { 0 }; + int64_t subscriber_registration_id; + aeron_subscription_constants_t constants; + + aeron_alloc((void **)&response_client, sizeof(response_client_t)); + + SNPRINTF(_response_channel_buf, sizeof(_response_channel_buf) - 1, "%s|control-mode=response", response_control_channel); + + printf("Subscribing to response channel %s on Stream ID %" PRId32 "\n", _response_channel_buf, response_stream_id); + + if (aeron_async_add_subscription( + &async_add_sub, + aeron, + _response_channel_buf, + response_stream_id, + print_available_image, + NULL, + print_unavailable_image, + NULL) < 0) + { + fprintf(stderr, "aeron_async_add_subscription: %s\n", aeron_errmsg()); + goto cleanup; + } + + response_client->subscription = NULL; + while (NULL == response_client->subscription) + { + if (aeron_async_add_subscription_poll(&response_client->subscription, async_add_sub) < 0) + { + fprintf(stderr, "aeron_async_add_subscription_poll: %s\n", aeron_errmsg()); + goto cleanup; + } + + sched_yield(); + } + + printf("Subscription channel status %" PRIu64 "\n", aeron_subscription_channel_status(response_client->subscription)); + + if (aeron_fragment_assembler_create(&response_client->fragment_assembler, delegate, response_client->subscription) < 0) + { + fprintf(stderr, "aeron_fragment_assembler_create: %s\n", aeron_errmsg()); + goto cleanup; + } + + if (aeron_subscription_constants(response_client->subscription, &constants) < 0) + { + fprintf(stderr, "aeron_subscription_constants: %s\n", aeron_errmsg()); + goto cleanup; + } + + subscriber_registration_id = constants.registration_id; + + SNPRINTF(_channel_buf, sizeof(_channel_buf) - 1, "%s|response-correlation-id=%" PRIi64, request_channel, subscriber_registration_id); + + printf("Publishing to channel %s on Stream ID %" PRId32 "\n", _channel_buf, request_stream_id); + + if (aeron_async_add_publication(&async_add_pub, aeron, _channel_buf, request_stream_id) < 0) + { + fprintf(stderr, "aeron_async_add_publication: %s\n", aeron_errmsg()); + goto cleanup; + } + + while (NULL == response_client->publication) + { + if (aeron_async_add_publication_poll(&response_client->publication, async_add_pub) < 0) + { + fprintf(stderr, "aeron_async_add_publication_poll: %s\n", aeron_errmsg()); + goto cleanup; + } + + sched_yield(); + } + + printf("Publication channel status %" PRIu64 "\n", aeron_publication_channel_status(response_client->publication)); + + *response_clientp = response_client; + + return 0; + +cleanup: + response_client_delete(response_client); + + return -1; +} + +void response_client_delete(response_client_t *response_client) +{ + if (NULL != response_client) + { + aeron_subscription_close(response_client->subscription, NULL, NULL); + aeron_publication_close(response_client->publication, NULL, NULL); + aeron_fragment_assembler_delete(response_client->fragment_assembler); + + aeron_free(response_client); + } +} + +int64_t response_client_offer(response_client_t *response_client, const uint8_t *buffer, size_t length) +{ + return aeron_publication_offer(response_client->publication, buffer, length, NULL, NULL); +} + +bool response_client_is_connected(response_client_t *response_client) +{ + return aeron_publication_is_connected(response_client->publication); +} + +int response_client_do_work(response_client_t *response_client) +{ + return aeron_subscription_poll( + response_client->subscription, + aeron_fragment_assembler_handler, + response_client->fragment_assembler, + DEFAULT_FRAGMENT_COUNT_LIMIT); +} diff --git a/aeron-samples/src/main/c/response/response_server.c b/aeron-samples/src/main/c/response/response_server.c index b0935a57a3..988043a42b 100644 --- a/aeron-samples/src/main/c/response/response_server.c +++ b/aeron-samples/src/main/c/response/response_server.c @@ -39,15 +39,26 @@ #include "../samples_configuration.h" #include "../sample_util.h" -typedef struct response_channel_info_t_stct -{ - aeron_image_t *image; - aeron_subscription_t *subscription; - aeron_async_add_publication_t *async_add_pub; - aeron_publication_t *publication; - aeron_fragment_assembler_t *fragment_assembler; -} -response_channel_info_t; +typedef struct response_channel_info_stct response_channel_info_t; + +typedef struct response_server_stct response_server_t; + +int response_server_create( + response_server_t **response_serverp, + aeron_t *aeron, + aeron_fragment_handler_t delegate, + const char *request_channel, + int32_t request_stream_id, + const char *response_control_channel, + int32_t response_stream_id); + +void response_server_delete(response_server_t *response_server); + +int32_t response_server_subscription_constants(response_channel_info_t *response_channel_info, aeron_subscription_constants_t *subscription_constants); + +int64_t response_server_publication_offer(response_channel_info_t *response_channel_info, const uint8_t *buffer, size_t length); + +int response_server_do_work(response_server_t *response_server); const char usage_str[] = "[-h][-v][-c request-uri][-d response-uri][-p prefix][-r response-stream-id][-s request-stream-id]\n" @@ -62,13 +73,6 @@ const char usage_str[] = volatile bool running = true; -aeron_t *aeron = NULL; -const char *response_control_channel = DEFAULT_RESPONSE_CONTROL_CHANNEL; -int32_t response_stream_id = DEFAULT_RESPONSE_STREAM_ID; - -aeron_int64_to_ptr_hash_map_t response_channel_info_map; -aeron_mutex_t info_lock; - void sigint_handler(int signal) { AERON_PUT_ORDERED(running, false); @@ -83,11 +87,11 @@ inline bool is_running(void) void poll_handler(void *clientd, const uint8_t *buffer, size_t length, aeron_header_t *header) { - response_channel_info_t *response_channel_info = (response_channel_info_t *)clientd; + response_channel_info_t *response_channel_info = clientd; aeron_subscription_constants_t subscription_constants; aeron_header_values_t header_values; - if (aeron_subscription_constants(response_channel_info->subscription, &subscription_constants) < 0) + if (response_server_subscription_constants(response_channel_info, &subscription_constants) < 0) { fprintf(stderr, "could not get subscription constants: %s\n", aeron_errmsg()); return; @@ -103,154 +107,36 @@ void poll_handler(void *clientd, const uint8_t *buffer, size_t length, aeron_hea (int)length, buffer); - if (NULL != response_channel_info->publication) - { - char message[256] = { 0 }; - int message_len; - - message_len = SNPRINTF(message, sizeof(message) - 1, "responding to message: %.*s", (int)length, buffer); - - int64_t result = aeron_publication_offer(response_channel_info->publication, (const uint8_t *)message, message_len, NULL, NULL); - - if (result > 0) - { - printf("response sent!\n"); - } - else if (AERON_PUBLICATION_BACK_PRESSURED == result) - { - printf("Offer failed due to back pressure\n"); - } - else if (AERON_PUBLICATION_NOT_CONNECTED == result) - { - printf("Offer failed because publisher is not connected to a subscriber\n"); - } - else if (AERON_PUBLICATION_ADMIN_ACTION == result) - { - printf("Offer failed because of an administration action in the system\n"); - } - else if (AERON_PUBLICATION_CLOSED == result) - { - printf("Offer failed because publication is closed\n"); - } - else - { - printf("Offer failed due to unknown reason %" PRId64 "\n", result); - } - } -} + char message[256] = { 0 }; + int message_len; -void handle_available_image(void *clientd, aeron_subscription_t *subscription, aeron_image_t *image) -{ - response_channel_info_t *response_channel_info = NULL; - aeron_image_constants_t constants; + message_len = SNPRINTF(message, sizeof(message) - 1, "responding to message: %.*s", (int)length, buffer); - print_available_image(NULL, subscription, image); + int64_t result = response_server_publication_offer(response_channel_info, (const uint8_t *)message, message_len); - if (aeron_image_constants(image, &constants) < 0) + if (result > 0) { - fprintf(stderr, "aeron_image_constants: %s\n", aeron_errmsg()); - return; + printf("response sent!\n"); } - - aeron_alloc((void **)&response_channel_info, sizeof(response_channel_info_t)); - - response_channel_info->image = image; - response_channel_info->subscription = subscription; - + else if (AERON_PUBLICATION_BACK_PRESSURED == result) { - char _channel_buf[AERON_MAX_PATH] = { 0 }; - - SNPRINTF(_channel_buf, sizeof(_channel_buf) - 1, "%s|control-mode=response|response-correlation-id=%" PRIi64, response_control_channel, constants.correlation_id); - - printf("Responding on channel %s on Stream ID %" PRId32 "\n", _channel_buf, response_stream_id); - - if (aeron_async_add_publication(&response_channel_info->async_add_pub, aeron, _channel_buf, response_stream_id) < 0) - { - fprintf(stderr, "aeron_async_add_publication: %s\n", aeron_errmsg()); - return; - } + printf("Offer failed due to back pressure\n"); } - - response_channel_info->publication = NULL; - - if (aeron_fragment_assembler_create(&response_channel_info->fragment_assembler, poll_handler, response_channel_info) < 0) + else if (AERON_PUBLICATION_NOT_CONNECTED == result) { - fprintf(stderr, "aeron_fragment_assembler_create: %s\n", aeron_errmsg()); - return; + printf("Offer failed because publisher is not connected to a subscriber\n"); } - - aeron_mutex_lock(&info_lock); - if (aeron_int64_to_ptr_hash_map_put(&response_channel_info_map, constants.correlation_id, response_channel_info) < 0) + else if (AERON_PUBLICATION_ADMIN_ACTION == result) { - fprintf(stderr, "aeron_int64_to_ptr_hash_map_put: %s\n", aeron_errmsg()); + printf("Offer failed because of an administration action in the system\n"); } - aeron_mutex_unlock(&info_lock); -} - -void handle_unavailable_image(void *clientd, aeron_subscription_t *subscription, aeron_image_t *image) -{ - response_channel_info_t *response_channel_info = NULL; - aeron_image_constants_t constants; - - print_unavailable_image(NULL, subscription, image); - - if (aeron_image_constants(image, &constants) < 0) + else if (AERON_PUBLICATION_CLOSED == result) { - fprintf(stderr, "aeron_image_constants: %s\n", aeron_errmsg()); - return; - } - - aeron_mutex_lock(&info_lock); - response_channel_info = aeron_int64_to_ptr_hash_map_remove(&response_channel_info_map, constants.correlation_id); - aeron_mutex_unlock(&info_lock); - - if (NULL != response_channel_info) - { - aeron_publication_close(response_channel_info->publication, NULL, NULL); - aeron_fragment_assembler_delete(response_channel_info->fragment_assembler); - aeron_free(response_channel_info); - } -} - -void process_response_channel_info(void *clientd, int64_t key, void *value) -{ - response_channel_info_t *response_channel_info = (response_channel_info_t *)value; - - if (NULL != response_channel_info->async_add_pub) - { - int rc; - - rc = aeron_async_add_publication_poll(&response_channel_info->publication, response_channel_info->async_add_pub); - - if (rc == 0) - { - return; // still waiting - } - - if (rc < 0) - { - fprintf(stderr, "aeron_async_add_publication_poll: %s\n", aeron_errmsg()); - } - - // if we're here, _poll returned either 1 or -1. Either way, we're done with the async_add_pub - response_channel_info->async_add_pub = NULL; - } - - int fragments_read = aeron_image_poll( - response_channel_info->image, - aeron_fragment_assembler_handler, - response_channel_info->fragment_assembler, - DEFAULT_FRAGMENT_COUNT_LIMIT); - - if (fragments_read < 0) - { - fprintf(stderr, "aeron_image_poll: %s\n", aeron_errmsg()); + printf("Offer failed because publication is closed\n"); } else { - int *total_fragments_read = (int *)clientd; - - *total_fragments_read += fragments_read; + printf("Offer failed due to unknown reason %" PRId64 "\n", result); } } @@ -260,14 +146,16 @@ int main(int argc, char **argv) aeron_context_t *context = NULL; const char *aeron_dir = NULL; + aeron_t *aeron = NULL; - aeron_async_add_subscription_t *async = NULL; - aeron_subscription_t *subscription = NULL; const char *request_channel = DEFAULT_REQUEST_CHANNEL; int32_t request_stream_id = DEFAULT_REQUEST_STREAM_ID; - + const char *response_control_channel = DEFAULT_RESPONSE_CONTROL_CHANNEL; + int32_t response_stream_id = DEFAULT_RESPONSE_STREAM_ID; const uint64_t idle_duration_ns = UINT64_C(1000) * UINT64_C(1000); /* 1ms */ + response_server_t *response_server = NULL; + while ((opt = getopt(argc, argv, "hvc:d:p:r:s:")) != -1) { switch (opt) @@ -324,14 +212,6 @@ int main(int argc, char **argv) signal(SIGINT, sigint_handler); - if (aeron_int64_to_ptr_hash_map_init(&response_channel_info_map, 8, AERON_MAP_DEFAULT_LOAD_FACTOR) < 0) - { - fprintf(stderr, "aeron_int64_to_ptr_hash_map_init: %s\n", aeron_errmsg()); - goto cleanup; - } - - aeron_mutex_init(&info_lock, NULL); - printf("Subscribing to channel %s on Stream ID %" PRId32 "\n", request_channel, request_stream_id); if (aeron_context_init(&context) < 0) @@ -361,23 +241,185 @@ int main(int argc, char **argv) goto cleanup; } + if (response_server_create( + &response_server, + aeron, + poll_handler, + request_channel, + request_stream_id, + response_control_channel, + response_stream_id) < 0) + { + fprintf(stderr, "response_server_create: %s\n", aeron_errmsg()); + goto cleanup; + } + + while (is_running()) + { + int total_fragments_read = response_server_do_work(response_server); + + aeron_idle_strategy_sleeping_idle((void *)&idle_duration_ns, total_fragments_read); + } + + printf("Shutting down...\n"); + status = EXIT_SUCCESS; + +cleanup: + response_server_delete(response_server); + aeron_close(aeron); + aeron_context_close(context); + + return status; +} + +extern bool is_running(void); + +struct response_server_stct +{ + aeron_t *aeron; + aeron_subscription_t *subscription; + aeron_fragment_handler_t delegate; + char response_control_channel[AERON_MAX_PATH]; + int32_t response_stream_id; + aeron_int64_to_ptr_hash_map_t response_channel_info_map; + aeron_mutex_t info_lock; +}; + +struct response_channel_info_stct +{ + aeron_image_t *image; + aeron_subscription_t *subscription; + aeron_async_add_publication_t *async_add_pub; + aeron_publication_t *publication; + aeron_fragment_assembler_t *fragment_assembler; +}; + +void response_server_handle_available_image(void *clientd, aeron_subscription_t *subscription, aeron_image_t *image) +{ + response_server_t *response_server = clientd; + response_channel_info_t *response_channel_info = NULL; + aeron_image_constants_t constants; + + print_available_image(NULL, subscription, image); + + if (aeron_image_constants(image, &constants) < 0) + { + fprintf(stderr, "aeron_image_constants: %s\n", aeron_errmsg()); + return; + } + + aeron_alloc((void **)&response_channel_info, sizeof(response_channel_info_t)); + + response_channel_info->image = image; + response_channel_info->subscription = subscription; + + { + char _channel_buf[AERON_MAX_PATH] = { 0 }; + + SNPRINTF( + _channel_buf, + sizeof(_channel_buf) - 1, + "%.*s|control-mode=response|response-correlation-id=%" PRIi64, + (int)strlen(response_server->response_control_channel), + response_server->response_control_channel, + constants.correlation_id); + + printf("Responding on channel %s on Stream ID %" PRId32 "\n", _channel_buf, response_server->response_stream_id); + + if (aeron_async_add_publication(&response_channel_info->async_add_pub, response_server->aeron, _channel_buf, response_server->response_stream_id) < 0) + { + fprintf(stderr, "aeron_async_add_publication: %s\n", aeron_errmsg()); + return; + } + } + + response_channel_info->publication = NULL; + + if (aeron_fragment_assembler_create(&response_channel_info->fragment_assembler, response_server->delegate, response_channel_info) < 0) + { + fprintf(stderr, "aeron_fragment_assembler_create: %s\n", aeron_errmsg()); + return; + } + + aeron_mutex_lock(&response_server->info_lock); + if (aeron_int64_to_ptr_hash_map_put(&response_server->response_channel_info_map, constants.correlation_id, response_channel_info) < 0) + { + fprintf(stderr, "aeron_int64_to_ptr_hash_map_put: %s\n", aeron_errmsg()); + } + aeron_mutex_unlock(&response_server->info_lock); +} + +void response_server_handle_unavailable_image(void *clientd, aeron_subscription_t *subscription, aeron_image_t *image) +{ + response_server_t *response_server = clientd; + response_channel_info_t *response_channel_info = NULL; + aeron_image_constants_t constants; + + print_unavailable_image(NULL, subscription, image); + + if (aeron_image_constants(image, &constants) < 0) + { + fprintf(stderr, "aeron_image_constants: %s\n", aeron_errmsg()); + return; + } + + aeron_mutex_lock(&response_server->info_lock); + response_channel_info = aeron_int64_to_ptr_hash_map_remove(&response_server->response_channel_info_map, constants.correlation_id); + aeron_mutex_unlock(&response_server->info_lock); + + if (NULL != response_channel_info) + { + aeron_publication_close(response_channel_info->publication, NULL, NULL); + aeron_fragment_assembler_delete(response_channel_info->fragment_assembler); + aeron_free(response_channel_info); + } +} + +int response_server_create( + response_server_t **response_serverp, + aeron_t *aeron, + aeron_fragment_handler_t delegate, + const char *request_channel, + int32_t request_stream_id, + const char *response_control_channel, + int32_t response_stream_id) +{ + response_server_t *response_server; + aeron_async_add_subscription_t *async = NULL; + + aeron_alloc((void **)&response_server, sizeof(response_server_t)); + + response_server->aeron = aeron, + strncpy(response_server->response_control_channel, response_control_channel, sizeof(response_server->response_control_channel) - 1); + response_server->response_stream_id = response_stream_id; + response_server->delegate = delegate; + + if (aeron_int64_to_ptr_hash_map_init(&response_server->response_channel_info_map, 8, AERON_MAP_DEFAULT_LOAD_FACTOR) < 0) + { + fprintf(stderr, "aeron_int64_to_ptr_hash_map_init: %s\n", aeron_errmsg()); + goto cleanup; + } + + aeron_mutex_init(&response_server->info_lock, NULL); + if (aeron_async_add_subscription( &async, aeron, request_channel, request_stream_id, - handle_available_image, - NULL, - handle_unavailable_image, - NULL) < 0) + response_server_handle_available_image, + response_server, + response_server_handle_unavailable_image, + response_server) < 0) { fprintf(stderr, "aeron_async_add_subscription: %s\n", aeron_errmsg()); goto cleanup; } - while (NULL == subscription) + response_server->subscription = NULL; + while (NULL == response_server->subscription) { - if (aeron_async_add_subscription_poll(&subscription, async) < 0) + if (aeron_async_add_subscription_poll(&response_server->subscription, async) < 0) { fprintf(stderr, "aeron_async_add_subscription_poll: %s\n", aeron_errmsg()); goto cleanup; @@ -386,30 +428,97 @@ int main(int argc, char **argv) sched_yield(); } - printf("Subscription channel status %" PRIu64 "\n", aeron_subscription_channel_status(subscription)); + printf("Subscription channel status %" PRIu64 "\n", aeron_subscription_channel_status(response_server->subscription)); - while (is_running()) + *response_serverp = response_server; + + return 0; + +cleanup: + response_server_delete(response_server); + + return -1; +} + +void response_server_delete(response_server_t *response_server) +{ + if (NULL != response_server) { - int total_fragments_read = 0; + aeron_subscription_close(response_server->subscription, NULL, NULL); + aeron_int64_to_ptr_hash_map_delete(&response_server->response_channel_info_map); + aeron_mutex_destroy(&response_server->info_lock); - aeron_mutex_lock(&info_lock); - aeron_int64_to_ptr_hash_map_for_each(&response_channel_info_map, process_response_channel_info, &total_fragments_read); - aeron_mutex_unlock(&info_lock); + aeron_free(response_server); + } +} - aeron_idle_strategy_sleeping_idle((void *)&idle_duration_ns, total_fragments_read); +void response_server_process_response_channel_info(void *clientd, int64_t key, void *value) +{ + response_channel_info_t *response_channel_info = value; + + if (NULL != response_channel_info->async_add_pub) + { + int rc; + + rc = aeron_async_add_publication_poll(&response_channel_info->publication, response_channel_info->async_add_pub); + + if (rc == 0) + { + return; // still waiting + } + + if (rc < 0) + { + fprintf(stderr, "aeron_async_add_publication_poll: %s\n", aeron_errmsg()); + } + + // if we're here, _poll returned either 1 or -1. Either way, we're done with the async_add_pub + response_channel_info->async_add_pub = NULL; } - printf("Shutting down...\n"); - status = EXIT_SUCCESS; + int fragments_read = aeron_image_poll( + response_channel_info->image, + aeron_fragment_assembler_handler, + response_channel_info->fragment_assembler, + DEFAULT_FRAGMENT_COUNT_LIMIT); -cleanup: - aeron_subscription_close(subscription, NULL, NULL); - aeron_close(aeron); - aeron_context_close(context); - aeron_int64_to_ptr_hash_map_delete(&response_channel_info_map); - aeron_mutex_destroy(&info_lock); + if (fragments_read < 0) + { + fprintf(stderr, "aeron_image_poll: %s\n", aeron_errmsg()); + } + else + { + int *total_fragments_read = (int *)clientd; - return status; + *total_fragments_read += fragments_read; + } } -extern bool is_running(void); +int32_t response_server_subscription_constants(response_channel_info_t *response_channel_info, aeron_subscription_constants_t *subscription_constants) +{ + return aeron_subscription_constants(response_channel_info->subscription, subscription_constants); +} + +int64_t response_server_publication_offer(response_channel_info_t *response_channel_info, const uint8_t *buffer, size_t length) +{ + if (NULL == response_channel_info->publication) + { + return AERON_PUBLICATION_NOT_CONNECTED; + } + + return aeron_publication_offer(response_channel_info->publication, buffer, length, NULL, NULL); +} + +int response_server_do_work(response_server_t *response_server) +{ + int total_fragments_read = 0; + + aeron_mutex_lock(&response_server->info_lock); + aeron_int64_to_ptr_hash_map_for_each( + &response_server->response_channel_info_map, + response_server_process_response_channel_info, + &total_fragments_read); + aeron_mutex_unlock(&response_server->info_lock); + + return total_fragments_read; +}