diff --git a/CHANGES.md b/CHANGES.md index 0c33e6ce915..15e89f118a2 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -29,8 +29,6 @@ $ towncrier create ..md --content "Short description" ### 🔧 Fixes -[#6178](https://github.com/cylc/cylc-flow/pull/6178) - Fix an issue where Tui could hang when closing. - [#6186](https://github.com/cylc/cylc-flow/pull/6186) - Fixed bug where using flow numbers with `cylc set` would not work correctly. [#6200](https://github.com/cylc/cylc-flow/pull/6200) - Fixed bug where a stalled paused workflow would be incorrectly reported as running, not paused @@ -47,6 +45,8 @@ $ towncrier create ..md --content "Short description" [#6176](https://github.com/cylc/cylc-flow/pull/6176) - Fix bug where jobs which fail to submit are not shown in GUI/TUI if submission retries are set. +[#6178](https://github.com/cylc/cylc-flow/pull/6178) - Fix an issue where Tui could hang when closing. + ## __cylc-8.3.0 (Released 2024-06-18)__ ### ⚠ Breaking Changes diff --git a/changes.d/6137.feat.md b/changes.d/6137.feat.md new file mode 100644 index 00000000000..d947999de99 --- /dev/null +++ b/changes.d/6137.feat.md @@ -0,0 +1 @@ +New Cylc lint rule: S014: Don't use job runner specific execution time limit directives, use execution time limit. \ No newline at end of file diff --git a/cylc/flow/__init__.py b/cylc/flow/__init__.py index 3f0bfe5c0b1..71a45f6bbf3 100644 --- a/cylc/flow/__init__.py +++ b/cylc/flow/__init__.py @@ -15,9 +15,8 @@ # along with this program. If not, see . """Set up the cylc environment.""" -import os import logging - +import os CYLC_LOG = 'cylc' @@ -53,7 +52,7 @@ def environ_init(): environ_init() -__version__ = '8.3.4.dev' +__version__ = '8.4.0.dev' def iter_entry_points(entry_point_name): diff --git a/cylc/flow/broadcast_report.py b/cylc/flow/broadcast_report.py index 72fedb4cbef..cb44f2212f3 100644 --- a/cylc/flow/broadcast_report.py +++ b/cylc/flow/broadcast_report.py @@ -72,7 +72,7 @@ def get_broadcast_change_iter(modified_settings, is_cancel=False): value = setting keys_str = "" while isinstance(value, dict): - key, value = list(value.items())[0] + key, value = next(iter(value.items())) if isinstance(value, dict): keys_str += "[" + key + "]" else: diff --git a/cylc/flow/cfgspec/globalcfg.py b/cylc/flow/cfgspec/globalcfg.py index 7dc04c99a02..b9eb90c6569 100644 --- a/cylc/flow/cfgspec/globalcfg.py +++ b/cylc/flow/cfgspec/globalcfg.py @@ -1311,7 +1311,7 @@ def default_for( The means by which task progress messages are reported back to the running workflow. - Options: + ..rubric:: Options: zmq Direct client-server TCP communication via network ports @@ -1320,6 +1320,8 @@ def default_for( ssh Use non-interactive ssh for task communications + For more information, see :ref:`TaskComms`. + .. versionchanged:: 8.0.0 {REPLACES}``global.rc[hosts][]task communication diff --git a/cylc/flow/clean.py b/cylc/flow/clean.py index b38f01b12fc..b4597780fd3 100644 --- a/cylc/flow/clean.py +++ b/cylc/flow/clean.py @@ -129,7 +129,7 @@ def _clean_check(opts: 'Values', id_: str, run_dir: Path) -> None: except ContactFileExists as exc: raise ServiceFileError( f"Cannot clean running workflow {id_}.\n\n{exc}" - ) + ) from None def init_clean(id_: str, opts: 'Values') -> None: @@ -173,7 +173,7 @@ def init_clean(id_: str, opts: 'Values') -> None: try: platform_names = get_platforms_from_db(local_run_dir) except ServiceFileError as exc: - raise ServiceFileError(f"Cannot clean {id_} - {exc}") + raise ServiceFileError(f"Cannot clean {id_} - {exc}") from None except sqlite3.OperationalError as exc: # something went wrong with the query # e.g. the table/field we need isn't there @@ -186,7 +186,7 @@ def init_clean(id_: str, opts: 'Values') -> None: ' local files (you may need to remove files on other' ' platforms manually).' ) - raise ServiceFileError(f"Cannot clean {id_} - {exc}") + raise ServiceFileError(f"Cannot clean {id_} - {exc}") from exc if platform_names and platform_names != {'localhost'}: remote_clean( @@ -361,7 +361,8 @@ def remote_clean( except PlatformLookupError as exc: raise PlatformLookupError( f"Cannot clean {id_} on remote platforms as the workflow database " - f"is out of date/inconsistent with the global config - {exc}") + f"is out of date/inconsistent with the global config - {exc}" + ) from None queue: Deque[RemoteCleanQueueTuple] = deque() remote_clean_cmd = partial( diff --git a/cylc/flow/command_validation.py b/cylc/flow/command_validation.py index 1c57d452c43..ea67f00206e 100644 --- a/cylc/flow/command_validation.py +++ b/cylc/flow/command_validation.py @@ -70,7 +70,7 @@ def flow_opts(flows: List[str], flow_wait: bool) -> None: try: int(val) except ValueError: - raise InputError(ERR_OPT_FLOW_VAL.format(val)) + raise InputError(ERR_OPT_FLOW_VAL.format(val)) from None if flow_wait and flows[0] in [FLOW_NEW, FLOW_NONE]: raise InputError(ERR_OPT_FLOW_WAIT) diff --git a/cylc/flow/commands.py b/cylc/flow/commands.py index 134681bdfd5..173984f17e0 100644 --- a/cylc/flow/commands.py +++ b/cylc/flow/commands.py @@ -211,7 +211,7 @@ async def stop( try: mode = StopMode(mode) except ValueError: - raise CommandFailedError(f"Invalid stop mode: '{mode}'") + raise CommandFailedError(f"Invalid stop mode: '{mode}'") from None schd._set_stop(mode) if mode is StopMode.REQUEST_KILL: schd.time_next_kill = time() @@ -309,7 +309,7 @@ async def set_verbosity(schd: 'Scheduler', level: Union[int, str]): lvl = int(level) LOG.setLevel(lvl) except (TypeError, ValueError) as exc: - raise CommandFailedError(exc) + raise CommandFailedError(exc) from None cylc.flow.flags.verbosity = log_level_to_verbosity(lvl) diff --git a/cylc/flow/config.py b/cylc/flow/config.py index c5e82d74d97..cb987ac6d4c 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -199,11 +199,11 @@ def interpolate_template(tmpl, params_dict): try: return tmpl % params_dict except KeyError: - raise ParamExpandError('bad parameter') + raise ParamExpandError('bad parameter') from None except TypeError: - raise ParamExpandError('wrong data type for parameter') + raise ParamExpandError('wrong data type for parameter') from None except ValueError: - raise ParamExpandError('bad template syntax') + raise ParamExpandError('bad template syntax') from None class WorkflowConfig: @@ -480,8 +480,8 @@ def __init__( get_interval(offset_string).standardise()) except IntervalParsingError: raise WorkflowConfigError( - "Illegal %s spec: %s" % ( - s_type, offset_string)) + "Illegal %s spec: %s" % (s_type, offset_string) + ) from None extn = "(" + offset_string + ")" # Replace family names with members. @@ -709,7 +709,7 @@ def process_initial_cycle_point(self) -> None: try: icp = ingest_time(orig_icp, get_current_time_string()) except IsodatetimeError as exc: - raise WorkflowConfigError(str(exc)) + raise WorkflowConfigError(str(exc)) from None if orig_icp != icp: # now/next()/previous() was used, need to store # evaluated point in DB @@ -761,7 +761,7 @@ def process_start_cycle_point(self) -> None: for taskid in self.options.starttask ] except ValueError as exc: - raise InputError(str(exc)) + raise InputError(str(exc)) from None self.start_point = min( get_point(cycle).standardise() for cycle in cycle_points if cycle @@ -1114,7 +1114,7 @@ def _check_completion_expression(self, task_name: str, expr: str) -> None: f'\n {expr}' '\nThe "finished" output cannot be used in completion' ' expressions, use "succeeded or failed".' - ) + ) from None for alt_qualifier, qualifier in ALT_QUALIFIERS.items(): _alt_compvar = trigger_to_completion_variable(alt_qualifier) @@ -1125,21 +1125,21 @@ def _check_completion_expression(self, task_name: str, expr: str) -> None: f'\n {expr}' f'\nUse "{_compvar}" not "{_alt_compvar}" ' 'in completion expressions.' - ) + ) from None raise WorkflowConfigError( # NOTE: str(exc) == "name 'x' is not defined" tested in # tests/integration/test_optional_outputs.py f'Error in [runtime][{task_name}]completion:' f'\n{error}' - ) + ) from None except Exception as exc: # includes InvalidCompletionExpression # expression contains non-whitelisted syntax or any other error in # the expression e.g. SyntaxError raise WorkflowConfigError( f'Error in [runtime][{task_name}]completion:' f'\n{str(exc)}' - ) + ) from None # ensure consistency between the graph and the completion expression for compvar in ( @@ -1415,11 +1415,12 @@ def compute_family_tree(self): c3_single.mro(name)) except RecursionError: raise WorkflowConfigError( - "circular [runtime] inheritance?") + "circular [runtime] inheritance?" + ) from None except Exception as exc: # catch inheritance errors # TODO - specialise MRO exceptions - raise WorkflowConfigError(str(exc)) + raise WorkflowConfigError(str(exc)) from None for name in self.cfg['runtime']: ancestors = self.runtime['linearized ancestors'][name] @@ -1771,7 +1772,7 @@ def _check_task_event_handlers(self): f' {taskdef.name}:' f' {handler_template}:' f' {repr(exc)}' - ) + ) from None def _check_special_tasks(self): """Check declared special tasks are valid, and detect special @@ -1878,7 +1879,9 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, try: expr_list = listify(lexpression) except SyntaxError: - raise WorkflowConfigError('Error in expression "%s"' % lexpression) + raise WorkflowConfigError( + 'Error in expression "%s"' % lexpression + ) from None triggers = {} xtrig_labels = set() @@ -1955,7 +1958,9 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, xtrig = xtrigs[label] except KeyError: if label != 'wall_clock': - raise WorkflowConfigError(f"xtrigger not defined: {label}") + raise WorkflowConfigError( + f"xtrigger not defined: {label}" + ) from None else: # Allow "@wall_clock" in graph as implicit zero-offset. xtrig = SubFuncContext('wall_clock', 'wall_clock', [], {}) @@ -2289,7 +2294,7 @@ def load_graph(self): msg += ' (final cycle point=%s)' % fcp if isinstance(exc, CylcError): msg += ' %s' % exc.args[0] - raise WorkflowConfigError(msg) + raise WorkflowConfigError(msg) from None self.sequences.append(seq) parser = GraphParser( family_map, @@ -2444,7 +2449,7 @@ def get_taskdef( except TaskDefError as exc: if orig_expr: LOG.error(orig_expr) - raise WorkflowConfigError(str(exc)) + raise WorkflowConfigError(str(exc)) from None else: # Record custom message outputs from [runtime]. messages = set(self.cfg['runtime'][name]['outputs'].values()) @@ -2465,14 +2470,14 @@ def get_taskdef( f'Invalid task output "' f'[runtime][{name}][outputs]' f'{output} = {message}" - {msg}' - ) + ) from None valid, msg = TaskMessageValidator.validate(message) if not valid: raise WorkflowConfigError( f'Invalid task message "' f'[runtime][{name}][outputs]' f'{output} = {message}" - {msg}' - ) + ) from None self.taskdefs[name].add_output(output, message) return self.taskdefs[name] @@ -2484,7 +2489,7 @@ def _get_taskdef(self, name: str) -> TaskDef: try: rtcfg = self.cfg['runtime'][name] except KeyError: - raise WorkflowConfigError("Task not defined: %s" % name) + raise WorkflowConfigError("Task not defined: %s" % name) from None # We may want to put in some handling for cases of changing the # initial cycle via restart (accidentally or otherwise). @@ -2576,7 +2581,9 @@ def process_metadata_urls(self): 'workflow': self.workflow, } except (KeyError, ValueError): - raise InputError(f'Invalid template [meta]URL: {url}') + raise InputError( + f'Invalid template [meta]URL: {url}' + ) from None else: LOG.warning( 'Detected deprecated template variables in [meta]URL.' @@ -2612,7 +2619,9 @@ def process_metadata_urls(self): 'task': name, } except (KeyError, ValueError): - raise InputError(f'Invalid template [meta]URL: {url}') + raise InputError( + f'Invalid template [meta]URL: {url}' + ) from None else: LOG.warning( 'Detected deprecated template variables in' diff --git a/cylc/flow/cycling/integer.py b/cylc/flow/cycling/integer.py index 749c651fc08..20cc3765670 100644 --- a/cylc/flow/cycling/integer.py +++ b/cylc/flow/cycling/integer.py @@ -150,7 +150,7 @@ def standardise(self): try: self.value = str(int(self)) except (TypeError, ValueError) as exc: - raise PointParsingError(type(self), self.value, exc) + raise PointParsingError(type(self), self.value, exc) from None return self def __int__(self): diff --git a/cylc/flow/cycling/iso8601.py b/cylc/flow/cycling/iso8601.py index a66ce3f5ba0..2ab311df425 100644 --- a/cylc/flow/cycling/iso8601.py +++ b/cylc/flow/cycling/iso8601.py @@ -102,7 +102,7 @@ def standardise(self): WorkflowSpecifics.NUM_EXPANDED_YEAR_DIGITS) else: message = str(exc) - raise PointParsingError(type(self), self.value, message) + raise PointParsingError(type(self), self.value, message) from None return self def sub(self, other): @@ -176,7 +176,7 @@ def standardise(self): try: self.value = str(interval_parse(self.value)) except IsodatetimeError: - raise IntervalParsingError(type(self), self.value) + raise IntervalParsingError(type(self), self.value) from None return self def add(self, other): @@ -782,7 +782,7 @@ def prev_next( raise WorkflowConfigError( f'Invalid offset: {my_time}:' f' Offset lists are semicolon separated, try {suggest}' - ) + ) from None timepoints.append(parsed_point + now) diff --git a/cylc/flow/dbstatecheck.py b/cylc/flow/dbstatecheck.py index b38c394ccdb..1890c697c71 100644 --- a/cylc/flow/dbstatecheck.py +++ b/cylc/flow/dbstatecheck.py @@ -78,7 +78,7 @@ def __init__(self, rund, workflow, db_path=None): try: self.db_point_fmt = self._get_db_point_format() self.c7_back_compat_mode = False - except sqlite3.OperationalError as exc: + except sqlite3.OperationalError: # BACK COMPAT: Cylc 7 DB (see method below). try: self.db_point_fmt = self._get_db_point_format_compat() @@ -86,7 +86,7 @@ def __init__(self, rund, workflow, db_path=None): except sqlite3.OperationalError: with suppress(Exception): self.conn.close() - raise exc # original error + raise def __enter__(self): return self @@ -137,7 +137,7 @@ def adjust_point_to_db(self, cycle, offset): raise InputError( f'Cycle point "{cycle}" is not compatible' f' with DB point format "{self.db_point_fmt}"' - ) + ) from None return cycle @staticmethod diff --git a/cylc/flow/etc/examples/extending-workflow/.validate b/cylc/flow/etc/examples/extending-workflow/.validate new file mode 100755 index 00000000000..43c810372ce --- /dev/null +++ b/cylc/flow/etc/examples/extending-workflow/.validate @@ -0,0 +1,80 @@ +#!/bin/bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +set -eux + +test_simple () { + local ID + ID="$(< /dev/urandom tr -dc A-Za-z | head -c6)" + + # lint + cylc lint ./simple + + # copy into a temp directory + local SRC_DIR + SRC_DIR="$(mktemp -d)" + cp simple/flow.cylc "$SRC_DIR" + + # speed things up with simulation mode + cat >>"${SRC_DIR}/flow.cylc" <<__HERE__ + [runtime] + [[root]] + [[[simulation]]] + default run length = PT0S +__HERE__ + + # start the workflow + cylc vip \ + --check-circular \ + --no-run-name \ + --no-detach \ + --workflow-name "$ID" \ + --mode=simulation \ + "$SRC_DIR" + + # it should have reached the 2002 cycle + grep '2002/a' "${HOME}/cylc-run/${ID}/log/scheduler/log" + if grep '2003/a' "${HOME}/cylc-run/${ID}/log/scheduler/log"; then + exit 1 + fi + + # edit the "stop after cycle point" + sed -i \ + 's/stop after cycle point.*/stop after cycle point = 2004/' \ + "${SRC_DIR}/flow.cylc" + + # continue the run + cylc vr \ + --no-detach \ + --mode=simulation \ + --yes \ + "$ID" + + # it should have reached the 2004 cycle + grep '2004/a' "${HOME}/cylc-run/${ID}/log/scheduler/log" + if grep '2005/a' "${HOME}/cylc-run/${ID}/log/scheduler/log"; then + exit 1 + fi + + # clean up + cylc clean "$ID" + + rm -r "${SRC_DIR}" +} + + +test_simple diff --git a/cylc/flow/etc/examples/extending-workflow/index.rst b/cylc/flow/etc/examples/extending-workflow/index.rst new file mode 100644 index 00000000000..c5bc023dc11 --- /dev/null +++ b/cylc/flow/etc/examples/extending-workflow/index.rst @@ -0,0 +1,77 @@ +Extending Workflow +================== + +.. cylc-scope:: flow.cylc[scheduling] + +Sometimes we may run a workflow to :term:`completion `, +but subsequently wish to run it for a few more cycles. + +With Cylc 7 this was often done by changing the `final cycle point` and +restarting the workflow. This approach worked, but was a little awkward. +It's possible with Cylc 8, but we would recommend moving away from this +pattern instead. + +The recommended approach to this problem (Cylc 6+) is to use the +`stop after cycle point` rather than the `final cycle point`. + +The `stop after cycle point` tells Cylc to **stop** after the workflow passes +the specified point, whereas the `final cycle point` tells Cylc that the +workflow **finishes** at the specified point. + +When a workflow **finishes**, it is a little awkward to restart as you have to +tell Cylc which tasks to continue on from. The `stop after cycle point` +solution avoids this issue. + + +Example +------- + +.. admonition:: Get a copy of this example + :class: hint + + .. code-block:: console + + $ cylc get-resources examples/extending-workflow/simple + +This workflow will stop at the end of the ``2002`` cycle: + +.. literalinclude:: simple/flow.cylc + :language: cylc + +After it has run and shut down, change the `stop after cycle point` to +the desired value and restart it. E.g: + +.. code-block:: bash + + # install and run the workflow: + cylc vip + + # then later edit "stop after cycle point" to "2004" + + # then reinstall and restart the workflow: + cylc vr + +The workflow will continue from where it left off and run until the end of the +``2004`` cycle. Because the workflow never hit the `final cycle point` it +never "finished" so no special steps are required to restart the workflow. + +You can also set the `stop after cycle point` when you start the workflow: + +.. code-block:: bash + + cylc play --stop-cycle-point=2020 myworkflow + +Or change it at any point whilst the workflow is running: + +.. code-block:: bash + + cylc stop myworkflow//2030 # change the stop after cycle point to 2030 + +.. note:: + + If you set the `stop after cycle point` on the command line, this value will + take precedence over the one in the workflow configuration. Use + ``cylc play --stop-cycle-point=reload`` to restart the workflow using the + `stop after cycle point` configured in the workflow configuration. + +.. cylc-scope:: diff --git a/cylc/flow/etc/examples/extending-workflow/simple/flow.cylc b/cylc/flow/etc/examples/extending-workflow/simple/flow.cylc new file mode 100644 index 00000000000..2de2f28f2ea --- /dev/null +++ b/cylc/flow/etc/examples/extending-workflow/simple/flow.cylc @@ -0,0 +1,24 @@ +[meta] + title = "Basic extendable workflow" + description = """ + Use the "stop after cycle point" rather than the "final cycle point" + to allow this workflow to be easily extended at a later date. + """ + +[scheduler] + # use the year for the cycle point + # (strip off the month, day, hour and minute) + cycle point format = CCYY + +[scheduling] + initial cycle point = 2000 + stop after cycle point = 2002 # stop after two years of simulated time + [[graph]] + P1Y = """ + z[-P1Y] => a + a => z + """ + +[runtime] + [[a]] + [[z]] diff --git a/cylc/flow/graph_parser.py b/cylc/flow/graph_parser.py index efbce16fb36..3fbfd8c1754 100644 --- a/cylc/flow/graph_parser.py +++ b/cylc/flow/graph_parser.py @@ -346,7 +346,7 @@ def parse_graph(self, graph_string: str) -> None: raise GraphParseError( f"Dangling {seq}:" f"{this_line}" - ) + ) from None part_lines.append(this_line) # Check that a continuation sequence doesn't end this line and @@ -638,7 +638,8 @@ def _proc_dep_pair( except KeyError: # "FAM:bad => foo" in LHS (includes "FAM => bar" too). raise GraphParseError( - f"Illegal family trigger in {expr}") + f"Illegal family trigger in {expr}" + ) from None else: # Not a family. if trig in self.__class__.fam_to_mem_trigger_map: @@ -911,7 +912,8 @@ def _compute_triggers( except KeyError: # Illegal family trigger on RHS of a pair. raise GraphParseError( - f"Illegal family trigger: {name}:{output}") + f"Illegal family trigger: {name}:{output}" + ) from None else: fam = False if not output: diff --git a/cylc/flow/host_select.py b/cylc/flow/host_select.py index 0eb34d088ca..69e32c68a71 100644 --- a/cylc/flow/host_select.py +++ b/cylc/flow/host_select.py @@ -373,7 +373,7 @@ def _filter_by_ranking(hosts, rankings, results, data=None): f'\n Expression: {item}' f'\n Configuration: {GLBL_CFG_STR}' f'\n Error: {exc}' - ) + ) from None if isinstance(result, bool): host_rankings[item] = result data[host][item] = result diff --git a/cylc/flow/id.py b/cylc/flow/id.py index 58fff7fa7bc..f2c8b05b4a1 100644 --- a/cylc/flow/id.py +++ b/cylc/flow/id.py @@ -128,7 +128,7 @@ def __getitem__(self, key): return dict.__getitem__(self, key) except KeyError: if key not in self._KEYS: - raise ValueError(f'Invalid token: {key}') + raise ValueError(f'Invalid token: {key}') from None return None def __str__(self): diff --git a/cylc/flow/id_cli.py b/cylc/flow/id_cli.py index 9c7493fd612..35afbf80d5a 100644 --- a/cylc/flow/id_cli.py +++ b/cylc/flow/id_cli.py @@ -167,9 +167,9 @@ def _parse_cli(*ids: str) -> List[Tokens]: # this ID is invalid with or without the trailing slash tokens = cli_tokenise(id_[:-1]) except ValueError: - raise InputError(f'Invalid ID: {id_}') + raise InputError(f'Invalid ID: {id_}') from None else: - raise InputError(f'Invalid ID: {id_}') + raise InputError(f'Invalid ID: {id_}') from None is_partial = tokens.get('workflow') and not tokens.get('cycle') is_relative = not tokens.get('workflow') @@ -347,7 +347,7 @@ async def parse_ids_async( if src: if not flow_file_path: # get the workflow file path from the run dir - flow_file_path = get_flow_file(list(workflows)[0]) + flow_file_path = get_flow_file(next(iter(workflows))) return workflows, flow_file_path return workflows, multi_mode @@ -375,7 +375,7 @@ async def parse_id_async( 'max_tasks': 1, }, ) - workflow_id = list(workflows)[0] + workflow_id = next(iter(workflows)) tokens_list = workflows[workflow_id] tokens: Optional[Tokens] if tokens_list: diff --git a/cylc/flow/install.py b/cylc/flow/install.py index 27810f72e97..2e94943d9d6 100644 --- a/cylc/flow/install.py +++ b/cylc/flow/install.py @@ -328,7 +328,7 @@ def install_workflow( # This occurs when the file exists but is _not_ a directory. raise WorkflowFilesError( f"Cannot install as there is an existing file at {rundir}." - ) + ) from None if relink: link_runN(rundir) rsync_cmd = get_rsync_rund_cmd(source, rundir) @@ -529,7 +529,7 @@ def parse_cli_sym_dirs(symlink_dirs: str) -> Dict[str, Dict[str, Any]]: 'There is an error in --symlink-dirs option:' f' {pair}. Try entering option in the form ' '--symlink-dirs=\'log=$DIR, share=$DIR2, ...\'' - ) + ) from None if key not in possible_symlink_dirs: dirs = ', '.join(possible_symlink_dirs) raise InputError( diff --git a/cylc/flow/install_plugins/log_vc_info.py b/cylc/flow/install_plugins/log_vc_info.py index 29d861f7654..41479d6f1ed 100644 --- a/cylc/flow/install_plugins/log_vc_info.py +++ b/cylc/flow/install_plugins/log_vc_info.py @@ -253,7 +253,7 @@ def _run_cmd( except FileNotFoundError as exc: # This will only be raised if the VCS command is not installed, # otherwise Popen() will succeed with a non-zero return code - raise VCSNotInstalledError(vcs, exc) + raise VCSNotInstalledError(vcs, exc) from None if stdout == PIPE: out, err = pipe_poller(proc, proc.stdout, proc.stderr) else: diff --git a/cylc/flow/job_runner_handlers/loadleveler.py b/cylc/flow/job_runner_handlers/loadleveler.py index ee8203b2b47..d097ff85faf 100644 --- a/cylc/flow/job_runner_handlers/loadleveler.py +++ b/cylc/flow/job_runner_handlers/loadleveler.py @@ -83,6 +83,7 @@ class LoadlevelerHandler(): re.compile("^llsubmit: Processed command file through Submit Filter:")] SUBMIT_CMD_TMPL = "llsubmit '%(job)s'" VACATION_SIGNAL = "USR1" + TIME_LIMIT_DIRECTIVE = "wall_clock_limit" def format_directives(self, job_conf): """Format the job directives for a job file.""" @@ -96,8 +97,8 @@ def format_directives(self, job_conf): directives["output"] = job_file_path + ".out" directives["error"] = job_file_path + ".err" if (job_conf["execution_time_limit"] and - directives.get("wall_clock_limit") is None): - directives["wall_clock_limit"] = "%d,%d" % ( + directives.get(self.TIME_LIMIT_DIRECTIVE) is None): + directives[self.TIME_LIMIT_DIRECTIVE] = "%d,%d" % ( job_conf["execution_time_limit"] + 60, job_conf["execution_time_limit"]) for key, value in list(job_conf["directives"].items()): diff --git a/cylc/flow/job_runner_handlers/lsf.py b/cylc/flow/job_runner_handlers/lsf.py index a465c9b7924..534d1205a9d 100644 --- a/cylc/flow/job_runner_handlers/lsf.py +++ b/cylc/flow/job_runner_handlers/lsf.py @@ -70,6 +70,7 @@ class LSFHandler(): POLL_CMD = "bjobs" REC_ID_FROM_SUBMIT_OUT = re.compile(r"^Job <(?P\d+)>") SUBMIT_CMD_TMPL = "bsub" + TIME_LIMIT_DIRECTIVE = "-W" @classmethod def format_directives(cls, job_conf): @@ -82,8 +83,11 @@ def format_directives(cls, job_conf): ) directives["-o"] = job_file_path + ".out" directives["-e"] = job_file_path + ".err" - if job_conf["execution_time_limit"] and directives.get("-W") is None: - directives["-W"] = str(math.ceil( + if ( + job_conf["execution_time_limit"] + and directives.get(cls.TIME_LIMIT_DIRECTIVE) is None + ): + directives[cls.TIME_LIMIT_DIRECTIVE] = str(math.ceil( job_conf["execution_time_limit"] / 60)) for key, value in list(job_conf["directives"].items()): directives[key] = value diff --git a/cylc/flow/job_runner_handlers/moab.py b/cylc/flow/job_runner_handlers/moab.py index 839d246ccbc..ec068d48420 100644 --- a/cylc/flow/job_runner_handlers/moab.py +++ b/cylc/flow/job_runner_handlers/moab.py @@ -78,6 +78,7 @@ class MoabHandler: POLL_CMD = "checkjob" REC_ID_FROM_SUBMIT_OUT = re.compile(r"""\A\s*(?P\S+)\s*\Z""") SUBMIT_CMD_TMPL = "msub '%(job)s'" + TIME_LIMIT_DIRECTIVE = "-l walltime" def format_directives(self, job_conf): """Format the job directives for a job file.""" @@ -91,8 +92,9 @@ def format_directives(self, job_conf): directives["-o"] = job_file_path + ".out" directives["-e"] = job_file_path + ".err" if (job_conf["execution_time_limit"] and - directives.get("-l walltime") is None): - directives["-l walltime"] = "%d" % job_conf["execution_time_limit"] + directives.get(self.TIME_LIMIT_DIRECTIVE) is None): + directives[self.TIME_LIMIT_DIRECTIVE] = "%d" % job_conf[ + "execution_time_limit"] # restartable? directives.update(job_conf["directives"]) lines = [] diff --git a/cylc/flow/job_runner_handlers/pbs.py b/cylc/flow/job_runner_handlers/pbs.py index aa264311fc4..ac0d6c47a00 100644 --- a/cylc/flow/job_runner_handlers/pbs.py +++ b/cylc/flow/job_runner_handlers/pbs.py @@ -84,6 +84,7 @@ class PBSHandler: POLL_CANT_CONNECT_ERR = "Connection refused" REC_ID_FROM_SUBMIT_OUT = re.compile(r"^\s*(?P\d+)", re.M) SUBMIT_CMD_TMPL = "qsub '%(job)s'" + TIME_LIMIT_DIRECTIVE = "-l walltime" def format_directives(self, job_conf): """Format the job directives for a job file.""" @@ -105,9 +106,12 @@ def format_directives(self, job_conf): directives["-o"] = job_file_path + ".out" directives["-e"] = job_file_path + ".err" - if (job_conf["execution_time_limit"] and - directives.get("-l walltime") is None): - directives["-l walltime"] = "%d" % job_conf["execution_time_limit"] + if ( + job_conf["execution_time_limit"] + and directives.get(self.TIME_LIMIT_DIRECTIVE) is None + ): + directives[self.TIME_LIMIT_DIRECTIVE] = "%d" % job_conf[ + "execution_time_limit"] for key, value in list(job_conf["directives"].items()): directives[key] = value lines = [] diff --git a/cylc/flow/job_runner_handlers/sge.py b/cylc/flow/job_runner_handlers/sge.py index 33f7d5a26d7..c7c50956fb9 100644 --- a/cylc/flow/job_runner_handlers/sge.py +++ b/cylc/flow/job_runner_handlers/sge.py @@ -37,7 +37,6 @@ -cwd = -q = foo -l h_data = 1024M - -l h_rt = 24:00:00 These are written to the top of the job script like this: @@ -76,6 +75,7 @@ class SGEHandler: POLL_CMD = "qstat" REC_ID_FROM_SUBMIT_OUT = re.compile(r"\D+(?P\d+)\D+") SUBMIT_CMD_TMPL = "qsub '%(job)s'" + TIME_LIMIT_DIRECTIVE = "-l h_rt" def format_directives(self, job_conf): """Format the job directives for a job file.""" @@ -88,8 +88,8 @@ def format_directives(self, job_conf): directives['-o'] = job_file_path + ".out" directives['-e'] = job_file_path + ".err" if (job_conf["execution_time_limit"] and - directives.get("-l h_rt") is None): - directives["-l h_rt"] = "%d:%02d:%02d" % ( + directives.get(self.TIME_LIMIT_DIRECTIVE) is None): + directives[self.TIME_LIMIT_DIRECTIVE] = "%d:%02d:%02d" % ( job_conf["execution_time_limit"] / 3600, (job_conf["execution_time_limit"] / 60) % 60, job_conf["execution_time_limit"] % 60) diff --git a/cylc/flow/job_runner_handlers/slurm.py b/cylc/flow/job_runner_handlers/slurm.py index 4ec6be20471..33df2ebc926 100644 --- a/cylc/flow/job_runner_handlers/slurm.py +++ b/cylc/flow/job_runner_handlers/slurm.py @@ -135,6 +135,8 @@ class SLURMHandler(): # Separator between het job directive sections SEP_HETJOB = "#SBATCH hetjob" + TIME_LIMIT_DIRECTIVE = "--time" + @classmethod def filter_poll_many_output(cls, out): """Return list of job IDs extracted from job poll stdout. @@ -161,8 +163,8 @@ def format_directives(cls, job_conf): directives['--output'] = job_file_path.replace('%', '%%') + ".out" directives['--error'] = job_file_path.replace('%', '%%') + ".err" if (job_conf["execution_time_limit"] and - directives.get("--time") is None): - directives["--time"] = "%d:%02d" % ( + directives.get(cls.TIME_LIMIT_DIRECTIVE) is None): + directives[cls.TIME_LIMIT_DIRECTIVE] = "%d:%02d" % ( job_conf["execution_time_limit"] / 60, job_conf["execution_time_limit"] % 60) for key, value in list(job_conf['directives'].items()): diff --git a/cylc/flow/loggingutil.py b/cylc/flow/loggingutil.py index cb645a929ff..55701cf3fe4 100644 --- a/cylc/flow/loggingutil.py +++ b/cylc/flow/loggingutil.py @@ -211,7 +211,7 @@ def should_rollover(self, record: logging.LogRecord) -> bool: self.stream.seek(0, 2) except ValueError as exc: # intended to catch - ValueError: I/O operation on closed file - raise SystemExit(exc) + raise SystemExit(exc) from None return self.stream.tell() + len(msg.encode('utf8')) >= self.max_bytes @property diff --git a/cylc/flow/main_loop/__init__.py b/cylc/flow/main_loop/__init__.py index 2350153842c..e9f9f35f5da 100644 --- a/cylc/flow/main_loop/__init__.py +++ b/cylc/flow/main_loop/__init__.py @@ -329,14 +329,14 @@ def load(config, additional_plugins=None): f'No main-loop plugin: "{plugin_name}"\n' + ' Available plugins:\n' + indent('\n'.join(sorted(entry_points)), ' ') - ) + ) from None # load plugin try: module = entry_point.load() except Exception as exc: raise PluginError( 'cylc.main_loop', entry_point.name, exc - ) + ) from None # load coroutines log = [] for coro_name, coro in getmembers(module): diff --git a/cylc/flow/main_loop/health_check.py b/cylc/flow/main_loop/health_check.py index b488c878c0e..a470fd3cb88 100644 --- a/cylc/flow/main_loop/health_check.py +++ b/cylc/flow/main_loop/health_check.py @@ -56,4 +56,4 @@ def _check_contact_file(scheduler): raise CylcError( '%s: contact file corrupted/modified and may be left' % workflow_files.get_contact_file_path(scheduler.workflow) - ) + ) from None diff --git a/cylc/flow/network/__init__.py b/cylc/flow/network/__init__.py index 916b129e244..315a57a23ba 100644 --- a/cylc/flow/network/__init__.py +++ b/cylc/flow/network/__init__.py @@ -78,7 +78,7 @@ def get_location(workflow: str) -> Tuple[str, int, int]: contact = load_contact_file(workflow) except (IOError, ValueError, ServiceFileError): # Contact file does not exist or corrupted, workflow should be dead - raise WorkflowStopped(workflow) + raise WorkflowStopped(workflow) from None host = contact[ContactFileFields.HOST] host = get_fqdn_by_host(host) @@ -179,13 +179,13 @@ def _socket_bind(self, min_port, max_port, srv_prv_key_loc=None): f"Failed to find server's public " f"key in " f"{srv_prv_key_info.full_key_path}." - ) + ) from None except OSError: raise ServiceFileError( f"IO error opening server's private " f"key from " f"{srv_prv_key_info.full_key_path}." - ) + ) from None if server_private_key is None: # this can't be caught by exception raise ServiceFileError( f"Failed to find server's private " @@ -204,7 +204,9 @@ def _socket_bind(self, min_port, max_port, srv_prv_key_loc=None): self.port = self.socket.bind_to_random_port( 'tcp://*', min_port, max_port) except (zmq.error.ZMQError, zmq.error.ZMQBindError) as exc: - raise CylcError(f'could not start Cylc ZMQ server: {exc}') + raise CylcError( + f'could not start Cylc ZMQ server: {exc}' + ) from None # Keeping srv_public_key_loc as optional arg so as to not break interface def _socket_connect(self, host, port, srv_public_key_loc=None): @@ -237,7 +239,7 @@ def _socket_connect(self, host, port, srv_public_key_loc=None): client_public_key, client_priv_key = zmq.auth.load_certificate( client_priv_key_info.full_key_path) except (OSError, ValueError): - raise ClientError(error_msg) + raise ClientError(error_msg) from None if client_priv_key is None: # this can't be caught by exception raise ClientError(error_msg) self.socket.curve_publickey = client_public_key @@ -256,7 +258,8 @@ def _socket_connect(self, host, port, srv_public_key_loc=None): self.socket.curve_serverkey = server_public_key except (OSError, ValueError): # ValueError raised w/ no public key raise ClientError( - "Failed to load the workflow's public key, so cannot connect.") + "Failed to load the workflow's public key, so cannot connect." + ) from None self.socket.connect(f'tcp://{host}:{port}') diff --git a/cylc/flow/network/client.py b/cylc/flow/network/client.py index e7e26954d56..099ef8bc0ff 100644 --- a/cylc/flow/network/client.py +++ b/cylc/flow/network/client.py @@ -326,7 +326,7 @@ async def async_request( raise ClientError( error.get('message'), # type: ignore error.get('traceback'), # type: ignore - ) + ) from None def get_header(self) -> dict: """Return "header" data to attach to each request for traceability. diff --git a/cylc/flow/network/multi.py b/cylc/flow/network/multi.py index 2b9ea418976..9c190f68799 100644 --- a/cylc/flow/network/multi.py +++ b/cylc/flow/network/multi.py @@ -235,7 +235,7 @@ def _report( """ try: ret: List[Tuple[Optional[str], Optional[str], bool]] = [] - for _mutation_name, mutation_response in response.items(): + for mutation_response in response.values(): # extract the result of each mutation result in the response success, msg = mutation_response['result'][0]['response'] out = None diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index 11eafd8bea5..fc9b67eeef5 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -750,7 +750,7 @@ async def _mutation_mapper( try: meth = COMMANDS[command] except KeyError: - raise ValueError(f"Command '{command}' not found") + raise ValueError(f"Command '{command}' not found") from None try: # Initiate the command. Validation may be performed at this point, diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 70e40232c1d..b962b582b70 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -271,7 +271,7 @@ def field_name_from_type( try: return NODE_MAP[named_type.name] except KeyError: - raise ValueError(f"'{named_type.name}' is not a node type") + raise ValueError(f"'{named_type.name}' is not a node type") from None def get_resolvers(info: 'ResolveInfo') -> 'BaseResolvers': diff --git a/cylc/flow/network/ssh_client.py b/cylc/flow/network/ssh_client.py index d1dd4fb6da4..d2ed0dd33e9 100644 --- a/cylc/flow/network/ssh_client.py +++ b/cylc/flow/network/ssh_client.py @@ -90,7 +90,7 @@ async def async_request( f"Command exceeded the timeout {timeout}s. " "This could be due to network problems. " "Check the workflow log." - ) + ) from None def prepare_command( self, command: str, args: Optional[dict], timeout: Union[float, str] diff --git a/cylc/flow/param_expand.py b/cylc/flow/param_expand.py index 0707a46e1a3..6ab104404e4 100644 --- a/cylc/flow/param_expand.py +++ b/cylc/flow/param_expand.py @@ -195,8 +195,9 @@ def _expand_name(self, results, tmpl, params, spec_vals=None): try: results.append((tmpl % current_values, current_values)) except KeyError as exc: - raise ParamExpandError('parameter %s is not ' - 'defined.' % str(exc.args[0])) + raise ParamExpandError( + 'parameter %s is not ' 'defined.' % str(exc.args[0]) + ) from None else: for param_val in params[0][1]: spec_vals[params[0][0]] = param_val @@ -306,8 +307,8 @@ def expand_parent_params(self, parent, param_values, origin): used[item] = param_values[item] except KeyError: raise ParamExpandError( - "parameter '%s' undefined in '%s'" % ( - item, origin)) + "parameter '%s' undefined in '%s'" % (item, origin) + ) from None # For each parameter substitute the param_tmpl_cfg. tmpl = tmpl.format(**self.param_tmpl_cfg) @@ -425,8 +426,9 @@ def _expand_graph(self, line, all_params, try: repl = tmpl % param_values except KeyError as exc: - raise ParamExpandError('parameter %s is not ' - 'defined.' % str(exc.args[0])) + raise ParamExpandError( + 'parameter %s is not ' 'defined.' % str(exc.args[0]) + ) from None line = line.replace('<' + p_group + '>', repl) if line: line_set.add(line) diff --git a/cylc/flow/parsec/config.py b/cylc/flow/parsec/config.py index 19f937d8e5b..29944c03b30 100644 --- a/cylc/flow/parsec/config.py +++ b/cylc/flow/parsec/config.py @@ -150,10 +150,12 @@ def get(self, keys: Optional[Iterable[str]] = None, sparse: bool = False): # setting not present in __MANY__ section: key in self.spec.get(*parents) ): - raise ItemNotFoundError(itemstr(parents, key)) + raise ItemNotFoundError( + itemstr(parents, key) + ) from None raise InvalidConfigError( itemstr(parents, key), self.spec.name - ) + ) from None else: parents.append(key) diff --git a/cylc/flow/parsec/empysupport.py b/cylc/flow/parsec/empysupport.py index b4164894e0f..e3dc5e28df4 100644 --- a/cylc/flow/parsec/empysupport.py +++ b/cylc/flow/parsec/empysupport.py @@ -66,7 +66,7 @@ def empyprocess( raise EmPyError( str(exc), lines={'