Skip to content

Commit

Permalink
workflow_state: reject invalid configurations
Browse files Browse the repository at this point in the history
* Do not allow users to poll for transient statuses.
* Reject invalid task states.
* Reject polling waiting and preparing tasks (not reliably pollable).
* Closes #6157
  • Loading branch information
oliver-sanders committed Jun 27, 2024
1 parent 22cd58b commit 13f7549
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 24 deletions.
1 change: 1 addition & 0 deletions changes.d/fix.6175.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The workflow-state command and xtrigger will now reject invalid polling arguments.
28 changes: 28 additions & 0 deletions cylc/flow/dbstatecheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
TASK_OUTPUT_FAILED,
TASK_OUTPUT_FINISHED,
)
from cylc.flow.task_state import (
TASK_STATE_MAP,
TASK_STATUSES_FINAL,
)
from cylc.flow.util import deserialise_set
from metomi.isodatetime.parsers import TimePointParser
from metomi.isodatetime.exceptions import ISO8601SyntaxError
Expand Down Expand Up @@ -244,6 +248,8 @@ def workflow_state_query(
stmt_args = []
stmt_wheres = []

check_polling_config(selector, is_trigger, is_message)

if is_trigger or is_message:
target_table = CylcWorkflowDAO.TABLE_TASK_OUTPUTS
mask = "name, cycle, outputs"
Expand Down Expand Up @@ -363,3 +369,25 @@ def _selector_in_outputs(selector: str, outputs: Iterable[str]) -> bool:
or TASK_OUTPUT_FAILED in outputs
)
)


def check_polling_config(selector, is_trigger, is_message):
    """Check for invalid or unreliable polling configurations.

    Checks are only applied to task status polling, i.e. when a selector
    is given and neither ``is_trigger`` nor ``is_message`` is set.

    Args:
        selector: The task selector being polled for (may be None/empty).
        is_trigger: If truthy, no status checks are performed.
        is_message: If truthy, no status checks are performed.

    Raises:
        InputError:
            * If the selector is not a key of TASK_STATE_MAP.
            * If the selector maps to None in TASK_STATE_MAP.
            * If the selector is not in TASK_STATUSES_FINAL.

    """
    if not selector or is_trigger or is_message:
        # we are not using task status polling => nothing to check
        return

    try:
        trigger = TASK_STATE_MAP[selector]
    except KeyError:
        # "from None": the KeyError is an implementation detail, do not
        # chain it onto the user-facing error
        raise InputError(f'No such task state "{selector}"') from None

    if trigger is None:
        raise InputError(
            f'Cannot poll for the "{selector}" task state'
        )

    if selector not in TASK_STATUSES_FINAL:
        raise InputError(
            f'Polling for the "{selector}" task status is not'
            ' reliable as it is a transient state.'
            f'\nPoll for the "{trigger}" trigger instead.'
        )
