diff --git a/src/job-manager/plugins/dws-jobtap.c b/src/job-manager/plugins/dws-jobtap.c
index 075628f..3d608ce 100644
--- a/src/job-manager/plugins/dws-jobtap.c
+++ b/src/job-manager/plugins/dws-jobtap.c
@@ -556,6 +556,65 @@ static int exception_cb (flux_plugin_t *p,
     return 0;
 }
 
+/*
+ * Generate a new jobspec constraints object for a job so that it can avoid
+ * attempting to run on nodes attached to down rabbits.
+ */
+static json_t *generate_constraints (flux_t *h,
+                                     flux_plugin_t *p,
+                                     flux_jobid_t jobid,
+                                     const char *exclude_str)
+{
+    flux_plugin_arg_t *args = flux_jobtap_job_lookup (p, jobid);
+    json_t *constraints = NULL;
+    json_t *not;
+    if (!args
+        || flux_plugin_arg_unpack (args,
+                                   FLUX_PLUGIN_ARG_IN,
+                                   "{s:{s:{s:{s?o}}}}",
+                                   "jobspec",
+                                   "attributes",
+                                   "system",
+                                   "constraints",
+                                   &constraints)
+               < 0) {
+        flux_log_error (h, "Failed to unpack args");
+        flux_plugin_arg_destroy (args);
+        return NULL;
+    }
+    if (!constraints) {
+        if (!(constraints = json_pack ("{s:[{s:[s]}]}", "not", "hostlist", exclude_str))) {
+            flux_log_error (h, "Failed to create new constraints object");
+            flux_plugin_arg_destroy (args);
+            return NULL;
+        }
+        flux_plugin_arg_destroy (args);
+        return constraints;
+    }
+    else { // deep copy the constraints because we don't want to modify it in-place
+        if (!(constraints = json_deep_copy (constraints))) {
+            flux_log_error (h, "Failed to deep copy constraints object");
+            flux_plugin_arg_destroy (args);
+            return NULL;
+        }
+    }
+    flux_plugin_arg_destroy (args);
+    if (!(not = json_object_get (constraints, "not"))) {
+        if (json_object_set_new (constraints, "not", json_pack ("[{s:[s]}]", "hostlist", exclude_str)) < 0) {
+            flux_log_error (h, "Failed to create new NOT constraints object");
+            json_decref (constraints);
+            return NULL;
+        }
+        return constraints;
+    }
+    if (json_array_append_new (not, json_pack ("{s:[s]}", "hostlist", exclude_str)) < 0) {
+        flux_log_error (h, "Failed to create new NOT constraints object");
+        json_decref (constraints);
+        return NULL;
+    }
+    return constraints;
+}
+
 static void resource_update_msg_cb (flux_t *h,
                                     flux_msg_handler_t *mh,
                                     const flux_msg_t *msg,
@@ -563,37 +618,53 @@ static void resource_update_msg_cb (flux_t *h,
 {
     flux_plugin_t *p = (flux_plugin_t *)arg;
     json_int_t jobid;
-    json_t *resources = NULL, *errmsg;
+    json_t *resources = NULL, *errmsg, *constraints = NULL;
     int copy_offload;
-    const char *errmsg_str;
+    const char *errmsg_str, *exclude_str;
     if (flux_msg_unpack (msg,
-                         "{s:I, s:o, s:b, s:o}",
+                         "{s:I, s:o, s:b, s:o, s:s}",
                          "id", &jobid,
                          "resources", &resources,
                          "copy-offload", &copy_offload,
-                         "errmsg", &errmsg)
+                         "errmsg", &errmsg,
+                         "exclude", &exclude_str)
         < 0) {
         flux_log_error (h, "received malformed dws.resource-update RPC");
         return;
     }
+    if (strlen (exclude_str) > 0) {
+        if (!(constraints = generate_constraints (h, p, jobid, exclude_str))) {
+            flux_log_error (h, "Could not generate exclusion constraints");
+            raise_job_exception (p, jobid, "dws", "Could not generate exclusion constraints");
+            return;
+        }
+    }
     if (!json_is_null (errmsg)) {
         if (!(errmsg_str = json_string_value (errmsg))){
             flux_log_error (h, "received malformed dws.resource-update RPC, errmsg must be string or JSON null");
             errmsg_str = "";
         }
         raise_job_exception (p, jobid, "dws", errmsg_str);
+        json_decref (constraints);
         return;
     }
-    else if (flux_jobtap_jobspec_update_id_pack (p,
+    else if (flux_jobtap_job_aux_set (p,
+                                      jobid,
+                                      "flux::dws-copy-offload",
+                                      copy_offload ? (void*) 1 : (void*) 0,
+                                      NULL) < 0
+             || flux_jobtap_jobspec_update_id_pack (p,
                                                  (flux_jobid_t) jobid,
-                                                 "{s: O}",
-                                                 "resources",
-                                                 resources) < 0
-             || flux_jobtap_job_aux_set (p, jobid, "flux::dws-copy-offload", copy_offload ? (void*) 1 : (void*) 0, NULL) < 0 ) {
+                                                 "{s:O, s:o*}",
+                                                 "resources", resources,
+                                                 "attributes.system.constraints", constraints) < 0) {
+        flux_log_error (h, "could not update jobspec with new constraints and resources");
         raise_job_exception (p, jobid, "dws", "Internal error: failed to update jobspec");
+        json_decref (constraints);
         return;
     }
     if (flux_jobtap_dependency_remove (p, jobid, CREATE_DEP_NAME) < 0) {
diff --git a/src/modules/coral2_dws.py b/src/modules/coral2_dws.py
index 0fc7476..126e1a0 100755
--- a/src/modules/coral2_dws.py
+++ b/src/modules/coral2_dws.py
@@ -40,6 +40,7 @@ WORKFLOW_NAME_PREFIX = "fluxjob-"
 WORKFLOW_NAME_FORMAT = WORKFLOW_NAME_PREFIX + "{jobid}"
 
 _MIN_ALLOCATION_SIZE = 4  # minimum rabbit allocation size
+_EXCLUDE_HOSTS = Hostlist()
 
 _EXITCODE_NORESTART = 3  # exit code indicating to systemd not to restart
 
@@ -500,6 +501,7 @@ def _workflow_state_change_cb_inner(workflow, winfo, handle, k8s_api, disable_fl
                 "resources": resources,
                 "copy-offload": copy_offload,
                 "errmsg": errmsg,
+                "exclude": _EXCLUDE_HOSTS.encode(),
             },
         ).then(log_rpc_response)
         save_workflow_to_kvs(handle, jobid, workflow)
@@ -597,14 +599,21 @@ def drain_offline_nodes(handle, rabbit_name, nodelist, allowlist):
     ).then(log_rpc_response)
 
 
-def mark_rabbit(handle, status, resource_path, ssdcount, name):
+def mark_rabbit(handle, status, resource_path, ssdcount, name, disable_fluxion):
     """Send an RPC to mark a rabbit as up or down."""
     if status == "Ready":
         LOGGER.debug("Marking rabbit %s as up", name)
         status = "up"
+        if disable_fluxion:
+            _EXCLUDE_HOSTS.delete(_RABBITS_TO_HOSTLISTS[name])
+            return
     else:
         LOGGER.debug("Marking rabbit %s as down, status is %s", name, status)
         status = "down"
+        if disable_fluxion:
+            _EXCLUDE_HOSTS.append(_RABBITS_TO_HOSTLISTS[name])
+            _EXCLUDE_HOSTS.uniq()
+            return
     for ssdnum in range(ssdcount):
         payload = {"resource_path": resource_path + f"/ssd{ssdnum}", "status": status}
         handle.rpc("sched-fluxion-resource.set_status", payload).then(log_rpc_response)
@@ -622,14 +631,13 @@ def rabbit_state_change_cb(event, handle, rabbit_rpaths, disable_fluxion, allowl
             "Encountered an unknown Storage object '%s' in the event stream", name
         )
         return
-    if not disable_fluxion:
-        try:
-            status = rabbit["status"]["status"]
-        except KeyError:
-            # if rabbit doesn't have a status, consider it down
-            mark_rabbit(handle, "Down", *rabbit_rpaths[name], name)
-        else:
-            mark_rabbit(handle, status, *rabbit_rpaths[name], name)
+    try:
+        status = rabbit["status"]["status"]
+    except KeyError:
+        # if rabbit doesn't have a status, consider it down
+        mark_rabbit(handle, "Down", *rabbit_rpaths[name], name, disable_fluxion)
+    else:
+        mark_rabbit(handle, status, *rabbit_rpaths[name], name, disable_fluxion)
     try:
         computes = rabbit["status"]["access"]["computes"]
     except KeyError:
@@ -694,8 +702,8 @@ def init_rabbits(k8s_api, handle, watchers, disable_fluxion, drain_queues):
         )
         if disable_fluxion:
             # don't mark the rabbit up or down but add the rabbit to the mapping
-            rabbit_rpaths[name] = None
-        elif name not in rabbit_rpaths:
+            rabbit_rpaths[name] = (None, None)
+        if name not in rabbit_rpaths:
             LOGGER.error(
                 "Encountered an unknown Storage object '%s' in the event stream", name
             )
@@ -704,9 +712,11 @@ def init_rabbits(k8s_api, handle, watchers, disable_fluxion, drain_queues):
             rabbit_status = rabbit["status"]["status"]
rabbit["status"]["status"] except KeyError: # if rabbit doesn't have a status, consider it down - mark_rabbit(handle, "Down", *rabbit_rpaths[name], name) + mark_rabbit(handle, "Down", *rabbit_rpaths[name], name, disable_fluxion) else: - mark_rabbit(handle, rabbit_status, *rabbit_rpaths[name], name) + mark_rabbit( + handle, rabbit_status, *rabbit_rpaths[name], name, disable_fluxion + ) # rabbits don't have a 'status' field until they boot try: computes = rabbit["status"]["access"]["computes"] diff --git a/t/dws-dependencies/coral2_dws.py b/t/dws-dependencies/coral2_dws.py index c476ae7..3e8a3f1 100644 --- a/t/dws-dependencies/coral2_dws.py +++ b/t/dws-dependencies/coral2_dws.py @@ -32,6 +32,7 @@ def create_cb(fh, t, msg, arg): "resources": msg.payload["resources"], "copy-offload": False, "errmsg": None, + "exclude": "", }, ) diff --git a/t/t1003-dws-nnf-watch.t b/t/t1003-dws-nnf-watch.t index 7288ba8..05f64cd 100755 --- a/t/t1003-dws-nnf-watch.t +++ b/t/t1003-dws-nnf-watch.t @@ -13,6 +13,7 @@ flux setattr log-stderr-level 1 DATA_DIR=${SHARNESS_TEST_SRCDIR}/data/nnf-watch/ DWS_MODULE_PATH=${FLUX_SOURCE_DIR}/src/modules/coral2_dws.py RPC=${FLUX_BUILD_DIR}/t/util/rpc +PLUGINPATH=${FLUX_BUILD_DIR}/src/job-manager/plugins/.libs if test_have_prereq NO_DWS_K8S; then skip_all='skipping DWS workflow tests due to no DWS K8s' @@ -209,8 +210,88 @@ test_expect_success 'return the storage resource to Live mode' ' kubectl get storages kind-worker2 -ojson | jq -e ".status.access.computes[0].status == \"Ready\"" ' +test_expect_success 'exec Storage watching script with --disable-fluxion' ' + flux cancel ${jobid} && + flux resource undrain compute-01 && + echo " +[rabbit] +drain_compute_nodes = false + " | flux config load && + flux jobtap load ${PLUGINPATH}/dws-jobtap.so && + jobid=$(flux submit \ + --setattr=system.alloc-bypass.R="$(flux R encode -r0)" --output=dws5.out \ + --error=dws5.err -o per-resource.type=node flux python ${DWS_MODULE_PATH} \ + -vvv --disable-fluxion) && + flux job wait-event -vt 15 -p guest.exec.eventlog ${jobid} shell.start && + flux job wait-event -vt 15 -m "note=dws watchers setup" ${jobid} exception && + ${RPC} "dws.watch_test" +' + +test_expect_success 'Storages are up and rabbit jobs can run' ' + kubectl get storages kind-worker2 -ojson | jq -e ".spec.state == \"Enabled\"" && + kubectl get storages kind-worker2 -ojson | jq -e ".status.status == \"Ready\"" && + kubectl get storages kind-worker2 -ojson | jq -e ".status.access.computes[0].status == \"Ready\"" && + kubectl get storages kind-worker3 -ojson | jq -e ".spec.state == \"Enabled\"" && + kubectl get storages kind-worker3 -ojson | jq -e ".status.status == \"Ready\"" && + JOBID=$(flux submit --setattr=system.dw="#DW jobdw capacity=10GiB type=xfs \ + name=project1" -N1 -n1 hostname) && + flux job wait-event -vt 10 ${JOBID} jobspec-update && + flux job wait-event -vt 10 ${JOBID} alloc && + flux job wait-event -vt 10 -m status=0 ${JOBID} finish && + flux job wait-event -vt 20 ${JOBID} clean && + flux job attach $JOBID +' + +test_expect_success 'update to the Storage status is caught by the watch' ' + kubectl patch storages kind-worker2 \ + --type merge --patch-file ${DATA_DIR}/down.yaml && + kubectl get storages kind-worker2 -ojson | jq -e ".spec.state == \"Disabled\"" && + sleep 0.2 && + kubectl get storages kind-worker2 -ojson | jq -e ".status.status == \"Disabled\"" && + kubectl patch storages kind-worker3 \ + --type merge --patch-file ${DATA_DIR}/down.yaml && + kubectl get storages kind-worker3 -ojson | jq -e ".spec.state 
== \"Disabled\"" && + sleep 0.2 && + kubectl get storages kind-worker3 -ojson | jq -e ".status.status == \"Disabled\"" && + sleep 3 +' + +test_expect_success 'rabbits now marked as down are not allocated' ' + JOBID=$(flux submit --setattr=system.dw="#DW jobdw capacity=10GiB type=xfs \ + name=project1" -N1 -n1 hostname) && + flux job wait-event -vt 10 ${JOBID} jobspec-update && + test_must_fail flux job wait-event -vt 3 ${JOBID} alloc && + flux job wait-event -vt 1 ${JOBID} exception && + flux job wait-event -vt 2 ${JOBID} clean +' + +test_expect_success 'revert the changes to the Storage' ' + kubectl patch storages kind-worker2 \ + --type merge --patch-file ${DATA_DIR}/up.yaml && + kubectl get storages kind-worker2 -ojson | jq -e ".spec.state == \"Enabled\"" && + sleep 0.2 && + kubectl get storages kind-worker2 -ojson | jq -e ".status.status == \"Ready\"" && + kubectl patch storages kind-worker3 \ + --type merge --patch-file ${DATA_DIR}/up.yaml && + kubectl get storages kind-worker3 -ojson | jq -e ".spec.state == \"Enabled\"" && + sleep 0.2 && + kubectl get storages kind-worker3 -ojson | jq -e ".status.status == \"Ready\"" && + sleep 1 +' + +test_expect_success 'rabbits now marked as up and can be allocated' ' + JOBID=$(flux submit --setattr=system.dw="#DW jobdw capacity=10GiB type=xfs \ + name=project1" -N1 -n1 hostname) && + flux jobs && flux resource list && + flux job wait-event -vt 10 ${JOBID} jobspec-update && + flux job wait-event -vt 5 ${JOBID} alloc && + flux job wait-event -vt 25 -m status=0 ${JOBID} finish + flux job wait-event -vt 20 ${JOBID} clean +' + test_expect_success 'unload fluxion' ' - flux cancel ${jobid}; flux module remove sched-fluxion-qmanager && + flux cancel ${jobid}; flux job wait-event -vt 1 ${jobid} clean && + flux module remove sched-fluxion-qmanager && flux module remove sched-fluxion-resource '