Skip to content

Commit

Permalink
xtriggers: differentiate "not satisfied" from "not yet evaluated"
Browse files Browse the repository at this point in the history
* Closes cylc#6298
* The `cylc show` command will now convey if an xtrigger has not yet
  started polling so that it doesn't look like n>0 xtriggers are not
  satisfied even if the external condition they will poll for is
  • Loading branch information
oliver-sanders committed Jan 16, 2025
1 parent 0ba5275 commit abb7043
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 86 deletions.
1 change: 1 addition & 0 deletions cylc/flow/data_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ message PbTrigger {
optional string message = 3;
optional bool satisfied = 4;
optional double time = 5;
optional string status = 6;
}

message PbTaskProxy {
Expand Down
73 changes: 36 additions & 37 deletions cylc/flow/data_messages_pb2.py

Large diffs are not rendered by default.

68 changes: 35 additions & 33 deletions cylc/flow/data_messages_pb2.pyi

Large diffs are not rendered by default.

19 changes: 18 additions & 1 deletion cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
TIME_ZONE_UTC_INFO,
get_utc_mode
)
from cylc.flow.xtrigger_mgr import should_run_xtriggers

if TYPE_CHECKING:
from cylc.flow.cycling import PointBase
Expand Down Expand Up @@ -1519,10 +1520,11 @@ def _process_internal_task_proxy(
for label, satisfied in itask.state.xtriggers.items():
sig = self.schd.xtrigger_mgr.get_xtrig_ctx(
itask, label).get_signature()
xtrig = tproxy.xtriggers[sig]
xtrig = tproxy.xtriggers[sig] # this creates a new PbTrigger
xtrig.id = sig
xtrig.label = label
xtrig.satisfied = satisfied
xtrig.status = get_xtrigger_status(itask, satisfied)
self.xtrigger_tasks.setdefault(sig, set()).add((tproxy.id, label))

if tproxy.state in self.latest_state_tasks:
Expand Down Expand Up @@ -2327,6 +2329,10 @@ def delta_task_state(self, itask: 'TaskProxy') -> None:
self.updated[TASKS].setdefault(
t_id,
PbTask(id=t_id)).MergeFrom(t_delta)

for label, satisfied in itask.state.xtriggers.items():
sig = self.schd.xtrigger_mgr.get_xtrig_ctx(itask, label).get_signature()
tproxy.xtriggers[sig].status = get_xtrigger_status(itask, satisfied)
self.state_update_families.add(tproxy.first_parent)
self.updates_pending = True

Expand Down Expand Up @@ -2539,6 +2545,7 @@ def delta_task_xtrigger(self, sig, satisfied):
xtrigger.label = label
xtrigger.satisfied = satisfied
xtrigger.time = update_time
xtrigger.status = 'running' if not satisfied else 'succeeded'
self.updates_pending = True

def delta_from_task_proxy(self, itask: TaskProxy) -> None:
Expand Down Expand Up @@ -2791,3 +2798,13 @@ def edge_id(self, left_tokens: Tokens, right_tokens: Tokens) -> str:
f'$edge|{left_tokens.relative_id}|{right_tokens.relative_id}'
)
).id


def get_xtrigger_status(itask: 'TaskProxy', xtrig_satisfied: bool):
if xtrig_satisfied:
status = 'satisfied'
elif should_run_xtriggers(itask):
status = 'running'
else:
status = 'waiting'
return status
1 change: 1 addition & 0 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,7 @@ class Meta:
message = String()
satisfied = Boolean()
time = Float()
status = String()


class TaskProxy(ObjectType):
Expand Down
8 changes: 2 additions & 6 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@
AutoRestartMode,
StopMode,
)
from cylc.flow.xtrigger_mgr import XtriggerManager
from cylc.flow.xtrigger_mgr import XtriggerManager, should_run_xtriggers


if TYPE_CHECKING:
Expand Down Expand Up @@ -1759,11 +1759,7 @@ async def _main_loop(self) -> None:
# Unqueued tasks with satisfied prerequisites must be waiting on
# xtriggers or ext_triggers. Check these and queue tasks if ready.
for itask in self.pool.get_tasks():
if (
not itask.state(TASK_STATUS_WAITING)
or itask.state.is_queued
or itask.state.is_runahead
):
if not should_run_xtriggers(itask):
continue

if (
Expand Down
27 changes: 18 additions & 9 deletions cylc/flow/scripts/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import json
import sys
from textwrap import indent
from typing import Any, Dict, TYPE_CHECKING
from typing import Any, Dict, Optional, TYPE_CHECKING

from ansimarkup import ansiprint

Expand All @@ -62,7 +62,7 @@
ID_MULTI_ARG_DOC,
Options,
)
from cylc.flow.terminal import cli_function
from cylc.flow.terminal import DIM, cli_function
from cylc.flow.util import BOOL_SYMBOLS


Expand Down Expand Up @@ -142,6 +142,7 @@
id
label
satisfied
status
}
runtime {
completion
Expand All @@ -153,13 +154,16 @@

SATISFIED = BOOL_SYMBOLS[True]
UNSATISFIED = BOOL_SYMBOLS[False]
PENDING = 'o'


def print_msg_state(msg, state):
if state:
def print_msg_state(msg, state: Optional[bool]):
if state is False:
ansiprint(f'<green> {SATISFIED} {msg}</green>')
else:
elif state is True:
ansiprint(f'<red> {UNSATISFIED} {msg}</red>')
else:
ansiprint(f'<{DIM}> {PENDING} {msg}</{DIM}>')


def print_completion_state(t_proxy):
Expand Down Expand Up @@ -391,7 +395,8 @@ async def prereqs_and_outputs_query(
):
ansiprint(
"<bold>other:</bold>"
f" ('<red>{UNSATISFIED}</red>': not satisfied)"
f" ('<red>{UNSATISFIED}</red>': not satisfied,"
f" '<{DIM}>{PENDING}</{DIM}>': not yet evaluated)"
)
for ext_trig in t_proxy['externalTriggers']:
state = ext_trig['satisfied']
Expand All @@ -400,11 +405,15 @@ async def prereqs_and_outputs_query(
state)
for xtrig in t_proxy['xtriggers']:
label = get_wallclock_label(xtrig) or xtrig['id']
state = xtrig['satisfied']
satisfied = xtrig['satisfied']
status = xtrig.get(
'status',
'succeeded' if satisfied else 'running'
)
print_msg_state(
f'xtrigger "{xtrig["label"]} = {label}"',
state)

None if status == 'waiting' else satisfied == 'succeeded'
)
print_completion_state(t_proxy)

if not task_proxies:
Expand Down
9 changes: 9 additions & 0 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from cylc.flow.hostuserutil import get_user
from cylc.flow.subprocctx import add_kwarg_to_sig
from cylc.flow.subprocpool import get_xtrig_func
from cylc.flow.task_state import TASK_STATUS_WAITING
from cylc.flow.xtriggers.wall_clock import _wall_clock
from cylc.flow.xtriggers.workflow_state import (
workflow_state,
Expand Down Expand Up @@ -776,3 +777,11 @@ def callback(self, ctx: 'SubFuncContext'):
LOG.info('xtrigger satisfied: %s = %s', ctx.label, sig)
self.sat_xtrig[sig] = results
self.do_housekeeping = True


def should_run_xtriggers(itask: 'TaskProxy'):
return (
itask.state(TASK_STATUS_WAITING)
and not itask.state.is_queued
and not itask.state.is_runahead
)

0 comments on commit abb7043

Please sign in to comment.