Skip to content

Commit

Permalink
xtrigger efficiency fix. (#5908)
Browse files Browse the repository at this point in the history
xtrigger efficiency fix.

* Update the DB xtriggers table when the xtriggers get satisfied, not
  when the tasks that depend on them get satisfied.
* Don't delete xtriggers table before every update.

---------

Co-authored-by: Oliver Sanders <[email protected]>
Co-authored-by: Tim Pillinger <[email protected]>
  • Loading branch information
3 people authored Jan 11, 2024
1 parent 2ec98dd commit 80f06f2
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 116 deletions.
1 change: 1 addition & 0 deletions changes.d/5908.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed bug causing redundant DB updates when many tasks depend on the same xtrigger.
2 changes: 2 additions & 0 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ class CylcWorkflowDAO:
["prereq_output", {"is_primary_key": True}],
["satisfied"],
],
# The xtriggers table holds the function signature and result of
# already-satisfied (the scheduler no longer needs to call them).
TABLE_XTRIGGERS: [
["signature", {"is_primary_key": True}],
["results"],
Expand Down
32 changes: 9 additions & 23 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ async def initialise(self):
self.workflow,
user=self.owner,
broadcast_mgr=self.broadcast_mgr,
workflow_db_mgr=self.workflow_db_mgr,
data_store_mgr=self.data_store_mgr,
proc_pool=self.proc_pool,
workflow_run_dir=self.workflow_run_dir,
Expand Down Expand Up @@ -1705,14 +1706,8 @@ async def main_loop(self) -> None:
await self.process_command_queue()
self.proc_pool.process()

# Tasks in the main pool that are waiting but not queued must be
# waiting on external dependencies, i.e. xtriggers or ext_triggers.
# For these tasks, call any unsatisfied xtrigger functions, and
# queue tasks that have become ready. (Tasks do not appear in the
# main pool at all until all other-task deps are satisfied, and are
# queued immediately on release from runahead limiting if they are
# not waiting on external deps).
housekeep_xtriggers = False
# Unqueued tasks with satisfied prerequisites must be waiting on
# xtriggers or ext_triggers. Check these and queue tasks if ready.
for itask in self.pool.get_tasks():
if (
not itask.state(TASK_STATUS_WAITING)
Expand All @@ -1725,28 +1720,19 @@ async def main_loop(self) -> None:
itask.state.xtriggers
and not itask.state.xtriggers_all_satisfied()
):
# Call unsatisfied xtriggers if not already in-process.
# Results are returned asynchronously.
self.xtrigger_mgr.call_xtriggers_async(itask)
# Check for satisfied xtriggers, and queue if ready.
if self.xtrigger_mgr.check_xtriggers(
itask, self.workflow_db_mgr.put_xtriggers):
housekeep_xtriggers = True
if all(itask.is_ready_to_run()):
self.pool.queue_task(itask)

# Check for satisfied ext_triggers, and queue if ready.

if (
itask.state.external_triggers
and not itask.state.external_triggers_all_satisfied()
and self.broadcast_mgr.check_ext_triggers(
itask, self.ext_trigger_queue)
and all(itask.is_ready_to_run())
):
self.broadcast_mgr.check_ext_triggers(
itask, self.ext_trigger_queue)

if all(itask.is_ready_to_run()):
self.pool.queue_task(itask)

if housekeep_xtriggers:
# (Could do this periodically?)
if self.xtrigger_mgr.do_housekeeping:
self.xtrigger_mgr.housekeep(self.pool.get_tasks())

self.pool.set_expired_tasks()
Expand Down
1 change: 0 additions & 1 deletion cylc/flow/workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,6 @@ def put_task_event_timers(self, task_events_mgr):

def put_xtriggers(self, sat_xtrig):
"""Put statements to update external triggers table."""
self.db_deletes_map[self.TABLE_XTRIGGERS].append({})
for sig, res in sat_xtrig.items():
self.db_inserts_map[self.TABLE_XTRIGGERS].append({
"signature": sig,
Expand Down
82 changes: 45 additions & 37 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,22 @@
import re
from copy import deepcopy
from time import time
from typing import Any, Dict, List, Optional, Tuple, Callable
from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING

from cylc.flow import LOG
from cylc.flow.exceptions import XtriggerConfigError
import cylc.flow.flags
from cylc.flow.hostuserutil import get_user
from cylc.flow.subprocpool import get_func
from cylc.flow.xtriggers.wall_clock import wall_clock

from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.broadcast_mgr import BroadcastMgr
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.subprocpool import SubProcPool
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.subprocpool import get_func
if TYPE_CHECKING:
from cylc.flow.broadcast_mgr import BroadcastMgr
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.subprocpool import SubProcPool
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager


class TemplateVariables(Enum):
Expand Down Expand Up @@ -185,6 +187,7 @@ class XtriggerManager:
Args:
workflow: workflow name
user: workflow owner
workflow_db_mgr: the DB Manager
broadcast_mgr: the Broadcast Manager
proc_pool: pool of Subprocesses
workflow_run_dir: workflow run directory
Expand All @@ -195,9 +198,10 @@ class XtriggerManager:
def __init__(
self,
workflow: str,
broadcast_mgr: BroadcastMgr,
data_store_mgr: DataStoreMgr,
proc_pool: SubProcPool,
broadcast_mgr: 'BroadcastMgr',
workflow_db_mgr: 'WorkflowDatabaseManager',
data_store_mgr: 'DataStoreMgr',
proc_pool: 'SubProcPool',
user: Optional[str] = None,
workflow_run_dir: Optional[str] = None,
workflow_share_dir: Optional[str] = None,
Expand Down Expand Up @@ -230,11 +234,15 @@ def __init__(
}

self.proc_pool = proc_pool
self.workflow_db_mgr = workflow_db_mgr
self.broadcast_mgr = broadcast_mgr
self.data_store_mgr = data_store_mgr
self.do_housekeeping = False

@staticmethod
def validate_xtrigger(label: str, fctx: SubFuncContext, fdir: str) -> None:
def validate_xtrigger(
label: str, fctx: 'SubFuncContext', fdir: str
) -> None:
"""Validate an Xtrigger function.
Args:
Expand Down Expand Up @@ -305,7 +313,7 @@ def validate_xtrigger(label: str, fctx: SubFuncContext, fdir: str) -> None:
f' {", ".join(t.value for t in deprecated_variables)}'
)

def add_trig(self, label: str, fctx: SubFuncContext, fdir: str) -> None:
def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None:
"""Add a new xtrigger function.
Check the xtrigger function exists here (e.g. during validation).
Expand Down Expand Up @@ -334,7 +342,7 @@ def load_xtrigger_for_restart(self, row_idx: int, row: Tuple[str, str]):
sig, results = row
self.sat_xtrig[sig] = json.loads(results)

def _get_xtrigs(self, itask: TaskProxy, unsat_only: bool = False,
def _get_xtrigs(self, itask: 'TaskProxy', unsat_only: bool = False,
sigs_only: bool = False):
"""(Internal helper method.)
Expand All @@ -361,7 +369,9 @@ def _get_xtrigs(self, itask: TaskProxy, unsat_only: bool = False,
res.append((label, sig, ctx, satisfied))
return res

def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext:
def get_xtrig_ctx(
self, itask: 'TaskProxy', label: str
) -> 'SubFuncContext':
"""Get a real function context from the template.
Args:
Expand Down Expand Up @@ -412,7 +422,7 @@ def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext:
ctx.update_command(self.workflow_run_dir)
return ctx

def call_xtriggers_async(self, itask: TaskProxy):
def call_xtriggers_async(self, itask: 'TaskProxy'):
"""Call itask's xtrigger functions via the process pool...
...if previous call not still in-process and retry period is up.
Expand All @@ -421,16 +431,23 @@ def call_xtriggers_async(self, itask: TaskProxy):
itask: task proxy to check.
"""
for label, sig, ctx, _ in self._get_xtrigs(itask, unsat_only=True):
# Special case: quick synchronous clock check:
if sig.startswith("wall_clock"):
# Special case: quick synchronous clock check.
if wall_clock(*ctx.func_args, **ctx.func_kwargs):
if sig in self.sat_xtrig:
# Already satisfied, just update the task
itask.state.xtriggers[label] = True

Check warning on line 438 in cylc/flow/xtrigger_mgr.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtrigger_mgr.py#L438

Added line #L438 was not covered by tests
elif wall_clock(*ctx.func_args, **ctx.func_kwargs):
# Newly satisfied
itask.state.xtriggers[label] = True
self.sat_xtrig[sig] = {}
self.data_store_mgr.delta_task_xtrigger(sig, True)
self.workflow_db_mgr.put_xtriggers({sig: {}})
LOG.info('xtrigger satisfied: %s = %s', label, sig)
self.do_housekeeping = True
continue
# General case: potentially slow asynchronous function call.
if sig in self.sat_xtrig:
# Already satisfied, just update the task
if not itask.state.xtriggers[label]:
itask.state.xtriggers[label] = True
res = {}
Expand All @@ -445,6 +462,8 @@ def call_xtriggers_async(self, itask: TaskProxy):
xtrigger_env
)
continue

# Call the function to check the unsatisfied xtrigger.
if sig in self.active:
# Already waiting on this result.
continue
Expand All @@ -457,8 +476,10 @@ def call_xtriggers_async(self, itask: TaskProxy):
self.active.append(sig)
self.proc_pool.put_command(ctx, callback=self.callback)

def housekeep(self, itasks: List[TaskProxy]):
"""Delete satisfied xtriggers no longer needed by any task.
def housekeep(self, itasks):
"""Forget satisfied xtriggers no longer needed by any task.
Check self.do_housekeeping before calling this method.
Args:
itasks: list of all task proxies.
Expand All @@ -469,8 +490,9 @@ def housekeep(self, itasks: List[TaskProxy]):
for sig in list(self.sat_xtrig):
if sig not in all_xtrig:
del self.sat_xtrig[sig]
self.do_housekeeping = False

def callback(self, ctx: SubFuncContext):
def callback(self, ctx: 'SubFuncContext'):
"""Callback for asynchronous xtrigger functions.
Record satisfaction status and function results dict.
Expand All @@ -489,23 +511,9 @@ def callback(self, ctx: SubFuncContext):
return
LOG.debug('%s: returned %s', sig, results)
if satisfied:
# Newly satisfied
self.data_store_mgr.delta_task_xtrigger(sig, True)
self.workflow_db_mgr.put_xtriggers({sig: results})
LOG.info('xtrigger satisfied: %s = %s', ctx.label, sig)
self.sat_xtrig[sig] = results

def check_xtriggers(
self,
itask: TaskProxy,
db_update_func: Callable[[dict], None]) -> bool:
"""Check if all of itasks' xtriggers have become satisfied.
Return True if satisfied, else False
Args:
itasks: task proxies to check
db_update_func: method to update xtriggers in the DB
"""
if itask.state.xtriggers_all_satisfied():
db_update_func(self.sat_xtrig)
return True
return False
self.do_housekeeping = True
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ tests =
pytest-asyncio>=0.17,!=0.23.*
pytest-cov>=2.8.0
pytest-xdist>=2
pytest-env>=0.6.2
pytest-mock>=3.7
pytest>=6
testfixtures>=6.11.0
towncrier>=23
Expand Down
1 change: 1 addition & 0 deletions tests/functional/xtriggers/02-persistence/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
[scheduling]
initial cycle point = 2010
final cycle point = 2011
runahead limit = P0
[[xtriggers]]
x1 = faker(name="bob")
[[graph]]
Expand Down
54 changes: 54 additions & 0 deletions tests/integration/test_xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"""Tests for the behaviour of xtrigger manager.
"""

from pytest_mock import mocker

async def test_2_xtriggers(flow, start, scheduler, monkeypatch):
"""Test that if an itask has 2 wall_clock triggers with different
Expand Down Expand Up @@ -65,3 +66,56 @@ async def test_2_xtriggers(flow, start, scheduler, monkeypatch):
'clock_2': False,
'clock_3': False,
}


async def test_1_xtrigger_2_tasks(flow, start, scheduler, monkeypatch, mocker):
"""
If multiple tasks depend on the same satisfied xtrigger, the DB mgr method
put_xtriggers should only be called once - when the xtrigger gets satisfied.
See [GitHub #5908](https://github.com/cylc/cylc-flow/pull/5908)
"""
task_point = 1588636800 # 2020-05-05
ten_years_ahead = 1904169600 # 2030-05-05
monkeypatch.setattr(
'cylc.flow.xtriggers.wall_clock.time',
lambda: ten_years_ahead - 1
)
id_ = flow({
'scheduler': {
'allow implicit tasks': True
},
'scheduling': {
'initial cycle point': '2020-05-05',
'xtriggers': {
'clock_1': 'wall_clock()',
},
'graph': {
'R1': '@clock_1 => foo & bar'
}
}
})

schd = scheduler(id_)
spy = mocker.spy(schd.workflow_db_mgr, 'put_xtriggers')

async with start(schd):

# Call the clock trigger via its dependent tasks, to get it satisfied.
for task in schd.pool.get_tasks():
# (For clock triggers this is synchronous)
schd.xtrigger_mgr.call_xtriggers_async(task)

# It should now be satisfied.
assert task.state.xtriggers == {'clock_1': True}

# Check one put_xtriggers call only, not two.
assert spy.call_count == 1

# Note on master prior to GH #5908 the call is made from the
# scheduler main loop when the two tasks become satisified,
# resulting in two calls to put_xtriggers. This test fails
# on master, but with call count 0 (not 2) because the main
# loop doesn't run in this test.

1 change: 1 addition & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ def xtrigger_mgr() -> XtriggerManager:
workflow=workflow_name,
user=user,
proc_pool=Mock(put_command=lambda *a, **k: True),
workflow_db_mgr=Mock(housekeep=lambda *a, **k: True),
broadcast_mgr=Mock(put_broadcast=lambda *a, **k: True),
data_store_mgr=DataStoreMgr(
create_autospec(Scheduler, workflow=workflow_name, owner=user)
Expand Down
Loading

0 comments on commit 80f06f2

Please sign in to comment.