From 27c89a4fc83c76c85131a4b2ec3849022e6b218e Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Mon, 8 Jan 2024 13:35:37 +0000 Subject: [PATCH 1/9] Ensure that sys argv cleaner cleans up --workflow-name=foo as well as --workflow-name foo --- changes.d/5909.fix.md | 2 + cylc/flow/option_parsers.py | 64 ++++++++++++++++++++++--------- tests/unit/test_option_parsers.py | 16 ++++++++ 3 files changed, 63 insertions(+), 19 deletions(-) create mode 100644 changes.d/5909.fix.md diff --git a/changes.d/5909.fix.md b/changes.d/5909.fix.md new file mode 100644 index 00000000000..b6a229498a0 --- /dev/null +++ b/changes.d/5909.fix.md @@ -0,0 +1,2 @@ +Fix a bug where Cylc VIP did not remove --workflow-name= from +Cylc play arguments. \ No newline at end of file diff --git a/cylc/flow/option_parsers.py b/cylc/flow/option_parsers.py index 3bf59720d21..1b4945f2bc2 100644 --- a/cylc/flow/option_parsers.py +++ b/cylc/flow/option_parsers.py @@ -33,7 +33,7 @@ import sys from textwrap import dedent -from typing import Any, Dict, Optional, List, Tuple, Union +from typing import Any, Dict, Iterable, Optional, List, Tuple, Union from cylc.flow import LOG from cylc.flow.terminal import supports_color, DIM @@ -820,17 +820,33 @@ def combine_options(*args, modify=None): def cleanup_sysargv( - script_name, - workflow_id, - options, - compound_script_opts, - script_opts, - source, -): + script_name: str, + workflow_id: str, + options: 'Values', + compound_script_opts: Iterable['OptionSettings'], + script_opts: Iterable['OptionSettings'], + source: str, +) -> None: """Remove unwanted options from sys.argv Some cylc scripts (notably Cylc Play when it is re-invoked on a scheduler - server) require the correct content in sys.argv. + server) require the correct content in sys.argv: This function + subtracts the unwanted options from sys.argv. + + Args: + script_name: + Name of the target script. 
For example if we are + using this for the play step of cylc vip then this + will be "play". + workflow_id: + options: + Actual options provided to the compound script. + compound_script_options: + Options available in compound script. + script_options: + Options available in target script. + source: + Source directory. """ # Organize Options by dest. script_opts_by_dest = { @@ -841,21 +857,31 @@ def cleanup_sysargv( x.kwargs.get('dest', x.args[0].strip(DOUBLEDASH)): x for x in compound_script_opts } - # Filter out non-cylc-play options. - args = [i.split('=')[0] for i in sys.argv] - for unwanted_opt in (set(options.__dict__)) - set(script_opts_by_dest): - for arg in compound_opts_by_dest[unwanted_opt].args: - if arg in sys.argv: - index = sys.argv.index(arg) + + # Filter out non-cylc-play options: + # The set of options which we want to weed out: + for unwanted_dest in (set(options.__dict__)) - set(script_opts_by_dest): + + # The possible ways this could be written - if the above + # were "workflow_name" this could be '-n' or '--workflow-name': + for unwanted_arg in compound_opts_by_dest[unwanted_dest].args: + + # Check for args which are standalone or space separated + # `--workflow-name foo`: + if unwanted_arg in sys.argv: + index = sys.argv.index(unwanted_arg) sys.argv.pop(index) if ( - compound_opts_by_dest[unwanted_opt].kwargs['action'] + compound_opts_by_dest[unwanted_dest].kwargs['action'] not in ['store_true', 'store_false'] ): sys.argv.pop(index) - elif arg in args: - index = args.index(arg) - sys.argv.pop(index) + + # Check for `--workflow-name=foo`: + elif unwanted_arg in [a.split('=')[0] for a in sys.argv]: + for cli_arg in sys.argv: + if cli_arg.startswith(unwanted_arg): + sys.argv.remove(cli_arg) # replace compound script name: sys.argv[1] = script_name diff --git a/tests/unit/test_option_parsers.py b/tests/unit/test_option_parsers.py index e3831a1daea..b183b3ef8a5 100644 --- a/tests/unit/test_option_parsers.py +++ b/tests/unit/test_option_parsers.py 
@@ -410,6 +410,22 @@ def test_combine_options(inputs, expect): 'play myworkflow'.split(), id='removes --key=value' ), + param( + # Test for https://github.com/cylc/cylc-flow/issues/5905 + 'vip ./myworkflow --no-run-name --workflow-name=hi'.split(), + { + 'script_name': 'play', + 'workflow_id': 'myworkflow', + 'compound_script_opts': [ + OptionSettings(['--no-run-name'], action='store_true'), + OptionSettings(['--workflow-name'], action='store') + ], + 'script_opts': [], + 'source': './myworkflow', + }, + 'play myworkflow'.split(), + id='equals-bug' + ), ] ) def test_cleanup_sysargv(monkeypatch, argv_before, kwargs, expect): From 88e821978b4dd89ce354e39bb049dbef9c21d931 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Tue, 9 Jan 2024 13:56:28 +0000 Subject: [PATCH 2/9] Improved the sysargv filter. Wrote some extra tests. --- cylc/flow/option_parsers.py | 75 +++++++++++++++++-------- tests/unit/test_option_parsers.py | 92 ++++++++++++++++--------------- 2 files changed, 99 insertions(+), 68 deletions(-) diff --git a/cylc/flow/option_parsers.py b/cylc/flow/option_parsers.py index 1b4945f2bc2..ab5f1fcbf91 100644 --- a/cylc/flow/option_parsers.py +++ b/cylc/flow/option_parsers.py @@ -858,39 +858,66 @@ def cleanup_sysargv( for x in compound_script_opts } - # Filter out non-cylc-play options: - # The set of options which we want to weed out: + # Get a list of unwanted args: + unwanted_compound: List[str] = [] + unwanted_simple: List[str] = [] for unwanted_dest in (set(options.__dict__)) - set(script_opts_by_dest): - - # The possible ways this could be written - if the above - # were "workflow_name" this could be '-n' or '--workflow-name': for unwanted_arg in compound_opts_by_dest[unwanted_dest].args: + if ( + compound_opts_by_dest[unwanted_dest].kwargs.get('action', None) + in ['store_true', 'store_false'] + ): + unwanted_simple.append(unwanted_arg) + else: + unwanted_compound.append(unwanted_arg) - # Check for args which are 
standalone or space separated - # `--workflow-name foo`: - if unwanted_arg in sys.argv: - index = sys.argv.index(unwanted_arg) - sys.argv.pop(index) - if ( - compound_opts_by_dest[unwanted_dest].kwargs['action'] - not in ['store_true', 'store_false'] - ): - sys.argv.pop(index) - - # Check for `--workflow-name=foo`: - elif unwanted_arg in [a.split('=')[0] for a in sys.argv]: - for cli_arg in sys.argv: - if cli_arg.startswith(unwanted_arg): - sys.argv.remove(cli_arg) + new_args = filter_sysargv(sys.argv, unwanted_simple, unwanted_compound) # replace compound script name: - sys.argv[1] = script_name + new_args[1] = script_name # replace source path with workflow ID. if str(source) in sys.argv: - sys.argv.remove(str(source)) + new_args.remove(str(source)) if workflow_id not in sys.argv: - sys.argv.append(workflow_id) + new_args.append(workflow_id) + + sys.argv = new_args + + +def filter_sysargv( + sysargs, unwanted_simple: List, unwanted_compound: List +) -> List: + """Create a copy of sys.argv without unwanted arguments: + + Cases: + >>> this = filter_sysargv + >>> this(['--foo', 'expects-a-value', '--bar'], [], ['--foo']) + ['--bar'] + >>> this(['--foo=expects-a-value', '--bar'], [], ['--foo']) + ['--bar'] + >>> this(['--foo', '--bar'], ['--foo'], []) + ['--bar'] + """ + pop_next: bool = False + new_args: List = [] + for this_arg in sysargs: + parts = this_arg.split('=', 1) + if pop_next: + pop_next = False + continue + elif parts[0] in unwanted_compound: + # Case --foo=value or --foo value + if len(parts) == 1: + # --foo value + pop_next = True + continue + elif parts[0] in unwanted_simple: + # Case --foo does not expect a value: + continue + else: + new_args.append(this_arg) + return new_args def log_subcommand(*args): diff --git a/tests/unit/test_option_parsers.py b/tests/unit/test_option_parsers.py index b183b3ef8a5..05b52badfbe 100644 --- a/tests/unit/test_option_parsers.py +++ b/tests/unit/test_option_parsers.py @@ -26,7 +26,7 @@ import cylc.flow.flags from 
cylc.flow.option_parsers import ( CylcOptionParser as COP, Options, combine_options, combine_options_pair, - OptionSettings, cleanup_sysargv + OptionSettings, cleanup_sysargv, filter_sysargv ) @@ -321,20 +321,6 @@ def test_combine_options(inputs, expect): @pytest.mark.parametrize( 'argv_before, kwargs, expect', [ - param( - 'vip myworkflow --foo something'.split(), - { - 'script_name': 'play', - 'workflow_id': 'myworkflow', - 'compound_script_opts': [ - OptionSettings(['--foo', '-f']), - ], - 'script_opts': [ - OptionSettings(['--foo', '-f'])] - }, - 'play myworkflow --foo something'.split(), - id='no opts to remove' - ), param( 'vip myworkflow -f something -b something_else --baz'.split(), { @@ -397,35 +383,6 @@ def test_combine_options(inputs, expect): 'play --foo something myworkflow'.split(), id='no path given' ), - param( - 'vip --bar=something'.split(), - { - 'script_name': 'play', - 'workflow_id': 'myworkflow', - 'compound_script_opts': [ - OptionSettings(['--bar', '-b'])], - 'script_opts': [], - 'source': './myworkflow', - }, - 'play myworkflow'.split(), - id='removes --key=value' - ), - param( - # Test for https://github.com/cylc/cylc-flow/issues/5905 - 'vip ./myworkflow --no-run-name --workflow-name=hi'.split(), - { - 'script_name': 'play', - 'workflow_id': 'myworkflow', - 'compound_script_opts': [ - OptionSettings(['--no-run-name'], action='store_true'), - OptionSettings(['--workflow-name'], action='store') - ], - 'script_opts': [], - 'source': './myworkflow', - }, - 'play myworkflow'.split(), - id='equals-bug' - ), ] ) def test_cleanup_sysargv(monkeypatch, argv_before, kwargs, expect): @@ -448,6 +405,53 @@ def test_cleanup_sysargv(monkeypatch, argv_before, kwargs, expect): assert sys.argv == dummy_cylc_path + expect +@pytest.mark.parametrize( + 'sysargs, simple, compound, expect', ( + param( + # Test for https://github.com/cylc/cylc-flow/issues/5905 + '--no-run-name --workflow-name=name'.split(), + ['--no-run-name'], + ['--workflow-name'], + [], + 
id='--workflow-name=name' + ), + param( + '--foo something'.split(), + [], [], '--foo something'.split(), + id='no-opts-removed' + ), + param( + [], ['--foo'], ['--bar'], [], + id='Null-check' + ), + param( + '''--keep1 --keep2 42 --keep3=Hi + --throw1 --throw2 84 --throw3=There + '''.split(), + ['--throw1'], + '--throw2 --throw3'.split(), + '--keep1 --keep2 42 --keep3=Hi'.split(), + id='complex' + ), + param( + "--foo 'foo=42' --bar='foo=94'".split(), + [], ['--foo'], + ['--bar=\'foo=94\''], + id='--bar=\'foo=94\'' + ) + ) +) +def test_filter_sysargv( + sysargs, simple, compound, expect +): + """It returns the subset of sys.argv that we ask for. + + n.b. The three most basic cases for this function are stored in + its own docstring. + """ + assert filter_sysargv(sysargs, simple, compound) == expect + + class TestOptionSettings(): @staticmethod def test_init(): From 2ec98dde3be31bc25e6d4cf5ca442ac973113375 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Thu, 11 Jan 2024 13:43:45 +0000 Subject: [PATCH 3/9] Made execution time limit unsettable by reload or broadcast. (#5902) * Made execution time limit unsettable by reload or broadcast. Formerly a suppress statement was supressing cases when we wanted to set itask.summary['execution time limit'] = None. Added tests. * response to verbal review * Apply suggestions from code review Co-authored-by: Oliver Sanders * Prevent false positives in test for broadcast. Test broadcast using _prep_submit_task_job rather than _prep_submit_task_job_impl so that the fake broadcast is applied. 
* Update changes.d/5902.fix.md Co-authored-by: Hilary James Oliver * made function less tolerant --------- Co-authored-by: Oliver Sanders Co-authored-by: Hilary James Oliver --- changes.d/5902.fix.md | 1 + cylc/flow/task_job_mgr.py | 35 +++++++++++++--- tests/integration/test_task_job_mgr.py | 58 ++++++++++++++++++++++++++ 3 files changed, 89 insertions(+), 5 deletions(-) create mode 100644 changes.d/5902.fix.md diff --git a/changes.d/5902.fix.md b/changes.d/5902.fix.md new file mode 100644 index 00000000000..803b9dc9590 --- /dev/null +++ b/changes.d/5902.fix.md @@ -0,0 +1 @@ +Fixed a bug that prevented unsetting `execution time limit` by broadcast or reload. \ No newline at end of file diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 20ee7379d27..e41e05dfd30 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -36,7 +36,7 @@ ) from shutil import rmtree from time import time -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Any, Union, Optional from cylc.flow import LOG from cylc.flow.job_runner_mgr import JobPollContext @@ -1262,10 +1262,11 @@ def _prep_submit_task_job_impl(self, workflow, itask, rtconfig): itask.submit_num] = itask.platform['name'] itask.summary['job_runner_name'] = itask.platform['job runner'] - with suppress(TypeError): - itask.summary[self.KEY_EXECUTE_TIME_LIMIT] = float( - rtconfig['execution time limit'] - ) + + # None is an allowed non-float number for Execution time limit. + itask.summary[ + self.KEY_EXECUTE_TIME_LIMIT + ] = self.get_execution_time_limit(rtconfig['execution time limit']) # Location of job file, etc self._create_job_log_path(workflow, itask) @@ -1281,6 +1282,30 @@ def _prep_submit_task_job_impl(self, workflow, itask, rtconfig): job_d=job_d ) + @staticmethod + def get_execution_time_limit( + config_execution_time_limit: Any + ) -> Union[None, float]: + """Get execution time limit from config and process it. 
+ + If the etl from the config is a Falsy then return None. + Otherwise try and parse value as float. + + Examples: + >>> from pytest import raises + >>> this = TaskJobManager.get_execution_time_limit + + >>> this(None) + >>> this("54") + 54.0 + >>> this({}) + >>> with raises(ValueError): + ... this('🇳🇿') + """ + if config_execution_time_limit: + return float(config_execution_time_limit) + return None + def get_job_conf( self, workflow, diff --git a/tests/integration/test_task_job_mgr.py b/tests/integration/test_task_job_mgr.py index 297ad889df5..b085162a1da 100644 --- a/tests/integration/test_task_job_mgr.py +++ b/tests/integration/test_task_job_mgr.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +from contextlib import suppress import logging from typing import Any as Fixture @@ -128,3 +129,60 @@ async def test__run_job_cmd_logs_platform_lookup_fail( warning = caplog.records[-1] assert warning.levelname == 'ERROR' assert 'Unable to run command jobs-poll' in warning.msg + + +async def test__prep_submit_task_job_impl_handles_execution_time_limit( + flow: Fixture, + scheduler: Fixture, + start: Fixture, +): + """Ensure that emptying the execution time limit unsets it. + + Previously unsetting the etl by either broadcast or reload + would not unset a previous etl. + + See https://github.com/cylc/cylc-flow/issues/5891 + """ + id_ = flow({ + "scheduling": { + "cycling mode": "integer", + "graph": {"R1": "a"} + }, + "runtime": { + "root": {}, + "a": { + "script": "sleep 10", + "execution time limit": 'PT5S' + } + } + }) + + # Run in live mode - function not called in sim mode. + schd = scheduler(id_, run_mode='live') + async with start(schd): + task_a = schd.pool.get_tasks()[0] + # We're not interested in the job file stuff, just + # in the summary state. 
+ with suppress(FileExistsError): + schd.task_job_mgr._prep_submit_task_job_impl( + schd.workflow, task_a, task_a.tdef.rtconfig) + assert task_a.summary['execution_time_limit'] == 5.0 + + # If we delete the etl it gets deleted in the summary: + task_a.tdef.rtconfig['execution time limit'] = None + with suppress(FileExistsError): + schd.task_job_mgr._prep_submit_task_job_impl( + schd.workflow, task_a, task_a.tdef.rtconfig) + assert not task_a.summary.get('execution_time_limit', '') + + # put everything back and test broadcast too. + task_a.tdef.rtconfig['execution time limit'] = 5.0 + task_a.summary['execution_time_limit'] = 5.0 + schd.broadcast_mgr.broadcasts = { + '1': {'a': {'execution time limit': None}}} + with suppress(FileExistsError): + # We run a higher level function here to ensure + # that the broadcast is applied. + schd.task_job_mgr._prep_submit_task_job( + schd.workflow, task_a) + assert not task_a.summary.get('execution_time_limit', '') From 80f06f2a1c5523c8436d2d053bd63c845344b78e Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Fri, 12 Jan 2024 02:55:04 +1300 Subject: [PATCH 4/9] xtrigger efficiency fix. (#5908) xtrigger efficiency fix. * Update the DB xtriggers table when the xtriggers get satisfied, not when the tasks that depend on them get satisfied. * Don't delete xtriggers table before every update. 
--------- Co-authored-by: Oliver Sanders Co-authored-by: Tim Pillinger <26465611+wxtim@users.noreply.github.com> --- changes.d/5908.fix.md | 1 + cylc/flow/rundb.py | 2 + cylc/flow/scheduler.py | 32 ++------ cylc/flow/workflow_db_mgr.py | 1 - cylc/flow/xtrigger_mgr.py | 82 ++++++++++--------- setup.cfg | 2 + .../xtriggers/02-persistence/flow.cylc | 1 + tests/integration/test_xtrigger_mgr.py | 54 ++++++++++++ tests/unit/conftest.py | 1 + tests/unit/test_xtrigger_mgr.py | 55 ------------- 10 files changed, 115 insertions(+), 116 deletions(-) create mode 100644 changes.d/5908.fix.md diff --git a/changes.d/5908.fix.md b/changes.d/5908.fix.md new file mode 100644 index 00000000000..6dfbf76c1c1 --- /dev/null +++ b/changes.d/5908.fix.md @@ -0,0 +1 @@ +Fixed bug causing redundant DB updates when many tasks depend on the same xtrigger. diff --git a/cylc/flow/rundb.py b/cylc/flow/rundb.py index 84360083c5c..256da3b3129 100644 --- a/cylc/flow/rundb.py +++ b/cylc/flow/rundb.py @@ -298,6 +298,8 @@ class CylcWorkflowDAO: ["prereq_output", {"is_primary_key": True}], ["satisfied"], ], + # The xtriggers table holds the function signature and result of + # already-satisfied (the scheduler no longer needs to call them). TABLE_XTRIGGERS: [ ["signature", {"is_primary_key": True}], ["results"], diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 835be9b735e..0d793997296 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -377,6 +377,7 @@ async def initialise(self): self.workflow, user=self.owner, broadcast_mgr=self.broadcast_mgr, + workflow_db_mgr=self.workflow_db_mgr, data_store_mgr=self.data_store_mgr, proc_pool=self.proc_pool, workflow_run_dir=self.workflow_run_dir, @@ -1705,14 +1706,8 @@ async def main_loop(self) -> None: await self.process_command_queue() self.proc_pool.process() - # Tasks in the main pool that are waiting but not queued must be - # waiting on external dependencies, i.e. xtriggers or ext_triggers. 
- # For these tasks, call any unsatisfied xtrigger functions, and - # queue tasks that have become ready. (Tasks do not appear in the - # main pool at all until all other-task deps are satisfied, and are - # queued immediately on release from runahead limiting if they are - # not waiting on external deps). - housekeep_xtriggers = False + # Unqueued tasks with satisfied prerequisites must be waiting on + # xtriggers or ext_triggers. Check these and queue tasks if ready. for itask in self.pool.get_tasks(): if ( not itask.state(TASK_STATUS_WAITING) @@ -1725,28 +1720,19 @@ async def main_loop(self) -> None: itask.state.xtriggers and not itask.state.xtriggers_all_satisfied() ): - # Call unsatisfied xtriggers if not already in-process. - # Results are returned asynchronously. self.xtrigger_mgr.call_xtriggers_async(itask) - # Check for satisfied xtriggers, and queue if ready. - if self.xtrigger_mgr.check_xtriggers( - itask, self.workflow_db_mgr.put_xtriggers): - housekeep_xtriggers = True - if all(itask.is_ready_to_run()): - self.pool.queue_task(itask) - - # Check for satisfied ext_triggers, and queue if ready. + if ( itask.state.external_triggers and not itask.state.external_triggers_all_satisfied() - and self.broadcast_mgr.check_ext_triggers( - itask, self.ext_trigger_queue) - and all(itask.is_ready_to_run()) ): + self.broadcast_mgr.check_ext_triggers( + itask, self.ext_trigger_queue) + + if all(itask.is_ready_to_run()): self.pool.queue_task(itask) - if housekeep_xtriggers: - # (Could do this periodically?) 
+ if self.xtrigger_mgr.do_housekeeping: self.xtrigger_mgr.housekeep(self.pool.get_tasks()) self.pool.set_expired_tasks() diff --git a/cylc/flow/workflow_db_mgr.py b/cylc/flow/workflow_db_mgr.py index 091d1712429..7d6968b137c 100644 --- a/cylc/flow/workflow_db_mgr.py +++ b/cylc/flow/workflow_db_mgr.py @@ -404,7 +404,6 @@ def put_task_event_timers(self, task_events_mgr): def put_xtriggers(self, sat_xtrig): """Put statements to update external triggers table.""" - self.db_deletes_map[self.TABLE_XTRIGGERS].append({}) for sig, res in sat_xtrig.items(): self.db_inserts_map[self.TABLE_XTRIGGERS].append({ "signature": sig, diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index 35b8e88b36c..4f8e31da597 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -20,20 +20,22 @@ import re from copy import deepcopy from time import time -from typing import Any, Dict, List, Optional, Tuple, Callable +from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING from cylc.flow import LOG from cylc.flow.exceptions import XtriggerConfigError import cylc.flow.flags from cylc.flow.hostuserutil import get_user +from cylc.flow.subprocpool import get_func from cylc.flow.xtriggers.wall_clock import wall_clock -from cylc.flow.subprocctx import SubFuncContext -from cylc.flow.broadcast_mgr import BroadcastMgr -from cylc.flow.data_store_mgr import DataStoreMgr -from cylc.flow.subprocpool import SubProcPool -from cylc.flow.task_proxy import TaskProxy -from cylc.flow.subprocpool import get_func +if TYPE_CHECKING: + from cylc.flow.broadcast_mgr import BroadcastMgr + from cylc.flow.data_store_mgr import DataStoreMgr + from cylc.flow.subprocctx import SubFuncContext + from cylc.flow.subprocpool import SubProcPool + from cylc.flow.task_proxy import TaskProxy + from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager class TemplateVariables(Enum): @@ -185,6 +187,7 @@ class XtriggerManager: Args: workflow: workflow name user: workflow owner + workflow_db_mgr: 
the DB Manager broadcast_mgr: the Broadcast Manager proc_pool: pool of Subprocesses workflow_run_dir: workflow run directory @@ -195,9 +198,10 @@ class XtriggerManager: def __init__( self, workflow: str, - broadcast_mgr: BroadcastMgr, - data_store_mgr: DataStoreMgr, - proc_pool: SubProcPool, + broadcast_mgr: 'BroadcastMgr', + workflow_db_mgr: 'WorkflowDatabaseManager', + data_store_mgr: 'DataStoreMgr', + proc_pool: 'SubProcPool', user: Optional[str] = None, workflow_run_dir: Optional[str] = None, workflow_share_dir: Optional[str] = None, @@ -230,11 +234,15 @@ def __init__( } self.proc_pool = proc_pool + self.workflow_db_mgr = workflow_db_mgr self.broadcast_mgr = broadcast_mgr self.data_store_mgr = data_store_mgr + self.do_housekeeping = False @staticmethod - def validate_xtrigger(label: str, fctx: SubFuncContext, fdir: str) -> None: + def validate_xtrigger( + label: str, fctx: 'SubFuncContext', fdir: str + ) -> None: """Validate an Xtrigger function. Args: @@ -305,7 +313,7 @@ def validate_xtrigger(label: str, fctx: SubFuncContext, fdir: str) -> None: f' {", ".join(t.value for t in deprecated_variables)}' ) - def add_trig(self, label: str, fctx: SubFuncContext, fdir: str) -> None: + def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None: """Add a new xtrigger function. Check the xtrigger function exists here (e.g. during validation). @@ -334,7 +342,7 @@ def load_xtrigger_for_restart(self, row_idx: int, row: Tuple[str, str]): sig, results = row self.sat_xtrig[sig] = json.loads(results) - def _get_xtrigs(self, itask: TaskProxy, unsat_only: bool = False, + def _get_xtrigs(self, itask: 'TaskProxy', unsat_only: bool = False, sigs_only: bool = False): """(Internal helper method.) 
@@ -361,7 +369,9 @@ def _get_xtrigs(self, itask: TaskProxy, unsat_only: bool = False, res.append((label, sig, ctx, satisfied)) return res - def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext: + def get_xtrig_ctx( + self, itask: 'TaskProxy', label: str + ) -> 'SubFuncContext': """Get a real function context from the template. Args: @@ -412,7 +422,7 @@ def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext: ctx.update_command(self.workflow_run_dir) return ctx - def call_xtriggers_async(self, itask: TaskProxy): + def call_xtriggers_async(self, itask: 'TaskProxy'): """Call itask's xtrigger functions via the process pool... ...if previous call not still in-process and retry period is up. @@ -421,16 +431,23 @@ def call_xtriggers_async(self, itask: TaskProxy): itask: task proxy to check. """ for label, sig, ctx, _ in self._get_xtrigs(itask, unsat_only=True): + # Special case: quick synchronous clock check: if sig.startswith("wall_clock"): - # Special case: quick synchronous clock check. - if wall_clock(*ctx.func_args, **ctx.func_kwargs): + if sig in self.sat_xtrig: + # Already satisfied, just update the task + itask.state.xtriggers[label] = True + elif wall_clock(*ctx.func_args, **ctx.func_kwargs): + # Newly satisfied itask.state.xtriggers[label] = True self.sat_xtrig[sig] = {} self.data_store_mgr.delta_task_xtrigger(sig, True) + self.workflow_db_mgr.put_xtriggers({sig: {}}) LOG.info('xtrigger satisfied: %s = %s', label, sig) + self.do_housekeeping = True continue # General case: potentially slow asynchronous function call. if sig in self.sat_xtrig: + # Already satisfied, just update the task if not itask.state.xtriggers[label]: itask.state.xtriggers[label] = True res = {} @@ -445,6 +462,8 @@ def call_xtriggers_async(self, itask: TaskProxy): xtrigger_env ) continue + + # Call the function to check the unsatisfied xtrigger. if sig in self.active: # Already waiting on this result. 
continue @@ -457,8 +476,10 @@ def call_xtriggers_async(self, itask: TaskProxy): self.active.append(sig) self.proc_pool.put_command(ctx, callback=self.callback) - def housekeep(self, itasks: List[TaskProxy]): - """Delete satisfied xtriggers no longer needed by any task. + def housekeep(self, itasks): + """Forget satisfied xtriggers no longer needed by any task. + + Check self.do_housekeeping before calling this method. Args: itasks: list of all task proxies. @@ -469,8 +490,9 @@ def housekeep(self, itasks: List[TaskProxy]): for sig in list(self.sat_xtrig): if sig not in all_xtrig: del self.sat_xtrig[sig] + self.do_housekeeping = False - def callback(self, ctx: SubFuncContext): + def callback(self, ctx: 'SubFuncContext'): """Callback for asynchronous xtrigger functions. Record satisfaction status and function results dict. @@ -489,23 +511,9 @@ def callback(self, ctx: SubFuncContext): return LOG.debug('%s: returned %s', sig, results) if satisfied: + # Newly satisfied self.data_store_mgr.delta_task_xtrigger(sig, True) + self.workflow_db_mgr.put_xtriggers({sig: results}) LOG.info('xtrigger satisfied: %s = %s', ctx.label, sig) self.sat_xtrig[sig] = results - - def check_xtriggers( - self, - itask: TaskProxy, - db_update_func: Callable[[dict], None]) -> bool: - """Check if all of itasks' xtriggers have become satisfied. 
- - Return True if satisfied, else False - - Args: - itasks: task proxies to check - db_update_func: method to update xtriggers in the DB - """ - if itask.state.xtriggers_all_satisfied(): - db_update_func(self.sat_xtrig) - return True - return False + self.do_housekeeping = True diff --git a/setup.cfg b/setup.cfg index 67ed7ac71c3..dfeb55189d0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -119,6 +119,8 @@ tests = pytest-asyncio>=0.17,!=0.23.* pytest-cov>=2.8.0 pytest-xdist>=2 + pytest-env>=0.6.2 + pytest-mock>=3.7 pytest>=6 testfixtures>=6.11.0 towncrier>=23 diff --git a/tests/functional/xtriggers/02-persistence/flow.cylc b/tests/functional/xtriggers/02-persistence/flow.cylc index 29ba08712a9..f6d004316fb 100644 --- a/tests/functional/xtriggers/02-persistence/flow.cylc +++ b/tests/functional/xtriggers/02-persistence/flow.cylc @@ -3,6 +3,7 @@ [scheduling] initial cycle point = 2010 final cycle point = 2011 + runahead limit = P0 [[xtriggers]] x1 = faker(name="bob") [[graph]] diff --git a/tests/integration/test_xtrigger_mgr.py b/tests/integration/test_xtrigger_mgr.py index 07abbdac24d..b4d2d503799 100644 --- a/tests/integration/test_xtrigger_mgr.py +++ b/tests/integration/test_xtrigger_mgr.py @@ -16,6 +16,7 @@ """Tests for the behaviour of xtrigger manager. """ +from pytest_mock import mocker async def test_2_xtriggers(flow, start, scheduler, monkeypatch): """Test that if an itask has 2 wall_clock triggers with different @@ -65,3 +66,56 @@ async def test_2_xtriggers(flow, start, scheduler, monkeypatch): 'clock_2': False, 'clock_3': False, } + + +async def test_1_xtrigger_2_tasks(flow, start, scheduler, monkeypatch, mocker): + """ + If multiple tasks depend on the same satisfied xtrigger, the DB mgr method + put_xtriggers should only be called once - when the xtrigger gets satisfied. 
+ + See [GitHub #5908](https://github.com/cylc/cylc-flow/pull/5908) + + """ + task_point = 1588636800 # 2020-05-05 + ten_years_ahead = 1904169600 # 2030-05-05 + monkeypatch.setattr( + 'cylc.flow.xtriggers.wall_clock.time', + lambda: ten_years_ahead - 1 + ) + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': True + }, + 'scheduling': { + 'initial cycle point': '2020-05-05', + 'xtriggers': { + 'clock_1': 'wall_clock()', + }, + 'graph': { + 'R1': '@clock_1 => foo & bar' + } + } + }) + + schd = scheduler(id_) + spy = mocker.spy(schd.workflow_db_mgr, 'put_xtriggers') + + async with start(schd): + + # Call the clock trigger via its dependent tasks, to get it satisfied. + for task in schd.pool.get_tasks(): + # (For clock triggers this is synchronous) + schd.xtrigger_mgr.call_xtriggers_async(task) + + # It should now be satisfied. + assert task.state.xtriggers == {'clock_1': True} + + # Check one put_xtriggers call only, not two. + assert spy.call_count == 1 + + # Note on master prior to GH #5908 the call is made from the + # scheduler main loop when the two tasks become satisified, + # resulting in two calls to put_xtriggers. This test fails + # on master, but with call count 0 (not 2) because the main + # loop doesn't run in this test. 
+ diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 68095a5795e..09dcbcd40ad 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -199,6 +199,7 @@ def xtrigger_mgr() -> XtriggerManager: workflow=workflow_name, user=user, proc_pool=Mock(put_command=lambda *a, **k: True), + workflow_db_mgr=Mock(housekeep=lambda *a, **k: True), broadcast_mgr=Mock(put_broadcast=lambda *a, **k: True), data_store_mgr=DataStoreMgr( create_autospec(Scheduler, workflow=workflow_name, owner=user) diff --git a/tests/unit/test_xtrigger_mgr.py b/tests/unit/test_xtrigger_mgr.py index 89cc0024ddf..bba77c1f7f1 100644 --- a/tests/unit/test_xtrigger_mgr.py +++ b/tests/unit/test_xtrigger_mgr.py @@ -281,58 +281,3 @@ def test_callback(xtrigger_mgr): xtrigger_mgr.callback(get_name) # this means that the xtrigger was satisfied assert xtrigger_mgr.sat_xtrig - - -def test_check_xtriggers(xtrigger_mgr): - """Test process_xtriggers call.""" - - xtrigger_mgr.validate_xtrigger = lambda *a, **k: True # Ignore validation - # add an xtrigger - get_name = SubFuncContext( - label="get_name", - func_name="get_name", - func_args=[], - func_kwargs={} - ) - xtrigger_mgr.add_trig("get_name", get_name, 'fdir') - get_name.out = "[\"True\", {\"name\": \"Yossarian\"}]" - tdef1 = TaskDef( - name="foo", - rtcfg=None, - run_mode="live", - start_point=1, - initial_point=1 - ) - init() - sequence = ISO8601Sequence('P1D', '2019') - tdef1.xtrig_labels[sequence] = ["get_name"] - start_point = ISO8601Point('2019') - itask1 = TaskProxy(Tokens('~user/workflow'), tdef1, start_point) - itask1.state.xtriggers["get_name"] = False # satisfied? 
- - # add a clock xtrigger - wall_clock = SubFuncContext( - label="wall_clock", - func_name="wall_clock", - func_args=[], - func_kwargs={} - ) - wall_clock.out = "[\"True\", \"1\"]" - xtrigger_mgr.add_trig("wall_clock", wall_clock, "fdir") - # create a task - tdef2 = TaskDef( - name="foo", - rtcfg=None, - run_mode="live", - start_point=1, - initial_point=1 - ) - tdef2.xtrig_labels[sequence] = ["wall_clock"] - init() - start_point = ISO8601Point('20000101T0000+05') - # create task proxy - TaskProxy(Tokens('~user/workflow'), tdef2, start_point) - - xtrigger_mgr.check_xtriggers(itask1, lambda foo: None) - # won't be satisfied, as it is async, we are are not calling callback - assert not xtrigger_mgr.sat_xtrig From 57b5e8c8c1a8c7c83c51a63e044ea9b0755032ec Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Thu, 11 Jan 2024 14:32:51 +0000 Subject: [PATCH 5/9] Update CONTRIBUTING.md --- CONTRIBUTING.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 6fbc4283968..c6d27175ff9 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -92,6 +92,7 @@ requests_). 
- John Haiducek - (Andrew Huang) - Cheng Da + - Mark Dawson (All contributors are identifiable with email addresses in the git version From 41ff8c95befff55d06a25fddb74db639725d5e3d Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 11 Jan 2024 14:37:07 +0000 Subject: [PATCH 6/9] Prepare release 8.2.4 Workflow: Release stage 1 - create release PR (Cylc 8+ only), run: 31 --- CHANGES.md | 29 +++++++++++++++++++++++++++++ changes.d/5772.feat.md | 1 - changes.d/5789.fix.md | 1 - changes.d/5801.fix.md | 1 - changes.d/5821.fix.md | 1 - changes.d/5838.feat.md | 1 - changes.d/5841.fix.md | 1 - changes.d/5885.fix.md | 1 - changes.d/5893.fix | 1 - changes.d/5902.fix.md | 1 - changes.d/5908.fix.md | 1 - changes.d/5909.fix.md | 2 -- cylc/flow/__init__.py | 2 +- 13 files changed, 30 insertions(+), 13 deletions(-) delete mode 100644 changes.d/5772.feat.md delete mode 100644 changes.d/5789.fix.md delete mode 100644 changes.d/5801.fix.md delete mode 100644 changes.d/5821.fix.md delete mode 100644 changes.d/5838.feat.md delete mode 100644 changes.d/5841.fix.md delete mode 100644 changes.d/5885.fix.md delete mode 100644 changes.d/5893.fix delete mode 100644 changes.d/5902.fix.md delete mode 100644 changes.d/5908.fix.md delete mode 100644 changes.d/5909.fix.md diff --git a/CHANGES.md b/CHANGES.md index 4dfe61f42db..6c69859adea 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,35 @@ $ towncrier create <PR-number>.<break|feat|fix>.md --content "Short description" +## __cylc-8.2.4 (Released 2024-01-11)__ + +### 🚀 Enhancements + +[#5772](https://github.com/cylc/cylc-flow/pull/5772) - `cylc lint`: added a check for indentation being 4N spaces. + +[#5838](https://github.com/cylc/cylc-flow/pull/5838) - `cylc lint`: added rule to check for `rose date` usage (should be replaced with `isodatetime`). + +### 🔧 Fixes + +[#5789](https://github.com/cylc/cylc-flow/pull/5789) - Stop users changing run modes on restart.
+ +[#5801](https://github.com/cylc/cylc-flow/pull/5801) - Fix traceback when using parentheses on right hand side of graph trigger. + +[#5821](https://github.com/cylc/cylc-flow/pull/5821) - Fixed issue where large uncommitted changes could cause `cylc install` to hang. + +[#5841](https://github.com/cylc/cylc-flow/pull/5841) - `cylc lint`: improved handling of S011 to not warn if the `#` is `#$` (e.g. shell base arithmetic). + +[#5885](https://github.com/cylc/cylc-flow/pull/5885) - Fixed bug in using a final cycle point with chained offsets e.g. 'final cycle point = +PT6H+PT1S'. + +[#5893](https://github.com/cylc/cylc-flow/pull/5893) - Fixed bug in computing a time interval-based runahead limit when future triggers are present. + +[#5902](https://github.com/cylc/cylc-flow/pull/5902) - Fixed a bug that prevented unsetting `execution time limit` by broadcast or reload. + +[#5908](https://github.com/cylc/cylc-flow/pull/5908) - Fixed bug causing redundant DB updates when many tasks depend on the same xtrigger. + +[#5909](https://github.com/cylc/cylc-flow/pull/5909) - Fix a bug where Cylc VIP did not remove --workflow-name= from + Cylc play arguments. + ## __cylc-8.2.3 (Released 2023-11-02)__ ### 🔧 Fixes diff --git a/changes.d/5772.feat.md b/changes.d/5772.feat.md deleted file mode 100644 index da0984a82ec..00000000000 --- a/changes.d/5772.feat.md +++ /dev/null @@ -1 +0,0 @@ -`cylc lint`: added a check for indentation being 4N spaces. diff --git a/changes.d/5789.fix.md b/changes.d/5789.fix.md deleted file mode 100644 index 7eda67036e0..00000000000 --- a/changes.d/5789.fix.md +++ /dev/null @@ -1 +0,0 @@ -Stop users changing run modes on restart. diff --git a/changes.d/5801.fix.md b/changes.d/5801.fix.md deleted file mode 100644 index e7fd0584090..00000000000 --- a/changes.d/5801.fix.md +++ /dev/null @@ -1 +0,0 @@ -Fix traceback when using parentheses on right hand side of graph trigger. 
diff --git a/changes.d/5821.fix.md b/changes.d/5821.fix.md deleted file mode 100644 index 0c6c8b7918d..00000000000 --- a/changes.d/5821.fix.md +++ /dev/null @@ -1 +0,0 @@ -Fixed issue where large uncommitted changes could cause `cylc install` to hang. diff --git a/changes.d/5838.feat.md b/changes.d/5838.feat.md deleted file mode 100644 index 8e9919d3a0f..00000000000 --- a/changes.d/5838.feat.md +++ /dev/null @@ -1 +0,0 @@ -`cylc lint`: added rule to check for `rose date` usage (should be replaced with `isodatetime`). diff --git a/changes.d/5841.fix.md b/changes.d/5841.fix.md deleted file mode 100644 index 4bc41462fca..00000000000 --- a/changes.d/5841.fix.md +++ /dev/null @@ -1 +0,0 @@ -`cylc lint`: improved handling of S011 to not warn if the `#` is `#$` (e.g. shell base arithmetic). diff --git a/changes.d/5885.fix.md b/changes.d/5885.fix.md deleted file mode 100644 index b9071bae612..00000000000 --- a/changes.d/5885.fix.md +++ /dev/null @@ -1 +0,0 @@ -Fixed bug in using a final cycle point with chained offsets e.g. 'final cycle point = +PT6H+PT1S'. \ No newline at end of file diff --git a/changes.d/5893.fix b/changes.d/5893.fix deleted file mode 100644 index 504cd6a649e..00000000000 --- a/changes.d/5893.fix +++ /dev/null @@ -1 +0,0 @@ -Fixed bug in computing a time interval-based runahead limit when future triggers are present. diff --git a/changes.d/5902.fix.md b/changes.d/5902.fix.md deleted file mode 100644 index 803b9dc9590..00000000000 --- a/changes.d/5902.fix.md +++ /dev/null @@ -1 +0,0 @@ -Fixed a bug that prevented unsetting `execution time limit` by broadcast or reload. \ No newline at end of file diff --git a/changes.d/5908.fix.md b/changes.d/5908.fix.md deleted file mode 100644 index 6dfbf76c1c1..00000000000 --- a/changes.d/5908.fix.md +++ /dev/null @@ -1 +0,0 @@ -Fixed bug causing redundant DB updates when many tasks depend on the same xtrigger. 
diff --git a/changes.d/5909.fix.md b/changes.d/5909.fix.md deleted file mode 100644 index b6a229498a0..00000000000 --- a/changes.d/5909.fix.md +++ /dev/null @@ -1,2 +0,0 @@ -Fix a bug where Cylc VIP did not remove --workflow-name= from -Cylc play arguments. \ No newline at end of file diff --git a/cylc/flow/__init__.py b/cylc/flow/__init__.py index f6e90da3c53..56b73965180 100644 --- a/cylc/flow/__init__.py +++ b/cylc/flow/__init__.py @@ -53,7 +53,7 @@ def environ_init(): environ_init() -__version__ = '8.2.4.dev' +__version__ = '8.2.4' def iter_entry_points(entry_point_name): From c15fbc7726522bcd7f777d27f6eb241a500c6b7d Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Thu, 11 Jan 2024 15:32:24 +0000 Subject: [PATCH 7/9] Update CHANGES.md Co-authored-by: Oliver Sanders --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 6c69859adea..6ac7e4d345c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -21,7 +21,7 @@ $ towncrier create <PR-number>.<break|feat|fix>.md --content "Short description" ### 🔧 Fixes -[#5789](https://github.com/cylc/cylc-flow/pull/5789) - Stop users changing run modes on restart. +[#5789](https://github.com/cylc/cylc-flow/pull/5789) - Prevent the run mode from being changed on restart. [#5801](https://github.com/cylc/cylc-flow/pull/5801) - Fix traceback when using parentheses on right hand side of graph trigger.
From 6365f4c43f2321e33213002dd837744e76f43fff Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 11 Jan 2024 15:36:54 +0000 Subject: [PATCH 8/9] Bump dev version Workflow: Release stage 2 - auto publish, run: 67 --- cylc/flow/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cylc/flow/__init__.py b/cylc/flow/__init__.py index 56b73965180..6ff3aa1fac5 100644 --- a/cylc/flow/__init__.py +++ b/cylc/flow/__init__.py @@ -53,7 +53,7 @@ def environ_init(): environ_init() -__version__ = '8.2.4' +__version__ = '8.2.5.dev' def iter_entry_points(entry_point_name): From 22f41e250e54a92aea375e6692035002e9c7e7ec Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Fri, 12 Jan 2024 14:11:05 +0000 Subject: [PATCH 9/9] Update setup.cfg --- setup.cfg | 1 - 1 file changed, 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index 3870c083545..7db9eb8bf4e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -121,7 +121,6 @@ tests = pytest-asyncio>=0.17,!=0.23.* pytest-cov>=2.8.0 pytest-xdist>=2 - pytest-env>=0.6.2 pytest-mock>=3.7 pytest>=6 testfixtures>=6.11.0