Skip to content

Commit

Permalink
Merge pull request #5943 from oliver-sanders/5939
Browse files Browse the repository at this point in the history
stop after cycle point: support offsets
  • Loading branch information
wxtim authored Feb 9, 2024
2 parents 7bf0b7a + 2887819 commit 14711c9
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 57 deletions.
1 change: 1 addition & 0 deletions changes.d/5943.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The `stop after cycle point` can now be specified as an offset from the inital cycle point.
21 changes: 19 additions & 2 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,14 +539,20 @@ def get_script_common_text(this: str, example: Optional[str] = None):
''')
# NOTE: final cycle point is not a V_CYCLE_POINT to allow expressions
# such as '+P1Y' (relative to initial cycle point)
Conf('final cycle point', VDR.V_STRING, desc='''
Conf('final cycle point', VDR.V_CYCLE_POINT_WITH_OFFSETS, desc='''
The (optional) last cycle point at which tasks are run.
Once all tasks have reached this cycle point, the
workflow will shut down.
This item can be overridden on the command line using
``cylc play --final-cycle-point`` or ``--fcp``.
Examples:
- ``2000`` - Shorthand for ``2000-01-01T00:00``.
- ``+P1D`` - The initial cycle point plus one day.
- ``2000 +P1D +P1Y`` - The year ``2000`` plus one day and one year.
''')
Conf('initial cycle point constraints', VDR.V_STRING_LIST, desc='''
Rules to allow only some initial datetime cycle points.
Expand Down Expand Up @@ -599,7 +605,7 @@ def get_script_common_text(this: str, example: Optional[str] = None):
{REPLACES}``[scheduling]hold after point``.
''')
Conf('stop after cycle point', VDR.V_CYCLE_POINT, desc='''
Conf('stop after cycle point', VDR.V_CYCLE_POINT_WITH_OFFSETS, desc='''
Shut down the workflow after all tasks pass this cycle point.
The stop cycle point can be overridden on the command line using
Expand All @@ -612,7 +618,18 @@ def get_script_common_text(this: str, example: Optional[str] = None):
choosing not to run that part of the graph. You can play
the workflow and continue.
Examples:
- ``2000`` - Shorthand for ``2000-01-01T00:00``.
- ``+P1D`` - The initial cycle point plus one day.
- ``2000 +P1D +P1Y`` - The year ``2000`` plus one day and one year.
.. versionadded:: 8.0.0
.. versionchanged:: 8.3.0
This now supports offsets (e.g. ``+P1D``) in the same way the
:cylc:conf:`[..]final cycle point` does.
''')
Conf('cycling mode', VDR.V_STRING, Calendar.MODE_GREGORIAN,
options=list(Calendar.MODES) + ['integer'], desc='''
Expand Down
13 changes: 10 additions & 3 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -851,9 +851,16 @@ def process_stop_cycle_point(self) -> None:
if stopcp_str is None:
stopcp_str = self.cfg['scheduling']['stop after cycle point']

if stopcp_str is not None:
self.stop_point = get_point(stopcp_str).standardise()
if self.final_point and (self.stop_point > self.final_point):
if stopcp_str:
self.stop_point = get_point_relative(
stopcp_str,
self.initial_point,
).standardise()
if (
self.final_point is not None
and self.stop_point is not None
and self.stop_point > self.final_point
):
LOG.warning(
f"Stop cycle point '{self.stop_point}' will have no "
"effect as it is after the final cycle "
Expand Down
24 changes: 24 additions & 0 deletions cylc/flow/parsec/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,7 @@ class CylcConfigValidator(ParsecValidator):
V_CYCLE_POINT = 'V_CYCLE_POINT'
V_CYCLE_POINT_FORMAT = 'V_CYCLE_POINT_FORMAT'
V_CYCLE_POINT_TIME_ZONE = 'V_CYCLE_POINT_TIME_ZONE'
V_CYCLE_POINT_WITH_OFFSETS = 'V_CYCLE_POINT_WITH_OFFSETS'
V_INTERVAL = 'V_INTERVAL'
V_INTERVAL_LIST = 'V_INTERVAL_LIST'
V_PARAMETER_LIST = 'V_PARAMETER_LIST'
Expand Down Expand Up @@ -704,6 +705,26 @@ class CylcConfigValidator(ParsecValidator):
'-0830': 'UTC minus 8 hours and 30 minutes.'
}
),
V_CYCLE_POINT_WITH_OFFSETS: (
'cycle point with support for offsets',
'An integer or date-time cycle point, with optional offset(s).',
{
'1': 'An integer cycle point.',
'1 +P5': (
'An integer cycle point with an offset'
' (this evaluates as ``6``).'
),
'+P5': (
'An integer cycle point offset.'
' This offset is added to the initial cycle point'
),
'2000-01-01T00:00Z': 'A date-time cycle point.',
'2000-02-29T00:00Z +P1D +P1M': (
'A date-time cycle point with offsets'
' (this evaluates as ``2000-04-01T00:00Z``).'
),
}
),
V_INTERVAL: (
'time interval',
'An ISO8601 duration.',
Expand Down Expand Up @@ -756,6 +777,9 @@ def __init__(self):
self.V_CYCLE_POINT: self.coerce_cycle_point,
self.V_CYCLE_POINT_FORMAT: self.coerce_cycle_point_format,
self.V_CYCLE_POINT_TIME_ZONE: self.coerce_cycle_point_time_zone,
# NOTE: This type exists for documentation reasons
# it doesn't actually process offsets, that happens later
self.V_CYCLE_POINT_WITH_OFFSETS: self.coerce_str,
self.V_INTERVAL: self.coerce_interval,
self.V_INTERVAL_LIST: self.coerce_interval_list,
self.V_PARAMETER_LIST: self.coerce_parameter_list,
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,7 @@ def command_stop(
point = TaskID.get_standardised_point(cycle_point)
if point is not None and self.pool.set_stop_point(point):
self.options.stopcp = str(point)
self.config.stop_point = point
self.workflow_db_mgr.put_workflow_stop_cycle_point(
self.options.stopcp)
elif clock_time is not None:
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,7 @@ def set_stop_point(self, stop_point: 'PointBase') -> bool:
LOG.info(f"Stop point unchanged: {stop_point}")
return False

LOG.info("Setting stop point: {stop_point}")
LOG.info(f"Setting stop point: {stop_point}")
self.stop_point = stop_point

if (
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/cylc-play/09-invalid-cp-opt.t
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
run_fail "${TEST_NAME_BASE}-run" \
cylc play "${WORKFLOW_NAME}" --no-detach --stopcp='potato'

grep_ok "ERROR - Workflow shutting down .* potato" "${TEST_NAME_BASE}-run.stderr"
grep_ok "ERROR - Workflow shutting down .*potato" "${TEST_NAME_BASE}-run.stderr"

# Check that we haven't got a database
exists_ok "${WORKFLOW_RUN_DIR}/.service"
Expand Down
132 changes: 132 additions & 0 deletions tests/integration/test_stop_after_cycle_point.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Test logic pertaining to the stop after cycle points.
This may be defined in different ways:
* In the workflow configuration.
* On the command line.
* Or loaded from the database.
When the workflow hits the "stop after" point, it should be wiped (i.e. set
to None).
"""

from typing import Optional

from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.id import Tokens
from cylc.flow.workflow_status import StopMode


async def test_stop_after_cycle_point(
flow,
scheduler,
run,
reflog,
complete,
):
"""Test the stop after cycle point.
This ensures:
* The stop after point gets loaded from the config.
* The workflow stops when it hits this point.
* The point gets wiped when the workflow hits this point.
* The point is stored/retrieved from the DB as appropriate.
"""
async def stops_after_cycle(schd) -> Optional[str]:
"""Run the workflow until it stops and return the cycle point."""
triggers = reflog(schd)
await complete(schd, timeout=2)
assert len(triggers) == 1 # only one task (i.e. cycle) should be run
return Tokens(list(triggers)[0][0], relative=True)['cycle']

def get_db_value(schd) -> Optional[str]:
"""Return the cycle point value stored in the DB."""
with schd.workflow_db_mgr.get_pri_dao() as pri_dao:
return dict(pri_dao.select_workflow_params())['stopcp']

config = {
'scheduling': {
'cycling mode': 'integer',
'initial cycle point': '1',
'stop after cycle point': '1',
'graph': {
'P1': 'a[-P1] => a',
},
},
}
id_ = flow(config)
schd = scheduler(id_, paused_start=False)
async with run(schd):
# the cycle point should be loaded from the workflow configuration
assert schd.config.stop_point == IntegerPoint('1')

# this value should *not* be written to the database
assert get_db_value(schd) is None

# the workflow should stop after cycle 1
assert await stops_after_cycle(schd) == '1'

# change the configured cycle point to "2"
config['scheduling']['stop after cycle point'] = '2'
id_ = flow(config, id_=id_)
schd = scheduler(id_, paused_start=False)
async with run(schd):
# the cycle point should be reloaded from the workflow configuration
assert schd.config.stop_point == IntegerPoint('2')

# this value should not be written to the database
assert get_db_value(schd) is None

# the workflow should stop after cycle 2
assert await stops_after_cycle(schd) == '2'

# override the configured value via the CLI option
schd = scheduler(id_, paused_start=False, **{'stopcp': '3'})
async with run(schd):
# the CLI should take precedence over the config
assert schd.config.stop_point == IntegerPoint('3')

# this value *should* be written to the database
assert get_db_value(schd) == '3'

# the workflow should stop after cycle 3
assert await stops_after_cycle(schd) == '3'

# once the workflow hits this point, it should get cleared
assert get_db_value(schd) is None

schd = scheduler(id_, paused_start=False)
async with run(schd):
# the workflow should fall back to the configured value
assert schd.config.stop_point == IntegerPoint('2')

# override this value whilst the workflow is running
schd.command_stop(
cycle_point=IntegerPoint('4'),
mode=StopMode.REQUEST_CLEAN,
)
assert schd.config.stop_point == IntegerPoint('4')

# the new *should* be written to the database
assert get_db_value(schd) == '4'

schd = scheduler(id_, paused_start=False)
async with run(schd):
# the workflow should stop after cycle 4
assert await stops_after_cycle(schd) == '4'
Loading

0 comments on commit 14711c9

Please sign in to comment.