Skip to content

Commit

Permalink
Improve server-client communication error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
MetRonnie committed Jan 28, 2025
1 parent b942907 commit ee3102d
Show file tree
Hide file tree
Showing 12 changed files with 377 additions and 165 deletions.
70 changes: 56 additions & 14 deletions cylc/flow/network/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
import asyncio
import getpass
import json
from typing import Optional, Tuple
from typing import (
TYPE_CHECKING,
Optional,
Tuple,
Union,
)

import zmq
import zmq.asyncio
Expand All @@ -30,34 +35,71 @@
CylcError,
CylcVersionError,
ServiceFileError,
WorkflowStopped
WorkflowStopped,
)
from cylc.flow.hostuserutil import get_fqdn_by_host
from cylc.flow.workflow_files import (
ContactFileFields,
KeyType,
KeyOwner,
KeyInfo,
KeyOwner,
KeyType,
get_workflow_srv_dir,
load_contact_file,
get_workflow_srv_dir
)


if TYPE_CHECKING:
# BACK COMPAT: typing_extensions.TypedDict
# FROM: Python 3.7
# TO: Python 3.11
from typing_extensions import TypedDict


API = 5 # cylc API version
MSG_TIMEOUT = "TIMEOUT"

if TYPE_CHECKING:
class ResponseDict(TypedDict, total=False):
"""Structure of server response messages.
def encode_(message):
"""Convert the structure holding a message field from JSON to a string."""
try:
return json.dumps(message)
except TypeError as exc:
return json.dumps({'errors': [{'message': str(exc)}]})
Confusingly, has similar format to GraphQL execution result.
But if we change this now we could break compatibility for
issuing commands to/receiving responses from workflows running in
different versions of Cylc 8.
"""
data: object
"""For most Cylc commands that issue GQL mutations, the data field will
look like:
data: {
<mutationName1>: {
result: [
{
id: <workflow/task ID>,
response: [<success_bool>, <message>]
},
...
]
}
}
but this is not 100% consistent unfortunately
"""
error: Union[Exception, str, dict]
"""If an error occurred that could not be handled.
(usually a dict {message: str, traceback?: str}).
"""
user: str
cylc_version: str
"""Server (i.e. running workflow) Cylc version.
Going forward, we include this so we can more easily handle any future
back-compat issues."""


def decode_(message):
"""Convert an encoded message string to JSON with an added 'user' field."""
def load_server_response(message: str) -> 'ResponseDict':
"""Convert a JSON message string to dict with an added 'user' field."""
msg = json.loads(message)
msg['user'] = getpass.getuser() # assume this is the user
if 'user' not in msg:
msg['user'] = getpass.getuser() # assume this is the user
return msg


