Skip to content

Commit

Permalink
Xtrigger function arg validation.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Apr 5, 2023
1 parent edc60e9 commit 8a28af6
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 36 deletions.
37 changes: 21 additions & 16 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from cylc.flow.id import Tokens
from cylc.flow.cycling.integer import IntegerInterval
from cylc.flow.cycling.iso8601 import ingest_time, ISO8601Interval

from cylc.flow.exceptions import (
CylcError,
WorkflowConfigError,
Expand Down Expand Up @@ -84,6 +85,7 @@
from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM
from cylc.flow.print_tree import print_tree
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.subprocpool import get_func
from cylc.flow.task_events_mgr import (
EventData,
get_event_handler_data
Expand Down Expand Up @@ -1749,28 +1751,31 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
if label != 'wall_clock':
raise WorkflowConfigError(f"xtrigger not defined: {label}")
else:
# Allow "@wall_clock" in the graph as an undeclared
# zero-offset clock xtrigger.
xtrig = SubFuncContext(
'wall_clock', 'wall_clock', [], {})
# Allow "@wall_clock" in graph as implicit zero-offset.
xtrig = SubFuncContext('wall_clock', 'wall_clock', [], {})

if xtrig.func_name == 'wall_clock':
if self.cycling_type == INTEGER_CYCLING_TYPE:
raise WorkflowConfigError(
"Clock xtriggers require datetime cycling:"
f" {label} = {xtrig.get_signature()}"
)
else:
# Convert offset arg to kwarg for certainty later.
if "offset" not in xtrig.func_kwargs:
xtrig.func_kwargs["offset"] = None
with suppress(IndexError):
xtrig.func_kwargs["offset"] = xtrig.func_args[0]
if (
xtrig.func_name == 'wall_clock' and
self.cycling_type == INTEGER_CYCLING_TYPE
):
raise WorkflowConfigError(
"Clock xtriggers require datetime cycling:"
f" {label} = {xtrig.get_signature()}"
)

# Call the xtrigger's validate_config function if it has one.
with suppress(AttributeError):
get_func(xtrig.func_name, "validate_config", self.fdir)(
xtrig.func_args,
xtrig.func_kwargs,
xtrig.get_signature()
)

if self.xtrigger_mgr is None:
XtriggerManager.validate_xtrigger(label, xtrig, self.fdir)
else:
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)

self.taskdefs[right].add_xtrig_label(label, seq)

def get_actual_first_point(self, start_point):
Expand Down
33 changes: 20 additions & 13 deletions cylc/flow/subprocpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,33 +64,38 @@ def _killpg(proc, signal):
return True


def get_func(func_name, src_dir):
"""Find and return an xtrigger function from a module of the same name.
def get_func(mod_name, func_name, src_dir):
"""Find and return a name function from a named module.
Can be in <src_dir>/lib/python, cylc.flow.xtriggers, or in Python path.
Can be in <src_dir>/lib/python, CYLC_MOD_LOC, or in Python path.
Workflow source directory passed in as this is executed in an independent
process in the command pool and therefore doesn't know about the workflow.
Raises:
ImportError, if the module is not found
AtributeError, if the function is not found in the module
"""
if func_name in _XTRIG_FUNCS:
# Found and cached already.
return _XTRIG_FUNCS[func_name]
# First look in <src-dir>/lib/python.

# 1. look in <src-dir>/lib/python.
sys.path.insert(0, os.path.join(src_dir, 'lib', 'python'))
mod_name = func_name
try:
mod_by_name = __import__(mod_name, fromlist=[mod_name])
except ImportError:
# Then look in built-in xtriggers.
mod_name = "%s.%s" % ("cylc.flow.xtriggers", func_name)
# 2. look in built-in xtriggers.
mod_name = "%s.%s" % ("cylc.flow.xtriggers", mod_name)
try:
mod_by_name = __import__(mod_name, fromlist=[mod_name])
except ImportError:
raise
try:
_XTRIG_FUNCS[func_name] = getattr(mod_by_name, func_name)
except AttributeError:
# Module func_name has no function func_name.
raise

# Module found and imported, return the named function.

_XTRIG_FUNCS[func_name] = getattr(mod_by_name, func_name)
return _XTRIG_FUNCS[func_name]


Expand All @@ -99,6 +104,8 @@ def run_function(func_name, json_args, json_kwargs, src_dir):
func_name(*func_args, **func_kwargs)
The function is presumed to be in a module of the same name.
Redirect any function stdout to stderr (and workflow log in debug mode).
Return value printed to stdout as a JSON string - allows use of the
existing process pool machinery as-is. src_dir is for local modules.
Expand All @@ -107,7 +114,7 @@ def run_function(func_name, json_args, json_kwargs, src_dir):
func_args = json.loads(json_args)
func_kwargs = json.loads(json_kwargs)
# Find and import then function.
func = get_func(func_name, src_dir)
func = get_func(func_name, func_name, src_dir)
# Redirect stdout to stderr.
orig_stdout = sys.stdout
sys.stdout = sys.stderr
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import cylc.flow.flags
from cylc.flow.hostuserutil import get_user
from cylc.flow.xtriggers.wall_clock import wall_clock

from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.broadcast_mgr import BroadcastMgr
from cylc.flow.data_store_mgr import DataStoreMgr
Expand Down Expand Up @@ -252,8 +251,9 @@ def validate_xtrigger(label: str, fctx: SubFuncContext, fdir: str) -> None:
"""
fname: str = fctx.func_name

try:
func = get_func(fname, fdir)
func = get_func(fname, fname, fdir)
except ImportError:
raise XtriggerConfigError(
label,
Expand Down
32 changes: 32 additions & 0 deletions cylc/flow/xtriggers/wall_clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
"""xtrigger function to trigger off of a wall clock time."""

from time import time
from cylc.flow.cycling.iso8601 import interval_parse
from cylc.flow.exceptions import WorkflowConfigError


def wall_clock(trigger_time=None):
Expand All @@ -27,3 +29,33 @@ def wall_clock(trigger_time=None):
Trigger time as seconds since Unix epoch.
"""
return time() > trigger_time


def validate_config(f_args, f_kwargs, f_signature):
"""Validate and manipulate args parsed from the workflow config.
wall_clock() # zero offset
wall_clock(PT1H)
wall_clock(offset=PT1H)
And offset must be a valid ISO 8601 interval.
If f_args used, convert to f_kwargs for clarity.
"""

n_args = len(f_args)
n_kwargs = len(f_kwargs)

if n_args + n_kwargs > 1:
raise WorkflowConfigError(f"xtrigger: too many args: {f_signature}")

Check warning on line 51 in cylc/flow/xtriggers/wall_clock.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/wall_clock.py#L51

Added line #L51 was not covered by tests

if n_args:
f_kwargs["offset"] = f_args[0]
elif not n_kwargs:
f_kwargs["offset"] = "P0Y"

try:
interval_parse(f_kwargs["offset"])
except ValueError:
raise WorkflowConfigError(f"xtrigger: invalid offset: {f_signature}")

Check warning on line 61 in cylc/flow/xtriggers/wall_clock.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/wall_clock.py#L60-L61

Added lines #L60 - L61 were not covered by tests
13 changes: 8 additions & 5 deletions tests/unit/test_subprocpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ def test_xfunction(self):
with the_answer_file.open(mode="w") as f:
f.write("""the_answer = lambda: 42""")
f.flush()
fn = get_func("the_answer", temp_dir)
f_name = "the_answer"
fn = get_func(f_name, f_name, temp_dir)
result = fn()
self.assertEqual(42, result)

Expand All @@ -165,14 +166,15 @@ def test_xfunction_cache(self):
with amandita_file.open(mode="w") as f:
f.write("""amandita = lambda: 'chocolate'""")
f.flush()
fn = get_func("amandita", temp_dir)
f_name = "amandita"
fn = get_func(f_name, f_name, temp_dir)
result = fn()
self.assertEqual('chocolate', result)

# is in the cache
self.assertTrue('amandita' in _XTRIG_FUNCS)
# returned from cache
self.assertEqual(fn, get_func("amandita", temp_dir))
self.assertEqual(fn, get_func(f_name, f_name, temp_dir))
del _XTRIG_FUNCS['amandita']
# is not in the cache
self.assertFalse('amandita' in _XTRIG_FUNCS)
Expand All @@ -186,7 +188,7 @@ def test_xfunction_import_error(self):
"""
with TemporaryDirectory() as temp_dir:
with self.assertRaises(ModuleNotFoundError):
get_func("invalid-module-name", temp_dir)
get_func("invalid-module-name", "func-name", temp_dir)

def test_xfunction_attribute_error(self):
"""Test for error on looking for an attribute in a xtrigger script."""
Expand All @@ -197,8 +199,9 @@ def test_xfunction_attribute_error(self):
with the_answer_file.open(mode="w") as f:
f.write("""the_droid = lambda: 'excalibur'""")
f.flush()
f_name = "the_sword"
with self.assertRaises(AttributeError):
get_func("the_sword", temp_dir)
get_func(f_name, f_name, temp_dir)


@pytest.fixture
Expand Down

0 comments on commit 8a28af6

Please sign in to comment.