Skip to content

Commit

Permalink
Merge pull request #5929 from oliver-sanders/tui-timeout
Browse files Browse the repository at this point in the history
tui: handle client timeout more elegantly and make it configurable
  • Loading branch information
oliver-sanders authored Jan 18, 2024
2 parents ddc80b6 + de85ac1 commit c99b7c6
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 12 deletions.
21 changes: 20 additions & 1 deletion cylc/flow/scripts/tui.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,21 @@ def get_option_parser() -> COP:
color=False
)

parser.add_option(
'--comms-timeout',
metavar='SEC',
help=(
"Set a timeout for network connections"
" to the running workflow. The default is no timeout."
" For task messaging connections see"
" site/user config file documentation."
),
action='store',
default=3,
dest='comms_timeout',
type=int,
)

return parser


Expand All @@ -76,5 +91,9 @@ def main(_, options: 'Values', workflow_id: Optional[str] = None) -> None:
workflow_id = tokens.duplicate(user=getuser()).id

# start Tui
with suppress_logging(), TuiApp().main(workflow_id):
with suppress_logging(), TuiApp().main(
workflow_id,
client_timeout=options.comms_timeout,
):
# tui stops according to user input
pass
14 changes: 10 additions & 4 deletions cylc/flow/tui/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def load_child_node(self, key):


@contextmanager
def updater_subproc(filters):
def updater_subproc(filters, client_timeout):
"""Runs the Updater in its own process.
The updater provides the data for Tui to render. Running the updater
Expand All @@ -209,7 +209,7 @@ def updater_subproc(filters):
decoupling the application update logic from the data update logic.
"""
# start the updater
updater = Updater()
updater = Updater(client_timeout=client_timeout)
p = Process(target=updater.start, args=(filters,))
try:
p.start()
Expand Down Expand Up @@ -285,7 +285,13 @@ def __init__(self, screen=None):
self.filters = get_default_filters()

@contextmanager
def main(self, w_id=None, id_filter=None, interactive=True):
def main(
self,
w_id=None,
id_filter=None,
interactive=True,
client_timeout=3,
):
"""Start the Tui app.
With interactive=False, this does not start the urwid event loop to
Expand All @@ -299,7 +305,7 @@ def main(self, w_id=None, id_filter=None, interactive=True):
"""
self.set_initial_filters(w_id, id_filter)

with updater_subproc(self.filters) as updater:
with updater_subproc(self.filters, client_timeout) as updater:
self.updater = updater

# pre-subscribe to the provided workflow if requested
Expand Down
23 changes: 16 additions & 7 deletions cylc/flow/tui/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,6 @@ class Updater():
"""

# the maximum time to wait for a workflow update
CLIENT_TIMEOUT = 2

# the interval between workflow listing scans
BASE_SCAN_INTERVAL = 20

Expand All @@ -105,7 +102,7 @@ class Updater():
# the command signal used to tell the updater to shut down
SIGNAL_TERMINATE = 'terminate'

def __init__(self):
def __init__(self, client_timeout=3):
# Cylc comms clients for each workflow we're connected to
self._clients = {}

Expand All @@ -124,6 +121,9 @@ def __init__(self):
# queue for commands to the updater
self._command_queue = Queue()

# the maximum time to wait for a workflow update
self.client_timeout = client_timeout

def subscribe(self, w_id):
"""Subscribe to updates from a workflow."""
self._command_queue.put((self._subscribe.__name__, w_id))
Expand Down Expand Up @@ -269,10 +269,17 @@ async def _update_workflow(self, w_id, client, data):
'id': w_id,
'status': 'stopped',
})
except (CylcError, ZMQError):
except (CylcError, ZMQError) as exc:
# something went wrong :(
# remove the client on any error, we'll reconnect next time
self._clients[w_id] = None
for workflow in data['workflows']:
if workflow['id'] == w_id:
workflow['_tui_data'] = (
f'Error - {str(exc).splitlines()[0]}'
)
break

else:
# the data arrived, add it to the update
workflow_data = workflow_update['workflows'][0]
Expand All @@ -288,7 +295,7 @@ def _connect(self, data):
try:
self._clients[w_id] = get_client(
Tokens(w_id)['workflow'],
timeout=self.CLIENT_TIMEOUT
timeout=self.client_timeout,
)
except WorkflowStopped:
for workflow in data['workflows']:
Expand All @@ -297,7 +304,9 @@ def _connect(self, data):
except (ZMQError, ClientError, ClientTimeout) as exc:
for workflow in data['workflows']:
if workflow['id'] == w_id:
workflow['_tui_data'] = f'Error: {exc}'
workflow['_tui_data'] = (
f'Error - {str(exc).splitlines()[0]}'
)
break

async def _scan(self):
Expand Down

0 comments on commit c99b7c6

Please sign in to comment.