Skip to content

Commit

Permalink
Response to reviews
Browse files Browse the repository at this point in the history
Update changes.d/6583.fix.md

Co-authored-by: Hilary James Oliver <[email protected]>

Update cylc/flow/config.py

Co-authored-by: Hilary James Oliver <[email protected]>

respond to Hilary's review: Don't check all outputs only terminals.

use regexs to sort out issues with qualifiers, suicide triggers &c

mypy
  • Loading branch information
wxtim committed Feb 5, 2025
1 parent 5cbd399 commit bdadd4b
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 23 deletions.
2 changes: 1 addition & 1 deletion changes.d/6583.fix.md
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Fix bug where graph items with undefined outputs were missed at validation if the graph item was not an upstream dependency of another graph item.
Fix bug where undefined outputs were missed by validation if no tasks trigger off of them.
39 changes: 20 additions & 19 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2266,17 +2266,10 @@ def load_graph(self):
self.workflow_polling_tasks.update(
parser.workflow_state_polling_tasks)
self._proc_triggers(parser, seq, task_triggers)
self.check_outputs(
[
task_output
for task_output in parser.task_output_opt
if task_output[0]
in [
task_output.split(':')[0]
for task_output in parser.terminals
]
]
)

# Checking for undefined outputs for terminal tasks. Tasks with
# dependencies are checked in generate_triggers:
self.check_outputs(parser.terminals)

self.set_required_outputs(task_output_opt)

Expand All @@ -2290,17 +2283,25 @@ def load_graph(self):
for tdef in self.taskdefs.values():
tdef.tweak_outputs()

def check_outputs(
self, tasks_and_outputs: Iterable[Tuple[str, str]]
) -> None:
def check_outputs(self, terminals: Iterable[str]) -> None:
"""Check that task outputs have been registered with tasks.
Args: tasks_and_outputs: ((task, output), ...)
Raises: WorkflowConfigError is a user has defined a task with a
custom output, but has not registered a custom output.
Raises: WorkflowConfigError if a custom output is not defined.
"""
for task, output in tasks_and_outputs:
terminal_outputs = []
for terminal in terminals:
if ':' in terminal:
task = re.findall(TaskID.NAME_RE, terminal)[0]
qualifier = re.findall(GraphParser._RE_QUAL, terminal)
if qualifier:
qualifier_str = qualifier[0].strip(':')
else:
qualifier_str = ''

Check warning on line 2299 in cylc/flow/config.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/config.py#L2299

Added line #L2299 was not covered by tests
if qualifier_str in ALT_QUALIFIERS:
qualifier_str = ALT_QUALIFIERS[qualifier_str]
terminal_outputs.append((task, qualifier_str))

for task, output in terminal_outputs:
registered_outputs = self.cfg['runtime'][task]['outputs']
if (
not TaskOutputs.is_valid_std_name(output)
Expand Down
3 changes: 2 additions & 1 deletion tests/integration/reftests/test_pre_initial.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ async def test_drop(flow, scheduler, reftest):
}


async def test_over_bracketed(flow, scheduler, reftest):
async def test_over_bracketed(flow, scheduler, reftest, validate):
"""Test nested conditional simplification for pre-initial cycling."""
wid = flow({
'scheduling': {
Expand All @@ -108,6 +108,7 @@ async def test_over_bracketed(flow, scheduler, reftest):
},
},
})
validate(wid)
schd = scheduler(wid, paused_start=False)

assert await reftest(schd) == {
Expand Down
35 changes: 33 additions & 2 deletions tests/unit/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import os
import sys
from optparse import Values
from typing import (
TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Type)
import pytest
import logging
from textwrap import dedent
from types import SimpleNamespace
from contextlib import suppress

Expand All @@ -47,6 +47,10 @@

from cylc.flow.cycling.iso8601 import ISO8601Point


param = pytest.param


if TYPE_CHECKING:
from pathlib import Path
Fixture = Any
Expand Down Expand Up @@ -1703,7 +1707,6 @@ def test_cylc_env_at_parsing(

def test_force_workflow_compat_mode(tmp_path):
fpath = (tmp_path / 'flow.cylc')
from textwrap import dedent
fpath.write_text(dedent("""
[scheduler]
allow implicit tasks = true
Expand All @@ -1716,3 +1719,31 @@ def test_force_workflow_compat_mode(tmp_path):
WorkflowConfig('foo', str(fpath), {})
# It succeeds with compat mode:
WorkflowConfig('foo', str(fpath), {}, force_compat_mode=True)


@pytest.mark.parametrize(
'registered_outputs, tasks_and_outputs, fails',
(
param([], ['foo:x'], True, id='output-unregistered'),
param([], ['foo:x?'], True, id='optional-output-unregistered'),
param([], ['foo'], False, id='no-modifier-unregistered'),
param(['x'], ['foo:x'], False, id='output-registered'),
param([], ['foo:succeed'], False, id='alt-default-ok'),
param([], ['foo:failed'], False, id='default-ok'),
)
)
def test_check_outputs(tmp_path, registered_outputs, tasks_and_outputs, fails):
(tmp_path / 'flow.cylc').write_text(dedent("""
[scheduler]
allow implicit tasks = true
[scheduling]
[[graph]]
R1 = foo
"""))
cfg = WorkflowConfig('', tmp_path / 'flow.cylc', '')
cfg.cfg['runtime']['foo']['outputs'] = registered_outputs
if fails:
with pytest.raises(WorkflowConfigError, match='Undefined custom output'):
cfg.check_outputs(tasks_and_outputs)
else:
assert cfg.check_outputs(tasks_and_outputs) is None

0 comments on commit bdadd4b

Please sign in to comment.