Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve server-client communication error handling #6578

Open
wants to merge 2 commits into
base: 8.4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6578.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improved handling of any internal errors when executing commands against a running workflow.
70 changes: 56 additions & 14 deletions cylc/flow/network/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
import asyncio
import getpass
import json
from typing import Optional, Tuple
from typing import (
TYPE_CHECKING,
Optional,
Tuple,
Union,
)

import zmq
import zmq.asyncio
Expand All @@ -30,34 +35,71 @@
CylcError,
CylcVersionError,
ServiceFileError,
WorkflowStopped
WorkflowStopped,
)
from cylc.flow.hostuserutil import get_fqdn_by_host
from cylc.flow.workflow_files import (
ContactFileFields,
KeyType,
KeyOwner,
KeyInfo,
KeyOwner,
KeyType,
get_workflow_srv_dir,
load_contact_file,
get_workflow_srv_dir
)


if TYPE_CHECKING:
# BACK COMPAT: typing_extensions.TypedDict
# FROM: Python 3.7
# TO: Python 3.11
from typing_extensions import TypedDict


API = 5 # cylc API version
MSG_TIMEOUT = "TIMEOUT"

if TYPE_CHECKING:
class ResponseDict(TypedDict, total=False):
"""Structure of server response messages.

def encode_(message):
"""Convert the structure holding a message field from JSON to a string."""
try:
return json.dumps(message)
except TypeError as exc:
return json.dumps({'errors': [{'message': str(exc)}]})
Confusingly, this has a similar format to a GraphQL execution result.
But if we change this now we could break compatibility for
issuing commands to/receiving responses from workflows running in
different versions of Cylc 8.
"""
data: object
"""For most Cylc commands that issue GQL mutations, the data field will
look like:
data: {
<mutationName1>: {
result: [
{
id: <workflow/task ID>,
response: [<success_bool>, <message>]
},
...
]
}
}
but this is not 100% consistent unfortunately
"""
error: Union[Exception, str, dict]
"""If an error occurred that could not be handled.
(usually a dict {message: str, traceback?: str}).
"""
user: str
cylc_version: str
"""Server (i.e. running workflow) Cylc version.

Going forward, we include this so we can more easily handle any future
back-compat issues."""


def decode_(message):
"""Convert an encoded message string to JSON with an added 'user' field."""
def load_server_response(message: str) -> 'ResponseDict':
"""Convert a JSON message string to dict with an added 'user' field."""
msg = json.loads(message)
msg['user'] = getpass.getuser() # assume this is the user
if 'user' not in msg:
msg['user'] = getpass.getuser() # assume this is the user
return msg


Expand Down
70 changes: 43 additions & 27 deletions cylc/flow/network/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,31 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Client for workflow runtime API."""

from abc import ABCMeta, abstractmethod
from abc import (
ABCMeta,
abstractmethod,
)
import asyncio
import json
import os
from shutil import which
import socket
import sys
from typing import Any, Optional, Union, Dict
from typing import (
TYPE_CHECKING,
Any,
Dict,
Optional,
Union,
)

import zmq
import zmq.asyncio

from cylc.flow import LOG
from cylc.flow import (
LOG,
__version__ as CYLC_VERSION,
)
from cylc.flow.exceptions import (
ClientError,
ClientTimeout,
Expand All @@ -36,16 +49,17 @@
)
from cylc.flow.hostuserutil import get_fqdn_by_host
from cylc.flow.network import (
encode_,
decode_,
ZMQSocketBase,
get_location,
ZMQSocketBase
load_server_response,
)
from cylc.flow.network.client_factory import CommsMeth
from cylc.flow.network.server import PB_METHOD_MAP
from cylc.flow.workflow_files import (
detect_old_contact_file,
)
from cylc.flow.workflow_files import detect_old_contact_file


if TYPE_CHECKING:
from cylc.flow.network import ResponseDict


class WorkflowRuntimeClientBase(metaclass=ABCMeta):
Expand Down Expand Up @@ -270,7 +284,7 @@ async def async_request(
args: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
req_meta: Optional[Dict[str, Any]] = None
) -> object:
) -> Union[bytes, object]:
"""Send an asynchronous request using asyncio.

