From e43bc729ca9515d5e6de9f4da7b894d970655798 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Thu, 4 Jul 2024 14:29:18 +1200 Subject: [PATCH] Allow manual trigger when workflow paused. --- cylc/flow/scheduler.py | 7 +++---- cylc/flow/task_queues/independent.py | 13 ++++++++++--- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index ff593648e7b..64ff2e8714b 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1225,8 +1225,7 @@ def release_queued_tasks(self) -> bool: """ if ( - not self.is_paused - and self.stop_mode is None + self.stop_mode is None and self.auto_restart_time is None and self.reload_pending is False ): @@ -1883,7 +1882,7 @@ def pause_workflow(self, msg: Optional[str] = None) -> None: if msg: _msg += f': {msg}' LOG.info(_msg) - self.is_paused = True + self.is_paused = self.pool.task_queue_mgr.is_paused = True self.workflow_db_mgr.put_workflow_paused(True) self.update_data_store() @@ -1905,7 +1904,7 @@ def resume_workflow(self, quiet: bool = False) -> None: return if not quiet: LOG.info("RESUMING the workflow now") - self.is_paused = False + self.is_paused = self.pool.task_queue_mgr.is_paused = False self.workflow_db_mgr.put_workflow_paused(False) self.update_data_store() diff --git a/cylc/flow/task_queues/independent.py b/cylc/flow/task_queues/independent.py index 185edee4f2b..4b42ab7c2cc 100644 --- a/cylc/flow/task_queues/independent.py +++ b/cylc/flow/task_queues/independent.py @@ -40,7 +40,9 @@ def push_task(self, itask: 'TaskProxy') -> None: if itask.tdef.name in self.members: self.deque.appendleft(itask) - def release(self, active: Counter[str]) -> List['TaskProxy']: + def release( + self, active: Counter[str], is_paused: bool = False + ) -> List['TaskProxy']: """Release tasks if below the active limit.""" # The "active" argument counts active tasks by name. released: List['TaskProxy'] = [] @@ -54,7 +56,10 @@ def release(self, active: Counter[str]) -> List['TaskProxy']: except IndexError: # deque empty break - if itask.state.is_held: + if ( + itask.state.is_held or + (is_paused and not itask.is_manual_submit) + ): held.append(itask) else: released.append(itask) @@ -114,6 +119,8 @@ def __init__(self, ) self.force_released: Set['TaskProxy'] = set() + # if paused don't release tasks unless manually triggered + self.is_paused = False def push_task(self, itask: 'TaskProxy') -> None: """Push a task to the appropriate queue.""" @@ -124,7 +131,7 @@ def release_tasks(self, active: Counter[str]) -> List['TaskProxy']: """Release tasks up to the queue limits.""" released: List['TaskProxy'] = [] for queue in self.queues.values(): - released += queue.release(active) + released += queue.release(active, self.is_paused) if self.force_released: released.extend(self.force_released) self.force_released = set()