Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plugin: add "age" as a factor to priority calculation for jobs #455

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions doc/guide/accounting-guide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,12 @@ the multi-factor priority plugin are:
consumed. See the :ref:`Glossary definition <glossary-section>` for a more
detailed explanation of how fair-share is utilized within flux-accounting.

* **queue**: an integer weight associated with submitting a job to a certain
queue.

* **age**: how long a job has been eligible to be scheduled but has been unable
to run due to an external constraint (e.g., a resource constraint).

* **urgency**: a user-controlled factor to prioritize their own jobs.

In addition to generating an integer priority for submitted jobs in a Flux
Expand Down
8 changes: 8 additions & 0 deletions doc/man5/flux-config-accounting.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ the priority plugin have the following weights:
+-------------+--------+
| queue | 10000 |
+-------------+--------+
| age | 1000 |
+-------------+--------+

The ``accounting.factor-weights`` sub-table may contain the following keys:

Expand All @@ -34,6 +36,11 @@ queue
Integer value that represents the weight associated with submitting a job
to a certain queue.

age
Integer value that represents the weight associated with how long a job has
been eligible to be scheduled but has been unable to run due to an external
constraint (e.g., a resource constraint).


EXAMPLE
=======
Expand All @@ -43,3 +50,4 @@ EXAMPLE
[accounting.factor-weights]
fairshare = 10000
queue = 1000
age = 100
198 changes: 191 additions & 7 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ extern "C" {
#include <vector>
#include <sstream>
#include <cstdint>
#include <chrono>

// custom bank_info class file
#include "accounting.hpp"
Expand All @@ -48,13 +49,85 @@ std::map<std::string, Queue> queues;
std::map<int, std::string> users_def_bank;
std::vector<std::string> projects;
std::map<std::string, int> priority_weights;
std::map<flux_jobid_t,
std::chrono::time_point<std::chrono::system_clock>> released_jobs;

/******************************************************************************
* *
* Helper Functions *
* *
*****************************************************************************/

/*
 * Return the total age, in seconds, to use for a job's priority calculation.
 *
 * jobid:    ID of the job to look up in the released_jobs map.
 * prev_age: age already accumulated by the job before its current
 *           released_jobs entry (pass 0.0 if there is none).
 *
 * If the job has an entry in released_jobs, the time elapsed between that
 * timestamp and now is added to prev_age. If it has no entry, the result is
 * just prev_age, so a job that never accumulated any age contributes 0.0.
 */
double calculate_job_age (long int jobid, double prev_age)
{
    double waited = 0.0;

    const auto entry = released_jobs.find (jobid);
    if (entry != released_jobs.end ()) {
        // the job is currently accumulating age: measure from the stored
        // release timestamp up to this moment
        const std::chrono::duration<double> elapsed =
            std::chrono::system_clock::now () - entry->second;
        waited = elapsed.count ();
    }

    // fold in whatever age was banked before the current release
    return waited + prev_age;
}


/*
 * Bank the age a job has accumulated so far and stop it from accumulating
 * more. This covers the scenario where a job that was accumulating age should
 * no longer do so for a controlled reason (such as the job being held by the
 * user with an urgency update).
 *
 * p:     plugin handle used to read/write the job's aux data.
 * h:     Flux handle used for logging.
 * jobid: ID of the job to look up in the released_jobs map.
 *
 * If the job has an entry in released_jobs, the time elapsed since that
 * timestamp is added to any value already stored under the
 * "mf_priority:prev_age" aux key, the job's released_jobs entry is erased,
 * and the new total is stored back under the same aux key. If the job has no
 * entry, nothing is done.
 *
 * Returns 0 on success, including when there was nothing to do or when
 * storing the aux item failed (that failure is only logged). Returns -1 with
 * errno set to ENOMEM if the storage for the age could not be allocated.
 */
int check_and_pack_previous_age (flux_plugin_t *p, flux_t *h, long int jobid)
{
    auto it = released_jobs.find (jobid);
    if (it == released_jobs.end ())
        return 0;

    double *prev_age = static_cast<double *> (malloc (sizeof (double)));
    if (!prev_age) {
        errno = ENOMEM;
        return -1;
    }
    // malloc() returns uninitialized storage; start the running total at
    // zero before anything is added to it
    *prev_age = 0.0;

    // attempt to fetch any previously-accumulated age of the job and add it
    double *total_age = static_cast<double *> (flux_jobtap_job_aux_get (
                                                    p,
                                                    FLUX_JOBTAP_CURRENT_JOB,
                                                    "mf_priority:prev_age"));
    if (total_age != NULL)
        *prev_age += *total_age;

    // add the age of the job from its release timestamp up to this point
    *prev_age += std::chrono::duration<double>
                     (std::chrono::system_clock::now () - it->second).count ();

    // erase entry in released_jobs map (since the job's age will be reset)
    released_jobs.erase (it);

    if (flux_jobtap_job_aux_set (p,
                                 FLUX_JOBTAP_CURRENT_JOB,
                                 "mf_priority:prev_age",
                                 prev_age,
                                 free) < 0) {
        flux_log_error (h, "flux_jobtap_job_aux_set");
        // the destructor is only registered when the set succeeds, so the
        // buffer is still ours to release here
        free (prev_age);
    }

    return 0;
}


/*
* Calculate a user's job priority using the following factors:
*
Expand All @@ -65,20 +138,45 @@ std::map<std::string, int> priority_weights;
*
* queue: a factor that can further affect the priority of a job based on the
* queue passed in.
*
* age: a factor that considers how long ago a job was released to the
* scheduler to be run but could not run due to another constraint (e.g.,
* a resource constraint). Any urgency, priority, or jobspec-update event
* posted to a job in SCHED state will transition the job back to
* PRIORITY. After the priority of the job is calculated, the job will
* be transitioned back to SCHED state and the job's "t_released"
* timestamp will be updated to reflect this most recent transition. This
* is to prevent users from purposely holding their jobs while they are in
* SCHED state and artificially boosting their job's priority.
*/
int64_t priority_calculation (flux_plugin_t *p,
flux_plugin_arg_t *args,
int userid,
char *bank,
int urgency)
{
long int jobid;
double fshare_factor = 0.0, priority = 0.0;
int queue_factor = 0;
int fshare_weight, queue_weight;
int fshare_weight, queue_weight, age_weight;
double age_factor = 0.0, age = 0.0;
Association *b;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:I}",
"id", &jobid) < 0) {
flux_log (h,
LOG_ERR,
"flux_plugin_arg_unpack: %s",
flux_plugin_arg_strerror (args));
return -1;
}

