Skip to content

Commit

Permalink
- implement hooks
Browse files Browse the repository at this point in the history
- enable user to extend plotman's functionality / plotting flow,
  by providing external scripts, without need to touch plotmans
  internals
- many of current feature requests can be solved with lighweight
  scripts (bash/python/...), e.g. #712, #711, #677, #638, #582
  ...,
- process cpu and io niceness can also be manipulated, affinity
  to cpu or NUMA on big server hardware can be set differently
  based on stage or plotter

- all new code is in new library 'hooks' + two new shell scripts
  directly inside hooks.d directory (located inside plotman's
  own config directory
- supplied hook serves as reference implementation / example,
  providing functionality for #677
- modification of existing plotman's code is only to call entrypoint
  in hooks.py and pass current jobs list containing jobs objects

- currently manager.py maybe_start_new_plot() and plotman.py kill
  is injected with call into hooks.try_run() and hooks.run() respectively
- try_run consumes fully refreshed jobs[], compares phase to previous
  job phase and if it is changed, calls hooks.run()
- run() takes plotmans environment, extends it with particular job's
  metadata and calls all executable files from hooks.d directory
  having extension .sh or .py

- scripts are called synchronously, cmd exec is waiting until the
  script process returns. that means the implementation is not suitable
  for LONG running actions.
- anyhow, plotman CLI can be called without issues from within the hooks,
  recursion in job_refresh -> try_run -> hooks script -> plotman cmd
  is being checked for

new:	src/plotman/hooks.py
	src/plotman/resources/hooks.d

update: src/plotman/configuration.py
	src/plotman/manager.py
	setup.cfg
  • Loading branch information
mk01 committed Jul 3, 2021
1 parent 24d0012 commit 97d901c
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 5 deletions.
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ checks =

[options.data_files]
config = src/plotman/resources/plotman.yaml
src/plotman/resources/hooks.d/.lib.include
src/plotman/resources/hooks.d/01-check-dstdir-free.sh.example
bin = util/listlogs

[isort]
Expand Down
6 changes: 3 additions & 3 deletions src/plotman/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ class ConfigurationException(Exception):
"""Raised when plotman.yaml configuration is missing or malformed."""


def get_path() -> str:
"""Return path to where plotman.yaml configuration file should exist."""
def get_path(subject="plotman.yaml") -> str:
"""Return location of configuration files (e.g. plotman.yaml)."""
config_dir: str = appdirs.user_config_dir("plotman")
return config_dir + "/plotman.yaml"
return config_dir + "/" + subject


def read_configuration_text(config_path: str) -> str:
Expand Down
113 changes: 113 additions & 0 deletions src/plotman/hooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import os
import subprocess
from pathlib import Path

from plotman import configuration


class JobPhases(object):
'''Holds jobs last known phase progress with simple helper methods.'''
_job_ph = dict()

@classmethod
def changed(cls, job):
"""Checks provided job's phase agains its last known phase.
returns True if job's phase changed, or job was just created.
returns False if job is known and phase matches phase on state."""
if not job.progress():
return False
return not cls._job_ph.get(job.plot_id) or cls._job_ph[job.plot_id] != job.progress()

@classmethod
def update(cls, job_ph):
"""Updates internal state with new 'last known' job phases"""
if job_ph:
cls._job_ph = job_ph

@classmethod
def progress(cls, plot_id):
"""Returns job's last known Phase as provided by Job.progress()"""
return cls._job_ph.get(plot_id)


def run_cmd(command, env):
try:
result = subprocess.run(command, capture_output=True, env=env)
except Exception as ex:
return 1, "", str(ex)

return result.returncode, result.stdout, result.stderr


def try_run(jobs):
"""Iterates over jobs gathered during refresh, executes hooks.d
if phase was changed and updates last known phase info for next iteration."""
phases = dict()

for job in jobs:
if job.progress() is None:
continue

phases[job.plot_id] = job.progress()
if not JobPhases().changed(job):
continue

run(job)

JobPhases().update(phases)


def prepare_env(job, hooks_path):
"""Prepares env dict for the provided job"""

environment = os.environ.copy()
environment['PLOTMAN_HOOKS'] = hooks_path
environment['PLOTMAN_PLOTID'] = job.plot_id
environment['PLOTMAN_PID'] = str(job.proc.pid)
environment['PLOTMAN_TMPDIR'] = job.tmpdir
environment['PLOTMAN_TMP2DIR'] = job.tmp2dir
environment['PLOTMAN_DSTDIR'] = job.dstdir
environment['PLOTMAN_LOGFILE'] = job.logfile
environment['PLOTMAN_STATUS'] = job.get_run_status()
environment['PLOTMAN_PHASE'] = str(job.progress().major) + ':' + str(job.progress().minor)

old_phase = JobPhases().progress(job.plot_id)
if old_phase:
old_phase = str(JobPhases().progress(job.plot_id).major) + ':' + str(JobPhases().progress(job.plot_id).minor)
else:
old_phase = str(old_phase)
environment['PLOTMAN_PHASE_PREV'] = old_phase

return environment


def run(job, trigger="PHASE"):
"""Runs all scripts in alphabetical order from the hooks.d directory
for the provided job.
Job's internal state is added to the Plotman's own environment.
Folowing env VARIABLES are exported:
- PLOTMAN_PLOTID (id of the plot)
- PLOTMAN_PID (pid of the process)
- PLOTMAN_TMPDIR (tmp dir [-t])
- PLOTMAN_TMP2DIR (tmp2 dir [-2])
- PLOTMAN_DSTDIR (dst dir [-d])
- PLOTMAN_LOGFILE (logfile)
- PLOTMAN_STATUS (current state of the process, e.g. RUN, STP - check job class for details)
- PLOTMAN_PHASE (phase, "major:minor" - two numbers, colon delimited)
- PLOTMAN_PHASE_PREV (phase, previous if known, or "None")
- PLOTMAN_TRIGGER (action, which triggered hooks. currently one of "PHASE"-change or "KILL")
"""

hooks_path = configuration.get_path('hooks.d')

if os.getenv('PLOTMAN_HOOKS') is not None:
return

environment = prepare_env(job, hooks_path)
environment['PLOTMAN_TRIGGER'] = trigger

for e in ['*.py','*.sh']:
for script in Path(hooks_path).glob(e):
rc, stdout, stderr = run_cmd([str(script)], environment)

4 changes: 3 additions & 1 deletion src/plotman/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# Plotman libraries
from plotman import \
archive # for get_archdir_freebytes(). TODO: move to avoid import loop
from plotman import job, plot_util
from plotman import job, plot_util, hooks
import plotman.configuration

# Constants
Expand Down Expand Up @@ -93,6 +93,8 @@ def phases_permit_new_job(phases: typing.List[job.Phase], d: str, sched_cfg: plo
def maybe_start_new_plot(dir_cfg: plotman.configuration.Directories, sched_cfg: plotman.configuration.Scheduling, plotting_cfg: plotman.configuration.Plotting, log_cfg: plotman.configuration.Logging) -> typing.Tuple[bool, str]:
jobs = job.Job.get_running_jobs(log_cfg.plots)

hooks.try_run(jobs)

wait_reason = None # If we don't start a job this iteration, this says why.

youngest_job_age = min(jobs, key=job.Job.get_time_wall).get_time_wall() if jobs else MAX_AGE
Expand Down
4 changes: 3 additions & 1 deletion src/plotman/plotman.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import pendulum

# Plotman libraries
from plotman import analyzer, archive, configuration, interactive, manager, plot_util, reporting, csv_exporter
from plotman import analyzer, archive, configuration, interactive, manager, plot_util, reporting, csv_exporter, hooks
from plotman import resources as plotman_resources
from plotman.job import Job

Expand Down Expand Up @@ -331,6 +331,8 @@ def main() -> None:
for f in temp_files:
os.remove(f)

hooks.run(job, "KILL")

elif args.cmd == 'suspend':
print('Suspending ' + job.plot_id)
job.suspend()
Expand Down
23 changes: 23 additions & 0 deletions src/plotman/resources/hooks.d/.lib.include
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/bin/sh

log() {
local dd=$(date --iso-8601=seconds)
local logee="$(basename $0)$(printf "(%0.8s)" ${PLOTMAN_PLOTID})"
local severity="$1"
shift

printf "%s [%-10s] %s: %s\n" "${dd%+*}" "$logee" "$severity" "$*"
}

logInfo() {
log INFO $*
}

logError() {
log ERROR $* >&2
}

### plot k32 size in bytes 101.5 * GiB === 108984795136 bytes
k32PLOTSIZE=$((1015 *1024 *1024 *1024 /10))

exec >>/tmp/plotman-hooks.log 2>&1
34 changes: 34 additions & 0 deletions src/plotman/resources/hooks.d/01-check-dstdir-free.sh.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#!/bin/sh

. ${PLOTMAN_HOOKS}/.lib.include

[ "${PLOTMAN_TRIGGER}" = "PHASE" ] || exit 0

logInfo "processing plot id ${PLOTMAN_PLOTID} (ph: ${PLOTMAN_PHASE})"

### if jobs phase is below 3:6, do nothing
[ x"${PLOTMAN_PHASE}" = x3:6 -o x"${PLOTMAN_PHASE}" = x3:7 ] || {
logInfo "plot not at phase 3:6/7, hook done"
exit 0
}


### get available space on DSTDIR in bytes, check result.
### exit 1 if something went wrong
AVAIL=$(df -B1 --output=avail ${PLOTMAN_DSTDIR} | grep -oE '[0-9]+')
[ -n "${AVAIL}" -a $? -eq 0 ] || {
logError "something went wrong while checking available space at ${PLOTMAN_DSTDIR}, suspending job and exiting."
kill -STOP ${PLOTMAN_PID}
exit 1
}


### continue only if available space is less then plot size
[ "${AVAIL}" -lt "${k32PLOTSIZE}" ] || {
logInfo "there is ${AVAIL} available space at ${PLOTMAN_DSTDIR}, which is > k32plot ${k32PLOTSIZE}. job can continue."
exit 0
}

logError "not enough available space at ${PLOTMAN_DSTDIR}. suspending job."
kill -STOP ${PLOTMAN_PID}

0 comments on commit 97d901c

Please sign in to comment.