Skip to content

Commit

Permalink
Ensure that UUID is retrieved from database on workflow restart. (#5623)
Browse files Browse the repository at this point in the history
  • Loading branch information
wxtim authored Aug 3, 2023
1 parent 5da32aa commit 6c3beff
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 25 deletions.
46 changes: 28 additions & 18 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ def __init__(self, id_: str, options: Values) -> None:
workflow=self.workflow,
)
self.id = self.tokens.id
self.uuid_str = str(uuid4())
self.options = options
self.template_vars = get_template_vars(self.options)

Expand All @@ -307,6 +306,7 @@ def __init__(self, id_: str, options: Values) -> None:
pub_d=os.path.join(self.workflow_run_dir, 'log')
)
self.is_restart = Path(self.workflow_db_mgr.pri_path).is_file()

# Map used to track incomplete remote inits for restart
# {install_target: platform}
self.incomplete_ri_map: Dict[str, Dict] = {}
Expand Down Expand Up @@ -394,7 +394,6 @@ async def initialise(self):
self.bad_hosts,
self.reset_inactivity_timer
)
self.task_events_mgr.uuid_str = self.uuid_str

self.task_job_mgr = TaskJobManager(
self.workflow,
Expand All @@ -404,11 +403,10 @@ async def initialise(self):
self.data_store_mgr,
self.bad_hosts
)
self.task_job_mgr.task_remote_mgr.uuid_str = self.uuid_str

self.profiler = Profiler(self, self.options.profile_mode)

async def configure(self):
async def configure(self, params):
"""Configure the scheduler.
* Load the flow configuration.
Expand All @@ -426,7 +424,7 @@ async def configure(self):
self._check_startup_opts()

if self.is_restart:
self.load_workflow_params_and_tmpl_vars()
self._set_workflow_params(params)

self.profiler.log_memory("scheduler.py: before load_flow_file")
try:
Expand Down Expand Up @@ -549,15 +547,6 @@ async def configure(self):

self.profiler.log_memory("scheduler.py: end configure")

def load_workflow_params_and_tmpl_vars(self) -> None:
"""Load workflow params and template variables"""
with self.workflow_db_mgr.get_pri_dao() as pri_dao:
# This logic handles lack of initial cycle point in flow.cylc and
# things that can't change on workflow restart/reload.
self._load_workflow_params(pri_dao.select_workflow_params())
pri_dao.select_workflow_template_vars(self._load_template_vars)
pri_dao.execute_queued_items()

def log_start(self) -> None:
"""Log headers, that also get logged on each rollover.
Expand Down Expand Up @@ -696,12 +685,26 @@ async def run_scheduler(self) -> None:
finally:
self.profiler.stop()

def load_workflow_params_and_tmpl_vars(self) -> List[Tuple[str, str]]:
"""Load workflow params and template variables"""
with self.workflow_db_mgr.get_pri_dao() as pri_dao:
# This logic handles lack of initial cycle point in flow.cylc and
# things that can't change on workflow restart/reload.
pri_dao.select_workflow_template_vars(self._load_template_vars)
pri_dao.execute_queued_items()
return list(pri_dao.select_workflow_params())

async def start(self):
"""Run the startup sequence but don't set the main loop running.
Lightweight wrapper for testing convenience.
"""
if self.is_restart:
params = self.load_workflow_params_and_tmpl_vars()
else:
params = []

try:
await self.initialise()

Expand All @@ -716,8 +719,15 @@ async def start(self):
self.server.thread.start()
barrier.wait()

# Get UUID now:
if self.is_restart:
self.uuid_str = dict(params)['uuid_str']
else:
self.uuid_str = str(uuid4())

self._configure_contact()
await self.configure()
await self.configure(params)
self.task_events_mgr.uuid_str = self.uuid_str
except (KeyboardInterrupt, asyncio.CancelledError, Exception) as exc:
await self.handle_exception(exc)

Expand Down Expand Up @@ -1124,7 +1134,7 @@ async def command_reload_workflow(self) -> None:
self.reload_pending = 'applying the new config'
old_tasks = set(self.config.get_task_name_list())
# Things that can't change on workflow reload:
self._load_workflow_params(
self._set_workflow_params(
self.workflow_db_mgr.pri_dao.select_workflow_params()
)
self.apply_new_config(config, is_reload=True)
Expand Down Expand Up @@ -1305,10 +1315,10 @@ def apply_new_config(self, config, is_reload=False):
'CYLC_WORKFLOW_FINAL_CYCLE_POINT': str(self.config.final_point),
})

