From aed832e3b043535a6ece1572360a853828fa9c87 Mon Sep 17 00:00:00 2001
From: Eric Mikida
Date: Tue, 23 Nov 2021 15:54:28 -0500
Subject: [PATCH] sched: Add a registration-based scheduler to Argobots

This scheduler allows users to associate a simple policy with each pool.
The policies dictate which pool the scheduler selects to pop from at each
iteration of the scheduler loop.
---
 examples/scheduling/Makefile.am |   4 +-
 examples/scheduling/sched_reg.c | 146 ++++++++++++++++++++++++++++++++
 src/include/abt.h.in            |  84 ++++++++++++++++++
 src/sched/Makefile.mk           |   1 +
 src/sched/reg.c                 | 136 +++++++++++++++++++++++++++++
 5 files changed, 370 insertions(+), 1 deletion(-)
 create mode 100644 examples/scheduling/sched_reg.c
 create mode 100644 src/sched/reg.c

diff --git a/examples/scheduling/Makefile.am b/examples/scheduling/Makefile.am
index 4cc282f6d..694da021a 100644
--- a/examples/scheduling/Makefile.am
+++ b/examples/scheduling/Makefile.am
@@ -8,7 +8,8 @@ TESTS = \
 	sched_predef \
 	sched_shared_pool \
 	sched_stack \
-	sched_user
+	sched_user \
+	sched_reg
 
 check_PROGRAMS = $(TESTS)
 noinst_PROGRAMS = $(TESTS)
@@ -20,3 +21,4 @@ sched_predef_SOURCES = sched_predef.c
 sched_shared_pool_SOURCES = sched_shared_pool.c
 sched_stack_SOURCES = sched_stack.c
 sched_user_SOURCES = sched_user.c
+sched_reg_SOURCES = sched_reg.c
diff --git a/examples/scheduling/sched_reg.c b/examples/scheduling/sched_reg.c
new file mode 100644
index 000000000..ca7ed6114
--- /dev/null
+++ b/examples/scheduling/sched_reg.c
@@ -0,0 +1,146 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
+/*
+ * See COPYRIGHT in top-level directory.
+ */
+
+#include <stdarg.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <assert.h>
+#include "abt.h"
+
+#define NUM_XSTREAMS 4
+#define NUM_THREADS 4
+
+static void create_threads(void *arg);
+static void thread_hello(void *arg);
+
+int main(int argc, char *argv[])
+{
+    ABT_xstream xstreams[NUM_XSTREAMS];
+    ABT_sched scheds[NUM_XSTREAMS];
+    ABT_pool pools[NUM_XSTREAMS];
+    ABT_thread threads[NUM_XSTREAMS];
+    int i;
+
+    ABT_init(argc, argv);
+
+    /* Create pools */
+    for (i = 0; i < NUM_XSTREAMS; i++) {
+        ABT_pool_create_basic(ABT_POOL_FIFO, ABT_POOL_ACCESS_MPMC, ABT_TRUE,
+                              &pools[i]);
+    }
+
+    /* Create schedulers: every pool starts out ready, with a growing
+     * min_successes so higher-indexed pools hold the ES slightly longer. */
+    ABT_sched_policy policies[NUM_XSTREAMS];
+    for (int i = 0; i < NUM_XSTREAMS; i++) {
+        policies[i].ready_at = 0;
+        policies[i].ready_at_wt = ABT_get_wtime();
+        policies[i].priority = i;
+        policies[i].max_successes = 10;
+        policies[i].min_successes = 2 * i + 2;
+        policies[i].success_timeout = 5;
+        policies[i].fail_timeout = 100;
+        policies[i].success_timeout_wt = 0.0;
+        policies[i].fail_timeout_wt = 5.0;
+    }
+
+    policies[0].max_successes = 100;
+    policies[0].fail_timeout_wt = 10.0;
+    /* Each scheduler sees a different rotation of the pool priorities, so
+     * every ES favors a different pool first. */
+    for (int i = 0; i < NUM_XSTREAMS; i++) {
+        for (int j = 0; j < NUM_XSTREAMS; j++) {
+            policies[(i + j) % NUM_XSTREAMS].priority = j;
+        }
+        ABT_sched_create_reg(NUM_XSTREAMS, pools, NUM_XSTREAMS, policies,
+                             &scheds[i]);
+    }
+
+    /* Create ESs */
+    ABT_xstream_self(&xstreams[0]);
+    ABT_xstream_set_main_sched(xstreams[0], scheds[0]);
+    for (i = 1; i < NUM_XSTREAMS; i++) {
+        ABT_xstream_create(scheds[i], &xstreams[i]);
+    }
+
+    /* Create ULTs */
+    for (i = 0; i < NUM_XSTREAMS; i++) {
+        size_t tid = (size_t)i;
+        ABT_thread_create(pools[i], create_threads, (void *)tid,
+                          ABT_THREAD_ATTR_NULL, &threads[i]);
+    }
+
+    /* Join & Free */
+    for (i = 0; i < NUM_XSTREAMS; i++) {
+        ABT_thread_join(threads[i]);
+        ABT_thread_free(&threads[i]);
+    }
+    for (i = 1; i < NUM_XSTREAMS; i++) {
+        ABT_xstream_join(xstreams[i]);
+        ABT_xstream_free(&xstreams[i]);
+    }
+
+    /* Free schedulers */
+    /* Note that we do not need to free the scheduler for the primary ES,
+     * i.e., xstreams[0], because it will be automatically freed in
+     * ABT_finalize(). */
+    for (i = 1; i < NUM_XSTREAMS; i++) {
+        ABT_sched_free(&scheds[i]);
+    }
+
+    /* Finalize */
+    ABT_finalize();
+
+    return 0;
+}
+
+static void create_threads(void *arg)
+{
+    int i, rank, tid = (int)(size_t)arg;
+    ABT_xstream xstream;
+    ABT_pool pools[NUM_XSTREAMS];
+    ABT_thread *threads;
+
+    ABT_xstream_self(&xstream);
+    ABT_xstream_get_main_pools(xstream, NUM_XSTREAMS, pools);
+
+    ABT_xstream_get_rank(xstream, &rank);
+    printf("[U%d:E%d] creating ULTs\n", tid, rank);
+
+    threads = (ABT_thread *)malloc(sizeof(ABT_thread) * NUM_THREADS);
+    for (i = 0; i < NUM_THREADS; i++) {
+        size_t id = (rank + 1) * 10 + i;
+        ABT_thread_create(pools[rank], thread_hello, (void *)id,
+                          ABT_THREAD_ATTR_NULL, &threads[i]);
+    }
+
+    ABT_xstream_get_rank(xstream, &rank);
+    printf("[U%d:E%d] freeing ULTs\n", tid, rank);
+    for (i = 0; i < NUM_THREADS; i++) {
+        ABT_thread_free(&threads[i]);
+    }
+    free(threads);
+}
+
+static void thread_hello(void *arg)
+{
+    int tid = (int)(size_t)arg;
+    int old_rank, cur_rank;
+    char *msg;
+
+    ABT_xstream_self_rank(&cur_rank);
+
+    printf("  [U%d:E%d] Hello, world!\n", tid, cur_rank);
+
+    ABT_thread_yield();
+
+    old_rank = cur_rank;
+    ABT_xstream_self_rank(&cur_rank);
+    msg = (cur_rank == old_rank) ? "" : " (stolen)";
+    printf("  [U%d:E%d] Hello again.%s\n", tid, cur_rank, msg);
+
+    ABT_thread_yield();
+
+    old_rank = cur_rank;
+    ABT_xstream_self_rank(&cur_rank);
+    msg = (cur_rank == old_rank) ? "" : " (stolen)";
+    printf("  [U%d:E%d] Goodbye, world!%s\n", tid, cur_rank, msg);
+}
diff --git a/src/include/abt.h.in b/src/include/abt.h.in
index 0d9f295bb..cf4e77415 100644
--- a/src/include/abt.h.in
+++ b/src/include/abt.h.in
@@ -2309,6 +2309,87 @@ typedef void (*ABT_tool_thread_callback_fn)(ABT_thread, ABT_xstream, uint64_t ev
 typedef void (*ABT_tool_task_callback_fn)(ABT_task, ABT_xstream, uint64_t event,
                                           ABT_tool_context context, void *user_arg);
 
+/**
+ * @ingroup SCHED
+ * @brief A struct that defines the scheduling policy for an associated pool.
+ */
+typedef struct {
+    /**
+     * @brief Pool priority.
+     *
+     * This value determines the relative priority of the associated pool when
+     * the scheduler is selecting among multiple ready pools. The scheduler
+     * favors the pool with the lowest priority value. The value must be
+     * non-negative.
+     */
+    int priority;
+    /**
+     * @brief Minimum number of successful attempts.
+     *
+     * This value determines how many work units will be pulled from this pool
+     * before returning to the scheduler to check for interrupts and/or
+     * higher-priority pools that may have become ready.
+     */
+    int min_successes;
+    /**
+     * @brief Maximum number of successful attempts.
+     *
+     * This value determines the maximum number of work units that will be
+     * executed from this pool before the pool is timed out to let the
+     * scheduler check other pools.
+     */
+    int max_successes;
+    /**
+     * @brief Scheduler event number at which this pool becomes ready.
+     *
+     * This value determines the point at which this pool becomes ready again
+     * after being timed out. It is specified in terms of the number of work
+     * units that the scheduler has executed.
+     */
+    int ready_at;
+    /**
+     * @brief Timeout value when the scheduler fails to pop a work unit.
+     *
+     * This value determines how many work units the scheduler should execute
+     * from OTHER pools before this pool becomes ready again in the case
+     * where the scheduler fails to pop a work unit from this pool (because
+     * the pool has become empty).
+     */
+    int fail_timeout;
+    /**
+     * @brief Timeout value after the maximum number of successful pops.
+     *
+     * This value determines how many work units the scheduler should execute
+     * from OTHER pools before this pool becomes ready again in the case
+     * where the scheduler has executed the maximum number of work units from
+     * this pool, as specified by max_successes.
+     */
+    int success_timeout;
+    /**
+     * @brief Walltime at which this pool becomes ready after a timeout.
+     *
+     * This value determines the walltime at which this pool becomes ready
+     * again after a timeout. It works in conjunction with ready_at. For a
+     * pool to be ready, it must have reached both the ready_at_wt walltime
+     * and the ready_at scheduler event number.
+     */
+    double ready_at_wt;
+    /**
+     * @brief Timeout value (in seconds) when the scheduler fails to pop.
+     *
+     * This value determines how many seconds this pool will be timed out in
+     * the case where the scheduler fails to pop a work unit from this pool.
+     */
+    double fail_timeout_wt;
+    /**
+     * @brief Timeout value (in seconds) after the maximum successful pops.
+     *
+     * This value determines how many seconds this pool will be timed out in
+     * the case where the scheduler has popped the maximum number of work
+     * units from this pool, as determined by max_successes.
+     */
+    double success_timeout_wt;
+} ABT_sched_policy;
+
 /* Init & Finalize */
 int ABT_init(int argc, char **argv) ABT_API_PUBLIC;
 int ABT_finalize(void) ABT_API_PUBLIC;
@@ -2363,6 +2444,9 @@ int ABT_sched_create(ABT_sched_def *def, int num_pools, ABT_pool *pools,
 int ABT_sched_create_basic(ABT_sched_predef predef, int num_pools,
                            ABT_pool *pools, ABT_sched_config config,
                            ABT_sched *newsched) ABT_API_PUBLIC;
+void ABT_sched_create_reg(int num_pools, ABT_pool *pools,
+                          int num_policies, ABT_sched_policy *policies,
+                          ABT_sched *newsched) ABT_API_PUBLIC;
 int ABT_sched_free(ABT_sched *sched) ABT_API_PUBLIC;
 int ABT_sched_set_data(ABT_sched sched, void *data) ABT_API_PUBLIC;
 int ABT_sched_get_data(ABT_sched sched, void **data) ABT_API_PUBLIC;
diff --git a/src/sched/Makefile.mk b/src/sched/Makefile.mk
index f87ab1f2b..926fa822f 100644
--- a/src/sched/Makefile.mk
+++ b/src/sched/Makefile.mk
@@ -8,6 +8,7 @@ abt_sources += \
 	sched/basic_wait.c \
 	sched/prio.c \
 	sched/randws.c \
+	sched/reg.c \
 	sched/sched.c \
 	sched/sched_config.c
diff --git a/src/sched/reg.c b/src/sched/reg.c
new file mode 100644
index 000000000..8942ff296
--- /dev/null
+++ b/src/sched/reg.c
@@ -0,0 +1,136 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
+/*
+ * See COPYRIGHT in top-level directory.
+ */
+
+#include <stdlib.h>
+#include <string.h>
+#include "abti.h"
+
+typedef struct {
+    uint64_t event_num;
+    uint32_t num_pools;
+    int32_t prev_pool; /* -1 when no pool was selected last iteration */
+    uint32_t successes;
+    ABT_sched_policy *policies;
+} sched_data_t;
+
+static int sched_init(ABT_sched sched, ABT_sched_config config)
+{
+    ABT_sched_policy *policies;
+    int num_pools;
+
+    sched_data_t *p_data = (sched_data_t *)calloc(1, sizeof(sched_data_t));
+
+    p_data->event_num = 0;
+    p_data->prev_pool = -1;
+    p_data->successes = 0;
+    ABT_sched_get_num_pools(sched, &num_pools);
+    p_data->num_pools = num_pools;
+    p_data->policies =
+        (ABT_sched_policy *)calloc(num_pools, sizeof(ABT_sched_policy));
+
+    /* Copy the policy array handed in through the scheduler config. */
+    ABT_sched_config_read(config, 1, &policies);
+    memcpy(p_data->policies, policies, num_pools * sizeof(ABT_sched_policy));
+
+    ABT_sched_set_data(sched, (void *)p_data);
+
+    return ABT_SUCCESS;
+}
+
+static void sched_run(ABT_sched sched)
+{
+    sched_data_t *p_data;
+    ABT_bool stop;
+
+    int num_pools;
+    ABT_pool *pools;
+
+    ABT_sched_get_data(sched, (void **)&p_data);
+
+    ABT_sched_get_num_pools(sched, &num_pools);
+    pools = (ABT_pool *)malloc(num_pools * sizeof(ABT_pool));
+    ABT_sched_get_pools(sched, num_pools, 0, pools);
+
+    while (1) {
+        int pool = -1;
+        int prio = -1;
+        double current_time = ABT_get_wtime();
+        /* Select the ready pool with the lowest priority value.
+         * TODO: also check whether a pool is empty so we do not select
+         * pools that have no work. */
+        for (int i = 0; i < num_pools; i++) {
+            if ((p_data->policies[i].ready_at <= p_data->event_num &&
+                 p_data->policies[i].ready_at_wt <= current_time) &&
+                (p_data->policies[i].priority < prio || prio == -1)) {
+                pool = i;
+                prio = p_data->policies[i].priority;
+            }
+        }
+        if (pool != p_data->prev_pool) {
+            p_data->prev_pool = pool;
+            p_data->successes = 0;
+        }
+        if (pool == -1) {
+            /* No pool is ready; count the idle iteration as an event so that
+             * event-based timeouts still expire. */
+            p_data->event_num++;
+        } else {
+            ABT_sched_policy *policy = &p_data->policies[pool];
+            for (int i = 0; i < policy->min_successes; i++) {
+                /* Execute one work unit from the selected pool */
+                ABT_thread thread;
+                current_time = ABT_get_wtime();
+                ABT_pool_pop_thread(pools[pool], &thread);
+                if (thread != ABT_THREAD_NULL) {
+                    /* "thread" stays associated with its original pool
+                     * (pools[pool]).
+                     */
+                    ABT_self_schedule(thread, ABT_POOL_NULL);
+                    p_data->event_num++;
+                    p_data->successes++;
+                    if (p_data->successes >= policy->max_successes) {
+                        /* Hit the success cap: time this pool out. */
+                        p_data->successes = 0;
+                        policy->ready_at =
+                            p_data->event_num + policy->success_timeout;
+                        policy->ready_at_wt =
+                            current_time + policy->success_timeout_wt;
+                        break;
+                    }
+                } else {
+                    /* The pool was empty: time it out with the fail values. */
+                    p_data->successes = 0;
+                    policy->ready_at =
+                        p_data->event_num + policy->fail_timeout;
+                    policy->ready_at_wt =
+                        current_time + policy->fail_timeout_wt;
+                    break;
+                }
+            }
+        }
+        ABT_sched_has_to_stop(sched, &stop);
+        if (stop == ABT_TRUE)
+            break;
+        ABT_xstream_check_events(sched);
+    }
+
+    free(pools);
+}
+
+static int sched_free(ABT_sched sched)
+{
+    sched_data_t *p_data;
+
+    ABT_sched_get_data(sched, (void **)&p_data);
+    free(p_data->policies);
+    free(p_data);
+
+    return ABT_SUCCESS;
+}
+
+/* num_policies is expected to match num_pools: sched_init copies one policy
+ * per pool. */
+void ABT_sched_create_reg(int num_pools, ABT_pool *pools,
+                          int num_policies, ABT_sched_policy *policies,
+                          ABT_sched *sched)
+{
+    ABT_sched_config config;
+
+    ABT_sched_config_var cv_policies = { .idx = 0,
+                                         .type = ABT_SCHED_CONFIG_PTR };
+
+    ABT_sched_def sched_def = { .type = ABT_SCHED_TYPE_ULT,
+                                .init = sched_init,
+                                .run = sched_run,
+                                .free = sched_free,
+                                .get_migr_pool = NULL };
+
+    /* Create a scheduler config that carries the policy array */
+    ABT_sched_config_create(&config, cv_policies, policies,
+                            ABT_sched_config_var_end);
+
+    ABT_sched_create(&sched_def, num_pools, pools, config, sched);
+    ABT_sched_config_free(&config);
+}
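
Usage sketch (not part of the patch): a minimal, self-contained driver for the
new interface, using only the entry points the patch adds (ABT_sched_policy,
ABT_sched_create_reg) plus standard Argobots calls; NUM_POOLS, NUM_UNITS, and
work() are illustrative names. It shows how the event-based fields interact:
with fail_timeout = 50, a failed pop at scheduler event N sets ready_at to
N + 50, so the pool is skipped until 50 more events have elapsed. Pool 0 below
has the lowest priority value, so the scheduler should drain it in bursts of
min_successes = 4 units until it empties, then fall back to pool 1 while
pool 0 sits out its fail timeout.

#include <stdio.h>
#include <stdlib.h>
#include "abt.h"

#define NUM_POOLS 2
#define NUM_UNITS 8

static void work(void *arg)
{
    int rank;
    ABT_xstream_self_rank(&rank);
    printf("unit %d on ES%d\n", (int)(size_t)arg, rank);
}

int main(int argc, char *argv[])
{
    ABT_pool pools[NUM_POOLS];
    ABT_sched_policy policies[NUM_POOLS];
    ABT_thread threads[NUM_UNITS];
    ABT_sched sched;
    ABT_xstream xstream;
    int i;

    ABT_init(argc, argv);

    for (i = 0; i < NUM_POOLS; i++) {
        ABT_pool_create_basic(ABT_POOL_FIFO, ABT_POOL_ACCESS_MPMC, ABT_TRUE,
                              &pools[i]);
        policies[i].priority = i;       /* lower value wins: pool 0 first */
        policies[i].min_successes = 4;  /* pop 4 units per selection */
        policies[i].max_successes = 16; /* then rest for success_timeout */
        policies[i].ready_at = 0;       /* ready immediately (events) */
        policies[i].ready_at_wt = 0.0;  /* ready immediately (walltime) */
        policies[i].fail_timeout = 50;  /* an empty pool sits out 50 events */
        policies[i].success_timeout = 8;
        policies[i].fail_timeout_wt = 0.0;
        policies[i].success_timeout_wt = 0.0;
    }

    /* Replace the primary ES's scheduler with the registration-based one;
     * it is freed automatically by ABT_finalize(). */
    ABT_sched_create_reg(NUM_POOLS, pools, NUM_POOLS, policies, &sched);
    ABT_xstream_self(&xstream);
    ABT_xstream_set_main_sched(xstream, sched);

    for (i = 0; i < NUM_UNITS; i++) {
        ABT_thread_create(pools[i % NUM_POOLS], work, (void *)(size_t)i,
                          ABT_THREAD_ATTR_NULL, &threads[i]);
    }
    for (i = 0; i < NUM_UNITS; i++) {
        ABT_thread_free(&threads[i]); /* joins the ULT, then releases it */
    }

    ABT_finalize();
    return 0;
}

Since both walltime timeouts above are 0.0, ready_at_wt always lands in the
past and only the event-based half of the policy is exercised; setting
fail_timeout_wt or success_timeout_wt to a positive number of seconds would
additionally hold the pool back in walltime.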