Skip to content

Commit

Permalink
Merge pull request #46 from wxtim/cylc-set-task.fix.bad-pre
Browse files Browse the repository at this point in the history
Prevent malformed `--pre` options taking the scheduler down
  • Loading branch information
hjoliver authored Mar 10, 2024
2 parents 59c92b6 + e182f14 commit b8b2b10
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 14 deletions.
16 changes: 9 additions & 7 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1691,18 +1691,20 @@ def _standardise_prereqs(
msg = self.config.get_taskdef(
pre['task']
).outputs[output][0]
cycle = standardise_point_string(pre['cycle'])
except KeyError:
# The task does not have this output.
LOG.warning(
f"output {pre.relative_id_with_selectors} not found")
continue
_prereqs[
pre.duplicate(
task_sel=msg,
cycle=standardise_point_string(pre['cycle'])
)
] = prereq

except WorkflowConfigError as exc:
LOG.warning(

Check warning on line 1701 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L1701

Added line #L1701 was not covered by tests
f'Invalid prerequisite task name:\n{exc.args[0]}')
except PointParsingError as exc:
LOG.warning(

Check warning on line 1704 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L1703-L1704

Added lines #L1703 - L1704 were not covered by tests
f'Invalid prerequisite cycle point:\n{exc.args[0]}')
else:
_prereqs[pre.duplicate(task_sel=msg, cycle=cycle)] = prereq
return _prereqs

def _standardise_outputs(
Expand Down
57 changes: 50 additions & 7 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@
from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.cycling.iso8601 import ISO8601Point
from cylc.flow.data_store_mgr import TASK_PROXIES
from cylc.flow.id import Tokens
from cylc.flow.task_events_mgr import TaskEventsManager
from cylc.flow.task_outputs import (
TASK_OUTPUT_STARTED,
TASK_OUTPUT_SUCCEEDED
)

Expand All @@ -50,7 +48,6 @@
TASK_STATUS_FAILED,
TASK_STATUS_EXPIRED,
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUSES_ALL,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -1345,7 +1342,10 @@ async def test_set_prereqs(

# it should start up with foo, bar, baz
assert (
pool_get_task_ids(schd.pool) == ["20400101T0000Z/bar", "20400101T0000Z/baz", "20400101T0000Z/foo"]
pool_get_task_ids(schd.pool) == [
"20400101T0000Z/bar",
"20400101T0000Z/baz",
"20400101T0000Z/foo"]
)

# try to set an invalid prereq of qux
Expand All @@ -1356,17 +1356,26 @@ async def test_set_prereqs(

# it should not add 20400101T0000Z/qux to the pool
assert (
pool_get_task_ids(schd.pool) == ["20400101T0000Z/bar", "20400101T0000Z/baz", "20400101T0000Z/foo"]
pool_get_task_ids(schd.pool) == [
"20400101T0000Z/bar",
"20400101T0000Z/baz",
"20400101T0000Z/foo"]
)

# set one prereq of future task 20400101T0000Z/qux
schd.pool.set_prereqs_and_outputs(
["20400101T0000Z/qux"], None, ["20400101T0000Z/foo:succeeded"], ['all'])
["20400101T0000Z/qux"],
None,
["20400101T0000Z/foo:succeeded"],
['all'])

# it should add 20400101T0000Z/qux to the pool
assert (
pool_get_task_ids(schd.pool) == [
"20400101T0000Z/bar", "20400101T0000Z/baz", "20400101T0000Z/foo", "20400101T0000Z/qux"
"20400101T0000Z/bar",
"20400101T0000Z/baz",
"20400101T0000Z/foo",
"20400101T0000Z/qux"
]
)

Expand All @@ -1383,6 +1392,40 @@ async def test_set_prereqs(
assert qux.state.prerequisites_all_satisfied()


async def test_set_bad_prereqs(
flow,
scheduler,
start,
log_filter,
):
"""Check manual setting of prerequisites.
"""
id_ = flow({
'scheduler': {
'allow implicit tasks': 'True',
'cycle point format': '%Y'},
'scheduling': {
'initial cycle point': '2040',
'graph': {'R1': "foo => bar"}},
})
schd = scheduler(id_)

def set_prereqs(prereqs):
"""Shorthand so only varible under test given as arg"""
schd.pool.set_prereqs_and_outputs(
["2040/bar"], None, prereqs, ['all'])

async with start(schd) as log:
# Invalid: task name wildcard:
set_prereqs(["2040/*"])
assert log_filter(log, contains='Invalid prerequisite task name' )

# Invalid: cycle point wildcard.
set_prereqs(["*/foo"])
assert log_filter(log, contains='Invalid prerequisite cycle point')


async def test_set_outputs_live(
flow,
scheduler,
Expand Down

0 comments on commit b8b2b10

Please sign in to comment.