diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 693617df7aa..dbe60fb7292 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1691,18 +1691,20 @@ def _standardise_prereqs( msg = self.config.get_taskdef( pre['task'] ).outputs[output][0] + cycle = standardise_point_string(pre['cycle']) except KeyError: # The task does not have this output. LOG.warning( f"output {pre.relative_id_with_selectors} not found") continue - _prereqs[ - pre.duplicate( - task_sel=msg, - cycle=standardise_point_string(pre['cycle']) - ) - ] = prereq - + except WorkflowConfigError as exc: + LOG.warning( + f'Invalid prerequisite task name:\n{exc.args[0]}') + except PointParsingError as exc: + LOG.warning( + f'Invalid prerequisite cycle point:\n{exc.args[0]}') + else: + _prereqs[pre.duplicate(task_sel=msg, cycle=cycle)] = prereq return _prereqs def _standardise_outputs( diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 13c696292a9..be1f335d45b 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -33,10 +33,8 @@ from cylc.flow.cycling.integer import IntegerPoint from cylc.flow.cycling.iso8601 import ISO8601Point from cylc.flow.data_store_mgr import TASK_PROXIES -from cylc.flow.id import Tokens from cylc.flow.task_events_mgr import TaskEventsManager from cylc.flow.task_outputs import ( - TASK_OUTPUT_STARTED, TASK_OUTPUT_SUCCEEDED ) @@ -50,7 +48,6 @@ TASK_STATUS_FAILED, TASK_STATUS_EXPIRED, TASK_STATUS_SUBMIT_FAILED, - TASK_STATUSES_ALL, ) if TYPE_CHECKING: @@ -1345,7 +1342,10 @@ async def test_set_prereqs( # it should start up with foo, bar, baz assert ( - pool_get_task_ids(schd.pool) == ["20400101T0000Z/bar", "20400101T0000Z/baz", "20400101T0000Z/foo"] + pool_get_task_ids(schd.pool) == [ + "20400101T0000Z/bar", + "20400101T0000Z/baz", + "20400101T0000Z/foo"] ) # try to set an invalid prereq of qux @@ -1356,17 +1356,26 @@ async def test_set_prereqs( # it should not add 20400101T0000Z/qux to the pool assert ( - pool_get_task_ids(schd.pool) == ["20400101T0000Z/bar", "20400101T0000Z/baz", "20400101T0000Z/foo"] + pool_get_task_ids(schd.pool) == [ + "20400101T0000Z/bar", + "20400101T0000Z/baz", + "20400101T0000Z/foo"] ) # set one prereq of future task 20400101T0000Z/qux schd.pool.set_prereqs_and_outputs( - ["20400101T0000Z/qux"], None, ["20400101T0000Z/foo:succeeded"], ['all']) + ["20400101T0000Z/qux"], + None, + ["20400101T0000Z/foo:succeeded"], + ['all']) # it should add 20400101T0000Z/qux to the pool assert ( pool_get_task_ids(schd.pool) == [ - "20400101T0000Z/bar", "20400101T0000Z/baz", "20400101T0000Z/foo", "20400101T0000Z/qux" + "20400101T0000Z/bar", + "20400101T0000Z/baz", + "20400101T0000Z/foo", + "20400101T0000Z/qux" ] ) @@ -1383,6 +1392,40 @@ async def test_set_prereqs( assert qux.state.prerequisites_all_satisfied() +async def test_set_bad_prereqs( + flow, + scheduler, + start, + log_filter, +): + """Check manual setting of prerequisites. + + """ + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': 'True', + 'cycle point format': '%Y'}, + 'scheduling': { + 'initial cycle point': '2040', + 'graph': {'R1': "foo => bar"}}, + }) + schd = scheduler(id_) + + def set_prereqs(prereqs): + """Shorthand so only varible under test given as arg""" + schd.pool.set_prereqs_and_outputs( + ["2040/bar"], None, prereqs, ['all']) + + async with start(schd) as log: + # Invalid: task name wildcard: + set_prereqs(["2040/*"]) + assert log_filter(log, contains='Invalid prerequisite task name' ) + + # Invalid: cycle point wildcard. + set_prereqs(["*/foo"]) + assert log_filter(log, contains='Invalid prerequisite cycle point') + + async def test_set_outputs_live( flow, scheduler,