diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index dfeb8b2..c226d41 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -27,5 +27,5 @@ jobs:
       - run: python -m pip install -e .
       - run: docker-compose version
       - run: docker-compose -f tests/docker-compose-file-writer.yml up -d
-      - run: scicat_ingestor -c resources/config.sample.json --verbose
+      - run: python tests/_scicat_ingestor.py -c resources/config.sample.json --verbose
       - run: docker-compose -f tests/docker-compose-file-writer.yml down
diff --git a/src/scicat_ingestor.py b/src/scicat_ingestor.py
index 879de0c..72ff38f 100644
--- a/src/scicat_ingestor.py
+++ b/src/scicat_ingestor.py
@@ -1,9 +1,11 @@
 # SPDX-License-Identifier: BSD-3-Clause
 # Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject)
 import logging
+from collections.abc import Generator
+from contextlib import contextmanager
 
 from scicat_configuration import build_main_arg_parser, build_scicat_config
-from scicat_kafka import build_consumer
+from scicat_kafka import build_consumer, wrdn_messages
 from scicat_logging import build_logger
 
 
@@ -15,6 +17,22 @@ def quit(logger: logging.Logger, unexpected: bool = True) -> None:
     sys.exit(1 if unexpected else 0)
 
 
+@contextmanager
+def exit_at_exceptions(logger: logging.Logger) -> Generator[None, None, None]:
+    """Exit the program if an exception is raised."""
+    try:
+        yield
+    except KeyboardInterrupt:
+        logger.info("Received keyboard interrupt.")
+        quit(logger, unexpected=False)
+    except Exception as e:
+        logger.error("An exception occurred: %s", e)
+        quit(logger, unexpected=True)
+    else:
+        logger.error("Loop finished unexpectedly.")
+        quit(logger, unexpected=True)
+
+
 def main() -> None:
     """Main entry point of the app."""
     arg_parser = build_main_arg_parser()
@@ -26,6 +44,11 @@ def main() -> None:
     logger.info('Starting the Scicat Ingestor with the following configuration:')
     logger.info(config.to_dict())
 
-    # Kafka consumer
-    if build_consumer(config.kafka_options, logger) is None:
-        quit(logger)
+    with exit_at_exceptions(logger):
+        # Kafka consumer
+        if (consumer := build_consumer(config.kafka_options, logger)) is None:
+            raise RuntimeError("Failed to build the Kafka consumer")
+
+        # Receive messages
+        for message in wrdn_messages(consumer, logger):
+            logger.info("Processing message: %s", message)
diff --git a/src/scicat_kafka.py b/src/scicat_kafka.py
index a832867..26535aa 100644
--- a/src/scicat_kafka.py
+++ b/src/scicat_kafka.py
@@ -1,8 +1,14 @@
 # SPDX-License-Identifier: BSD-3-Clause
 # Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject)
 import logging
+from collections.abc import Generator
 
 from confluent_kafka import Consumer
+from streaming_data_types import deserialise_wrdn
+from streaming_data_types.finished_writing_wrdn import (
+    FILE_IDENTIFIER as WRDN_FILE_IDENTIFIER,
+)
+from streaming_data_types.finished_writing_wrdn import WritingFinished
 
 from scicat_configuration import kafkaOptions
 
@@ -66,3 +72,61 @@ def validate_consumer(consumer: Consumer, logger: logging.Logger) -> bool:
     else:
         logger.info("Kafka consumer successfully instantiated")
         return True
+
+
+def _validate_data_type(message_content: bytes, logger: logging.Logger) -> bool:
+    """Check that ``message_content[4:8]`` equals the WRDN file identifier."""
+    logger.info("Data type: %s", (data_type := message_content[4:8]))
+    if data_type == WRDN_FILE_IDENTIFIER:
+        logger.info("WRDN message received.")
+        return True
+    else:
+        logger.error("Unexpected data type: %s", data_type)
+        return False
+
+
+def _filter_error_encountered(
+    wrdn_content: WritingFinished, logger: logging.Logger
+) -> WritingFinished | None:
+    """Filter out messages with the ``error_encountered`` flag set to True."""
+    if wrdn_content.error_encountered:
+        logger.error(
+            "``error_encountered`` flag True. "
+            "Unable to deserialize message. Skipping the message."
+        )
+        return None
+    else:
+        return wrdn_content
+
+
+def _deserialise_wrdn(
+    message_content: bytes, logger: logging.Logger
+) -> WritingFinished | None:
+    """Deserialise a WRDN payload; return ``None`` if it is invalid or flagged."""
+    if _validate_data_type(message_content, logger):
+        logger.info("Deserialising WRDN message")
+        wrdn_content: WritingFinished = deserialise_wrdn(message_content)
+        logger.info("Deserialised WRDN message: %.5000s", wrdn_content)
+        return _filter_error_encountered(wrdn_content, logger)
+    return None
+
+
+def wrdn_messages(
+    consumer: Consumer, logger: logging.Logger
+) -> Generator[WritingFinished | None, None, None]:
+    """Wait for a WRDN message and yield it.
+
+    Yield ``None`` if no message is received or an error is encountered.
+    """
+    while True:
+        # The decision to proceed or stop will be done by the caller.
+        message = consumer.poll(timeout=1.0)
+        if message is None:
+            logger.info("Received no messages")
+            yield None
+        elif message.error():
+            logger.error("Consumer error: %s", message.error())
+            yield None
+        else:
+            logger.info("Received message.")
+            yield _deserialise_wrdn(message.value(), logger)
diff --git a/tests/_scicat_ingestor.py b/tests/_scicat_ingestor.py
new file mode 100644
index 0000000..abebc25
--- /dev/null
+++ b/tests/_scicat_ingestor.py
@@ -0,0 +1,25 @@
+# Entry point for integration test.
+# All system arguments are passed to the ``scicat_ingestor``.
+
+
+if __name__ == "__main__":
+    import signal
+    import subprocess
+    import sys
+    from time import sleep
+
+    # Run the main function in a subprocess
+    process = subprocess.Popen(
+        [
+            "scicat_ingestor",
+            *(sys.argv[1:] or ["--verbose", "-c", "resources/config.sample.json"]),
+        ]
+    )
+
+    # Send a SIGINT signal to the process after 5 seconds
+    sleep(5)
+    process.send_signal(signal.SIGINT)
+
+    # Kill the process after 5 more seconds
+    sleep(5)
+    process.kill()