Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Xtrig arg validate #5955

Merged
merged 16 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/5955.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support xtrigger argument validation.
42 changes: 20 additions & 22 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from cylc.flow.id import Tokens
from cylc.flow.cycling.integer import IntegerInterval
from cylc.flow.cycling.iso8601 import ingest_time, ISO8601Interval

from cylc.flow.exceptions import (
CylcError,
InputError,
Expand Down Expand Up @@ -1718,28 +1719,25 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
if label != 'wall_clock':
raise WorkflowConfigError(f"xtrigger not defined: {label}")
else:
# Allow "@wall_clock" in the graph as an undeclared
# zero-offset clock xtrigger.
xtrig = SubFuncContext(
'wall_clock', 'wall_clock', [], {})
# Allow "@wall_clock" in graph as implicit zero-offset.
xtrig = SubFuncContext('wall_clock', 'wall_clock', [], {})

if xtrig.func_name == 'wall_clock':
if self.cycling_type == INTEGER_CYCLING_TYPE:
raise WorkflowConfigError(
"Clock xtriggers require datetime cycling:"
f" {label} = {xtrig.get_signature()}"
)
else:
# Convert offset arg to kwarg for certainty later.
if "offset" not in xtrig.func_kwargs:
xtrig.func_kwargs["offset"] = None
with suppress(IndexError):
xtrig.func_kwargs["offset"] = xtrig.func_args[0]
if (
xtrig.func_name == 'wall_clock'
and self.cycling_type == INTEGER_CYCLING_TYPE
):
raise WorkflowConfigError(
"Clock xtriggers require datetime cycling:"
f" {label} = {xtrig.get_signature()}"
)

if self.xtrigger_mgr is None:
XtriggerManager.validate_xtrigger(label, xtrig, self.fdir)
else:
# Generic xtrigger validation.
XtriggerManager.check_xtrigger(label, xtrig, self.fdir)

if self.xtrigger_mgr:
# (not available during validation)
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)

self.taskdefs[right].add_xtrig_label(label, seq)

def get_actual_first_point(self, start_point):
Expand Down Expand Up @@ -2427,10 +2425,10 @@ def upgrade_clock_triggers(self):
# Derive an xtrigger label.
label = '_'.join(('_cylc', 'wall_clock', task_name))
# Define the xtrigger function.
xtrig = SubFuncContext(label, 'wall_clock', [], {})
xtrig.func_kwargs["offset"] = offset
args = [] if offset is None else [offset]
xtrig = SubFuncContext(label, 'wall_clock', args, {})
if self.xtrigger_mgr is None:
XtriggerManager.validate_xtrigger(label, xtrig, self.fdir)
XtriggerManager.check_xtrigger(label, xtrig, self.fdir)
else:
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)
# Add it to the task, for each sequence that the task appears in.
Expand Down
7 changes: 3 additions & 4 deletions cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,12 @@ class XtriggerConfigError(WorkflowConfigError):

"""

def __init__(self, label: str, trigger: str, message: str):
def __init__(self, label: str, message: str):
self.label: str = label
self.trigger: str = trigger
self.message: str = message

def __str__(self):
return f'[{self.label}] {self.message}'
def __str__(self) -> str:
return f'[@{self.label}] {self.message}'


class ClientError(CylcError):
Expand Down
11 changes: 7 additions & 4 deletions cylc/flow/scripts/function_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

(This command is for internal use.)

Run a Python function "<name>(*args, **kwargs)" in the process pool. It must be
defined in a module of the same name. Positional and keyword arguments must be
passed in as JSON strings. <src-dir> is the workflow source dir, needed to find
local xtrigger modules.
Run a Python xtrigger function "<name>(*args, **kwargs)" in the process pool.
It must be in a module of the same name. Positional and keyword arguments must
be passed in as JSON strings.

Python entry points are the preferred way to make xtriggers available to the
scheduler, but local xtriggers can be stored in <src-dir>.

"""
import sys

Expand Down
77 changes: 48 additions & 29 deletions cylc/flow/subprocpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.wallclock import get_current_time_string

