Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Xtrig arg validate #5955

Merged
merged 16 commits into from
Feb 22, 2024
Merged
1 change: 1 addition & 0 deletions changes.d/5955.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support xtrigger argument validation.
47 changes: 27 additions & 20 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from cylc.flow.id import Tokens
from cylc.flow.cycling.integer import IntegerInterval
from cylc.flow.cycling.iso8601 import ingest_time, ISO8601Interval

from cylc.flow.exceptions import (
CylcError,
InputError,
Expand All @@ -82,6 +83,7 @@
from cylc.flow.print_tree import print_tree
from cylc.flow.simulation import configure_sim_modes
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.subprocpool import get_xtrig_func
from cylc.flow.task_events_mgr import (
EventData,
get_event_handler_data
Expand Down Expand Up @@ -1718,28 +1720,33 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
if label != 'wall_clock':
raise WorkflowConfigError(f"xtrigger not defined: {label}")
else:
# Allow "@wall_clock" in the graph as an undeclared
# zero-offset clock xtrigger.
xtrig = SubFuncContext(
'wall_clock', 'wall_clock', [], {})
# Allow "@wall_clock" in graph as implicit zero-offset.
xtrig = SubFuncContext('wall_clock', 'wall_clock', [], {})

if xtrig.func_name == 'wall_clock':
if self.cycling_type == INTEGER_CYCLING_TYPE:
raise WorkflowConfigError(
"Clock xtriggers require datetime cycling:"
f" {label} = {xtrig.get_signature()}"
)
else:
# Convert offset arg to kwarg for certainty later.
if "offset" not in xtrig.func_kwargs:
xtrig.func_kwargs["offset"] = None
with suppress(IndexError):
xtrig.func_kwargs["offset"] = xtrig.func_args[0]
if (
xtrig.func_name == 'wall_clock'
and self.cycling_type == INTEGER_CYCLING_TYPE
):
raise WorkflowConfigError(
"Clock xtriggers require datetime cycling:"
f" {label} = {xtrig.get_signature()}"
)

if self.xtrigger_mgr is None:
XtriggerManager.validate_xtrigger(label, xtrig, self.fdir)
else:
# Generic xtrigger validation.
XtriggerManager.check_xtrigger(label, xtrig, self.fdir)

# Specific xtrigger.validate(), if available.
with suppress(AttributeError, ImportError):
get_xtrig_func(xtrig.func_name, "validate", self.fdir)(
xtrig.func_args,
xtrig.func_kwargs,
xtrig.get_signature()
)

if self.xtrigger_mgr:
# (not available during validation)
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)

self.taskdefs[right].add_xtrig_label(label, seq)

def get_actual_first_point(self, start_point):
Expand Down Expand Up @@ -2430,7 +2437,7 @@ def upgrade_clock_triggers(self):
xtrig = SubFuncContext(label, 'wall_clock', [], {})
xtrig.func_kwargs["offset"] = offset
if self.xtrigger_mgr is None:
XtriggerManager.validate_xtrigger(label, xtrig, self.fdir)
XtriggerManager.check_xtrigger(label, xtrig, self.fdir)
else:
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)
# Add it to the task, for each sequence that the task appears in.
Expand Down
11 changes: 7 additions & 4 deletions cylc/flow/scripts/function_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

(This command is for internal use.)

Run a Python function "<name>(*args, **kwargs)" in the process pool. It must be
defined in a module of the same name. Positional and keyword arguments must be
passed in as JSON strings. <src-dir> is the workflow source dir, needed to find
local xtrigger modules.
Run a Python xtrigger function "<name>(*args, **kwargs)" in the process pool.
It must be in a module of the same name. Positional and keyword arguments must
be passed in as JSON strings.

Python entry points are the preferred way to make xtriggers available to the
scheduler, but local xtriggers can be stored in <src-dir>.

