Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more integration helpers. #24

Merged
merged 5 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ jobs:
- run: python -m pip install -e .
- run: docker-compose version
- run: docker-compose -f tests/docker-compose-file-writer.yml up -d
- run: scicat_ingestor -c resources/config.sample.json --verbose
- run: python tests/_scicat_ingestor.py -c resources/config.sample.json --verbose
- run: docker-compose -f tests/docker-compose-file-writer.yml down
31 changes: 27 additions & 4 deletions src/scicat_ingestor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject)
import logging
from collections.abc import Generator
from contextlib import contextmanager

from scicat_configuration import build_main_arg_parser, build_scicat_config
from scicat_kafka import build_consumer
from scicat_kafka import build_consumer, wrdn_messages
from scicat_logging import build_logger


Expand All @@ -15,6 +17,22 @@ def quit(logger: logging.Logger, unexpected: bool = True) -> None:
sys.exit(1 if unexpected else 0)


@contextmanager
def exit_at_exceptions(logger: logging.Logger) -> Generator[None, None, None]:
"""Exit the program if an exception is raised."""
try:
yield
except KeyboardInterrupt:
logger.info("Received keyboard interrupt.")
quit(logger, unexpected=False)
except Exception as e:
logger.error("An exception occurred: %s", e)
quit(logger, unexpected=True)
else:
logger.error("Loop finished unexpectedly.")
quit(logger, unexpected=True)


def main() -> None:
"""Main entry point of the app."""
arg_parser = build_main_arg_parser()
Expand All @@ -26,6 +44,11 @@ def main() -> None:
logger.info('Starting the Scicat Ingestor with the following configuration:')
logger.info(config.to_dict())

# Kafka consumer
if build_consumer(config.kafka_options, logger) is None:
quit(logger)
with exit_at_exceptions(logger):
# Kafka consumer
if (consumer := build_consumer(config.kafka_options, logger)) is None:
raise RuntimeError("Failed to build the Kafka consumer")

# Receive messages
for message in wrdn_messages(consumer, logger):
logger.info("Processing message: %s", message)
61 changes: 61 additions & 0 deletions src/scicat_kafka.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject)
import logging
from collections.abc import Generator

from confluent_kafka import Consumer
from streaming_data_types import deserialise_wrdn
from streaming_data_types.finished_writing_wrdn import (
FILE_IDENTIFIER as WRDN_FILE_IDENTIFIER,
)
from streaming_data_types.finished_writing_wrdn import WritingFinished

from scicat_configuration import kafkaOptions

Expand Down Expand Up @@ -66,3 +72,58 @@ def validate_consumer(consumer: Consumer, logger: logging.Logger) -> bool:
else:
logger.info("Kafka consumer successfully instantiated")
return True


def _validate_data_type(message_content: bytes, logger: logging.Logger) -> bool:
logger.info("Data type: %s", (data_type := message_content[4:8]))
if data_type == WRDN_FILE_IDENTIFIER:
logger.info("WRDN message received.")
return True
else:
logger.error("Unexpected data type: %s", data_type)
return False


def _filter_error_encountered(
wrdn_content: WritingFinished, logger: logging.Logger
) -> WritingFinished | None:
"""Filter out messages with the ``error_encountered`` flag set to True."""
if wrdn_content.error_encountered:
logger.error(
"``error_encountered`` flag True. "
"Unable to deserialize message. Skipping the message."
)
return wrdn_content
else:
return None


def _deserialise_wrdn(
message_content: bytes, logger: logging.Logger
) -> WritingFinished | None:
if _validate_data_type(message_content, logger):
logger.info("Deserialising WRDN message")
wrdn_content: WritingFinished = deserialise_wrdn(message_content)
logger.info("Deserialised WRDN message: %.5000s", wrdn_content)
return _filter_error_encountered(wrdn_content, logger)


def wrdn_messages(
consumer: Consumer, logger: logging.Logger
) -> Generator[WritingFinished | None, None, None]:
"""Wait for a WRDN message and yield it.

Yield ``None`` if no message is received or an error is encountered.
"""
while True:
# The decision to proceed or stop will be done by the caller.
message = consumer.poll(timeout=1.0)
if message is None:
logger.info("Received no messages")
yield None
elif message.error():
logger.error("Consumer error: %s", message.error())
yield None
else:
logger.info("Received message.")
yield _deserialise_wrdn(message.value(), logger)
24 changes: 24 additions & 0 deletions tests/_scicat_ingestor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Entry point for integration test.
# All system arguments are passed to the ``scicat_ingestor``.


if __name__ == "__main__":
import signal
import subprocess
import sys
from time import sleep

# Run the main function in a subprocess
process = subprocess.Popen(
[
"scicat_ingestor",
*(sys.argv[1:] or ["--verbose", "-c", "resources/config.sample.json"]),
]
)

# Send a SIGINT signal to the process after 5 seconds
sleep(5)
process.send_signal(signal.SIGINT)

# Kill the process after 5 more seconds
sleep(5)