Skip to content

Commit

Permalink
Cached timeout time for sim tasks on the sim_modes object.
Browse files Browse the repository at this point in the history
  • Loading branch information
wxtim committed Jan 19, 2024
1 parent a2f9982 commit f2e9635
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 33 deletions.
43 changes: 26 additions & 17 deletions cylc/flow/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ class ModeSettings:
simulated_run_length: float = 0.0
sim_task_fails: bool = False

def __init__(self, itask: 'TaskProxy', broadcast_mgr: 'BroadcastMgr'):
def __init__(
self,
itask: 'TaskProxy',
broadcast_mgr: 'BroadcastMgr',
db_mgr: 'WorkflowDatabaseManager' = None
):
overrides = broadcast_mgr.get_broadcast(itask.tokens)
if overrides:
rtconfig = pdeepcopy(itask.tdef.rtconfig)
Expand All @@ -68,6 +73,22 @@ def __init__(self, itask: 'TaskProxy', broadcast_mgr: 'BroadcastMgr'):
itask.submit_num
)

# itask.summary['started_time'] and mode_settings.timeout need
# repopulating from the DB on workflow restart:
started_time = itask.summary['started_time']
if started_time is None:
started_time = int(
TimePointParser()
.parse(
db_mgr.pub_dao.select_task_job(
*itask.tokens.relative_id.split("/")
)["time_submit"]
)
.seconds_since_unix_epoch
)
itask.summary['started_time'] = started_time

Check warning on line 89 in cylc/flow/simulation.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/simulation.py#L89

Added line #L89 was not covered by tests

self.timeout = started_time + self.simulated_run_length

