Skip to content

Commit

Permalink
Merge pull request #417 from cmoussa1/issue#416
Browse files Browse the repository at this point in the history
plugin: move helper functions for `plugin.query` callback
  • Loading branch information
mergify[bot] authored Feb 14, 2024
2 parents 8e84ac3 + 881de2f commit b291ffb
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 159 deletions.
3 changes: 3 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ if test "X$PYTHON" = "X"; then
AC_MSG_ERROR([could not find python])
fi

# checks for packages
PKG_CHECK_MODULES([JANSSON], [jansson >= 2.10], [], [])

#
# Project directories
#
Expand Down
5 changes: 3 additions & 2 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,10 @@ accounting_test01_t_SOURCES = \
plugins/test/accounting_test01.cpp \
plugins/accounting.cpp \
plugins/accounting.hpp
accounting_test01_t_CXXFLAGS = $(AM_CXXFLAGS) -I$(top_srcdir)
accounting_test01_t_CXXFLAGS = $(AM_CXXFLAGS) -I$(top_srcdir) $(JANSSON_CFLAGS)
accounting_test01_t_LDADD = \
common/libtap/libtap.la
common/libtap/libtap.la \
$(JANSSON_LIBS)

noinst_PROGRAMS = \
cmd/flux-account-update-fshare \
Expand Down
90 changes: 90 additions & 0 deletions src/plugins/accounting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,93 @@ Association* get_association (int userid,

return &bank_it->second;
}


json_t* Association::to_json () const
{
json_t *held_job_ids = json_array ();
if (!held_job_ids) {
return nullptr;
}
for (const auto &job_id : held_jobs) {
json_t *temp;
if (!(temp = json_integer (job_id))
|| json_array_append_new (held_job_ids, temp) < 0) {
json_decref (held_job_ids);
return nullptr;
}
}

json_t *user_queues = json_array ();
if (!user_queues) {
json_decref (held_job_ids);
return nullptr;
}
for (const auto &queue : queues) {
json_t *temp;
if (!(temp = json_string (queue.c_str ()))
|| json_array_append_new (user_queues, temp) < 0) {
json_decref (held_job_ids);
json_decref (user_queues);
return nullptr;
}
}

// 'o' steals the reference for both held_job_ids and user_queues
json_t *u = json_pack ("{s:s, s:f, s:i, s:i, s:i, s:i, s:o, s:o, s:i, s:i}",
"bank_name", bank_name.c_str (),
"fairshare", fairshare,
"max_run_jobs", max_run_jobs,
"cur_run_jobs", cur_run_jobs,
"max_active_jobs", max_active_jobs,
"cur_active_jobs", cur_active_jobs,
"held_jobs", held_job_ids,
"queues", user_queues,
"queue_factor", queue_factor,
"active", active);

if (!u)
return nullptr;

return u;
}


json_t* convert_map_to_json (std::map<int, std::map<std::string, Association>>
&users)
{
json_t *accounting_data = json_array ();
if (!accounting_data)
return nullptr;

// each Association in the users map is a pair; the first item is the
// userid and the second is a list of banks they belong to
for (const auto& association : users) {
json_t *banks = json_array ();
if (!banks) {
json_decref (accounting_data);
return nullptr;
}
for (const auto &bank : association.second) {
// bank.second refers to an Association object
json_t *b = bank.second.to_json ();
if (!b || json_array_append_new (banks, b) < 0) {
json_decref (accounting_data);
json_decref (banks);
return nullptr;
}
}

json_t *u = json_pack ("{siso}",
"userid",
association.first,
"banks", banks);
if (!u || json_array_append_new (accounting_data, u) < 0) {
json_decref (accounting_data);
json_decref (banks);
return nullptr;
}
}

return accounting_data;
}
16 changes: 16 additions & 0 deletions src/plugins/accounting.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@
\************************************************************/

// header file for the Accounting class
extern "C" {
#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <flux/core.h>
#include <flux/jobtap.h>
#include <jansson.h>
}

