Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

more tests #54

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions cylc/flow/xtriggers/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,7 @@ def validate(args: Dict[str, Any]):
must be a valid path

"""
try:
tokens = tokenise(args["workflow_task_id"])
except KeyError:
raise WorkflowConfigError(
# TODO better message
"Full ID needed: workflow//cycle/task[:selector].")
tokens = tokenise(args["workflow_task_id"])

if any(
tokens[token] is None
Expand Down
120 changes: 63 additions & 57 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,8 +558,12 @@ def _submit_task_jobs(*args, **kwargs):
return _reflog


@pytest.fixture
def complete():
async def _complete(
schd,
*tokens_list: Union[Tokens, str],
stop_mode=StopMode.AUTO,
timeout: int = 60,
) -> None:
"""Wait for the workflow, or tasks within it to complete.

Args:
Expand All @@ -584,65 +588,67 @@ def complete():
async_timeout (handles shutdown logic more cleanly).

"""
async def _complete(
schd,
*tokens_list: Union[Tokens, str],
stop_mode=StopMode.AUTO,
timeout: int = 60,
) -> None:
start_time = time()

_tokens_list: List[Tokens] = []
for tokens in tokens_list:
if isinstance(tokens, str):
tokens = Tokens(tokens, relative=True)
_tokens_list.append(tokens.task)

# capture task completion
remove_if_complete = schd.pool.remove_if_complete

def _remove_if_complete(itask, output=None):
nonlocal _tokens_list
ret = remove_if_complete(itask)
if ret and itask.tokens.task in _tokens_list:
_tokens_list.remove(itask.tokens.task)
return ret

schd.pool.remove_if_complete = _remove_if_complete

# capture workflow shutdown
set_stop = schd._set_stop
has_shutdown = False

def _set_stop(mode=None):
nonlocal has_shutdown, stop_mode
if mode == stop_mode:
has_shutdown = True
return set_stop(mode)
else:
set_stop(mode)
raise Exception(f'Workflow bailed with stop mode = {mode}')

schd._set_stop = _set_stop

# determine the completion condition
if _tokens_list:
condition = lambda: bool(_tokens_list)
start_time = time()

_tokens_list: List[Tokens] = []
for tokens in tokens_list:
if isinstance(tokens, str):
tokens = Tokens(tokens, relative=True)
_tokens_list.append(tokens.task)

# capture task completion
remove_if_complete = schd.pool.remove_if_complete

def _remove_if_complete(itask, output=None):
nonlocal _tokens_list
ret = remove_if_complete(itask)
if ret and itask.tokens.task in _tokens_list:
_tokens_list.remove(itask.tokens.task)
return ret

schd.pool.remove_if_complete = _remove_if_complete

# capture workflow shutdown
set_stop = schd._set_stop
has_shutdown = False

def _set_stop(mode=None):
nonlocal has_shutdown, stop_mode
if mode == stop_mode:
has_shutdown = True
return set_stop(mode)
else:
condition = lambda: bool(not has_shutdown)
set_stop(mode)
raise Exception(f'Workflow bailed with stop mode = {mode}')

schd._set_stop = _set_stop

# determine the completion condition
if _tokens_list:
condition = lambda: bool(_tokens_list)
else:
condition = lambda: bool(not has_shutdown)

# wait for the condition to be met
while condition():
# allow the main loop to advance
await asyncio.sleep(0)
if (time() - start_time) > timeout:
raise Exception(
f'Timeout waiting for {", ".join(map(str, _tokens_list))}'
)

# restore regular shutdown logic
schd._set_stop = set_stop


# wait for the condition to be met
while condition():
# allow the main loop to advance
await asyncio.sleep(0)
if (time() - start_time) > timeout:
raise Exception(
f'Timeout waiting for {", ".join(map(str, _tokens_list))}'
)
@pytest.fixture
def complete():
return _complete

# restore regular shutdown logic
schd._set_stop = set_stop

@pytest.fixture(scope='module')
def mod_complete():
return _complete


Expand Down
139 changes: 139 additions & 0 deletions tests/integration/test_dbstatecheck.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Tests for the backend method of workflow_state"""


from asyncio import sleep
import pytest
from textwrap import dedent
from typing import TYPE_CHECKING

from cylc.flow.dbstatecheck import CylcWorkflowDBChecker as Checker


if TYPE_CHECKING:
from cylc.flow.dbstatecheck import CylcWorkflowDBChecker


