Skip to content

Commit

Permalink
Arg validation for entry point xtriggers.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Jan 29, 2024
1 parent d410f41 commit 2538bcf
Show file tree
Hide file tree
Showing 11 changed files with 130 additions and 92 deletions.
File renamed without changes.
37 changes: 17 additions & 20 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
from cylc.flow.print_tree import print_tree
from cylc.flow.simulation import configure_sim_modes
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.subprocpool import get_func
from cylc.flow.subprocpool import get_xtrig_func
from cylc.flow.task_events_mgr import (
EventData,
get_event_handler_data
Expand Down Expand Up @@ -1716,31 +1716,28 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
# Allow "@wall_clock" in graph as implicit zero-offset.
xtrig = SubFuncContext('wall_clock', 'wall_clock', [], {})

if xtrig.func_name == 'wall_clock':
if self.cycling_type == INTEGER_CYCLING_TYPE:
raise WorkflowConfigError(
"Clock xtriggers require datetime cycling:"
f" {label} = {xtrig.get_signature()}"
)
else:
# Convert offset arg to kwarg for certainty later.
if "offset" not in xtrig.func_kwargs:
xtrig.func_kwargs["offset"] = None
with suppress(IndexError):
xtrig.func_kwargs["offset"] = xtrig.func_args[0]
if (
xtrig.func_name == 'wall_clock'
and self.cycling_type == INTEGER_CYCLING_TYPE
):
raise WorkflowConfigError(
"Clock xtriggers require datetime cycling:"
f" {label} = {xtrig.get_signature()}"
)

# Generic xtrigger validation.
XtriggerManager.check_xtrigger(label, xtrig, self.fdir)

# Call the xtrigger's validate_config function if it has one.
# Specific xtrigger.validate(), if available.
with suppress(AttributeError, ImportError):
get_func(xtrig.func_name, "validate_config", self.fdir)(
get_xtrig_func(xtrig.func_name, "validate", self.fdir)(
xtrig.func_args,
xtrig.func_kwargs,
xtrig.get_signature()
)

if self.xtrigger_mgr is None:
# Validation only.
XtriggerManager.validate_xtrigger(label, xtrig, self.fdir)
else:
if self.xtrigger_mgr:
# (not available during validation)
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)

self.taskdefs[right].add_xtrig_label(label, seq)
Expand Down Expand Up @@ -2433,7 +2430,7 @@ def upgrade_clock_triggers(self):
xtrig = SubFuncContext(label, 'wall_clock', [], {})
xtrig.func_kwargs["offset"] = offset
if self.xtrigger_mgr is None:
XtriggerManager.validate_xtrigger(label, xtrig, self.fdir)
XtriggerManager.check_xtrigger(label, xtrig, self.fdir)
else:
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)
# Add it to the task, for each sequence that the task appears in.
Expand Down
11 changes: 7 additions & 4 deletions cylc/flow/scripts/function_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
(This command is for internal use.)
Run a Python function "<name>(*args, **kwargs)" in the process pool. It must be
defined in a module of the same name. Positional and keyword arguments must be
passed in as JSON strings. <src-dir> is the workflow source dir, needed to find
local xtrigger modules.
Run a Python xtrigger function "<name>(*args, **kwargs)" in the process pool.
It must be in a module of the same name. Positional and keyword arguments must
be passed in as JSON strings.
Python entry points are the preferred way to make xtriggers available to the
scheduler, but local xtriggers can be stored in <src-dir>.
"""
import sys

Expand Down
68 changes: 41 additions & 27 deletions cylc/flow/subprocpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.wallclock import get_current_time_string

_XTRIG_FUNCS: dict = {}
_XTRIG_MOD_CACHE: dict = {}
_XTRIG_FUNC_CACHE: dict = {}


def _killpg(proc, signal):
Expand All @@ -66,52 +67,61 @@ def _killpg(proc, signal):
return True


def get_func(mod_name, func_name, src_dir):
"""Find and return a named function from a named module.
def get_xtrig_mod(mod_name, src_dir):
"""Find, cache, and return a named xtrigger module.
Can be in <src_dir>/lib/python, cylc.flow.xtriggers, or in Python path.
Locations checked in this order:
- <src_dir>/lib/python (prepend to sys.path)
- $CYLC_PYTHONPATH (already in sys.path)
- `cylc.xtriggers` entry point
These locations are checked in this order:
- <src_dir>/lib/python/
- `$CYLC_PYTHONPATH`
- defined via a `cylc.xtriggers` entry point for an
installed Python package.
(Check entry point last so users can override with local implementations).
Workflow source directory passed in as this is executed in an independent
process in the command pool and therefore doesn't know about the workflow.
Workflow source dir passed in - this executes in an independent subprocess.
Raises:
ImportError, if the module is not found
AttributeError, if the function is not found in the module
"""
if (mod_name, func_name) in _XTRIG_FUNCS:
if mod_name in _XTRIG_MOD_CACHE:
# Found and cached already.
return _XTRIG_FUNCS[(mod_name, func_name)]
return _XTRIG_MOD_CACHE[mod_name]

