Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

finish implementing task expiration #5934

Closed
wants to merge 15 commits
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/5658.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
New "cylc set" command for setting task prerequisites and outputs.
33 changes: 28 additions & 5 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
)
from cylc.flow.task_id import TaskID
from cylc.flow.task_outputs import (
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_EXPIRED,
TaskOutputs
)
from cylc.flow.task_trigger import TaskTrigger, Dependency
Expand Down Expand Up @@ -1649,7 +1649,7 @@
offset_is_irregular, offset_is_absolute) = (
GraphNodeParser.get_inst().parse(left))

# Qualifier.
# Qualifier. Note ":succeeded" made explicit by the graph parser.
outputs = self.cfg['runtime'][name]['outputs']
if outputs and (output in outputs):
# Qualifier is a custom task message.
Expand All @@ -1660,9 +1660,6 @@
f"Undefined custom output: {name}:{output}"
)
qualifier = output
else:
# No qualifier specified => use "succeeded".
qualifier = TASK_OUTPUT_SUCCEEDED

# Generate TaskTrigger if not already done.
key = (name, offset, qualifier,
Expand Down Expand Up @@ -2131,7 +2128,11 @@

Args:
task_output_opt: {(task, output): (is-optional, default, is_set)}

"""
# task_output_opt: outputs parsed from graph triggers
# taskdef.outputs: outputs listed under runtime

for name, taskdef in self.taskdefs.items():
for output in taskdef.outputs:
try:
Expand All @@ -2141,6 +2142,28 @@
continue
taskdef.set_required_output(output, not optional)

# Add expired outputs to taskdefs if flagged in the graph.
graph_exp = set()
for (task, output) in task_output_opt.keys():
if output == TASK_OUTPUT_EXPIRED:
graph_exp.add(task)
self.taskdefs[task].add_output(

Check warning on line 2150 in cylc/flow/config.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/config.py#L2149-L2150

Added lines #L2149 - L2150 were not covered by tests
TASK_OUTPUT_EXPIRED, TASK_OUTPUT_EXPIRED
)

# clock-expire must be flagged in the graph for visibility
bad_exp = set()
for task in self.expiration_offsets:
if task not in graph_exp:
bad_exp.add(task)

if bad_exp:
msg = '\n '.join(
[t + f":{TASK_OUTPUT_EXPIRED}?" for t in bad_exp])
raise WorkflowConfigError(
f"Clock-expire must be visible in the graph:\n {msg}"
)

def find_taskdefs(self, name: str) -> Set[TaskDef]:
"""Find TaskDef objects in family "name" or matching "name".

Expand Down
5 changes: 3 additions & 2 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2185,8 +2185,9 @@ def update_workflow(self, reloaded=False):
w_delta.n_edge_distance = self.n_edge_distance
delta_set = True

if self.schd.pool.main_pool:
pool_points = set(self.schd.pool.main_pool)
if self.schd.pool.active_tasks:
pool_points = set(self.schd.pool.active_tasks)

oldest_point = str(min(pool_points))
if w_data.oldest_active_cycle_point != oldest_point:
w_delta.oldest_active_cycle_point = oldest_point
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/etc/cylc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
Expand Down
153 changes: 130 additions & 23 deletions cylc/flow/flow_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,99 @@
import datetime

from cylc.flow import LOG
from cylc.flow.exceptions import InputError


if TYPE_CHECKING:
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager


FlowNums = Set[int]
# Flow constants
FLOW_ALL = "all"
FLOW_NEW = "new"
FLOW_NONE = "none"

# For flow-related CLI options:
ERR_OPT_FLOW_VAL = "Flow values must be an integer, or 'all', 'new', or 'none'"
ERR_OPT_FLOW_INT = "Multiple flow options must all be integer valued"
ERR_OPT_FLOW_WAIT = (
    f"--wait is not compatible with --flow={FLOW_NEW} or --flow={FLOW_NONE}"
)


def add_flow_opts(parser):
    """Add the standard flow-related CLI options to a parser.

    Adds --flow (repeatable flow assignment), --meta (description for a
    new flow) and --wait (wait for flow merge) options.

    Args:
        parser: an optparse-style parser with an ``add_option`` method.
    """
    parser.add_option(
        "--flow", action="append", dest="flow", metavar="FLOW",
        help=f'Assign new tasks to all active flows ("{FLOW_ALL}");'
        f' no flow ("{FLOW_NONE}"); a new flow ("{FLOW_NEW}");'
        f' or a specific flow (e.g. "2"). The default is "{FLOW_ALL}".'
        ' Specific flow numbers can be new or existing.'
        ' Reuse the option to assign multiple flow numbers.'
    )

    parser.add_option(
        "--meta", metavar="DESCRIPTION", action="store",
        dest="flow_descr", default=None,
        help=f"description of new flow (with --flow={FLOW_NEW})."
    )

    parser.add_option(
        "--wait", action="store_true", default=False, dest="flow_wait",
        help="Wait for merge with current active flows before flowing on."
    )


def validate_flow_opts(options):
    """Check validity of flow-related CLI options.

    Normalizes ``options.flow`` in place: defaults it to ``["all"]`` and
    strips surrounding whitespace from each value, so that later checks
    (and downstream consumers) see canonical values.

    Raises:
        InputError:
            - if a value is not an integer or one of the keywords;
            - if a keyword is combined with other flow values;
            - if --wait is used with --flow=new or --flow=none.
    """
    if options.flow is None:
        # Default to all active flows.
        options.flow = [FLOW_ALL]

    # Strip whitespace up front so the keyword comparisons below (and
    # the --wait check, which previously saw the raw values) agree.
    options.flow = [val.strip() for val in options.flow]

    for val in options.flow:
        if val in {FLOW_NONE, FLOW_NEW, FLOW_ALL}:
            # Keywords cannot be combined with any other flow value.
            if len(options.flow) != 1:
                raise InputError(ERR_OPT_FLOW_INT)
        else:
            try:
                int(val)
            except ValueError:
                # NOTE: ERR_OPT_FLOW_VAL has no format placeholder; the
                # previous ``.format(val)`` call was a no-op.
                raise InputError(ERR_OPT_FLOW_VAL) from None

    if options.flow_wait and options.flow[0] in {FLOW_NEW, FLOW_NONE}:
        raise InputError(ERR_OPT_FLOW_WAIT)


def stringify_flow_nums(flow_nums: Set[int], full: bool = False) -> str:
    """Return a string representation of a set of flow numbers.

    If the set contains only the original flow 1, return an empty string
    so that users can disregard flows unless they trigger new ones.

    Otherwise return e.g. "(1,2,3)". Numbers are sorted so the result is
    deterministic (set iteration order is arbitrary).

    Examples:
        >>> stringify_flow_nums(set())
        '(none)'

        >>> stringify_flow_nums({1})
        ''

        >>> stringify_flow_nums({1}, True)
        '(1)'

        >>> stringify_flow_nums({3, 1, 2})
        '(1,2,3)'

    """
    if not full and flow_nums == {1}:
        return ""
    return (
        "("
        f"{','.join(str(i) for i in sorted(flow_nums)) or 'none'}"
        ")"
    )


class FlowMgr:
"""Logic to manage flow counter and flow metadata."""
Expand All @@ -42,28 +123,54 @@ def __init__(self, db_mgr: "WorkflowDatabaseManager") -> None:
self.flows: Dict[int, Dict[str, str]] = {}
self.counter: int = 0

def get_new_flow(self, description: Optional[str] = None) -> int:
"""Increment flow counter, record flow metadata."""
self.counter += 1
# record start time to nearest second
now = datetime.datetime.now()
now_sec: str = str(
now - datetime.timedelta(microseconds=now.microsecond))
description = description or "no description"
self.flows[self.counter] = {
"description": description,
"start_time": now_sec
}
LOG.info(
f"New flow: {self.counter} "
f"({description}) "
f"{now_sec}"
)
self.db_mgr.put_insert_workflow_flows(
self.counter,
self.flows[self.counter]
)
return self.counter
def get_flow_num(
    self,
    flow_num: Optional[int] = None,
    meta: Optional[str] = None
) -> int:
    """Return a valid flow number, and record a new flow if necessary.

    If no flow number is given, advance the automatic counter to the
    next unused number (skipping any manually-created out-of-sequence
    flows) and record it as a new flow.

    If a flow number is given and unused, record it as a new flow;
    otherwise return it unchanged as an existing flow number.

    The ``meta`` description is only used when a new flow is recorded.
    """
    if flow_num is None:
        # Auto-select the next free number after the counter.
        candidate = self.counter + 1
        while candidate in self.flows:
            # Skip manually-created out-of-sequence flows.
            candidate += 1
        self.counter = candidate
        flow_num = candidate

    if flow_num not in self.flows:
        # Record a brand-new flow, timestamped to the nearest second.
        stamp = datetime.datetime.now()
        start_time = str(
            stamp - datetime.timedelta(microseconds=stamp.microsecond)
        )
        description = meta or "no description"
        entry = {
            "description": description,
            "start_time": start_time
        }
        self.flows[flow_num] = entry
        LOG.info(
            f"New flow: {flow_num} ({description}) {start_time}"
        )
        self.db_mgr.put_insert_workflow_flows(flow_num, entry)
    elif meta is not None:
        # Metadata only applies to new flows.
        LOG.warning(
            f'Ignoring flow metadata "{meta}":'
            f' {flow_num} is not a new flow'
        )
    return flow_num

def load_from_db(self, flow_nums: FlowNums) -> None:
"""Load flow data for scheduler restart.
Expand Down
12 changes: 12 additions & 0 deletions cylc/flow/graph_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from cylc.flow.task_id import TaskID
from cylc.flow.task_trigger import TaskTrigger
from cylc.flow.task_outputs import (
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_STARTED,
TASK_OUTPUT_FAILED,
Expand All @@ -41,6 +42,8 @@
TASK_OUTPUT_SUBMIT_FAILED
)
from cylc.flow.task_qualifiers import (
QUAL_FAM_EXPIRE_ALL,
QUAL_FAM_EXPIRE_ANY,
QUAL_FAM_SUCCEED_ALL,
QUAL_FAM_SUCCEED_ANY,
QUAL_FAM_FAIL_ALL,
Expand All @@ -58,6 +61,7 @@

class Replacement:
"""A class to remember match group information in re.sub() calls"""

def __init__(self, replacement):
self.replacement = replacement
self.substitutions = []
Expand Down Expand Up @@ -124,6 +128,8 @@ class GraphParser:
# E.g. QUAL_FAM_START_ALL: (TASK_OUTPUT_STARTED, True) simply maps
# "FAM:start-all" to "MEMBER:started" and "-all" (all members).
fam_to_mem_trigger_map: Dict[str, Tuple[str, bool]] = {
QUAL_FAM_EXPIRE_ALL: (TASK_OUTPUT_EXPIRED, True),
QUAL_FAM_EXPIRE_ANY: (TASK_OUTPUT_EXPIRED, False),
QUAL_FAM_START_ALL: (TASK_OUTPUT_STARTED, True),
QUAL_FAM_START_ANY: (TASK_OUTPUT_STARTED, False),
QUAL_FAM_SUCCEED_ALL: (TASK_OUTPUT_SUCCEEDED, True),
Expand All @@ -140,6 +146,8 @@ class GraphParser:

# Map family pseudo triggers to affected member outputs.
fam_to_mem_output_map: Dict[str, List[str]] = {
QUAL_FAM_EXPIRE_ANY: [TASK_OUTPUT_EXPIRED],
QUAL_FAM_EXPIRE_ALL: [TASK_OUTPUT_EXPIRED],
QUAL_FAM_START_ANY: [TASK_OUTPUT_STARTED],
QUAL_FAM_START_ALL: [TASK_OUTPUT_STARTED],
QUAL_FAM_SUCCEED_ANY: [TASK_OUTPUT_SUCCEEDED],
Expand Down Expand Up @@ -738,6 +746,10 @@ def _set_output_opt(
if suicide:
return

if output == TASK_OUTPUT_EXPIRED and not optional:
raise GraphParseError(
f"Output {name}:{output} must be optional (append '?')")

if output == TASK_OUTPUT_FINISHED:
# Interpret :finish pseudo-output
if optional:
Expand Down
9 changes: 9 additions & 0 deletions cylc/flow/id.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
)

from cylc.flow import LOG
from cylc.flow.task_outputs import TASK_OUTPUT_SUCCEEDED


class IDTokens(Enum):
Expand Down Expand Up @@ -356,6 +357,14 @@ def is_null(self) -> bool:
self[key] for key in self._REGULAR_KEYS
)

def to_prereq_tuple(self) -> Tuple[str, str, str]:
    """Return (cycle, task, selector) as used for task prerequisites.

    The selector defaults to "succeeded" when no task selector is set.
    """
    selector = self['task_sel'] or TASK_OUTPUT_SUCCEEDED
    return (self['cycle'], self['task'], selector)

def duplicate(
self,
*tokens_list,
Expand Down
Loading
Loading