From 9a45a550f9d785579ff7de7c534cbc93e1895bcb Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Mon, 24 Jun 2024 02:12:46 +0000 Subject: [PATCH 1/5] Implement group re-run. --- cylc/flow/prerequisite.py | 7 + cylc/flow/scheduler.py | 10 +- cylc/flow/task_job_mgr.py | 3 +- cylc/flow/task_pool.py | 191 +++++++++++------- cylc/flow/task_proxy.py | 12 +- cylc/flow/taskdef.py | 16 ++ .../reload/20-stop-point/reference.log | 2 +- .../spawn-on-demand/02-merge/reference.log | 2 +- tests/integration/test_task_pool.py | 8 +- tests/integration/test_trigger.py | 3 +- 10 files changed, 168 insertions(+), 86 deletions(-) diff --git a/cylc/flow/prerequisite.py b/cylc/flow/prerequisite.py index 486c7e84ab3..6cfd5e3664c 100644 --- a/cylc/flow/prerequisite.py +++ b/cylc/flow/prerequisite.py @@ -120,6 +120,13 @@ def __init__(self, point: 'PointBase'): # * `False` (prerequisite unsatisfied). self._all_satisfied: Optional[bool] = None + def dump(self): + print(f"POINT {self.point}") + for k, v in self._satisfied.items(): + print(f"_SAT: {k}, {v}") + print(f"COND: {self.conditional_expression}") + print(f"ALL SAT: {self._all_satisfied}") + def instantaneous_hash(self) -> int: """Generate a hash of this prerequisite in its current state. diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 92702b0b55e..b584511882b 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1271,14 +1271,16 @@ def release_queued_tasks(self) -> bool: self.server.client_pub_key_dir, is_simulation=(self.get_run_mode() == RunMode.SIMULATION) ): + LOG.warning(f"SUBMITTING {itask} ... {itask.is_manual_submit}") if itask.flow_nums: flow = ','.join(str(i) for i in itask.flow_nums) else: flow = FLOW_NONE - log( - f"{itask.identity} -triggered off " - f"{itask.state.get_resolved_dependencies()} in flow {flow}" - ) + if itask.is_manual_submit: + off = f"[] in flow {flow}" + else: + off = f"{itask.state.get_resolved_dependencies()} in flow {flow}" + log(f"{itask.identity} -triggered off {off}") # one or more tasks were passed through the submission pipeline return True diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 500aa830b2c..b7f355fd0fe 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -447,7 +447,6 @@ def submit_task_jobs(self, workflow, itasks, curve_auth, 'platform_name': itask.platform['name'], 'job_runner_name': itask.summary['job_runner_name'], }) - itask.is_manual_submit = False if ri_map[install_target] == REMOTE_FILE_INSTALL_255: del ri_map[install_target] @@ -1103,6 +1102,8 @@ def _submit_task_job_callback(self, workflow, itask, cmd_ctx, line): self.task_events_mgr.process_message( itask, CRITICAL, self.task_events_mgr.EVENT_SUBMIT_FAILED, ctx.timestamp) + # Submitted, so reset the is_manual submit flag in case of retries. + itask.is_manual_submit = False def _prep_submit_task_job( self, diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 4b767bd7674..fa7cae3eb48 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1663,6 +1663,7 @@ def spawn_task( point: 'PointBase', flow_nums: Set[int], flow_wait: bool = False, + force: bool = False ) -> Optional[TaskProxy]: """Return a new task proxy for the given flow if possible. 
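[Illustrative sketch, not part of the patch: the hunk above adds a `force` argument to `TaskPool.spawn_task()`, and the hunks that follow guard the flow-history checks with `not force and ...` so a manual group trigger can re-run tasks that already ran in the flow. The helper name `should_respawn` and the placeholder status set below are invented for illustration.]

# Distils the "not force and ..." guards added to spawn_task();
# TASK_STATUSES_FINAL is a stand-in for cylc's set of final task states.
TASK_STATUSES_FINAL = {"succeeded", "failed", "expired"}


def should_respawn(force: bool, prev_status, has_completed_outputs: bool) -> bool:
    """Decide whether spawn_task() should create a new task proxy."""
    if force:
        # Forced (manual/group) triggering ignores flow history.
        return True
    if prev_status is not None and not has_completed_outputs:
        # Ran in this flow but completed no outputs: do not respawn.
        return False
    if prev_status in TASK_STATUSES_FINAL:
        # Already finished in this flow: do not respawn.
        return False
    return True
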
@@ -1697,7 +1698,8 @@ def spawn_task( return None if ( - prev_status is not None + not force + and prev_status is not None and not itask.state.outputs.get_completed_outputs() ): # If itask has any history in this flow but no completed outputs @@ -1710,7 +1712,7 @@ def spawn_task( LOG.debug(f"Not respawning {point}/{name} - task was removed") return None - if prev_status in TASK_STATUSES_FINAL: + if not force and prev_status in TASK_STATUSES_FINAL: # Task finished previously. msg = f"[{point}/{name}:{prev_status}] already finished" if itask.is_complete(): @@ -1927,6 +1929,7 @@ def set_prereqs_and_outputs( self.merge_flows(itask, flow_nums) if prereqs: self._set_prereqs_itask(itask, prereqs, flow_nums) + self._force_trigger_if_ready(itask) else: self._set_outputs_itask(itask, outputs) @@ -1993,6 +1996,8 @@ def _set_prereqs_itask( """ if prereqs == ["all"]: itask.state.set_prerequisites_all_satisfied() + self._force_trigger_if_ready(itask) + else: # Attempt to set the given presrequisites. # Log any that aren't valid for the task. @@ -2006,12 +2011,8 @@ def _set_prereqs_itask( if len(unmatched) == len(prereqs): # No prereqs matched. return False - if ( - self.runahead_limit_point is not None - and itask.point <= self.runahead_limit_point - ): - self.rh_release_and_queue(itask) - self.data_store_mgr.delta_task_prerequisite(itask) + self._force_trigger_if_ready(itask) + return True def _set_prereqs_tdef( @@ -2020,7 +2021,7 @@ def _set_prereqs_tdef( """Spawn a future task and set prerequisites on it.""" itask = self.spawn_task( - taskdef.name, point, flow_nums, flow_wait=flow_wait + taskdef.name, point, flow_nums, flow_wait=flow_wait, force=True ) if itask is None: return @@ -2104,12 +2105,16 @@ def _force_trigger(self, itask): """Assumes task is in the pool""" # TODO is this flag still needed, and consistent with "cylc set"? itask.is_manual_submit = True + LOG.warning(f"FORCE TRIGGER {itask} ... {itask.is_manual_submit}") itask.reset_try_timers() if itask.state_reset(TASK_STATUS_WAITING): # (could also be unhandled failed) self.data_store_mgr.delta_task_state(itask) - # (No need to set prerequisites satisfied here). - if itask.state.is_runahead: + if ( + itask.state.is_runahead + and self.runahead_limit_point is not None + and itask.point <= self.runahead_limit_point + ): # Release from runahead, and queue it. self.rh_release_and_queue(itask) self.spawn_to_rh_limit( @@ -2138,8 +2143,17 @@ def _force_trigger(self, itask): # De-queue it to run now. self.task_queue_mgr.force_release_task(itask) + def _force_trigger_if_ready(self, itask): + if not itask.is_task_prereqs_done(): + return + itask.is_manual_submit = True + itask.reset_try_timers() + self.data_store_mgr.delta_task_prerequisite(itask) + self._force_trigger(itask) + def force_trigger_tasks( - self, items: Iterable[str], + self, + items: Iterable[str], flow: List[str], flow_wait: bool = False, flow_descr: Optional[str] = None @@ -2160,57 +2174,91 @@ def force_trigger_tasks( - just spawn (if not already spawned in this flow) unless flow-wait is set. + TODO: ensure above description of triggering still works. + TODO: and original trigger functionality + + (Re)run a selected group of tasks. + + Set any off-flow prerequisites (all task proxies). + Unset any in-flow prerequisites (existing task proxies). + + TODO - check triggering if waiting on xtrigger + TODO - CALL _force_trigger on the initial tasks? 
+ TODO - get_resolved no longer reports "triggered off []" for manual + triggering because we now do it by setting prerequisites as if + naturally. Use is_manual_submit instead? """ # Get flow numbers for the tasks to be triggered. flow_nums = self._get_flow_nums(flow, flow_descr) if flow_nums is None: return + # if ignore_deps: TODO + # return self.force_trigger_tasks( + # items, flow, flow_wait, flow_descr) + # Get matching tasks proxies, and matching future task IDs. existing_tasks, future_ids, unmatched = self.filter_task_proxies( items, future=True, warn=False, ) + all_ids = ( + list(future_ids) + + [(itask.tdef.name, itask.point) for itask in existing_tasks] + ) - # Trigger existing tasks. for itask in existing_tasks: + # active tasks, present in the pool if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE): LOG.warning(f"[{itask}] ignoring trigger - already active") continue + self.merge_flows(itask, flow_nums) - self._force_trigger(itask) + for pre in itask.state.prerequisites: + for ( + p_point, p_name, p_out + ), p_state in pre._satisfied.items(): + if ( + not p_state and + (p_name, get_point(p_point)) not in all_ids + ): + # set off-flow prerequisite + itask.satisfy_me( + [ + Tokens( + cycle=p_point, + task=p_name, + task_sel=p_out + ) + ] + ) + # not in loop! we could trigger a task with no prereqs + self._force_trigger_if_ready(itask) - # Spawn and trigger future tasks. for name, point in future_ids: - - if not self.can_be_spawned(name, point): - continue - - submit_num, _, prev_fwait = ( - self._get_task_history(name, point, flow_nums) - ) - itask = TaskProxy( - self.tokens, - self.config.get_taskdef(name), - point, - flow_nums, - flow_wait=flow_wait, - submit_num=submit_num, - sequential_xtrigger_labels=( - self.xtrigger_mgr.xtriggers.sequential_xtrigger_labels - ), - ) - if itask is None: - continue - - self.db_add_new_flow_rows(itask) - - if prev_fwait: - # update completed outputs from the DB - self._load_historical_outputs(itask) - - # run it (or run it again for incomplete flow-wait) - self.add_to_pool(itask) - self._force_trigger(itask) + tdef = self.config.taskdefs[name] + if tdef.is_parentless(point): + # parentless: promote to task pool + self.set_prereqs_and_outputs( + [f"{point}/{name}"], + [], ["all"], + flow_nums, + flow_wait, + flow_descr + ) + for pid in tdef.get_triggers(point): + p_point = pid.get_point(point) + p_name = pid.task_name + if (p_name, p_point) in all_ids: + # in-flow + continue + # set off-flow prerequisite + self.set_prereqs_and_outputs( + [f"{point}/{name}"], + [], [f"{p_point}/{p_name}"], + flow_nums, + flow_wait, + flow_descr + ) def spawn_parentless_sequential_xtriggers(self): """Spawn successor(s) of parentless wall clock satisfied tasks.""" @@ -2376,36 +2424,35 @@ def match_future_tasks( point_str = tokens['cycle'] name_str = tokens['task'] - if name_str not in self.config.taskdefs: - if self.config.find_taskdefs(name_str): - # It's a family name; was not matched by active tasks - LOG.warning( - f"No active tasks in the family {name_str}" - f' matching: {id_}' - ) - else: - LOG.warning(self.ERR_TMPL_NO_TASKID_MATCH.format(name_str)) - unmatched_tasks.append(id_) - continue - try: - point_str = standardise_point_string(point_str) - except PointParsingError as exc: - LOG.warning( - f"{id_} - invalid cycle point: {point_str} ({exc})") + members = self.config.find_taskdefs(name_str) + if not members: + LOG.warning(self.ERR_TMPL_NO_TASKID_MATCH.format(name_str)) unmatched_tasks.append(id_) continue - point = get_point(point_str) - 
taskdef = self.config.taskdefs[name_str] - if taskdef.is_valid_point(point): - matched_tasks.add((taskdef.name, point)) - else: - LOG.warning( - self.ERR_PREFIX_TASK_NOT_ON_SEQUENCE.format( - taskdef.name, point + for name in [m.name for m in members]: + try: + point_str = standardise_point_string(point_str) + except PointParsingError as exc: + LOG.warning( + f"{id_} - invalid cycle point: {point_str} ({exc})") + unmatched_tasks.append(id_) + continue + point = get_point(point_str) + try: + taskdef = self.config.taskdefs[name] + except KeyError: + # family name + continue + if taskdef.is_valid_point(point): + matched_tasks.add((taskdef.name, point)) + else: + LOG.warning( + self.ERR_PREFIX_TASK_NOT_ON_SEQUENCE.format( + taskdef.name, point + ) ) - ) - unmatched_tasks.append(id_) - continue + unmatched_tasks.append(id_) + continue return matched_tasks, unmatched_tasks def match_taskdefs( diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 27332ac9316..d9f8636671a 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -480,10 +480,18 @@ def set_summary_time(self, event_key, time_str=None): self.summary[event_key + '_time'] = float(str2time(time_str)) self.summary[event_key + '_time_string'] = time_str + def is_task_prereqs_done(self): + """Are all task prerequisites satisfied?""" + return ( + all( + pre.is_satisfied() + for pre in self.state.prerequisites + ) + ) + def is_task_prereqs_not_done(self): """Are some task prerequisites not satisfied?""" - return (not all(pre.is_satisfied() - for pre in self.state.prerequisites)) + return not self.is_task_prereqs_done() def is_waiting_prereqs_done(self): """Are ALL prerequisites satisfied?""" diff --git a/cylc/flow/taskdef.py b/cylc/flow/taskdef.py index 762324c9d75..91446f9303f 100644 --- a/cylc/flow/taskdef.py +++ b/cylc/flow/taskdef.py @@ -295,6 +295,22 @@ def get_parent_points(self, point): parent_points.add(trig.get_parent_point(point)) return parent_points + def get_triggers(self, point): + """Return my triggers, at point.""" + triggers = set() + for seq in self.sequences: + if not seq.is_valid(point): + continue + if seq in self.dependencies: + # task has prereqs in this sequence + for dep in self.dependencies[seq]: + # TODO? 
+ if dep.suicide: + continue + for trig in dep.task_triggers: + triggers.add(trig) + return triggers + def has_only_abs_triggers(self, point): """Return whether I have only absolute triggers at point.""" if not self.has_abs_triggers: diff --git a/tests/functional/reload/20-stop-point/reference.log b/tests/functional/reload/20-stop-point/reference.log index 2822c1470fa..fabf835457b 100644 --- a/tests/functional/reload/20-stop-point/reference.log +++ b/tests/functional/reload/20-stop-point/reference.log @@ -2,6 +2,6 @@ Initial point: 1 Final point: 5 1/set-stop-point -triggered off [] 1/reload -triggered off ['1/set-stop-point'] -1/t1 -triggered off ['0/t1', '1/reload'] +1/t1 -triggered off ['1/reload'] 2/t1 -triggered off ['1/t1'] 3/t1 -triggered off ['2/t1'] diff --git a/tests/functional/spawn-on-demand/02-merge/reference.log b/tests/functional/spawn-on-demand/02-merge/reference.log index e150aa34f34..162bcb6ea75 100644 --- a/tests/functional/spawn-on-demand/02-merge/reference.log +++ b/tests/functional/spawn-on-demand/02-merge/reference.log @@ -1,6 +1,6 @@ Initial point: 1 Final point: 3 -1/foo -triggered off ['0/foo'] +1/foo -triggered off [] 2/foo -triggered off ['1/foo'] 1/bar -triggered off ['1/foo'] 3/foo -triggered off ['2/foo'] diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 85394edd6a9..403012827ca 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -352,9 +352,9 @@ async def test_match_taskdefs( id="Name globs hold active tasks only" # (active means n=0 here) ), param( - ['1/FAM', '2/FAM', '6/FAM'], ['1/bar', '2/bar'], - ["No active tasks in the family FAM matching: 6/FAM"], - id="Family names hold active tasks only" + ['1/FAM', '2/FAM', '6/FAM'], ['1/bar', '2/bar', '6/bar'], + [], + id="Family names hold active and future tasks" ), param( ['1/grogu', 'H/foo', '20/foo', '1/pub'], [], @@ -504,7 +504,7 @@ async def test_trigger_states( ): """It should only trigger tasks in compatible states.""" - async with start(one): + async with start(one): #, level=logging.DEBUG): task = one.pool.filter_task_proxies(['1/one'])[0][0] # reset task a to the provided state diff --git a/tests/integration/test_trigger.py b/tests/integration/test_trigger.py index 30ae3404ed8..b5474d25f52 100644 --- a/tests/integration/test_trigger.py +++ b/tests/integration/test_trigger.py @@ -46,7 +46,8 @@ async def test_trigger_no_flows(one, start, log_filter): It should get the flow numbers of the most recent active tasks. """ - async with start(one): + import logging + async with start(one, level=logging.DEBUG): # Remove the task (flow 1) --> pool empty task = one.pool.get_tasks()[0] From 984bf5c5dbaf4b09da4a52cd3e98aebf7883cb4d Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Mon, 7 Oct 2024 12:45:45 +1300 Subject: [PATCH 2/5] flake 8... 
--- cylc/flow/scheduler.py | 5 ++++- cylc/flow/task_pool.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index b584511882b..4862d6131d5 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1279,7 +1279,10 @@ def release_queued_tasks(self) -> bool: if itask.is_manual_submit: off = f"[] in flow {flow}" else: - off = f"{itask.state.get_resolved_dependencies()} in flow {flow}" + off = ( + f"{itask.state.get_resolved_dependencies()}" + f" in flow {flow}" + ) log(f"{itask.identity} -triggered off {off}") # one or more tasks were passed through the submission pipeline diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index fa7cae3eb48..8f5524dd8a3 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -2175,7 +2175,7 @@ def force_trigger_tasks( unless flow-wait is set. TODO: ensure above description of triggering still works. - TODO: and original trigger functionality + TODO: and original trigger functionality (Re)run a selected group of tasks. From b16b6bba2084e5fc6fe825becdbebf12f658e57a Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Mon, 7 Oct 2024 14:46:41 +1300 Subject: [PATCH 3/5] Clean up --- cylc/flow/flow_mgr.py | 2 +- cylc/flow/scheduler.py | 1 - cylc/flow/task_pool.py | 39 +++++++++++++------ .../cylc-show/06-past-present-future.t | 3 +- .../cylc-trigger/01-queued/reference.log | 2 +- .../reload/20-stop-point/reference.log | 2 +- .../10-retrigger/reference.log | 2 +- tests/unit/test_flow_mgr.py | 2 +- 8 files changed, 35 insertions(+), 18 deletions(-) diff --git a/cylc/flow/flow_mgr.py b/cylc/flow/flow_mgr.py index 1cd1c1e8c70..6b54291831d 100644 --- a/cylc/flow/flow_mgr.py +++ b/cylc/flow/flow_mgr.py @@ -127,7 +127,7 @@ def get_flow_num( if flow_num in self.flows: if meta is not None: - LOG.warning( + LOG.debug( f'Ignoring flow metadata "{meta}":' f' {flow_num} is not a new flow' ) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 4862d6131d5..0d4960ead3c 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1271,7 +1271,6 @@ def release_queued_tasks(self) -> bool: self.server.client_pub_key_dir, is_simulation=(self.get_run_mode() == RunMode.SIMULATION) ): - LOG.warning(f"SUBMITTING {itask} ... {itask.is_manual_submit}") if itask.flow_nums: flow = ','.join(str(i) for i in itask.flow_nums) else: diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 8f5524dd8a3..c781d8bcf5a 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1873,7 +1873,8 @@ def set_prereqs_and_outputs( prereqs: List[str], flow: List[str], flow_wait: bool = False, - flow_descr: Optional[str] = None + flow_descr: Optional[str] = None, + trigger: bool = False ): """Set prerequisites or outputs of target tasks. @@ -1928,8 +1929,7 @@ def set_prereqs_and_outputs( # Existing task proxies. 
self.merge_flows(itask, flow_nums) if prereqs: - self._set_prereqs_itask(itask, prereqs, flow_nums) - self._force_trigger_if_ready(itask) + self._set_prereqs_itask(itask, prereqs, flow_nums, trigger) else: self._set_outputs_itask(itask, outputs) @@ -1937,7 +1937,7 @@ def set_prereqs_and_outputs( tdef = self.config.get_taskdef(name) if prereqs: self._set_prereqs_tdef( - point, tdef, prereqs, flow_nums, flow_wait) + point, tdef, prereqs, flow_nums, flow_wait, trigger) else: trans = self._get_task_proxy_db_outputs( point, tdef, flow_nums, @@ -1986,6 +1986,7 @@ def _set_prereqs_itask( itask: 'TaskProxy', prereqs: 'List[str]', flow_nums: 'Set[int]', + trigger: bool = False ) -> bool: """Set prerequisites on a task proxy. @@ -1993,11 +1994,12 @@ def _set_prereqs_itask( Return True if any prereqs are valid, else False. + Args: + trigger: trigger tasks immediately if fully satisfied. + """ if prereqs == ["all"]: itask.state.set_prerequisites_all_satisfied() - self._force_trigger_if_ready(itask) - else: # Attempt to set the given presrequisites. # Log any that aren't valid for the task. @@ -2011,12 +2013,26 @@ def _set_prereqs_itask( if len(unmatched) == len(prereqs): # No prereqs matched. return False + if ( + itask.state.is_runahead + and self.runahead_limit_point is not None + and itask.point <= self.runahead_limit_point + ): + # Release from runahead, and queue it. + self.rh_release_and_queue(itask) + self.spawn_to_rh_limit( + itask.tdef, + itask.tdef.next_point(itask.point), + itask.flow_nums + ) + + if trigger: self._force_trigger_if_ready(itask) return True def _set_prereqs_tdef( - self, point, taskdef, prereqs, flow_nums, flow_wait + self, point, taskdef, prereqs, flow_nums, flow_wait, trigger=False ): """Spawn a future task and set prerequisites on it.""" @@ -2025,7 +2041,7 @@ def _set_prereqs_tdef( ) if itask is None: return - if self._set_prereqs_itask(itask, prereqs, flow_nums): + if self._set_prereqs_itask(itask, prereqs, flow_nums, trigger): self.add_to_pool(itask) def _get_active_flow_nums(self) -> Set[int]: @@ -2105,7 +2121,6 @@ def _force_trigger(self, itask): """Assumes task is in the pool""" # TODO is this flag still needed, and consistent with "cylc set"? itask.is_manual_submit = True - LOG.warning(f"FORCE TRIGGER {itask} ... {itask.is_manual_submit}") itask.reset_try_timers() if itask.state_reset(TASK_STATUS_WAITING): # (could also be unhandled failed) @@ -2243,7 +2258,8 @@ def force_trigger_tasks( [], ["all"], flow_nums, flow_wait, - flow_descr + flow_descr, + trigger=True ) for pid in tdef.get_triggers(point): p_point = pid.get_point(point) @@ -2257,7 +2273,8 @@ def force_trigger_tasks( [], [f"{p_point}/{p_name}"], flow_nums, flow_wait, - flow_descr + flow_descr, + trigger=True ) def spawn_parentless_sequential_xtriggers(self): diff --git a/tests/functional/cylc-show/06-past-present-future.t b/tests/functional/cylc-show/06-past-present-future.t index a67636bc613..73a749f523b 100644 --- a/tests/functional/cylc-show/06-past-present-future.t +++ b/tests/functional/cylc-show/06-past-present-future.t @@ -42,11 +42,12 @@ state: succeeded prerequisites: (n/a for past tasks) __END__ +# Note trigger command satisfies off-flow prerequisites. 
TEST_NAME="${TEST_NAME_BASE}-show.present" contains_ok "${WORKFLOW_RUN_DIR}/show-c.txt" <<__END__ state: running prerequisites: ('⨯': not satisfied) - ⨯ 1/b succeeded + ✓ 1/b succeeded __END__ TEST_NAME="${TEST_NAME_BASE}-show.future" diff --git a/tests/functional/cylc-trigger/01-queued/reference.log b/tests/functional/cylc-trigger/01-queued/reference.log index 1a9f846c98d..499c919d40d 100644 --- a/tests/functional/cylc-trigger/01-queued/reference.log +++ b/tests/functional/cylc-trigger/01-queued/reference.log @@ -1,4 +1,4 @@ Initial point: 1 Final point: 1 1/foo -triggered off [] -1/bar -triggered off ['1/foo'] +1/bar -triggered off [] diff --git a/tests/functional/reload/20-stop-point/reference.log b/tests/functional/reload/20-stop-point/reference.log index fabf835457b..2822c1470fa 100644 --- a/tests/functional/reload/20-stop-point/reference.log +++ b/tests/functional/reload/20-stop-point/reference.log @@ -2,6 +2,6 @@ Initial point: 1 Final point: 5 1/set-stop-point -triggered off [] 1/reload -triggered off ['1/set-stop-point'] -1/t1 -triggered off ['1/reload'] +1/t1 -triggered off ['0/t1', '1/reload'] 2/t1 -triggered off ['1/t1'] 3/t1 -triggered off ['2/t1'] diff --git a/tests/functional/spawn-on-demand/10-retrigger/reference.log b/tests/functional/spawn-on-demand/10-retrigger/reference.log index 7c9a0a19599..ad8394c752e 100644 --- a/tests/functional/spawn-on-demand/10-retrigger/reference.log +++ b/tests/functional/spawn-on-demand/10-retrigger/reference.log @@ -3,5 +3,5 @@ Final point: 1 1/foo -triggered off [] 1/oops -triggered off ['1/foo'] 1/triggerer -triggered off ['1/foo'] -1/oops -triggered off ['1/foo'] +1/oops -triggered off [] 1/bar -triggered off ['1/oops'] diff --git a/tests/unit/test_flow_mgr.py b/tests/unit/test_flow_mgr.py index c9171b02073..33a623de173 100644 --- a/tests/unit/test_flow_mgr.py +++ b/tests/unit/test_flow_mgr.py @@ -48,7 +48,7 @@ def test_all( db_mgr = WorkflowDatabaseManager() flow_mgr = FlowMgr(db_mgr) - caplog.set_level(logging.INFO, CYLC_LOG) + caplog.set_level(logging.DEBUG, CYLC_LOG) meta = "the quick brown fox" assert flow_mgr.get_flow_num(None, meta) == 1 From a958b356c49d94ce7f9e2cf1489f6103108fa092 Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Mon, 7 Oct 2024 15:33:44 +1300 Subject: [PATCH 4/5] More clean up. --- cylc/flow/task_pool.py | 93 ++++++++++++++++++------------------------ 1 file changed, 40 insertions(+), 53 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index c781d8bcf5a..baa9c61c700 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -2173,11 +2173,16 @@ def force_trigger_tasks( flow_wait: bool = False, flow_descr: Optional[str] = None ): - """Force a task to trigger (user command). + """Force trigger a selected group of tasks. - Always run the task, even if a previous run was flow-waited. + Set any off-group prerequisites (all task proxies). + - Tasks with only off-group prereqs will run immediately + Unset any in-flow prerequisites (existing task proxies). + - The flow will respect dependencies within the group + + # TODO: check the following (presumably there are tests): - If the task did not run before in the flow: + If a task did not run before in the flow: - run it, and spawn on outputs unless flow-wait is set. (but load the previous outputs from the DB) @@ -2189,29 +2194,7 @@ def force_trigger_tasks( - just spawn (if not already spawned in this flow) unless flow-wait is set. - TODO: ensure above description of triggering still works. 
- TODO: and original trigger functionality - - (Re)run a selected group of tasks. - - Set any off-flow prerequisites (all task proxies). - Unset any in-flow prerequisites (existing task proxies). - - TODO - check triggering if waiting on xtrigger - TODO - CALL _force_trigger on the initial tasks? - TODO - get_resolved no longer reports "triggered off []" for manual - triggering because we now do it by setting prerequisites as if - naturally. Use is_manual_submit instead? """ - # Get flow numbers for the tasks to be triggered. - flow_nums = self._get_flow_nums(flow, flow_descr) - if flow_nums is None: - return - - # if ignore_deps: TODO - # return self.force_trigger_tasks( - # items, flow, flow_wait, flow_descr) - # Get matching tasks proxies, and matching future task IDs. existing_tasks, future_ids, unmatched = self.filter_task_proxies( items, future=True, warn=False, @@ -2221,6 +2204,38 @@ def force_trigger_tasks( [(itask.tdef.name, itask.point) for itask in existing_tasks] ) + for name, point in future_ids: + tdef = self.config.taskdefs[name] + if tdef.is_parentless(point): + # parentless: promote to task pool + self.set_prereqs_and_outputs( + [f"{point}/{name}"], + [], ["all"], + flow, + flow_wait, + flow_descr, + trigger=True + ) + for pid in tdef.get_triggers(point): + p_point = pid.get_point(point) + p_name = pid.task_name + if (p_name, p_point) in all_ids: + # in-flow + continue + # set off-flow prerequisite + self.set_prereqs_and_outputs( + [f"{point}/{name}"], + [], [f"{p_point}/{p_name}"], + flow, + flow_wait, + flow_descr, + trigger=True + ) + + flow_nums = self._get_flow_nums(flow, flow_descr) + if flow_nums is None: + return + for itask in existing_tasks: # active tasks, present in the pool if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE): @@ -2249,34 +2264,6 @@ def force_trigger_tasks( # not in loop! we could trigger a task with no prereqs self._force_trigger_if_ready(itask) - for name, point in future_ids: - tdef = self.config.taskdefs[name] - if tdef.is_parentless(point): - # parentless: promote to task pool - self.set_prereqs_and_outputs( - [f"{point}/{name}"], - [], ["all"], - flow_nums, - flow_wait, - flow_descr, - trigger=True - ) - for pid in tdef.get_triggers(point): - p_point = pid.get_point(point) - p_name = pid.task_name - if (p_name, p_point) in all_ids: - # in-flow - continue - # set off-flow prerequisite - self.set_prereqs_and_outputs( - [f"{point}/{name}"], - [], [f"{p_point}/{p_name}"], - flow_nums, - flow_wait, - flow_descr, - trigger=True - ) - def spawn_parentless_sequential_xtriggers(self): """Spawn successor(s) of parentless wall clock satisfied tasks.""" while self.xtrigger_mgr.sequential_spawn_next: From 1dbef6278508f090822fc53afc475a843003b47d Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Mon, 7 Oct 2024 17:12:22 +1300 Subject: [PATCH 5/5] Avoid double-incrememting flow numbers via new trigger implementation. --- cylc/flow/task_pool.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index baa9c61c700..56a8043188e 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1871,9 +1871,10 @@ def set_prereqs_and_outputs( items: Iterable[str], outputs: List[str], prereqs: List[str], - flow: List[str], + flow: Optional[List[str]] = None, flow_wait: bool = False, flow_descr: Optional[str] = None, + flow_nums: Optional[Set[int]] = None, trigger: bool = False ): """Set prerequisites or outputs of target tasks. 
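[Illustrative sketch, not part of the patch: the signature hunk above lets `set_prereqs_and_outputs()` take either raw `flow` input or precomputed `flow_nums`; the next hunk uses this so `force_trigger_tasks()` resolves flow numbers once and passes them down, instead of each nested call allocating another new flow. `resolve_flow_nums` and `get_flow_nums` are stand-in names for the TaskPool methods.]

# Mirrors the "if flow is not None" logic the next hunk adds.
from typing import Callable, List, Optional, Set


def resolve_flow_nums(
    flow: Optional[List[str]],
    flow_nums: Optional[Set[int]],
    get_flow_nums: Callable[[Optional[List[str]]], Optional[Set[int]]],
) -> Optional[Set[int]]:
    """Prefer flow numbers already computed by the caller."""
    if flow is not None:
        # Raw user input (e.g. "new", "none", or explicit numbers) still
        # needs converting, which may allocate a new flow number.
        return get_flow_nums(flow)
    # The caller has already resolved the numbers once, so reuse them
    # rather than incrementing the flow counter again.
    return flow_nums
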
@@ -1908,12 +1909,15 @@ def set_prereqs_and_outputs(
             items: task ID match patterns
             prereqs: prerequisites to set
             outputs: outputs to set
-            flow: flow numbers for spawned or merged tasks
+            flow: raw input flow numbers for spawned or merged tasks
+            flow_nums: if actual flow numbers have already been computed
             flow_wait: wait for flows to catch up before continuing
             flow_descr: description of new flow
 
         """
-        flow_nums = self._get_flow_nums(flow, flow_descr)
+        if flow is not None:
+            flow_nums = self._get_flow_nums(flow, flow_descr)
+
         if flow_nums is None:
             # Illegal flow command opts
             return
@@ -2195,6 +2199,10 @@ def force_trigger_tasks(
           unless flow-wait is set.
 
         """
+        flow_nums = self._get_flow_nums(flow, flow_descr)
+        if flow_nums is None:
+            return
+
         # Get matching tasks proxies, and matching future task IDs.
         existing_tasks, future_ids, unmatched = self.filter_task_proxies(
             items, future=True, warn=False,
@@ -2211,9 +2219,10 @@ def force_trigger_tasks(
                 self.set_prereqs_and_outputs(
                     [f"{point}/{name}"],
                     [], ["all"],
-                    flow,
+                    None,
                     flow_wait,
                     flow_descr,
+                    flow_nums,
                     trigger=True
                 )
             for pid in tdef.get_triggers(point):
@@ -2226,16 +2235,13 @@ def force_trigger_tasks(
                 self.set_prereqs_and_outputs(
                     [f"{point}/{name}"],
                     [], [f"{p_point}/{p_name}"],
-                    flow,
+                    None,
                     flow_wait,
                     flow_descr,
+                    flow_nums,
                     trigger=True
                 )
 
-        flow_nums = self._get_flow_nums(flow, flow_descr)
-        if flow_nums is None:
-            return
-
         for itask in existing_tasks:
             # active tasks, present in the pool
             if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):