From 951e4b1bbe7eb08f20be84eb4183c1c1367a80e0 Mon Sep 17 00:00:00 2001 From: Matus Kral Date: Sat, 3 Jul 2021 19:13:33 +0000 Subject: [PATCH 1/4] - implement hooks - enable user to extend plotman's functionality / plotting flow, by providing external scripts, without need to touch plotman's internals - many of current feature requests can be solved with lightweight scripts (bash/python/...), e.g. #712, #711, #677, #638, #582 ..., - process cpu and io niceness can also be manipulated, affinity to cpu or NUMA on big server hardware can be set differently based on stage or plotter - all new code is in new library 'hooks' + two new shell scripts directly inside hooks.d directory (located inside plotman's own config directory) - supplied hook serves as reference implementation / example, providing functionality for #677 - modification of existing plotman's code is only to call entrypoint in hooks.py and pass current jobs list containing jobs objects - currently manager.py maybe_start_new_plot() and plotman.py kill are injected with calls into hooks.try_run() and hooks.run() respectively - try_run consumes fully refreshed jobs[], compares phase to previous job phase and if it is changed, calls hooks.run() - run() takes plotman's environment, extends it with particular job's metadata and calls all executable files from hooks.d directory having extension .sh or .py - scripts are called synchronously, cmd exec is waiting until the script process returns. that means the implementation is not suitable for LONG running actions. 
- anyhow, plotman CLI can be called without issues from within the hooks, recursion in job_refresh -> try_run -> hooks script -> plotman cmd is being checked for new: src/plotman/hooks.py src/plotman/resources/hooks.d update: src/plotman/configuration.py src/plotman/manager.py setup.cfg --- setup.cfg | 1 + src/plotman/configuration.py | 6 +- src/plotman/hooks.py | 113 ++++++++++++++++++ src/plotman/manager.py | 4 +- src/plotman/plotman.py | 4 +- src/plotman/resources/hooks.d/.lib.include | 23 ++++ .../hooks.d/01-check-dstdir-free.sh.example | 34 ++++++ 7 files changed, 180 insertions(+), 5 deletions(-) create mode 100644 src/plotman/hooks.py create mode 100644 src/plotman/resources/hooks.d/.lib.include create mode 100755 src/plotman/resources/hooks.d/01-check-dstdir-free.sh.example diff --git a/setup.cfg b/setup.cfg index 739c8161..9b6ff97c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -75,6 +75,7 @@ checks = [options.data_files] config = src/plotman/resources/plotman.yaml + src/plotman/resources/hooks.d bin = util/listlogs [isort] diff --git a/src/plotman/configuration.py b/src/plotman/configuration.py index d1fbb1c5..8d5a54be 100644 --- a/src/plotman/configuration.py +++ b/src/plotman/configuration.py @@ -26,10 +26,10 @@ class ConfigurationException(Exception): """Raised when plotman.yaml configuration is missing or malformed.""" -def get_path() -> str: - """Return path to where plotman.yaml configuration file should exist.""" +def get_path(subject="plotman.yaml") -> str: + """Return location of configuration files (e.g. 
plotman.yaml).""" config_dir: str = appdirs.user_config_dir("plotman") - return config_dir + "/plotman.yaml" + return config_dir + "/" + subject def read_configuration_text(config_path: str) -> str: diff --git a/src/plotman/hooks.py b/src/plotman/hooks.py new file mode 100644 index 00000000..33f6434a --- /dev/null +++ b/src/plotman/hooks.py @@ -0,0 +1,113 @@ +import os +import subprocess +from pathlib import Path + +from plotman import configuration + + +class JobPhases(object): + '''Holds jobs last known phase progress with simple helper methods.''' + _job_ph = dict() + + @classmethod + def changed(cls, job): + """Checks provided job's phase agains its last known phase. + returns True if job's phase changed, or job was just created. + returns False if job is known and phase matches phase on state.""" + if not job.progress(): + return False + return not cls._job_ph.get(job.plot_id) or cls._job_ph[job.plot_id] != job.progress() + + @classmethod + def update(cls, job_ph): + """Updates internal state with new 'last known' job phases""" + if job_ph: + cls._job_ph = job_ph + + @classmethod + def progress(cls, plot_id): + """Returns job's last known Phase as provided by Job.progress()""" + return cls._job_ph.get(plot_id) + + +def run_cmd(command, env): + try: + result = subprocess.run(command, capture_output=True, env=env) + except Exception as ex: + return 1, "", str(ex) + + return result.returncode, result.stdout, result.stderr + + +def try_run(jobs): + """Iterates over jobs gathered during refresh, executes hooks.d + if phase was changed and updates last known phase info for next iteration.""" + phases = dict() + + for job in jobs: + if job.progress() is None: + continue + + phases[job.plot_id] = job.progress() + if not JobPhases().changed(job): + continue + + run(job) + + JobPhases().update(phases) + + +def prepare_env(job, hooks_path): + """Prepares env dict for the provided job""" + + environment = os.environ.copy() + environment['PLOTMAN_HOOKS'] = hooks_path + 
environment['PLOTMAN_PLOTID'] = job.plot_id + environment['PLOTMAN_PID'] = str(job.proc.pid) + environment['PLOTMAN_TMPDIR'] = job.tmpdir + environment['PLOTMAN_TMP2DIR'] = job.tmp2dir + environment['PLOTMAN_DSTDIR'] = job.dstdir + environment['PLOTMAN_LOGFILE'] = job.logfile + environment['PLOTMAN_STATUS'] = job.get_run_status() + environment['PLOTMAN_PHASE'] = str(job.progress().major) + ':' + str(job.progress().minor) + + old_phase = JobPhases().progress(job.plot_id) + if old_phase: + old_phase = str(JobPhases().progress(job.plot_id).major) + ':' + str(JobPhases().progress(job.plot_id).minor) + else: + old_phase = str(old_phase) + environment['PLOTMAN_PHASE_PREV'] = old_phase + + return environment + + +def run(job, trigger="PHASE"): + """Runs all scripts in alphabetical order from the hooks.d directory + for the provided job. + + Job's internal state is added to the Plotman's own environment. + Folowing env VARIABLES are exported: + - PLOTMAN_PLOTID (id of the plot) + - PLOTMAN_PID (pid of the process) + - PLOTMAN_TMPDIR (tmp dir [-t]) + - PLOTMAN_TMP2DIR (tmp2 dir [-2]) + - PLOTMAN_DSTDIR (dst dir [-d]) + - PLOTMAN_LOGFILE (logfile) + - PLOTMAN_STATUS (current state of the process, e.g. RUN, STP - check job class for details) + - PLOTMAN_PHASE (phase, "major:minor" - two numbers, colon delimited) + - PLOTMAN_PHASE_PREV (phase, previous if known, or "None") + - PLOTMAN_TRIGGER (action, which triggered hooks. 
currently one of "PHASE"-change or "KILL") + """ + + hooks_path = configuration.get_path('hooks.d') + + if os.getenv('PLOTMAN_HOOKS') is not None: + return + + environment = prepare_env(job, hooks_path) + environment['PLOTMAN_TRIGGER'] = trigger + + for e in ['*.py','*.sh']: + for script in Path(hooks_path).glob(e): + rc, stdout, stderr = run_cmd([str(script)], environment) + diff --git a/src/plotman/manager.py b/src/plotman/manager.py index efe6a2c5..b0b9dba3 100644 --- a/src/plotman/manager.py +++ b/src/plotman/manager.py @@ -15,7 +15,7 @@ # Plotman libraries from plotman import \ archive # for get_archdir_freebytes(). TODO: move to avoid import loop -from plotman import job, plot_util +from plotman import job, plot_util, hooks import plotman.configuration # Constants @@ -93,6 +93,8 @@ def phases_permit_new_job(phases: typing.List[job.Phase], d: str, sched_cfg: plo def maybe_start_new_plot(dir_cfg: plotman.configuration.Directories, sched_cfg: plotman.configuration.Scheduling, plotting_cfg: plotman.configuration.Plotting, log_cfg: plotman.configuration.Logging) -> typing.Tuple[bool, str]: jobs = job.Job.get_running_jobs(log_cfg.plots) + hooks.try_run(jobs) + wait_reason = None # If we don't start a job this iteration, this says why. 
youngest_job_age = min(jobs, key=job.Job.get_time_wall).get_time_wall() if jobs else MAX_AGE diff --git a/src/plotman/plotman.py b/src/plotman/plotman.py index 5312b78c..bed9f498 100755 --- a/src/plotman/plotman.py +++ b/src/plotman/plotman.py @@ -15,7 +15,7 @@ import pendulum # Plotman libraries -from plotman import analyzer, archive, configuration, interactive, manager, plot_util, reporting, csv_exporter +from plotman import analyzer, archive, configuration, interactive, manager, plot_util, reporting, csv_exporter, hooks from plotman import resources as plotman_resources from plotman.job import Job @@ -331,6 +331,8 @@ def main() -> None: for f in temp_files: os.remove(f) + hooks.run(job, "KILL") + elif args.cmd == 'suspend': print('Suspending ' + job.plot_id) job.suspend() diff --git a/src/plotman/resources/hooks.d/.lib.include b/src/plotman/resources/hooks.d/.lib.include new file mode 100644 index 00000000..07cb52ac --- /dev/null +++ b/src/plotman/resources/hooks.d/.lib.include @@ -0,0 +1,23 @@ +#!/bin/sh + +log() { + local dd=$(date --iso-8601=seconds) + local logee="$(basename $0)$(printf "(%0.8s)" ${PLOTMAN_PLOTID})" + local severity="$1" + shift + + printf "%s [%-10s] %s: %s\n" "${dd%+*}" "$logee" "$severity" "$*" +} + +logInfo() { + log INFO $* +} + +logError() { + log ERROR $* >&2 +} + +### plot k32 size in bytes 101.5 * GiB === 108984795136 bytes +k32PLOTSIZE=$((1015 *1024 *1024 *1024 /10)) + +exec >>/tmp/plotman-hooks.log 2>&1 diff --git a/src/plotman/resources/hooks.d/01-check-dstdir-free.sh.example b/src/plotman/resources/hooks.d/01-check-dstdir-free.sh.example new file mode 100755 index 00000000..fc69c792 --- /dev/null +++ b/src/plotman/resources/hooks.d/01-check-dstdir-free.sh.example @@ -0,0 +1,34 @@ +#!/bin/sh + +. 
${PLOTMAN_HOOKS}/.lib.include + +[ "${PLOTMAN_TRIGGER}" = "PHASE" ] || exit 0 + +logInfo "processing plot id ${PLOTMAN_PLOTID} (ph: ${PLOTMAN_PHASE})" + +### if jobs phase is below 3:6, do nothing +[ x"${PLOTMAN_PHASE}" = x3:6 -o x"${PLOTMAN_PHASE}" = x3:7 ] || { + logInfo "plot not at phase 3:6/7, hook done" + exit 0 +} + + +### get available space on DSTDIR in bytes, check result. +### exit 1 if something went wrong +AVAIL=$(df -B1 --output=avail ${PLOTMAN_DSTDIR} | grep -oE '[0-9]+') +[ -n "${AVAIL}" -a $? -eq 0 ] || { + logError "something went wrong while checking available space at ${PLOTMAN_DSTDIR}, suspending job and exiting." + kill -STOP ${PLOTMAN_PID} + exit 1 +} + + +### continue only if available space is less then plot size +[ "${AVAIL}" -lt "${k32PLOTSIZE}" ] || { + logInfo "there is ${AVAIL} available space at ${PLOTMAN_DSTDIR}, which is > k32plot ${k32PLOTSIZE}. job can continue." + exit 0 +} + +logError "not enough available space at ${PLOTMAN_DSTDIR}. suspending job." +kill -STOP ${PLOTMAN_PID} + From 1dd5ca404cd4b432d19bf2174db71068a043deda Mon Sep 17 00:00:00 2001 From: Matus Kral Date: Sun, 4 Jul 2021 11:48:07 +0000 Subject: [PATCH 2/4] - add some sanitization / checks to ENV variables needed for proper hook scripts run - to prevent expansion of paths / params to simple * e.g. 
$PLOTMAN_DSTDIR/*$PLOTMAN_PLOTID* -> /* if $VARS would be missing --- src/plotman/hooks.py | 8 +++++--- src/plotman/resources/hooks.d/.lib.include | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/src/plotman/hooks.py b/src/plotman/hooks.py index 33f6434a..96c5cb62 100644 --- a/src/plotman/hooks.py +++ b/src/plotman/hooks.py @@ -64,9 +64,11 @@ def prepare_env(job, hooks_path): environment['PLOTMAN_HOOKS'] = hooks_path environment['PLOTMAN_PLOTID'] = job.plot_id environment['PLOTMAN_PID'] = str(job.proc.pid) - environment['PLOTMAN_TMPDIR'] = job.tmpdir - environment['PLOTMAN_TMP2DIR'] = job.tmp2dir - environment['PLOTMAN_DSTDIR'] = job.dstdir + environment['PLOTMAN_TMPDIR'] = environment['PLOTMAN_TMP2DIR'] = environment['PLOTMAN_DSTDIR'] = job.tmpdir + if job.tmp2dir is not None and job.tmp2dir != '': + environment['PLOTMAN_TMP2DIR'] = job.tmp2dir + if job.dstdir is not None and job.dstdir != '': + environment['PLOTMAN_DSTDIR'] = job.dstdir environment['PLOTMAN_LOGFILE'] = job.logfile environment['PLOTMAN_STATUS'] = job.get_run_status() environment['PLOTMAN_PHASE'] = str(job.progress().major) + ':' + str(job.progress().minor) diff --git a/src/plotman/resources/hooks.d/.lib.include b/src/plotman/resources/hooks.d/.lib.include index 07cb52ac..b75957ec 100644 --- a/src/plotman/resources/hooks.d/.lib.include +++ b/src/plotman/resources/hooks.d/.lib.include @@ -17,7 +17,26 @@ logError() { log ERROR $* >&2 } +sanityChecks() { + for p in PLOTID PID TMPDIR TMP2DIR DSTDIR LOGFILE STATUS PHASE; do + eval [ -n "\${PLOTMAN_$p}" ] || { + logError "PLOTMAN_$p variable missing or empty. Is $(basename $0) being started by Plotman?" 
+ exit 1 + } + done + + export PLOTMAN_DSTDIR=${PLOTMAN_DSTDIR%/} + export PLOTMAN_TMPDIR=${PLOTMAN_TMPDIR%/} + export PLOTMAN_TMP2DIR=${PLOTMAN_TMP2DIR%/} +} + ### plot k32 size in bytes 101.5 * GiB === 108984795136 bytes k32PLOTSIZE=$((1015 *1024 *1024 *1024 /10)) +for incl in $(ls ${PLOTMAN_HOOKS}/.lib.include.* 2>/dev/null); do + . $incl +done + exec >>/tmp/plotman-hooks.log 2>&1 + +sanityChecks From f8e788061806d2ef3785268008775c30e67fe2b6 Mon Sep 17 00:00:00 2001 From: Matus Kral Date: Sun, 4 Jul 2021 11:53:24 +0000 Subject: [PATCH 3/4] - add simple "hello world" kind of hook which logs new plotting process --- src/plotman/resources/hooks.d/00-demo-log-new-plotter.sh | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100755 src/plotman/resources/hooks.d/00-demo-log-new-plotter.sh diff --git a/src/plotman/resources/hooks.d/00-demo-log-new-plotter.sh b/src/plotman/resources/hooks.d/00-demo-log-new-plotter.sh new file mode 100755 index 00000000..74fc85ea --- /dev/null +++ b/src/plotman/resources/hooks.d/00-demo-log-new-plotter.sh @@ -0,0 +1,9 @@ +#!/bin/sh + +. ${PLOTMAN_HOOKS}/.lib.include + +[ "${PLOTMAN_TRIGGER}" = "PHASE" ] || exit 0 + +[ x"${PLOTMAN_PHASE}" = x0:0 ] || exit 0 + +logInfo "New plotter process with pid: ${PLOTMAN_PID}, id: ${PLOTMAN_PLOTID} was just started" From 82e22b69c6c1cdb0c779107330d7a8fcff98dd38 Mon Sep 17 00:00:00 2001 From: Matus Kral Date: Sun, 4 Jul 2021 11:53:57 +0000 Subject: [PATCH 4/4] - implement post kill cleanup based on plotid wildcard search through dst/tmp/tmp2 dirs (#582,...) 
--- .../hooks.d/99-post-kill-cleanup.sh.example | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100755 src/plotman/resources/hooks.d/99-post-kill-cleanup.sh.example diff --git a/src/plotman/resources/hooks.d/99-post-kill-cleanup.sh.example b/src/plotman/resources/hooks.d/99-post-kill-cleanup.sh.example new file mode 100755 index 00000000..3ce7d1dc --- /dev/null +++ b/src/plotman/resources/hooks.d/99-post-kill-cleanup.sh.example @@ -0,0 +1,19 @@ +#!/bin/sh + +. ${PLOTMAN_HOOKS}/.lib.include + +[ "${PLOTMAN_TRIGGER}" = "KILL" ] || exit 0 + +logInfo "${PLOTMAN_TRIGGER} triggered processing plot id ${PLOTMAN_PLOTID} (ph: ${PLOTMAN_PHASE})" + +for d in "${PLOTMAN_DSTDIR}" "${PLOTMAN_TMPDIR}" "${PLOTMAN_TMP2DIR}"; do + + ### avoid using find, grep, xargs - all those can be various implementations + ### behaving differently for corner cases like spaces in file name, or can + ### implement different set of arguments (POSIX vs BSD variants, busybox, ....) + ### so do it the legacy way + ls "${d}/"*${PLOTMAN_PLOTID}* | while read f; do + rm -v "$f" + done +done +