From ca52d68ccc60be6c0d2adb78b7df3b16e9449e1f Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Fri, 15 Sep 2023 13:24:21 +1200 Subject: [PATCH 01/14] parentless sequential xtrigger spawning --- cylc/flow/cfgspec/workflow.py | 20 +++++- cylc/flow/config.py | 16 ++++- cylc/flow/scheduler.py | 5 ++ cylc/flow/task_pool.py | 64 ++++++++++++++++--- cylc/flow/task_proxy.py | 7 +- cylc/flow/unicode_rules.py | 1 + cylc/flow/xtrigger_mgr.py | 20 +++++- .../cylc-config/00-simple/section1.stdout | 2 + 8 files changed, 120 insertions(+), 15 deletions(-) diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py index 96288808e59..f01f2837c92 100644 --- a/cylc/flow/cfgspec/workflow.py +++ b/cylc/flow/cfgspec/workflow.py @@ -814,8 +814,26 @@ def get_script_common_text(this: str, example: Optional[str] = None): Example:: - ``my_trigger(arg1, arg2, kwarg1, kwarg2):PT10S`` + .. code-block:: cylc + + [[xtriggers]] + my_trig = my_trigger(arg1, arg2, kwarg1, kwarg2):PT10S + [[[settings]]] + non-sequential xtriggers = my_trig + ''') + with Conf('settings', desc=''' + Section heading for xtrigger behavior settings. + ''') as Queue: + Conf('non-sequential xtriggers', VDR.V_STRING_LIST, desc=''' + A list of xtrigger labels whose xtrigger, of associated + parentless task, be checked out to the runahead limit. + + This allows for non-sequential xtrigger checking. + + A task with multiple xtriggers requires all labels + be specified to behave in this way. + ''') with Conf('graph', desc=f''' The workflow graph is defined under this section. diff --git a/cylc/flow/config.py b/cylc/flow/config.py index f871531adeb..c65e77abf7b 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -1705,7 +1705,13 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, self.taskdefs[right].add_dependency(dependency, seq) validator = XtriggerNameValidator.validate - for label in self.cfg['scheduling']['xtriggers']: + xtrigs = self.cfg['scheduling']['xtriggers'] + for label in xtrigs: + if ( + label == 'settings' + and not isinstance(xtrigs[label], SubFuncContext) + ): + continue valid, msg = validator(label) if not valid: raise WorkflowConfigError( @@ -1714,7 +1720,7 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, for label in xtrig_labels: try: - xtrig = self.cfg['scheduling']['xtriggers'][label] + xtrig = xtrigs[label] except KeyError: if label != 'wall_clock': raise WorkflowConfigError(f"xtrigger not defined: {label}") @@ -1740,6 +1746,12 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, self.taskdefs[right].add_xtrig_label(label, seq) + if self.xtrigger_mgr is not None: + with suppress(KeyError): + self.xtrigger_mgr.non_sequential_labels.update( + set(xtrigs['settings']['non-sequential xtriggers']) + ) + def get_actual_first_point(self, start_point): """Get actual first cycle point for the workflow diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index d630889a9d4..8afcbe3faa7 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -481,6 +481,7 @@ async def configure(self, params): self.config, self.workflow_db_mgr, self.task_events_mgr, + self.xtrigger_mgr, self.data_store_mgr, self.flow_mgr ) @@ -1748,6 +1749,10 @@ async def main_loop(self) -> None: if all(itask.is_ready_to_run()): self.pool.queue_task(itask) + if self.xtrigger_mgr.sequential_spawn_next: + # Spawn parentless tasks with sequentially checked xtrigger(s). + self.pool.spawn_parentless_sequential_xtriggers() + if self.xtrigger_mgr.do_housekeeping: self.xtrigger_mgr.housekeep(self.pool.get_tasks()) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 1e97d3c3034..935487e73dd 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -90,6 +90,7 @@ from cylc.flow.data_store_mgr import DataStoreMgr from cylc.flow.taskdef import TaskDef from cylc.flow.task_events_mgr import TaskEventsManager + from cylc.flow.xtrigger_mgr import XtriggerManager from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager from cylc.flow.flow_mgr import FlowMgr, FlowNums @@ -110,6 +111,7 @@ def __init__( config: 'WorkflowConfig', workflow_db_mgr: 'WorkflowDatabaseManager', task_events_mgr: 'TaskEventsManager', + xtrigger_mgr: 'XtriggerManager', data_store_mgr: 'DataStoreMgr', flow_mgr: 'FlowMgr' ) -> None: @@ -119,6 +121,7 @@ def __init__( self.workflow_db_mgr: 'WorkflowDatabaseManager' = workflow_db_mgr self.task_events_mgr: 'TaskEventsManager' = task_events_mgr self.task_events_mgr.spawn_func = self.spawn_on_output + self.xtrigger_mgr: 'XtriggerManager' = xtrigger_mgr self.data_store_mgr: 'DataStoreMgr' = data_store_mgr self.flow_mgr: 'FlowMgr' = flow_mgr @@ -267,7 +270,7 @@ def release_runahead_tasks(self): for itask in release_me: self.rh_release_and_queue(itask) - if itask.flow_nums: + if itask.flow_nums and not itask.is_xtrigger_sequential: self.spawn_to_rh_limit( itask.tdef, itask.tdef.next_point(itask.point), @@ -560,6 +563,15 @@ def load_db_task_pool_for_restart(self, row_idx, row): ) ) + # Set xtrigger checking type, which effects parentless spawning. + if ( + itask.tdef.is_parentless(itask.point) + and set(itask.state.xtriggers.keys()).difference( + self.xtrigger_mgr.non_sequential_labels + ) + ): + itask.is_xtrigger_sequential = True + if itask.state_reset(status, is_runahead=True): self.data_store_mgr.delta_task_runahead(itask) self.add_to_pool(itask) @@ -705,42 +717,65 @@ def get_or_spawn_task( ntask = self._get_task_by_id( Tokens(cycle=str(point), task=name).relative_id ) + is_in_pool = False if ntask is None: # ntask does not exist: spawn it in the flow. ntask = self.spawn_task(name, point, flow_nums, flow_wait) + # if the task was found set xtrigger checking type. + if ( + ntask is not None + and set(ntask.state.xtriggers.keys()).difference( + self.xtrigger_mgr.non_sequential_labels + ) + ): + ntask.is_xtrigger_sequential = True else: # ntask already exists (n=0): merge flows. + is_in_pool = True self.merge_flows(ntask, flow_nums) - return ntask # may be None + return ntask, is_in_pool # may be None def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None: - """Spawn parentless task instances from point to runahead limit.""" + """Spawn parentless task instances from point to runahead limit. + + Sequentially checked xtriggers with spawn corresponding task out to the + next task with any xtrigger with the same behaviour, or to the runahead + limit (whichever occurs first). + + """ if not flow_nums or point is None: # Force-triggered no-flow task. # Or called with an invalid next_point. return if self.runahead_limit_point is None: self.compute_runahead() + + is_sequential = False while point is not None and (point <= self.runahead_limit_point): if tdef.is_parentless(point): - ntask = self.get_or_spawn_task( + ntask, is_in_pool = self.get_or_spawn_task( point, tdef.name, flow_nums ) if ntask is not None: - self.add_to_pool(ntask) + if not is_in_pool: + self.add_to_pool(ntask) self.rh_release_and_queue(ntask) + if ntask.is_xtrigger_sequential: + is_sequential = True + break point = tdef.next_point(point) # Once more for the runahead-limited task (don't release it). - self.spawn_if_parentless(tdef, point, flow_nums) + if not is_sequential: + self.spawn_if_parentless(tdef, point, flow_nums) def spawn_if_parentless(self, tdef, point, flow_nums): """Spawn a task if parentless, regardless of runahead limit.""" if flow_nums and point is not None and tdef.is_parentless(point): - ntask = self.get_or_spawn_task( + ntask, is_in_pool = self.get_or_spawn_task( point, tdef.name, flow_nums ) - if ntask is not None: + if ntask is not None and not is_in_pool: self.add_to_pool(ntask) def remove(self, itask, reason=None): @@ -2043,6 +2078,19 @@ def force_trigger_tasks( self.add_to_pool(itask) self._force_trigger(itask) + def spawn_parentless_sequential_xtriggers(self): + """Spawn successor(s) of parentless wall clock satisfied tasks.""" + while self.xtrigger_mgr.sequential_spawn_next: + taskid = self.xtrigger_mgr.sequential_spawn_next.pop() + itask = self._get_task_by_id(taskid) + # Will spawn out to RH limit or next parentless clock trigger + # or non-parentless. + self.spawn_to_rh_limit( + itask.tdef, + itask.tdef.next_point(itask.point), + itask.flow_nums + ) + def clock_expire_tasks(self): """Expire any tasks past their clock-expiry time.""" for itask in self.get_tasks(): diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 43fa210592f..39e1fd7fa8a 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -164,6 +164,9 @@ class TaskProxy: .transient: This is a transient proxy - not to be added to the task pool, but used e.g. to spawn children, or to get task-specific information. + .is_xtrigger_sequential: + A flag used to determine whether this task needs to wait for + xtrigger satisfaction to spawn. Args: tdef: The definition object of this task. @@ -206,7 +209,8 @@ class TaskProxy: 'try_timers', 'waiting_on_job_prep', 'mode_settings', - 'transient' + 'transient', + 'is_xtrigger_sequential', ] def __init__( @@ -242,6 +246,7 @@ def __init__( task=self.tdef.name, ) self.identity = self.tokens.relative_id + self.is_xtrigger_sequential = False self.reload_successor: Optional['TaskProxy'] = None self.point_as_seconds: Optional[int] = None diff --git a/cylc/flow/unicode_rules.py b/cylc/flow/unicode_rules.py index 4608559798d..9aab533c7f4 100644 --- a/cylc/flow/unicode_rules.py +++ b/cylc/flow/unicode_rules.py @@ -324,6 +324,7 @@ class XtriggerNameValidator(UnicodeRuleChecker): RULES = [ allowed_characters(r'a-zA-Z0-9', '_'), not_starts_with('_cylc'), + not_equals('settings'), ] diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index b891165dbd3..a77faf842a3 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -217,6 +217,14 @@ def __init__( # Signatures of active functions (waiting on callback). self.active: list = [] + # Clock labels, to avoid repeated string comparisons + self.wall_clock_labels: set = set() + # Labels whose xtrigger will be checked out to the RH limit. + self.non_sequential_labels: set = set() + # Gather parentless tasks whose xtrigger(s) have been satisfied + # (these will be used to spawn the next occurance). + self.sequential_spawn_next: set = set() + self.workflow_run_dir = workflow_run_dir # For function arg templating. @@ -365,6 +373,8 @@ def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None: """ self.functx_map[label] = fctx + if fctx.func_name == "wall_clock": + self.wall_clock_labels.add(label) def mutate_trig(self, label, kwargs): self.functx_map[label].func_kwargs.update(kwargs) @@ -433,7 +443,7 @@ def get_xtrig_ctx( args = [] kwargs = {} - if ctx.func_name == "wall_clock": + if label in self.wall_clock_labels: if "trigger_time" in ctx.func_kwargs: # noqa: SIM401 (readabilty) # Internal (retry timer): trigger_time already set. kwargs["trigger_time"] = ctx.func_kwargs["trigger_time"] @@ -472,14 +482,16 @@ def call_xtriggers_async(self, itask: 'TaskProxy'): itask: task proxy to check. """ for label, sig, ctx, _ in self._get_xtrigs(itask, unsat_only=True): - # Special case: quick synchronous clock check: - if sig.startswith("wall_clock"): + if label in self.wall_clock_labels: + # Special case: quick synchronous clock check. if sig in self.sat_xtrig: # Already satisfied, just update the task itask.state.xtriggers[label] = True elif _wall_clock(*ctx.func_args, **ctx.func_kwargs): # Newly satisfied itask.state.xtriggers[label] = True + if itask.is_xtrigger_sequential: + self.sequential_spawn_next.add(itask.identity) self.sat_xtrig[sig] = {} self.data_store_mgr.delta_task_xtrigger(sig, True) self.workflow_db_mgr.put_xtriggers({sig: {}}) @@ -502,6 +514,8 @@ def call_xtriggers_async(self, itask: 'TaskProxy'): [itask.tdef.name], xtrigger_env ) + if itask.is_xtrigger_sequential: + self.sequential_spawn_next.add(itask.identity) continue # Call the function to check the unsatisfied xtrigger. diff --git a/tests/functional/cylc-config/00-simple/section1.stdout b/tests/functional/cylc-config/00-simple/section1.stdout index 972b7a55606..e6e21d3d84b 100644 --- a/tests/functional/cylc-config/00-simple/section1.stdout +++ b/tests/functional/cylc-config/00-simple/section1.stdout @@ -16,5 +16,7 @@ runahead limit = P4 clock-expire = sequential = [[xtriggers]] + [[[settings]]] + non-sequential xtriggers = [[graph]] R1 = OPS:finish-all => VAR From 80208b7e8c026248fa658cc7ca57bd2ea155bcfa Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Thu, 21 Sep 2023 18:00:33 +1200 Subject: [PATCH 02/14] default reversed, xtrigger argument added --- cylc/flow/cfgspec/workflow.py | 36 ++++++------- cylc/flow/config.py | 15 ++---- cylc/flow/task_pool.py | 8 +-- cylc/flow/unicode_rules.py | 1 - cylc/flow/xtrigger_mgr.py | 54 +++++++++++++++++-- .../cylc-config/00-simple/section1.stdout | 3 +- 6 files changed, 73 insertions(+), 44 deletions(-) diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py index f01f2837c92..e9558bad8ba 100644 --- a/cylc/flow/cfgspec/workflow.py +++ b/cylc/flow/cfgspec/workflow.py @@ -802,9 +802,21 @@ def get_script_common_text(this: str, example: Optional[str] = None): :ref:`SequentialTasks`. ''') + Conf('sequential xtriggers default', VDR.V_BOOLEAN, False, desc=''' + Set to ``True``, this allows for sequential spawning of associated + parentless tasks on xtrigger satisfaction. + Instead of out to the runahead limit (default: ``False``). + + This workflow wide default can be overridden by a reserved + keyword argument in the xtrigger function declaration + (``sequential=True/False``). + + The presence of one sequential xtrigger on a parentless task with + multiple xtriggers will cause sequential behavior. + ''') with Conf('xtriggers', desc=''' - This section is for *External Trigger* function declarations - - see :ref:`Section External Triggers`. + This section is for *External Trigger* function declarations - + see :ref:`Section External Triggers`. '''): Conf('', VDR.V_XTRIGGER, desc=''' Any user-defined event trigger function declarations and @@ -814,26 +826,8 @@ def get_script_common_text(this: str, example: Optional[str] = None): Example:: - .. code-block:: cylc - - [[xtriggers]] - my_trig = my_trigger(arg1, arg2, kwarg1, kwarg2):PT10S - [[[settings]]] - non-sequential xtriggers = my_trig - + ``my_trigger(arg1, arg2, kwarg1, kwarg2):PT10S`` ''') - with Conf('settings', desc=''' - Section heading for xtrigger behavior settings. - ''') as Queue: - Conf('non-sequential xtriggers', VDR.V_STRING_LIST, desc=''' - A list of xtrigger labels whose xtrigger, of associated - parentless task, be checked out to the runahead limit. - - This allows for non-sequential xtrigger checking. - - A task with multiple xtriggers requires all labels - be specified to behave in this way. - ''') with Conf('graph', desc=f''' The workflow graph is defined under this section. diff --git a/cylc/flow/config.py b/cylc/flow/config.py index c65e77abf7b..6274dfa01d8 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -1707,17 +1707,16 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, validator = XtriggerNameValidator.validate xtrigs = self.cfg['scheduling']['xtriggers'] for label in xtrigs: - if ( - label == 'settings' - and not isinstance(xtrigs[label], SubFuncContext) - ): - continue valid, msg = validator(label) if not valid: raise WorkflowConfigError( f'Invalid xtrigger name "{label}" - {msg}' ) + if self.xtrigger_mgr is not None: + self.xtrigger_mgr.sequential_xtriggers_default = ( + self.cfg['scheduling']['sequential xtriggers default'] + ) for label in xtrig_labels: try: xtrig = xtrigs[label] @@ -1746,12 +1745,6 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, self.taskdefs[right].add_xtrig_label(label, seq) - if self.xtrigger_mgr is not None: - with suppress(KeyError): - self.xtrigger_mgr.non_sequential_labels.update( - set(xtrigs['settings']['non-sequential xtriggers']) - ) - def get_actual_first_point(self, start_point): """Get actual first cycle point for the workflow diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 935487e73dd..8e45bb8b6fd 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -566,8 +566,8 @@ def load_db_task_pool_for_restart(self, row_idx, row): # Set xtrigger checking type, which effects parentless spawning. if ( itask.tdef.is_parentless(itask.point) - and set(itask.state.xtriggers.keys()).difference( - self.xtrigger_mgr.non_sequential_labels + and set(itask.state.xtriggers.keys()).intersection( + self.xtrigger_mgr.sequential_xtrigger_labels ) ): itask.is_xtrigger_sequential = True @@ -724,8 +724,8 @@ def get_or_spawn_task( # if the task was found set xtrigger checking type. if ( ntask is not None - and set(ntask.state.xtriggers.keys()).difference( - self.xtrigger_mgr.non_sequential_labels + and set(ntask.state.xtriggers.keys()).intersection( + self.xtrigger_mgr.sequential_xtrigger_labels ) ): ntask.is_xtrigger_sequential = True diff --git a/cylc/flow/unicode_rules.py b/cylc/flow/unicode_rules.py index 9aab533c7f4..4608559798d 100644 --- a/cylc/flow/unicode_rules.py +++ b/cylc/flow/unicode_rules.py @@ -324,7 +324,6 @@ class XtriggerNameValidator(UnicodeRuleChecker): RULES = [ allowed_characters(r'a-zA-Z0-9', '_'), not_starts_with('_cylc'), - not_equals('settings'), ] diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index a77faf842a3..40c5554bc09 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -16,7 +16,7 @@ from contextlib import suppress from enum import Enum -from inspect import signature +from inspect import getfullargspec, signature import json import re from copy import deepcopy @@ -186,6 +186,32 @@ class XtriggerManager: managed uniquely - i.e. many tasks depending on the same clock trigger (with same offset from cycle point) get satisfied by the same call. + Parentless tasks with xtrigger(s) are, by default, spawned out to the + runahead limit. This results in non-sequential, and potentially + unnecessary, checking out to this limit (and may introduce clutter to + user interfaces). An option to make this sequential is now available, + by changing the default for all xtriggers in a workflow, and a way to + override this default with a (reserved) keyword function argument + (i.e. "sequential=True/False"): + + # Example: + [scheduling] + sequential xtrigger default = True + [[xtriggers]] + clock_0 = wall_clock() # offset PT0H + clock_1 = wall_clock(offset=PT1H) + # or wall_clock(PT1H) + workflow_x = workflow_state( + workflow=other, + point=%(task_cycle_point)s, + sequential=False + ):PT30S + [[graph]] + PT1H = ''' + @clock_1 & @workflow_x => foo & bar + @wall_clock = baz # pre-defined zero-offset clock + ''' + Args: workflow: workflow name user: workflow owner @@ -219,8 +245,11 @@ def __init__( # Clock labels, to avoid repeated string comparisons self.wall_clock_labels: set = set() - # Labels whose xtrigger will be checked out to the RH limit. - self.non_sequential_labels: set = set() + + # Workflow wide default, used when not specified in xtrigger kwargs. + self.sequential_xtriggers_default = False + # Labels whose xtriggers are sequentially checked. + self.sequential_xtrigger_labels: set = set() # Gather parentless tasks whose xtrigger(s) have been satisfied # (these will be used to spawn the next occurance). self.sequential_spawn_next: set = set() @@ -271,6 +300,7 @@ def check_xtrigger( * If the function module was not found. * If the function was not found in the xtrigger module. * If the function is not callable. + * If the function is not callable. * If any string template in the function context arguments are not present in the expected template values. * If the arguments do not match the function signature. @@ -293,6 +323,15 @@ def check_xtrigger( raise XtriggerConfigError( label, f"'{fname}' not callable in xtrigger module '{fname}'", ) + if 'sequential' in getfullargspec(func).args: + raise XtriggerConfigError( + label, + fname, + ( + f"xtrigger module '{fname}' contains reserved" + " argument name 'sequential'" + ), + ) # Validate args and kwargs against the function signature sig_str = fctx.get_signature() @@ -373,6 +412,11 @@ def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None: """ self.functx_map[label] = fctx + if fctx.func_kwargs.pop( + 'sequential', + self.sequential_xtriggers_default + ): + self.sequential_xtrigger_labels.add(label) if fctx.func_name == "wall_clock": self.wall_clock_labels.add(label) @@ -490,12 +534,12 @@ def call_xtriggers_async(self, itask: 'TaskProxy'): elif _wall_clock(*ctx.func_args, **ctx.func_kwargs): # Newly satisfied itask.state.xtriggers[label] = True - if itask.is_xtrigger_sequential: - self.sequential_spawn_next.add(itask.identity) self.sat_xtrig[sig] = {} self.data_store_mgr.delta_task_xtrigger(sig, True) self.workflow_db_mgr.put_xtriggers({sig: {}}) LOG.info('xtrigger satisfied: %s = %s', label, sig) + if itask.is_xtrigger_sequential: + self.sequential_spawn_next.add(itask.identity) self.do_housekeeping = True continue # General case: potentially slow asynchronous function call. diff --git a/tests/functional/cylc-config/00-simple/section1.stdout b/tests/functional/cylc-config/00-simple/section1.stdout index e6e21d3d84b..8695889440e 100644 --- a/tests/functional/cylc-config/00-simple/section1.stdout +++ b/tests/functional/cylc-config/00-simple/section1.stdout @@ -6,6 +6,7 @@ hold after cycle point = stop after cycle point = cycling mode = integer runahead limit = P4 +sequential xtriggers default = False [[queues]] [[[default]]] limit = 100 @@ -16,7 +17,5 @@ runahead limit = P4 clock-expire = sequential = [[xtriggers]] - [[[settings]]] - non-sequential xtriggers = [[graph]] R1 = OPS:finish-all => VAR From 1471506aeb53c7153ed57f0b7604233370e365fb Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Fri, 22 Sep 2023 12:31:21 +1200 Subject: [PATCH 03/14] add sequential arg option with wall_clock default True --- cylc/flow/cfgspec/workflow.py | 4 ++-- cylc/flow/xtrigger_mgr.py | 30 +++++++++++++++++++++--------- cylc/flow/xtriggers/wall_clock.py | 4 +++- 3 files changed, 26 insertions(+), 12 deletions(-) diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py index e9558bad8ba..cf211d35f70 100644 --- a/cylc/flow/cfgspec/workflow.py +++ b/cylc/flow/cfgspec/workflow.py @@ -808,8 +808,8 @@ def get_script_common_text(this: str, example: Optional[str] = None): Instead of out to the runahead limit (default: ``False``). This workflow wide default can be overridden by a reserved - keyword argument in the xtrigger function declaration - (``sequential=True/False``). + keyword argument in the xtrigger function declaration and/or + function (``sequential=True/False``). The presence of one sequential xtrigger on a parentless task with multiple xtriggers will cause sequential behavior. diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index 40c5554bc09..a60eba83800 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -323,15 +323,27 @@ def check_xtrigger( raise XtriggerConfigError( label, f"'{fname}' not callable in xtrigger module '{fname}'", ) - if 'sequential' in getfullargspec(func).args: - raise XtriggerConfigError( - label, - fname, - ( - f"xtrigger module '{fname}' contains reserved" - " argument name 'sequential'" - ), - ) + x_argspec = getfullargspec(func) + if 'sequential' in x_argspec.args: + if ( + x_argspec.defaults is None + or not isinstance( + x_argspec.defaults[x_argspec.args.index('sequential')], + bool + ) + ): + raise XtriggerConfigError( + label, + fname, + ( + f"xtrigger module '{fname}' contains reserved argument" + " name 'sequential' that has no boolean default" + ), + ) + elif 'sequential' not in fctx.func_kwargs: + fctx.func_kwargs['sequential'] = x_argspec.defaults[ + x_argspec.args.index('sequential') + ] # Validate args and kwargs against the function signature sig_str = fctx.get_signature() diff --git a/cylc/flow/xtriggers/wall_clock.py b/cylc/flow/xtriggers/wall_clock.py index d5d1a009154..91ba49a8961 100644 --- a/cylc/flow/xtriggers/wall_clock.py +++ b/cylc/flow/xtriggers/wall_clock.py @@ -22,7 +22,7 @@ from cylc.flow.exceptions import WorkflowConfigError -def wall_clock(offset: str = 'PT0S'): +def wall_clock(offset: str = 'PT0S', sequential: bool = True): """Trigger at a specific real "wall clock" time relative to the cycle point in the graph. @@ -48,6 +48,8 @@ def _wall_clock(trigger_time: int) -> bool: Args: trigger_time: Trigger time as seconds since Unix epoch. + sequential (bool): + Used by the workflow to flag corresponding xtriggers as sequential. """ return time() > trigger_time From bb2d786b52a63b53cfc5a3542e310df0ed26e181 Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Fri, 6 Oct 2023 21:09:28 +1300 Subject: [PATCH 04/14] handle restart/reload/remove --- cylc/flow/task_pool.py | 66 ++++++++++++++++++++++++++++++------------ 1 file changed, 47 insertions(+), 19 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 8e45bb8b6fd..0c926d355c3 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -706,34 +706,44 @@ def rh_release_and_queue(self, itask) -> None: def get_or_spawn_task( self, point: 'PointBase', - name: str, + tdef: 'TaskDef', flow_nums: 'FlowNums', flow_wait: bool = False - ) -> Optional[TaskProxy]: + ) -> 'Tuple[Optional[TaskProxy], bool, bool]': """Return new or existing task point/name with merged flow_nums. It does not add a spawned task proxy to the pool. """ ntask = self._get_task_by_id( - Tokens(cycle=str(point), task=name).relative_id + Tokens(cycle=str(point), task=tdef.name).relative_id ) is_in_pool = False + is_xtrig_sequential = False if ntask is None: # ntask does not exist: spawn it in the flow. - ntask = self.spawn_task(name, point, flow_nums, flow_wait) + ntask = self.spawn_task(tdef.name, point, flow_nums, flow_wait) # if the task was found set xtrigger checking type. - if ( - ntask is not None - and set(ntask.state.xtriggers.keys()).intersection( + if ntask is not None: + if set(ntask.state.xtriggers.keys()).intersection( self.xtrigger_mgr.sequential_xtrigger_labels - ) + ): + ntask.is_xtrigger_sequential = True + is_xtrig_sequential = True + elif { + xtrig_label + for sequence, xtrig_labels in tdef.xtrig_labels.items() + for xtrig_label in xtrig_labels + if sequence.is_valid(point) + }.intersection( + self.xtrigger_mgr.sequential_xtrigger_labels ): - ntask.is_xtrigger_sequential = True + is_xtrig_sequential = True else: # ntask already exists (n=0): merge flows. is_in_pool = True self.merge_flows(ntask, flow_nums) - return ntask, is_in_pool # may be None + is_xtrig_sequential = ntask.is_xtrigger_sequential + return ntask, is_in_pool, is_xtrig_sequential # may be None def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None: """Spawn parentless task instances from point to runahead limit. @@ -750,30 +760,33 @@ def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None: if self.runahead_limit_point is None: self.compute_runahead() - is_sequential = False + is_xtrig_sequential = False while point is not None and (point <= self.runahead_limit_point): if tdef.is_parentless(point): - ntask, is_in_pool = self.get_or_spawn_task( - point, tdef.name, flow_nums + ntask, is_in_pool, is_xtrig_sequential = ( + self.get_or_spawn_task( + point, + tdef, + flow_nums + ) ) if ntask is not None: if not is_in_pool: self.add_to_pool(ntask) self.rh_release_and_queue(ntask) - if ntask.is_xtrigger_sequential: - is_sequential = True - break + if is_xtrig_sequential: + break point = tdef.next_point(point) # Once more for the runahead-limited task (don't release it). - if not is_sequential: + if not is_xtrig_sequential: self.spawn_if_parentless(tdef, point, flow_nums) def spawn_if_parentless(self, tdef, point, flow_nums): """Spawn a task if parentless, regardless of runahead limit.""" if flow_nums and point is not None and tdef.is_parentless(point): - ntask, is_in_pool = self.get_or_spawn_task( - point, tdef.name, flow_nums + ntask, is_in_pool, _ = self.get_or_spawn_task( + point, tdef, flow_nums ) if ntask is not None and not is_in_pool: self.add_to_pool(ntask) @@ -994,6 +1007,14 @@ def reload_taskdefs(self, config: 'WorkflowConfig') -> None: self.check_task_output, ) self._swap_out(new_task) + # Set xtrigger checking type for parentless spawning. + if ( + new_task.tdef.is_parentless(new_task.point) + and set(new_task.state.xtriggers.keys()).intersection( + self.xtrigger_mgr.sequential_xtrigger_labels + ) + ): + new_task.is_xtrigger_sequential = True self.data_store_mgr.delta_task_prerequisite(new_task) LOG.info(f"[{itask}] reloaded task definition") if itask.state(*TASK_STATUSES_ACTIVE): @@ -1948,6 +1969,13 @@ def remove_tasks(self, items): """Remove tasks from the pool (forced by command).""" itasks, _, bad_items = self.filter_task_proxies(items) for itask in itasks: + # Spawn next occurance of xtrigger sequential task. + if itask.is_xtrigger_sequential: + self.spawn_to_rh_limit( + itask.tdef, + itask.tdef.next_point(itask.point), + itask.flow_nums + ) self.remove(itask, 'request') if self.compute_runahead(): self.release_runahead_tasks() From edd5d99f7b9db723f308ee56465e7b64839d2750 Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Wed, 29 Nov 2023 20:45:33 +1300 Subject: [PATCH 05/14] review fixes --- cylc/flow/data_store_mgr.py | 5 ++- cylc/flow/task_pool.py | 69 +++++++++++++++++++++---------------- cylc/flow/task_proxy.py | 16 +++++++-- 3 files changed, 58 insertions(+), 32 deletions(-) diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index 7a59e76d7a4..da5119bb707 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -1192,7 +1192,10 @@ def generate_ghost_task( point, flow_nums, submit_num=0, - data_mode=True + data_mode=True, + sequential_xtrigger_labels=( + self.schd.xtrigger_mgr.sequential_xtrigger_labels + ), ) is_orphan = False diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 0c926d355c3..a0db1e7c104 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -480,7 +480,10 @@ def load_db_task_pool_for_restart(self, row_idx, row): submit_num=submit_num, is_late=bool(is_late), flow_wait=bool(flow_wait), - is_manual_submit=bool(is_manual_submit) + is_manual_submit=bool(is_manual_submit), + sequential_xtrigger_labels=( + self.xtrigger_mgr.sequential_xtrigger_labels + ), ) except WorkflowConfigError: @@ -563,15 +566,6 @@ def load_db_task_pool_for_restart(self, row_idx, row): ) ) - # Set xtrigger checking type, which effects parentless spawning. - if ( - itask.tdef.is_parentless(itask.point) - and set(itask.state.xtriggers.keys()).intersection( - self.xtrigger_mgr.sequential_xtrigger_labels - ) - ): - itask.is_xtrigger_sequential = True - if itask.state_reset(status, is_runahead=True): self.data_store_mgr.delta_task_runahead(itask) self.add_to_pool(itask) @@ -712,6 +706,18 @@ def get_or_spawn_task( ) -> 'Tuple[Optional[TaskProxy], bool, bool]': """Return new or existing task point/name with merged flow_nums. + Returns: + tuple - (itask, is_in_pool, is_xtrig_sequential) + + itask: + The requested task proxy, or None if task does not + exist or cannot spawn. + is_in_pool: + Was the task found in a pool. + is_xtrig_sequential: + Is the next task occurance spawned on xtrigger satisfaction, + or do all occurances spawn out to the runahead limit. + It does not add a spawned task proxy to the pool. """ ntask = self._get_task_by_id( @@ -723,19 +729,15 @@ def get_or_spawn_task( # ntask does not exist: spawn it in the flow. ntask = self.spawn_task(tdef.name, point, flow_nums, flow_wait) # if the task was found set xtrigger checking type. + # otherwise find the xtrigger type if it can't spawn + # for whatever reason. if ntask is not None: - if set(ntask.state.xtriggers.keys()).intersection( - self.xtrigger_mgr.sequential_xtrigger_labels - ): - ntask.is_xtrigger_sequential = True - is_xtrig_sequential = True - elif { - xtrig_label + is_xtrig_sequential = ntask.is_xtrigger_sequential + elif any( + xtrig_label in self.xtrigger_mgr.sequential_xtrigger_labels for sequence, xtrig_labels in tdef.xtrig_labels.items() for xtrig_label in xtrig_labels if sequence.is_valid(point) - }.intersection( - self.xtrigger_mgr.sequential_xtrigger_labels ): is_xtrig_sequential = True else: @@ -743,7 +745,8 @@ def get_or_spawn_task( is_in_pool = True self.merge_flows(ntask, flow_nums) is_xtrig_sequential = ntask.is_xtrigger_sequential - return ntask, is_in_pool, is_xtrig_sequential # may be None + # ntask may still be None + return ntask, is_in_pool, is_xtrig_sequential def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None: """Spawn parentless task instances from point to runahead limit. @@ -807,6 +810,13 @@ def remove(self, itask, reason=None): msg = "task completed" else: msg = f"removed ({reason})" + + if itask.is_xtrigger_sequential: + with suppress(ValueError): + self.xtrigger_mgr.sequential_spawn_next.remove( + itask.identity + ) + try: del self.active_tasks[itask.point][itask.identity] except KeyError: @@ -1001,20 +1011,15 @@ def reload_taskdefs(self, config: 'WorkflowConfig') -> None: itask.point, itask.flow_nums, itask.state.status, + sequential_xtrigger_labels=( + self.xtrigger_mgr.sequential_xtrigger_labels + ), ) itask.copy_to_reload_successor( new_task, self.check_task_output, ) self._swap_out(new_task) - # Set xtrigger checking type for parentless spawning. - if ( - new_task.tdef.is_parentless(new_task.point) - and set(new_task.state.xtriggers.keys()).intersection( - self.xtrigger_mgr.sequential_xtrigger_labels - ) - ): - new_task.is_xtrigger_sequential = True self.data_store_mgr.delta_task_prerequisite(new_task) LOG.info(f"[{itask}] reloaded task definition") if itask.state(*TASK_STATUSES_ACTIVE): @@ -1721,7 +1726,10 @@ def _get_task_proxy_db_outputs( flow_wait=flow_wait, submit_num=submit_num, transient=transient, - is_manual_submit=is_manual_submit + is_manual_submit=is_manual_submit, + sequential_xtrigger_labels=( + self.xtrigger_mgr.sequential_xtrigger_labels + ), ) if itask is None: return None @@ -2092,6 +2100,9 @@ def force_trigger_tasks( flow_nums, flow_wait=flow_wait, submit_num=submit_num, + sequential_xtrigger_labels=( + self.xtrigger_mgr.sequential_xtrigger_labels + ), ) if itask is None: continue diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 39e1fd7fa8a..e1c1e77dc39 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -226,7 +226,8 @@ def __init__( is_manual_submit: bool = False, flow_wait: bool = False, data_mode: bool = False, - transient: bool = False + transient: bool = False, + sequential_xtrigger_labels: Optional[Set[str]] = None, ) -> None: self.tdef = tdef @@ -246,7 +247,6 @@ def __init__( task=self.tdef.name, ) self.identity = self.tokens.relative_id - self.is_xtrigger_sequential = False self.reload_successor: Optional['TaskProxy'] = None self.point_as_seconds: Optional[int] = None @@ -289,6 +289,18 @@ def __init__( self.state = TaskState(tdef, self.point, status, is_held) + # Set xtrigger checking type, which effects parentless spawning. + if ( + sequential_xtrigger_labels + and self.tdef.is_parentless(start_point) + and set(self.state.xtriggers.keys()).intersection( + sequential_xtrigger_labels + ) + ): + self.is_xtrigger_sequential = True + else: + self.is_xtrigger_sequential = False + # Determine graph children of this task (for spawning). if data_mode: self.graph_children = {} From 6142ac78a1c8e596c4de185414c553b75ac1ed03 Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Fri, 22 Sep 2023 11:54:09 +1200 Subject: [PATCH 06/14] tests added --- tests/functional/xtriggers/04-sequential.t | 110 +++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 tests/functional/xtriggers/04-sequential.t diff --git a/tests/functional/xtriggers/04-sequential.t b/tests/functional/xtriggers/04-sequential.t new file mode 100644 index 00000000000..c79a4da6176 --- /dev/null +++ b/tests/functional/xtriggers/04-sequential.t @@ -0,0 +1,110 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +# Test xtrigger sequential spawning - +# + +. "$(dirname "$0")/test_header" + +set_test_number 7 + +# Test workflow uses built-in 'echo' xtrigger. +init_workflow "${TEST_NAME_BASE}" << '__FLOW_CONFIG__' +[scheduler] + cycle point format = %Y + allow implicit tasks = True +[scheduling] + initial cycle point = 3000 + runahead limit = P5 + sequential xtriggers default = True + [[xtriggers]] + clock_1 = wall_clock(offset=P2Y, sequential=False) + clock_2 = wall_clock() + up_1 = workflow_state(\ + workflow=%(workflow)s, \ + task=b, \ + point=%(point)s, \ + offset=-P1Y, \ + sequential=False \ + ):PT1S + [[graph]] + R1 = """ +@clock_1 => a +b +""" + +P1Y/P1Y = """ +@clock_2 => a +@clock_2 => b +@up_1 => c +""" +__FLOW_CONFIG__ + +run_ok "${TEST_NAME_BASE}-val" cylc validate "${WORKFLOW_NAME}" + +# Run workflow; it will stall waiting on the never-satisfied xtriggers. +cylc play "${WORKFLOW_NAME}" + +poll_grep_workflow_log -E '3001/c/.* => succeeded' + +cylc stop --max-polls=10 --interval=2 "${WORKFLOW_NAME}" + +cylc play "${WORKFLOW_NAME}" + +cylc show "${WORKFLOW_NAME}//3001/a" | grep -E 'state: ' > 3001.a.log +cylc show "${WORKFLOW_NAME}//3002/a" 2>&1 >/dev/null \ + | grep -E 'No matching' > 3002.a.log + +# 3001/a should be spawned at both 3000/3001. +cmp_ok 3001.a.log - <<__END__ +state: waiting +__END__ +# 3002/a should not exist. +cmp_ok 3002.a.log - <<__END__ +No matching active tasks found: 3002/a +__END__ + +cylc reload "${WORKFLOW_NAME}" + +cylc remove "${WORKFLOW_NAME}//3001/b" + +cylc show "${WORKFLOW_NAME}//3002/b" | grep -E 'state: ' > 3002.b.log +cylc show "${WORKFLOW_NAME}//3003/b" 2>&1 >/dev/null \ + | grep -E 'No matching' > 3003.b.log + +# 3002/b should be only at 3002. +cmp_ok 3002.b.log - <<__END__ +state: waiting +__END__ +cmp_ok 3003.b.log - <<__END__ +No matching active tasks found: 3003/b +__END__ + +cylc show "${WORKFLOW_NAME}//3002/c" | grep -E 'state: ' > 3002.c.log +cylc show "${WORKFLOW_NAME}//3005/c" | grep -E 'state: ' > 3005.c.log + +# c should be from 3002-3005. +cmp_ok 3002.c.log - <<__END__ +state: waiting +__END__ +cmp_ok 3005.c.log - <<__END__ +state: waiting +__END__ + + +cylc stop --now --max-polls=10 --interval=2 "${WORKFLOW_NAME}" +purge +exit From 1573d002a84680335d20c874d4035187795ad771 Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Thu, 30 Nov 2023 01:23:03 +1300 Subject: [PATCH 07/14] Review fixes 2 Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- cylc/flow/cfgspec/workflow.py | 9 +++++---- cylc/flow/task_pool.py | 14 ++++++-------- cylc/flow/task_proxy.py | 11 +++-------- cylc/flow/xtrigger_mgr.py | 16 +++++++++++----- tests/functional/xtriggers/04-sequential.t | 14 +++++++------- 5 files changed, 32 insertions(+), 32 deletions(-) diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py index cf211d35f70..0e80f554dea 100644 --- a/cylc/flow/cfgspec/workflow.py +++ b/cylc/flow/cfgspec/workflow.py @@ -803,11 +803,12 @@ def get_script_common_text(this: str, example: Optional[str] = None): ''') Conf('sequential xtriggers default', VDR.V_BOOLEAN, False, desc=''' - Set to ``True``, this allows for sequential spawning of associated - parentless tasks on xtrigger satisfaction. - Instead of out to the runahead limit (default: ``False``). + When set to ``True``, parentless tasks that trigger off xtriggers + will only spawn sequentially, i.e. on the satisfaction of the + xtriggers in order. Otherwise, these tasks will all spawn at the + same time up to the runahead limit. - This workflow wide default can be overridden by a reserved + This workflow-wide default can be overridden by a reserved keyword argument in the xtrigger function declaration and/or function (``sequential=True/False``). diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index a0db1e7c104..456caa4c939 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -751,9 +751,10 @@ def get_or_spawn_task( def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None: """Spawn parentless task instances from point to runahead limit. - Sequentially checked xtriggers with spawn corresponding task out to the - next task with any xtrigger with the same behaviour, or to the runahead - limit (whichever occurs first). + Sequentially checked xtriggers will spawn the next occurrence of their + corresponding tasks. These tasks will keep spawning until they depend + on any unsatisfied xtrigger of the same sequential behavior, are no + longer parentless, and/or hit the runahead limit. """ if not flow_nums or point is None: @@ -812,10 +813,7 @@ def remove(self, itask, reason=None): msg = f"removed ({reason})" if itask.is_xtrigger_sequential: - with suppress(ValueError): - self.xtrigger_mgr.sequential_spawn_next.remove( - itask.identity - ) + self.xtrigger_mgr.sequential_spawn_next.discard(itask.identity) try: del self.active_tasks[itask.point][itask.identity] @@ -1977,7 +1975,7 @@ def remove_tasks(self, items): """Remove tasks from the pool (forced by command).""" itasks, _, bad_items = self.filter_task_proxies(items) for itask in itasks: - # Spawn next occurance of xtrigger sequential task. + # Spawn next occurrence of xtrigger sequential task. if itask.is_xtrigger_sequential: self.spawn_to_rh_limit( itask.tdef, diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index e1c1e77dc39..898017c8da1 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -290,16 +290,11 @@ def __init__( self.state = TaskState(tdef, self.point, status, is_held) # Set xtrigger checking type, which effects parentless spawning. - if ( + self.is_xtrigger_sequential = bool( sequential_xtrigger_labels and self.tdef.is_parentless(start_point) - and set(self.state.xtriggers.keys()).intersection( - sequential_xtrigger_labels - ) - ): - self.is_xtrigger_sequential = True - else: - self.is_xtrigger_sequential = False + and sequential_xtrigger_labels.intersection(self.state.xtriggers) + ) # Determine graph children of this task (for spawning). if data_mode: diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index a60eba83800..d27054323e9 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -21,7 +21,14 @@ import re from copy import deepcopy from time import time -from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING +from typing import ( + Any, + Dict, + Optional, + Set, + Tuple, + TYPE_CHECKING +) from cylc.flow import LOG from cylc.flow.exceptions import XtriggerConfigError @@ -244,15 +251,15 @@ def __init__( self.active: list = [] # Clock labels, to avoid repeated string comparisons - self.wall_clock_labels: set = set() + self.wall_clock_labels: Set[str] = set() # Workflow wide default, used when not specified in xtrigger kwargs. self.sequential_xtriggers_default = False # Labels whose xtriggers are sequentially checked. - self.sequential_xtrigger_labels: set = set() + self.sequential_xtrigger_labels: Set[str] = set() # Gather parentless tasks whose xtrigger(s) have been satisfied # (these will be used to spawn the next occurance). - self.sequential_spawn_next: set = set() + self.sequential_spawn_next: Set[str] = set() self.workflow_run_dir = workflow_run_dir @@ -300,7 +307,6 @@ def check_xtrigger( * If the function module was not found. * If the function was not found in the xtrigger module. * If the function is not callable. - * If the function is not callable. * If any string template in the function context arguments are not present in the expected template values. * If the arguments do not match the function signature. diff --git a/tests/functional/xtriggers/04-sequential.t b/tests/functional/xtriggers/04-sequential.t index c79a4da6176..7922e7ff807 100644 --- a/tests/functional/xtriggers/04-sequential.t +++ b/tests/functional/xtriggers/04-sequential.t @@ -43,14 +43,14 @@ init_workflow "${TEST_NAME_BASE}" << '__FLOW_CONFIG__' ):PT1S [[graph]] R1 = """ -@clock_1 => a -b -""" + @clock_1 => a + b + """ +P1Y/P1Y = """ -@clock_2 => a -@clock_2 => b -@up_1 => c -""" + @clock_2 => a + @clock_2 => b + @up_1 => c + """ __FLOW_CONFIG__ run_ok "${TEST_NAME_BASE}-val" cylc validate "${WORKFLOW_NAME}" From a178fa5b48ecc0d52ac45f6f42edf698d2d99e86 Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Tue, 13 Feb 2024 22:36:56 +1300 Subject: [PATCH 08/14] Integration tests added --- cylc/flow/task_pool.py | 32 +++- cylc/flow/xtrigger_mgr.py | 1 + .../integration/test_sequential_xtriggers.py | 138 ++++++++++++++++++ 3 files changed, 170 insertions(+), 1 deletion(-) create mode 100644 tests/integration/test_sequential_xtriggers.py diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 456caa4c939..009d498f582 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -814,6 +814,9 @@ def remove(self, itask, reason=None): if itask.is_xtrigger_sequential: self.xtrigger_mgr.sequential_spawn_next.discard(itask.identity) + self.xtrigger_mgr.sequential_has_spawned_next.discard( + itask.identity + ) try: del self.active_tasks[itask.point][itask.identity] @@ -1976,7 +1979,16 @@ def remove_tasks(self, items): itasks, _, bad_items = self.filter_task_proxies(items) for itask in itasks: # Spawn next occurrence of xtrigger sequential task. - if itask.is_xtrigger_sequential: + if ( + itask.is_xtrigger_sequential + and ( + itask.identity not in + self.xtrigger_mgr.sequential_has_spawned_next + ) + ): + self.xtrigger_mgr.sequential_has_spawned_next.add( + itask.identity + ) self.spawn_to_rh_limit( itask.tdef, itask.tdef.next_point(itask.point), @@ -2038,6 +2050,23 @@ def _force_trigger(self, itask): itask.tdef.next_point(itask.point), itask.flow_nums ) + # Task may be set running before xtrigger is satisfied, + # if so check/spawn if xtrigger sequential. + elif ( + itask.is_xtrigger_sequential + and ( + itask.identity not in + self.xtrigger_mgr.sequential_has_spawned_next + ) + ): + self.xtrigger_mgr.sequential_has_spawned_next.add( + itask.identity + ) + self.spawn_to_rh_limit( + itask.tdef, + itask.tdef.next_point(itask.point), + itask.flow_nums + ) else: # De-queue it to run now. self.task_queue_mgr.force_release_task(itask) @@ -2119,6 +2148,7 @@ def spawn_parentless_sequential_xtriggers(self): """Spawn successor(s) of parentless wall clock satisfied tasks.""" while self.xtrigger_mgr.sequential_spawn_next: taskid = self.xtrigger_mgr.sequential_spawn_next.pop() + self.xtrigger_mgr.sequential_has_spawned_next.add(taskid) itask = self._get_task_by_id(taskid) # Will spawn out to RH limit or next parentless clock trigger # or non-parentless. diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index d27054323e9..4d94327c582 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -260,6 +260,7 @@ def __init__( # Gather parentless tasks whose xtrigger(s) have been satisfied # (these will be used to spawn the next occurance). self.sequential_spawn_next: Set[str] = set() + self.sequential_has_spawned_next: Set[str] = set() self.workflow_run_dir = workflow_run_dir diff --git a/tests/integration/test_sequential_xtriggers.py b/tests/integration/test_sequential_xtriggers.py new file mode 100644 index 00000000000..00cda19a9c6 --- /dev/null +++ b/tests/integration/test_sequential_xtriggers.py @@ -0,0 +1,138 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Test interactions with sequential xtriggers.""" + +import pytest + +from cylc.flow.cycling.iso8601 import ISO8601Point + + +@pytest.fixture() +def sequential(flow, scheduler): + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': 'True', + 'cycle point format': 'CCYY', + }, + 'scheduling': { + 'runahead limit': 'P2', + 'initial cycle point': '2000', + 'graph': { + 'P1Y': '@wall_clock => foo', + } + } + }) + + sequential = scheduler(id_) + + def list_tasks(): + """List the task instance cycle points present in the pool.""" + nonlocal sequential + return sorted(itask.tokens['cycle'] for itask in sequential.pool.get_all_tasks()) + + sequential.list_tasks = list_tasks + + return sequential + + +async def test_remove(sequential, start): + """It should spawn the next instance when a task is removed. + + Ensure that removing a task with a sequential xtrigger does not break the + chain causing future instances to be removed from the workflow. + """ + async with start(sequential): + # the scheduler starts with one task in the pool + assert sequential.list_tasks() == ['2000'] + + # it sequentially spawns out to the runahead limit + for year in range(2000, 2010): + foo = sequential.pool.get_task(ISO8601Point(f'{year}'), 'foo') + if foo.state(is_runahead=True): + break + sequential.xtrigger_mgr.call_xtriggers_async(foo) + sequential.pool.spawn_parentless_sequential_xtriggers() + assert sequential.list_tasks() == [ + '2000', + '2001', + '2002', + '2003', + ] + + # remove all tasks in the pool + sequential.pool.remove_tasks(['*']) + + # the next cycle should be automatically spawned + assert sequential.list_tasks() == ['2004'] + + # NOTE: You won't spot this issue in a functional test because the + # re-spawned tasks are detected as completed and automatically removed. + # So ATM not dangerous, but potentially inefficient. + + +async def test_trigger(sequential, start): + """It should spawn its next instance if triggered ahead of time. + + If you manually trigger a sequentially spawned task before its xtriggers + have become satisfied, then the sequential spawning chain is broken. + + The task pool should defend against this to ensure that triggering a task + doesn't cancel it's future instances. + """ + async with start(sequential): + assert sequential.list_tasks() == ['2000'] + + foo = sequential.pool.get_task(ISO8601Point('2000'), 'foo') + sequential.pool.force_trigger_tasks([foo.identity], {1}) + foo.state_reset('succeeded') + sequential.pool.spawn_on_output(foo, 'succeeded') + + assert sequential.list_tasks() == ['2001'] + + +async def test_reload(sequential, start): + """It should set the is_xtrigger_sequential flag on reload. + + TODO: test that changes to the sequential status in the config get picked + up on reload + """ + async with start(sequential): + # the task should be marked as sequential + pre_reload = sequential.pool.get_task(ISO8601Point('2000'), 'foo') + assert pre_reload.is_xtrigger_sequential is True + + # reload the workflow + sequential.pool.reload_taskdefs(sequential.config) + + # the original task proxy should have been replaced + post_reload = sequential.pool.get_task(ISO8601Point('2000'), 'foo') + assert id(pre_reload) != id(post_reload) + + # the new task should be marked as sequential + assert post_reload.is_xtrigger_sequential is True + + +# TODO: test that a task is marked as sequential if any of its xtriggers are +# sequential (as opposed to all)? + +# TODO: test setting the sequential argument in [scheduling][xtrigger] items +# changes the behaviour + +# TODO: test the interaction between "sequential xtriggers default" and the +# sequential argument to [scheduling][xtrigger] +# * Should we be able to override the default by setting sequential=False? +# * Or should that result in a validation error? From c8c9b15002f895be571928e6ae37ac2deaaf7239 Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Thu, 15 Feb 2024 16:50:58 +1300 Subject: [PATCH 09/14] hilary review changes --- cylc/flow/cfgspec/workflow.py | 24 +++++++++---------- cylc/flow/config.py | 2 +- cylc/flow/scheduler.py | 1 - cylc/flow/task_pool.py | 4 ++-- .../cylc-config/00-simple/section1.stdout | 2 +- tests/functional/xtriggers/04-sequential.t | 2 +- .../integration/test_sequential_xtriggers.py | 2 +- 7 files changed, 18 insertions(+), 19 deletions(-) diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py index 0e80f554dea..4f2885b0029 100644 --- a/cylc/flow/cfgspec/workflow.py +++ b/cylc/flow/cfgspec/workflow.py @@ -802,18 +802,18 @@ def get_script_common_text(this: str, example: Optional[str] = None): :ref:`SequentialTasks`. ''') - Conf('sequential xtriggers default', VDR.V_BOOLEAN, False, desc=''' - When set to ``True``, parentless tasks that trigger off xtriggers - will only spawn sequentially, i.e. on the satisfaction of the - xtriggers in order. Otherwise, these tasks will all spawn at the - same time up to the runahead limit. - - This workflow-wide default can be overridden by a reserved - keyword argument in the xtrigger function declaration and/or - function (``sequential=True/False``). - - The presence of one sequential xtrigger on a parentless task with - multiple xtriggers will cause sequential behavior. + Conf('spawn from xtriggers sequentially', VDR.V_BOOLEAN, False, + desc=''' + If ``True``, tasks that only depend on xtriggers will not spawn + until their previous (cycle point) instance is satisfied. + Otherwise, they will all spawn at once out to the runahead limit. + + This setting can be overridden by a reserved keyword argument in + individual xtrigger declarations, or in xtrigger function + definitions. + + One sequential xtrigger on a parentless task with multiple + xtriggers will cause sequential behavior. ''') with Conf('xtriggers', desc=''' This section is for *External Trigger* function declarations - diff --git a/cylc/flow/config.py b/cylc/flow/config.py index 6274dfa01d8..8a3f6df076c 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -1715,7 +1715,7 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, if self.xtrigger_mgr is not None: self.xtrigger_mgr.sequential_xtriggers_default = ( - self.cfg['scheduling']['sequential xtriggers default'] + self.cfg['scheduling']['spawn from xtriggers sequentially'] ) for label in xtrig_labels: try: diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 8afcbe3faa7..a07f6744cea 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1750,7 +1750,6 @@ async def main_loop(self) -> None: self.pool.queue_task(itask) if self.xtrigger_mgr.sequential_spawn_next: - # Spawn parentless tasks with sequentially checked xtrigger(s). self.pool.spawn_parentless_sequential_xtriggers() if self.xtrigger_mgr.do_housekeeping: diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 009d498f582..2650b3d1648 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -715,8 +715,8 @@ def get_or_spawn_task( is_in_pool: Was the task found in a pool. is_xtrig_sequential: - Is the next task occurance spawned on xtrigger satisfaction, - or do all occurances spawn out to the runahead limit. + Is the next task occurrence spawned on xtrigger satisfaction, + or do all occurrence spawn out to the runahead limit. It does not add a spawned task proxy to the pool. """ diff --git a/tests/functional/cylc-config/00-simple/section1.stdout b/tests/functional/cylc-config/00-simple/section1.stdout index 8695889440e..87324cc9e97 100644 --- a/tests/functional/cylc-config/00-simple/section1.stdout +++ b/tests/functional/cylc-config/00-simple/section1.stdout @@ -6,7 +6,7 @@ hold after cycle point = stop after cycle point = cycling mode = integer runahead limit = P4 -sequential xtriggers default = False +spawn from xtriggers sequentially = False [[queues]] [[[default]]] limit = 100 diff --git a/tests/functional/xtriggers/04-sequential.t b/tests/functional/xtriggers/04-sequential.t index 7922e7ff807..f89c5454db5 100644 --- a/tests/functional/xtriggers/04-sequential.t +++ b/tests/functional/xtriggers/04-sequential.t @@ -30,7 +30,7 @@ init_workflow "${TEST_NAME_BASE}" << '__FLOW_CONFIG__' [scheduling] initial cycle point = 3000 runahead limit = P5 - sequential xtriggers default = True + spawn from xtriggers sequentially = True [[xtriggers]] clock_1 = wall_clock(offset=P2Y, sequential=False) clock_2 = wall_clock() diff --git a/tests/integration/test_sequential_xtriggers.py b/tests/integration/test_sequential_xtriggers.py index 00cda19a9c6..051227eabee 100644 --- a/tests/integration/test_sequential_xtriggers.py +++ b/tests/integration/test_sequential_xtriggers.py @@ -132,7 +132,7 @@ async def test_reload(sequential, start): # TODO: test setting the sequential argument in [scheduling][xtrigger] items # changes the behaviour -# TODO: test the interaction between "sequential xtriggers default" and the +# TODO: test the interaction between "spawn from xtriggers sequentially" and the # sequential argument to [scheduling][xtrigger] # * Should we be able to override the default by setting sequential=False? # * Or should that result in a validation error? From fcc489a3eb0923ea5b5e69e4c6d8411e655975d3 Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Tue, 20 Feb 2024 15:55:16 +1300 Subject: [PATCH 10/14] Update cylc/flow/cfgspec/workflow.py Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- cylc/flow/cfgspec/workflow.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py index 4f2885b0029..ec33c49e1db 100644 --- a/cylc/flow/cfgspec/workflow.py +++ b/cylc/flow/cfgspec/workflow.py @@ -808,9 +808,8 @@ def get_script_common_text(this: str, example: Optional[str] = None): until their previous (cycle point) instance is satisfied. Otherwise, they will all spawn at once out to the runahead limit. - This setting can be overridden by a reserved keyword argument in - individual xtrigger declarations, or in xtrigger function - definitions. + This setting can be overridden by the reserved keyword argument + ``sequential`` in individual xtrigger declarations. One sequential xtrigger on a parentless task with multiple xtriggers will cause sequential behavior. From e1b93ff956356617faf7b98fec58470aac444bd9 Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Wed, 21 Feb 2024 23:12:04 +1300 Subject: [PATCH 11/14] hold spawn on multi seq xtrigs --- cylc/flow/xtrigger_mgr.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index 4d94327c582..9df985744a5 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -557,7 +557,7 @@ def call_xtriggers_async(self, itask: 'TaskProxy'): self.data_store_mgr.delta_task_xtrigger(sig, True) self.workflow_db_mgr.put_xtriggers({sig: {}}) LOG.info('xtrigger satisfied: %s = %s', label, sig) - if itask.is_xtrigger_sequential: + if self.all_task_seq_xtriggers_satisfied(itask): self.sequential_spawn_next.add(itask.identity) self.do_housekeeping = True continue @@ -577,7 +577,7 @@ def call_xtriggers_async(self, itask: 'TaskProxy'): [itask.tdef.name], xtrigger_env ) - if itask.is_xtrigger_sequential: + if self.all_task_seq_xtriggers_satisfied(itask): self.sequential_spawn_next.add(itask.identity) continue @@ -610,6 +610,14 @@ def housekeep(self, itasks): del self.sat_xtrig[sig] self.do_housekeeping = False + def all_task_seq_xtriggers_satisfied(self, itask: 'TaskProxy') -> bool: + """Check if all sequential xtriggers are satisfied for a task.""" + return itask.is_xtrigger_sequential and all( + itask.state.xtriggers[label] + for label in itask.state.xtriggers + if label in self.sequential_xtrigger_labels + ) + def callback(self, ctx: 'SubFuncContext'): """Callback for asynchronous xtrigger functions. From 81a92f8ebca00a83d5bd38b74faf652a0d1e4858 Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Thu, 22 Feb 2024 13:54:02 +0000 Subject: [PATCH 12/14] Reimplement xtrigger `sequential` arg post-merge & add tests --- cylc/flow/scheduler.py | 4 +- cylc/flow/subprocctx.py | 59 +++++-- cylc/flow/task_pool.py | 8 +- cylc/flow/xtrigger_mgr.py | 42 ++--- tests/integration/test_config.py | 15 -- .../integration/test_sequential_xtriggers.py | 162 +++++++++++++++--- tests/unit/test_xtrigger_mgr.py | 12 +- 7 files changed, 221 insertions(+), 81 deletions(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index a07f6744cea..67e63ab8874 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -2151,7 +2151,7 @@ def resume_workflow(self, quiet: bool = False) -> None: def command_force_trigger_tasks( self, tasks: Iterable[str], - flow: List[str], + flow: List[Union[str, int]], flow_wait: bool = False, flow_descr: Optional[str] = None ): @@ -2162,7 +2162,7 @@ def command_force_trigger_tasks( def command_set( self, tasks: List[str], - flow: List[str], + flow: List[Union[str, int]], outputs: Optional[List[str]] = None, prerequisites: Optional[List[str]] = None, flow_wait: bool = False, diff --git a/cylc/flow/subprocctx.py b/cylc/flow/subprocctx.py index 84f1c68e70b..d76e34ce9a4 100644 --- a/cylc/flow/subprocctx.py +++ b/cylc/flow/subprocctx.py @@ -18,11 +18,39 @@ Coerce more value type from string (to time point, duration, xtriggers, etc.). """ +from inspect import Parameter import json from shlex import quote +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union from cylc.flow.wallclock import get_current_time_string +if TYPE_CHECKING: + from inspect import Signature + + +def add_kwarg_to_sig( + sig: 'Signature', arg_name: str, default: Any +) -> 'Signature': + """Return a new signature with a kwarg added.""" + # Note: added kwarg has to be before **kwargs ("variadic") in the signature + positional_or_keyword: List[Parameter] = [] + variadic: List[Parameter] = [] + for param in sig.parameters.values(): + if param.kind == Parameter.VAR_KEYWORD: + variadic.append(param) + else: + positional_or_keyword.append(param) + return sig.replace(parameters=[ + *positional_or_keyword, + Parameter( + arg_name, + kind=Parameter.KEYWORD_ONLY, + default=default, + ), + *variadic, + ]) + class SubProcContext: # noqa: SIM119 (not really relevant to this case) """Represent the context of an external command to run as a subprocess. @@ -115,23 +143,31 @@ class SubFuncContext(SubProcContext): Attributes: # See also parent class attributes. - .label (str): + .label: function label under [xtriggers] in flow.cylc - .func_name (str): + .func_name: function name - .func_args (list): + .func_args: function positional args - .func_kwargs (dict): + .func_kwargs: function keyword args - .intvl (float - seconds): - function call interval (how often to check the external trigger) - .ret_val (bool, dict) + .intvl: + function call interval in secs (how often to check the + external trigger) + .ret_val function return: (satisfied?, result to pass to trigger tasks) """ DEFAULT_INTVL = 10.0 - def __init__(self, label, func_name, func_args, func_kwargs, intvl=None): + def __init__( + self, + label: str, + func_name: str, + func_args: List[Any], + func_kwargs: Dict[str, Any], + intvl: Union[float, str] = DEFAULT_INTVL + ): """Initialize a function context.""" self.label = label self.func_name = func_name @@ -141,9 +177,12 @@ def __init__(self, label, func_name, func_args, func_kwargs, intvl=None): self.intvl = float(intvl) except (TypeError, ValueError): self.intvl = self.DEFAULT_INTVL - self.ret_val = (False, None) # (satisfied, broadcast) + self.ret_val: Tuple[ + bool, Optional[dict] + ] = (False, None) # (satisfied, broadcast) super(SubFuncContext, self).__init__( - 'xtrigger-func', cmd=[], shell=False) + 'xtrigger-func', cmd=[], shell=False + ) def update_command(self, workflow_run_dir): """Update the function wrap command after changes.""" diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 2650b3d1648..552cc06dd39 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -858,7 +858,7 @@ def get_tasks_by_point(self) -> 'Dict[PointBase, List[TaskProxy]]': point_itasks[point] = list(itask_id_map.values()) return point_itasks - def get_task(self, point, name) -> Optional[TaskProxy]: + def get_task(self, point: 'PointBase', name: str) -> Optional[TaskProxy]: """Retrieve a task from the pool.""" rel_id = f'{point}/{name}' tasks = self.active_tasks.get(point) @@ -1803,7 +1803,7 @@ def set_prereqs_and_outputs( items: Iterable[str], outputs: List[str], prereqs: List[str], - flow: List[str], + flow: List[Union[str, int]], flow_wait: bool = False, flow_descr: Optional[str] = None ): @@ -2001,7 +2001,7 @@ def remove_tasks(self, items): def _get_flow_nums( self, - flow: List[str], + flow: List[Union[str, int]], meta: Optional[str] = None, ) -> Optional[Set[int]]: """Get correct flow numbers given user command options.""" @@ -2073,7 +2073,7 @@ def _force_trigger(self, itask): def force_trigger_tasks( self, items: Iterable[str], - flow: List[str], + flow: List[Union[str, int]], flow_wait: bool = False, flow_descr: Optional[str] = None ): diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index 9df985744a5..762d8aa5ba3 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -16,7 +16,7 @@ from contextlib import suppress from enum import Enum -from inspect import getfullargspec, signature +from inspect import signature import json import re from copy import deepcopy @@ -34,6 +34,7 @@ from cylc.flow.exceptions import XtriggerConfigError import cylc.flow.flags from cylc.flow.hostuserutil import get_user +from cylc.flow.subprocctx import add_kwarg_to_sig from cylc.flow.subprocpool import get_xtrig_func from cylc.flow.xtriggers.wall_clock import _wall_clock @@ -330,32 +331,33 @@ def check_xtrigger( raise XtriggerConfigError( label, f"'{fname}' not callable in xtrigger module '{fname}'", ) - x_argspec = getfullargspec(func) - if 'sequential' in x_argspec.args: - if ( - x_argspec.defaults is None - or not isinstance( - x_argspec.defaults[x_argspec.args.index('sequential')], - bool - ) - ): + + sig = signature(func) + sig_str = fctx.get_signature() + + # Handle reserved 'sequential' kwarg: + sequential_param = sig.parameters.get('sequential', None) + if sequential_param: + if not isinstance(sequential_param.default, bool): raise XtriggerConfigError( label, - fname, ( - f"xtrigger module '{fname}' contains reserved argument" - " name 'sequential' that has no boolean default" - ), + f"xtrigger '{fname}' function definition contains " + "reserved argument 'sequential' that has no " + "boolean default" + ) ) - elif 'sequential' not in fctx.func_kwargs: - fctx.func_kwargs['sequential'] = x_argspec.defaults[ - x_argspec.args.index('sequential') - ] + fctx.func_kwargs.setdefault('sequential', sequential_param.default) + elif 'sequential' in fctx.func_kwargs: + # xtrig call marked as sequential; add 'sequential' arg to + # signature for validation + sig = add_kwarg_to_sig( + sig, 'sequential', fctx.func_kwargs['sequential'] + ) # Validate args and kwargs against the function signature - sig_str = fctx.get_signature() try: - bound_args = signature(func).bind( + bound_args = sig.bind( *fctx.func_args, **fctx.func_kwargs ) except TypeError as exc: diff --git a/tests/integration/test_config.py b/tests/integration/test_config.py index 581ec4d83fb..0a186aa41bd 100644 --- a/tests/integration/test_config.py +++ b/tests/integration/test_config.py @@ -90,9 +90,6 @@ def test_validate_implicit_task_name( are blacklisted get caught and raise errors. """ id_ = flow({ - 'scheduler': { - 'allow implicit tasks': 'True' - }, 'scheduling': { 'graph': { 'R1': task_name @@ -189,9 +186,6 @@ def test_no_graph(flow, validate): def test_parse_special_tasks_invalid(flow, validate, section): """It should fail for invalid "special tasks".""" id_ = flow({ - 'scheduler': { - 'allow implicit tasks': 'True', - }, 'scheduling': { 'initial cycle point': 'now', 'special tasks': { @@ -211,9 +205,6 @@ def test_parse_special_tasks_invalid(flow, validate, section): def test_parse_special_tasks_interval(flow, validate): """It should fail for invalid durations in clock-triggers.""" id_ = flow({ - 'scheduler': { - 'allow implicit tasks': 'True', - }, 'scheduling': { 'initial cycle point': 'now', 'special tasks': { @@ -359,7 +350,6 @@ def test_xtrig_validation_wall_clock( https://github.com/cylc/cylc-flow/issues/5448 """ id_ = flow({ - 'scheduler': {'allow implicit tasks': True}, 'scheduling': { 'initial cycle point': '1012', 'xtriggers': {'myxt': 'wall_clock(offset=PT7MH)'}, @@ -378,7 +368,6 @@ def test_xtrig_implicit_wall_clock(flow: Fixture, validate: Fixture): xtrigger definition. """ wid = flow({ - 'scheduler': {'allow implicit tasks': True}, 'scheduling': { 'initial cycle point': '2024', 'graph': {'R1': '@wall_clock => foo'}, @@ -396,7 +385,6 @@ def test_xtrig_validation_echo( https://github.com/cylc/cylc-flow/issues/5448 """ id_ = flow({ - 'scheduler': {'allow implicit tasks': True}, 'scheduling': { 'xtriggers': {'myxt': 'echo()'}, 'graph': {'R1': '@myxt => foo'}, @@ -418,7 +406,6 @@ def test_xtrig_validation_xrandom( https://github.com/cylc/cylc-flow/issues/5448 """ id_ = flow({ - 'scheduler': {'allow implicit tasks': True}, 'scheduling': { 'xtriggers': {'myxt': 'xrandom(200)'}, 'graph': {'R1': '@myxt => foo'}, @@ -459,7 +446,6 @@ def kustom_validate(args): ) id_ = flow({ - 'scheduler': {'allow implicit tasks': True}, 'scheduling': { 'initial cycle point': '1012', 'xtriggers': {'myxt': 'kustom_xt(feature=42)'}, @@ -490,7 +476,6 @@ def test_xtrig_signature_validation( ): """Test automatic xtrigger function signature validation.""" id_ = flow({ - 'scheduler': {'allow implicit tasks': True}, 'scheduling': { 'initial cycle point': '2024', 'xtriggers': {'myxt': xtrig_call}, diff --git a/tests/integration/test_sequential_xtriggers.py b/tests/integration/test_sequential_xtriggers.py index 051227eabee..f7deee96eee 100644 --- a/tests/integration/test_sequential_xtriggers.py +++ b/tests/integration/test_sequential_xtriggers.py @@ -14,18 +14,28 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +# mypy: disable-error-code=union-attr + """Test interactions with sequential xtriggers.""" +from unittest.mock import patch import pytest +from cylc.flow.cycling.integer import IntegerPoint from cylc.flow.cycling.iso8601 import ISO8601Point +from cylc.flow.exceptions import XtriggerConfigError +from cylc.flow.scheduler import Scheduler + + +def list_cycles(schd: Scheduler): + """List the task instance cycle points present in the pool.""" + return sorted(itask.tokens['cycle'] for itask in schd.pool.get_tasks()) @pytest.fixture() def sequential(flow, scheduler): id_ = flow({ 'scheduler': { - 'allow implicit tasks': 'True', 'cycle point format': 'CCYY', }, 'scheduling': { @@ -36,17 +46,7 @@ def sequential(flow, scheduler): } } }) - - sequential = scheduler(id_) - - def list_tasks(): - """List the task instance cycle points present in the pool.""" - nonlocal sequential - return sorted(itask.tokens['cycle'] for itask in sequential.pool.get_all_tasks()) - - sequential.list_tasks = list_tasks - - return sequential + return scheduler(id_) async def test_remove(sequential, start): @@ -57,7 +57,7 @@ async def test_remove(sequential, start): """ async with start(sequential): # the scheduler starts with one task in the pool - assert sequential.list_tasks() == ['2000'] + assert list_cycles(sequential) == ['2000'] # it sequentially spawns out to the runahead limit for year in range(2000, 2010): @@ -66,7 +66,7 @@ async def test_remove(sequential, start): break sequential.xtrigger_mgr.call_xtriggers_async(foo) sequential.pool.spawn_parentless_sequential_xtriggers() - assert sequential.list_tasks() == [ + assert list_cycles(sequential) == [ '2000', '2001', '2002', @@ -77,7 +77,7 @@ async def test_remove(sequential, start): sequential.pool.remove_tasks(['*']) # the next cycle should be automatically spawned - assert sequential.list_tasks() == ['2004'] + assert list_cycles(sequential) == ['2004'] # NOTE: You won't spot this issue in a functional test because the # re-spawned tasks are detected as completed and automatically removed. @@ -94,14 +94,14 @@ async def test_trigger(sequential, start): doesn't cancel it's future instances. """ async with start(sequential): - assert sequential.list_tasks() == ['2000'] + assert list_cycles(sequential) == ['2000'] foo = sequential.pool.get_task(ISO8601Point('2000'), 'foo') sequential.pool.force_trigger_tasks([foo.identity], {1}) foo.state_reset('succeeded') sequential.pool.spawn_on_output(foo, 'succeeded') - assert sequential.list_tasks() == ['2001'] + assert list_cycles(sequential) == ['2000', '2001'] async def test_reload(sequential, start): @@ -126,13 +126,125 @@ async def test_reload(sequential, start): assert post_reload.is_xtrigger_sequential is True -# TODO: test that a task is marked as sequential if any of its xtriggers are -# sequential (as opposed to all)? +@pytest.mark.parametrize('is_sequential', [True, False]) +@pytest.mark.parametrize('xtrig_def', [ + 'wall_clock(sequential={})', + 'wall_clock(PT1H, sequential={})', + 'xrandom(1, 1, sequential={})', +]) +async def test_sequential_arg_ok( + flow, scheduler, start, xtrig_def: str, is_sequential: bool +): + """Test passing the sequential argument to xtriggers.""" + wid = flow({ + 'scheduler': { + 'cycle point format': 'CCYY', + }, + 'scheduling': { + 'initial cycle point': '2000', + 'runahead limit': 'P1', + 'xtriggers': { + 'myxt': xtrig_def.format(is_sequential), + }, + 'graph': { + 'P1Y': '@myxt => foo', + } + } + }) + schd: Scheduler = scheduler(wid) + expected_num_cycles = 1 if is_sequential else 3 + async with start(schd): + itask = schd.pool.get_task(ISO8601Point('2000'), 'foo') + assert itask.is_xtrigger_sequential is is_sequential + assert len(list_cycles(schd)) == expected_num_cycles + + +def test_sequential_arg_bad( + flow, validate +): + """Test validation of 'sequential' arg for custom xtriggers""" + wid = flow({ + 'scheduling': { + 'xtriggers': { + 'myxt': 'custom_xt(42)' + }, + 'graph': { + 'R1': '@myxt => foo' + } + } + }) + + def xtrig1(x, sequential): + """This uses 'sequential' without a default value""" + return True + + def xtrig2(x, sequential='True'): + """This uses 'sequential' with a default of wrong type""" + return True + + for xtrig in (xtrig1, xtrig2): + with patch( + 'cylc.flow.xtrigger_mgr.get_xtrig_func', + return_value=xtrig + ): + with pytest.raises(XtriggerConfigError) as excinfo: + validate(wid) + assert ( + "reserved argument 'sequential' that has no boolean default" + ) in str(excinfo.value) + + +@pytest.mark.parametrize('is_sequential', [True, False]) +async def test_any_sequential(flow, scheduler, start, is_sequential: bool): + """Test that a task is marked as sequential if any of its xtriggers are.""" + wid = flow({ + 'scheduling': { + 'xtriggers': { + 'xt1': 'custom_xt()', + 'xt2': f'custom_xt(sequential={is_sequential})', + 'xt3': 'custom_xt(sequential=False)', + }, + 'graph': { + 'R1': '@xt1 & @xt2 & @xt3 => foo', + } + } + }) + + with patch( + 'cylc.flow.xtrigger_mgr.get_xtrig_func', + return_value=lambda *a, **k: True + ): + schd: Scheduler = scheduler(wid) + async with start(schd): + itask = schd.pool.get_task(IntegerPoint('1'), 'foo') + assert itask.is_xtrigger_sequential is is_sequential -# TODO: test setting the sequential argument in [scheduling][xtrigger] items -# changes the behaviour -# TODO: test the interaction between "spawn from xtriggers sequentially" and the -# sequential argument to [scheduling][xtrigger] -# * Should we be able to override the default by setting sequential=False? -# * Or should that result in a validation error? +async def test_override(flow, scheduler, start): + """Test that the 'sequential=False' arg can override a default of True.""" + wid = flow({ + 'scheduling': { + 'spawn from xtriggers sequentially': True, + 'xtriggers': { + 'xt1': 'custom_xt()', + 'xt2': 'custom_xt(sequential=False)', + }, + 'graph': { + 'R1': ''' + @xt1 => foo + @xt2 => bar + ''', + } + } + }) + + with patch( + 'cylc.flow.xtrigger_mgr.get_xtrig_func', + return_value=lambda *a, **k: True + ): + schd: Scheduler = scheduler(wid) + async with start(schd): + foo = schd.pool.get_task(IntegerPoint('1'), 'foo') + assert foo.is_xtrigger_sequential is True + bar = schd.pool.get_task(IntegerPoint('1'), 'bar') + assert bar.is_xtrigger_sequential is False diff --git a/tests/unit/test_xtrigger_mgr.py b/tests/unit/test_xtrigger_mgr.py index 78f8fe6d969..3cfee363d15 100644 --- a/tests/unit/test_xtrigger_mgr.py +++ b/tests/unit/test_xtrigger_mgr.py @@ -24,7 +24,7 @@ from cylc.flow.subprocctx import SubFuncContext from cylc.flow.task_proxy import TaskProxy from cylc.flow.taskdef import TaskDef -from cylc.flow.xtrigger_mgr import RE_STR_TMPL +from cylc.flow.xtrigger_mgr import RE_STR_TMPL, XtriggerManager def test_constructor(xtrigger_mgr): @@ -68,7 +68,7 @@ def test_add_xtrigger_with_params(xtrigger_mgr): assert xtrig == xtrigger_mgr.functx_map["xtrig"] -def test_check_xtrigger_with_unknown_params(xtrigger_mgr): +def test_check_xtrigger_with_unknown_params(): """Test for adding an xtrigger with an unknown parameter. The XTriggerManager contains a list of specific parameters that are @@ -90,10 +90,12 @@ def test_check_xtrigger_with_unknown_params(xtrigger_mgr): XtriggerConfigError, match="Illegal template in xtrigger: what_is_this" ): - xtrigger_mgr.check_xtrigger("xtrig", xtrig, 'fdir') + XtriggerManager.check_xtrigger("xtrig", xtrig, 'fdir') -def test_check_xtrigger_with_deprecated_params(xtrigger_mgr, caplog): +def test_check_xtrigger_with_deprecated_params( + caplog: pytest.LogCaptureFixture +): """It should flag deprecated template variables.""" xtrig = SubFuncContext( label="echo", @@ -102,7 +104,7 @@ def test_check_xtrigger_with_deprecated_params(xtrigger_mgr, caplog): func_kwargs={"succeed": True} ) caplog.set_level(logging.WARNING, CYLC_LOG) - xtrigger_mgr.check_xtrigger("xtrig", xtrig, 'fdir') + XtriggerManager.check_xtrigger("xtrig", xtrig, 'fdir') assert caplog.messages == [ 'Xtrigger "xtrig" uses deprecated template variables: suite_name' ] From c1bb3ab2c6720bd230a74e18113512162090a9a8 Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Fri, 1 Mar 2024 20:56:17 +1300 Subject: [PATCH 13/14] change log entry generated --- changes.d/5738.feat.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes.d/5738.feat.md diff --git a/changes.d/5738.feat.md b/changes.d/5738.feat.md new file mode 100644 index 00000000000..09ff84bec93 --- /dev/null +++ b/changes.d/5738.feat.md @@ -0,0 +1 @@ +Optionally spawn parentless xtriggered tasks sequentially - i.e., one at a time, after the previous xtrigger is satisfied, instead of all at once out to the runahead limit. The `wall_clock` xtrigger is now sequential by default. From 63724c9167d5833dc32c0a0cdb38535d1e4561ee Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Thu, 21 Mar 2024 14:10:04 +1300 Subject: [PATCH 14/14] Hilary review 2 --- cylc/flow/cfgspec/workflow.py | 6 +++--- cylc/flow/config.py | 2 +- cylc/flow/scheduler.py | 4 ++-- cylc/flow/task_pool.py | 6 +++--- cylc/flow/xtrigger_mgr.py | 14 ++++++-------- .../cylc-config/00-simple/section1.stdout | 2 +- tests/functional/xtriggers/04-sequential.t | 2 +- tests/integration/test_sequential_xtriggers.py | 2 +- 8 files changed, 18 insertions(+), 20 deletions(-) diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py index ec33c49e1db..8ad315a7b2c 100644 --- a/cylc/flow/cfgspec/workflow.py +++ b/cylc/flow/cfgspec/workflow.py @@ -802,17 +802,17 @@ def get_script_common_text(this: str, example: Optional[str] = None): :ref:`SequentialTasks`. ''') - Conf('spawn from xtriggers sequentially', VDR.V_BOOLEAN, False, + Conf('sequential xtriggers', VDR.V_BOOLEAN, False, desc=''' If ``True``, tasks that only depend on xtriggers will not spawn - until their previous (cycle point) instance is satisfied. + until the xtrigger of previous (cycle point) instance is satisfied. Otherwise, they will all spawn at once out to the runahead limit. This setting can be overridden by the reserved keyword argument ``sequential`` in individual xtrigger declarations. One sequential xtrigger on a parentless task with multiple - xtriggers will cause sequential behavior. + xtriggers will cause sequential spawning. ''') with Conf('xtriggers', desc=''' This section is for *External Trigger* function declarations - diff --git a/cylc/flow/config.py b/cylc/flow/config.py index 8a3f6df076c..ad87315c4c1 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -1715,7 +1715,7 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, if self.xtrigger_mgr is not None: self.xtrigger_mgr.sequential_xtriggers_default = ( - self.cfg['scheduling']['spawn from xtriggers sequentially'] + self.cfg['scheduling']['sequential xtriggers'] ) for label in xtrig_labels: try: diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 67e63ab8874..a07f6744cea 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -2151,7 +2151,7 @@ def resume_workflow(self, quiet: bool = False) -> None: def command_force_trigger_tasks( self, tasks: Iterable[str], - flow: List[Union[str, int]], + flow: List[str], flow_wait: bool = False, flow_descr: Optional[str] = None ): @@ -2162,7 +2162,7 @@ def command_force_trigger_tasks( def command_set( self, tasks: List[str], - flow: List[Union[str, int]], + flow: List[str], outputs: Optional[List[str]] = None, prerequisites: Optional[List[str]] = None, flow_wait: bool = False, diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 552cc06dd39..2b214d70943 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1803,7 +1803,7 @@ def set_prereqs_and_outputs( items: Iterable[str], outputs: List[str], prereqs: List[str], - flow: List[Union[str, int]], + flow: List[str], flow_wait: bool = False, flow_descr: Optional[str] = None ): @@ -2001,7 +2001,7 @@ def remove_tasks(self, items): def _get_flow_nums( self, - flow: List[Union[str, int]], + flow: List[str], meta: Optional[str] = None, ) -> Optional[Set[int]]: """Get correct flow numbers given user command options.""" @@ -2073,7 +2073,7 @@ def _force_trigger(self, itask): def force_trigger_tasks( self, items: Iterable[str], - flow: List[Union[str, int]], + flow: List[str], flow_wait: bool = False, flow_descr: Optional[str] = None ): diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index 762d8aa5ba3..7f53b061463 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -204,20 +204,18 @@ class XtriggerManager: # Example: [scheduling] - sequential xtrigger default = True + sequential xtriggers = True [[xtriggers]] - clock_0 = wall_clock() # offset PT0H - clock_1 = wall_clock(offset=PT1H) - # or wall_clock(PT1H) + # "sequential=False" here overrides workflow and function default. + clock_0 = wall_clock(sequential=False) workflow_x = workflow_state( workflow=other, point=%(task_cycle_point)s, - sequential=False ):PT30S [[graph]] PT1H = ''' - @clock_1 & @workflow_x => foo & bar - @wall_clock = baz # pre-defined zero-offset clock + @workflow_x => foo & bar # spawned on workflow_x satisfaction + @clock_0 => baz # baz spawned out to RH ''' Args: @@ -259,7 +257,7 @@ def __init__( # Labels whose xtriggers are sequentially checked. self.sequential_xtrigger_labels: Set[str] = set() # Gather parentless tasks whose xtrigger(s) have been satisfied - # (these will be used to spawn the next occurance). + # (these will be used to spawn the next occurrence). self.sequential_spawn_next: Set[str] = set() self.sequential_has_spawned_next: Set[str] = set() diff --git a/tests/functional/cylc-config/00-simple/section1.stdout b/tests/functional/cylc-config/00-simple/section1.stdout index 87324cc9e97..f5ff23b77ae 100644 --- a/tests/functional/cylc-config/00-simple/section1.stdout +++ b/tests/functional/cylc-config/00-simple/section1.stdout @@ -6,7 +6,7 @@ hold after cycle point = stop after cycle point = cycling mode = integer runahead limit = P4 -spawn from xtriggers sequentially = False +sequential xtriggers = False [[queues]] [[[default]]] limit = 100 diff --git a/tests/functional/xtriggers/04-sequential.t b/tests/functional/xtriggers/04-sequential.t index f89c5454db5..211aa47277f 100644 --- a/tests/functional/xtriggers/04-sequential.t +++ b/tests/functional/xtriggers/04-sequential.t @@ -30,7 +30,7 @@ init_workflow "${TEST_NAME_BASE}" << '__FLOW_CONFIG__' [scheduling] initial cycle point = 3000 runahead limit = P5 - spawn from xtriggers sequentially = True + sequential xtriggers = True [[xtriggers]] clock_1 = wall_clock(offset=P2Y, sequential=False) clock_2 = wall_clock() diff --git a/tests/integration/test_sequential_xtriggers.py b/tests/integration/test_sequential_xtriggers.py index f7deee96eee..cbe0051d084 100644 --- a/tests/integration/test_sequential_xtriggers.py +++ b/tests/integration/test_sequential_xtriggers.py @@ -224,7 +224,7 @@ async def test_override(flow, scheduler, start): """Test that the 'sequential=False' arg can override a default of True.""" wid = flow({ 'scheduling': { - 'spawn from xtriggers sequentially': True, + 'sequential xtriggers': True, 'xtriggers': { 'xt1': 'custom_xt()', 'xt2': 'custom_xt(sequential=False)',