From 2eb2e632a55428e371cecac8e7edf7be061ee3bb Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Wed, 11 Sep 2024 19:31:58 +1200 Subject: [PATCH 01/15] Trigger and set: fix default flow assignment for n=0 tasks. --- cylc/flow/command_validation.py | 6 +- cylc/flow/network/schema.py | 17 ++--- cylc/flow/scripts/trigger.py | 21 +++++-- cylc/flow/task_pool.py | 80 ++++++++++++------------ tests/integration/test_data_store_mgr.py | 2 +- tests/integration/test_trigger.py | 8 ++- 6 files changed, 77 insertions(+), 57 deletions(-) diff --git a/cylc/flow/command_validation.py b/cylc/flow/command_validation.py index 34b7a0f1460..e5f87e85ae6 100644 --- a/cylc/flow/command_validation.py +++ b/cylc/flow/command_validation.py @@ -39,10 +39,11 @@ def flow_opts(flows: List[str], flow_wait: bool) -> None: """Check validity of flow-related CLI options. - Note the schema defaults flows to ["all"]. + Note the schema defaults flows to []. Examples: Good: + >>> flow_opts([], False) >>> flow_opts(["new"], False) >>> flow_opts(["1", "2"], False) >>> flow_opts(["1", "2"], True) @@ -61,6 +62,9 @@ def flow_opts(flows: List[str], flow_wait: bool) -> None: cylc.flow.exceptions.InputError: ... """ + if not flows: + return + for val in flows: val = val.strip() if val in [FLOW_NONE, FLOW_NEW, FLOW_ALL]: diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 70e40232c1d..64ad2c72c31 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -1998,17 +1998,20 @@ class Arguments: class FlowMutationArguments: flow = graphene.List( graphene.NonNull(Flow), - default_value=[FLOW_ALL], + default_value=[], description=sstrip(f''' The flow(s) to trigger these tasks in. 
- This should be a list of flow numbers OR a single-item list - containing one of the following three strings: + By default: + * active tasks (n=0) keep their existing flow assignment + * future tasks (n>0) get assigned all active flows - * {FLOW_ALL} - Triggered tasks belong to all active flows - (default). - * {FLOW_NEW} - Triggered tasks are assigned to a new flow. - * {FLOW_NONE} - Triggered tasks do not belong to any flow. + Otherwise you can assign (future tasks) or add to (active tasks): + * a list of integer flow numbers + or a single-item list containing one of the following strings: + * {FLOW_ALL} - all active flows + * {FLOW_NEW} - an automatically generated new flow number + * {FLOW_NONE} - (ignored for active tasks): no flow ''') ) flow_wait = Boolean( diff --git a/cylc/flow/scripts/trigger.py b/cylc/flow/scripts/trigger.py index de788481cfe..73f96ad9357 100755 --- a/cylc/flow/scripts/trigger.py +++ b/cylc/flow/scripts/trigger.py @@ -17,18 +17,25 @@ """cylc trigger [OPTIONS] ARGS -Force tasks to run despite unsatisfied prerequisites. +Force tasks to run regardless of prerequisites. * Triggering an unqueued waiting task queues it, regardless of prerequisites. * Triggering a queued task submits it, regardless of queue limiting. * Triggering an active task has no effect (it already triggered). -Incomplete and active-waiting tasks in the n=0 window already belong to a flow. -Triggering them queues them to run (or rerun) in the same flow. +Future tasks (n>0) do not already belong to a flow. +* by default they are assigned to all active flows +* otherwise, according to the --flow option -Beyond n=0, triggered tasks get all current active flow numbers by default, or -specified flow numbers via the --flow option. Those flows - if/when they catch -up - will see tasks that ran after triggering event as having run already. +Active tasks (n=0) already belong to a flow. 
+* by default they run in the same flow +* with --flow=all, they are assigned to all active flows +* with --flow=INT or --flow=new, the new flow merges with the old one +* --flow=none is ignored, to avoid blocking the existing flow + +Note --flow=new increments the global flow counter so if you need multiple +commands to start a single new flow only use --flow=new in the first command, +then use the actual new flow number (e.g. read it from the scheduler log). Examples: # trigger task foo in cycle 1234 in test @@ -53,6 +60,7 @@ ) from cylc.flow.terminal import cli_function from cylc.flow.flow_mgr import add_flow_opts +from cylc.flow.command_validation import flow_opts if TYPE_CHECKING: @@ -114,6 +122,7 @@ async def run(options: 'Values', workflow_id: str, *tokens_list): @cli_function(get_option_parser) def main(parser: COP, options: 'Values', *ids: str): """CLI for "cylc trigger".""" + flow_opts(options.flow, options.flow_wait) rets = call_multi( partial(run, options), *ids, diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 4003046bb54..0a60814da95 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1908,11 +1908,6 @@ def set_prereqs_and_outputs( flow_descr: description of new flow """ - flow_nums = self._get_flow_nums(flow, flow_descr) - if flow_nums is None: - # Illegal flow command opts - return - # Get matching pool tasks and future task definitions. itasks, future_tasks, unmatched = self.filter_task_proxies( items, @@ -1920,14 +1915,17 @@ def set_prereqs_and_outputs( warn=False, ) + # Set existing task proxies. + flow_nums = self._get_flow_nums(flow, flow_descr, active=True) for itask in itasks: - # Existing task proxies. self.merge_flows(itask, flow_nums) if prereqs: self._set_prereqs_itask(itask, prereqs, flow_nums) else: self._set_outputs_itask(itask, outputs) + # Spawn and set future tasks. 
+ flow_nums = self._get_flow_nums(flow, flow_descr, active=False) for name, point in future_tasks: tdef = self.config.get_taskdef(name) if prereqs: @@ -2066,34 +2064,43 @@ def _get_flow_nums( self, flow: List[str], meta: Optional[str] = None, - ) -> Optional[Set[int]]: - """Get correct flow numbers given user command options.""" - if set(flow).intersection({FLOW_ALL, FLOW_NEW, FLOW_NONE}): - if len(flow) != 1: - LOG.warning( - f'The "flow" values {FLOW_ALL}, {FLOW_NEW} & {FLOW_NONE}' - ' cannot be used in combination with integer flow numbers.' - ) - return None - if flow[0] == FLOW_ALL: - flow_nums = self._get_active_flow_nums() - elif flow[0] == FLOW_NEW: - flow_nums = {self.flow_mgr.get_flow_num(meta=meta)} - elif flow[0] == FLOW_NONE: + active: bool = False + ) -> Set[int]: + """Return flow numbers corresponding to user command options. + + Arg should have been validated already during command validation. + + Call this method separately for active (n=0) and future tasks. + - future tasks: assign the result to the new task + - active tasks: merge the result with existing flow numbers + + The result is different in the default case (no --flow option): + - future tasks: return all active flows + - active tasks: stick with the existing flows (so return empty set). + + """ + if not flow: + # default (i.e. 
no --flow option was used) + if active: + # active tasks: stick with the existing flow flow_nums = set() + else: + # future tasks: assign to all active flows + flow_nums = self._get_active_flow_nums() + elif flow == [FLOW_NONE]: + flow_nums = set() + elif flow == [FLOW_ALL]: + flow_nums = self._get_active_flow_nums() + elif flow == [FLOW_NEW]: + flow_nums = {self.flow_mgr.get_flow_num(meta=meta)} else: - try: - flow_nums = { - self.flow_mgr.get_flow_num( - flow_num=int(n), meta=meta - ) - for n in flow - } - except ValueError: - LOG.warning( - f"Ignoring command: illegal flow values {flow}" + # specific flow numbers + flow_nums = { + self.flow_mgr.get_flow_num( + flow_num=int(n), meta=meta ) - return None + for n in flow + } return flow_nums def _force_trigger(self, itask): @@ -2157,17 +2164,13 @@ def force_trigger_tasks( unless flow-wait is set. """ - # Get flow numbers for the tasks to be triggered. - flow_nums = self._get_flow_nums(flow, flow_descr) - if flow_nums is None: - return - # Get matching tasks proxies, and matching future task IDs. existing_tasks, future_ids, unmatched = self.filter_task_proxies( items, future=True, warn=False, ) - # Trigger existing tasks. + # Trigger active tasks. + flow_nums = self._get_flow_nums(flow, flow_descr, active=True) for itask in existing_tasks: if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE): LOG.warning(f"[{itask}] ignoring trigger - already active") @@ -2176,11 +2179,10 @@ def force_trigger_tasks( self._force_trigger(itask) # Spawn and trigger future tasks. 
+ flow_nums = self._get_flow_nums(flow, flow_descr, active=False) for name, point in future_ids: - if not self.can_be_spawned(name, point): continue - submit_num, _, prev_fwait = ( self._get_task_history(name, point, flow_nums) ) diff --git a/tests/integration/test_data_store_mgr.py b/tests/integration/test_data_store_mgr.py index 906b1ac052d..15bbca39e58 100644 --- a/tests/integration/test_data_store_mgr.py +++ b/tests/integration/test_data_store_mgr.py @@ -301,7 +301,7 @@ def test_delta_task_prerequisite(harness): [t.identity for t in schd.pool.get_tasks()], [(TASK_STATUS_SUCCEEDED,)], [], - "all" + ["all"] ) assert all({ p.satisfied diff --git a/tests/integration/test_trigger.py b/tests/integration/test_trigger.py index 30ae3404ed8..3f6b5dee138 100644 --- a/tests/integration/test_trigger.py +++ b/tests/integration/test_trigger.py @@ -17,6 +17,8 @@ import logging from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE +from cylc.flow.command_validation import flow_opts +from cylc.flow.exceptions import InputError import pytest import time @@ -34,11 +36,11 @@ ) ) async def test_trigger_invalid(mod_one, start, log_filter, flow_strs): - """Ensure invalid flow values are rejected.""" + """Ensure invalid flow values are rejected during command validation.""" async with start(mod_one) as log: log.clear() - assert mod_one.pool.force_trigger_tasks(['*'], flow_strs) is None - assert len(log_filter(log, level=logging.WARN)) == 1 + with pytest.raises(InputError): + flow_opts(flow_strs, False) async def test_trigger_no_flows(one, start, log_filter): From 54f037d36f6e26c4fe0f81426906b06166acea0f Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Wed, 18 Sep 2024 10:28:58 +1200 Subject: [PATCH 02/15] Add change log entry. 
--- changes.d/6367.fix.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes.d/6367.fix.md diff --git a/changes.d/6367.fix.md b/changes.d/6367.fix.md new file mode 100644 index 00000000000..b9dcfe9a800 --- /dev/null +++ b/changes.d/6367.fix.md @@ -0,0 +1 @@ +Fix bug where trigger would assign active flows to existing tasks by default. From d07c86fbcfeac5d62720507ff8a596586f66650a Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Wed, 18 Sep 2024 11:38:16 +1200 Subject: [PATCH 03/15] Tweak for split calls to get flow nums. --- cylc/flow/task_pool.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 0a60814da95..b279b3bf25f 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1915,6 +1915,15 @@ def set_prereqs_and_outputs( warn=False, ) + if flow == [FLOW_NEW]: + # Translate --flow=new to an actual flow number now to avoid + # incrementing it twice below. + flow = [ + str( + self.flow_mgr.get_flow_num(meta=flow_descr) + ) + ] + # Set existing task proxies. flow_nums = self._get_flow_nums(flow, flow_descr, active=True) for itask in itasks: @@ -2074,6 +2083,10 @@ def _get_flow_nums( - future tasks: assign the result to the new task - active tasks: merge the result with existing flow numbers + Note if a single command results in two calls to this method (for + active and future tasks), translate --flow=new to an actual flow + number first, to avoid incrementing the flow counter twice. + The result is different in the default case (no --flow option): - future tasks: return all active flows - active tasks: stick with the existing flows (so return empty set). @@ -2169,6 +2182,15 @@ def force_trigger_tasks( items, future=True, warn=False, ) + if flow == [FLOW_NEW]: + # Translate --flow=new to an actual flow number now to avoid + # incrementing it twice below. + flow = [ + str( + self.flow_mgr.get_flow_num(meta=flow_descr) + ) + ] + # Trigger active tasks. 
flow_nums = self._get_flow_nums(flow, flow_descr, active=True) for itask in existing_tasks: From 484b2c6079cc6286805857a41e8b3a22d19ff89c Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Wed, 18 Sep 2024 14:58:49 +1200 Subject: [PATCH 04/15] De-flakify a functional test. --- .../triggering/08-fam-finish-any/flow.cylc | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/tests/functional/triggering/08-fam-finish-any/flow.cylc b/tests/functional/triggering/08-fam-finish-any/flow.cylc index 6d8790a829f..6ecb0bf9781 100644 --- a/tests/functional/triggering/08-fam-finish-any/flow.cylc +++ b/tests/functional/triggering/08-fam-finish-any/flow.cylc @@ -2,12 +2,19 @@ [[graph]] R1 = """FAM:finish-any => foo""" [runtime] + [[root]] + script = true [[FAM]] - script = sleep 10 - [[a,c]] + [[a]] inherit = FAM + script = """ + cylc__job__poll_grep_workflow_log -E "1/b.*succeeded" + """ [[b]] inherit = FAM - script = true + [[c]] + inherit = FAM + script = """ + cylc__job__poll_grep_workflow_log -E "1/b.*succeeded" + """ [[foo]] - script = true From 75bd752d12d444c3ed535314e7afc9c5ec059bd2 Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Wed, 25 Sep 2024 18:28:24 +0100 Subject: [PATCH 05/15] Simplify flow number wrangling --- cylc/flow/task_pool.py | 83 +++++++++++++----------------------------- 1 file changed, 25 insertions(+), 58 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index b279b3bf25f..38eaeb9dede 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1915,17 +1915,9 @@ def set_prereqs_and_outputs( warn=False, ) - if flow == [FLOW_NEW]: - # Translate --flow=new to an actual flow number now to avoid - # incrementing it twice below. - flow = [ - str( - self.flow_mgr.get_flow_num(meta=flow_descr) - ) - ] + flow_nums = self._get_flow_nums(flow, flow_descr) # Set existing task proxies. 
- flow_nums = self._get_flow_nums(flow, flow_descr, active=True) for itask in itasks: self.merge_flows(itask, flow_nums) if prereqs: @@ -1934,7 +1926,9 @@ def set_prereqs_and_outputs( self._set_outputs_itask(itask, outputs) # Spawn and set future tasks. - flow_nums = self._get_flow_nums(flow, flow_descr, active=False) + if not flow: + # default: assign to all active flows + flow_nums = self._get_active_flow_nums() for name, point in future_tasks: tdef = self.config.get_taskdef(name) if prereqs: @@ -2070,51 +2064,30 @@ def remove_tasks(self, items): return len(bad_items) def _get_flow_nums( - self, - flow: List[str], - meta: Optional[str] = None, - active: bool = False + self, + flow: List[str], + meta: Optional[str] = None, ) -> Set[int]: """Return flow numbers corresponding to user command options. Arg should have been validated already during command validation. - Call this method separately for active (n=0) and future tasks. - - future tasks: assign the result to the new task - - active tasks: merge the result with existing flow numbers - - Note if a single command results in two calls to this method (for - active and future tasks), translate --flow=new to an actual flow - number first, to avoid incrementing the flow counter twice. - - The result is different in the default case (no --flow option): - - future tasks: return all active flows - - active tasks: stick with the existing flows (so return empty set). + In the default case (--flow option not provided), stick with the + existing flows (so return empty set) - NOTE this only applies for + active tasks. """ - if not flow: - # default (i.e. 
no --flow option was used) - if active: - # active tasks: stick with the existing flow - flow_nums = set() - else: - # future tasks: assign to all active flows - flow_nums = self._get_active_flow_nums() - elif flow == [FLOW_NONE]: - flow_nums = set() - elif flow == [FLOW_ALL]: - flow_nums = self._get_active_flow_nums() - elif flow == [FLOW_NEW]: - flow_nums = {self.flow_mgr.get_flow_num(meta=meta)} - else: - # specific flow numbers - flow_nums = { - self.flow_mgr.get_flow_num( - flow_num=int(n), meta=meta - ) - for n in flow - } - return flow_nums + if flow == [FLOW_NONE]: + return set() + if flow == [FLOW_ALL]: + return self._get_active_flow_nums() + if flow == [FLOW_NEW]: + return {self.flow_mgr.get_flow_num(meta=meta)} + # else specific flow numbers: + return { + self.flow_mgr.get_flow_num(flow_num=int(n), meta=meta) + for n in flow + } def _force_trigger(self, itask): """Assumes task is in the pool""" @@ -2182,17 +2155,9 @@ def force_trigger_tasks( items, future=True, warn=False, ) - if flow == [FLOW_NEW]: - # Translate --flow=new to an actual flow number now to avoid - # incrementing it twice below. - flow = [ - str( - self.flow_mgr.get_flow_num(meta=flow_descr) - ) - ] + flow_nums = self._get_flow_nums(flow, flow_descr) # Trigger active tasks. - flow_nums = self._get_flow_nums(flow, flow_descr, active=True) for itask in existing_tasks: if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE): LOG.warning(f"[{itask}] ignoring trigger - already active") @@ -2201,7 +2166,9 @@ def force_trigger_tasks( self._force_trigger(itask) # Spawn and trigger future tasks. 
- flow_nums = self._get_flow_nums(flow, flow_descr, active=False) + if not flow: + # default: assign to all active flows + flow_nums = self._get_active_flow_nums() for name, point in future_ids: if not self.can_be_spawned(name, point): continue From 48bef4e577893c0408f34b8219298f05a18e353c Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Wed, 25 Sep 2024 19:19:21 +0100 Subject: [PATCH 06/15] Improve `--flow` error message --- cylc/flow/command_validation.py | 19 +++++++------ tests/integration/test_trigger.py | 26 +---------------- tests/unit/test_command_validation.py | 41 +++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 33 deletions(-) create mode 100644 tests/unit/test_command_validation.py diff --git a/cylc/flow/command_validation.py b/cylc/flow/command_validation.py index e5f87e85ae6..d87c0711a8d 100644 --- a/cylc/flow/command_validation.py +++ b/cylc/flow/command_validation.py @@ -30,7 +30,7 @@ ERR_OPT_FLOW_VAL = "Flow values must be an integer, or 'all', 'new', or 'none'" -ERR_OPT_FLOW_INT = "Multiple flow options must all be integer valued" +ERR_OPT_FLOW_COMBINE = "Cannot combine --flow={0} with other flow values" ERR_OPT_FLOW_WAIT = ( f"--wait is not compatible with --flow={FLOW_NEW} or --flow={FLOW_NONE}" ) @@ -51,7 +51,8 @@ def flow_opts(flows: List[str], flow_wait: bool) -> None: Bad: >>> flow_opts(["none", "1"], False) Traceback (most recent call last): - cylc.flow.exceptions.InputError: ... must all be integer valued + cylc.flow.exceptions.InputError: Cannot combine --flow=none with other + flow values >>> flow_opts(["cheese", "2"], True) Traceback (most recent call last): @@ -59,24 +60,26 @@ def flow_opts(flows: List[str], flow_wait: bool) -> None: >>> flow_opts(["new"], True) Traceback (most recent call last): - cylc.flow.exceptions.InputError: ... 
+ cylc.flow.exceptions.InputError: --wait is not compatible with + --flow=new or --flow=none """ if not flows: return + flows = [val.strip() for val in flows] + for val in flows: - val = val.strip() - if val in [FLOW_NONE, FLOW_NEW, FLOW_ALL]: + if val in {FLOW_NONE, FLOW_NEW, FLOW_ALL}: if len(flows) != 1: - raise InputError(ERR_OPT_FLOW_INT) + raise InputError(ERR_OPT_FLOW_COMBINE.format(val)) else: try: int(val) except ValueError: - raise InputError(ERR_OPT_FLOW_VAL.format(val)) + raise InputError(ERR_OPT_FLOW_VAL) - if flow_wait and flows[0] in [FLOW_NEW, FLOW_NONE]: + if flow_wait and flows[0] in {FLOW_NEW, FLOW_NONE}: raise InputError(ERR_OPT_FLOW_WAIT) diff --git a/tests/integration/test_trigger.py b/tests/integration/test_trigger.py index 3f6b5dee138..d9c5304b745 100644 --- a/tests/integration/test_trigger.py +++ b/tests/integration/test_trigger.py @@ -14,33 +14,9 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import logging - -from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE -from cylc.flow.command_validation import flow_opts -from cylc.flow.exceptions import InputError - -import pytest import time - -@pytest.mark.parametrize( - 'flow_strs', - ( - [FLOW_ALL, '1'], - ['1', FLOW_ALL], - [FLOW_NEW, '1'], - [FLOW_NONE, '1'], - ['a'], - ['1', 'a'], - ) -) -async def test_trigger_invalid(mod_one, start, log_filter, flow_strs): - """Ensure invalid flow values are rejected during command validation.""" - async with start(mod_one) as log: - log.clear() - with pytest.raises(InputError): - flow_opts(flow_strs, False) +from cylc.flow.flow_mgr import FLOW_ALL async def test_trigger_no_flows(one, start, log_filter): diff --git a/tests/unit/test_command_validation.py b/tests/unit/test_command_validation.py new file mode 100644 index 00000000000..42fdda5aedf --- /dev/null +++ b/tests/unit/test_command_validation.py @@ -0,0 +1,41 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. 
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import pytest + +from cylc.flow.command_validation import ( + ERR_OPT_FLOW_COMBINE, + ERR_OPT_FLOW_VAL, + flow_opts, +) +from cylc.flow.exceptions import InputError +from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE + + +@pytest.mark.parametrize('flow_strs, expected_msg', [ + ([FLOW_ALL, '1'], ERR_OPT_FLOW_COMBINE.format(FLOW_ALL)), + (['1', FLOW_ALL], ERR_OPT_FLOW_COMBINE.format(FLOW_ALL)), + ([FLOW_NEW, '1'], ERR_OPT_FLOW_COMBINE.format(FLOW_NEW)), + ([FLOW_NONE, '1'], ERR_OPT_FLOW_COMBINE.format(FLOW_NONE)), + ([FLOW_NONE, FLOW_ALL], ERR_OPT_FLOW_COMBINE.format(FLOW_NONE)), + (['a'], ERR_OPT_FLOW_VAL), + (['1', 'a'], ERR_OPT_FLOW_VAL), +]) +async def test_trigger_invalid(flow_strs, expected_msg): + """Ensure invalid flow values are rejected during command validation.""" + with pytest.raises(InputError) as exc_info: + flow_opts(flow_strs, False) + assert str(exc_info.value) == expected_msg From 7c193ad0beea6789d7d07f793b89358ffcb070f4 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Mon, 30 Sep 2024 10:53:39 +1300 Subject: [PATCH 07/15] Apply suggestions from code review [skip ci] Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- changes.d/6367.fix.md | 2 +- cylc/flow/network/schema.py | 2 +- cylc/flow/scripts/trigger.py | 2 +- 3 files 
changed, 3 insertions(+), 3 deletions(-) diff --git a/changes.d/6367.fix.md b/changes.d/6367.fix.md index b9dcfe9a800..44045a632a6 100644 --- a/changes.d/6367.fix.md +++ b/changes.d/6367.fix.md @@ -1 +1 @@ -Fix bug where trigger would assign active flows to existing tasks by default. +Fix bug where `cylc trigger` and `cylc set` would assign active flows to existing tasks by default. diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 64ad2c72c31..93ed23fbac9 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -2000,7 +2000,7 @@ class FlowMutationArguments: graphene.NonNull(Flow), default_value=[], description=sstrip(f''' - The flow(s) to trigger these tasks in. + The flow(s) to trigger/set these tasks in. By default: * active tasks (n=0) keep their existing flow assignment diff --git a/cylc/flow/scripts/trigger.py b/cylc/flow/scripts/trigger.py index 73f96ad9357..f77d6debbd0 100755 --- a/cylc/flow/scripts/trigger.py +++ b/cylc/flow/scripts/trigger.py @@ -31,7 +31,7 @@ * by default they run in the same flow * with --flow=all, they are assigned to all active flows * with --flow=INT or --flow=new, the new flow merges with the old one -* --flow=none is ignored, to avoid blocking the existing flow +* --flow=none is ignored Note --flow=new increments the global flow counter so if you need multiple commands to start a single new flow only use --flow=new in the first command, From 61ec89fc7186c0cf87b0767fea36f87aa5209196 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Mon, 30 Sep 2024 16:47:53 +1300 Subject: [PATCH 08/15] Update tests/integration/test_data_store_mgr.py Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- tests/integration/test_data_store_mgr.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_data_store_mgr.py b/tests/integration/test_data_store_mgr.py index 15bbca39e58..f50333b6944 100644 --- 
a/tests/integration/test_data_store_mgr.py +++ b/tests/integration/test_data_store_mgr.py @@ -301,7 +301,7 @@ def test_delta_task_prerequisite(harness): [t.identity for t in schd.pool.get_tasks()], [(TASK_STATUS_SUCCEEDED,)], [], - ["all"] + flow=[] ) assert all({ p.satisfied From f586f825330a8b38d71ca61f3d18935a5bec3e75 Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Mon, 30 Sep 2024 17:15:08 +1300 Subject: [PATCH 09/15] Tweaks. --- cylc/flow/task_pool.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 38eaeb9dede..20c3fb96cc5 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1280,17 +1280,17 @@ def set_hold_point(self, point: 'PointBase') -> None: def hold_tasks(self, items: Iterable[str]) -> int: """Hold tasks with IDs matching the specified items.""" # Hold active tasks: - itasks, future_tasks, unmatched = self.filter_task_proxies( + itasks, inactive_tasks, unmatched = self.filter_task_proxies( items, warn=False, future=True, ) for itask in itasks: self.hold_active_task(itask) - # Set future tasks to be held: - for name, cycle in future_tasks: + # Set inactive tasks to be held: + for name, cycle in inactive_tasks: self.data_store_mgr.delta_task_held((name, cycle, True)) - self.tasks_to_hold.update(future_tasks) + self.tasks_to_hold.update(inactive_tasks) self.workflow_db_mgr.put_tasks_to_hold(self.tasks_to_hold) LOG.debug(f"Tasks to hold: {self.tasks_to_hold}") return len(unmatched) @@ -1298,17 +1298,17 @@ def hold_tasks(self, items: Iterable[str]) -> int: def release_held_tasks(self, items: Iterable[str]) -> int: """Release held tasks with IDs matching any specified items.""" # Release active tasks: - itasks, future_tasks, unmatched = self.filter_task_proxies( + itasks, inactive_tasks, unmatched = self.filter_task_proxies( items, warn=False, future=True, ) for itask in itasks: self.release_held_active_task(itask) - # Unhold future 
tasks: - for name, cycle in future_tasks: + # Unhold inactive tasks: + for name, cycle in inactive_tasks: self.data_store_mgr.delta_task_held((name, cycle, False)) - self.tasks_to_hold.difference_update(future_tasks) + self.tasks_to_hold.difference_update(inactive_tasks) self.workflow_db_mgr.put_tasks_to_hold(self.tasks_to_hold) LOG.debug(f"Tasks to hold: {self.tasks_to_hold}") return len(unmatched) @@ -1887,7 +1887,7 @@ def set_prereqs_and_outputs( Task matching restrictions (for now): - globs (cycle and name) only match in the pool - - future tasks must be specified individually + - inactive tasks must be specified individually - family names are not expanded to members Uses a transient task proxy to spawn children. (Even if parent was @@ -1909,7 +1909,7 @@ def set_prereqs_and_outputs( """ # Get matching pool tasks and future task definitions. - itasks, future_tasks, unmatched = self.filter_task_proxies( + itasks, inactive_tasks, unmatched = self.filter_task_proxies( items, future=True, warn=False, @@ -1925,11 +1925,11 @@ def set_prereqs_and_outputs( else: self._set_outputs_itask(itask, outputs) - # Spawn and set future tasks. + # Spawn and set inactive tasks. if not flow: # default: assign to all active flows flow_nums = self._get_active_flow_nums() - for name, point in future_tasks: + for name, point in inactive_tasks: tdef = self.config.get_taskdef(name) if prereqs: self._set_prereqs_tdef( @@ -2016,7 +2016,7 @@ def _set_prereqs_itask( def _set_prereqs_tdef( self, point, taskdef, prereqs, flow_nums, flow_wait ): - """Spawn a future task and set prerequisites on it.""" + """Spawn an inactive task and set prerequisites on it.""" itask = self.spawn_task(taskdef.name, point, flow_nums, flow_wait) if itask is None: @@ -2165,7 +2165,7 @@ def force_trigger_tasks( self.merge_flows(itask, flow_nums) self._force_trigger(itask) - # Spawn and trigger future tasks. + # Spawn and trigger inactive tasks. 
if not flow: # default: assign to all active flows flow_nums = self._get_active_flow_nums() @@ -2312,13 +2312,13 @@ def filter_task_proxies( ) future_matched: 'Set[Tuple[str, PointBase]]' = set() if future and unmatched: - future_matched, unmatched = self.match_future_tasks( + future_matched, unmatched = self.match_inactive_tasks( unmatched ) return matched, future_matched, unmatched - def match_future_tasks( + def match_inactive_tasks( self, ids: Iterable[str], ) -> Tuple[Set[Tuple[str, 'PointBase']], List[str]]: From 0b9c857ed496c73e807aa0bd5321a2e2a969fed7 Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Tue, 1 Oct 2024 09:05:18 +1300 Subject: [PATCH 10/15] Expunge future task terminology. --- cylc/flow/data_store_mgr.py | 2 +- cylc/flow/network/schema.py | 4 ++-- cylc/flow/task_pool.py | 4 ++-- tests/functional/cylc-set/04-switch/flow.cylc | 2 +- tests/functional/cylc-set/05-expire/flow.cylc | 2 +- tests/integration/test_task_pool.py | 12 ++++++------ 6 files changed, 13 insertions(+), 13 deletions(-) diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index b98a055f882..291b674f5cf 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -1419,7 +1419,7 @@ def apply_task_proxy_db_history(self): itask, is_parent = self.db_load_task_proxies[relative_id] itask.submit_num = submit_num flow_nums = deserialise_set(flow_nums_str) - # Do not set states and outputs for future tasks in flow. + # Do not set states and outputs for inactive tasks in flow. 
if ( itask.flow_nums and flow_nums != itask.flow_nums and diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 93ed23fbac9..a4c956ff3c0 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -2004,9 +2004,9 @@ class FlowMutationArguments: By default: * active tasks (n=0) keep their existing flow assignment - * future tasks (n>0) get assigned all active flows + * inactive tasks (n>0) get assigned all active flows - Otherwise you can assign (future tasks) or add to (active tasks): + Otherwise you can assign (inactive tasks) or add to (active tasks): * a list of integer flow numbers or a single-item list containing one of the following strings: * {FLOW_ALL} - all active flows diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 20c3fb96cc5..5d5d667da92 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1908,7 +1908,7 @@ def set_prereqs_and_outputs( flow_descr: description of new flow """ - # Get matching pool tasks and future task definitions. + # Get matching pool tasks and inactive task definitions. itasks, inactive_tasks, unmatched = self.filter_task_proxies( items, future=True, @@ -2150,7 +2150,7 @@ def force_trigger_tasks( unless flow-wait is set. """ - # Get matching tasks proxies, and matching future task IDs. + # Get matching tasks proxies, and matching inactive task IDs. existing_tasks, future_ids, unmatched = self.filter_task_proxies( items, future=True, warn=False, ) diff --git a/tests/functional/cylc-set/04-switch/flow.cylc b/tests/functional/cylc-set/04-switch/flow.cylc index 18402c7b64c..8f7c4329af6 100644 --- a/tests/functional/cylc-set/04-switch/flow.cylc +++ b/tests/functional/cylc-set/04-switch/flow.cylc @@ -1,4 +1,4 @@ -# Set outputs of future task to direct the flow at an optional branch point. +# Set outputs of inactive task to direct the flow at an optional branch point. 
[scheduler] [[events]] diff --git a/tests/functional/cylc-set/05-expire/flow.cylc b/tests/functional/cylc-set/05-expire/flow.cylc index 9717664132f..4e5ca9f0608 100644 --- a/tests/functional/cylc-set/05-expire/flow.cylc +++ b/tests/functional/cylc-set/05-expire/flow.cylc @@ -1,4 +1,4 @@ -# Expire a future task, so it won't run. +# Expire an inactive task, so it won't run. [scheduler] [[events]] diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index a69e6b0cb72..16018a09d95 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -342,7 +342,7 @@ async def test_match_taskdefs( [ param( ['1/foo', '3/asd'], ['1/foo', '3/asd'], [], - id="Active & future tasks" + id="Active & inactive tasks" ), param( ['1/*', '2/*', '3/*', '6/*'], @@ -367,7 +367,7 @@ async def test_match_taskdefs( ['1/foo:waiting', '1/foo:failed', '6/bar:waiting'], ['1/foo'], ["No active tasks matching: 1/foo:failed", "No active tasks matching: 6/bar:waiting"], - id="Specifying task state works for active tasks, not future tasks" + id="Specifying task state works for active tasks, not inactive tasks" ) ] ) @@ -412,7 +412,7 @@ async def test_release_held_tasks( ) -> None: """Test TaskPool.release_held_tasks(). - For a workflow with held active tasks 1/foo & 1/bar, and held future task + For a workflow with held active tasks 1/foo & 1/bar, and held inactive task 3/asd. We skip testing the matching logic here because it would be slow using the @@ -1357,7 +1357,7 @@ async def test_set_prereqs( "20400101T0000Z/foo"] ) - # set one prereq of future task 20400101T0000Z/qux + # set one prereq of inactive task 20400101T0000Z/qux schd.pool.set_prereqs_and_outputs( ["20400101T0000Z/qux"], None, @@ -1536,7 +1536,7 @@ async def test_set_outputs_future( start, log_filter, ): - """Check manual setting of future task outputs. + """Check manual setting of inactive task outputs. 
""" id_ = flow( @@ -1566,7 +1566,7 @@ async def test_set_outputs_future( # it should start up with just 1/a assert pool_get_task_ids(schd.pool) == ["1/a"] - # setting future task b succeeded should spawn c but not b + # setting inactive task b succeeded should spawn c but not b schd.pool.set_prereqs_and_outputs( ["1/b"], ["succeeded"], None, ['all']) assert ( From 12783364b48f2b4e6e891eb6ddac891a768e63d8 Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Tue, 1 Oct 2024 22:21:28 +1300 Subject: [PATCH 11/15] Add integration tests. --- tests/integration/test_trigger.py | 201 +++++++++++++++++++++++++++++- 1 file changed, 199 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_trigger.py b/tests/integration/test_trigger.py index d9c5304b745..580a758f41d 100644 --- a/tests/integration/test_trigger.py +++ b/tests/integration/test_trigger.py @@ -14,10 +14,11 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import time +"""Test for flow-assignment in triggered tasks.""" -from cylc.flow.flow_mgr import FLOW_ALL +import time +from cylc.flow.flow_mgr import FLOW_NONE, FLOW_NEW, FLOW_ALL async def test_trigger_no_flows(one, start, log_filter): """Test triggering a task with no flows present. 
@@ -49,3 +50,199 @@ async def test_trigger_no_flows(one, start, log_filter): assert len(one.pool.get_tasks()) == 1 task = one.pool.get_tasks()[0] assert task.flow_nums == {5, 9} + + +async def test_get_flow_nums(one, start, log_filter): + """Test the task pool _get_flow_nums() method.""" + async with start(one): + # flow 1 is already present + task = one.pool.get_tasks()[0] + two, = *one.pool._get_flow_nums([FLOW_NEW]), + one.pool.merge_flows(task, set([two])) + # now we have flows {1, 2}: + + assert one.pool._get_flow_nums([FLOW_NONE]) == set() + assert one.pool._get_flow_nums([FLOW_ALL]) == set([1, two]) + assert one.pool._get_flow_nums([FLOW_NEW]) == set([3]) + assert one.pool._get_flow_nums([4, 5]) == set([4, 5]) + # the only active task still only has flows {1, 2} + assert one.pool._get_flow_nums([FLOW_ALL]) == set([1, two]) + + +async def test_trigger(flow, scheduler, start): + """Test flow assignment when triggering tasks. + + Active tasks: + By default keep existing flows, else merge with requested flows. + Inactive tasks: + By default assign active flows; else assign requested flows. + + """ + conf = { + 'scheduler': { + 'allow implicit tasks': 'True' + }, + 'scheduling': { + 'graph': { + 'R1': "foo & bar => a & b & c & d & e" + } + } + } + id_ = flow(conf) + schd = scheduler(id_, run_mode='simulation', paused_start=True) + async with start(schd): + active_a, active_b = schd.pool.get_tasks() + schd.pool.merge_flows( + active_b, schd.pool._get_flow_nums([FLOW_NEW])) + assert active_a.flow_nums == set([1]) + assert active_b.flow_nums == set([1, 2]) + + #-----(1. Test active tasks)----- + + # By default active tasks keep existing flow assignment. + schd.pool.force_trigger_tasks( + [active_a.identity], flow=[] + ) + assert active_a.flow_nums == set([1]) + + # Else merge existing flow with requested flows. 
+ schd.pool.force_trigger_tasks( + [active_a.identity], flow=[FLOW_ALL] + ) + assert active_a.flow_nums == set([1, 2]) + + # (no-flow is ignored for active tasks) + schd.pool.force_trigger_tasks( + [active_a.identity], flow=[FLOW_NONE] + ) + assert active_a.flow_nums == set([1, 2]) + + schd.pool.force_trigger_tasks( + [active_a.identity], flow=[FLOW_NEW] + ) + assert active_a.flow_nums == set([1, 2, 3]) + + #-----(2. Test inactive tasks)----- + + # By default inactive tasks get all active flows. + schd.pool.force_trigger_tasks( + ['1/a'], flow=[] + ) + assert schd.pool._get_task_by_id('1/a').flow_nums == set( + [1, 2, 3] + ) + + # Else assign requested flows. + schd.pool.force_trigger_tasks( + ['1/b'], flow=[FLOW_NONE] + ) + assert schd.pool._get_task_by_id('1/b').flow_nums == set([]) + + schd.pool.force_trigger_tasks( + ['1/c'], flow=[FLOW_NEW] + ) + assert schd.pool._get_task_by_id('1/c').flow_nums == set([4]) + + schd.pool.force_trigger_tasks( + ['1/d'], flow=[FLOW_ALL] + ) + assert schd.pool._get_task_by_id('1/d').flow_nums == set( + [1, 2, 3, 4] + ) + schd.pool.force_trigger_tasks( + ['1/e'], flow=[7] + ) + assert schd.pool._get_task_by_id('1/e').flow_nums == set([7]) + + +async def test_set(flow, scheduler, start): + """Test flow assignment when triggering tasks. + + Active tasks: + By default keep existing flows, else merge with requested flows. + Inactive tasks: + By default assign active flows; else assign requested flows. 
+ + """ + conf = { + 'scheduler': { + 'allow implicit tasks': 'True' + }, + 'scheduling': { + 'graph': { + 'R1': "foo & bar => a & b & c & d & e" + } + }, + 'runtime': { + 'foo': { + 'outputs': { 'x': 'x' } + } + } + } + id_ = flow(conf) + schd = scheduler(id_, run_mode='simulation', paused_start=True) + async with start(schd): + active_a, active_b = schd.pool.get_tasks() + schd.pool.merge_flows( + active_b, schd.pool._get_flow_nums([FLOW_NEW])) + assert active_a.flow_nums == set([1]) + assert active_b.flow_nums == set([1, 2]) + + #-----(1. Test active tasks)----- + + # By default active tasks keep existing flow assignment. + schd.pool.set_prereqs_and_outputs( + [active_a.identity], ['x'], [], flow=[] + ) + assert active_a.flow_nums == set([1]) + + # Else merge existing flow with requested flows. + schd.pool.set_prereqs_and_outputs( + [active_a.identity], ['x'], [], flow=[FLOW_ALL] + ) + assert active_a.flow_nums == set([1, 2]) + + # (no-flow is ignored for active tasks) + schd.pool.set_prereqs_and_outputs( + [active_a.identity], ['x'], [], flow=[FLOW_NONE] + ) + assert active_a.flow_nums == set([1, 2]) + + schd.pool.set_prereqs_and_outputs( + [active_a.identity], ['x'], [], flow=[FLOW_NEW] + ) + assert active_a.flow_nums == set([1, 2, 3]) + + #-----(2. Test inactive tasks)----- + + # By default inactive tasks get all active flows. + schd.pool.set_prereqs_and_outputs( + ['1/a'], [], ['all'], flow=[] + ) + assert schd.pool._get_task_by_id('1/a').flow_nums == set( + [1, 2, 3] + ) + + # Else assign requested flows. 
+ schd.pool.set_prereqs_and_outputs( + ['1/b'], [], ['all'], flow=[FLOW_NONE] + ) + assert schd.pool._get_task_by_id('1/b').flow_nums == set([]) + + schd.pool.set_prereqs_and_outputs( + ['1/c'], [], ['all'], flow=[FLOW_NEW] + ) + assert schd.pool._get_task_by_id('1/c').flow_nums == set([4]) + + schd.pool.set_prereqs_and_outputs( + ['1/d'], [], ['all'], flow=[FLOW_ALL] + ) + assert schd.pool._get_task_by_id('1/d').flow_nums == set( + [1, 2, 3, 4] + ) + + schd.pool.set_prereqs_and_outputs( + ['1/e'], [], ['all'], flow=[7] + ) + assert schd.pool._get_task_by_id('1/e').flow_nums == set([7]) + From fbda42e278b57197dd16876c932230e1abdc57c2 Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Tue, 1 Oct 2024 22:28:38 +1300 Subject: [PATCH 12/15] Fix small command validation regression. --- cylc/flow/scripts/trigger.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/cylc/flow/scripts/trigger.py b/cylc/flow/scripts/trigger.py index f77d6debbd0..d8a431b058f 100755 --- a/cylc/flow/scripts/trigger.py +++ b/cylc/flow/scripts/trigger.py @@ -60,7 +60,6 @@ ) from cylc.flow.terminal import cli_function from cylc.flow.flow_mgr import add_flow_opts -from cylc.flow.command_validation import flow_opts if TYPE_CHECKING: @@ -122,7 +121,6 @@ async def run(options: 'Values', workflow_id: str, *tokens_list): @cli_function(get_option_parser) def main(parser: COP, options: 'Values', *ids: str): """CLI for "cylc trigger".""" - flow_opts(options.flow, options.flow_wait) rets = call_multi( partial(run, options), *ids, From 6c37d6734d40db42e35c353340d70a19b7668f67 Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Tue, 8 Oct 2024 17:36:42 +1300 Subject: [PATCH 13/15] Tweak trigger command help. 
--- cylc/flow/scripts/trigger.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/cylc/flow/scripts/trigger.py b/cylc/flow/scripts/trigger.py index d8a431b058f..1e6ef913696 100755 --- a/cylc/flow/scripts/trigger.py +++ b/cylc/flow/scripts/trigger.py @@ -23,20 +23,6 @@ * Triggering a queued task submits it, regardless of queue limiting. * Triggering an active task has no effect (it already triggered). -Future tasks (n>0) do not already belong to a flow. -* by default they are assigned to all active flows -* otherwise, according to the --flow option - -Active tasks (n=0) already belong to a flow. -* by default they run in the same flow -* with --flow=all, they are assigned to all active flows -* with --flow=INT or --flow=new, the new flow merges with the old one -* --flow=none is ignored - -Note --flow=new increments the global flow counter so if you need multiple -commands to start a single new flow only use --flow=new in the first command, -then use the actual new flow number (e.g. read it from the scheduler log). - Examples: # trigger task foo in cycle 1234 in test $ cylc trigger test//1234/foo @@ -46,6 +32,21 @@ # start a new flow by triggering 1234/foo in test $ cylc trigger --flow=new test//1234/foo + +Flows: + Active tasks (in the n=0 window) already belong to a flow. + * by default, if triggered, they run in the same flow + * or with --flow=all, they are assigned all active flows + * or with --flow=INT or --flow=new, the original and new flows are merged + * (--flow=none is ignored for active tasks) + + Inactive tasks (n>0) do not already belong to a flow. + * by default they are assigned all active flows + * otherwise, they are assigned the --flow value + + Note --flow=new increments the global flow counter with each use. If it + takes multiple commands to start a new flow use the actual flow number + after the first command (you can read it from the scheduler log). 
""" from functools import partial From 350241a611c3157d7e0730fcf51e7226fd46d5ca Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Mon, 14 Oct 2024 15:43:01 +0100 Subject: [PATCH 14/15] Consolidate tests --- tests/integration/test_flow_assignment.py | 155 ++++++++++++++ tests/integration/test_trigger.py | 248 ---------------------- 2 files changed, 155 insertions(+), 248 deletions(-) create mode 100644 tests/integration/test_flow_assignment.py delete mode 100644 tests/integration/test_trigger.py diff --git a/tests/integration/test_flow_assignment.py b/tests/integration/test_flow_assignment.py new file mode 100644 index 00000000000..5816b08527f --- /dev/null +++ b/tests/integration/test_flow_assignment.py @@ -0,0 +1,155 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +"""Test for flow-assignment in triggered/set tasks.""" + +import functools +import time +from typing import Callable + +import pytest + +from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE +from cylc.flow.scheduler import Scheduler + + +async def test_trigger_no_flows(one, start): + """Test triggering a task with no flows present. + + It should get the flow numbers of the most recent active tasks. 
+ """ + async with start(one): + + # Remove the task (flow 1) --> pool empty + task = one.pool.get_tasks()[0] + one.pool.remove(task) + assert len(one.pool.get_tasks()) == 0 + + # Trigger the task, with new flow nums. + time.sleep(2) # The flows need different timestamps! + one.pool.force_trigger_tasks([task.identity], flow=['5', '9']) + assert len(one.pool.get_tasks()) == 1 + + # Ensure the new flow is in the db. + one.pool.workflow_db_mgr.process_queued_ops() + + # Remove the task --> pool empty + task = one.pool.get_tasks()[0] + one.pool.remove(task) + assert len(one.pool.get_tasks()) == 0 + + # Trigger the task; it should get flow nums 5, 9 + one.pool.force_trigger_tasks([task.identity], [FLOW_ALL]) + assert len(one.pool.get_tasks()) == 1 + task = one.pool.get_tasks()[0] + assert task.flow_nums == {5, 9} + + +async def test_get_flow_nums(one: Scheduler, start): + """Test the task pool _get_flow_nums() method.""" + async with start(one): + # flow 1 is already present + task = one.pool.get_tasks()[0] + assert one.pool._get_flow_nums([FLOW_NEW]) == {2} + one.pool.merge_flows(task, {2}) + # now we have flows {1, 2}: + + assert one.pool._get_flow_nums([FLOW_NONE]) == set() + assert one.pool._get_flow_nums([FLOW_ALL]) == {1, 2} + assert one.pool._get_flow_nums([FLOW_NEW]) == {3} + assert one.pool._get_flow_nums(['4', '5']) == {4, 5} + # the only active task still only has flows {1, 2} + assert one.pool._get_flow_nums([FLOW_ALL]) == {1, 2} + + +@pytest.mark.parametrize('command', ['trigger', 'set']) +async def test_flow_assignment(flow, scheduler, start, command: str): + """Test flow assignment when triggering/setting tasks. + + Active tasks: + By default keep existing flows, else merge with requested flows. + Inactive tasks: + By default assign active flows; else assign requested flows. 
+ + """ + conf = { + 'scheduler': { + 'allow implicit tasks': 'True' + }, + 'scheduling': { + 'graph': { + 'R1': "foo & bar => a & b & c & d & e" + } + }, + 'runtime': { + 'foo': { + 'outputs': {'x': 'x'} + } + }, + } + id_ = flow(conf) + schd: Scheduler = scheduler(id_, run_mode='simulation', paused_start=True) + async with start(schd): + if command == 'set': + do_command: Callable = functools.partial( + schd.pool.set_prereqs_and_outputs, outputs=['x'], prereqs=[] + ) + else: + do_command = schd.pool.force_trigger_tasks + + active_a, active_b = schd.pool.get_tasks() + schd.pool.merge_flows(active_b, schd.pool._get_flow_nums([FLOW_NEW])) + assert active_a.flow_nums == {1} + assert active_b.flow_nums == {1, 2} + + # -----(1. Test active tasks)----- + + # By default active tasks keep existing flow assignment. + do_command([active_a.identity], flow=[]) + assert active_a.flow_nums == {1} + + # Else merge existing flow with requested flows. + do_command([active_a.identity], flow=[FLOW_ALL]) + assert active_a.flow_nums == {1, 2} + + # (no-flow is ignored for active tasks) + do_command([active_a.identity], flow=[FLOW_NONE]) + assert active_a.flow_nums == {1, 2} + + do_command([active_a.identity], flow=[FLOW_NEW]) + assert active_a.flow_nums == {1, 2, 3} + + # -----(2. Test inactive tasks)----- + if command == 'set': + do_command = functools.partial( + schd.pool.set_prereqs_and_outputs, outputs=[], prereqs=['all'] + ) + + # By default inactive tasks get all active flows. + do_command(['1/a'], flow=[]) + assert schd.pool._get_task_by_id('1/a').flow_nums == {1, 2, 3} + + # Else assign requested flows. 
+ do_command(['1/b'], flow=[FLOW_NONE]) + assert schd.pool._get_task_by_id('1/b').flow_nums == set() + + do_command(['1/c'], flow=[FLOW_NEW]) + assert schd.pool._get_task_by_id('1/c').flow_nums == {4} + + do_command(['1/d'], flow=[FLOW_ALL]) + assert schd.pool._get_task_by_id('1/d').flow_nums == {1, 2, 3, 4} + do_command(['1/e'], flow=[7]) + assert schd.pool._get_task_by_id('1/e').flow_nums == {7} diff --git a/tests/integration/test_trigger.py b/tests/integration/test_trigger.py deleted file mode 100644 index 580a758f41d..00000000000 --- a/tests/integration/test_trigger.py +++ /dev/null @@ -1,248 +0,0 @@ -# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. -# Copyright (C) NIWA & British Crown (Met Office) & Contributors. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -"""Test for flow-assignment in triggered tasks.""" - -import time - -from cylc.flow.flow_mgr import FLOW_NONE, FLOW_NEW, FLOW_ALL - -async def test_trigger_no_flows(one, start, log_filter): - """Test triggering a task with no flows present. - - It should get the flow numbers of the most recent active tasks. - """ - async with start(one): - - # Remove the task (flow 1) --> pool empty - task = one.pool.get_tasks()[0] - one.pool.remove(task) - assert len(one.pool.get_tasks()) == 0 - - # Trigger the task, with new flow nums. - time.sleep(2) # The flows need different timestamps! 
- one.pool.force_trigger_tasks([task.identity], [5, 9]) - assert len(one.pool.get_tasks()) == 1 - - # Ensure the new flow is in the db. - one.pool.workflow_db_mgr.process_queued_ops() - - # Remove the task --> pool empty - task = one.pool.get_tasks()[0] - one.pool.remove(task) - assert len(one.pool.get_tasks()) == 0 - - # Trigger the task; it should get flow nums 5, 9 - one.pool.force_trigger_tasks([task.identity], [FLOW_ALL]) - assert len(one.pool.get_tasks()) == 1 - task = one.pool.get_tasks()[0] - assert task.flow_nums == {5, 9} - - -async def test_get_flow_nums(one, start, log_filter): - """Test the task pool _get_flow_nums() method.""" - async with start(one): - # flow 1 is already present - task = one.pool.get_tasks()[0] - two, = *one.pool._get_flow_nums([FLOW_NEW]), - one.pool.merge_flows(task, set([two])) - # now we have flows {1, 2}: - - assert one.pool._get_flow_nums([FLOW_NONE]) == set() - assert one.pool._get_flow_nums([FLOW_ALL]) == set([1, two]) - assert one.pool._get_flow_nums([FLOW_NEW]) == set([3]) - assert one.pool._get_flow_nums([4, 5]) == set([4, 5]) - # the only active task still only has flows {1, 2} - assert one.pool._get_flow_nums([FLOW_ALL]) == set([1, two]) - - -async def test_trigger(flow, scheduler, start): - """Test flow assignment when triggering tasks. - - Active tasks: - By default keep existing flows, else merge with requested flows. - Inactive tasks: - By default assign active flows; else assign requested flows. - - """ - conf = { - 'scheduler': { - 'allow implicit tasks': 'True' - }, - 'scheduling': { - 'graph': { - 'R1': "foo & bar => a & b & c & d & e" - } - } - } - id_ = flow(conf) - schd = scheduler(id_, run_mode='simulation', paused_start=True) - async with start(schd): - active_a, active_b = schd.pool.get_tasks() - schd.pool.merge_flows( - active_b, schd.pool._get_flow_nums([FLOW_NEW])) - assert active_a.flow_nums == set([1]) - assert active_b.flow_nums == set([1, 2]) - - #-----(1. 
Test active tasks)----- - - # By default active tasks keep existing flow assignment. - schd.pool.force_trigger_tasks( - [active_a.identity], flow=[] - ) - assert active_a.flow_nums == set([1]) - - # Else merge existing flow with requested flows. - schd.pool.force_trigger_tasks( - [active_a.identity], flow=[FLOW_ALL] - ) - assert active_a.flow_nums == set([1, 2]) - - # (no-flow is ignored for active tasks) - schd.pool.force_trigger_tasks( - [active_a.identity], flow=[FLOW_NONE] - ) - assert active_a.flow_nums == set([1, 2]) - - schd.pool.force_trigger_tasks( - [active_a.identity], flow=[FLOW_NEW] - ) - assert active_a.flow_nums == set([1, 2, 3]) - - #-----(2. Test inactive tasks)----- - - # By default inactive tasks get all active flows. - schd.pool.force_trigger_tasks( - ['1/a'], flow=[] - ) - assert schd.pool._get_task_by_id('1/a').flow_nums == set( - [1, 2, 3] - ) - - # Else assign requested flows. - schd.pool.force_trigger_tasks( - ['1/b'], flow=[FLOW_NONE] - ) - assert schd.pool._get_task_by_id('1/b').flow_nums == set([]) - - schd.pool.force_trigger_tasks( - ['1/c'], flow=[FLOW_NEW] - ) - assert schd.pool._get_task_by_id('1/c').flow_nums == set([4]) - - schd.pool.force_trigger_tasks( - ['1/d'], flow=[FLOW_ALL] - ) - assert schd.pool._get_task_by_id('1/d').flow_nums == set( - [1, 2, 3, 4] - ) - schd.pool.force_trigger_tasks( - ['1/e'], flow=[7] - ) - assert schd.pool._get_task_by_id('1/e').flow_nums == set([7]) - - -async def test_set(flow, scheduler, start): - """Test flow assignment when triggering tasks. - - Active tasks: - By default keep existing flows, else merge with requested flows. - Inactive tasks: - By default assign active flows; else assign requested flows. 
- - """ - conf = { - 'scheduler': { - 'allow implicit tasks': 'True' - }, - 'scheduling': { - 'graph': { - 'R1': "foo & bar => a & b & c & d & e" - } - }, - 'runtime': { - 'foo': { - 'outputs': { 'x': 'x' } - } - } - } - id_ = flow(conf) - schd = scheduler(id_, run_mode='simulation', paused_start=True) - async with start(schd): - active_a, active_b = schd.pool.get_tasks() - schd.pool.merge_flows( - active_b, schd.pool._get_flow_nums([FLOW_NEW])) - assert active_a.flow_nums == set([1]) - assert active_b.flow_nums == set([1, 2]) - - #-----(1. Test active tasks)----- - - # By default active tasks keep existing flow assignment. - schd.pool.set_prereqs_and_outputs( - [active_a.identity], ['x'], [], flow=[] - ) - assert active_a.flow_nums == set([1]) - - # Else merge existing flow with requested flows. - schd.pool.set_prereqs_and_outputs( - [active_a.identity], ['x'], [], flow=[FLOW_ALL] - ) - assert active_a.flow_nums == set([1, 2]) - - # (no-flow is ignored for active tasks) - schd.pool.set_prereqs_and_outputs( - [active_a.identity], ['x'], [], flow=[FLOW_NONE] - ) - assert active_a.flow_nums == set([1, 2]) - - schd.pool.set_prereqs_and_outputs( - [active_a.identity], ['x'], [], flow=[FLOW_NEW] - ) - assert active_a.flow_nums == set([1, 2, 3]) - - #-----(2. Test inactive tasks)----- - - # By default inactive tasks get all active flows. - schd.pool.set_prereqs_and_outputs( - ['1/a'], [], ['all'], flow=[] - ) - assert schd.pool._get_task_by_id('1/a').flow_nums == set( - [1, 2, 3] - ) - - # Else assign requested flows. 
- schd.pool.set_prereqs_and_outputs( - ['1/b'], [], ['all'], flow=[FLOW_NONE] - ) - assert schd.pool._get_task_by_id('1/b').flow_nums == set([]) - - schd.pool.set_prereqs_and_outputs( - ['1/c'], [], ['all'], flow=[FLOW_NEW] - ) - assert schd.pool._get_task_by_id('1/c').flow_nums == set([4]) - - schd.pool.set_prereqs_and_outputs( - ['1/d'], [], ['all'], flow=[FLOW_ALL] - ) - assert schd.pool._get_task_by_id('1/d').flow_nums == set( - [1, 2, 3, 4] - ) - - schd.pool.set_prereqs_and_outputs( - ['1/e'], [], ['all'], flow=[7] - ) - assert schd.pool._get_task_by_id('1/e').flow_nums == set([7]) - From 43b50e64bf8a205913f4dcbeeda1e33a4f376c4a Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Tue, 15 Oct 2024 08:47:07 +1300 Subject: [PATCH 15/15] Update cylc/flow/network/schema.py [skip ci] Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- cylc/flow/network/schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index a4c956ff3c0..5fc277fb607 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -2008,7 +2008,7 @@ class FlowMutationArguments: Otherwise you can assign (inactive tasks) or add to (active tasks): * a list of integer flow numbers - or a single-item list containing one of the following strings: + or one of the following strings: * {FLOW_ALL} - all active flows * {FLOW_NEW} - an automatically generated new flow number * {FLOW_NONE} - (ignored for active tasks): no flow