diff --git a/cylc/flow/scripts/tui.py b/cylc/flow/scripts/tui.py index f8f0879a1e6..2d48e649ef2 100644 --- a/cylc/flow/scripts/tui.py +++ b/cylc/flow/scripts/tui.py @@ -61,6 +61,21 @@ def get_option_parser() -> COP: color=False ) + parser.add_option( + '--comms-timeout', + metavar='SEC', + help=( + "Set a timeout for network connections" + " to the running workflow. The default is no timeout." + " For task messaging connections see" + " site/user config file documentation." + ), + action='store', + default=3, + dest='comms_timeout', + type=int, + ) + return parser @@ -76,5 +91,9 @@ def main(_, options: 'Values', workflow_id: Optional[str] = None) -> None: workflow_id = tokens.duplicate(user=getuser()).id # start Tui - with suppress_logging(), TuiApp().main(workflow_id): + with suppress_logging(), TuiApp().main( + workflow_id, + client_timeout=options.comms_timeout, + ): + # tui stops according to user input pass diff --git a/cylc/flow/tui/app.py b/cylc/flow/tui/app.py index fff634ea894..d802a2d5738 100644 --- a/cylc/flow/tui/app.py +++ b/cylc/flow/tui/app.py @@ -200,7 +200,7 @@ def load_child_node(self, key): @contextmanager -def updater_subproc(filters): +def updater_subproc(filters, client_timeout): """Runs the Updater in its own process. The updater provides the data for Tui to render. Running the updater @@ -209,7 +209,7 @@ def updater_subproc(filters): decoupling the application update logic from the data update logic. """ # start the updater - updater = Updater() + updater = Updater(client_timeout=client_timeout) p = Process(target=updater.start, args=(filters,)) try: p.start() @@ -285,7 +285,13 @@ def __init__(self, screen=None): self.filters = get_default_filters() @contextmanager - def main(self, w_id=None, id_filter=None, interactive=True): + def main( + self, + w_id=None, + id_filter=None, + interactive=True, + client_timeout=3, + ): """Start the Tui app. With interactive=False, this does not start the urwid event loop to @@ -299,7 +305,7 @@ def main(self, w_id=None, id_filter=None, interactive=True): """ self.set_initial_filters(w_id, id_filter) - with updater_subproc(self.filters) as updater: + with updater_subproc(self.filters, client_timeout) as updater: self.updater = updater # pre-subscribe to the provided workflow if requested diff --git a/cylc/flow/tui/updater.py b/cylc/flow/tui/updater.py index f9bd0cf2c00..26a8614b1a3 100644 --- a/cylc/flow/tui/updater.py +++ b/cylc/flow/tui/updater.py @@ -93,9 +93,6 @@ class Updater(): """ - # the maximum time to wait for a workflow update - CLIENT_TIMEOUT = 2 - # the interval between workflow listing scans BASE_SCAN_INTERVAL = 20 @@ -105,7 +102,7 @@ class Updater(): # the command signal used to tell the updater to shut down SIGNAL_TERMINATE = 'terminate' - def __init__(self): + def __init__(self, client_timeout=3): # Cylc comms clients for each workflow we're connected to self._clients = {} @@ -124,6 +121,9 @@ def __init__(self): # queue for commands to the updater self._command_queue = Queue() + # the maximum time to wait for a workflow update + self.client_timeout = client_timeout + def subscribe(self, w_id): """Subscribe to updates from a workflow.""" self._command_queue.put((self._subscribe.__name__, w_id)) @@ -269,10 +269,17 @@ async def _update_workflow(self, w_id, client, data): 'id': w_id, 'status': 'stopped', }) - except (CylcError, ZMQError): + except (CylcError, ZMQError) as exc: # something went wrong :( # remove the client on any error, we'll reconnect next time self._clients[w_id] = None + for workflow in data['workflows']: + if workflow['id'] == w_id: + workflow['_tui_data'] = ( + f'Error - {str(exc).splitlines()[0]}' + ) + break + else: # the data arrived, add it to the update workflow_data = workflow_update['workflows'][0] @@ -288,7 +295,7 @@ def _connect(self, data): try: self._clients[w_id] = get_client( Tokens(w_id)['workflow'], - timeout=self.CLIENT_TIMEOUT + timeout=self.client_timeout, ) except WorkflowStopped: for workflow in data['workflows']: @@ -297,7 +304,9 @@ def _connect(self, data): except (ZMQError, ClientError, ClientTimeout) as exc: for workflow in data['workflows']: if workflow['id'] == w_id: - workflow['_tui_data'] = f'Error: {exc}' + workflow['_tui_data'] = ( + f'Error - {str(exc).splitlines()[0]}' + ) break async def _scan(self):