Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent malformed --pre options taking the scheduler down #46

Merged
merged 5 commits into from
Mar 10, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1691,18 +1691,25 @@ def _standardise_prereqs(
msg = self.config.get_taskdef(
pre['task']
).outputs[output][0]
cycle = standardise_point_string(pre['cycle'])
except KeyError:
# The task does not have this output.
LOG.warning(
f"output {pre.relative_id_with_selectors} not found")
continue
except WorkflowConfigError as exc:
# The workflow does not have the task from --pre:
LOG.warning(f'Invalid pre task name set:\n {exc.args[0]}')
except PointParsingError as exc:
# The CP from --pre is invalid:
LOG.warning(f'Invalid pre cycle point set:\n {exc.args[0]}')
else:
_prereqs[
pre.duplicate(
task_sel=msg,
cycle=standardise_point_string(pre['cycle'])
)
] = prereq

return _prereqs

def _standardise_outputs(
Expand Down
57 changes: 50 additions & 7 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@
from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.cycling.iso8601 import ISO8601Point
from cylc.flow.data_store_mgr import TASK_PROXIES
from cylc.flow.id import Tokens
from cylc.flow.task_events_mgr import TaskEventsManager
from cylc.flow.task_outputs import (
TASK_OUTPUT_STARTED,
TASK_OUTPUT_SUCCEEDED
)

Expand All @@ -50,7 +48,6 @@
TASK_STATUS_FAILED,
TASK_STATUS_EXPIRED,
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUSES_ALL,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -1345,7 +1342,10 @@ async def test_set_prereqs(

# it should start up with foo, bar, baz
assert (
pool_get_task_ids(schd.pool) == ["20400101T0000Z/bar", "20400101T0000Z/baz", "20400101T0000Z/foo"]
pool_get_task_ids(schd.pool) == [
"20400101T0000Z/bar",
"20400101T0000Z/baz",
"20400101T0000Z/foo"]
)

# try to set an invalid prereq of qux
Expand All @@ -1356,17 +1356,26 @@ async def test_set_prereqs(

# it should not add 20400101T0000Z/qux to the pool
assert (
pool_get_task_ids(schd.pool) == ["20400101T0000Z/bar", "20400101T0000Z/baz", "20400101T0000Z/foo"]
pool_get_task_ids(schd.pool) == [
"20400101T0000Z/bar",
"20400101T0000Z/baz",
"20400101T0000Z/foo"]
)

# set one prereq of future task 20400101T0000Z/qux
schd.pool.set_prereqs_and_outputs(
["20400101T0000Z/qux"], None, ["20400101T0000Z/foo:succeeded"], ['all'])
["20400101T0000Z/qux"],
None,
["20400101T0000Z/foo:succeeded"],
['all'])

# it should add 20400101T0000Z/qux to the pool
assert (
pool_get_task_ids(schd.pool) == [
"20400101T0000Z/bar", "20400101T0000Z/baz", "20400101T0000Z/foo", "20400101T0000Z/qux"
"20400101T0000Z/bar",
"20400101T0000Z/baz",
"20400101T0000Z/foo",
"20400101T0000Z/qux"
]
)

Expand All @@ -1383,6 +1392,40 @@ async def test_set_prereqs(
assert qux.state.prerequisites_all_satisfied()


async def test_set_bad_prereqs(
flow,
scheduler,
start,
log_filter,
):
"""Check manual setting of prerequisites.

"""
id_ = flow({
'scheduler': {
'allow implicit tasks': 'True',
'cycle point format': '%Y'},
'scheduling': {
'initial cycle point': '2040',
'graph': {'R1': "foo => bar"}},
})
schd = scheduler(id_)

def set_prereqs(prereqs):
"""Shorthand so only varible under test given as arg"""
schd.pool.set_prereqs_and_outputs(
["2040/bar"], None, prereqs, ['all'])

async with start(schd) as log:
# Invalid - task name wildcard:
set_prereqs(["2040/*"])
assert 'Illegal task name' in log.messages[-1]

# Invalid cycle point wildcard.
set_prereqs(["*/foo"])
assert 'Invalid ISO 8601' in log.messages[-1]


async def test_set_outputs_live(
flow,
scheduler,
Expand Down
Loading