39 changes: 25 additions & 14 deletions cylc/flow/scripts/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
so you can start checking before the target workflow is started.
Legacy (pre-8.3.0) options are supported, but deprecated, for existing scripts:
cylc workflow-state --task=NAME --point=CYCLE --status=STATUS
--output=MESSAGE --message=MESSAGE --task-point WORKFLOW
cylc workflow-state --task=NAME --point=CYCLE --status=STATUS
--output=MESSAGE --message=MESSAGE --task-point WORKFLOW
(Note from 8.0 until 8.3.0 --output and --message both match task messages).
In "cycle/task:selector" the selector will match task statuses, unless:
Expand All @@ -55,24 +55,23 @@
Flow numbers are only printed for flow numbers > 1.
USE IN TASK SCRIPTING:
Use in task scripting:
- To poll a task at the same cycle point in another workflow, just use
$CYLC_TASK_CYCLE_POINT in the ID.
- To poll a task at an offset cycle point, use the --offset option to
have Cylc do the datetime arithmetic for you.
- However, see also the workflow_state xtrigger for this use case.
WARNINGS:
- Typos in the workflow or task ID will result in fruitless polling.
- To avoid missing transient states ("submitted", "running") poll for the
corresponding output trigger instead ("submitted", "started").
- Cycle points are auto-converted to the DB point format (and UTC mode).
- Task outputs manually completed by "cylc set" have "(force-completed)"
recorded as the task message in the DB, so it is best to query trigger
names, not messages, unless specifically interested in forced outputs.
Warnings:
- Typos in the workflow or task ID will result in fruitless polling.
- To avoid missing transient states ("submitted", "running") poll for the
corresponding output trigger instead ("submitted", "started").
- Cycle points are auto-converted to the DB point format (and UTC mode).
- Task outputs manually completed by "cylc set" have "(force-completed)"
recorded as the task message in the DB, so it is best to query trigger
names, not messages, unless specifically interested in forced outputs.
Examples:
# Print the status of all tasks in WORKFLOW:
$ cylc workflow-state WORKFLOW
Expand Down Expand Up @@ -115,7 +114,11 @@
from cylc.flow.dbstatecheck import CylcWorkflowDBChecker
from cylc.flow.terminal import cli_function
from cylc.flow.workflow_files import infer_latest_run_from_id
from cylc.flow.task_state import TASK_STATUSES_ORDERED
from cylc.flow.task_state import (
TASK_STATUSES_ORDERED,
TASK_STATUSES_FINAL,
TASK_STATUSES_ALL,
)

if TYPE_CHECKING:
from optparse import Values
Expand Down Expand Up @@ -363,7 +366,6 @@ def get_option_parser() -> COP:

@cli_function(get_option_parser, remove_opts=["--db"])
def main(parser: COP, options: 'Values', *ids: str) -> None:

# Note it would be cleaner to use 'id_cli.parse_ids()' here to get the
# workflow ID and tokens, but that function infers run number and fails
# if the workflow is not installed yet. We want to be able to start polling
Expand Down Expand Up @@ -427,6 +429,15 @@ def main(parser: COP, options: 'Values', *ids: str) -> None:
msg += id_
else:
msg += id_.replace(options.depr_point, "$CYLC_TASK_CYCLE_POINT")

if (
options.depr_status
and options.depr_status in TASK_STATUSES_ALL
and options.depr_status not in TASK_STATUSES_FINAL
):
# polling for non-final task statuses is flaky
msg += 'and the --triggers option'

Check warning on line 439 in cylc/flow/scripts/workflow_state.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scripts/workflow_state.py#L439

Added line #L439 was not covered by tests

LOG.warning(msg)

poller = WorkflowPoller(
Expand Down
28 changes: 20 additions & 8 deletions cylc/flow/task_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,15 @@

from typing import List, Iterable, Set, TYPE_CHECKING
from cylc.flow.prerequisite import Prerequisite
from cylc.flow.task_outputs import TaskOutputs
from cylc.flow.task_outputs import (
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_FAILED,
TASK_OUTPUT_STARTED,
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_SUBMIT_FAILED,
TASK_OUTPUT_SUCCEEDED,
TaskOutputs,
)
from cylc.flow.wallclock import get_current_time_string

if TYPE_CHECKING:
Expand Down Expand Up @@ -144,13 +152,17 @@
TASK_STATUS_RUNNING,
}

