diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 2cfaa166fdd..518ef40f018 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -558,8 +558,12 @@ def _submit_task_jobs(*args, **kwargs): return _reflog -@pytest.fixture -def complete(): +async def _complete( + schd, + *tokens_list: Union[Tokens, str], + stop_mode=StopMode.AUTO, + timeout: int = 60, +) -> None: """Wait for the workflow, or tasks within it to complete. Args: @@ -584,65 +588,67 @@ def complete(): async_timeout (handles shutdown logic more cleanly). """ - async def _complete( - schd, - *tokens_list: Union[Tokens, str], - stop_mode=StopMode.AUTO, - timeout: int = 60, - ) -> None: - start_time = time() - - _tokens_list: List[Tokens] = [] - for tokens in tokens_list: - if isinstance(tokens, str): - tokens = Tokens(tokens, relative=True) - _tokens_list.append(tokens.task) - - # capture task completion - remove_if_complete = schd.pool.remove_if_complete - - def _remove_if_complete(itask, output=None): - nonlocal _tokens_list - ret = remove_if_complete(itask) - if ret and itask.tokens.task in _tokens_list: - _tokens_list.remove(itask.tokens.task) - return ret - - schd.pool.remove_if_complete = _remove_if_complete - - # capture workflow shutdown - set_stop = schd._set_stop - has_shutdown = False - - def _set_stop(mode=None): - nonlocal has_shutdown, stop_mode - if mode == stop_mode: - has_shutdown = True - return set_stop(mode) - else: - set_stop(mode) - raise Exception(f'Workflow bailed with stop mode = {mode}') - - schd._set_stop = _set_stop - - # determine the completion condition - if _tokens_list: - condition = lambda: bool(_tokens_list) + start_time = time() + + _tokens_list: List[Tokens] = [] + for tokens in tokens_list: + if isinstance(tokens, str): + tokens = Tokens(tokens, relative=True) + _tokens_list.append(tokens.task) + + # capture task completion + remove_if_complete = schd.pool.remove_if_complete + + def _remove_if_complete(itask, output=None): + nonlocal _tokens_list + ret = remove_if_complete(itask) + if ret and itask.tokens.task in _tokens_list: + _tokens_list.remove(itask.tokens.task) + return ret + + schd.pool.remove_if_complete = _remove_if_complete + + # capture workflow shutdown + set_stop = schd._set_stop + has_shutdown = False + + def _set_stop(mode=None): + nonlocal has_shutdown, stop_mode + if mode == stop_mode: + has_shutdown = True + return set_stop(mode) else: - condition = lambda: bool(not has_shutdown) + set_stop(mode) + raise Exception(f'Workflow bailed with stop mode = {mode}') + + schd._set_stop = _set_stop + + # determine the completion condition + if _tokens_list: + condition = lambda: bool(_tokens_list) + else: + condition = lambda: bool(not has_shutdown) + + # wait for the condition to be met + while condition(): + # allow the main loop to advance + await asyncio.sleep(0) + if (time() - start_time) > timeout: + raise Exception( + f'Timeout waiting for {", ".join(map(str, _tokens_list))}' + ) + + # restore regular shutdown logic + schd._set_stop = set_stop + - # wait for the condition to be met - while condition(): - # allow the main loop to advance - await asyncio.sleep(0) - if (time() - start_time) > timeout: - raise Exception( - f'Timeout waiting for {", ".join(map(str, _tokens_list))}' - ) +@pytest.fixture +def complete(): + return _complete - # restore regular shutdown logic - schd._set_stop = set_stop +@pytest.fixture(scope='module') +def mod_complete(): return _complete diff --git a/tests/integration/test_dbstatecheck.py b/tests/integration/test_dbstatecheck.py new file mode 100644 index 00000000000..a6da4348ffb --- /dev/null +++ b/tests/integration/test_dbstatecheck.py @@ -0,0 +1,139 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Tests for the backend method of workflow_state""" + + +from asyncio import sleep +import pytest +from textwrap import dedent +from typing import TYPE_CHECKING + +from cylc.flow.dbstatecheck import CylcWorkflowDBChecker as Checker + + +if TYPE_CHECKING: + from cylc.flow.dbstatecheck import CylcWorkflowDBChecker + + +@pytest.fixture(scope='module') +async def checker( + mod_flow, mod_scheduler, mod_run, mod_complete +) -> 'CylcWorkflowDBChecker': + """Make a real world database. + + We could just write the database manually but this is a better + test of the overall working of the function under test. + """ + wid = mod_flow({ + 'scheduling': { + 'graph': {'P1Y': dedent(''' + good:succeeded + bad:failed? + output:custom_output + ''')}, + 'initial cycle point': '1000', + 'final cycle point': '1001' + }, + 'runtime': { + 'bad': {'simulation': {'fail cycle points': '1000'}}, + 'output': {'outputs': {'trigger': 'message'}} + } + }) + schd = mod_scheduler(wid, paused_start=False) + async with mod_run(schd): + await mod_complete(schd) + schd.pool.force_trigger_tasks(['1000/good'], [2]) + # Allow a cycle of the main loop to pass so that flow 2 can be + # added to db + await sleep(1) + yield Checker( + 'somestring', 'utterbunkum', + schd.workflow_db_mgr.pub_path + ) + + +def test_basic(checker): + """Pass no args, get unfiltered output""" + result = checker.workflow_state_query() + expect = [ + ['bad', '10000101T0000Z', 'failed'], + ['bad', '10010101T0000Z', 'succeeded'], + ['good', '10000101T0000Z', 'succeeded'], + ['good', '10010101T0000Z', 'succeeded'], + ['output', '10000101T0000Z', 'succeeded'], + ['output', '10010101T0000Z', 'succeeded'], + ['good', '10000101T0000Z', 'waiting', '(flows=2)'], + ] + assert result == expect + + +def test_task(checker): + """Filter by task name""" + result = checker.workflow_state_query(task='bad') + assert result == [ + ['bad', '10000101T0000Z', 'failed'], + ['bad', '10010101T0000Z', 'succeeded'] + ] + + +def test_point(checker): + """Filter by point""" + result = checker.workflow_state_query(cycle='10000101T0000Z') + assert result == [ + ['bad', '10000101T0000Z', 'failed'], + ['good', '10000101T0000Z', 'succeeded'], + ['output', '10000101T0000Z', 'succeeded'], + ['good', '10000101T0000Z', 'waiting', '(flows=2)'], + ] + + +def test_status(checker): + """Filter by status""" + result = checker.workflow_state_query(selector='failed') + expect = [ + ['bad', '10000101T0000Z', 'failed'], + ] + assert result == expect + + +def test_output(checker): + """Filter by flow number""" + result = checker.workflow_state_query(selector='message', is_message=True) + expect = [ + [ + 'output', + '10000101T0000Z', + "{'submitted': 'submitted', 'started': 'started', 'succeeded': " + "'succeeded', 'trigger': 'message'}", + ], + [ + 'output', + '10010101T0000Z', + "{'submitted': 'submitted', 'started': 'started', 'succeeded': " + "'succeeded', 'trigger': 'message'}", + ], + ] + assert result == expect + + +def test_flownum(checker): + """Pass no args, get unfiltered output""" + result = checker.workflow_state_query(flow_num=2) + expect = [ + ['good', '10000101T0000Z', 'waiting', '(flows=2)'], + ] + assert result == expect diff --git a/tests/integration/test_xtrigger_mgr.py b/tests/integration/test_xtrigger_mgr.py index 612049163cc..3bf425650c4 100644 --- a/tests/integration/test_xtrigger_mgr.py +++ b/tests/integration/test_xtrigger_mgr.py @@ -188,3 +188,46 @@ def mytrig(*args, **kwargs): # check the DB to ensure no additional entries have been created assert db_select(schd, True, 'xtriggers') == db_xtriggers + + +async def test_error_in_xtrigger(flow, start, scheduler): + """Failure in an xtrigger is handled nicely. + """ + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': 'True' + }, + 'scheduling': { + 'xtriggers': { + 'mytrig': 'mytrig()' + }, + 'graph': { + 'R1': '@mytrig => foo' + }, + } + }) + + # add a custom xtrigger to the workflow + run_dir = Path(get_workflow_run_dir(id_)) + xtrig_dir = run_dir / 'lib/python' + xtrig_dir.mkdir(parents=True) + (xtrig_dir / 'mytrig.py').write_text(dedent(''' + def mytrig(*args, **kwargs): + raise Exception('This Xtrigger is broken') + ''')) + + schd = scheduler(id_) + async with start(schd) as log: + foo = schd.pool.get_tasks()[0] + schd.xtrigger_mgr.call_xtriggers_async(foo) + for _ in range(50): + await asyncio.sleep(0.1) + schd.proc_pool.process() + if len(schd.proc_pool.runnings) == 0: + break + else: + raise Exception('Process pool did not clear') + + error = log.messages[-1].split('\n') + assert error[-2] == 'Exception: This Xtrigger is broken' + assert error[0] == 'ERROR in xtrigger mytrig()'