Skip to content

Commit

Permalink
fix: catch all exceptions in forked process
Browse files Browse the repository at this point in the history
If a base exception is raised, the communication queue will block
in the calling process and cause timeouts. This is prevalent
in the unit tests for protobuf serialization in GitHub Actions, and
it can potentially cause problems in a running Karapace service.

Regardless of whether there are still timeout issues, the changes here
should provide clearer errors for investigation.
  • Loading branch information
jjaakola-aiven committed Jul 24, 2024
1 parent e5a6e41 commit b50817c
Showing 1 changed file with 44 additions and 34 deletions.
78 changes: 44 additions & 34 deletions karapace/protobuf/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,25 +168,25 @@ def read_data(
return class_instance


_ReaderQueue: TypeAlias = "Queue[dict[object, object] | Exception]"
_ReaderQueue: TypeAlias = "Queue[dict[object, object] | BaseException]"


def reader_process(
queue: _ReaderQueue,
reader_queue: _ReaderQueue,
config: Config,
writer_schema: ProtobufSchema,
reader_schema: ProtobufSchema,
bio: BytesIO,
) -> None:
try:
queue.put(protobuf_to_dict(read_data(config, writer_schema, reader_schema, bio), True))
# todo: This lint ignore does not look reasonable. If it is, reasoning should be
# documented.
except Exception as e: # pylint: disable=broad-except
queue.put(e)
reader_queue.put(protobuf_to_dict(read_data(config, writer_schema, reader_schema, bio), True))
# Reading happens in the forked process, catch is broad so exception will get communicated
# back to calling process.
except BaseException as base_exception: # pylint: disable=broad-except
reader_queue.put(base_exception)


def reader_mp(
def read_in_forked_process_multiprocess_process(
config: Config,
writer_schema: ProtobufSchema,
reader_schema: ProtobufSchema,
Expand All @@ -200,14 +200,18 @@ def reader_mp(
# To avoid problem with enum values for basic SerDe support we
# will isolate work with call protobuf libraries in child process.
if __name__ == "karapace.protobuf.io":
queue: _ReaderQueue = Queue()
p = Process(target=reader_process, args=(queue, config, writer_schema, reader_schema, bio))
reader_queue: _ReaderQueue = Queue()
p = Process(target=reader_process, args=(reader_queue, config, writer_schema, reader_schema, bio))
p.start()
result = queue.get()
p.join()
TEN_SECONDS_WAIT = 10
try:
result = reader_queue.get(True, TEN_SECONDS_WAIT)
finally:
p.join()
reader_queue.close()
if isinstance(result, Dict):
return result
if isinstance(result, Exception):
if isinstance(result, BaseException):
raise result
raise IllegalArgumentException()
return {"Error": "This never must be returned"}
Expand All @@ -233,14 +237,14 @@ def __init__(
def read(self, bio: BytesIO) -> dict:
if self._reader_schema is None:
self._reader_schema = self._writer_schema
return reader_mp(self.config, self._writer_schema, self._reader_schema, bio)
return read_in_forked_process_multiprocess_process(self.config, self._writer_schema, self._reader_schema, bio)


_WriterQueue: TypeAlias = "Queue[bytes | Exception]"
_WriterQueue: TypeAlias = "Queue[bytes | BaseException]"


def writer_process(
queue: _WriterQueue,
writer_queue: _WriterQueue,
config: Config,
writer_schema: ProtobufSchema,
message_name: str,
Expand All @@ -249,18 +253,20 @@ def writer_process(
class_instance = get_protobuf_class_instance(writer_schema, message_name, config)
try:
dict_to_protobuf(class_instance, datum)
# todo: This does not look like a reasonable place to catch any exception,
# especially since we're effectively silencing them.
except Exception:
# pylint: disable=raise-missing-from
e = ProtobufTypeException(writer_schema, datum)
queue.put(e)
raise e
queue.put(class_instance.SerializeToString())


# todo: What is mp? Expand the abbreviation or add an explaining comment.
def writer_mp(
# Writing happens in the forked process, catch is broad so exception will get communicated
# back to calling process.
except Exception as bare_exception: # pylint: disable=broad-exception-caught
try:
raise ProtobufTypeException(writer_schema, datum) from bare_exception
except ProtobufTypeException as protobuf_exception:
writer_queue.put(protobuf_exception)
raise protobuf_exception
except BaseException as base_exception: # pylint: disable=broad-exception-caught
writer_queue.put(base_exception)
writer_queue.put(class_instance.SerializeToString())


def write_in_forked_process(
config: Config,
writer_schema: ProtobufSchema,
message_name: str,
Expand All @@ -274,14 +280,18 @@ def writer_mp(
# To avoid problem with enum values for basic SerDe support we
# will isolate work with call protobuf libraries in child process.
if __name__ == "karapace.protobuf.io":
queue: _WriterQueue = Queue()
p = Process(target=writer_process, args=(queue, config, writer_schema, message_name, datum))
writer_queue: _WriterQueue = Queue(1)
p = Process(target=writer_process, args=(writer_queue, config, writer_schema, message_name, datum))
p.start()
result = queue.get()
p.join()
TEN_SECONDS_WAIT = 10
try:
result = writer_queue.get(True, TEN_SECONDS_WAIT) # Block for ten seconds
finally:
p.join()
writer_queue.close()
if isinstance(result, bytes):
return result
if isinstance(result, Exception):
if isinstance(result, BaseException):
raise result
raise IllegalArgumentException()
raise NotImplementedError("Error: Reached unreachable code")
Expand Down Expand Up @@ -309,4 +319,4 @@ def write_index(self, writer: BytesIO) -> None:
write_indexes(writer, [self._message_index])

def write(self, datum: dict[object, object], writer: BytesIO) -> None:
writer.write(writer_mp(self.config, self._writer_schema, self._message_name, datum))
writer.write(write_in_forked_process(self.config, self._writer_schema, self._message_name, datum))

0 comments on commit b50817c

Please sign in to comment.