From d7178d3018ce60f9bd001d9411cfc9144bb3794f Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Wed, 11 Sep 2024 19:31:58 +1200 Subject: [PATCH] Trigger and set: fix default flow assignment for n=0 tasks. --- cylc/flow/command_validation.py | 6 ++- cylc/flow/network/schema.py | 6 +-- cylc/flow/scripts/trigger.py | 21 ++++++--- cylc/flow/task_pool.py | 76 ++++++++++++++++----------------- 4 files changed, 59 insertions(+), 50 deletions(-) diff --git a/cylc/flow/command_validation.py b/cylc/flow/command_validation.py index 1c57d452c43..6f1495d30f6 100644 --- a/cylc/flow/command_validation.py +++ b/cylc/flow/command_validation.py @@ -39,10 +39,11 @@ def flow_opts(flows: List[str], flow_wait: bool) -> None: """Check validity of flow-related CLI options. - Note the schema defaults flows to ["all"]. + Note the schema defaults flows to []. Examples: Good: + >>> flow_opts([], False) >>> flow_opts(["new"], False) >>> flow_opts(["1", "2"], False) >>> flow_opts(["1", "2"], True) @@ -61,6 +62,9 @@ def flow_opts(flows: List[str], flow_wait: bool) -> None: cylc.flow.exceptions.InputError: ... """ + if not flows: + return + for val in flows: val = val.strip() if val in [FLOW_NONE, FLOW_NEW, FLOW_ALL]: diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 70e40232c1d..9a6480e8348 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -1998,15 +1998,15 @@ class Arguments: class FlowMutationArguments: flow = graphene.List( graphene.NonNull(Flow), - default_value=[FLOW_ALL], + default_value=[], description=sstrip(f''' The flow(s) to trigger these tasks in. This should be a list of flow numbers OR a single-item list containing one of the following three strings: - * {FLOW_ALL} - Triggered tasks belong to all active flows - (default). + * (nothing) - Triggered tasks keep flow, or {FLOW_ALL} (default) + * {FLOW_ALL} - Triggered tasks belong to all active flows. * {FLOW_NEW} - Triggered tasks are assigned to a new flow. * {FLOW_NONE} - Triggered tasks do not belong to any flow. ''') diff --git a/cylc/flow/scripts/trigger.py b/cylc/flow/scripts/trigger.py index 58c2f2a3939..8f5fdaef70e 100755 --- a/cylc/flow/scripts/trigger.py +++ b/cylc/flow/scripts/trigger.py @@ -17,18 +17,25 @@ """cylc trigger [OPTIONS] ARGS -Force tasks to run despite unsatisfied prerequisites. +Force tasks to run regardless of prerequisites. * Triggering an unqueued waiting task queues it, regardless of prerequisites. * Triggering a queued task submits it, regardless of queue limiting. * Triggering an active task has no effect (it already triggered). -Incomplete and active-waiting tasks in the n=0 window already belong to a flow. -Triggering them queues them to run (or rerun) in the same flow. +Future tasks (n>0) do not already belong to a flow. +* by default they are assigned to all active flows +* otherwise, according to the --flow option -Beyond n=0, triggered tasks get all current active flow numbers by default, or -specified flow numbers via the --flow option. Those flows - if/when they catch -up - will see tasks that ran after triggering event as having run already. +Active tasks (n=0) already belong to a flow. +* by default they run in the same flow +* with --flow=all, they are assigned to all active flows +* with --flow=INT or --flow=new, the new flow merges with the old one +* --flow=none is ignored, to avoid blocking the existing flow + +Note --flow=new increments the global flow counter so if you need multiple +commands to start a single new flow only use --flow=new in the first command, +then use the actual new flow number (e.g. read it from the scheduler log). Examples: # trigger task foo in cycle 1234 in test @@ -115,7 +122,7 @@ async def run(options: 'Values', workflow_id: str, *tokens_list): @cli_function(get_option_parser) def main(parser: COP, options: 'Values', *ids: str): """CLI for "cylc trigger".""" - command_validation.flow_opts(options.flow or ['all'], options.flow_wait) + command_validation.flow_opts(options.flow, options.flow_wait) rets = call_multi( partial(run, options), *ids, diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 9c27dcd232d..97f667e018c 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1908,11 +1908,6 @@ def set_prereqs_and_outputs( flow_descr: description of new flow """ - flow_nums = self._get_flow_nums(flow, flow_descr) - if flow_nums is None: - # Illegal flow command opts - return - # Get matching pool tasks and future task definitions. itasks, future_tasks, unmatched = self.filter_task_proxies( items, @@ -1920,14 +1915,17 @@ def set_prereqs_and_outputs( warn=False, ) + # Set existing task proxies. + flow_nums = self._get_flow_nums(flow, flow_descr, active=True) for itask in itasks: - # Existing task proxies. self.merge_flows(itask, flow_nums) if prereqs: self._set_prereqs_itask(itask, prereqs, flow_nums) else: self._set_outputs_itask(itask, outputs) + # Spawn and set future tasks. + flow_nums = self._get_flow_nums(flow, flow_descr, active=False) for name, point in future_tasks: tdef = self.config.get_taskdef(name) if prereqs: @@ -2066,34 +2064,39 @@ def _get_flow_nums( self, flow: List[str], meta: Optional[str] = None, - ) -> Optional[Set[int]]: - """Get correct flow numbers given user command options.""" - if set(flow).intersection({FLOW_ALL, FLOW_NEW, FLOW_NONE}): - if len(flow) != 1: - LOG.warning( - f'The "flow" values {FLOW_ALL}, {FLOW_NEW} & {FLOW_NONE}' - ' cannot be used in combination with integer flow numbers.' - ) - return None - if flow[0] == FLOW_ALL: - flow_nums = self._get_active_flow_nums() - elif flow[0] == FLOW_NEW: - flow_nums = {self.flow_mgr.get_flow_num(meta=meta)} - elif flow[0] == FLOW_NONE: + active: bool = False + ) -> Set[int]: + """Return flow numbers corresponding to user command options. + + Must be called separately for active (n=0) and future tasks. + Future tasks: assign the result to the new task. + Active tasks: merge the result with its existing flow numbers + + Option validity is pre-checked during command validation. + + """ + if not flow: + # default (no --flow option used) + if active: + # active tasks: stick with the existing flow flow_nums = set() + else: + # future tasks: assign to all active flows + flow_nums = self._get_active_flow_nums() + elif flow == [FLOW_NONE]: + flow_nums = set() + elif flow == [FLOW_ALL]: + flow_nums = self._get_active_flow_nums() + elif flow == [FLOW_NEW]: + flow_nums = {self.flow_mgr.get_flow_num(meta=meta)} else: - try: - flow_nums = { - self.flow_mgr.get_flow_num( - flow_num=int(n), meta=meta - ) - for n in flow - } - except ValueError: - LOG.warning( - f"Ignoring command: illegal flow values {flow}" + # specific flow numbers + flow_nums = { + self.flow_mgr.get_flow_num( + flow_num=int(n), meta=meta ) - return None + for n in flow + } return flow_nums def _force_trigger(self, itask): @@ -2157,17 +2160,13 @@ def force_trigger_tasks( unless flow-wait is set. """ - # Get flow numbers for the tasks to be triggered. - flow_nums = self._get_flow_nums(flow, flow_descr) - if flow_nums is None: - return - # Get matching tasks proxies, and matching future task IDs. existing_tasks, future_ids, unmatched = self.filter_task_proxies( items, future=True, warn=False, ) - # Trigger existing tasks. + # Trigger active tasks. + flow_nums = self._get_flow_nums(flow, flow_descr, active=True) for itask in existing_tasks: if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE): LOG.warning(f"[{itask}] ignoring trigger - already active") @@ -2176,11 +2175,10 @@ def force_trigger_tasks( self._force_trigger(itask) # Spawn and trigger future tasks. + flow_nums = self._get_flow_nums(flow, flow_descr, active=False) for name, point in future_ids: - if not self.can_be_spawned(name, point): continue - submit_num, _, prev_fwait = ( self._get_task_history(name, point, flow_nums) )