Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

- implement hooks #826

Open
wants to merge 4 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ checks =

[options.data_files]
config = src/plotman/resources/plotman.yaml
src/plotman/resources/hooks.d
bin = util/listlogs

[isort]
Expand Down
6 changes: 3 additions & 3 deletions src/plotman/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ class ConfigurationException(Exception):
"""Raised when plotman.yaml configuration is missing or malformed."""


def get_path() -> str:
"""Return path to where plotman.yaml configuration file should exist."""
def get_path(subject="plotman.yaml") -> str:
"""Return location of configuration files (e.g. plotman.yaml)."""
config_dir: str = appdirs.user_config_dir("plotman")
return config_dir + "/plotman.yaml"
return config_dir + "/" + subject


def read_configuration_text(config_path: str) -> str:
Expand Down
115 changes: 115 additions & 0 deletions src/plotman/hooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import os
import subprocess
from pathlib import Path

from plotman import configuration


class JobPhases(object):
'''Holds jobs last known phase progress with simple helper methods.'''
_job_ph = dict()

@classmethod
def changed(cls, job):
"""Checks provided job's phase agains its last known phase.
returns True if job's phase changed, or job was just created.
returns False if job is known and phase matches phase on state."""
if not job.progress():
return False
return not cls._job_ph.get(job.plot_id) or cls._job_ph[job.plot_id] != job.progress()

@classmethod
def update(cls, job_ph):
"""Updates internal state with new 'last known' job phases"""
if job_ph:
cls._job_ph = job_ph

@classmethod
def progress(cls, plot_id):
"""Returns job's last known Phase as provided by Job.progress()"""
return cls._job_ph.get(plot_id)


def run_cmd(command, env):
try:
result = subprocess.run(command, capture_output=True, env=env)
except Exception as ex:
return 1, "", str(ex)

return result.returncode, result.stdout, result.stderr


def try_run(jobs):
"""Iterates over jobs gathered during refresh, executes hooks.d
if phase was changed and updates last known phase info for next iteration."""
phases = dict()

for job in jobs:
if job.progress() is None:
continue

phases[job.plot_id] = job.progress()
if not JobPhases().changed(job):
continue

run(job)

JobPhases().update(phases)


def prepare_env(job, hooks_path):
"""Prepares env dict for the provided job"""

environment = os.environ.copy()
environment['PLOTMAN_HOOKS'] = hooks_path
environment['PLOTMAN_PLOTID'] = job.plot_id
environment['PLOTMAN_PID'] = str(job.proc.pid)
environment['PLOTMAN_TMPDIR'] = environment['PLOTMAN_TMP2DIR'] = environment['PLOTMAN_DSTDIR'] = job.tmpdir
if job.tmp2dir is not None and job.tmp2dir != '':
environment['PLOTMAN_TMP2DIR'] = job.tmp2dir
if job.dstdir is not None and job.dstdir != '':
environment['PLOTMAN_DSTDIR'] = job.dstdir
environment['PLOTMAN_LOGFILE'] = job.logfile
environment['PLOTMAN_STATUS'] = job.get_run_status()
environment['PLOTMAN_PHASE'] = str(job.progress().major) + ':' + str(job.progress().minor)

old_phase = JobPhases().progress(job.plot_id)
if old_phase:
old_phase = str(JobPhases().progress(job.plot_id).major) + ':' + str(JobPhases().progress(job.plot_id).minor)
else:
old_phase = str(old_phase)
environment['PLOTMAN_PHASE_PREV'] = old_phase

return environment


def run(job, trigger="PHASE"):
"""Runs all scripts in alphabetical order from the hooks.d directory
for the provided job.

Job's internal state is added to the Plotman's own environment.
Folowing env VARIABLES are exported:
- PLOTMAN_PLOTID (id of the plot)
- PLOTMAN_PID (pid of the process)
- PLOTMAN_TMPDIR (tmp dir [-t])
- PLOTMAN_TMP2DIR (tmp2 dir [-2])
- PLOTMAN_DSTDIR (dst dir [-d])
- PLOTMAN_LOGFILE (logfile)
- PLOTMAN_STATUS (current state of the process, e.g. RUN, STP - check job class for details)
- PLOTMAN_PHASE (phase, "major:minor" - two numbers, colon delimited)
- PLOTMAN_PHASE_PREV (phase, previous if known, or "None")
- PLOTMAN_TRIGGER (action, which triggered hooks. currently one of "PHASE"-change or "KILL")
"""

hooks_path = configuration.get_path('hooks.d')

if os.getenv('PLOTMAN_HOOKS') is not None:
return

environment = prepare_env(job, hooks_path)
environment['PLOTMAN_TRIGGER'] = trigger

for e in ['*.py','*.sh']:
for script in Path(hooks_path).glob(e):
rc, stdout, stderr = run_cmd([str(script)], environment)

4 changes: 3 additions & 1 deletion src/plotman/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# Plotman libraries
from plotman import \
archive # for get_archdir_freebytes(). TODO: move to avoid import loop
from plotman import job, plot_util
from plotman import job, plot_util, hooks
import plotman.configuration

