Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Xtrigger function arg validation. #5452

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/5452.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support xtrigger argument validation.
47 changes: 27 additions & 20 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from cylc.flow.id import Tokens
from cylc.flow.cycling.integer import IntegerInterval
from cylc.flow.cycling.iso8601 import ingest_time, ISO8601Interval

from cylc.flow.exceptions import (
CylcError,
InputError,
Expand All @@ -82,6 +83,7 @@
from cylc.flow.print_tree import print_tree
from cylc.flow.simulation import configure_sim_modes
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.subprocpool import get_xtrig_func
from cylc.flow.task_events_mgr import (
EventData,
get_event_handler_data
Expand Down Expand Up @@ -1711,28 +1713,33 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
if label != 'wall_clock':
raise WorkflowConfigError(f"xtrigger not defined: {label}")
else:
# Allow "@wall_clock" in the graph as an undeclared
# zero-offset clock xtrigger.
xtrig = SubFuncContext(
'wall_clock', 'wall_clock', [], {})
# Allow "@wall_clock" in graph as implicit zero-offset.
xtrig = SubFuncContext('wall_clock', 'wall_clock', [], {})

if xtrig.func_name == 'wall_clock':
if self.cycling_type == INTEGER_CYCLING_TYPE:
raise WorkflowConfigError(
"Clock xtriggers require datetime cycling:"
f" {label} = {xtrig.get_signature()}"
)
else:
# Convert offset arg to kwarg for certainty later.
if "offset" not in xtrig.func_kwargs:
xtrig.func_kwargs["offset"] = None
with suppress(IndexError):
xtrig.func_kwargs["offset"] = xtrig.func_args[0]
if (
xtrig.func_name == 'wall_clock'
and self.cycling_type == INTEGER_CYCLING_TYPE
):
raise WorkflowConfigError(
"Clock xtriggers require datetime cycling:"
f" {label} = {xtrig.get_signature()}"
)

if self.xtrigger_mgr is None:
XtriggerManager.validate_xtrigger(label, xtrig, self.fdir)
else:
# Generic xtrigger validation.
XtriggerManager.check_xtrigger(label, xtrig, self.fdir)

# Specific xtrigger.validate(), if available.
with suppress(AttributeError, ImportError):
get_xtrig_func(xtrig.func_name, "validate", self.fdir)(
xtrig.func_args,
xtrig.func_kwargs,
xtrig.get_signature()
)

if self.xtrigger_mgr:
# (not available during validation)
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)

self.taskdefs[right].add_xtrig_label(label, seq)

def get_actual_first_point(self, start_point):
Expand Down Expand Up @@ -2423,7 +2430,7 @@ def upgrade_clock_triggers(self):
xtrig = SubFuncContext(label, 'wall_clock', [], {})
xtrig.func_kwargs["offset"] = offset
if self.xtrigger_mgr is None:
XtriggerManager.validate_xtrigger(label, xtrig, self.fdir)
XtriggerManager.check_xtrigger(label, xtrig, self.fdir)
else:
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)
# Add it to the task, for each sequence that the task appears in.
Expand Down
11 changes: 7 additions & 4 deletions cylc/flow/scripts/function_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

(This command is for internal use.)

Run a Python function "<name>(*args, **kwargs)" in the process pool. It must be
defined in a module of the same name. Positional and keyword arguments must be
passed in as JSON strings. <src-dir> is the workflow source dir, needed to find
local xtrigger modules.
Run a Python xtrigger function "<name>(*args, **kwargs)" in the process pool.
It must be in a module of the same name. Positional and keyword arguments must
be passed in as JSON strings.

Python entry points are the preferred way to make xtriggers available to the
scheduler, but local xtriggers can be stored in <src-dir>.

"""
import sys

Expand Down
74 changes: 48 additions & 26 deletions cylc/flow/subprocpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.wallclock import get_current_time_string

_XTRIG_FUNCS: dict = {}
_XTRIG_MOD_CACHE: dict = {}
_XTRIG_FUNC_CACHE: dict = {}


def _killpg(proc, signal):
Expand All @@ -66,68 +67,89 @@ def _killpg(proc, signal):
return True


def get_func(func_name, src_dir):
"""Find and return an xtrigger function from a module of the same name.
def get_xtrig_mod(mod_name, src_dir):
"""Find, cache, and return a named xtrigger module.

These locations are checked in this order:
- <src_dir>/lib/python/
- `$CYLC_PYTHONPATH`
- defined via a `cylc.xtriggers` entry point for an
installed Python package.
Locations checked in this order:
- <src_dir>/lib/python (prepend to sys.path)
- $CYLC_PYTHONPATH (already in sys.path)
- `cylc.xtriggers` entry point

Workflow source directory passed in as this is executed in an independent
process in the command pool and therefore doesn't know about the workflow.
(Check entry point last so users can override with local implementations).

Workflow source dir passed in - this executes in an independent subprocess.

Raises:
ImportError, if the module is not found

"""
if func_name in _XTRIG_FUNCS:
return _XTRIG_FUNCS[func_name]
if mod_name in _XTRIG_MOD_CACHE:
# Found and cached already.
return _XTRIG_MOD_CACHE[mod_name]

