From 4a4cf494b2acfd49754d0677580a38ddf840b98f Mon Sep 17 00:00:00 2001 From: Oliver Layer Date: Thu, 23 Feb 2023 15:40:36 +0100 Subject: [PATCH] fix: BrokenPipe error (this time, for real) --- homcc/client/client.py | 44 +++++++++++++++++++++++++++---------- homcc/client/compilation.py | 9 ++++---- 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/homcc/client/client.py b/homcc/client/client.py index 2798dce..815bbab 100644 --- a/homcc/client/client.py +++ b/homcc/client/client.py @@ -26,6 +26,7 @@ from homcc.common.errors import ( ClientParsingError, FailedHostNameResolutionError, + HostRefusedConnectionError, RemoteHostsFailure, SlotsExhaustedError, ) @@ -410,7 +411,12 @@ async def __aexit__(self, *_): """disconnect from server and close client socket""" logger.debug("Disconnecting from '%s:%i'.", self.host, self.port) self._writer.close() - await self._writer.wait_closed() + + try: + await self._writer.wait_closed() + except ConnectionError: + # If an error occurs during closing the connection, we can safely ignore it. + pass async def _send(self, message: Message): """send a message to homcc server""" @@ -428,17 +434,32 @@ async def send_argument_message( docker_container: Optional[str], ): """send an argument message to homcc server""" - await self._send( - ArgumentMessage( - args=list(arguments), - cwd=cwd, - dependencies=dependency_dict, - target=target, - schroot_profile=schroot_profile, - docker_container=docker_container, - compression=self.compression, + try: + await self._send( + ArgumentMessage( + args=list(arguments), + cwd=cwd, + dependencies=dependency_dict, + target=target, + schroot_profile=schroot_profile, + docker_container=docker_container, + compression=self.compression, + ) ) - ) + except ConnectionError as error: + # TODO(o.layer): we have to handle this edge case here, because the server may close the + # connection before the client sends the ArgumentMessage. (and therefore sending will fail) + # In the future, we should make the contract between the server and the client clearer, e.g. + # by defining the time in point / ordering of when to expect ConnectionRefusedMessages. + logger.debug( + "Error occurred when sending ArgumentMessage. The server has probably closed the " + "connection before we could send the ArgumentMessage: %s", + error, + ) + raise HostRefusedConnectionError( + f"Host {self.host}:{self.port} closed the connection, probably due to " + "reaching the compilation limit." + ) from error async def send_dependency_reply_message(self, dependency: str): """send dependency reply message to homcc server""" @@ -452,7 +473,6 @@ def check_timeout(self): async def receive(self) -> Message: """receive data from homcc server and convert it to Message""" - # read stream into internal buffer self._data += await self._reader.read(TCP_BUFFER_SIZE) bytes_needed, parsed_message = Message.from_bytes(bytearray(self._data)) diff --git a/homcc/client/compilation.py b/homcc/client/compilation.py index 350993d..5116ded 100644 --- a/homcc/client/compilation.py +++ b/homcc/client/compilation.py @@ -145,6 +145,7 @@ async def compile_remotely_at( remote_arguments.normalize_compiler() state.set_compile() + await client.send_argument_message( arguments=remote_arguments, cwd=os.getcwd(), @@ -153,17 +154,15 @@ async def compile_remotely_at( schroot_profile=schroot_profile, docker_container=docker_container, ) - - # invert dependency dictionary to access dependencies via hash - dependency_dict = {file_hash: dependency for dependency, file_hash in dependency_dict.items()} - host_response: Message = await client.receive() - if isinstance(host_response, ConnectionRefusedMessage): raise HostRefusedConnectionError( f"Host {client.host}:{client.port} refused the connection:\n{host_response.info}!" ) + # invert dependency dictionary to access dependencies via hash + dependency_dict = {file_hash: dependency for dependency, file_hash in dependency_dict.items()} + # provide requested dependencies while isinstance(host_response, DependencyRequestMessage): requested_dependency: str = dependency_dict[host_response.get_sha1sum()]