Skip to content

Commit

Permalink
Merge pull request cylc#5913 from cylc/8.2.x-sync
Browse files Browse the repository at this point in the history
🤖 Merge 8.2.x-sync into master
  • Loading branch information
oliver-sanders authored Jan 12, 2024
2 parents a7cf51b + 22f41e2 commit c07392d
Show file tree
Hide file tree
Showing 22 changed files with 341 additions and 174 deletions.
29 changes: 29 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,35 @@ $ towncrier create <PR-number>.<break|feat|fix>.md --content "Short description"

<!-- towncrier release notes start -->

## __cylc-8.2.4 (Released 2024-01-11)__

### 🚀 Enhancements

[#5772](https://github.com/cylc/cylc-flow/pull/5772) - `cylc lint`: added a check for indentation being 4N spaces.

[#5838](https://github.com/cylc/cylc-flow/pull/5838) - `cylc lint`: added rule to check for `rose date` usage (should be replaced with `isodatetime`).

### 🔧 Fixes

[#5789](https://github.com/cylc/cylc-flow/pull/5789) - Prevent the run mode from being changed on restart.

[#5801](https://github.com/cylc/cylc-flow/pull/5801) - Fix traceback when using parentheses on right hand side of graph trigger.

[#5821](https://github.com/cylc/cylc-flow/pull/5821) - Fixed issue where large uncommitted changes could cause `cylc install` to hang.

[#5841](https://github.com/cylc/cylc-flow/pull/5841) - `cylc lint`: improved handling of S011 to not warn if the `#` is `#$` (e.g. shell base arithmetic).

[#5885](https://github.com/cylc/cylc-flow/pull/5885) - Fixed bug in using a final cycle point with chained offsets e.g. 'final cycle point = +PT6H+PT1S'.

[#5893](https://github.com/cylc/cylc-flow/pull/5893) - Fixed bug in computing a time interval-based runahead limit when future triggers are present.

[#5902](https://github.com/cylc/cylc-flow/pull/5902) - Fixed a bug that prevented unsetting `execution time limit` by broadcast or reload.

[#5908](https://github.com/cylc/cylc-flow/pull/5908) - Fixed bug causing redundant DB updates when many tasks depend on the same xtrigger.

[#5909](https://github.com/cylc/cylc-flow/pull/5909) - Fix a bug where Cylc VIP did not remove --workflow-name=<name> from
Cylc play arguments.

## __cylc-8.2.3 (Released 2023-11-02)__

### 🔧 Fixes
Expand Down
1 change: 0 additions & 1 deletion changes.d/5772.feat.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/5789.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/5801.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/5821.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/5838.feat.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/5841.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/5885.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/5893.fix

This file was deleted.

107 changes: 80 additions & 27 deletions cylc/flow/option_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

import sys
from textwrap import dedent
from typing import Any, Dict, Optional, List, Tuple
from typing import Any, Dict, Iterable, Optional, List, Tuple

from cylc.flow import LOG
from cylc.flow.terminal import supports_color, DIM
Expand Down Expand Up @@ -732,17 +732,33 @@ def combine_options(*args, modify=None):


def cleanup_sysargv(
script_name,
workflow_id,
options,
compound_script_opts,
script_opts,
source,
):
script_name: str,
workflow_id: str,
options: 'Values',
compound_script_opts: Iterable['OptionSettings'],
script_opts: Iterable['OptionSettings'],
source: str,
) -> None:
"""Remove unwanted options from sys.argv
Some cylc scripts (notably Cylc Play when it is re-invoked on a scheduler
server) require the correct content in sys.argv.
server) require the correct content in sys.argv: This function
subtracts the unwanted options from sys.argv.
Args:
script_name:
Name of the target script. For example if we are
using this for the play step of cylc vip then this
will be "play".
workflow_id:
options:
Actual options provided to the compound script.
compound_script_options:
Options available in compound script.
script_options:
Options available in target script.
source:
Source directory.
"""
# Organize Options by dest.
script_opts_by_dest = {
Expand All @@ -753,30 +769,67 @@ def cleanup_sysargv(
x.kwargs.get('dest', x.args[0].strip(DOUBLEDASH)): x
for x in compound_script_opts
}
# Filter out non-cylc-play options.
args = [i.split('=')[0] for i in sys.argv]
for unwanted_opt in (set(options.__dict__)) - set(script_opts_by_dest):
for arg in compound_opts_by_dest[unwanted_opt].args:
if arg in sys.argv:
index = sys.argv.index(arg)
sys.argv.pop(index)
if (
compound_opts_by_dest[unwanted_opt].kwargs['action']
not in ['store_true', 'store_false']
):
sys.argv.pop(index)
elif arg in args:
index = args.index(arg)
sys.argv.pop(index)

# Get a list of unwanted args:
unwanted_compound: List[str] = []
unwanted_simple: List[str] = []
for unwanted_dest in (set(options.__dict__)) - set(script_opts_by_dest):
for unwanted_arg in compound_opts_by_dest[unwanted_dest].args:
if (
compound_opts_by_dest[unwanted_dest].kwargs.get('action', None)
in ['store_true', 'store_false']
):
unwanted_simple.append(unwanted_arg)
else:
unwanted_compound.append(unwanted_arg)

new_args = filter_sysargv(sys.argv, unwanted_simple, unwanted_compound)

# replace compound script name:
sys.argv[1] = script_name
new_args[1] = script_name

# replace source path with workflow ID.
if str(source) in sys.argv:
sys.argv.remove(str(source))
new_args.remove(str(source))
if workflow_id not in sys.argv:
sys.argv.append(workflow_id)
new_args.append(workflow_id)

sys.argv = new_args


def filter_sysargv(
sysargs, unwanted_simple: List, unwanted_compound: List
) -> List:
"""Create a copy of sys.argv without unwanted arguments:
Cases:
>>> this = filter_sysargv
>>> this(['--foo', 'expects-a-value', '--bar'], [], ['--foo'])
['--bar']
>>> this(['--foo=expects-a-value', '--bar'], [], ['--foo'])
['--bar']
>>> this(['--foo', '--bar'], ['--foo'], [])
['--bar']
"""
pop_next: bool = False
new_args: List = []
for this_arg in sysargs:
parts = this_arg.split('=', 1)
if pop_next:
pop_next = False
continue
elif parts[0] in unwanted_compound:
# Case --foo=value or --foo value
if len(parts) == 1:
# --foo value
pop_next = True
continue
elif parts[0] in unwanted_simple:
# Case --foo does not expect a value:
continue
else:
new_args.append(this_arg)
return new_args


def log_subcommand(*args):
Expand Down
2 changes: 2 additions & 0 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ class CylcWorkflowDAO:
["prereq_output", {"is_primary_key": True}],
["satisfied"],
],
# The xtriggers table holds the function signature and result of
# already-satisfied (the scheduler no longer needs to call them).
TABLE_XTRIGGERS: [
["signature", {"is_primary_key": True}],
["results"],
Expand Down
32 changes: 9 additions & 23 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ async def initialise(self):
self.workflow,
user=self.owner,
broadcast_mgr=self.broadcast_mgr,
workflow_db_mgr=self.workflow_db_mgr,
data_store_mgr=self.data_store_mgr,
proc_pool=self.proc_pool,
workflow_run_dir=self.workflow_run_dir,
Expand Down Expand Up @@ -1706,14 +1707,8 @@ async def main_loop(self) -> None:
await self.process_command_queue()
self.proc_pool.process()

# Tasks in the main pool that are waiting but not queued must be
# waiting on external dependencies, i.e. xtriggers or ext_triggers.
# For these tasks, call any unsatisfied xtrigger functions, and
# queue tasks that have become ready. (Tasks do not appear in the
# main pool at all until all other-task deps are satisfied, and are
# queued immediately on release from runahead limiting if they are
# not waiting on external deps).
housekeep_xtriggers = False
# Unqueued tasks with satisfied prerequisites must be waiting on
# xtriggers or ext_triggers. Check these and queue tasks if ready.
for itask in self.pool.get_tasks():
if (
not itask.state(TASK_STATUS_WAITING)
Expand All @@ -1726,28 +1721,19 @@ async def main_loop(self) -> None:
itask.state.xtriggers
and not itask.state.xtriggers_all_satisfied()
):
# Call unsatisfied xtriggers if not already in-process.
# Results are returned asynchronously.
self.xtrigger_mgr.call_xtriggers_async(itask)
# Check for satisfied xtriggers, and queue if ready.
if self.xtrigger_mgr.check_xtriggers(
itask, self.workflow_db_mgr.put_xtriggers):
housekeep_xtriggers = True
if all(itask.is_ready_to_run()):
self.pool.queue_task(itask)

# Check for satisfied ext_triggers, and queue if ready.

if (
itask.state.external_triggers
and not itask.state.external_triggers_all_satisfied()
and self.broadcast_mgr.check_ext_triggers(
itask, self.ext_trigger_queue)
and all(itask.is_ready_to_run())
):
self.broadcast_mgr.check_ext_triggers(
itask, self.ext_trigger_queue)

if all(itask.is_ready_to_run()):
self.pool.queue_task(itask)

if housekeep_xtriggers:
# (Could do this periodically?)
if self.xtrigger_mgr.do_housekeeping:
self.xtrigger_mgr.housekeep(self.pool.get_tasks())

self.pool.set_expired_tasks()
Expand Down
35 changes: 30 additions & 5 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
)
from shutil import rmtree
from time import time
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Any, Union, Optional

from cylc.flow import LOG
from cylc.flow.job_runner_mgr import JobPollContext
Expand Down Expand Up @@ -1262,10 +1262,11 @@ def _prep_submit_task_job_impl(self, workflow, itask, rtconfig):
itask.submit_num] = itask.platform['name']

itask.summary['job_runner_name'] = itask.platform['job runner']
with suppress(TypeError):
itask.summary[self.KEY_EXECUTE_TIME_LIMIT] = float(
rtconfig['execution time limit']
)

# None is an allowed non-float number for Execution time limit.
itask.summary[
self.KEY_EXECUTE_TIME_LIMIT
] = self.get_execution_time_limit(rtconfig['execution time limit'])

# Location of job file, etc
self._create_job_log_path(workflow, itask)
Expand All @@ -1281,6 +1282,30 @@ def _prep_submit_task_job_impl(self, workflow, itask, rtconfig):
job_d=job_d
)

@staticmethod
def get_execution_time_limit(
config_execution_time_limit: Any
) -> Union[None, float]:
"""Get execution time limit from config and process it.
If the etl from the config is a Falsy then return None.
Otherwise try and parse value as float.
Examples:
>>> from pytest import raises
>>> this = TaskJobManager.get_execution_time_limit
>>> this(None)
>>> this("54")
54.0
>>> this({})
>>> with raises(ValueError):
... this('🇳🇿')
"""
if config_execution_time_limit:
return float(config_execution_time_limit)
return None

def get_job_conf(
self,
workflow,
Expand Down
1 change: 0 additions & 1 deletion cylc/flow/workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,6 @@ def put_task_event_timers(self, task_events_mgr):

def put_xtriggers(self, sat_xtrig):
"""Put statements to update external triggers table."""
self.db_deletes_map[self.TABLE_XTRIGGERS].append({})
for sig, res in sat_xtrig.items():
self.db_inserts_map[self.TABLE_XTRIGGERS].append({
"signature": sig,
Expand Down
Loading

0 comments on commit c07392d

Please sign in to comment.