Skip to content

Commit

Permalink
Improve xtrigger validation
Browse files Browse the repository at this point in the history
  • Loading branch information
MetRonnie committed Feb 16, 2024
1 parent 68dbfd2 commit 51438cf
Show file tree
Hide file tree
Showing 16 changed files with 205 additions and 229 deletions.
13 changes: 2 additions & 11 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
from cylc.flow.print_tree import print_tree
from cylc.flow.simulation import configure_sim_modes
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.subprocpool import get_xtrig_func
from cylc.flow.task_events_mgr import (
EventData,
get_event_handler_data
Expand Down Expand Up @@ -1735,14 +1734,6 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
# Generic xtrigger validation.
XtriggerManager.check_xtrigger(label, xtrig, self.fdir)

# Specific xtrigger.validate(), if available.
with suppress(AttributeError, ImportError):
get_xtrig_func(xtrig.func_name, "validate", self.fdir)(
xtrig.func_args,
xtrig.func_kwargs,
xtrig.get_signature()
)

if self.xtrigger_mgr:
# (not available during validation)
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)
Expand Down Expand Up @@ -2434,8 +2425,8 @@ def upgrade_clock_triggers(self):
# Derive an xtrigger label.
label = '_'.join(('_cylc', 'wall_clock', task_name))
# Define the xtrigger function.
xtrig = SubFuncContext(label, 'wall_clock', [], {})
xtrig.func_kwargs["offset"] = offset
args = [] if offset is None else [offset]
xtrig = SubFuncContext(label, 'wall_clock', args, {})
if self.xtrigger_mgr is None:
XtriggerManager.check_xtrigger(label, xtrig, self.fdir)
else:
Expand Down
7 changes: 3 additions & 4 deletions cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,12 @@ class XtriggerConfigError(WorkflowConfigError):
"""

def __init__(self, label: str, trigger: str, message: str):
def __init__(self, label: str, message: str):
self.label: str = label
self.trigger: str = trigger
self.message: str = message

def __str__(self):
return f'[{self.label}] {self.message}'
def __str__(self) -> str:
return f'[@{self.label}] {self.message}'


class ClientError(CylcError):
Expand Down
91 changes: 60 additions & 31 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from contextlib import suppress
from enum import Enum
from inspect import signature
from inspect import Parameter, Signature, signature
import json
import re
from copy import deepcopy
Expand All @@ -31,6 +31,7 @@
from cylc.flow.xtriggers.wall_clock import wall_clock

if TYPE_CHECKING:
from inspect import BoundArguments
from cylc.flow.broadcast_mgr import BroadcastMgr
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.subprocctx import SubFuncContext
Expand Down Expand Up @@ -246,9 +247,11 @@ def check_xtrigger(
fctx: 'SubFuncContext',
fdir: str,
) -> None:
"""Generic xtrigger validation: check existence and string templates.
"""Generic xtrigger validation: check existence, string templates and
function signature.
Xtrigger modules may also supply a specific "validate" function.
Xtrigger modules may also supply a specific `validate` function
which will be run here.
Args:
label: xtrigger label
Expand All @@ -262,6 +265,7 @@ def check_xtrigger(
* If the function is not callable.
* If any string template in the function context
arguments are not present in the expected template values.
* If the arguments do not match the function signature.
"""
fname: str = fctx.func_name
Expand All @@ -270,32 +274,37 @@ def check_xtrigger(
func = get_xtrig_func(fname, fname, fdir)
except ImportError:
raise XtriggerConfigError(
label,
fname,
f"xtrigger module '{fname}' not found",
label, f"xtrigger module '{fname}' not found",
)
except AttributeError:
raise XtriggerConfigError(
label,
fname,
f"'{fname}' not found in xtrigger module '{fname}'",
label, f"'{fname}' not found in xtrigger module '{fname}'",
)

if not callable(func):
raise XtriggerConfigError(
label,
fname,
f"'{fname}' not callable in xtrigger module '{fname}'",
label, f"'{fname}' not callable in xtrigger module '{fname}'",
)
if func is not wall_clock:
# Validate args and kwargs against the function signature
# (but not for wall_clock because it's a special case).
try:
signature(func).bind(*fctx.func_args, **fctx.func_kwargs)
except TypeError as exc:
raise XtriggerConfigError(
label, fname, f"{fctx.get_signature()}: {exc}"

# Validate args and kwargs against the function signature
sig = signature(func)
sig_str = fctx.get_signature()
if func is wall_clock:
# wall_clock is a special case where the Python function signature
# is different to the xtrigger signature
sig = Signature([
Parameter(
'offset', Parameter.POSITIONAL_OR_KEYWORD, default='P0Y'
)
])
try:
bound_args = sig.bind(*fctx.func_args, **fctx.func_kwargs)
except TypeError as exc:
raise XtriggerConfigError(label, f"{sig_str}: {exc}")
# Specific xtrigger.validate(), if available.
XtriggerManager.try_xtrig_validate_func(
label, fname, fdir, bound_args, sig_str
)

# Check any string templates in the function arg values (note this
# won't catch bad task-specific values - which are added dynamically).
Expand All @@ -311,9 +320,7 @@ def check_xtrigger(
template_vars.add(TemplateVariables(match))
except ValueError:
raise XtriggerConfigError(
label,
fname,
f"Illegal template in xtrigger: {match}",
label, f"Illegal template in xtrigger: {match}",
)

# check for deprecated template variables
Expand All @@ -329,6 +336,30 @@ def check_xtrigger(
f' {", ".join(t.value for t in deprecated_variables)}'
)

@staticmethod
def try_xtrig_validate_func(
label: str,
fname: str,
fdir: str,
bound_args: 'BoundArguments',
signature_str: str,
):
"""Call an xtrigger's `validate()` function if it is implemented.
Raise XtriggerConfigError if validation fails.
"""
try:
xtrig_validate_func = get_xtrig_func(fname, 'validate', fdir)
except (AttributeError, ImportError):
return
bound_args.apply_defaults()
try:
xtrig_validate_func(bound_args.arguments)
except Exception as exc: # Note: catch all errors
raise XtriggerConfigError(
label, f"{signature_str} validation failed: {exc}"
)

def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None:
"""Add a new xtrigger function.
Expand Down Expand Up @@ -410,20 +441,18 @@ def get_xtrig_ctx(
args = []
kwargs = {}
if ctx.func_name == "wall_clock":
if "trigger_time" in ctx.func_kwargs:
if "trigger_time" in ctx.func_kwargs: # noqa: SIM401 (readabilty)
# Internal (retry timer): trigger_time already set.
kwargs["trigger_time"] = ctx.func_kwargs["trigger_time"]
elif "offset" in ctx.func_kwargs: # noqa: SIM106
else:
# External (clock xtrigger): convert offset to trigger_time.
# Datetime cycling only.
kwargs["trigger_time"] = itask.get_clock_trigger_time(
itask.point,
ctx.func_kwargs["offset"]
)
else:
# Should not happen!
raise ValueError(
"wall_clock function kwargs needs trigger time or offset"
ctx.func_kwargs.get(
"offset",
ctx.func_args[0] if ctx.func_args else None
)
)
else:
# Other xtrig functions: substitute template values.
Expand Down
18 changes: 10 additions & 8 deletions cylc/flow/xtriggers/echo.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@

"""A Cylc xtrigger function."""

from typing import Any, Dict, Tuple
from cylc.flow.exceptions import WorkflowConfigError

from typing import Tuple


def echo(*args, **kwargs) -> Tuple:
"""Print arguments to stdout, return kwargs['succeed'] and kwargs.
Expand Down Expand Up @@ -48,15 +47,18 @@ def echo(*args, **kwargs) -> Tuple:
return kwargs["succeed"], kwargs


def validate(f_args, f_kwargs, f_signature):

def validate(all_args: Dict[str, Any]):
"""
Validate the xtrigger function arguments parsed from the workflow config.
This is separate from the xtrigger to allow parse-time validation.
"""
if "succeed" not in f_kwargs or not isinstance(f_kwargs["succeed"], bool):
raise WorkflowConfigError(
f"Requires 'succeed=True/False' arg: {f_signature}"
)
# NOTE: with (*args, **kwargs) pattern, all_args looks like:
# {
# 'args': (arg1, arg2, ...),
# 'kwargs': {kwarg1: val, kwarg2: val, ...}
# }
succeed = all_args['kwargs'].get("succeed")
if not isinstance(succeed, bool):
raise WorkflowConfigError("Requires 'succeed=True/False' arg")
39 changes: 8 additions & 31 deletions cylc/flow/xtriggers/wall_clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,56 +17,33 @@
"""xtrigger function to trigger off of a wall clock time."""

from time import time
from typing import Any, Dict
from cylc.flow.cycling.iso8601 import interval_parse
from cylc.flow.exceptions import WorkflowConfigError


def wall_clock(trigger_time=None):
def wall_clock(trigger_time: int) -> bool:
"""Return True after the desired wall clock time, False.
Args:
trigger_time (int):
trigger_time:
Trigger time as seconds since Unix epoch.
"""
return time() > trigger_time


def validate(f_args, f_kwargs, f_signature):
def validate(args: Dict[str, Any]):
"""Validate and manipulate args parsed from the workflow config.
NOTE: the xtrigger signature is different to the function signature above
wall_clock() # infer zero interval
wall_clock(PT1H)
wall_clock(offset=PT1H)
The offset must be a valid ISO 8601 interval.
If f_args used, convert to f_kwargs for clarity.
"""

n_args = len(f_args)
n_kwargs = len(f_kwargs)

if n_args + n_kwargs > 1:
raise WorkflowConfigError(f"Too many args: {f_signature}")

if n_kwargs:
# sole kwarg must be "offset"
kw = next(iter(f_kwargs))
if kw != "offset":
raise WorkflowConfigError(f"Illegal arg '{kw}': {f_signature}")

elif n_args:
# convert to kwarg
f_kwargs["offset"] = f_args[0]
del f_args[0]

else:
# no args, infer zero interval
f_kwargs["offset"] = "P0Y"

# must be a valid interval
try:
interval_parse(f_kwargs["offset"])
interval_parse(args["offset"])
except (ValueError, AttributeError):
raise WorkflowConfigError(f"Invalid offset: {f_signature}")
raise WorkflowConfigError(f"Invalid offset: {args['offset']}")
Loading

0 comments on commit 51438cf

Please sign in to comment.