_XTRIG_FUNCS: dict = {}
_XTRIG_MOD_CACHE: dict = {}
_XTRIG_FUNC_CACHE: dict = {}


def _killpg(proc, signal):
Expand All @@ -66,68 +67,86 @@ def _killpg(proc, signal):
return True


def get_func(func_name, src_dir):
"""Find and return an xtrigger function from a module of the same name.
def get_xtrig_mod(mod_name, src_dir):
"""Find, cache, and return a named xtrigger module.

These locations are checked in this order:
- <src_dir>/lib/python/
- `$CYLC_PYTHONPATH`
- defined via a `cylc.xtriggers` entry point for an
installed Python package.
Locations checked in this order:
- <src_dir>/lib/python (prepend to sys.path)
- $CYLC_PYTHONPATH (already in sys.path)
- `cylc.xtriggers` entry point

Workflow source directory passed in as this is executed in an independent
process in the command pool and therefore doesn't know about the workflow.
(Check entry point last so users can override with local implementations).

Workflow source dir passed in - this executes in an independent subprocess.

Raises:
ImportError, if the module is not found

"""
if func_name in _XTRIG_FUNCS:
return _XTRIG_FUNCS[func_name]
if mod_name in _XTRIG_MOD_CACHE:
# Found and cached already.
return _XTRIG_MOD_CACHE[mod_name]

# First look in <src-dir>/lib/python.
sys.path.insert(0, os.path.join(src_dir, 'lib', 'python'))
mod_name = func_name
try:
mod_by_name = __import__(mod_name, fromlist=[mod_name])
_XTRIG_MOD_CACHE[mod_name] = __import__(mod_name, fromlist=[mod_name])
except ImportError:
# Look for xtriggers via entry_points for external sources.
# Do this after the lib/python and PYTHONPATH approaches to allow
# users to override entry_point definitions with local/custom
# implementations.
# Then entry point.
for entry_point in iter_entry_points('cylc.xtriggers'):
if func_name == entry_point.name:
_XTRIG_FUNCS[func_name] = entry_point.load()
return _XTRIG_FUNCS[func_name]

if mod_name == entry_point.name:
_XTRIG_MOD_CACHE[mod_name] = entry_point.load()
return _XTRIG_MOD_CACHE[mod_name]
# Still unable to find anything so abort
raise

try:
_XTRIG_FUNCS[func_name] = getattr(mod_by_name, func_name)
except AttributeError:
# Module func_name has no function func_name, nor an entry_point entry.
raise
return _XTRIG_FUNCS[func_name]
return _XTRIG_MOD_CACHE[mod_name]


def get_xtrig_func(mod_name, func_name, src_dir):
"""Find, cache, and return a function from an xtrigger module.

Raises:
ImportError, if the module is not found
AttributeError, if the function is not found in the module

"""
if (mod_name, func_name) in _XTRIG_FUNC_CACHE:
return _XTRIG_FUNC_CACHE[(mod_name, func_name)]

mod = get_xtrig_mod(mod_name, src_dir)

_XTRIG_FUNC_CACHE[(mod_name, func_name)] = getattr(mod, func_name)

return _XTRIG_FUNC_CACHE[(mod_name, func_name)]


def run_function(func_name, json_args, json_kwargs, src_dir):
"""Run a Python function in the process pool.

func_name(*func_args, **func_kwargs)

The function is presumed to be in a module of the same name.

Redirect any function stdout to stderr (and workflow log in debug mode).
Return value printed to stdout as a JSON string - allows use of the
existing process pool machinery as-is. src_dir is for local modules.

"""
func_args = json.loads(json_args)
func_kwargs = json.loads(json_kwargs)

# Find and import then function.
func = get_func(func_name, src_dir)
func = get_xtrig_func(func_name, func_name, src_dir)

# Redirect stdout to stderr.
orig_stdout = sys.stdout
sys.stdout = sys.stderr
res = func(*func_args, **func_kwargs)

# Restore stdout.
sys.stdout = orig_stdout

# Write function return value as JSON to stdout.
sys.stdout.write(json.dumps(res))

Expand Down
Loading