# First look in <src-dir>/lib/python.
sys.path.insert(0, os.path.join(src_dir, 'lib', 'python'))
mod_name = func_name
try:
mod_by_name = __import__(mod_name, fromlist=[mod_name])
_XTRIG_MOD_CACHE[mod_name] = __import__(mod_name, fromlist=[mod_name])
except ImportError:
# Look for xtriggers via entry_points for external sources.
# Do this after the lib/python and PYTHONPATH approaches to allow
# users to override entry_point definitions with local/custom
# implementations.
# Then entry point.
for entry_point in iter_entry_points('cylc.xtriggers'):
if func_name == entry_point.name:
_XTRIG_FUNCS[func_name] = entry_point.load()
return _XTRIG_FUNCS[func_name]

if mod_name == entry_point.name:
_XTRIG_MOD_CACHE[mod_name] = entry_point.load()
return _XTRIG_MOD_CACHE[mod_name]
# Still unable to find anything so abort
raise

return _XTRIG_MOD_CACHE[mod_name]


def get_xtrig_func(mod_name, func_name, src_dir):
"""Find, cache, and return a function from an xtrigger module.

Raises:
ImportError, if the module is not found
AttributeError, if the function is not found in the module

"""
if (mod_name, func_name) in _XTRIG_FUNC_CACHE:
return _XTRIG_FUNC_CACHE[(mod_name, func_name)]

mod = get_xtrig_mod(mod_name, src_dir)

try:
_XTRIG_FUNCS[func_name] = getattr(mod_by_name, func_name)
_XTRIG_FUNC_CACHE[(mod_name, func_name)] = getattr(mod, func_name)
except AttributeError:
# Module func_name has no function func_name, nor an entry_point entry.
raise
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
return _XTRIG_FUNCS[func_name]

return _XTRIG_FUNC_CACHE[(mod_name, func_name)]


def run_function(func_name, json_args, json_kwargs, src_dir):
"""Run a Python function in the process pool.

func_name(*func_args, **func_kwargs)

The function is presumed to be in a module of the same name.

Redirect any function stdout to stderr (and workflow log in debug mode).
Return value printed to stdout as a JSON string - allows use of the
existing process pool machinery as-is. src_dir is for local modules.

"""
func_args = json.loads(json_args)
func_kwargs = json.loads(json_kwargs)

# Find and import the function.
func = get_func(func_name, src_dir)
func = get_xtrig_func(func_name, func_name, src_dir)

# Redirect stdout to stderr.
orig_stdout = sys.stdout
sys.stdout = sys.stderr
res = func(*func_args, **func_kwargs)

# Restore stdout.
sys.stdout = orig_stdout

# Write function return value as JSON to stdout.
sys.stdout.write(json.dumps(res))

Expand Down
16 changes: 10 additions & 6 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from cylc.flow.exceptions import XtriggerConfigError
import cylc.flow.flags
from cylc.flow.hostuserutil import get_user
from cylc.flow.subprocpool import get_func
from cylc.flow.subprocpool import get_xtrig_func
from cylc.flow.xtriggers.wall_clock import wall_clock