fshare_weight = priority_weights["fairshare"];
queue_weight = priority_weights["queue"];
age_weight = priority_weights["age"];

if (urgency == FLUX_JOB_URGENCY_HOLD)
return FLUX_JOB_PRIORITY_MIN;
Expand All @@ -98,11 +196,22 @@ int64_t priority_calculation (flux_plugin_t *p,
return -1;
}

double *prev_age = static_cast<double *> (flux_jobtap_job_aux_get (
p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:prev_age"));
if (prev_age != NULL)
// factor in any previously-accumulated age to the calculation of
// the age factor for this job
age += *prev_age;

age_factor = calculate_job_age (jobid, age);
fshare_factor = b->fairshare;
queue_factor = b->queue_factor;

priority = round ((fshare_weight * fshare_factor) +
(queue_weight * queue_factor) +
(age_weight * age_factor) +
(urgency - 16));

if (priority < 0)
Expand Down Expand Up @@ -185,16 +294,17 @@ static int conf_update_cb (flux_plugin_t *p,
flux_plugin_arg_t *args,
void *data)
{
int fshare_weight = -1, queue_weight = -1;
int fshare_weight = -1, queue_weight = -1, age_weight = -1;
flux_t *h = flux_jobtap_get_flux (p);

// unpack the various factors to be used in job priority calculation
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s?{s?{s?{s?i, s?i}}}",
"{s?{s?{s?{s?i, s?i, s?i}}}",
"conf", "accounting", "factor-weights",
"fairshare", &fshare_weight,
"queue", &queue_weight) < 0) {
"queue", &queue_weight,
"age", &age_weight) < 0) {
flux_log_error (flux_jobtap_get_flux (p),
"mf_priority: conf.update: flux_plugin_arg_unpack: %s",
flux_plugin_arg_strerror (args));
Expand All @@ -205,6 +315,8 @@ static int conf_update_cb (flux_plugin_t *p,
priority_weights["fairshare"] = fshare_weight;
if (queue_weight != -1)
priority_weights["queue"] = queue_weight;
// only override the "age" weight when the config actually supplied one
// (-1 is the "not provided" sentinel); store the age value itself rather
// than the queue weight
if (age_weight != -1)
    priority_weights["age"] = age_weight;

return 0;
}
Expand Down Expand Up @@ -453,12 +565,14 @@ static int priority_cb (flux_plugin_t *p,
char *bank = NULL;
char *queue = NULL;
int64_t priority;
long int jobid;
Association *b;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i, s:i, s{s{s{s?s, s?s}}}}",
"{s:I, s:i, s:i, s{s{s{s?s, s?s}}}}",
"id", &jobid,
"urgency", &urgency,
"userid", &userid,
"jobspec", "attributes", "system",
Expand All @@ -482,6 +596,20 @@ static int priority_cb (flux_plugin_t *p,
return -1;
}

if (urgency == 0) {
// the job is being held; if the job was accumulating an age up to this
// point, we need to store this age and no longer consider age a factor
// until this job is updated to have an urgency > 0
if (check_and_pack_previous_age (p, h, jobid) < 0) {
flux_jobtap_raise_exception (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority",
0,
"failed to pack prev_age for job");
return -1;
}
}

if (b->max_run_jobs == BANK_INFO_MISSING) {
// the association that this job is submitted under could not be found
// in the plugin's internal map when the job was first submitted and is
Expand Down Expand Up @@ -807,6 +935,43 @@ static int depend_cb (flux_plugin_t *p,
}


/*
 * Callback run when a job reaches the job.state.sched state. Records the
 * moment the job was released so that, if it is eligible to run but cannot
 * (e.g., due to resource constraints), the time it spends waiting can later
 * be turned into an "age" that raises its priority.
 *
 * A timestamp is only recorded for jobs with an urgency greater than 0; a
 * held job does not start accumulating age. Any existing timestamp for the
 * job is overwritten.
 *
 * Returns 0 on success, or the result of flux_jobtap_error () if the job ID
 * and urgency cannot be unpacked from the callback arguments.
 */
static int sched_cb (flux_plugin_t *p,
                     const char *topic,
                     flux_plugin_arg_t *args,
                     void *data)
{
    long int jobid;
    int urgency = 0;
    flux_t *h = flux_jobtap_get_flux (p);

    // pull the job ID and its current urgency out of the callback arguments
    const int rc = flux_plugin_arg_unpack (args,
                                           FLUX_PLUGIN_ARG_IN,
                                           "{s:I, s:i}",
                                           "id", &jobid,
                                           "urgency", &urgency);
    if (rc < 0)
        return flux_jobtap_error (p, args, "unable to unpack urgency");

    // a held job is not waiting on the scheduler, so there is nothing to
    // start timing
    if (urgency <= 0)
        return 0;

    // remember when this job was released, keyed by its job ID
    released_jobs[jobid] = std::chrono::system_clock::now ();

    return 0;
}


static int run_cb (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
Expand Down Expand Up @@ -848,12 +1013,14 @@ static int job_updated (flux_plugin_t *p,
char *bank = NULL;
char *updated_queue = NULL;
char *updated_bank = NULL;
long int jobid;
Association *a;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i, s{s{s{s?s}}}, s:{s?s, s?s}}",
"{s:I, s:i, s{s{s{s?s}}}, s:{s?s, s?s}}",
"id", &jobid,
"userid", &userid,
"jobspec", "attributes", "system", "bank",
&bank,
Expand Down Expand Up @@ -916,6 +1083,16 @@ static int job_updated (flux_plugin_t *p,
// associated with the job
a->queue_factor = get_queue_info (updated_queue, a->queues, queues);

// the job was updated to either run under a new bank or new queue; reset
// the age of the job
released_jobs.erase (jobid);
if (flux_jobtap_job_aux_set (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:prev_age",
NULL,
NULL) < 0)
flux_log_error (h, "flux_jobtap_job_aux_set");

return 0;
}

Expand Down Expand Up @@ -1029,12 +1206,14 @@ static int inactive_cb (flux_plugin_t *p,
void *data)
{
int userid;
long int jobid;
Association *b;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i}",
"{s:I, s:i}",
"id", &jobid,
"userid", &userid) < 0) {
flux_log (h,
LOG_ERR,
Expand Down Expand Up @@ -1079,6 +1258,10 @@ static int inactive_cb (flux_plugin_t *p,
b->held_jobs.erase (b->held_jobs.begin ());
}

// if the job had an "age" associated with it, remove it from the map that
// stores its ID and t_released timestamp
released_jobs.erase (jobid);

return 0;
}

Expand All @@ -1092,6 +1275,7 @@ static const struct flux_plugin_handler tab[] = {
{ "job.state.depend", depend_cb, NULL },
{ "job.update", job_updated, NULL},
{ "job.state.run", run_cb, NULL},
{ "job.state.sched", sched_cb, NULL},
{ "plugin.query", query_cb, NULL},
{ "job.update.attributes.system.queue", update_queue_cb, NULL },
{ "job.update.attributes.system.bank", update_bank_cb, NULL },
Expand Down
1 change: 1 addition & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ TESTSCRIPTS = \
t1032-mf-priority-update-bank.t \
t1033-mf-priority-update-job.t \
t1034-mf-priority-config.t \
t1035-mf-priority-age.t \
t5000-valgrind.t \
python/t1000-example.py

Expand Down
Loading
Loading