Skip to content

Commit

Permalink
Xtrigger function arg validation.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Jan 28, 2024
1 parent efb3016 commit d5bc789
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 23 deletions.
18 changes: 14 additions & 4 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from cylc.flow.id import Tokens
from cylc.flow.cycling.integer import IntegerInterval
from cylc.flow.cycling.iso8601 import ingest_time, ISO8601Interval

from cylc.flow.exceptions import (
CylcError,
InputError,
Expand All @@ -82,6 +83,7 @@
from cylc.flow.print_tree import print_tree
from cylc.flow.simulation import configure_sim_modes
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.subprocpool import get_func
from cylc.flow.task_events_mgr import (
EventData,
get_event_handler_data
Expand Down Expand Up @@ -1711,10 +1713,8 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
if label != 'wall_clock':
raise WorkflowConfigError(f"xtrigger not defined: {label}")
else:
# Allow "@wall_clock" in the graph as an undeclared
# zero-offset clock xtrigger.
xtrig = SubFuncContext(
'wall_clock', 'wall_clock', [], {})
# Allow "@wall_clock" in graph as implicit zero-offset.
xtrig = SubFuncContext('wall_clock', 'wall_clock', [], {})

if xtrig.func_name == 'wall_clock':
if self.cycling_type == INTEGER_CYCLING_TYPE:
Expand All @@ -1729,10 +1729,20 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
with suppress(IndexError):
xtrig.func_kwargs["offset"] = xtrig.func_args[0]

# Call the xtrigger's validate_config function if it has one.
with suppress(AttributeError, ImportError):
get_func(xtrig.func_name, "validate_config", self.fdir)(
xtrig.func_args,
xtrig.func_kwargs,
xtrig.get_signature()
)

if self.xtrigger_mgr is None:
# Validation only.
XtriggerManager.validate_xtrigger(label, xtrig, self.fdir)
else:
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)

self.taskdefs[right].add_xtrig_label(label, seq)

def get_actual_first_point(self, start_point):
Expand Down
24 changes: 16 additions & 8 deletions cylc/flow/subprocpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ def _killpg(proc, signal):
return True


def get_func(func_name, src_dir):
"""Find and return an xtrigger function from a module of the same name.
def get_func(mod_name, func_name, src_dir):
"""Find and return a named function from a named module.
Can be in <src_dir>/lib/python, cylc.flow.xtriggers, or in Python path.
These locations are checked in this order:
- <src_dir>/lib/python/
Expand All @@ -78,13 +80,17 @@ def get_func(func_name, src_dir):
Workflow source directory passed in as this is executed in an independent
process in the command pool and therefore doesn't know about the workflow.
Raises:
ImportError, if the module is not found
AttributeError, if the function is not found in the module
"""
if func_name in _XTRIG_FUNCS:
return _XTRIG_FUNCS[func_name]
if (mod_name, func_name) in _XTRIG_FUNCS:
# Found and cached already.
return _XTRIG_FUNCS[(mod_name, func_name)]

# First look in <src-dir>/lib/python.
sys.path.insert(0, os.path.join(src_dir, 'lib', 'python'))
mod_name = func_name
try:
mod_by_name = __import__(mod_name, fromlist=[mod_name])
except ImportError:
Expand All @@ -101,18 +107,20 @@ def get_func(func_name, src_dir):
raise

try:
_XTRIG_FUNCS[func_name] = getattr(mod_by_name, func_name)
_XTRIG_FUNCS[(mod_name, func_name)] = getattr(mod_by_name, func_name)
except AttributeError:
# Module func_name has no function func_name, nor an entry_point entry.
raise
return _XTRIG_FUNCS[func_name]
return _XTRIG_FUNCS[(mod_name, func_name)]


def run_function(func_name, json_args, json_kwargs, src_dir):
"""Run a Python function in the process pool.
func_name(*func_args, **func_kwargs)
The function is presumed to be in a module of the same name.
Redirect any function stdout to stderr (and workflow log in debug mode).
Return value printed to stdout as a JSON string - allows use of the
existing process pool machinery as-is. src_dir is for local modules.
Expand All @@ -121,7 +129,7 @@ def run_function(func_name, json_args, json_kwargs, src_dir):
func_args = json.loads(json_args)
func_kwargs = json.loads(json_kwargs)
# Find and import then function.
func = get_func(func_name, src_dir)
func = get_func(func_name, func_name, src_dir)
# Redirect stdout to stderr.
orig_stdout = sys.stdout
sys.stdout = sys.stderr
Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,9 @@ def validate_xtrigger(
"""
fname: str = fctx.func_name

try:
func = get_func(fname, fdir)
func = get_func(fname, fname, fdir)
except ImportError:
raise XtriggerConfigError(
label,
Expand Down
31 changes: 31 additions & 0 deletions cylc/flow/xtriggers/wall_clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
"""xtrigger function to trigger off of a wall clock time."""

from time import time
from cylc.flow.cycling.iso8601 import interval_parse
from cylc.flow.exceptions import WorkflowConfigError


def wall_clock(trigger_time=None):
Expand All @@ -27,3 +29,32 @@ def wall_clock(trigger_time=None):
Trigger time as seconds since Unix epoch.
"""
return time() > trigger_time


def validate_config(f_args, f_kwargs, f_signature):
"""Validate and manipulate args parsed from the workflow config.
wall_clock() # zero offset
wall_clock(PT1H)
wall_clock(offset=PT1H)
The offset must be a valid ISO 8601 interval.
If f_args used, convert to f_kwargs for clarity.
"""
n_args = len(f_args)
n_kwargs = len(f_kwargs)

if n_args + n_kwargs > 1:
raise WorkflowConfigError(f"xtrigger: too many args: {f_signature}")

if n_args:
f_kwargs["offset"] = f_args[0]
elif not n_kwargs:
f_kwargs["offset"] = "P0Y"

try:
interval_parse(f_kwargs["offset"])
except ValueError:
raise WorkflowConfigError(f"xtrigger: invalid offset: {f_signature}")
21 changes: 11 additions & 10 deletions tests/unit/test_subprocpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ def test_xfunction(self):
with the_answer_file.open(mode="w") as f:
f.write("""the_answer = lambda: 42""")
f.flush()
fn = get_func("the_answer", temp_dir)
f_name = "the_answer"
fn = get_func(f_name, f_name, temp_dir)
result = fn()
self.assertEqual(42, result)

Expand All @@ -166,19 +167,18 @@ def test_xfunction_cache(self):
python_dir.mkdir(parents=True)
amandita_file = python_dir / "amandita.py"
with amandita_file.open(mode="w") as f:
f.write("""amandita = lambda: 'chocolate'""")
f.write("""choco = lambda: 'chocolate'""")
f.flush()
fn = get_func("amandita", temp_dir)
m_name = "amandita" # module
f_name = "choco" # function
fn = get_func(m_name, f_name, temp_dir)
result = fn()
self.assertEqual('chocolate', result)

# is in the cache
self.assertTrue('amandita' in _XTRIG_FUNCS)
self.assertTrue((m_name, f_name) in _XTRIG_FUNCS)
# returned from cache
self.assertEqual(fn, get_func("amandita", temp_dir))
del _XTRIG_FUNCS['amandita']
# is not in the cache
self.assertFalse('amandita' in _XTRIG_FUNCS)
self.assertEqual(fn, get_func(m_name, f_name, temp_dir))

def test_xfunction_import_error(self):
"""Test for error on importing a xtrigger function.
Expand All @@ -189,7 +189,7 @@ def test_xfunction_import_error(self):
"""
with TemporaryDirectory() as temp_dir:
with self.assertRaises(ModuleNotFoundError):
get_func("invalid-module-name", temp_dir)
get_func("invalid-module-name", "func-name", temp_dir)

def test_xfunction_attribute_error(self):
"""Test for error on looking for an attribute in a xtrigger script."""
Expand All @@ -200,8 +200,9 @@ def test_xfunction_attribute_error(self):
with the_answer_file.open(mode="w") as f:
f.write("""the_droid = lambda: 'excalibur'""")
f.flush()
f_name = "the_sword"
with self.assertRaises(AttributeError):
get_func("the_sword", temp_dir)
get_func(f_name, f_name, temp_dir)


@pytest.fixture
Expand Down

0 comments on commit d5bc789

Please sign in to comment.