@pytest.fixture(scope='module')
async def checker(
mod_flow, mod_scheduler, mod_run, mod_complete
) -> 'CylcWorkflowDBChecker':
"""Make a real world database.

We could just write the database manually but this is a better
test of the overall working of the function under test.
"""
wid = mod_flow({
'scheduling': {
'graph': {'P1Y': dedent('''
good:succeeded
bad:failed?
output:custom_output
''')},
'initial cycle point': '1000',
'final cycle point': '1001'
},
'runtime': {
'bad': {'simulation': {'fail cycle points': '1000'}},
'output': {'outputs': {'trigger': 'message'}}
}
})
schd = mod_scheduler(wid, paused_start=False)
async with mod_run(schd):
await mod_complete(schd)
schd.pool.force_trigger_tasks(['1000/good'], [2])
# Allow a cycle of the main loop to pass so that flow 2 can be
# added to db
await sleep(1)
yield Checker(
'somestring', 'utterbunkum',
schd.workflow_db_mgr.pub_path
)


def test_basic(checker):
"""Pass no args, get unfiltered output"""
result = checker.workflow_state_query()
expect = [
['bad', '10000101T0000Z', 'failed'],
['bad', '10010101T0000Z', 'succeeded'],
['good', '10000101T0000Z', 'succeeded'],
['good', '10010101T0000Z', 'succeeded'],
['output', '10000101T0000Z', 'succeeded'],
['output', '10010101T0000Z', 'succeeded'],
['good', '10000101T0000Z', 'waiting', '(flows=2)'],
]
assert result == expect


def test_task(checker):
"""Filter by task name"""
result = checker.workflow_state_query(task='bad')
assert result == [
['bad', '10000101T0000Z', 'failed'],
['bad', '10010101T0000Z', 'succeeded']
]


def test_point(checker):
"""Filter by point"""
result = checker.workflow_state_query(cycle='10000101T0000Z')
assert result == [
['bad', '10000101T0000Z', 'failed'],
['good', '10000101T0000Z', 'succeeded'],
['output', '10000101T0000Z', 'succeeded'],
['good', '10000101T0000Z', 'waiting', '(flows=2)'],
]


def test_status(checker):
"""Filter by status"""
result = checker.workflow_state_query(selector='failed')
expect = [
['bad', '10000101T0000Z', 'failed'],
]
assert result == expect


def test_output(checker):
"""Filter by flow number"""
result = checker.workflow_state_query(selector='message', is_message=True)
expect = [
[
'output',
'10000101T0000Z',
"{'submitted': 'submitted', 'started': 'started', 'succeeded': "
"'succeeded', 'trigger': 'message'}",
],
[
'output',
'10010101T0000Z',
"{'submitted': 'submitted', 'started': 'started', 'succeeded': "
"'succeeded', 'trigger': 'message'}",
],
]
assert result == expect


def test_flownum(checker):
"""Pass no args, get unfiltered output"""
result = checker.workflow_state_query(flow_num=2)
expect = [
['good', '10000101T0000Z', 'waiting', '(flows=2)'],
]
assert result == expect
43 changes: 43 additions & 0 deletions tests/integration/test_xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,46 @@ def mytrig(*args, **kwargs):

# check the DB to ensure no additional entries have been created
assert db_select(schd, True, 'xtriggers') == db_xtriggers


async def test_error_in_xtrigger(flow, start, scheduler):
"""Failure in an xtrigger is handled nicely.
"""
id_ = flow({
'scheduler': {
'allow implicit tasks': 'True'
},
'scheduling': {
'xtriggers': {
'mytrig': 'mytrig()'
},
'graph': {
'R1': '@mytrig => foo'
},
}
})

# add a custom xtrigger to the workflow
run_dir = Path(get_workflow_run_dir(id_))
xtrig_dir = run_dir / 'lib/python'
xtrig_dir.mkdir(parents=True)
(xtrig_dir / 'mytrig.py').write_text(dedent('''
def mytrig(*args, **kwargs):
raise Exception('This Xtrigger is broken')
'''))

schd = scheduler(id_)
async with start(schd) as log:
foo = schd.pool.get_tasks()[0]
schd.xtrigger_mgr.call_xtriggers_async(foo)
for _ in range(50):
await asyncio.sleep(0.1)
schd.proc_pool.process()
if len(schd.proc_pool.runnings) == 0:
break
else:
raise Exception('Process pool did not clear')

error = log.messages[-1].split('\n')
assert error[-2] == 'Exception: This Xtrigger is broken'
assert error[0] == 'ERROR in xtrigger mytrig()'
Loading