From 2a0ff518c515a2041b15eec6662e9145a1b6a323 Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Wed, 29 Nov 2023 20:45:33 +1300 Subject: [PATCH] review fixes --- cylc/flow/data_store_mgr.py | 5 ++- cylc/flow/task_pool.py | 72 +++++++++++++++++++++---------------- cylc/flow/task_proxy.py | 14 +++++++- 3 files changed, 59 insertions(+), 32 deletions(-) diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index ef77105b3a4..f3c02af4624 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -1192,7 +1192,10 @@ def generate_ghost_task( point, flow_nums, submit_num=0, - data_mode=True + data_mode=True, + sequential_xtrigger_labels=( + self.schd.xtrigger_mgr.sequential_xtrigger_labels + ), ) is_orphan = False diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 84cf7c10111..4aac56046cd 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -517,7 +517,10 @@ def load_db_task_pool_for_restart(self, row_idx, row): submit_num=submit_num, is_late=bool(is_late), flow_wait=bool(flow_wait), - is_manual_submit=bool(is_manual_submit) + is_manual_submit=bool(is_manual_submit), + sequential_xtrigger_labels=( + self.xtrigger_mgr.sequential_xtrigger_labels + ), ) except WorkflowConfigError: @@ -602,15 +605,6 @@ def load_db_task_pool_for_restart(self, row_idx, row): ) ) - # Set xtrigger checking type, which effects parentless spawning. - if ( - itask.tdef.is_parentless(itask.point) - and set(itask.state.xtriggers.keys()).intersection( - self.xtrigger_mgr.sequential_xtrigger_labels - ) - ): - itask.is_xtrigger_sequential = True - if itask.state_reset(status, is_runahead=True): self.data_store_mgr.delta_task_runahead(itask) self.add_to_pool(itask) @@ -734,7 +728,21 @@ def rh_release_and_queue(self, itask) -> None: def _get_spawned_or_merged_task( self, point: 'PointBase', tdef: 'TaskDef', flow_nums: 'FlowNums' ) -> 'Tuple[Optional[TaskProxy], bool, bool]': - """Return new or existing task point/name with merged flow_nums""" + """Return new or existing task point/name with merged flow_nums + + Returns: + tuple - (itask, is_in_pool, is_xtrig_sequential) + + itask: + The requested task proxy, or None if task does not + exist or cannot spawn. + is_in_pool: + Was the task found in a pool. + is_xtrig_sequential: + Is the next task occurance spawned on xtrigger satisfaction, + or do all occurances spawn out to the runahead limit. + + """ taskid = Tokens(cycle=str(point), task=tdef.name).relative_id ntask = ( self._get_hidden_task_by_id(taskid) @@ -745,20 +753,16 @@ def _get_spawned_or_merged_task( if ntask is None: # ntask does not exist: spawn it in the flow. ntask = self.spawn_task(tdef.name, point, flow_nums) - # if the task was found set xtrigger checking type. + # if the task was found set xtrigger spawning type. + # otherwise find the xtrigger type if it can't spawn + # for whatever reason. if ntask is not None: - if set(ntask.state.xtriggers.keys()).intersection( - self.xtrigger_mgr.sequential_xtrigger_labels - ): - ntask.is_xtrigger_sequential = True - is_xtrig_sequential = True - elif { - xtrig_label + is_xtrig_sequential = ntask.is_xtrigger_sequential + elif any( + xtrig_label in self.xtrigger_mgr.sequential_xtrigger_labels for sequence, xtrig_labels in tdef.xtrig_labels.items() for xtrig_label in xtrig_labels if sequence.is_valid(point) - }.intersection( - self.xtrigger_mgr.sequential_xtrigger_labels ): is_xtrig_sequential = True else: @@ -766,7 +770,8 @@ def _get_spawned_or_merged_task( is_in_pool = True self.merge_flows(ntask, flow_nums) is_xtrig_sequential = ntask.is_xtrigger_sequential - return ntask, is_in_pool, is_xtrig_sequential # may be None + # ntask may still be None + return ntask, is_in_pool, is_xtrig_sequential def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None: """Spawn parentless task instances from point to runahead limit. @@ -822,6 +827,12 @@ def remove(self, itask, reason=""): if reason: msg += f" ({reason})" + if itask.is_xtrigger_sequential: + with suppress(ValueError): + self.xtrigger_mgr.sequential_spawn_next.remove( + itask.identity + ) + try: del self.hidden_pool[itask.point][itask.identity] except KeyError: @@ -1056,20 +1067,15 @@ def reload_taskdefs(self, config: 'WorkflowConfig') -> None: itask.point, itask.flow_nums, itask.state.status, + sequential_xtrigger_labels=( + self.xtrigger_mgr.sequential_xtrigger_labels + ), ) itask.copy_to_reload_successor( new_task, self.check_task_output, ) self._swap_out(new_task) - # Set xtrigger checking type for parentless spawning. - if ( - new_task.tdef.is_parentless(new_task.point) - and set(new_task.state.xtriggers.keys()).intersection( - self.xtrigger_mgr.sequential_xtrigger_labels - ) - ): - new_task.is_xtrigger_sequential = True self.data_store_mgr.delta_task_prerequisite(new_task) LOG.info(f"[{itask}] reloaded task definition") if itask.state(*TASK_STATUSES_ACTIVE): @@ -1622,6 +1628,9 @@ def spawn_task( submit_num=submit_num, is_manual_submit=is_manual_submit, flow_wait=flow_wait, + sequential_xtrigger_labels=( + self.xtrigger_mgr.sequential_xtrigger_labels + ), ) if (name, point) in self.tasks_to_hold: LOG.info(f"[{itask}] holding (as requested earlier)") @@ -1702,6 +1711,9 @@ def force_spawn_children( taskdef, point, flow_nums=flow_nums, + sequential_xtrigger_labels=( + self.xtrigger_mgr.sequential_xtrigger_labels + ), ) # Spawn children of selected outputs. for trig, out, _ in itask.state.outputs.get_all(): diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 479ffb11a1d..62be993f6c5 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -194,6 +194,7 @@ def __init__( is_manual_submit: bool = False, flow_wait: bool = False, data_mode: bool = False, + sequential_xtrigger_labels: Optional[Set[str]] = None, ) -> None: self.tdef = tdef @@ -213,7 +214,6 @@ def __init__( task=self.tdef.name, ) self.identity = self.tokens.relative_id - self.is_xtrigger_sequential = False self.reload_successor: Optional['TaskProxy'] = None self.point_as_seconds: Optional[int] = None @@ -254,6 +254,18 @@ def __init__( self.state = TaskState(tdef, self.point, status, is_held) + # Set xtrigger checking type, which effects parentless spawning. + if ( + sequential_xtrigger_labels + and self.tdef.is_parentless(start_point) + and set(self.state.xtriggers.keys()).intersection( + sequential_xtrigger_labels + ) + ): + self.is_xtrigger_sequential = True + else: + self.is_xtrigger_sequential = False + # Determine graph children of this task (for spawning). if data_mode: self.graph_children = {}