# First look in <src-dir>/lib/python.
sys.path.insert(0, os.path.join(src_dir, 'lib', 'python'))
try:
mod_by_name = __import__(mod_name, fromlist=[mod_name])
_XTRIG_MOD_CACHE[mod_name] = __import__(mod_name, fromlist=[mod_name])
except ImportError:
# Look for xtriggers via entry_points for external sources.
# Do this after the lib/python and PYTHONPATH approaches to allow
# users to override entry_point definitions with local/custom
# implementations.
# Then entry point.
for entry_point in iter_entry_points('cylc.xtriggers'):
if func_name == entry_point.name:
_XTRIG_FUNCS[func_name] = entry_point.load()
return _XTRIG_FUNCS[func_name]

if mod_name == entry_point.name:
_XTRIG_MOD_CACHE[mod_name] = entry_point.load()
return _XTRIG_MOD_CACHE[mod_name]
# Still unable to find anything so abort
raise

return _XTRIG_MOD_CACHE[mod_name]


def get_xtrig_func(mod_name, func_name, src_dir):
"""Find, cache, and return a function from an xtrigger module.
Raises:
ImportError, if the module is not found
AttributeError, if the function is not found in the module
"""
if (mod_name, func_name) in _XTRIG_FUNC_CACHE:
return _XTRIG_FUNC_CACHE[(mod_name, func_name)]

mod = get_xtrig_mod(mod_name, src_dir)

try:
_XTRIG_FUNCS[(mod_name, func_name)] = getattr(mod_by_name, func_name)
_XTRIG_FUNC_CACHE[(mod_name, func_name)] = getattr(mod, func_name)
except AttributeError:
# Module func_name has no function func_name, nor an entry_point entry.
raise
return _XTRIG_FUNCS[(mod_name, func_name)]

return _XTRIG_FUNC_CACHE[(mod_name, func_name)]


def run_function(func_name, json_args, json_kwargs, src_dir):
Expand All @@ -128,14 +138,18 @@ def run_function(func_name, json_args, json_kwargs, src_dir):
"""
func_args = json.loads(json_args)
func_kwargs = json.loads(json_kwargs)

# Find and import then function.
func = get_func(func_name, func_name, src_dir)
func = get_xtrig_func(func_name, func_name, src_dir)

# Redirect stdout to stderr.
orig_stdout = sys.stdout
sys.stdout = sys.stderr
res = func(*func_args, **func_kwargs)

# Restore stdout.
sys.stdout = orig_stdout

# Write function return value as JSON to stdout.
sys.stdout.write(json.dumps(res))

Expand Down
15 changes: 9 additions & 6 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from cylc.flow.exceptions import XtriggerConfigError
import cylc.flow.flags
from cylc.flow.hostuserutil import get_user
from cylc.flow.subprocpool import get_func
from cylc.flow.subprocpool import get_xtrig_func
from cylc.flow.xtriggers.wall_clock import wall_clock

if TYPE_CHECKING:
Expand Down Expand Up @@ -240,12 +240,14 @@ def __init__(
self.do_housekeeping = False

@staticmethod
def validate_xtrigger(
def check_xtrigger(
label: str,
fctx: 'SubFuncContext',
fdir: str,
) -> None:
"""Validate an Xtrigger function.
"""Generic xtrigger validation: check existence and string templates.
Xtrigger modules may also supply a specific "validate" function.
Args:
label: xtrigger label
Expand All @@ -264,7 +266,7 @@ def validate_xtrigger(
fname: str = fctx.func_name

try:
func = get_func(fname, fname, fdir)
func = get_xtrig_func(fname, fname, fdir)
except ImportError:
raise XtriggerConfigError(
label,
Expand Down Expand Up @@ -319,13 +321,14 @@ def validate_xtrigger(
def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None:
"""Add a new xtrigger function.
Check the xtrigger function exists here (e.g. during validation).
Call check_xtrigger before this, during validation.
Args:
label: xtrigger label
fctx: function context
fdir: function module directory
"""
self.validate_xtrigger(label, fctx, fdir)
self.functx_map[label] = fctx

def mutate_trig(self, label, kwargs):
Expand Down
20 changes: 13 additions & 7 deletions cylc/flow/xtriggers/echo.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@

"""A Cylc xtrigger function."""

from contextlib import suppress
from cylc.flow.exceptions import WorkflowConfigError


def echo(*args, **kwargs):
"""Prints args to stdout and return success only if kwargs['succeed']
is True.
"""Print arguments to stdout, return kwargs['succeed'] and kwargs.
This may be a useful aid to understanding how xtriggers work.
Expand All @@ -31,7 +30,14 @@ def echo(*args, **kwargs):
"""
print("echo: ARGS:", args)
print("echo: KWARGS:", kwargs)
result = False
with suppress(KeyError):
result = kwargs["succeed"] is True
return result, kwargs