if TYPE_CHECKING:
Expand Down Expand Up @@ -240,12 +240,14 @@ def __init__(
self.do_housekeeping = False

@staticmethod
def validate_xtrigger(
def check_xtrigger(
label: str,
fctx: 'SubFuncContext',
fdir: str,
) -> None:
"""Validate an Xtrigger function.
"""Generic xtrigger validation: check existence and string templates.

Xtrigger modules may also supply a specific "validate" function.

Args:
label: xtrigger label
Expand All @@ -262,8 +264,9 @@ def validate_xtrigger(

"""
fname: str = fctx.func_name

try:
func = get_func(fname, fdir)
func = get_xtrig_func(fname, fname, fdir)
except ImportError:
raise XtriggerConfigError(
label,
Expand Down Expand Up @@ -318,13 +321,14 @@ def validate_xtrigger(
def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None:
"""Add a new xtrigger function.

Check the xtrigger function exists here (e.g. during validation).
Call check_xtrigger before this, during validation.

Args:
label: xtrigger label
fctx: function context
fdir: function module directory

"""
self.validate_xtrigger(label, fctx, fdir)
MetRonnie marked this conversation as resolved.
Show resolved Hide resolved
self.functx_map[label] = fctx

def mutate_trig(self, label, kwargs):
Expand Down
24 changes: 17 additions & 7 deletions cylc/flow/xtriggers/echo.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@

"""A Cylc xtrigger function."""

from contextlib import suppress
from cylc.flow.exceptions import WorkflowConfigError


def echo(*args, **kwargs):
"""Prints args to stdout and return success only if kwargs['succeed']
is True.
"""Print arguments to stdout, return kwargs['succeed'] and kwargs.

This may be a useful aid to understanding how xtriggers work.

Expand All @@ -31,7 +30,18 @@
"""
print("echo: ARGS:", args)
print("echo: KWARGS:", kwargs)
result = False
with suppress(KeyError):
result = kwargs["succeed"] is True
return result, kwargs

return kwargs["succeed"], kwargs


def validate(f_args, f_kwargs, f_signature):
MetRonnie marked this conversation as resolved.
Show resolved Hide resolved
"""
Validate the xtrigger function arguments parsed from the workflow config.

This is separate from the xtrigger to allow parse-time validation.

"""
if "succeed" not in f_kwargs or not type(f_kwargs["succeed"]) is bool:
raise WorkflowConfigError(

Check warning on line 45 in cylc/flow/xtriggers/echo.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/echo.py#L45

Added line #L45 was not covered by tests
f"Requires 'succeed=True/False' arg: {f_signature}"
)
43 changes: 43 additions & 0 deletions cylc/flow/xtriggers/wall_clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
"""xtrigger function to trigger off of a wall clock time."""

from time import time
from cylc.flow.cycling.iso8601 import interval_parse
from cylc.flow.exceptions import WorkflowConfigError


def wall_clock(trigger_time=None):
Expand All @@ -27,3 +29,44 @@
Trigger time as seconds since Unix epoch.
"""
return time() > trigger_time


def validate(f_args, f_kwargs, f_signature):
"""Validate and manipulate args parsed from the workflow config.

wall_clock() # infer zero interval
wall_clock(PT1H)
wall_clock(offset=PT1H)

The offset must be a valid ISO 8601 interval.

If f_args used, convert to f_kwargs for clarity.

"""

n_args = len(f_args)
n_kwargs = len(f_kwargs)

if n_args + n_kwargs > 1:
raise WorkflowConfigError(f"Too many args: {f_signature}")

Check warning on line 51 in cylc/flow/xtriggers/wall_clock.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/wall_clock.py#L51

Added line #L51 was not covered by tests

if n_kwargs:
# sole kwarg must be "offset"
kw = next(iter(f_kwargs))
if kw != "offset":
raise WorkflowConfigError(f"Illegal arg '{kw}': {f_signature}")

Check warning on line 57 in cylc/flow/xtriggers/wall_clock.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/wall_clock.py#L57

Added line #L57 was not covered by tests

elif n_args:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced that this is safe - I think that it needs to be `if`, not `elif`. I've had a go at writing integration tests for these, and I was going to write some unit tests too, but I've run out of time.

Copy link
Member Author

@hjoliver hjoliver Jan 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's valid because of line 50, i.e. it has to be one or the other, because the config file can only have (e.g.) wall_clock(PT1H) or wall_clock(offset=PT1H).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that makes sense - I was projecting backward from the xrandom test where I started out with a copy of this.

# convert to kwarg
f_kwargs["offset"] = f_args[0]
del f_args[0]

else:
# no args, infer zero interval
f_kwargs["offset"] = "P0Y"

# must be a valid interval
try:
interval_parse(f_kwargs["offset"])
except ValueError:
raise WorkflowConfigError(f"Invalid offset: {f_signature}")

Check warning on line 72 in cylc/flow/xtriggers/wall_clock.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/wall_clock.py#L71-L72

Added lines #L71 - L72 were not covered by tests
12 changes: 7 additions & 5 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,14 @@ cylc.main_loop =
cylc.pre_configure =
cylc.post_install =
log_vc_info = cylc.flow.install_plugins.log_vc_info:main
# NOTE: Built-in xtriggers
# NOTE: Built-in xtrigger modules
# - must contain a function (the xtrigger) with the same name as the module
# - and may contain a "validate" function to check arguments
cylc.xtriggers =
echo = cylc.flow.xtriggers.echo:echo
wall_clock = cylc.flow.xtriggers.wall_clock:wall_clock
workflow_state = cylc.flow.xtriggers.workflow_state:workflow_state
xrandom = cylc.flow.xtriggers.xrandom:xrandom
echo = cylc.flow.xtriggers.echo
wall_clock = cylc.flow.xtriggers.wall_clock
workflow_state = cylc.flow.xtriggers.workflow_state
xrandom = cylc.flow.xtriggers.xrandom
hjoliver marked this conversation as resolved.
Show resolved Hide resolved

[bdist_rpm]
requires =
Expand Down
Loading
Loading