Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parentless sequential xtrigger spawning #5738

Merged
1 change: 1 addition & 0 deletions changes.d/5738.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optionally spawn parentless xtriggered tasks sequentially - i.e., one at a time, after the previous xtrigger is satisfied, instead of all at once out to the runahead limit. The `wall_clock` xtrigger is now sequential by default.
16 changes: 14 additions & 2 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -802,9 +802,21 @@ def get_script_common_text(this: str, example: Optional[str] = None):
:ref:`SequentialTasks`.
''')

Conf('sequential xtriggers', VDR.V_BOOLEAN, False,
desc='''
If ``True``, tasks that only depend on xtriggers will not spawn
until the xtrigger of previous (cycle point) instance is satisfied.
Otherwise, they will all spawn at once out to the runahead limit.

This setting can be overridden by the reserved keyword argument
``sequential`` in individual xtrigger declarations.

One sequential xtrigger on a parentless task with multiple
xtriggers will cause sequential spawning.
''')
with Conf('xtriggers', desc='''
This section is for *External Trigger* function declarations -
see :ref:`Section External Triggers`.
This section is for *External Trigger* function declarations -
see :ref:`Section External Triggers`.
'''):
Conf('<xtrigger name>', VDR.V_XTRIGGER, desc='''
Any user-defined event trigger function declarations and
Expand Down
9 changes: 7 additions & 2 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1705,16 +1705,21 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
self.taskdefs[right].add_dependency(dependency, seq)

validator = XtriggerNameValidator.validate
for label in self.cfg['scheduling']['xtriggers']:
xtrigs = self.cfg['scheduling']['xtriggers']
for label in xtrigs:
valid, msg = validator(label)
if not valid:
raise WorkflowConfigError(
f'Invalid xtrigger name "{label}" - {msg}'
)

if self.xtrigger_mgr is not None:
self.xtrigger_mgr.sequential_xtriggers_default = (
self.cfg['scheduling']['sequential xtriggers']
)
for label in xtrig_labels:
try:
xtrig = self.cfg['scheduling']['xtriggers'][label]
xtrig = xtrigs[label]
except KeyError:
if label != 'wall_clock':
raise WorkflowConfigError(f"xtrigger not defined: {label}")
Expand Down
5 changes: 4 additions & 1 deletion cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,10 @@ def generate_ghost_task(
point,
flow_nums,
submit_num=0,
data_mode=True
data_mode=True,
sequential_xtrigger_labels=(
self.schd.xtrigger_mgr.sequential_xtrigger_labels
),
)

is_orphan = False
Expand Down
4 changes: 4 additions & 0 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ async def configure(self, params):
self.config,
self.workflow_db_mgr,
self.task_events_mgr,
self.xtrigger_mgr,
self.data_store_mgr,
self.flow_mgr
)
Expand Down Expand Up @@ -1748,6 +1749,9 @@ async def main_loop(self) -> None:
if all(itask.is_ready_to_run()):
self.pool.queue_task(itask)

if self.xtrigger_mgr.sequential_spawn_next:
self.pool.spawn_parentless_sequential_xtriggers()

if self.xtrigger_mgr.do_housekeeping:
self.xtrigger_mgr.housekeep(self.pool.get_tasks())

Expand Down
59 changes: 49 additions & 10 deletions cylc/flow/subprocctx.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,39 @@
Coerce more value type from string (to time point, duration, xtriggers, etc.).
"""

from inspect import Parameter
import json
from shlex import quote
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union

from cylc.flow.wallclock import get_current_time_string

if TYPE_CHECKING:
from inspect import Signature


def add_kwarg_to_sig(
sig: 'Signature', arg_name: str, default: Any
) -> 'Signature':
"""Return a new signature with a kwarg added."""
# Note: added kwarg has to be before **kwargs ("variadic") in the signature
positional_or_keyword: List[Parameter] = []
variadic: List[Parameter] = []
for param in sig.parameters.values():
if param.kind == Parameter.VAR_KEYWORD:
variadic.append(param)
else:
positional_or_keyword.append(param)
return sig.replace(parameters=[
*positional_or_keyword,
Parameter(
arg_name,
kind=Parameter.KEYWORD_ONLY,
default=default,
),
*variadic,
])


class SubProcContext: # noqa: SIM119 (not really relevant to this case)
"""Represent the context of an external command to run as a subprocess.
Expand Down Expand Up @@ -115,23 +143,31 @@ class SubFuncContext(SubProcContext):

Attributes:
# See also parent class attributes.
.label (str):
.label:
function label under [xtriggers] in flow.cylc
.func_name (str):
.func_name:
function name
.func_args (list):
.func_args:
function positional args
.func_kwargs (dict):
.func_kwargs:
function keyword args
.intvl (float - seconds):
function call interval (how often to check the external trigger)
.ret_val (bool, dict)
.intvl:
function call interval in secs (how often to check the
external trigger)
.ret_val
function return: (satisfied?, result to pass to trigger tasks)
"""

DEFAULT_INTVL = 10.0

def __init__(self, label, func_name, func_args, func_kwargs, intvl=None):
def __init__(
self,
label: str,
func_name: str,
func_args: List[Any],
func_kwargs: Dict[str, Any],
intvl: Union[float, str] = DEFAULT_INTVL
):
"""Initialize a function context."""
self.label = label
self.func_name = func_name
Expand All @@ -141,9 +177,12 @@ def __init__(self, label, func_name, func_args, func_kwargs, intvl=None):
self.intvl = float(intvl)
except (TypeError, ValueError):
self.intvl = self.DEFAULT_INTVL
self.ret_val = (False, None) # (satisfied, broadcast)
self.ret_val: Tuple[
bool, Optional[dict]
] = (False, None) # (satisfied, broadcast)
super(SubFuncContext, self).__init__(
'xtrigger-func', cmd=[], shell=False)
'xtrigger-func', cmd=[], shell=False
)

def update_command(self, workflow_run_dir):
"""Update the function wrap command after changes."""
Expand Down
Loading
Loading