#ifndef ACCOUNTING_H
#define ACCOUNTING_H
Expand All @@ -21,6 +29,7 @@
// all attributes are per-user/bank
class Association {
public:
// attributes
std::string bank_name; // name of bank
double fairshare; // fair share value
int max_run_jobs; // max number of running jobs
Expand All @@ -31,6 +40,9 @@ class Association {
std::vector<std::string> queues; // list of accessible queues
int queue_factor; // priority factor associated with queue
int active; // active status

// methods
json_t* to_json () const; // convert object to JSON string
};

// get an Association object that points to user/bank in the users map;
Expand All @@ -41,4 +53,8 @@ Association* get_association (int userid,
&users,
std::map<int, std::string> &users_def_bank);

// iterate through the users map and construct a JSON object of each user/bank
json_t* convert_map_to_json (std::map<int, std::map<std::string, Association>>
&users);

#endif // ACCOUNTING_H
155 changes: 4 additions & 151 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,139 +203,6 @@ int check_queue_factor (flux_plugin_t *p,
}


/*
* Add held job IDs to a JSON array to be added to a bank_info JSON object.
*/
static json_t *add_held_jobs (
const std::pair<std::string, Association> &b)
{
json_t *held_jobs = NULL;

// add any held jobs to a JSON array
if (!(held_jobs = json_array ()))
goto error;

for (auto const &j : b.second.held_jobs) {
json_t *jobid = json_integer (j);

if (!jobid)
goto error;

if (json_array_append_new (held_jobs, jobid) < 0) {
json_decref (jobid);
goto error;
}
}

return held_jobs;
error:
json_decref (held_jobs);
return NULL;
}


/*
* Create a JSON object for a bank that a user belongs to.
*/
static json_t *pack_bank_info_object (
const std::pair<std::string, Association> &b)
{
json_t *bank_info, *held_jobs = NULL;

held_jobs = add_held_jobs (b);
if (held_jobs == NULL)
goto error;

if (!(bank_info = json_pack ("{s:s, s:f, s:i, s:i, s:i, s:i, s:o, s:i}",
"bank", b.first.c_str (),
"fairshare", b.second.fairshare,
"max_run_jobs", b.second.max_run_jobs,
"cur_run_jobs", b.second.cur_run_jobs,
"max_active_jobs", b.second.max_active_jobs,
"cur_active_jobs", b.second.cur_active_jobs,
"held_jobs", held_jobs,
"active", b.second.active))) {
goto error;
}

return bank_info;
error:
json_decref (held_jobs);
return NULL;
}


/*
* For each bank that a user belongs to, create a JSON object for each bank and
* add it to the user JSON object.
*/
static json_t *banks_to_json (
flux_plugin_t *p,
std::pair<int, std::map<std::string, Association>> &u)
{
json_t *bank_info, *banks = NULL;

banks = json_array (); // array of banks that user belongs to
if (!banks)
goto error;

for (auto const &b : u.second) {
bank_info = pack_bank_info_object (b); // JSON object for one bank
if (bank_info == NULL)
goto error;

if (json_array_append_new (banks, bank_info) < 0) {
json_decref (bank_info);
goto error;
}
}

return banks;
error:
json_decref (banks);
return NULL;
}


/*
* Iterate thrpugh each user in users map and create a JSON object for each
* user.
*/
static json_t *user_to_json (
flux_plugin_t *p,
std::pair<int, std::map<std::string, Association>> u)
{
json_t *user = json_object (); // JSON object for one user
json_t *userid, *banks = NULL;

if (!user)
return NULL;

userid = json_integer (u.first);
if (!userid)
goto error;

if (json_object_set_new (user, "userid", userid) < 0) {
json_decref (userid);
goto error;
}

banks = banks_to_json (p, u);
if (banks == NULL)
goto error;

if (json_object_set_new (user, "banks", banks) < 0) {
json_decref (banks);
goto error;
}

return user;
error:
json_decref (user);
return NULL;
}


// Scan the users map and look at each user's default bank to see if any one
// of them have a valid bank (i.e one that is not "DNE"; if any of the users do
// do have a valid bank, it will return false)
Expand Down Expand Up @@ -430,35 +297,21 @@ static int query_cb (flux_plugin_t *p,
void *data)
{
flux_t *h = flux_jobtap_get_flux (p);
json_t *all_users = json_array (); // array of user/bank combos
json_t *accounting_data = convert_map_to_json (users);

if (!all_users)
if (!accounting_data)
return -1;

for (auto const &u : users) {
json_t *user = user_to_json (p, u);
if (user == NULL) {
json_decref (all_users);
return -1;
}

if (json_array_append_new (all_users, user) < 0) {
json_decref (user);
json_decref (all_users);
return -1;
}
}

if (flux_plugin_arg_pack (args,
FLUX_PLUGIN_ARG_OUT,
"{s:O}",
"mf_priority_map",
all_users) < 0)
accounting_data) < 0)
flux_log_error (flux_jobtap_get_flux (p),
"mf_priority: query_cb: flux_plugin_arg_pack: %s",
flux_plugin_arg_strerror (args));

json_decref (all_users);
json_decref (accounting_data);

return 0;
}
Expand Down
12 changes: 10 additions & 2 deletions t/expected/plugin_state/internal_state_1.expected
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,31 @@
"userid": 1001,
"banks": [
{
"bank": "account1",
"bank_name": "account1",
"fairshare": 0.5,
"max_run_jobs": 2,
"cur_run_jobs": 0,
"max_active_jobs": 7,
"cur_active_jobs": 0,
"held_jobs": [],
"queues": [
""
],
"queue_factor": 0,
"active": 1
},
{
"bank": "account2",
"bank_name": "account2",
"fairshare": 0.5,
"max_run_jobs": 5,
"cur_run_jobs": 0,
"max_active_jobs": 7,
"cur_active_jobs": 0,
"held_jobs": [],
"queues": [
""
],
"queue_factor": 0,
"active": 1
}
]
Expand Down
Loading

0 comments on commit b291ffb

Please sign in to comment.