From 59c92b6135c9ad55458dd3e48ae733e4a6efb687 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Fri, 8 Mar 2024 00:51:53 +1300 Subject: [PATCH] Log outputs, not task messages, for unmatched prerequisites --- cylc/flow/task_pool.py | 32 +++++++++++++++++++++-------- cylc/flow/task_proxy.py | 23 ++++++++------------- tests/integration/test_task_pool.py | 2 +- 3 files changed, 33 insertions(+), 24 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index b6b480486c1..693617df7aa 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1672,9 +1672,15 @@ def _get_task_proxy_db_outputs( itask.state.outputs.set_completed_by_msg(msg) return itask - def _standardise_prereqs(self, prereqs: 'List[str]') -> 'List[Tokens]': - """Convert prerequisites to task output messages.""" - _prereqs = [] + def _standardise_prereqs( + self, prereqs: 'List[str]' + ) -> 'Dict[Tokens, str]': + """Convert prerequisites to a map of task messages: outputs. + + (So satsify_me logs failures) + + """ + _prereqs = {} for prereq in prereqs: pre = Tokens(prereq, relative=True) # add implicit "succeeded"; convert "succeed" to "succeeded" etc. @@ -1690,12 +1696,13 @@ def _standardise_prereqs(self, prereqs: 'List[str]') -> 'List[Tokens]': LOG.warning( f"output {pre.relative_id_with_selectors} not found") continue - _prereqs.append( + _prereqs[ pre.duplicate( task_sel=msg, cycle=standardise_point_string(pre['cycle']) ) - ) + ] = prereq + return _prereqs def _standardise_outputs( @@ -1839,11 +1846,18 @@ def _set_prereqs_itask( if prereqs == ["all"]: itask.state.set_all_satisfied() else: - if not itask.satisfy_me( - self._standardise_prereqs(prereqs) - ): + # Attempt to set the given presrequisites. + # Log any that aren't valid for the task. + presus = self._standardise_prereqs(prereqs) + unmatched = itask.satisfy_me(list(presus.keys())) + for task_msg in unmatched: + LOG.warning( + f"{itask.identity} does not depend on" + f' "{presus[task_msg]}"' + ) + if len(unmatched) == len(prereqs): + # No prereqs matched. return False - if ( self.runahead_limit_point is not None and itask.point <= self.runahead_limit_point diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 08fa94260b2..6afb6a2087e 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -26,7 +26,6 @@ Counter as TypingCounter, Dict, List, - Iterable, Optional, Set, TYPE_CHECKING, @@ -536,23 +535,19 @@ def state_reset( return False - def satisfy_me(self, outputs: 'Iterable[Tokens]') -> bool: - """Try to satisfy my prerequisites with given outputs. + def satisfy_me( + self, task_messages: 'List[Tokens]' + ) -> 'Set[Tokens]': + """Try to satisfy my prerequisites with given output messages. - The output strings are of the form "cycle/task:message" - Log a warning for outputs that I don't depend on. + The task output messages are of the form "cycle/task:message" + Log a warning for messages that I don't depend on. - Return True if any match, else False. + Return a set of unmatched task messages. """ - used = self.state.satisfy_me(outputs) - for output in set(outputs) - used: - # Note this logs the task message not the output. - LOG.warning( - f"{self.identity} does not depend on" - f' "{output.relative_id_with_selectors}"' - ) - return bool(used) + used = self.state.satisfy_me(task_messages) + return set(task_messages) - used def clock_expire(self) -> bool: """Return True if clock expire time is up, else False.""" diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 7e205580b98..13c696292a9 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -1352,7 +1352,7 @@ async def test_set_prereqs( schd.pool.set_prereqs_and_outputs( ["20400101T0000Z/qux"], None, ["20400101T0000Z/foo:a"], ['all']) assert log_filter( - log, contains='20400101T0000Z/qux does not depend on "20400101T0000Z/foo:drugs and money"') + log, contains='20400101T0000Z/qux does not depend on "20400101T0000Z/foo:a"') # it should not add 20400101T0000Z/qux to the pool assert (