Skip to content

Commit

Permalink
Reimplement xtrigger sequential arg post-merge & add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
MetRonnie authored and dwsutherland committed Mar 19, 2024
1 parent a3d4711 commit cc1ec4d
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 81 deletions.
4 changes: 2 additions & 2 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2151,7 +2151,7 @@ def resume_workflow(self, quiet: bool = False) -> None:
def command_force_trigger_tasks(
self,
tasks: Iterable[str],
flow: List[str],
flow: List[Union[str, int]],
flow_wait: bool = False,
flow_descr: Optional[str] = None
):
Expand All @@ -2162,7 +2162,7 @@ def command_force_trigger_tasks(
def command_set(
self,
tasks: List[str],
flow: List[str],
flow: List[Union[str, int]],
outputs: Optional[List[str]] = None,
prerequisites: Optional[List[str]] = None,
flow_wait: bool = False,
Expand Down
59 changes: 49 additions & 10 deletions cylc/flow/subprocctx.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,39 @@
Coerce more value type from string (to time point, duration, xtriggers, etc.).
"""

from inspect import Parameter
import json
from shlex import quote
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union

from cylc.flow.wallclock import get_current_time_string

if TYPE_CHECKING:
from inspect import Signature


def add_kwarg_to_sig(
sig: 'Signature', arg_name: str, default: Any
) -> 'Signature':
"""Return a new signature with a kwarg added."""
# Note: added kwarg has to be before **kwargs ("variadic") in the signature
positional_or_keyword: List[Parameter] = []
variadic: List[Parameter] = []
for param in sig.parameters.values():
if param.kind == Parameter.VAR_KEYWORD:
variadic.append(param)
else:
positional_or_keyword.append(param)
return sig.replace(parameters=[
*positional_or_keyword,
Parameter(
arg_name,
kind=Parameter.KEYWORD_ONLY,
default=default,
),
*variadic,
])


class SubProcContext: # noqa: SIM119 (not really relevant to this case)
"""Represent the context of an external command to run as a subprocess.
Expand Down Expand Up @@ -115,23 +143,31 @@ class SubFuncContext(SubProcContext):
Attributes:
# See also parent class attributes.
.label (str):
.label:
function label under [xtriggers] in flow.cylc
.func_name (str):
.func_name:
function name
.func_args (list):
.func_args:
function positional args
.func_kwargs (dict):
.func_kwargs:
function keyword args
.intvl (float - seconds):
function call interval (how often to check the external trigger)
.ret_val (bool, dict)
.intvl:
function call interval in secs (how often to check the
external trigger)
.ret_val
function return: (satisfied?, result to pass to trigger tasks)
"""

DEFAULT_INTVL = 10.0

def __init__(self, label, func_name, func_args, func_kwargs, intvl=None):
def __init__(
self,
label: str,
func_name: str,
func_args: List[Any],
func_kwargs: Dict[str, Any],
intvl: Union[float, str] = DEFAULT_INTVL
):
"""Initialize a function context."""
self.label = label
self.func_name = func_name
Expand All @@ -141,9 +177,12 @@ def __init__(self, label, func_name, func_args, func_kwargs, intvl=None):
self.intvl = float(intvl)
except (TypeError, ValueError):
self.intvl = self.DEFAULT_INTVL
self.ret_val = (False, None) # (satisfied, broadcast)
self.ret_val: Tuple[
bool, Optional[dict]
] = (False, None) # (satisfied, broadcast)
super(SubFuncContext, self).__init__(
'xtrigger-func', cmd=[], shell=False)
'xtrigger-func', cmd=[], shell=False
)

def update_command(self, workflow_run_dir):
"""Update the function wrap command after changes."""
Expand Down
8 changes: 4 additions & 4 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ def get_tasks_by_point(self) -> 'Dict[PointBase, List[TaskProxy]]':
point_itasks[point] = list(itask_id_map.values())
return point_itasks

def get_task(self, point, name) -> Optional[TaskProxy]:
def get_task(self, point: 'PointBase', name: str) -> Optional[TaskProxy]:
"""Retrieve a task from the pool."""
rel_id = f'{point}/{name}'
tasks = self.active_tasks.get(point)
Expand Down Expand Up @@ -1803,7 +1803,7 @@ def set_prereqs_and_outputs(
items: Iterable[str],
outputs: List[str],
prereqs: List[str],
flow: List[str],
flow: List[Union[str, int]],
flow_wait: bool = False,
flow_descr: Optional[str] = None
):
Expand Down Expand Up @@ -2001,7 +2001,7 @@ def remove_tasks(self, items):

def _get_flow_nums(
self,
flow: List[str],
flow: List[Union[str, int]],
meta: Optional[str] = None,
) -> Optional[Set[int]]:
"""Get correct flow numbers given user command options."""
Expand Down Expand Up @@ -2073,7 +2073,7 @@ def _force_trigger(self, itask):

def force_trigger_tasks(
self, items: Iterable[str],
flow: List[str],
flow: List[Union[str, int]],
flow_wait: bool = False,
flow_descr: Optional[str] = None
):
Expand Down
42 changes: 22 additions & 20 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from contextlib import suppress
from enum import Enum
from inspect import getfullargspec, signature
from inspect import signature
import json
import re
from copy import deepcopy
Expand All @@ -34,6 +34,7 @@
from cylc.flow.exceptions import XtriggerConfigError
import cylc.flow.flags
from cylc.flow.hostuserutil import get_user
from cylc.flow.subprocctx import add_kwarg_to_sig
from cylc.flow.subprocpool import get_xtrig_func
from cylc.flow.xtriggers.wall_clock import _wall_clock

Expand Down Expand Up @@ -330,32 +331,33 @@ def check_xtrigger(
raise XtriggerConfigError(
label, f"'{fname}' not callable in xtrigger module '{fname}'",
)
x_argspec = getfullargspec(func)
if 'sequential' in x_argspec.args:
if (
x_argspec.defaults is None
or not isinstance(
x_argspec.defaults[x_argspec.args.index('sequential')],
bool
)
):

sig = signature(func)
sig_str = fctx.get_signature()

# Handle reserved 'sequential' kwarg:
sequential_param = sig.parameters.get('sequential', None)
if sequential_param:
if not isinstance(sequential_param.default, bool):
raise XtriggerConfigError(
label,
fname,
(
f"xtrigger module '{fname}' contains reserved argument"
" name 'sequential' that has no boolean default"
),
f"xtrigger '{fname}' function definition contains "
"reserved argument 'sequential' that has no "
"boolean default"
)
)
elif 'sequential' not in fctx.func_kwargs:
fctx.func_kwargs['sequential'] = x_argspec.defaults[
x_argspec.args.index('sequential')
]
fctx.func_kwargs.setdefault('sequential', sequential_param.default)
elif 'sequential' in fctx.func_kwargs:
# xtrig call marked as sequential; add 'sequential' arg to
# signature for validation
sig = add_kwarg_to_sig(
sig, 'sequential', fctx.func_kwargs['sequential']
)

# Validate args and kwargs against the function signature
sig_str = fctx.get_signature()
try:
bound_args = signature(func).bind(
bound_args = sig.bind(
*fctx.func_args, **fctx.func_kwargs
)
except TypeError as exc:
Expand Down
15 changes: 0 additions & 15 deletions tests/integration/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,6 @@ def test_validate_implicit_task_name(
are blacklisted get caught and raise errors.
"""
id_ = flow({
'scheduler': {
'allow implicit tasks': 'True'
},
'scheduling': {
'graph': {
'R1': task_name
Expand Down Expand Up @@ -189,9 +186,6 @@ def test_no_graph(flow, validate):
def test_parse_special_tasks_invalid(flow, validate, section):
"""It should fail for invalid "special tasks"."""
id_ = flow({
'scheduler': {
'allow implicit tasks': 'True',
},
'scheduling': {
'initial cycle point': 'now',
'special tasks': {
Expand All @@ -211,9 +205,6 @@ def test_parse_special_tasks_invalid(flow, validate, section):
def test_parse_special_tasks_interval(flow, validate):
"""It should fail for invalid durations in clock-triggers."""
id_ = flow({
'scheduler': {
'allow implicit tasks': 'True',
},
'scheduling': {
'initial cycle point': 'now',
'special tasks': {
Expand Down Expand Up @@ -359,7 +350,6 @@ def test_xtrig_validation_wall_clock(
https://github.com/cylc/cylc-flow/issues/5448
"""
id_ = flow({
'scheduler': {'allow implicit tasks': True},
'scheduling': {
'initial cycle point': '1012',
'xtriggers': {'myxt': 'wall_clock(offset=PT7MH)'},
Expand All @@ -378,7 +368,6 @@ def test_xtrig_implicit_wall_clock(flow: Fixture, validate: Fixture):
xtrigger definition.
"""
wid = flow({
'scheduler': {'allow implicit tasks': True},
'scheduling': {
'initial cycle point': '2024',
'graph': {'R1': '@wall_clock => foo'},
Expand All @@ -396,7 +385,6 @@ def test_xtrig_validation_echo(
https://github.com/cylc/cylc-flow/issues/5448
"""
id_ = flow({
'scheduler': {'allow implicit tasks': True},
'scheduling': {
'xtriggers': {'myxt': 'echo()'},
'graph': {'R1': '@myxt => foo'},
Expand All @@ -418,7 +406,6 @@ def test_xtrig_validation_xrandom(
https://github.com/cylc/cylc-flow/issues/5448
"""
id_ = flow({
'scheduler': {'allow implicit tasks': True},
'scheduling': {
'xtriggers': {'myxt': 'xrandom(200)'},
'graph': {'R1': '@myxt => foo'},
Expand Down Expand Up @@ -459,7 +446,6 @@ def kustom_validate(args):
)

id_ = flow({
'scheduler': {'allow implicit tasks': True},
'scheduling': {
'initial cycle point': '1012',
'xtriggers': {'myxt': 'kustom_xt(feature=42)'},
Expand Down Expand Up @@ -490,7 +476,6 @@ def test_xtrig_signature_validation(
):
"""Test automatic xtrigger function signature validation."""
id_ = flow({
'scheduler': {'allow implicit tasks': True},
'scheduling': {
'initial cycle point': '2024',
'xtriggers': {'myxt': xtrig_call},
Expand Down
Loading

0 comments on commit cc1ec4d

Please sign in to comment.