return kwargs["succeed"], kwargs


def validate(f_args, f_kwargs, f_signature):
try:
assert type(f_kwargs["succeed"]) is bool
except (KeyError, AssertionError):
raise WorkflowConfigError(

Check warning on line 41 in cylc/flow/xtriggers/echo.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/echo.py#L40-L41

Added lines #L40 - L41 were not covered by tests
f"xtrigger requires 'succeed=True/False': {f_signature}"
)
24 changes: 18 additions & 6 deletions cylc/flow/xtriggers/wall_clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ def wall_clock(trigger_time=None):
return time() > trigger_time


def validate_config(f_args, f_kwargs, f_signature):
def validate(f_args, f_kwargs, f_signature):
"""Validate and manipulate args parsed from the workflow config.
wall_clock() # zero offset
wall_clock() # infer zero interval
wall_clock(PT1H)
wall_clock(offset=PT1H)
Expand All @@ -43,18 +43,30 @@ def validate_config(f_args, f_kwargs, f_signature):
If f_args used, convert to f_kwargs for clarity.
"""

n_args = len(f_args)
n_kwargs = len(f_kwargs)

if n_args + n_kwargs > 1:
raise WorkflowConfigError(f"xtrigger: too many args: {f_signature}")
raise WorkflowConfigError(f"Too many args: {f_signature}")

Check warning on line 51 in cylc/flow/xtriggers/wall_clock.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/wall_clock.py#L51

Added line #L51 was not covered by tests

if n_kwargs:
# sole kwarg must be "offset"
kw = next(iter(f_kwargs))
if kw != "offset":
raise WorkflowConfigError(f"Illegal arg '{kw}': {f_signature}")

Check warning on line 57 in cylc/flow/xtriggers/wall_clock.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/wall_clock.py#L57

Added line #L57 was not covered by tests

if n_args:
elif n_args:
# convert to kwarg
f_kwargs["offset"] = f_args[0]
elif not n_kwargs:
del f_args[0]

else:
# no args, infer zero interval
f_kwargs["offset"] = "P0Y"

# must be a valid interval
try:
interval_parse(f_kwargs["offset"])
except ValueError:
raise WorkflowConfigError(f"xtrigger: invalid offset: {f_signature}")
raise WorkflowConfigError(f": Invalid offset: {f_signature}")

Check warning on line 72 in cylc/flow/xtriggers/wall_clock.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/wall_clock.py#L71-L72

Added lines #L71 - L72 were not covered by tests
13 changes: 8 additions & 5 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,15 @@ cylc.main_loop =
cylc.pre_configure =
cylc.post_install =
log_vc_info = cylc.flow.install_plugins.log_vc_info:main
# NOTE: Built-in xtriggers
# NOTE: Built-in xtrigger modules
# - must contain a function (the xtrigger) with the same name as the module
# - and may contain a "validate" function to check arguments
cylc.xtriggers =
echo = cylc.flow.xtriggers.echo:echo
wall_clock = cylc.flow.xtriggers.wall_clock:wall_clock
workflow_state = cylc.flow.xtriggers.workflow_state:workflow_state
xrandom = cylc.flow.xtriggers.xrandom:xrandom
echo = cylc.flow.xtriggers.echo
wall_clock = cylc.flow.xtriggers.wall_clock
workflow_state = cylc.flow.xtriggers.workflow_state
xrandom = cylc.flow.xtriggers.xrandom
ball_cock = cylc.ball.ball_cock

[bdist_rpm]
requires =
Expand Down
4 changes: 2 additions & 2 deletions tests/functional/xtriggers/03-sequence.t
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ init_workflow "${TEST_NAME_BASE}" << '__FLOW_CONFIG__'
final cycle point = +P1Y
[[xtriggers]]
e1 = echo(name='bob', succeed=True)
e2 = echo(name='alice')
e2 = echo(name='alice', succeed=False)
[[dependencies]]
[[[R1]]]
graph = "start"
Expand All @@ -55,7 +55,7 @@ cylc show "${WORKFLOW_NAME}//2026/foo" | grep -E '^ - xtrigger' > 2026.foo.log

# 2026/foo should get only xtrigger e2.
cmp_ok 2026.foo.log - <<__END__
- xtrigger "e2 = echo(name=alice)"
- xtrigger "e2 = echo(name=alice, succeed=False)"
__END__

cylc stop --now --max-polls=10 --interval=2 "${WORKFLOW_NAME}"
Expand Down
Loading

0 comments on commit 2538bcf

Please sign in to comment.