Skip to content

Commit

Permalink
[C] Add support for setting CPU affinity for the async executor threa…
Browse files Browse the repository at this point in the history
…d (`aeron.driver.async.executor.cpu.affinity`).
  • Loading branch information
vyazelenko committed Jan 30, 2025
1 parent b4aa3b5 commit ae665d9
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 2 deletions.
9 changes: 8 additions & 1 deletion aeron-client/src/main/c/concurrent/aeron_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ static void *aeron_executor_dispatch(void *arg)
{
aeron_executor_t *executor = (aeron_executor_t *)arg;

aeron_thread_set_name("aeron_executor");
const char *role_name = "aeron_executor";
aeron_thread_set_name(role_name);
if (executor->dispatch_thread_cpu_affinity >= 0)
{
aeron_thread_set_affinity(role_name, executor->dispatch_thread_cpu_affinity);
}

aeron_executor_task_t *task;
aeron_linked_queue_node_t *node;
Expand Down Expand Up @@ -121,12 +126,14 @@ static void *aeron_executor_dispatch(void *arg)
int aeron_executor_init(
aeron_executor_t *executor,
bool async,
int32_t cpu_affinity,
aeron_executor_on_execution_complete_func_t on_execution_complete,
void *clientd)
{
executor->async = async,
executor->on_execution_complete = on_execution_complete;
executor->clientd = clientd;
executor->dispatch_thread_cpu_affinity = cpu_affinity;

if (async)
{
Expand Down
2 changes: 2 additions & 0 deletions aeron-client/src/main/c/concurrent/aeron_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ typedef struct aeron_executor_stct
aeron_blocking_linked_queue_t queue;
aeron_blocking_linked_queue_t return_queue;
aeron_thread_t dispatch_thread;
int32_t dispatch_thread_cpu_affinity;
}
aeron_executor_t;

int aeron_executor_init(
aeron_executor_t *executor,
bool async,
int32_t cpu_affinity,
aeron_executor_on_execution_complete_func_t on_execution_complete,
void *clientd);

Expand Down
1 change: 1 addition & 0 deletions aeron-client/src/test/c/concurrent/aeron_executor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class ExecutorTest : public testing::Test
if (aeron_executor_init(
&m_executor,
be_async(),
-1,
on_execution_complete_cb(),
this) < 0)
{
Expand Down
1 change: 1 addition & 0 deletions aeron-driver/src/main/c/aeron_driver.c
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ void aeron_driver_context_print_configuration(aeron_driver_context_t *context)
(uint64_t)context->network_publication_max_messages_per_send);
fprintf(fpout, "\n resource_free_limit=%" PRIu32, context->resource_free_limit);
fprintf(fpout, "\n async_executor_threads=%" PRIu32, context->async_executor_threads);
fprintf(fpout, "\n async_executor_cpu_affinity_no=%" PRId32, context->async_executor_cpu_affinity_no);
fprintf(fpout, "\n conductor_cpu_affinity_no=%" PRId32, context->conductor_cpu_affinity_no);
fprintf(fpout, "\n receiver_cpu_affinity_no=%" PRId32, context->receiver_cpu_affinity_no);
fprintf(fpout, "\n sender_cpu_affinity_no=%" PRId32, context->sender_cpu_affinity_no);
Expand Down
2 changes: 1 addition & 1 deletion aeron-driver/src/main/c/aeron_driver_conductor.c
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ int aeron_driver_conductor_init(aeron_driver_conductor_t *conductor, aeron_drive
conductor->conductor_proxy.threading_mode = context->threading_mode;
conductor->conductor_proxy.conductor = conductor;

if (aeron_executor_init(&conductor->executor, context->async_executor_threads >= 1, NULL, conductor) < 0)
if (aeron_executor_init(&conductor->executor, context->async_executor_threads >= 1, context->async_executor_cpu_affinity_no, NULL, conductor) < 0)
{
return -1;
}
Expand Down
8 changes: 8 additions & 0 deletions aeron-driver/src/main/c/aeron_driver_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ static void aeron_driver_untethered_subscription_state_change_null(
#define AERON_SENDER_MAX_MESSAGES_PER_SEND_DEFAULT UINT32_C(2)
#define AERON_DRIVER_RESOURCE_FREE_LIMIT_DEFAULT UINT32_C(10)
#define AERON_DRIVER_ASYNC_EXECUTOR_THREADS_DEFAULT UINT32_C(1)
#define AERON_DRIVER_ASYNC_EXECUTOR_CPU_AFFINITY_DEFAULT (-1)
#define AERON_CPU_AFFINITY_DEFAULT (-1)
#define AERON_DRIVER_CONNECT_DEFAULT true
#define AERON_ENABLE_EXPERIMENTAL_FEATURES_DEFAULT false
Expand Down Expand Up @@ -438,6 +439,7 @@ int aeron_driver_context_init(aeron_driver_context_t **context)
_context->network_publication_max_messages_per_send = AERON_SENDER_MAX_MESSAGES_PER_SEND_DEFAULT;
_context->resource_free_limit = AERON_DRIVER_RESOURCE_FREE_LIMIT_DEFAULT;
_context->async_executor_threads = AERON_DRIVER_ASYNC_EXECUTOR_THREADS_DEFAULT;
_context->async_executor_cpu_affinity_no = AERON_DRIVER_ASYNC_EXECUTOR_CPU_AFFINITY_DEFAULT;
_context->connect_enabled = AERON_DRIVER_CONNECT_DEFAULT;
_context->conductor_cpu_affinity_no = AERON_CPU_AFFINITY_DEFAULT;
_context->sender_cpu_affinity_no = AERON_CPU_AFFINITY_DEFAULT;
Expand Down Expand Up @@ -673,6 +675,12 @@ int aeron_driver_context_init(aeron_driver_context_t **context)
_context->sender_cpu_affinity_no,
-1,
255);
_context->async_executor_cpu_affinity_no = aeron_config_parse_int32(
AERON_DRIVER_ASYNC_EXECUTOR_CPU_AFFINITY_ENV_VAR,
getenv(AERON_DRIVER_ASYNC_EXECUTOR_CPU_AFFINITY_ENV_VAR),
_context->async_executor_cpu_affinity_no,
-1,
255);

_context->send_to_sm_poll_ratio = (uint8_t)aeron_config_parse_uint64(
AERON_SEND_TO_STATUS_POLL_RATIO_ENV_VAR,
Expand Down
1 change: 1 addition & 0 deletions aeron-driver/src/main/c/aeron_driver_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ typedef struct aeron_driver_context_stct
int32_t conductor_cpu_affinity_no; /* aeron.conductor.cpu.affinity = -1 */
int32_t receiver_cpu_affinity_no; /* aeron.receiver.cpu.affinity = -1 */
int32_t sender_cpu_affinity_no; /* aeron.sender.cpu.affinity = -1 */
int32_t async_executor_cpu_affinity_no; /* aeron.driver.async.executor.cpu.affinity = -1 */
int32_t stream_session_limit; /* aeron.driver.stream.session.limit = INT32_MAX */
bool enable_experimental_features; /* aeron.enable.experimental.features = false */

Expand Down
1 change: 1 addition & 0 deletions aeron-driver/src/main/c/aeronmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,7 @@ int aeron_driver_context_set_resource_free_limit(aeron_driver_context_t *context
uint32_t aeron_driver_context_get_resource_free_limit(aeron_driver_context_t *context);

#define AERON_DRIVER_ASYNC_EXECUTOR_THREADS_ENV_VAR "AERON_DRIVER_ASYNC_EXECUTOR_THREADS"
#define AERON_DRIVER_ASYNC_EXECUTOR_CPU_AFFINITY_ENV_VAR "AERON_DRIVER_ASYNC_EXECUTOR_CPU_AFFINITY"
int aeron_driver_context_set_async_executor_threads(aeron_driver_context_t *context, uint32_t value);
uint32_t aeron_driver_context_get_async_executor_threads(aeron_driver_context_t *context);

Expand Down

0 comments on commit ae665d9

Please sign in to comment.