"""
import sys

Expand Down
77 changes: 48 additions & 29 deletions cylc/flow/subprocpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.wallclock import get_current_time_string

_XTRIG_FUNCS: dict = {}
_XTRIG_MOD_CACHE: dict = {}
_XTRIG_FUNC_CACHE: dict = {}


def _killpg(proc, signal):
Expand All @@ -66,68 +67,86 @@ def _killpg(proc, signal):
return True


def get_func(func_name, src_dir):
"""Find and return an xtrigger function from a module of the same name.
def get_xtrig_mod(mod_name, src_dir):
"""Find, cache, and return a named xtrigger module.

These locations are checked in this order:
- <src_dir>/lib/python/
- `$CYLC_PYTHONPATH`
- defined via a `cylc.xtriggers` entry point for an
installed Python package.
Locations checked in this order:
- <src_dir>/lib/python (prepend to sys.path)
- $CYLC_PYTHONPATH (already in sys.path)
- `cylc.xtriggers` entry point

Workflow source directory passed in as this is executed in an independent
process in the command pool and therefore doesn't know about the workflow.
(Check entry point last so users can override with local implementations).

Workflow source dir passed in - this executes in an independent subprocess.

Raises:
ImportError, if the module is not found

"""
if func_name in _XTRIG_FUNCS:
return _XTRIG_FUNCS[func_name]
if mod_name in _XTRIG_MOD_CACHE:
# Found and cached already.
return _XTRIG_MOD_CACHE[mod_name]

# First look in <src-dir>/lib/python.
sys.path.insert(0, os.path.join(src_dir, 'lib', 'python'))
mod_name = func_name
try:
mod_by_name = __import__(mod_name, fromlist=[mod_name])
_XTRIG_MOD_CACHE[mod_name] = __import__(mod_name, fromlist=[mod_name])
except ImportError:
# Look for xtriggers via entry_points for external sources.
# Do this after the lib/python and PYTHONPATH approaches to allow
# users to override entry_point definitions with local/custom
# implementations.
# Then entry point.
for entry_point in iter_entry_points('cylc.xtriggers'):
if func_name == entry_point.name:
_XTRIG_FUNCS[func_name] = entry_point.load()
return _XTRIG_FUNCS[func_name]

if mod_name == entry_point.name:
_XTRIG_MOD_CACHE[mod_name] = entry_point.load()
return _XTRIG_MOD_CACHE[mod_name]
# Still unable to find anything so abort
raise

try:
_XTRIG_FUNCS[func_name] = getattr(mod_by_name, func_name)
except AttributeError:
# Module func_name has no function func_name, nor an entry_point entry.
raise
return _XTRIG_FUNCS[func_name]
return _XTRIG_MOD_CACHE[mod_name]


def get_xtrig_func(mod_name, func_name, src_dir):
    """Look up (and memoise) an attribute of an xtrigger module.

    The result is stored in ``_XTRIG_FUNC_CACHE`` under the key
    ``(mod_name, func_name)``. On a cache miss the module is obtained from
    ``get_xtrig_mod(mod_name, src_dir)`` and ``func_name`` is read from it.

    Args:
        mod_name: name of the xtrigger module.
        func_name: name of the attribute to fetch from that module.
        src_dir: passed through to ``get_xtrig_mod``.

    Raises:
        ImportError, if the module is not found
        AttributeError, if the function is not found in the module

    """
    cache_key = (mod_name, func_name)
    try:
        return _XTRIG_FUNC_CACHE[cache_key]
    except KeyError:
        # Not cached yet: resolve below.
        pass

    found = getattr(get_xtrig_mod(mod_name, src_dir), func_name)
    _XTRIG_FUNC_CACHE[cache_key] = found
    return found


def run_function(func_name, json_args, json_kwargs, src_dir):
"""Run a Python function in the process pool.

func_name(*func_args, **func_kwargs)

The function is presumed to be in a module of the same name.

Redirect any function stdout to stderr (and workflow log in debug mode).
Return value printed to stdout as a JSON string - allows use of the
existing process pool machinery as-is. src_dir is for local modules.

"""
func_args = json.loads(json_args)
func_kwargs = json.loads(json_kwargs)

# Find and import the function.
func = get_func(func_name, src_dir)
func = get_xtrig_func(func_name, func_name, src_dir)

# Redirect stdout to stderr.
orig_stdout = sys.stdout
sys.stdout = sys.stderr
res = func(*func_args, **func_kwargs)

# Restore stdout.
sys.stdout = orig_stdout

# Write function return value as JSON to stdout.
sys.stdout.write(json.dumps(res))