Expand Down
70 changes: 43 additions & 27 deletions cylc/flow/network/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,31 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Client for workflow runtime API."""

from abc import ABCMeta, abstractmethod
from abc import (
ABCMeta,
abstractmethod,
)
import asyncio
import json
import os
from shutil import which
import socket
import sys
from typing import Any, Optional, Union, Dict
from typing import (
TYPE_CHECKING,
Any,
Dict,
Optional,
Union,
)

import zmq
import zmq.asyncio

from cylc.flow import LOG
from cylc.flow import (
LOG,
__version__ as CYLC_VERSION,
)
from cylc.flow.exceptions import (
ClientError,
ClientTimeout,
Expand All @@ -36,16 +49,17 @@
)
from cylc.flow.hostuserutil import get_fqdn_by_host
from cylc.flow.network import (
encode_,
decode_,
ZMQSocketBase,
get_location,
ZMQSocketBase
load_server_response,
)
from cylc.flow.network.client_factory import CommsMeth
from cylc.flow.network.server import PB_METHOD_MAP
from cylc.flow.workflow_files import (
detect_old_contact_file,
)
from cylc.flow.workflow_files import detect_old_contact_file


if TYPE_CHECKING:
from cylc.flow.network import ResponseDict


class WorkflowRuntimeClientBase(metaclass=ABCMeta):
Expand Down Expand Up @@ -270,7 +284,7 @@ async def async_request(
args: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
req_meta: Optional[Dict[str, Any]] = None
) -> object:
) -> Union[bytes, object]:
"""Send an asynchronous request using asyncio.
Has the same arguments and return values as ``serial_request``.
Expand All @@ -292,12 +306,12 @@ async def async_request(
if req_meta:
msg['meta'].update(req_meta)
LOG.debug('zmq:send %s', msg)
message = encode_(msg)
message = json.dumps(msg)
self.socket.send_string(message)

# receive response
if self.poller.poll(timeout):
res = await self.socket.recv()
res: bytes = await self.socket.recv()
else:
self.timeout_handler()
raise ClientTimeout(
Expand All @@ -307,26 +321,28 @@ async def async_request(
' --comms-timeout option;'
'\n* or check the workflow log.'
)
LOG.debug('zmq:recv %s', res)

if msg['command'] in PB_METHOD_MAP:
response = {'data': res}
else:
response = decode_(
res.decode() if isinstance(res, bytes) else res
)
LOG.debug('zmq:recv %s', response)
if command in PB_METHOD_MAP:
return res

Check warning on line 327 in cylc/flow/network/client.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/client.py#L327

Added line #L327 was not covered by tests

response: ResponseDict = load_server_response(res.decode())

try:
return response['data']
except KeyError:
error = response.get(
'error',
{'message': f'Received invalid response: {response}'},
)
raise ClientError(
error.get('message'), # type: ignore
error.get('traceback'), # type: ignore
) from None
error = response.get('error')

Check warning on line 334 in cylc/flow/network/client.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/client.py#L334

Added line #L334 was not covered by tests
if not error:
error = (

Check warning on line 336 in cylc/flow/network/client.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/client.py#L336

Added line #L336 was not covered by tests
f"Received invalid response for Cylc {CYLC_VERSION}: "
f"{response}"
)
wflow_cylc_ver = response.get('cylc_version')

Check warning on line 340 in cylc/flow/network/client.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/client.py#L340

Added line #L340 was not covered by tests
if wflow_cylc_ver:
error += (

Check warning on line 342 in cylc/flow/network/client.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/client.py#L342

Added line #L342 was not covered by tests
f"\n(Workflow is running in Cylc {wflow_cylc_ver})"
)
raise ClientError(str(error)) from None

Check warning on line 345 in cylc/flow/network/client.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/client.py#L345

Added line #L345 was not covered by tests

def get_header(self) -> dict:
"""Return "header" data to attach to each request for traceability.
Expand Down
28 changes: 23 additions & 5 deletions cylc/flow/network/multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,23 @@

import asyncio
import sys
from typing import Callable, Dict, List, Tuple, Optional, Union, Type
from typing import (
Callable,
Dict,
List,
Optional,
Tuple,
Type,
Union,
)

from ansimarkup import ansiprint

from cylc.flow.async_util import unordered_map
from cylc.flow.exceptions import CylcError, WorkflowStopped
from cylc.flow.exceptions import (
CylcError,
WorkflowStopped,
)
import cylc.flow.flags
from cylc.flow.id_cli import parse_ids_async
from cylc.flow.terminal import DIM
Expand Down Expand Up @@ -220,21 +231,28 @@ def _process_response(


def _report(
response: dict,
response: Union[dict, list],
) -> Tuple[Optional[str], Optional[str], bool]:
"""Report the result of a GraphQL operation.
This analyses GraphQL mutation responses to determine the outcome.
Args:
response: The GraphQL response.
response: The workflow server response (NOT necessarily conforming to
GraphQL execution result spec).
Returns:
(stdout, stderr, outcome)
"""
try:
ret: List[Tuple[Optional[str], Optional[str], bool]] = []
if not isinstance(response, dict):
if isinstance(response, list) and response[0].get('error'):
# If operating on workflow running in older Cylc version,
# may get an error response like [{'error': '...'}]
raise Exception(response)
raise Exception(f"Unexpected response: {response}")

Check warning on line 255 in cylc/flow/network/multi.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/multi.py#L254-L255

Added lines #L254 - L255 were not covered by tests
for mutation_response in response.values():
# extract the result of each mutation result in the response
success, msg = mutation_response['result'][0]['response']
Expand Down Expand Up @@ -268,7 +286,7 @@ def _report(
# response returned is not in the expected format - this shouldn't
# happen but we need to protect against it
err_msg = ''
if cylc.flow.flags.verbosity > 1: # debug mode
if cylc.flow.flags.verbosity > 0: # verbose mode
# print the full result to stderr
err_msg += f'\n <{DIM}>response={response}</{DIM}>'
return (
Expand Down
54 changes: 34 additions & 20 deletions cylc/flow/network/replier.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,27 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Server for workflow runtime API."""

import json
from queue import Queue
from typing import TYPE_CHECKING, Optional
from typing import (
TYPE_CHECKING,
Optional,
)

import zmq

from cylc.flow import LOG
from cylc.flow.network import encode_, decode_, ZMQSocketBase
from cylc.flow import (
LOG,
__version__ as CYLC_VERSION,
)
from cylc.flow.network import (
ZMQSocketBase,
load_server_response,
)


if TYPE_CHECKING:
from cylc.flow.network import ResponseDict
from cylc.flow.network.server import WorkflowRuntimeServer


Expand Down Expand Up @@ -69,7 +81,7 @@ def _bespoke_stop(self) -> None:
LOG.debug('stopping zmq replier...')
self.queue.put('STOP')

def listener(self):
def listener(self) -> None:
"""The server main loop, listen for and serve requests.
When called, this method will receive and respond until there are no
Expand All @@ -90,7 +102,9 @@ def listener(self):

try:
# Check for messages
msg = self.socket.recv_string(zmq.NOBLOCK)
msg = self.socket.recv_string( # type: ignore[union-attr]
zmq.NOBLOCK
)
except zmq.error.Again:
# No messages, break to parent loop/caller.
break
Expand All @@ -99,27 +113,27 @@ def listener(self):
continue
# attempt to decode the message, authenticating the user in the
# process
res: ResponseDict
response: bytes
try:
message = decode_(msg)
message = load_server_response(msg)
except Exception as exc: # purposefully catch generic exception
# failed to decode message, possibly resulting from failed
# authentication
LOG.exception('failed to decode message: "%s"', exc)
import traceback
response = encode_(
{
'error': {
'message': 'failed to decode message: "%s"' % msg,
'traceback': traceback.format_exc(),
}
}
).encode()
LOG.exception(exc)
LOG.error(f'failed to decode message: "{msg}"')
res = {

Check warning on line 125 in cylc/flow/network/replier.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/replier.py#L123-L125

Added lines #L123 - L125 were not covered by tests
'error': {'message': str(exc)},
'cylc_version': CYLC_VERSION,
}
response = json.dumps(res).encode()

Check warning on line 129 in cylc/flow/network/replier.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/replier.py#L129

Added line #L129 was not covered by tests
else:
# success case - serve the request
res = self.server.receiver(message)
data = res.get('data')
# send back the string to bytes response
if isinstance(res.get('data'), bytes):
response = res['data']
if isinstance(data, bytes):
response = data

Check warning on line 136 in cylc/flow/network/replier.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/replier.py#L136

Added line #L136 was not covered by tests
else:
response = encode_(res).encode()
self.socket.send(response)
response = json.dumps(res).encode()
self.socket.send(response) # type: ignore[union-attr]
Loading

0 comments on commit ee3102d

Please sign in to comment.