From b59fc52f8b78da9653618d904c9978f907d061f7 Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Mon, 20 May 2024 08:53:42 -0700 Subject: [PATCH 1/6] conf.update: add unpack of "age" priority weight Problem: The priority plugin does not unpack the priority weight associated with an "age" factor for a job's priority calculation. Add the "age" factor's associated integer weight to the plugin's internal map that stores the weights of all of the priority factors by unpacking it in the callback for conf.update. --- src/plugins/mf_priority.cpp | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index 32dbefba..fb714526 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -185,16 +185,17 @@ static int conf_update_cb (flux_plugin_t *p, flux_plugin_arg_t *args, void *data) { - int fshare_weight = -1, queue_weight = -1; + int fshare_weight = -1, queue_weight = -1, age_weight = -1; flux_t *h = flux_jobtap_get_flux (p); // unpack the various factors to be used in job priority calculation if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s?{s?{s?{s?i, s?i}}}", + "{s?{s?{s?{s?i, s?i, s?i}}}", "conf", "accounting", "factor-weights", "fairshare", &fshare_weight, - "queue", &queue_weight) < 0) { + "queue", &queue_weight, + "age", &age_weight) < 0) { flux_log_error (flux_jobtap_get_flux (p), "mf_priority: conf.update: flux_plugin_arg_unpack: %s", flux_plugin_arg_strerror (args)); @@ -205,6 +206,8 @@ static int conf_update_cb (flux_plugin_t *p, priority_weights["fairshare"] = fshare_weight; if (queue_weight != -1) priority_weights["queue"] = queue_weight; + if (age_weight != -1) + priority_weights["age"] = queue_weight; return 0; } From 486cb61866432872387b79418e8c5fe9f736106f Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Tue, 21 May 2024 12:53:58 -0700 Subject: [PATCH 2/6] plugin: add "age" factor in priority calculation Problem: The priority plugin does not factor in the age of a job in its priority calculation. Add a new callback to the plugin for job.state.sched and store the timestamp of a job in SCHED state in a map where the key is the job ID and the value is the timestamp. When the job enters job.priority.get, the timestamp will be used with the time at that moment to calculate the "age" of the job, where the older the age, the more of a priority bump the job gets. If a job currently accumulating age has its urgency changed to 0, the age of the job will be saved and no longer grow until the urgency of the job is changed back to a value > 0. If a job currently accumulating age has one of its attributes updated, such as the queue or the bank it is running under, any previously-accumulated age will be cleared and reset for the job. When the job enters job.state.inactive, the key-value pair for the job ID and the timestamp of it entering state SCHED is removed. --- src/plugins/mf_priority.cpp | 189 +++++++++++++++++++++++++++++++++++- 1 file changed, 185 insertions(+), 4 deletions(-) diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index fb714526..e16d18ad 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -29,6 +29,7 @@ extern "C" { #include #include #include +#include // custom bank_info class file #include "accounting.hpp" @@ -48,6 +49,8 @@ std::map queues; std::map users_def_bank; std::vector projects; std::map priority_weights; +std::map> released_jobs; /****************************************************************************** * * @@ -55,6 +58,76 @@ std::map priority_weights; * * *****************************************************************************/ +/* + * Calculate the age of a job given a job ID. If the job does not have an + * "age", just return 0.0 since it will not be factored into the priority + * calculation of a job. + */ +double calculate_job_age (long int jobid, double prev_age) { + double age_factor = 0.0; + + auto it = released_jobs.find (jobid); + if (it != released_jobs.end ()) { + // job has an "age" associated with it; fetch the timestamp + auto t_released = it->second; + // get current time + auto t_now = std::chrono::system_clock::now(); + // calculate age of job + age_factor = std::chrono::duration + (t_now - t_released).count (); + } + // add any previously calculated age + age_factor += prev_age; + + return age_factor; +} + + +/* + * Calculate and store the age of a job up to this point. This covers the + * scenario where a job that was accumulating age should no longer do so due + * to a controlled reason (such as the job being held by the user with an + * urgency update). + */ +int check_and_pack_previous_age (flux_plugin_t *p, flux_t *h, long int jobid) +{ + std::chrono::time_point t_now; + auto it = released_jobs.find (jobid); + if (it != released_jobs.end ()) { + double *prev_age = static_cast (malloc (sizeof (double))); + if (!prev_age) { + errno = ENOMEM; + return -1; + } + + // attempt to fetch any previously-accumulated age of the job and add it + double *total_age = static_cast (flux_jobtap_job_aux_get ( + p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority:prev_age")); + if (total_age != NULL) + *prev_age += *total_age; + + // calculate the age of the job up to this point + t_now = std::chrono::system_clock::now(); + *prev_age += std::chrono::duration + (t_now - released_jobs[jobid]).count (); + + // erase entry in released_jobs map (since the job's age will be reset) + released_jobs.erase (jobid); + + if (flux_jobtap_job_aux_set (p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority:prev_age", + prev_age, + free) < 0) + flux_log_error (h, "flux_jobtap_job_aux_set"); + } + + return 0; +} + + /* * Calculate a user's job priority using the following factors: * @@ -65,6 +138,16 @@ std::map priority_weights; * * queue: a factor that can further affect the priority of a job based on the * queue passed in. + * + * age: a factor that considers the time that a job was released to the + * scheduler to be run but could not due to another constraint (e.g a + * resource constraint). Any urgency, priority, or jobspec-update event + * posted to a job in SCHED state will transition the job back to + * PRIORITY. After the priority of the job is calculated, the job will + * be transitioned back to SCHED state and the job's "t_released" + * timestamp will be updated to reflect this most recent transition. This + * is to prevent users from purposely holding their jobs while they are in + * SCHED state and artificially boosting their job's priority. */ int64_t priority_calculation (flux_plugin_t *p, flux_plugin_arg_t *args, @@ -72,13 +155,28 @@ int64_t priority_calculation (flux_plugin_t *p, char *bank, int urgency) { + long int jobid; double fshare_factor = 0.0, priority = 0.0; int queue_factor = 0; - int fshare_weight, queue_weight; + int fshare_weight, queue_weight, age_weight; + double age_factor = 0.0, age = 0.0; Association *b; + flux_t *h = flux_jobtap_get_flux (p); + if (flux_plugin_arg_unpack (args, + FLUX_PLUGIN_ARG_IN, + "{s:I}", + "id", &jobid) < 0) { + flux_log (h, + LOG_ERR, + "flux_plugin_arg_unpack: %s", + flux_plugin_arg_strerror (args)); + return -1; + } + fshare_weight = priority_weights["fairshare"]; queue_weight = priority_weights["queue"]; + age_weight = priority_weights["age"]; if (urgency == FLUX_JOB_URGENCY_HOLD) return FLUX_JOB_PRIORITY_MIN; @@ -98,11 +196,22 @@ int64_t priority_calculation (flux_plugin_t *p, return -1; } + double *prev_age = static_cast (flux_jobtap_job_aux_get ( + p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority:prev_age")); + if (prev_age != NULL) + // factor in any previously-accumulated age to the calculation of + // the age factor for this job + age += *prev_age; + + age_factor = calculate_job_age (jobid, age); fshare_factor = b->fairshare; queue_factor = b->queue_factor; priority = round ((fshare_weight * fshare_factor) + (queue_weight * queue_factor) + + (age_weight * age_factor) + (urgency - 16)); if (priority < 0) @@ -456,12 +565,14 @@ static int priority_cb (flux_plugin_t *p, char *bank = NULL; char *queue = NULL; int64_t priority; + long int jobid; Association *b; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i, s:i, s{s{s{s?s, s?s}}}}", + "{s:I, s:i, s:i, s{s{s{s?s, s?s}}}}", + "id", &jobid, "urgency", &urgency, "userid", &userid, "jobspec", "attributes", "system", @@ -485,6 +596,20 @@ static int priority_cb (flux_plugin_t *p, return -1; } + if (urgency == 0) { + // the job is being held; if the job was accumulating an age up to this + // point, we need to store this age and no longer consider age a factor + // until this job is updated to have an urgency > 0 + if (check_and_pack_previous_age (p, h, jobid) < 0) { + flux_jobtap_raise_exception (p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", + 0, + "failed to pack prev_age for job"); + return -1; + } + } + if (b->max_run_jobs == BANK_INFO_MISSING) { // the association that this job is submitted under could not be found // in the plugin's internal map when the job was first submitted and is @@ -810,6 +935,43 @@ static int depend_cb (flux_plugin_t *p, } +/* + * When a job gets to the job.state.sched state, generate a timestamp to be + * associated with the job in the case that it is eligible to be run but + * cannot (e.g due to resource constraints). The timestamp will be used + * to calculate the "age" of a job and increase its priority according to + * the time it has been waiting to run. + */ +static int sched_cb (flux_plugin_t *p, + const char *topic, + flux_plugin_arg_t *args, + void *data) +{ + int urgency = 0; + long int jobid; + std::chrono::time_point t_released; + flux_t *h = flux_jobtap_get_flux (p); + + // unpack urgency + if (flux_plugin_arg_unpack (args, + FLUX_PLUGIN_ARG_IN, + "{s:I, s:i}", + "id", &jobid, + "urgency", &urgency) < 0) + return flux_jobtap_error (p, args, "unable to unpack urgency"); + + if (urgency > 0) { + // the job is not being held, and therefore we need to get the current + // time to keep track of how long the job has been released and waiting + t_released = std::chrono::system_clock::now(); + // store job id, t_released in map + released_jobs[jobid] = t_released; + } + + return 0; +} + + static int run_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *args, @@ -851,12 +1013,14 @@ static int job_updated (flux_plugin_t *p, char *bank = NULL; char *updated_queue = NULL; char *updated_bank = NULL; + long int jobid; Association *a; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i, s{s{s{s?s}}}, s:{s?s, s?s}}", + "{s:I, s:i, s{s{s{s?s}}}, s:{s?s, s?s}}", + "id", &jobid, "userid", &userid, "jobspec", "attributes", "system", "bank", &bank, @@ -919,6 +1083,16 @@ static int job_updated (flux_plugin_t *p, // associated with the job a->queue_factor = get_queue_info (updated_queue, a->queues, queues); + // the job was updated to either run under a new bank or new queue; reset + // the age of the job + released_jobs.erase (jobid); + if (flux_jobtap_job_aux_set (p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority:prev_age", + NULL, + NULL) < 0) + flux_log_error (h, "flux_jobtap_job_aux_set"); + return 0; } @@ -1032,12 +1206,14 @@ static int inactive_cb (flux_plugin_t *p, void *data) { int userid; + long int jobid; Association *b; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i}", + "{s:I, s:i}", + "id", &jobid, "userid", &userid) < 0) { flux_log (h, LOG_ERR, @@ -1082,6 +1258,10 @@ static int inactive_cb (flux_plugin_t *p, b->held_jobs.erase (b->held_jobs.begin ()); } + // if the job had an "age" associated with it, remove it from the map that + // stores its ID and t_released timestamp + released_jobs.erase (jobid); + return 0; } @@ -1095,6 +1275,7 @@ static const struct flux_plugin_handler tab[] = { { "job.state.depend", depend_cb, NULL }, { "job.update", job_updated, NULL}, { "job.state.run", run_cb, NULL}, + { "job.state.sched", sched_cb, NULL}, { "plugin.query", query_cb, NULL}, { "job.update.attributes.system.queue", update_queue_cb, NULL }, { "job.update.attributes.system.bank", update_bank_cb, NULL }, From ad47979c303f7135d02f99e58fa2822461178eb2 Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Tue, 21 May 2024 12:57:37 -0700 Subject: [PATCH 3/6] t: disable "age" factor in priority calculation Problem: A number of tests in the testsuite look for a specific priority value that becomes very difficult to test with the addition of the "age" factor. In the tests that look for a specific priority value, disable the "age" factor in the priority calculation of a job. --- t/t1001-mf-priority-basic.t | 11 ++++++++++- t/t1002-mf-priority-small-no-tie.t | 11 ++++++++++- t/t1003-mf-priority-small-tie.t | 11 ++++++++++- t/t1004-mf-priority-small-tie-all.t | 11 ++++++++++- t/t1005-max-jobs-limits.t | 11 ++++++++++- t/t1008-mf-priority-update.t | 11 ++++++++++- t/t1012-mf-priority-load.t | 11 ++++++++++- t/t1013-mf-priority-queues.t | 10 ++++++++++ t/t1014-mf-priority-dne.t | 11 ++++++++++- t/t1015-mf-priority-urgency.t | 11 ++++++++++- t/t1018-mf-priority-disable-entry.t | 11 ++++++++++- t/t1019-mf-priority-info-fetch.t | 12 +++++++++++- t/t1020-mf-priority-issue262.t | 11 ++++++++++- t/t1030-mf-priority-update-queue.t | 4 +++- t/t1033-mf-priority-update-job.t | 2 ++ t/t1034-mf-priority-config.t | 4 +++- 16 files changed, 139 insertions(+), 14 deletions(-) diff --git a/t/t1001-mf-priority-basic.t b/t/t1001-mf-priority-basic.t index 5bddbd74..1fecf57c 100755 --- a/t/t1001-mf-priority-basic.t +++ b/t/t1001-mf-priority-basic.t @@ -7,9 +7,10 @@ MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py SEND_PAYLOAD=${SHARNESS_TEST_SRCDIR}/scripts/send_payload.py +mkdir -p config export TEST_UNDER_FLUX_NO_JOB_EXEC=y export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" -test_under_flux 1 job +test_under_flux 1 job -o,--config-path=$(pwd)/config flux setattr log-stderr-level 1 @@ -27,6 +28,14 @@ test_expect_success 'check that mf_priority plugin is loaded' ' flux jobtap list | grep mf_priority ' +test_expect_success 'disable age factor in multi-factor priority plugin' ' + cat >config/test.toml <<-EOT && + [accounting.factor-weights] + age = 0 + EOT + flux config reload +' + test_expect_success 'send an empty payload to make sure unpack fails' ' cat <<-EOF >bad_payload.py && import flux diff --git a/t/t1002-mf-priority-small-no-tie.t b/t/t1002-mf-priority-small-no-tie.t index e0fbc977..963defa6 100755 --- a/t/t1002-mf-priority-small-no-tie.t +++ b/t/t1002-mf-priority-small-no-tie.t @@ -8,9 +8,10 @@ SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py SEND_PAYLOAD=${SHARNESS_TEST_SRCDIR}/scripts/send_payload.py SMALL_NO_TIE=${SHARNESS_TEST_SRCDIR}/expected/sample_payloads/small_no_tie.json +mkdir -p config export TEST_UNDER_FLUX_NO_JOB_EXEC=y export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" -test_under_flux 1 job +test_under_flux 1 job -o,--config-path=$(pwd)/config flux setattr log-stderr-level 1 @@ -28,6 +29,14 @@ test_expect_success 'check that mf_priority plugin is loaded' ' flux jobtap list | grep mf_priority ' +test_expect_success 'disable age factor in multi-factor priority plugin' ' + cat >config/test.toml <<-EOT && + [accounting.factor-weights] + age = 0 + EOT + flux config reload +' + test_expect_success 'send the user information to the plugin' ' flux python ${SEND_PAYLOAD} ${SMALL_NO_TIE} ' diff --git a/t/t1003-mf-priority-small-tie.t b/t/t1003-mf-priority-small-tie.t index 0b4baa80..19ac8b99 100755 --- a/t/t1003-mf-priority-small-tie.t +++ b/t/t1003-mf-priority-small-tie.t @@ -8,9 +8,10 @@ SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py SEND_PAYLOAD=${SHARNESS_TEST_SRCDIR}/scripts/send_payload.py SMALL_TIE=${SHARNESS_TEST_SRCDIR}/expected/sample_payloads/small_tie.json +mkdir -p config export TEST_UNDER_FLUX_NO_JOB_EXEC=y export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" -test_under_flux 1 job +test_under_flux 1 job -o,--config-path=$(pwd)/config flux setattr log-stderr-level 1 @@ -28,6 +29,14 @@ test_expect_success 'check that mf_priority plugin is loaded' ' flux jobtap list | grep mf_priority ' +test_expect_success 'disable age factor in multi-factor priority plugin' ' + cat >config/test.toml <<-EOT && + [accounting.factor-weights] + age = 0 + EOT + flux config reload +' + test_expect_success 'send the user information to the plugin' ' flux python ${SEND_PAYLOAD} ${SMALL_TIE} ' diff --git a/t/t1004-mf-priority-small-tie-all.t b/t/t1004-mf-priority-small-tie-all.t index 7b5a032a..91c1d643 100755 --- a/t/t1004-mf-priority-small-tie-all.t +++ b/t/t1004-mf-priority-small-tie-all.t @@ -8,9 +8,10 @@ SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py SEND_PAYLOAD=${SHARNESS_TEST_SRCDIR}/scripts/send_payload.py SMALL_TIE_ALL=${SHARNESS_TEST_SRCDIR}/expected/sample_payloads/small_tie_all.json +mkdir -p config export TEST_UNDER_FLUX_NO_JOB_EXEC=y export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" -test_under_flux 1 job +test_under_flux 1 job -o,--config-path=$(pwd)/config flux setattr log-stderr-level 1 @@ -28,6 +29,14 @@ test_expect_success 'check that mf_priority plugin is loaded' ' flux jobtap list | grep mf_priority ' +test_expect_success 'disable age factor in multi-factor priority plugin' ' + cat >config/test.toml <<-EOT && + [accounting.factor-weights] + age = 0 + EOT + flux config reload +' + test_expect_success 'send the user information to the plugin' ' flux python ${SEND_PAYLOAD} ${SMALL_TIE_ALL} ' diff --git a/t/t1005-max-jobs-limits.t b/t/t1005-max-jobs-limits.t index ad3bb1a5..a5517059 100755 --- a/t/t1005-max-jobs-limits.t +++ b/t/t1005-max-jobs-limits.t @@ -7,9 +7,10 @@ MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py SEND_PAYLOAD=${SHARNESS_TEST_SRCDIR}/scripts/send_payload.py +mkdir -p config export TEST_UNDER_FLUX_NO_JOB_EXEC=y export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" -test_under_flux 1 job +test_under_flux 1 job -o,--config-path=$(pwd)/config flux setattr log-stderr-level 1 @@ -27,6 +28,14 @@ test_expect_success 'check that mf_priority plugin is loaded' ' flux jobtap list | grep mf_priority ' +test_expect_success 'disable age factor in multi-factor priority plugin' ' + cat >config/test.toml <<-EOT && + [accounting.factor-weights] + age = 0 + EOT + flux config reload +' + test_expect_success 'create fake_user.json' ' cat <<-EOF >fake_user.json { diff --git a/t/t1008-mf-priority-update.t b/t/t1008-mf-priority-update.t index f45566e8..8e50acb2 100755 --- a/t/t1008-mf-priority-update.t +++ b/t/t1008-mf-priority-update.t @@ -7,9 +7,10 @@ MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py DB_PATH=$(pwd)/FluxAccountingTest.db +mkdir -p config export TEST_UNDER_FLUX_NO_JOB_EXEC=y export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" -test_under_flux 1 job +test_under_flux 1 job -o,--config-path=$(pwd)/config flux setattr log-stderr-level 1 @@ -35,6 +36,14 @@ test_expect_success 'check that mf_priority plugin is loaded' ' flux jobtap list | grep mf_priority ' +test_expect_success 'disable age factor in multi-factor priority plugin' ' + cat >config/test.toml <<-EOT && + [accounting.factor-weights] + age = 0 + EOT + flux config reload +' + test_expect_success 'add some banks to the DB' ' flux account add-bank root 1 && flux account add-bank --parent-bank=root account1 1 diff --git a/t/t1012-mf-priority-load.t b/t/t1012-mf-priority-load.t index 04e307c5..8decf7cb 100755 --- a/t/t1012-mf-priority-load.t +++ b/t/t1012-mf-priority-load.t @@ -6,9 +6,10 @@ test_description='Test multi-factor priority plugin and loading user information MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so SEND_PAYLOAD=${SHARNESS_TEST_SRCDIR}/scripts/send_payload.py +mkdir -p config export TEST_UNDER_FLUX_NO_JOB_EXEC=y export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" -test_under_flux 1 job +test_under_flux 1 job -o,--config-path=$(pwd)/config flux setattr log-stderr-level 1 @@ -20,6 +21,14 @@ test_expect_success 'check that mf_priority plugin is loaded' ' flux jobtap list | grep mf_priority ' +test_expect_success 'disable age factor in multi-factor priority plugin' ' + cat >config/test.toml <<-EOT && + [accounting.factor-weights] + age = 0 + EOT + flux config reload +' + test_expect_success 'create fake_payload.py' ' cat <<-EOF >fake_payload.py import flux diff --git a/t/t1013-mf-priority-queues.t b/t/t1013-mf-priority-queues.t index b00fafc2..99a8249f 100755 --- a/t/t1013-mf-priority-queues.t +++ b/t/t1013-mf-priority-queues.t @@ -38,6 +38,14 @@ test_expect_success 'check that mf_priority plugin is loaded' ' flux jobtap list | grep mf_priority ' +test_expect_success 'disable age factor in multi-factor priority plugin' ' + cat >conf.d/test.toml <<-EOT && + [accounting.factor-weights] + age = 0 + EOT + flux config reload +' + test_expect_success 'add some banks to the DB' ' flux account add-bank root 1 && flux account add-bank --parent-bank=root account1 1 && @@ -92,6 +100,8 @@ test_expect_success 'configure flux with those queues' ' [queues.silver] [queues.gold] [queues.foo] + [accounting.factor-weights] + age = 0 EOT flux config reload && flux queue stop --all diff --git a/t/t1014-mf-priority-dne.t b/t/t1014-mf-priority-dne.t index 79de32ea..aa2be2cc 100755 --- a/t/t1014-mf-priority-dne.t +++ b/t/t1014-mf-priority-dne.t @@ -5,8 +5,9 @@ test_description='Test cancelling active jobs with a late user/bank info load' . `dirname $0`/sharness.sh MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so +mkdir -p config export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" -test_under_flux 1 job +test_under_flux 1 job -o,--config-path=$(pwd)/config flux setattr log-stderr-level 1 @@ -18,6 +19,14 @@ test_expect_success 'check that mf_priority plugin is loaded' ' flux jobtap list | grep mf_priority ' +test_expect_success 'disable age factor in multi-factor priority plugin' ' + cat >config/test.toml <<-EOT && + [accounting.factor-weights] + age = 0 + EOT + flux config reload +' + test_expect_success 'submit a number of jobs with no user/bank info loaded to plugin' ' jobid1=$(flux submit --wait-event=depend hostname) && jobid2=$(flux submit --wait-event=depend hostname) && diff --git a/t/t1015-mf-priority-urgency.t b/t/t1015-mf-priority-urgency.t index 30108c35..4c9fa9ae 100755 --- a/t/t1015-mf-priority-urgency.t +++ b/t/t1015-mf-priority-urgency.t @@ -8,8 +8,9 @@ SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py SEND_PAYLOAD=${SHARNESS_TEST_SRCDIR}/scripts/send_payload.py SAME_FAIRSHARE=${SHARNESS_TEST_SRCDIR}/expected/sample_payloads/same_fairshare.json +mkdir -p config export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" -test_under_flux 1 job +test_under_flux 1 job -o,--config-path=$(pwd)/config flux setattr log-stderr-level 1 @@ -27,6 +28,14 @@ test_expect_success 'check that mf_priority plugin is loaded' ' flux jobtap list | grep mf_priority ' +test_expect_success 'disable age factor in multi-factor priority plugin' ' + cat >config/test.toml <<-EOT && + [accounting.factor-weights] + age = 0 + EOT + flux config reload +' + test_expect_success 'add a default queue and send it to the plugin' ' cat <<-EOF >fake_payload.py import flux diff --git a/t/t1018-mf-priority-disable-entry.t b/t/t1018-mf-priority-disable-entry.t index e83a6833..3f627be2 100755 --- a/t/t1018-mf-priority-disable-entry.t +++ b/t/t1018-mf-priority-disable-entry.t @@ -6,8 +6,9 @@ test_description='Test rejecting jobs from a user who has been disabled from the MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so DB_PATH=$(pwd)/FluxAccountingTest.db +mkdir -p config export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" -test_under_flux 1 job +test_under_flux 1 job -o,--config-path=$(pwd)/config flux setattr log-stderr-level 1 @@ -27,6 +28,14 @@ test_expect_success 'check that mf_priority plugin is loaded' ' flux jobtap list | grep mf_priority ' +test_expect_success 'disable age factor in multi-factor priority plugin' ' + cat >config/test.toml <<-EOT && + [accounting.factor-weights] + age = 0 + EOT + flux config reload +' + test_expect_success 'add some banks to the DB' ' flux account add-bank root 1 && flux account add-bank --parent-bank=root account1 1 && diff --git a/t/t1019-mf-priority-info-fetch.t b/t/t1019-mf-priority-info-fetch.t index 5a563066..142cd6f8 100755 --- a/t/t1019-mf-priority-info-fetch.t +++ b/t/t1019-mf-priority-info-fetch.t @@ -8,9 +8,10 @@ SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py DB_PATH=$(pwd)/FluxAccountingTest.db EXPECTED_FILES=${SHARNESS_TEST_SRCDIR}/expected/plugin_state +mkdir -p config export TEST_UNDER_FLUX_NO_JOB_EXEC=y export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" -test_under_flux 1 job +test_under_flux 1 job -o,--config-path=$(pwd)/config flux setattr log-stderr-level 1 @@ -36,6 +37,15 @@ test_expect_success 'check that mf_priority plugin is loaded' ' flux jobtap list | grep mf_priority ' + +test_expect_success 'disable age factor in multi-factor priority plugin' ' + cat >config/test.toml <<-EOT && + [accounting.factor-weights] + age = 0 + EOT + flux config reload +' + test_expect_success HAVE_JQ 'flux jobtap query returns basic information' ' flux jobtap query mf_priority.so >query.json && test_debug "jq -S . config/test.toml <<-EOT && + [accounting.factor-weights] + age = 0 + EOT + flux config reload +' + test_expect_success 'add some banks to the DB' ' flux account add-bank root 1 && flux account add-bank --parent-bank=root account1 1 && diff --git a/t/t1030-mf-priority-update-queue.t b/t/t1030-mf-priority-update-queue.t index d0c4d1c1..d2e5d14a 100755 --- a/t/t1030-mf-priority-update-queue.t +++ b/t/t1030-mf-priority-update-queue.t @@ -63,10 +63,12 @@ test_expect_success 'send flux-accounting DB information to the plugin' ' ' test_expect_success 'configure flux with some queues' ' - cat >conf.d/queues.toml <<-EOT && + cat >conf.d/test.toml <<-EOT && [queues.bronze] [queues.silver] [queues.gold] + [accounting.factor-weights] + age = 0 EOT flux config reload ' diff --git a/t/t1033-mf-priority-update-job.t b/t/t1033-mf-priority-update-job.t index 668a2856..194d2dd7 100755 --- a/t/t1033-mf-priority-update-job.t +++ b/t/t1033-mf-priority-update-job.t @@ -67,6 +67,8 @@ test_expect_success 'configure flux with some queues' ' [queues.bronze] [queues.silver] [queues.gold] + [accounting.factor-weights] + age = 0 EOT flux config reload ' diff --git a/t/t1034-mf-priority-config.t b/t/t1034-mf-priority-config.t index 0baaf448..547158d6 100755 --- a/t/t1034-mf-priority-config.t +++ b/t/t1034-mf-priority-config.t @@ -51,7 +51,7 @@ test_expect_success 'send flux-accounting DB information to the plugin' ' test_expect_success 'no configured priority factors will use default weights' ' job1=$(flux python ${SUBMIT_AS} 1001 -n1 hostname) && flux job wait-event -f json ${job1} priority | jq '.context.priority' > job1.test && - grep "50000" job1.test && + grep -c "50[0-9][0-9][0-9]" job1.test && flux cancel ${job1} ' @@ -60,6 +60,7 @@ test_expect_success 'set up new configuration for multi-factor priority plugin' [accounting.factor-weights] fairshare = 1000 queue = 100 + age = 0 EOT flux config reload ' @@ -76,6 +77,7 @@ test_expect_success 'change the configuration for the priority factors' ' [accounting.factor-weights] fairshare = 500 queue = 100 + age = 0 EOT flux config reload ' From 0286fa112f0e53099c077d33bddd7ae3412b9165 Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Tue, 21 May 2024 12:59:04 -0700 Subject: [PATCH 4/6] t: add tests for "age" factor Problem: flux-accounting has no tests for the implementation of the "age" factor in the priority plugin. Add some tests. --- t/Makefile.am | 1 + t/t1035-mf-priority-age.t | 237 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 238 insertions(+) create mode 100755 t/t1035-mf-priority-age.t diff --git a/t/Makefile.am b/t/Makefile.am index 3a9124b8..287f66f5 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -35,6 +35,7 @@ TESTSCRIPTS = \ t1032-mf-priority-update-bank.t \ t1033-mf-priority-update-job.t \ t1034-mf-priority-config.t \ + t1035-mf-priority-age.t \ t5000-valgrind.t \ python/t1000-example.py diff --git a/t/t1035-mf-priority-age.t b/t/t1035-mf-priority-age.t new file mode 100755 index 00000000..3adc0482 --- /dev/null +++ b/t/t1035-mf-priority-age.t @@ -0,0 +1,237 @@ +#!/bin/bash + +test_description='test the age factor in multi-factor priority plugin' + +. `dirname $0`/sharness.sh +MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so +SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py +DB_PATH=$(pwd)/FluxAccountingTest.db + +mkdir -p config + +export TEST_UNDER_FLUX_NO_JOB_EXEC=y +export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" +test_under_flux 1 job -o,--config-path=$(pwd)/config + +flux setattr log-stderr-level 1 + +test_expect_success 'allow guest access to testexec' ' + flux config load <<-EOF + [exec.testexec] + allow-guests = true + EOF +' + +test_expect_success 'load plugin successfully without configuration' ' + flux jobtap load ${MULTI_FACTOR_PRIORITY} +' + +test_expect_success 'create a flux-accounting DB' ' + flux account -p ${DB_PATH} create-db +' + +test_expect_success 'start flux-accounting service' ' + flux account-service -p ${DB_PATH} -t +' + +test_expect_success 'add some banks to the DB' ' + flux account -p ${DB_PATH} add-bank root 1 && + flux account -p ${DB_PATH} add-bank --parent-bank=root bankA 1 && + flux account -p ${DB_PATH} add-bank --parent-bank=root bankB 1 +' + +test_expect_success 'add a user to the DB' ' + flux account -p ${DB_PATH} add-user \ + --username=user1001 \ + --userid=1001 \ + --bank=bankA && + flux account -p ${DB_PATH} add-user \ + --username=user1001 \ + --userid=1001 \ + --bank=bankB +' + +test_expect_success 'configure multi-factor priority plugin' ' + cat >config/test.toml <<-EOT && + [accounting.factor-weights] + fairshare = 1000 + queue = 100 + age = 100 + EOT + flux config reload +' + +test_expect_success 'send flux-accounting DB information to the plugin' ' + flux account-priority-update -p $(pwd)/FluxAccountingTest.db +' + +# In this first set of tests, we will submit two one-node jobs with just one +# node available. While both jobs will receive a priority and be released by +# the scheduler, the second one will have to wait for the first to finish +# running before it can run. Therefore, the second job will have its time of +# release kept track of by the plugin while it waits to be run. Once time has +# passed and jobs are reprioritized, the held job's priority will increase +# since it has been waiting. The check at the end of this first set makes sure +# that the priority of the second job has increased as a result of waiting for +# the resources so that it can run. +test_expect_success 'submit two one-node jobs with just one node available' ' + job1=$(flux python ${SUBMIT_AS} 1001 -N1 sleep 60) && + flux job wait-event -vt 10 ${job1} alloc && + job2=$(flux python ${SUBMIT_AS} 1001 -N1 sleep 60) && + flux job wait-event -f json ${job2} priority \ + | jq '.context.priority' > job2.priority +' + +test_expect_success 'reprioritize jobs while jobs are active' ' + sleep 1.5 && + cat <<-EOF >fake_payload.py + import flux + + flux.Flux().rpc("job-manager.mf_priority.reprioritize").get() + EOF + flux python fake_payload.py && + flux cancel ${job1} +' + +test_expect_success 'grab new priority of job' ' + flux job info ${job2} eventlog > eventlog.out && + grep "priority" eventlog.out \ + | awk "NR==2" \ + | jq '.context.priority' > job2.new_priority +' + +test_expect_success 'make sure the priority of the job has increased' ' + old_priority=$(cat job2.priority) && + new_priority=$(cat job2.new_priority) && + test $new_priority -gt $old_priority +' + +test_expect_success 'cancel job' ' + flux cancel ${job2} +' + +# In this second set of tests, a job is submitted with an urgency set to 0, +# therefore preventing it from running. When the urgency of the job is updated +# to a value greater than 0, we check that the age factor is not considered +# when re-calculating the priority of the job. +test_expect_success 'submit a job with urgency==0' ' + job=$(flux python ${SUBMIT_AS} 1001 --urgency=0 -N1 sleep 60) +' + +test_expect_success 'update the urgency of the job' ' + flux job urgency ${job} 16 +' + +test_expect_success 'make sure priority has not increased' ' + flux job wait-event -vt 10 ${job} alloc && + flux job info ${job} eventlog > eventlog.out && + grep "priority" eventlog.out \ + | jq '.context.priority' | tail -n 1 > job.priority + test "$(cat job.priority)" -eq 500 +' + +test_expect_success 'cancel job' ' + flux cancel ${job} +' + +# In this set of tests, the scenario from the first set is repeated; this time, +# the second job has an urgency of 0 set to it, preventing it from running. Its +# urgency is then updated to have a value greater than 0 but cannot actually +# run since the first job is still running. Jobs are reprioritized and the job +# that previously could not run is checked to make sure that its priority has +# increased from before as a result of its age in the queue. +test_expect_success 'submit two jobs' ' + job1=$(flux python ${SUBMIT_AS} 1001 -N1 sleep 60) && + flux job wait-event -vt 10 ${job1} alloc && + job2=$(flux python ${SUBMIT_AS} 1001 -N1 --urgency=0 sleep 60) && + flux job wait-event -vt 10 ${job2} priority +' + +test_expect_success 'update the urgency of the second job to start considering age as a factor' ' + flux job urgency ${job2} 16 +' + +test_expect_success 'reprioritize jobs and cancel running job' ' + sleep 1.5 && + cat <<-EOF >fake_payload.py + import flux + + flux.Flux().rpc("job-manager.mf_priority.reprioritize").get() + EOF + flux python fake_payload.py && + flux cancel ${job1} +' + +test_expect_success 'priority of previously-released job increases' ' + flux job info ${job2} eventlog > eventlog.out && + grep "priority" eventlog.out \ + | awk "NR==2" \ + | jq '.context.priority' > job2.old_priority && + grep "priority" eventlog.out \ + | awk "NR==3" \ + | jq '.context.priority' > job2.new_priority +' + +test_expect_success 'compare priorities of previously released job' ' + old_priority=$(cat job2.old_priority) && + new_priority=$(cat job2.new_priority) && + test $new_priority -gt $old_priority +' + +test_expect_success 'cancel job' ' + flux cancel ${job2} +' + +# In this set of tests, we check that updating a job to use a different bank or +# queue clears and resets the age of the job. +test_expect_success 'submit two one-node jobs with just one node available' ' + job1=$(flux python ${SUBMIT_AS} 1001 -N1 sleep 60) && + flux job wait-event -vt 10 ${job1} alloc && + job2=$(flux python ${SUBMIT_AS} 1001 -N1 sleep 60) && + flux job wait-event -f json ${job2} priority \ + | jq '.context.priority' > job2.priority +' + +test_expect_success 'reprioritize jobs while jobs are active (job2 will accumulate age)' ' + sleep 1.5 && + cat <<-EOF >fake_payload.py + import flux + + flux.Flux().rpc("job-manager.mf_priority.reprioritize").get() + EOF + flux python fake_payload.py +' + +test_expect_success 'change the urgency to 0 and update the bank of the released job' ' + flux job urgency ${job2} 0 && + flux update ${job2} bank=bankB && + flux job wait-event -vt 10 ${job2} priority && + flux job eventlog ${job2} > eventlog.out && + grep "attributes.system.bank=\"bankB\"" eventlog.out +' + +test_expect_success 'cancel first job, update urgency of released job' ' + flux cancel ${job1} && + flux job urgency ${job2} 16 && + flux job wait-event -f json ${job2} alloc && + flux job info ${job2} eventlog > eventlog.out && + grep "priority" eventlog.out \ + | awk "NR==5" \ + | jq '.context.priority' > job2.new_priority +' + +test_expect_success 'make sure priority does not factor in the age before the bank update' ' + old_priority=$(cat job2.priority) && + new_priority=$(cat job2.new_priority) && + test $new_priority -eq $old_priority +' + +test_expect_success 'cancel released job' ' + flux cancel ${job2} +' + +test_expect_success 'shut down flux-accounting service' ' + flux python -c "import flux; flux.Flux().rpc(\"accounting.shutdown_service\").get()" +' + +test_done From 703d95676edbe8d0e382cfae05924cc6aa6bd101 Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Tue, 28 May 2024 08:30:10 -0700 Subject: [PATCH 5/6] man5: add "age" entry to manpage Problem: The flux-config-accounting manpage does not describe the "age" factor in the priority plugin even though it is a factor in the calculation of a job's priority. Add "age" to the description of the priority factors in the flux-config-accounting manpage. --- doc/man5/flux-config-accounting.rst | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/doc/man5/flux-config-accounting.rst b/doc/man5/flux-config-accounting.rst index 7252fcd5..1bb2992f 100644 --- a/doc/man5/flux-config-accounting.rst +++ b/doc/man5/flux-config-accounting.rst @@ -19,6 +19,8 @@ the priority plugin have the following weights: +-------------+--------+ | queue | 10000 | +-------------+--------+ +| age | 1000 | ++-------------+--------+ The ``accounting.factor-weights`` sub-table may contain the following keys: @@ -34,6 +36,11 @@ queue Integer value that represents the weight associated with submitting a job to a certain queue. +age + Integer value that represents the weight associated with how long a job has + been able to be scheduled but could not due to an external constraint (e.g + a resource constraint). + EXAMPLE ======= @@ -43,3 +50,4 @@ EXAMPLE [accounting.factor-weights] fairshare = 10000 queue = 1000 + age = 100 From 37cad48b8a1a970511f0ee394d47ad60ab9419dd Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Tue, 28 May 2024 08:35:55 -0700 Subject: [PATCH 6/6] accounting-guide: add "queue", "age" factors Problem: The accounting guide does not list "queue" or "age" as factors considered by the plugin when calculating a job's priority. Add both of these factors to the listed description of factors used when calculating a job's priority in the priority plugin. --- doc/guide/accounting-guide.rst | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/doc/guide/accounting-guide.rst b/doc/guide/accounting-guide.rst index 9ed513b4..d1dc2b99 100644 --- a/doc/guide/accounting-guide.rst +++ b/doc/guide/accounting-guide.rst @@ -316,6 +316,12 @@ the multi-factor priority plugin are: consumed. See the :ref:`Glossary definition ` for a more detailed explanation of how fair-share is utilized within flux-accounting. +* **queue**: an integer weight associated with submitting a job to a certain + queue. + +* **age**: how long a job has been able to be scheduled but could not due to an + external constraint (e.g a resource constraint). + * **urgency**: a user-controlled factor to prioritize their own jobs. In addition to generating an integer priority for submitted jobs in a Flux