def configure_sim_modes(taskdefs, sim_mode):
"""Adjust task defs for simulation and dummy mode.
Expand Down Expand Up @@ -207,24 +228,12 @@ def sim_time_check(
):
continue

# Started time and mode_settings are not set on restart:
started_time = itask.summary['started_time']
if started_time is None:
started_time = int(
TimePointParser()
.parse(
db_mgr.pub_dao.select_task_job(
*itask.tokens.relative_id.split("/")
)["time_submit"]
)
.seconds_since_unix_epoch
)
itask.summary['started_time'] = started_time


if itask.mode_settings is None:
itask.mode_settings = ModeSettings(itask, broadcast_mgr)
itask.mode_settings = ModeSettings(itask, broadcast_mgr, db_mgr)

Check warning on line 234 in cylc/flow/simulation.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/simulation.py#L234

Added line #L234 was not covered by tests

timeout = started_time + itask.mode_settings.simulated_run_length
if now > timeout:
if now > itask.mode_settings.timeout:
job_d = itask.tokens.duplicate(job=str(itask.submit_num))
now_str = get_current_time_string()

Expand Down
2 changes: 2 additions & 0 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,9 @@ def _set_retry_timers(

def _simulation_submit_task_jobs(self, itasks, workflow):
"""Simulation mode task jobs submission."""
now = time()
for itask in itasks:
itask.summary['started_time'] = now
itask.mode_settings = ModeSettings(
itask, self.task_events_mgr.broadcast_mgr)
itask.waiting_on_job_prep = False
Expand Down
42 changes: 26 additions & 16 deletions tests/integration/test_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ def get_msg_queue_item(queue, id_):
return item


@pytest.fixture
def monkeytime(monkeypatch):
"""Convenience function monkeypatching time."""
def _inner(time_: int):
monkeypatch.setattr('cylc.flow.task_job_mgr.time', lambda: time_)
monkeypatch.setattr('cylc.flow.simulation.time', lambda: time_)
return _inner


@pytest.fixture(scope='module')
async def sim_time_check_setup(
mod_flow, mod_scheduler, mod_start, mod_one_conf,
Expand Down Expand Up @@ -134,11 +143,13 @@ def test_sim_time_check_sets_started_time(
"""
schd, _, msg_q = sim_time_check_setup
one_1066 = schd.pool.get_task(ISO8601Point('1066'), 'one')
# Add info to databse as if it's be started before shutdown:
schd.task_job_mgr._simulation_submit_task_jobs(
[one_1066], schd.workflow)
schd.workflow_db_mgr.process_queued_ops()
one_1066.summary['started_time'] = None
one_1066.state.is_queued = False
one_1066.mode_settings = None
assert one_1066.summary['started_time'] is None
assert sim_time_check(
msg_q, [one_1066], schd.task_events_mgr.broadcast_mgr,
Expand All @@ -147,7 +158,7 @@ def test_sim_time_check_sets_started_time(
assert one_1066.summary['started_time'] is not None


def test_task_finishes(sim_time_check_setup, monkeypatch):
def test_task_finishes(sim_time_check_setup, monkeytime):
"""...and an appropriate message sent.
Checks that failed and bar are output if a task is set to fail.
Expand All @@ -156,7 +167,7 @@ def test_task_finishes(sim_time_check_setup, monkeypatch):
in unit tests.
"""
schd, _, msg_q = sim_time_check_setup
monkeypatch.setattr('cylc.flow.simulation.time', lambda: 0)
monkeytime(0)

# Setup a task to fail, submit it.
fail_all_1066 = schd.pool.get_task(ISO8601Point('1066'), 'fail_all')
Expand All @@ -175,7 +186,7 @@ def test_task_finishes(sim_time_check_setup, monkeypatch):
) is False

# Time's up...
monkeypatch.setattr('cylc.flow.simulation.time', lambda: 12)
monkeytime(12)

# After simulation time is up it Fails and records custom outputs:
assert sim_time_check(
Expand All @@ -184,35 +195,34 @@ def test_task_finishes(sim_time_check_setup, monkeypatch):
assert sorted(i.message for i in msg_q.queue) == ['bar', 'failed']


def test_task_sped_up(sim_time_check_setup, monkeypatch):
def test_task_sped_up(sim_time_check_setup, monkeytime):
"""Task will speed up by a factor set in config."""

schd, _, msg_q = sim_time_check_setup
fast_forward_1066 = schd.pool.get_task(
ISO8601Point('1066'), 'fast_forward')

# Run the job submission method:
monkeytime(0)
schd.task_job_mgr._simulation_submit_task_jobs(
[fast_forward_1066], schd.workflow)

# For the purpose of the test delete the started time set but
# _simulation_submit_task_jobs.
fast_forward_1066.summary['started_time'] = 0
fast_forward_1066.state.is_queued = False

monkeypatch.setattr('cylc.flow.simulation.time', lambda: 0)
assert sim_time_check(
msg_q, [fast_forward_1066], schd.task_events_mgr.broadcast_mgr, ''
) is False
monkeypatch.setattr('cylc.flow.simulation.time', lambda: 29)
monkeytime(29)
assert sim_time_check(
msg_q, [fast_forward_1066], schd.task_events_mgr.broadcast_mgr, ''
) is False
monkeypatch.setattr('cylc.flow.simulation.time', lambda: 31)
monkeytime(31)
assert sim_time_check(
msg_q, [fast_forward_1066], schd.task_events_mgr.broadcast_mgr, ''
) is True


async def test_simulation_mode_settings_restart(
monkeypatch, flow, scheduler, run, start
monkeytime, flow, scheduler, run, start
):
"""Check that simulation mode settings are correctly restored
upon restart.
Expand Down Expand Up @@ -250,7 +260,7 @@ async def test_simulation_mode_settings_restart(
# Submit it, then mock the wallclock and assert that it's not finshed.
schd.task_job_mgr._simulation_submit_task_jobs(
[itask], schd.workflow)
monkeypatch.setattr('cylc.flow.simulation.time', lambda: 0)
monkeytime(0)

assert sim_time_check(
msg_q, [itask], schd.task_events_mgr.broadcast_mgr,
Expand All @@ -262,7 +272,7 @@ async def test_simulation_mode_settings_restart(
async with start(schd):
# Get our tasks and fix wallclock:
itask = schd.pool.get_tasks()[0]
monkeypatch.setattr('cylc.flow.simulation.time', lambda: 12)
monkeytime(12)
itask.state.status = 'running'

# Check that we haven't got started time back
Expand All @@ -275,14 +285,14 @@ async def test_simulation_mode_settings_restart(
schd.workflow_db_mgr.process_queued_ops()

# Set the current time:
monkeypatch.setattr('cylc.flow.simulation.time', lambda: 12)
monkeytime(12)
assert sim_time_check(
msg_q, [itask], schd.task_events_mgr.broadcast_mgr,
schd.workflow_db_mgr
) is False

# Set the current time > timeout
monkeypatch.setattr('cylc.flow.simulation.time', lambda: 61)
monkeytime(61)
assert sim_time_check(
msg_q, [itask], schd.task_events_mgr.broadcast_mgr,
schd.workflow_db_mgr
Expand Down

0 comments on commit f2e9635

Please sign in to comment.