# Constants
Expand Down Expand Up @@ -93,6 +93,8 @@ def phases_permit_new_job(phases: typing.List[job.Phase], d: str, sched_cfg: plo
def maybe_start_new_plot(dir_cfg: plotman.configuration.Directories, sched_cfg: plotman.configuration.Scheduling, plotting_cfg: plotman.configuration.Plotting, log_cfg: plotman.configuration.Logging) -> typing.Tuple[bool, str]:
jobs = job.Job.get_running_jobs(log_cfg.plots)

hooks.try_run(jobs)

wait_reason = None # If we don't start a job this iteration, this says why.

youngest_job_age = min(jobs, key=job.Job.get_time_wall).get_time_wall() if jobs else MAX_AGE
Expand Down
4 changes: 3 additions & 1 deletion src/plotman/plotman.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import pendulum

# Plotman libraries
from plotman import analyzer, archive, configuration, interactive, manager, plot_util, reporting, csv_exporter
from plotman import analyzer, archive, configuration, interactive, manager, plot_util, reporting, csv_exporter, hooks
from plotman import resources as plotman_resources
from plotman.job import Job

Expand Down Expand Up @@ -331,6 +331,8 @@ def main() -> None:
for f in temp_files:
os.remove(f)

hooks.run(job, "KILL")

elif args.cmd == 'suspend':
print('Suspending ' + job.plot_id)
job.suspend()
Expand Down
42 changes: 42 additions & 0 deletions src/plotman/resources/hooks.d/.lib.include
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/bin/sh

log() {
local dd=$(date --iso-8601=seconds)
local logee="$(basename $0)$(printf "(%0.8s)" ${PLOTMAN_PLOTID})"
local severity="$1"
shift

printf "%s [%-10s] %s: %s\n" "${dd%+*}" "$logee" "$severity" "$*"
}

logInfo() {
log INFO $*
}

logError() {
log ERROR $* >&2
}

sanityChecks() {
for p in PLOTID PID TMPDIR TMP2DIR DSTDIR LOGFILE STATUS PHASE; do
eval [ -n "\${PLOTMAN_$p}" ] || {
logError "PLOTMAN_$p variable missing or empty. Is $(basename $0) being started by Plotman?"
exit 1
}
done

export PLOTMAN_DSTDIR=${PLOTMAN_DSTDIR%/}
export PLOTMAN_TMPDIR=${PLOTMAN_TMPDIR%/}
export PLOTMAN_TMP2DIR=${PLOTMAN_TMP2DIR%/}
}

### plot k32 size in bytes 101.5 * GiB === 108984795136 bytes
k32PLOTSIZE=$((1015 *1024 *1024 *1024 /10))

for incl in $(ls ${PLOTMAN_HOOKS}/.lib.include.* 2>/dev/null); do
. $incl
done

exec >>/tmp/plotman-hooks.log 2>&1

sanityChecks
9 changes: 9 additions & 0 deletions src/plotman/resources/hooks.d/00-demo-log-new-plotter.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/sh

. ${PLOTMAN_HOOKS}/.lib.include

[ "${PLOTMAN_TRIGGER}" = "PHASE" ] || exit 0

[ x"${PLOTMAN_PHASE}" = x0:0 ] || exit 0

logInfo "New plotter process with pid: ${PLOTMAN_PID}, id: ${PLOTMAN_PLOTID} was just started"
34 changes: 34 additions & 0 deletions src/plotman/resources/hooks.d/01-check-dstdir-free.sh.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#!/bin/sh

. ${PLOTMAN_HOOKS}/.lib.include

[ "${PLOTMAN_TRIGGER}" = "PHASE" ] || exit 0

logInfo "processing plot id ${PLOTMAN_PLOTID} (ph: ${PLOTMAN_PHASE})"

### if jobs phase is below 3:6, do nothing
[ x"${PLOTMAN_PHASE}" = x3:6 -o x"${PLOTMAN_PHASE}" = x3:7 ] || {
logInfo "plot not at phase 3:6/7, hook done"
exit 0
}


### get available space on DSTDIR in bytes, check result.
### exit 1 if something went wrong
AVAIL=$(df -B1 --output=avail ${PLOTMAN_DSTDIR} | grep -oE '[0-9]+')
[ -n "${AVAIL}" -a $? -eq 0 ] || {
logError "something went wrong while checking available space at ${PLOTMAN_DSTDIR}, suspending job and exiting."
kill -STOP ${PLOTMAN_PID}
exit 1
}


### continue only if available space is less then plot size
[ "${AVAIL}" -lt "${k32PLOTSIZE}" ] || {
logInfo "there is ${AVAIL} available space at ${PLOTMAN_DSTDIR}, which is > k32plot ${k32PLOTSIZE}. job can continue."
exit 0
}

logError "not enough available space at ${PLOTMAN_DSTDIR}. suspending job."
kill -STOP ${PLOTMAN_PID}

19 changes: 19 additions & 0 deletions src/plotman/resources/hooks.d/99-post-kill-cleanup.sh.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/sh

. ${PLOTMAN_HOOKS}/.lib.include

[ "${PLOTMAN_TRIGGER}" = "KILL" ] || exit 0

logInfo "${PLOTMAN_TRIGGER} triggered processing plot id ${PLOTMAN_PLOTID} (ph: ${PLOTMAN_PHASE})"

for d in "${PLOTMAN_DSTDIR}" "${PLOTMAN_TMPDIR}" "${PLOTMAN_TMP2DIR}"; do

### avoid using find, grep, xargs - all those can be various implementations
### behaving differently for corner cases like spaces in file name, or can
### implement different set of arguments (POSIX vs BSD variants, busybox, ....)
### so do it the legacy way
ls "${d}/"*${PLOTMAN_PLOTID}* | while read f; do
rm -v "$f"
done
done