Skip to content

Commit

Permalink
review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Nov 29, 2023
1 parent 286710d commit 2a0ff51
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 32 deletions.
5 changes: 4 additions & 1 deletion cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,10 @@ def generate_ghost_task(
point,
flow_nums,
submit_num=0,
data_mode=True
data_mode=True,
sequential_xtrigger_labels=(
self.schd.xtrigger_mgr.sequential_xtrigger_labels
),
)

is_orphan = False
Expand Down
72 changes: 42 additions & 30 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,10 @@ def load_db_task_pool_for_restart(self, row_idx, row):
submit_num=submit_num,
is_late=bool(is_late),
flow_wait=bool(flow_wait),
is_manual_submit=bool(is_manual_submit)
is_manual_submit=bool(is_manual_submit),
sequential_xtrigger_labels=(
self.xtrigger_mgr.sequential_xtrigger_labels
),
)

except WorkflowConfigError:
Expand Down Expand Up @@ -602,15 +605,6 @@ def load_db_task_pool_for_restart(self, row_idx, row):
)
)

# Set xtrigger checking type, which effects parentless spawning.
if (
itask.tdef.is_parentless(itask.point)
and set(itask.state.xtriggers.keys()).intersection(
self.xtrigger_mgr.sequential_xtrigger_labels
)
):
itask.is_xtrigger_sequential = True

if itask.state_reset(status, is_runahead=True):
self.data_store_mgr.delta_task_runahead(itask)
self.add_to_pool(itask)
Expand Down Expand Up @@ -734,7 +728,21 @@ def rh_release_and_queue(self, itask) -> None:
def _get_spawned_or_merged_task(
self, point: 'PointBase', tdef: 'TaskDef', flow_nums: 'FlowNums'
) -> 'Tuple[Optional[TaskProxy], bool, bool]':
"""Return new or existing task point/name with merged flow_nums"""
"""Return new or existing task point/name with merged flow_nums
Returns:
tuple - (itask, is_in_pool, is_xtrig_sequential)
itask:
The requested task proxy, or None if task does not
exist or cannot spawn.
is_in_pool:
Was the task found in a pool.
is_xtrig_sequential:
Is the next task occurance spawned on xtrigger satisfaction,
or do all occurances spawn out to the runahead limit.
"""
taskid = Tokens(cycle=str(point), task=tdef.name).relative_id
ntask = (
self._get_hidden_task_by_id(taskid)
Expand All @@ -745,28 +753,25 @@ def _get_spawned_or_merged_task(
if ntask is None:
# ntask does not exist: spawn it in the flow.
ntask = self.spawn_task(tdef.name, point, flow_nums)
# if the task was found set xtrigger checking type.
# if the task was found set xtrigger spawning type.
# otherwise find the xtrigger type if it can't spawn
# for whatever reason.
if ntask is not None:
if set(ntask.state.xtriggers.keys()).intersection(
self.xtrigger_mgr.sequential_xtrigger_labels
):
ntask.is_xtrigger_sequential = True
is_xtrig_sequential = True
elif {
xtrig_label
is_xtrig_sequential = ntask.is_xtrigger_sequential
elif any(
xtrig_label in self.xtrigger_mgr.sequential_xtrigger_labels
for sequence, xtrig_labels in tdef.xtrig_labels.items()
for xtrig_label in xtrig_labels
if sequence.is_valid(point)
}.intersection(
self.xtrigger_mgr.sequential_xtrigger_labels
):
is_xtrig_sequential = True
else:
# ntask already exists (n=0 or incomplete): merge flows.
is_in_pool = True
self.merge_flows(ntask, flow_nums)
is_xtrig_sequential = ntask.is_xtrigger_sequential
return ntask, is_in_pool, is_xtrig_sequential # may be None
# ntask may still be None
return ntask, is_in_pool, is_xtrig_sequential

def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None:
"""Spawn parentless task instances from point to runahead limit.
Expand Down Expand Up @@ -822,6 +827,12 @@ def remove(self, itask, reason=""):
if reason:
msg += f" ({reason})"

if itask.is_xtrigger_sequential:
with suppress(ValueError):
self.xtrigger_mgr.sequential_spawn_next.remove(
itask.identity
)

try:
del self.hidden_pool[itask.point][itask.identity]
except KeyError:
Expand Down Expand Up @@ -1056,20 +1067,15 @@ def reload_taskdefs(self, config: 'WorkflowConfig') -> None:
itask.point,
itask.flow_nums,
itask.state.status,
sequential_xtrigger_labels=(
self.xtrigger_mgr.sequential_xtrigger_labels
),
)
itask.copy_to_reload_successor(
new_task,
self.check_task_output,
)
self._swap_out(new_task)
# Set xtrigger checking type for parentless spawning.
if (
new_task.tdef.is_parentless(new_task.point)
and set(new_task.state.xtriggers.keys()).intersection(
self.xtrigger_mgr.sequential_xtrigger_labels
)
):
new_task.is_xtrigger_sequential = True
self.data_store_mgr.delta_task_prerequisite(new_task)
LOG.info(f"[{itask}] reloaded task definition")
if itask.state(*TASK_STATUSES_ACTIVE):
Expand Down Expand Up @@ -1622,6 +1628,9 @@ def spawn_task(
submit_num=submit_num,
is_manual_submit=is_manual_submit,
flow_wait=flow_wait,
sequential_xtrigger_labels=(
self.xtrigger_mgr.sequential_xtrigger_labels
),
)
if (name, point) in self.tasks_to_hold:
LOG.info(f"[{itask}] holding (as requested earlier)")
Expand Down Expand Up @@ -1702,6 +1711,9 @@ def force_spawn_children(
taskdef,
point,
flow_nums=flow_nums,
sequential_xtrigger_labels=(
self.xtrigger_mgr.sequential_xtrigger_labels
),
)
# Spawn children of selected outputs.
for trig, out, _ in itask.state.outputs.get_all():
Expand Down
14 changes: 13 additions & 1 deletion cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ def __init__(
is_manual_submit: bool = False,
flow_wait: bool = False,
data_mode: bool = False,
sequential_xtrigger_labels: Optional[Set[str]] = None,
) -> None:

self.tdef = tdef
Expand All @@ -213,7 +214,6 @@ def __init__(
task=self.tdef.name,
)
self.identity = self.tokens.relative_id
self.is_xtrigger_sequential = False
self.reload_successor: Optional['TaskProxy'] = None
self.point_as_seconds: Optional[int] = None

Expand Down Expand Up @@ -254,6 +254,18 @@ def __init__(

self.state = TaskState(tdef, self.point, status, is_held)

# Set xtrigger checking type, which effects parentless spawning.
if (
sequential_xtrigger_labels
and self.tdef.is_parentless(start_point)
and set(self.state.xtriggers.keys()).intersection(
sequential_xtrigger_labels
)
):
self.is_xtrigger_sequential = True
else:
self.is_xtrigger_sequential = False

# Determine graph children of this task (for spawning).
if data_mode:
self.graph_children = {}
Expand Down

0 comments on commit 2a0ff51

Please sign in to comment.