def _load_workflow_params(
def _set_workflow_params(
self, params: Iterable[Tuple[str, Optional[str]]]
) -> None:
"""Load a row in the "workflow_params" table in a restart/reload.
"""Set workflow params on restart/reload.
This currently includes:
* Initial/Final cycle points.
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class EventData(Enum):
.. deprecated:: 8.0.0
Use 'uuid_str'.
Use 'uuid'.
"""

CyclePoint = 'point'
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1324,7 +1324,7 @@ def get_job_conf(
'workflow_name': workflow,
'task_id': itask.identity,
'try_num': itask.get_try_num(),
'uuid_str': self.task_remote_mgr.uuid_str,
'uuid_str': self.task_events_mgr.uuid_str,
'work_d': rtconfig['work sub-directory'],
# this field is populated retrospectively for regular job subs
'logfiles': [],
Expand Down Expand Up @@ -1357,7 +1357,7 @@ def get_simulation_job_conf(self, itask, workflow):
'workflow_name': workflow,
'task_id': itask.identity,
'try_num': itask.get_try_num(),
'uuid_str': self.task_remote_mgr.uuid_str,
'uuid_str': self.task_events_mgr.uuid_str,
'work_d': 'SIMULATION',
# this field is populated retrospectively for regular job subs
'logfiles': [],
Expand Down
1 change: 0 additions & 1 deletion cylc/flow/task_remote_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ def __init__(self, workflow, proc_pool, bad_hosts, db_mgr):
self.remote_command_map = {}
# self.remote_init_map = {(install target): status, ...}
self.remote_init_map = {}
self.uuid_str = None
# This flag is turned on when a host init/select command completes
self.ready = False
self.rsync_includes = None
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/workflow_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class EventData(Enum):
.. deprecated:: 8.0.0
Use "uuid_str".
Use "uuid".
"""

# BACK COMPAT: "suite_url" deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ INSERT INTO inheritance VALUES('root','["root"]');
INSERT INTO inheritance VALUES('foo','["foo", "root"]');
CREATE TABLE workflow_params(key TEXT, value TEXT, PRIMARY KEY(key));
INSERT INTO workflow_params VALUES('cylc_version', '8.0.0');
INSERT INTO workflow_params VALUES('uuid_str', 'Something');
CREATE TABLE workflow_template_vars(key TEXT, value TEXT, PRIMARY KEY(key));
CREATE TABLE task_action_timers(cycle TEXT, name TEXT, ctx_key TEXT, ctx TEXT, delays TEXT, num INTEGER, delay TEXT, timeout TEXT, PRIMARY KEY(cycle, name, ctx_key));
INSERT INTO task_action_timers VALUES('1','foo','"poll_timer"','["tuple", [[99, "running"]]]','[]',0,NULL,NULL);
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,13 @@ async def myflow(mod_flow, mod_scheduler, mod_one_conf):


def test_module_scoped_fixture(myflow):
"""Ensure the uuid is set on __init__.
"""Ensure the host is set on __init__.
The myflow fixture will be shared between all test functions within this
Python module.
"""
assert myflow.uuid_str
assert myflow.host


async def test_db_select(one, start, db_select):
Expand Down
30 changes: 30 additions & 0 deletions tests/integration/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

import asyncio
import logging
from pathlib import Path
import pytest
import re
from typing import Any, Callable

from cylc.flow.exceptions import CylcError
Expand Down Expand Up @@ -296,6 +298,34 @@ def mock_auto_restart(*a, **k):
assert TRACEBACK_MSG in log.text


async def test_uuid_unchanged_on_restart(
one: Scheduler,
scheduler: Callable,
start: Callable,
):
"""Restart gets UUID from Database:
See https://github.com/cylc/cylc-flow/issues/5615
Process:
* Create a scheduler then shut it down.
* Create a new scheduler for the same workflow and check that it has
retrieved the UUID from the Database.
"""
uuid_re = re.compile('CYLC_WORKFLOW_UUID=(.*)')
contact_file = Path(one.workflow_run_dir) / '.service/contact'

async with start(one):
pass

schd = scheduler(one.workflow_name, paused_start=True)
async with start(schd):
# UUID in contact file should be the same as that set in the database
# and the scheduler.
cf_uuid = uuid_re.findall(contact_file.read_text())
assert cf_uuid == [schd.uuid_str]


async def test_restart_timeout(
flow,
one_conf,
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/test_workflow_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from os import unlink
from pathlib import Path
from textwrap import dedent
from uuid import uuid4

import pytest

Expand Down Expand Up @@ -69,6 +70,7 @@ async def workflow(flow, scheduler, one_conf, run_dir):
from collections import namedtuple
Server = namedtuple('Server', ['port', 'pub_port'])
schd.server = Server(1234, pub_port=2345)
schd.uuid_str = str(uuid4())

contact_data = schd.get_contact_data()
contact_file = Path(
Expand Down

0 comments on commit 6c3beff

Please sign in to comment.