Has the same arguments and return values as ``serial_request``.
Expand All @@ -292,12 +306,12 @@ async def async_request(
if req_meta:
msg['meta'].update(req_meta)
LOG.debug('zmq:send %s', msg)
message = encode_(msg)
message = json.dumps(msg)
self.socket.send_string(message)

# receive response
if self.poller.poll(timeout):
res = await self.socket.recv()
res: bytes = await self.socket.recv()
else:
self.timeout_handler()
raise ClientTimeout(
Expand All @@ -307,26 +321,28 @@ async def async_request(
' --comms-timeout option;'
'\n* or check the workflow log.'
)
LOG.debug('zmq:recv %s', res)

if msg['command'] in PB_METHOD_MAP:
response = {'data': res}
else:
response = decode_(
res.decode() if isinstance(res, bytes) else res
)
LOG.debug('zmq:recv %s', response)
if command in PB_METHOD_MAP:
return res

response: ResponseDict = load_server_response(res.decode())

try:
return response['data']
except KeyError:
error = response.get(
'error',
{'message': f'Received invalid response: {response}'},
)
raise ClientError(
error.get('message'), # type: ignore
error.get('traceback'), # type: ignore
) from None
error = response.get('error')
if not error:
error = (
f"Received invalid response for Cylc {CYLC_VERSION}: "
f"{response}"
)
wflow_cylc_ver = response.get('cylc_version')
if wflow_cylc_ver:
error += (
f"\n(Workflow is running in Cylc {wflow_cylc_ver})"
)
raise ClientError(str(error)) from None

def get_header(self) -> dict:
"""Return "header" data to attach to each request for traceability.
Expand Down
28 changes: 23 additions & 5 deletions cylc/flow/network/multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,23 @@

import asyncio
import sys
from typing import Callable, Dict, List, Tuple, Optional, Union, Type
from typing import (
Callable,
Dict,
List,
Optional,
Tuple,
Type,
Union,
)

from ansimarkup import ansiprint

from cylc.flow.async_util import unordered_map
from cylc.flow.exceptions import CylcError, WorkflowStopped
from cylc.flow.exceptions import (
CylcError,
WorkflowStopped,
)
import cylc.flow.flags
from cylc.flow.id_cli import parse_ids_async
from cylc.flow.terminal import DIM
Expand Down Expand Up @@ -220,21 +231,28 @@


def _report(
response: dict,
response: Union[dict, list],
) -> Tuple[Optional[str], Optional[str], bool]:
"""Report the result of a GraphQL operation.

This analyses GraphQL mutation responses to determine the outcome.

Args:
response: The GraphQL response.
response: The workflow server response (NOT necessarily conforming to
GraphQL execution result spec).

Returns:
(stdout, stderr, outcome)

"""
try:
ret: List[Tuple[Optional[str], Optional[str], bool]] = []
if not isinstance(response, dict):
if isinstance(response, list) and response[0].get('error'):
# If operating on a workflow running in an older Cylc version,
# we may get an error response like [{'error': '...'}]
raise Exception(response)
raise Exception(f"Unexpected response: {response}")

Check warning on line 255 in cylc/flow/network/multi.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/multi.py#L254-L255

Added lines #L254 - L255 were not covered by tests
for mutation_response in response.values():
# extract the result of each mutation result in the response
success, msg = mutation_response['result'][0]['response']
Expand Down Expand Up @@ -268,7 +286,7 @@
# response returned is not in the expected format - this shouldn't
# happen but we need to protect against it
err_msg = ''
if cylc.flow.flags.verbosity > 1: # debug mode
if cylc.flow.flags.verbosity > 0: # verbose mode
# print the full result to stderr
err_msg += f'\n <{DIM}>response={response}</{DIM}>'
return (
Expand Down
54 changes: 34 additions & 20 deletions cylc/flow/network/replier.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,27 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Server for workflow runtime API."""

import json
from queue import Queue
from typing import TYPE_CHECKING, Optional
from typing import (
TYPE_CHECKING,
Optional,
)

import zmq

from cylc.flow import LOG
from cylc.flow.network import encode_, decode_, ZMQSocketBase
from cylc.flow import (
LOG,
__version__ as CYLC_VERSION,
)
from cylc.flow.network import (
ZMQSocketBase,
load_server_response,
)


if TYPE_CHECKING:
from cylc.flow.network import ResponseDict
from cylc.flow.network.server import WorkflowRuntimeServer


Expand Down Expand Up @@ -69,7 +81,7 @@ def _bespoke_stop(self) -> None:
LOG.debug('stopping zmq replier...')
self.queue.put('STOP')

def listener(self):
def listener(self) -> None:
"""The server main loop, listen for and serve requests.

When called, this method will receive and respond until there are no
Expand All @@ -90,7 +102,9 @@ def listener(self):

try:
# Check for messages
msg = self.socket.recv_string(zmq.NOBLOCK)
msg = self.socket.recv_string( # type: ignore[union-attr]
zmq.NOBLOCK
)
except zmq.error.Again:
# No messages, break to parent loop/caller.
break
Expand All @@ -99,27 +113,27 @@ def listener(self):
continue
# attempt to decode the message, authenticating the user in the
# process
res: ResponseDict
response: bytes
try:
message = decode_(msg)
message = load_server_response(msg)
except Exception as exc: # purposefully catch generic exception
# failed to decode message, possibly resulting from failed
# authentication
LOG.exception('failed to decode message: "%s"', exc)
import traceback
response = encode_(
{
'error': {
'message': 'failed to decode message: "%s"' % msg,
'traceback': traceback.format_exc(),
}
}
).encode()
LOG.exception(exc)
LOG.error(f'failed to decode message: "{msg}"')
res = {
'error': {'message': str(exc)},
'cylc_version': CYLC_VERSION,
}
response = json.dumps(res).encode()
else:
# success case - serve the request
res = self.server.receiver(message)
data = res.get('data')
# send back the string to bytes response
if isinstance(res.get('data'), bytes):
response = res['data']
if isinstance(data, bytes):
response = data
else:
response = encode_(res).encode()
self.socket.send(response)
response = json.dumps(res).encode()
self.socket.send(response) # type: ignore[union-attr]
Loading
Loading