Expand Down
27 changes: 21 additions & 6 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from contextlib import suppress
from enum import Enum
from inspect import signature
import json
import re
from copy import deepcopy
Expand All @@ -26,7 +27,7 @@
from cylc.flow.exceptions import XtriggerConfigError
import cylc.flow.flags
from cylc.flow.hostuserutil import get_user
from cylc.flow.subprocpool import get_func
from cylc.flow.subprocpool import get_xtrig_func
from cylc.flow.xtriggers.wall_clock import wall_clock

if TYPE_CHECKING:
Expand Down Expand Up @@ -240,12 +241,14 @@ def __init__(
self.do_housekeeping = False

@staticmethod
def validate_xtrigger(
def check_xtrigger(
label: str,
fctx: 'SubFuncContext',
fdir: str,
) -> None:
"""Validate an Xtrigger function.
"""Generic xtrigger validation: check existence and string templates.

Xtrigger modules may also supply a specific "validate" function.

Args:
label: xtrigger label
Expand All @@ -262,8 +265,9 @@ def validate_xtrigger(

"""
fname: str = fctx.func_name

try:
func = get_func(fname, fdir)
func = get_xtrig_func(fname, fname, fdir)
except ImportError:
raise XtriggerConfigError(
label,
Expand All @@ -276,12 +280,22 @@ def validate_xtrigger(
fname,
f"'{fname}' not found in xtrigger module '{fname}'",
)

if not callable(func):
raise XtriggerConfigError(
label,
fname,
f"'{fname}' not callable in xtrigger module '{fname}'",
)
if func is not wall_clock:
# Validate args and kwargs against the function signature
# (but not for wall_clock because it's a special case).
try:
signature(func).bind(*fctx.func_args, **fctx.func_kwargs)
except TypeError as exc:
raise XtriggerConfigError(
label, fname, f"{fctx.get_signature()}: {exc}"
)

# Check any string templates in the function arg values (note this
# won't catch bad task-specific values - which are added dynamically).
Expand Down Expand Up @@ -318,13 +332,14 @@ def validate_xtrigger(
def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None:
"""Add a new xtrigger function.

Check the xtrigger function exists here (e.g. during validation).
Call check_xtrigger before this, during validation.

Args:
label: xtrigger label
fctx: function context
fdir: function module directory

"""
self.validate_xtrigger(label, fctx, fdir)
self.functx_map[label] = fctx

def mutate_trig(self, label, kwargs):
Expand Down
43 changes: 34 additions & 9 deletions cylc/flow/xtriggers/echo.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,47 @@

"""A Cylc xtrigger function."""

from contextlib import suppress
from cylc.flow.exceptions import WorkflowConfigError

from typing import Tuple

def echo(*args, **kwargs):
"""Prints args to stdout and return success only if kwargs['succeed']
is True.

def echo(*args, **kwargs) -> Tuple:
"""Print arguments to stdout, return kwargs['succeed'] and kwargs.

This may be a useful aid to understanding how xtriggers work.

Args:
succeed: Set the success or failure of this xtrigger.
*args: Print to stdout.
**kwargs: Print to stdout, and return as output.

Examples:

>>> echo('Breakfast Time', succeed=True, egg='poached')
echo: ARGS: ('Breakfast Time',)
echo: KWARGS: {'succeed': True, 'egg': 'poached'}
(True, {'succeed': True, 'egg': 'poached'})

Returns
tuple: (True/False, kwargs)
(True/False, kwargs)

"""
print("echo: ARGS:", args)
print("echo: KWARGS:", kwargs)
result = False
with suppress(KeyError):
result = kwargs["succeed"] is True
return result, kwargs

return kwargs["succeed"], kwargs


def validate(f_args, f_kwargs, f_signature):
    """Check the echo xtrigger arguments parsed from the workflow config.

    Kept separate from the xtrigger function itself so that the arguments
    can be checked at parse time.

    Args:
        f_args: positional arguments (not inspected here).
        f_kwargs: keyword arguments; must contain a boolean "succeed".
        f_signature: signature string, quoted in the error message.

    Raises:
        WorkflowConfigError: if "succeed" is missing from f_kwargs or its
            value is not a bool.

    """
    succeed_is_bool = (
        "succeed" in f_kwargs
        and isinstance(f_kwargs["succeed"], bool)
    )
    if succeed_is_bool:
        return

    raise WorkflowConfigError(
        f"Requires 'succeed=True/False' arg: {f_signature}"
    )
Loading
Loading