# Task statuses that can be manually triggered.
TASK_STATUSES_TRIGGERABLE = {
TASK_STATUS_WAITING,
TASK_STATUS_EXPIRED,
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_FAILED,
# Mapping from task statuses to their corresponding task output triggers.
# Statuses mapped to None have no corresponding trigger in this mapping.
TASK_STATE_MAP = {
# status: trigger
TASK_STATUS_WAITING: None,
TASK_STATUS_EXPIRED: TASK_OUTPUT_EXPIRED,
TASK_STATUS_PREPARING: None,
TASK_STATUS_SUBMIT_FAILED: TASK_OUTPUT_SUBMIT_FAILED,
TASK_STATUS_SUBMITTED: TASK_OUTPUT_SUBMITTED,
TASK_STATUS_RUNNING: TASK_OUTPUT_STARTED,
TASK_STATUS_FAILED: TASK_OUTPUT_FAILED,
TASK_STATUS_SUCCEEDED: TASK_OUTPUT_SUCCEEDED,
}


Expand Down
17 changes: 15 additions & 2 deletions cylc/flow/xtriggers/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@

from cylc.flow.scripts.workflow_state import WorkflowPoller
from cylc.flow.id import tokenise
from cylc.flow.exceptions import WorkflowConfigError
from cylc.flow.exceptions import WorkflowConfigError, InputError
from cylc.flow.task_state import TASK_STATUS_SUCCEEDED
from cylc.flow.dbstatecheck import check_polling_config


DEFAULT_STATUS = TASK_STATUS_SUCCEEDED


def workflow_state(
Expand Down Expand Up @@ -84,7 +88,7 @@ def workflow_state(
offset,
flow_num,
alt_cylc_run_dir,
TASK_STATUS_SUCCEEDED,
DEFAULT_STATUS,
is_trigger, is_message,
old_format=False,
condition=workflow_task_id,
Expand Down Expand Up @@ -151,6 +155,15 @@ def validate(args: Dict[str, Any]):
):
raise WorkflowConfigError("flow_num must be an integer if given.")

try:
check_polling_config(
tokens['cycle_sel'] or tokens['task_sel'] or DEFAULT_STATUS,
args['is_trigger'],
args['is_message'],
)
except InputError as exc:
raise WorkflowConfigError(str(exc)) from None


# BACK COMPAT: workflow_state_backcompat
# from: 8.0.0
Expand Down
44 changes: 44 additions & 0 deletions tests/unit/test_dbstatecheck.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from cylc.flow.dbstatecheck import check_polling_config
from cylc.flow.exceptions import InputError

import pytest


def test_check_polling_config():
    """Invalid or unreliable polling configurations should be rejected.

    See https://github.com/cylc/cylc-flow/issues/6157
    """
    # status polling (neither trigger nor message) which must be rejected:
    # (selector, expected error message)
    rejected = (
        ('elephant', 'No such task state'),
        ('waiting', 'Cannot poll for'),
        ('running', 'is not reliable'),
    )
    for status, expected_error in rejected:
        with pytest.raises(InputError, match=expected_error):
            check_polling_config(status, False, False)

    # accepted: (selector, is_trigger, is_message)
    accepted = (
        # valid polling use cases
        ('started', True, False),
        ('started', False, True),
        # valid query use cases
        (None, False, True),
        (None, False, False),
    )
    for polling_args in accepted:
        check_polling_config(*polling_args)
32 changes: 32 additions & 0 deletions tests/unit/xtriggers/test_workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ def test_validate_ok():
"""Validate returns ok with valid args."""
validate({
'workflow_task_id': 'foo//1/bar',
'is_trigger': False,
'is_message': False,
'offset': 'PT1H',
'flow_num': 44,
})
Expand Down Expand Up @@ -292,3 +294,33 @@ def test_validate_fail_non_int_flow(flow_num):
'offset': 'PT1H',
'flow_num': flow_num,
})


def test_validate_polling_config():
    """Invalid or unreliable polling configurations should be rejected.

    See https://github.com/cylc/cylc-flow/issues/6157
    """
    # (workflow_task_id, expected error message)
    rejected = (
        ('foo//1/bar:elephant', 'No such task state'),
        ('foo//1/bar:waiting', 'Cannot poll for'),
        ('foo//1/bar:submitted', 'is not reliable'),
    )
    for task_id, expected_error in rejected:
        # build a fresh args dict for every case
        xtrigger_args = {
            'workflow_task_id': task_id,
            'is_trigger': False,
            'is_message': False,
            'flow_num': 44,
        }
        with pytest.raises(WorkflowConfigError, match=expected_error):
            validate(xtrigger_args)

0 comments on commit 13f7549

Please sign in to comment.