diff --git a/changes.d/5908.fix.md b/changes.d/5908.fix.md new file mode 100644 index 00000000000..6dfbf76c1c1 --- /dev/null +++ b/changes.d/5908.fix.md @@ -0,0 +1 @@ +Fixed bug causing redundant DB updates when many tasks depend on the same xtrigger. diff --git a/cylc/flow/rundb.py b/cylc/flow/rundb.py index 84360083c5c..256da3b3129 100644 --- a/cylc/flow/rundb.py +++ b/cylc/flow/rundb.py @@ -298,6 +298,8 @@ class CylcWorkflowDAO: ["prereq_output", {"is_primary_key": True}], ["satisfied"], ], + # The xtriggers table holds the function signature and result of + # already-satisfied (the scheduler no longer needs to call them). TABLE_XTRIGGERS: [ ["signature", {"is_primary_key": True}], ["results"], diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 835be9b735e..0d793997296 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -377,6 +377,7 @@ async def initialise(self): self.workflow, user=self.owner, broadcast_mgr=self.broadcast_mgr, + workflow_db_mgr=self.workflow_db_mgr, data_store_mgr=self.data_store_mgr, proc_pool=self.proc_pool, workflow_run_dir=self.workflow_run_dir, @@ -1705,14 +1706,8 @@ async def main_loop(self) -> None: await self.process_command_queue() self.proc_pool.process() - # Tasks in the main pool that are waiting but not queued must be - # waiting on external dependencies, i.e. xtriggers or ext_triggers. - # For these tasks, call any unsatisfied xtrigger functions, and - # queue tasks that have become ready. (Tasks do not appear in the - # main pool at all until all other-task deps are satisfied, and are - # queued immediately on release from runahead limiting if they are - # not waiting on external deps). - housekeep_xtriggers = False + # Unqueued tasks with satisfied prerequisites must be waiting on + # xtriggers or ext_triggers. Check these and queue tasks if ready. for itask in self.pool.get_tasks(): if ( not itask.state(TASK_STATUS_WAITING) @@ -1725,28 +1720,19 @@ async def main_loop(self) -> None: itask.state.xtriggers and not itask.state.xtriggers_all_satisfied() ): - # Call unsatisfied xtriggers if not already in-process. - # Results are returned asynchronously. self.xtrigger_mgr.call_xtriggers_async(itask) - # Check for satisfied xtriggers, and queue if ready. - if self.xtrigger_mgr.check_xtriggers( - itask, self.workflow_db_mgr.put_xtriggers): - housekeep_xtriggers = True - if all(itask.is_ready_to_run()): - self.pool.queue_task(itask) - - # Check for satisfied ext_triggers, and queue if ready. + if ( itask.state.external_triggers and not itask.state.external_triggers_all_satisfied() - and self.broadcast_mgr.check_ext_triggers( - itask, self.ext_trigger_queue) - and all(itask.is_ready_to_run()) ): + self.broadcast_mgr.check_ext_triggers( + itask, self.ext_trigger_queue) + + if all(itask.is_ready_to_run()): self.pool.queue_task(itask) - if housekeep_xtriggers: - # (Could do this periodically?) + if self.xtrigger_mgr.do_housekeeping: self.xtrigger_mgr.housekeep(self.pool.get_tasks()) self.pool.set_expired_tasks() diff --git a/cylc/flow/workflow_db_mgr.py b/cylc/flow/workflow_db_mgr.py index 091d1712429..7d6968b137c 100644 --- a/cylc/flow/workflow_db_mgr.py +++ b/cylc/flow/workflow_db_mgr.py @@ -404,7 +404,6 @@ def put_task_event_timers(self, task_events_mgr): def put_xtriggers(self, sat_xtrig): """Put statements to update external triggers table.""" - self.db_deletes_map[self.TABLE_XTRIGGERS].append({}) for sig, res in sat_xtrig.items(): self.db_inserts_map[self.TABLE_XTRIGGERS].append({ "signature": sig, diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index 35b8e88b36c..4f8e31da597 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -20,20 +20,22 @@ import re from copy import deepcopy from time import time -from typing import Any, Dict, List, Optional, Tuple, Callable +from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING from cylc.flow import LOG from cylc.flow.exceptions import XtriggerConfigError import cylc.flow.flags from cylc.flow.hostuserutil import get_user +from cylc.flow.subprocpool import get_func from cylc.flow.xtriggers.wall_clock import wall_clock -from cylc.flow.subprocctx import SubFuncContext -from cylc.flow.broadcast_mgr import BroadcastMgr -from cylc.flow.data_store_mgr import DataStoreMgr -from cylc.flow.subprocpool import SubProcPool -from cylc.flow.task_proxy import TaskProxy -from cylc.flow.subprocpool import get_func +if TYPE_CHECKING: + from cylc.flow.broadcast_mgr import BroadcastMgr + from cylc.flow.data_store_mgr import DataStoreMgr + from cylc.flow.subprocctx import SubFuncContext + from cylc.flow.subprocpool import SubProcPool + from cylc.flow.task_proxy import TaskProxy + from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager class TemplateVariables(Enum): @@ -185,6 +187,7 @@ class XtriggerManager: Args: workflow: workflow name user: workflow owner + workflow_db_mgr: the DB Manager broadcast_mgr: the Broadcast Manager proc_pool: pool of Subprocesses workflow_run_dir: workflow run directory @@ -195,9 +198,10 @@ class XtriggerManager: def __init__( self, workflow: str, - broadcast_mgr: BroadcastMgr, - data_store_mgr: DataStoreMgr, - proc_pool: SubProcPool, + broadcast_mgr: 'BroadcastMgr', + workflow_db_mgr: 'WorkflowDatabaseManager', + data_store_mgr: 'DataStoreMgr', + proc_pool: 'SubProcPool', user: Optional[str] = None, workflow_run_dir: Optional[str] = None, workflow_share_dir: Optional[str] = None, @@ -230,11 +234,15 @@ def __init__( } self.proc_pool = proc_pool + self.workflow_db_mgr = workflow_db_mgr self.broadcast_mgr = broadcast_mgr self.data_store_mgr = data_store_mgr + self.do_housekeeping = False @staticmethod - def validate_xtrigger(label: str, fctx: SubFuncContext, fdir: str) -> None: + def validate_xtrigger( + label: str, fctx: 'SubFuncContext', fdir: str + ) -> None: """Validate an Xtrigger function. Args: @@ -305,7 +313,7 @@ def validate_xtrigger(label: str, fctx: SubFuncContext, fdir: str) -> None: f' {", ".join(t.value for t in deprecated_variables)}' ) - def add_trig(self, label: str, fctx: SubFuncContext, fdir: str) -> None: + def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None: """Add a new xtrigger function. Check the xtrigger function exists here (e.g. during validation). @@ -334,7 +342,7 @@ def load_xtrigger_for_restart(self, row_idx: int, row: Tuple[str, str]): sig, results = row self.sat_xtrig[sig] = json.loads(results) - def _get_xtrigs(self, itask: TaskProxy, unsat_only: bool = False, + def _get_xtrigs(self, itask: 'TaskProxy', unsat_only: bool = False, sigs_only: bool = False): """(Internal helper method.) @@ -361,7 +369,9 @@ def _get_xtrigs(self, itask: TaskProxy, unsat_only: bool = False, res.append((label, sig, ctx, satisfied)) return res - def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext: + def get_xtrig_ctx( + self, itask: 'TaskProxy', label: str + ) -> 'SubFuncContext': """Get a real function context from the template. Args: @@ -412,7 +422,7 @@ def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext: ctx.update_command(self.workflow_run_dir) return ctx - def call_xtriggers_async(self, itask: TaskProxy): + def call_xtriggers_async(self, itask: 'TaskProxy'): """Call itask's xtrigger functions via the process pool... ...if previous call not still in-process and retry period is up. @@ -421,16 +431,23 @@ def call_xtriggers_async(self, itask: TaskProxy): itask: task proxy to check. """ for label, sig, ctx, _ in self._get_xtrigs(itask, unsat_only=True): + # Special case: quick synchronous clock check: if sig.startswith("wall_clock"): - # Special case: quick synchronous clock check. - if wall_clock(*ctx.func_args, **ctx.func_kwargs): + if sig in self.sat_xtrig: + # Already satisfied, just update the task + itask.state.xtriggers[label] = True + elif wall_clock(*ctx.func_args, **ctx.func_kwargs): + # Newly satisfied itask.state.xtriggers[label] = True self.sat_xtrig[sig] = {} self.data_store_mgr.delta_task_xtrigger(sig, True) + self.workflow_db_mgr.put_xtriggers({sig: {}}) LOG.info('xtrigger satisfied: %s = %s', label, sig) + self.do_housekeeping = True continue # General case: potentially slow asynchronous function call. if sig in self.sat_xtrig: + # Already satisfied, just update the task if not itask.state.xtriggers[label]: itask.state.xtriggers[label] = True res = {} @@ -445,6 +462,8 @@ def call_xtriggers_async(self, itask: TaskProxy): xtrigger_env ) continue + + # Call the function to check the unsatisfied xtrigger. if sig in self.active: # Already waiting on this result. continue @@ -457,8 +476,10 @@ def call_xtriggers_async(self, itask: TaskProxy): self.active.append(sig) self.proc_pool.put_command(ctx, callback=self.callback) - def housekeep(self, itasks: List[TaskProxy]): - """Delete satisfied xtriggers no longer needed by any task. + def housekeep(self, itasks): + """Forget satisfied xtriggers no longer needed by any task. + + Check self.do_housekeeping before calling this method. Args: itasks: list of all task proxies. @@ -469,8 +490,9 @@ def housekeep(self, itasks: List[TaskProxy]): for sig in list(self.sat_xtrig): if sig not in all_xtrig: del self.sat_xtrig[sig] + self.do_housekeeping = False - def callback(self, ctx: SubFuncContext): + def callback(self, ctx: 'SubFuncContext'): """Callback for asynchronous xtrigger functions. Record satisfaction status and function results dict. @@ -489,23 +511,9 @@ def callback(self, ctx: SubFuncContext): return LOG.debug('%s: returned %s', sig, results) if satisfied: + # Newly satisfied self.data_store_mgr.delta_task_xtrigger(sig, True) + self.workflow_db_mgr.put_xtriggers({sig: results}) LOG.info('xtrigger satisfied: %s = %s', ctx.label, sig) self.sat_xtrig[sig] = results - - def check_xtriggers( - self, - itask: TaskProxy, - db_update_func: Callable[[dict], None]) -> bool: - """Check if all of itasks' xtriggers have become satisfied. - - Return True if satisfied, else False - - Args: - itasks: task proxies to check - db_update_func: method to update xtriggers in the DB - """ - if itask.state.xtriggers_all_satisfied(): - db_update_func(self.sat_xtrig) - return True - return False + self.do_housekeeping = True diff --git a/setup.cfg b/setup.cfg index 67ed7ac71c3..dfeb55189d0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -119,6 +119,8 @@ tests = pytest-asyncio>=0.17,!=0.23.* pytest-cov>=2.8.0 pytest-xdist>=2 + pytest-env>=0.6.2 + pytest-mock>=3.7 pytest>=6 testfixtures>=6.11.0 towncrier>=23 diff --git a/tests/functional/xtriggers/02-persistence/flow.cylc b/tests/functional/xtriggers/02-persistence/flow.cylc index 29ba08712a9..f6d004316fb 100644 --- a/tests/functional/xtriggers/02-persistence/flow.cylc +++ b/tests/functional/xtriggers/02-persistence/flow.cylc @@ -3,6 +3,7 @@ [scheduling] initial cycle point = 2010 final cycle point = 2011 + runahead limit = P0 [[xtriggers]] x1 = faker(name="bob") [[graph]] diff --git a/tests/integration/test_xtrigger_mgr.py b/tests/integration/test_xtrigger_mgr.py index 07abbdac24d..b4d2d503799 100644 --- a/tests/integration/test_xtrigger_mgr.py +++ b/tests/integration/test_xtrigger_mgr.py @@ -16,6 +16,7 @@ """Tests for the behaviour of xtrigger manager. """ +from pytest_mock import mocker async def test_2_xtriggers(flow, start, scheduler, monkeypatch): """Test that if an itask has 2 wall_clock triggers with different @@ -65,3 +66,56 @@ async def test_2_xtriggers(flow, start, scheduler, monkeypatch): 'clock_2': False, 'clock_3': False, } + + +async def test_1_xtrigger_2_tasks(flow, start, scheduler, monkeypatch, mocker): + """ + If multiple tasks depend on the same satisfied xtrigger, the DB mgr method + put_xtriggers should only be called once - when the xtrigger gets satisfied. + + See [GitHub #5908](https://github.com/cylc/cylc-flow/pull/5908) + + """ + task_point = 1588636800 # 2020-05-05 + ten_years_ahead = 1904169600 # 2030-05-05 + monkeypatch.setattr( + 'cylc.flow.xtriggers.wall_clock.time', + lambda: ten_years_ahead - 1 + ) + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': True + }, + 'scheduling': { + 'initial cycle point': '2020-05-05', + 'xtriggers': { + 'clock_1': 'wall_clock()', + }, + 'graph': { + 'R1': '@clock_1 => foo & bar' + } + } + }) + + schd = scheduler(id_) + spy = mocker.spy(schd.workflow_db_mgr, 'put_xtriggers') + + async with start(schd): + + # Call the clock trigger via its dependent tasks, to get it satisfied. + for task in schd.pool.get_tasks(): + # (For clock triggers this is synchronous) + schd.xtrigger_mgr.call_xtriggers_async(task) + + # It should now be satisfied. + assert task.state.xtriggers == {'clock_1': True} + + # Check one put_xtriggers call only, not two. + assert spy.call_count == 1 + + # Note on master prior to GH #5908 the call is made from the + # scheduler main loop when the two tasks become satisified, + # resulting in two calls to put_xtriggers. This test fails + # on master, but with call count 0 (not 2) because the main + # loop doesn't run in this test. + diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 68095a5795e..09dcbcd40ad 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -199,6 +199,7 @@ def xtrigger_mgr() -> XtriggerManager: workflow=workflow_name, user=user, proc_pool=Mock(put_command=lambda *a, **k: True), + workflow_db_mgr=Mock(housekeep=lambda *a, **k: True), broadcast_mgr=Mock(put_broadcast=lambda *a, **k: True), data_store_mgr=DataStoreMgr( create_autospec(Scheduler, workflow=workflow_name, owner=user) diff --git a/tests/unit/test_xtrigger_mgr.py b/tests/unit/test_xtrigger_mgr.py index 89cc0024ddf..bba77c1f7f1 100644 --- a/tests/unit/test_xtrigger_mgr.py +++ b/tests/unit/test_xtrigger_mgr.py @@ -281,58 +281,3 @@ def test_callback(xtrigger_mgr): xtrigger_mgr.callback(get_name) # this means that the xtrigger was satisfied assert xtrigger_mgr.sat_xtrig - - -def test_check_xtriggers(xtrigger_mgr): - """Test process_xtriggers call.""" - - xtrigger_mgr.validate_xtrigger = lambda *a, **k: True # Ignore validation - # add an xtrigger - get_name = SubFuncContext( - label="get_name", - func_name="get_name", - func_args=[], - func_kwargs={} - ) - xtrigger_mgr.add_trig("get_name", get_name, 'fdir') - get_name.out = "[\"True\", {\"name\": \"Yossarian\"}]" - tdef1 = TaskDef( - name="foo", - rtcfg=None, - run_mode="live", - start_point=1, - initial_point=1 - ) - init() - sequence = ISO8601Sequence('P1D', '2019') - tdef1.xtrig_labels[sequence] = ["get_name"] - start_point = ISO8601Point('2019') - itask1 = TaskProxy(Tokens('~user/workflow'), tdef1, start_point) - itask1.state.xtriggers["get_name"] = False # satisfied? - - # add a clock xtrigger - wall_clock = SubFuncContext( - label="wall_clock", - func_name="wall_clock", - func_args=[], - func_kwargs={} - ) - wall_clock.out = "[\"True\", \"1\"]" - xtrigger_mgr.add_trig("wall_clock", wall_clock, "fdir") - # create a task - tdef2 = TaskDef( - name="foo", - rtcfg=None, - run_mode="live", - start_point=1, - initial_point=1 - ) - tdef2.xtrig_labels[sequence] = ["wall_clock"] - init() - start_point = ISO8601Point('20000101T0000+05') - # create task proxy - TaskProxy(Tokens('~user/workflow'), tdef2, start_point) - - xtrigger_mgr.check_xtriggers(itask1, lambda foo: None) - # won't be satisfied, as it is async, we are are not calling callback - assert not xtrigger_mgr.sat_xtrig