From 6c3beffcd4ea653e9059b238e055a4dcdaed640b Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Thu, 3 Aug 2023 11:42:07 +0100 Subject: [PATCH] Ensure that UUID is retrieved from database on workflow restart. (#5623) --- cylc/flow/scheduler.py | 46 +++++++++++-------- cylc/flow/task_events_mgr.py | 2 +- cylc/flow/task_job_mgr.py | 4 +- cylc/flow/task_remote_mgr.py | 1 - cylc/flow/workflow_events.py | 2 +- .../01-job-nn-localhost/db.sqlite3 | 1 + tests/integration/test_examples.py | 4 +- tests/integration/test_scheduler.py | 30 ++++++++++++ tests/integration/test_workflow_files.py | 2 + 9 files changed, 67 insertions(+), 25 deletions(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index f73e197ab1b..7449e7d274a 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -282,7 +282,6 @@ def __init__(self, id_: str, options: Values) -> None: workflow=self.workflow, ) self.id = self.tokens.id - self.uuid_str = str(uuid4()) self.options = options self.template_vars = get_template_vars(self.options) @@ -307,6 +306,7 @@ def __init__(self, id_: str, options: Values) -> None: pub_d=os.path.join(self.workflow_run_dir, 'log') ) self.is_restart = Path(self.workflow_db_mgr.pri_path).is_file() + # Map used to track incomplete remote inits for restart # {install_target: platform} self.incomplete_ri_map: Dict[str, Dict] = {} @@ -394,7 +394,6 @@ async def initialise(self): self.bad_hosts, self.reset_inactivity_timer ) - self.task_events_mgr.uuid_str = self.uuid_str self.task_job_mgr = TaskJobManager( self.workflow, @@ -404,11 +403,10 @@ async def initialise(self): self.data_store_mgr, self.bad_hosts ) - self.task_job_mgr.task_remote_mgr.uuid_str = self.uuid_str self.profiler = Profiler(self, self.options.profile_mode) - async def configure(self): + async def configure(self, params): """Configure the scheduler. * Load the flow configuration. 
@@ -426,7 +424,7 @@ async def configure(self): self._check_startup_opts() if self.is_restart: - self.load_workflow_params_and_tmpl_vars() + self._set_workflow_params(params) self.profiler.log_memory("scheduler.py: before load_flow_file") try: @@ -549,15 +547,6 @@ async def configure(self): self.profiler.log_memory("scheduler.py: end configure") - def load_workflow_params_and_tmpl_vars(self) -> None: - """Load workflow params and template variables""" - with self.workflow_db_mgr.get_pri_dao() as pri_dao: - # This logic handles lack of initial cycle point in flow.cylc and - # things that can't change on workflow restart/reload. - self._load_workflow_params(pri_dao.select_workflow_params()) - pri_dao.select_workflow_template_vars(self._load_template_vars) - pri_dao.execute_queued_items() - def log_start(self) -> None: """Log headers, that also get logged on each rollover. @@ -696,12 +685,26 @@ async def run_scheduler(self) -> None: finally: self.profiler.stop() + def load_workflow_params_and_tmpl_vars(self) -> List[Tuple[str, str]]: + """Load workflow params and template variables""" + with self.workflow_db_mgr.get_pri_dao() as pri_dao: + # This logic handles lack of initial cycle point in flow.cylc and + # things that can't change on workflow restart/reload. + pri_dao.select_workflow_template_vars(self._load_template_vars) + pri_dao.execute_queued_items() + return list(pri_dao.select_workflow_params()) + async def start(self): """Run the startup sequence but don't set the main loop running. Lightweight wrapper for testing convenience. 
""" + if self.is_restart: + params = self.load_workflow_params_and_tmpl_vars() + else: + params = [] + try: await self.initialise() @@ -716,8 +719,15 @@ async def start(self): self.server.thread.start() barrier.wait() + # Get UUID now: + if self.is_restart: + self.uuid_str = dict(params)['uuid_str'] + else: + self.uuid_str = str(uuid4()) + self._configure_contact() - await self.configure() + await self.configure(params) + self.task_events_mgr.uuid_str = self.uuid_str except (KeyboardInterrupt, asyncio.CancelledError, Exception) as exc: await self.handle_exception(exc) @@ -1124,7 +1134,7 @@ async def command_reload_workflow(self) -> None: self.reload_pending = 'applying the new config' old_tasks = set(self.config.get_task_name_list()) # Things that can't change on workflow reload: - self._load_workflow_params( + self._set_workflow_params( self.workflow_db_mgr.pri_dao.select_workflow_params() ) self.apply_new_config(config, is_reload=True) @@ -1305,10 +1315,10 @@ def apply_new_config(self, config, is_reload=False): 'CYLC_WORKFLOW_FINAL_CYCLE_POINT': str(self.config.final_point), }) - def _load_workflow_params( + def _set_workflow_params( self, params: Iterable[Tuple[str, Optional[str]]] ) -> None: - """Load a row in the "workflow_params" table in a restart/reload. + """Set workflow params on restart/reload. This currently includes: * Initial/Final cycle points. diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index 010181ab3af..852b28f848e 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -186,7 +186,7 @@ class EventData(Enum): .. deprecated:: 8.0.0 - Use 'uuid_str'. + Use 'uuid'. 
""" CyclePoint = 'point' diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 524ac6d5b93..20ee7379d27 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -1324,7 +1324,7 @@ def get_job_conf( 'workflow_name': workflow, 'task_id': itask.identity, 'try_num': itask.get_try_num(), - 'uuid_str': self.task_remote_mgr.uuid_str, + 'uuid_str': self.task_events_mgr.uuid_str, 'work_d': rtconfig['work sub-directory'], # this field is populated retrospectively for regular job subs 'logfiles': [], @@ -1357,7 +1357,7 @@ def get_simulation_job_conf(self, itask, workflow): 'workflow_name': workflow, 'task_id': itask.identity, 'try_num': itask.get_try_num(), - 'uuid_str': self.task_remote_mgr.uuid_str, + 'uuid_str': self.task_events_mgr.uuid_str, 'work_d': 'SIMULATION', # this field is populated retrospectively for regular job subs 'logfiles': [], diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py index 780ba70fa07..541bc1265a0 100644 --- a/cylc/flow/task_remote_mgr.py +++ b/cylc/flow/task_remote_mgr.py @@ -103,7 +103,6 @@ def __init__(self, workflow, proc_pool, bad_hosts, db_mgr): self.remote_command_map = {} # self.remote_init_map = {(install target): status, ...} self.remote_init_map = {} - self.uuid_str = None # This flag is turned on when a host init/select command completes self.ready = False self.rsync_includes = None diff --git a/cylc/flow/workflow_events.py b/cylc/flow/workflow_events.py index b7631e1fd9f..63fe8464eca 100644 --- a/cylc/flow/workflow_events.py +++ b/cylc/flow/workflow_events.py @@ -107,7 +107,7 @@ class EventData(Enum): .. deprecated:: 8.0.0 - Use "uuid_str". + Use "uuid". 
""" # BACK COMPAT: "suite_url" deprecated diff --git a/tests/functional/job-submission/01-job-nn-localhost/db.sqlite3 b/tests/functional/job-submission/01-job-nn-localhost/db.sqlite3 index d1138aafba8..d3dcf24f339 100644 --- a/tests/functional/job-submission/01-job-nn-localhost/db.sqlite3 +++ b/tests/functional/job-submission/01-job-nn-localhost/db.sqlite3 @@ -7,6 +7,7 @@ INSERT INTO inheritance VALUES('root','["root"]'); INSERT INTO inheritance VALUES('foo','["foo", "root"]'); CREATE TABLE workflow_params(key TEXT, value TEXT, PRIMARY KEY(key)); INSERT INTO workflow_params VALUES('cylc_version', '8.0.0'); +INSERT INTO workflow_params VALUES('uuid_str', 'Something'); CREATE TABLE workflow_template_vars(key TEXT, value TEXT, PRIMARY KEY(key)); CREATE TABLE task_action_timers(cycle TEXT, name TEXT, ctx_key TEXT, ctx TEXT, delays TEXT, num INTEGER, delay TEXT, timeout TEXT, PRIMARY KEY(cycle, name, ctx_key)); INSERT INTO task_action_timers VALUES('1','foo','"poll_timer"','["tuple", [[99, "running"]]]','[]',0,NULL,NULL); diff --git a/tests/integration/test_examples.py b/tests/integration/test_examples.py index cd0fa9f5277..02a1bb0a497 100644 --- a/tests/integration/test_examples.py +++ b/tests/integration/test_examples.py @@ -199,13 +199,13 @@ async def myflow(mod_flow, mod_scheduler, mod_one_conf): def test_module_scoped_fixture(myflow): - """Ensure the uuid is set on __init__. + """Ensure the host is set on __init__. The myflow fixture will be shared between all test functions within this Python module. 
""" - assert myflow.uuid_str + assert myflow.host async def test_db_select(one, start, db_select): diff --git a/tests/integration/test_scheduler.py b/tests/integration/test_scheduler.py index 622bd083d09..6befe2b34d1 100644 --- a/tests/integration/test_scheduler.py +++ b/tests/integration/test_scheduler.py @@ -16,7 +16,9 @@ import asyncio import logging +from pathlib import Path import pytest +import re from typing import Any, Callable from cylc.flow.exceptions import CylcError @@ -296,6 +298,34 @@ def mock_auto_restart(*a, **k): assert TRACEBACK_MSG in log.text +async def test_uuid_unchanged_on_restart( + one: Scheduler, + scheduler: Callable, + start: Callable, +): + """Restart gets UUID from Database: + + See https://github.com/cylc/cylc-flow/issues/5615 + + Process: + * Create a scheduler then shut it down. + * Create a new scheduler for the same workflow and check that it has + retrieved the UUID from the Database. + """ + uuid_re = re.compile('CYLC_WORKFLOW_UUID=(.*)') + contact_file = Path(one.workflow_run_dir) / '.service/contact' + + async with start(one): + pass + + schd = scheduler(one.workflow_name, paused_start=True) + async with start(schd): + # UUID in contact file should be the same as that set in the database + # and the scheduler. 
+ cf_uuid = uuid_re.findall(contact_file.read_text()) + assert cf_uuid == [schd.uuid_str] + + async def test_restart_timeout( flow, one_conf, diff --git a/tests/integration/test_workflow_files.py b/tests/integration/test_workflow_files.py index 54d60bea4d0..5b036ef2c11 100644 --- a/tests/integration/test_workflow_files.py +++ b/tests/integration/test_workflow_files.py @@ -19,6 +19,7 @@ from os import unlink from pathlib import Path from textwrap import dedent +from uuid import uuid4 import pytest @@ -69,6 +70,7 @@ async def workflow(flow, scheduler, one_conf, run_dir): from collections import namedtuple Server = namedtuple('Server', ['port', 'pub_port']) schd.server = Server(1234, pub_port=2345) + schd.uuid_str = str(uuid4()) contact_data = schd.get_contact_data() contact_file = Path(