From fdbe9c00f3355242fbb970bbfcac715244ce6e8e Mon Sep 17 00:00:00 2001 From: Max Novelli Date: Thu, 9 Jan 2025 17:11:06 +0100 Subject: [PATCH 1/2] added check on number of offline ingestors and relevant configuration --- resources/config.sample.json | 6 ++++- src/scicat_configuration.py | 2 ++ src/scicat_online_ingestor.py | 46 +++++++++++++++++++++++++++++------ 3 files changed, 45 insertions(+), 9 deletions(-) diff --git a/resources/config.sample.json b/resources/config.sample.json index ff2bb55..fb40ea3 100644 --- a/resources/config.sample.json +++ b/resources/config.sample.json @@ -1,4 +1,6 @@ { + "nexus_file" : "", + "done_writing_message_file": "", "config_file": "", "id": "", "dataset": { @@ -15,6 +17,8 @@ "ingestion": { "dry_run": false, "offline_ingestor_executable": "background_ingestor", + "max_offline_ingestors": 10, + "offline_ingestors_wait_time": 10, "schemas_directory": "schemas", "check_if_dataset_exists_by_pid": true, "check_if_dataset_exists_by_metadata": true, @@ -32,7 +36,7 @@ } }, "kafka": { - "topics": "KAFKA_TOPIC_1,KAFKA_TOPIC_2", + "topics": ["KAFKA_TOPIC_1", "KAFKA_TOPIC_2"], "group_id": "GROUP_ID", "bootstrap_servers": "localhost:9093", "security_protocol": "sasl_ssl", diff --git a/src/scicat_configuration.py b/src/scicat_configuration.py index 9ae9381..39bf726 100644 --- a/src/scicat_configuration.py +++ b/src/scicat_configuration.py @@ -226,6 +226,8 @@ class FileHandlingOptions: class IngestionOptions: dry_run: bool = False offline_ingestor_executable: str | list[str]= "background_ingestor" + max_offline_ingestors: int = 10 + offline_ingestors_wait_time: int = 10 schemas_directory: str = "schemas" check_if_dataset_exists_by_pid: bool = True check_if_dataset_exists_by_metadata: bool = True diff --git a/src/scicat_online_ingestor.py b/src/scicat_online_ingestor.py index c6ea099..230d74e 100644 --- a/src/scicat_online_ingestor.py +++ b/src/scicat_online_ingestor.py @@ -6,6 +6,7 @@ import logging import pathlib import subprocess +from time import sleep try: __version__ = importlib.metadata.version(__package__ or __name__) @@ -59,7 +60,22 @@ def dump_message_to_file_if_needed( logger.info("Message file saved") -def _individual_message_commit(offline_ingestors, consumer, logger: logging.Logger): +def _individual_message_commit( + job_id, + message, + consumer, + logger: logging.Logger +): + logger.info("Executing commit for message with job id %s", job_id) + consumer.commit(message=message) + + +def _check_offline_ingestors( + offline_ingestors, + consumer, + config, + logger: logging.Logger +) -> int: logger.info("%s offline ingestors running", len(offline_ingestors)) for job_id, job_item in offline_ingestors.items(): result = job_item["proc"].poll() @@ -69,8 +85,10 @@ def _individual_message_commit(offline_ingestors, consumer, logger: logging.Logg ) if result == 0: logger.info("Offline ingestor successful for job id %s", job_id) - logger.info("Executing commit for message with job id %s", job_id) - consumer.commit(message=job_item["message"]) + # if background process is successful + # check if we need to commit the individual message + if config.kafka.individual_message_commit: + _individual_message_commit(job_id,job_item["message"], consumer, logger) else: logger.error("Offline ingestor error for job id %s", job_id) logger.info( @@ -78,6 +96,8 @@ def _individual_message_commit(offline_ingestors, consumer, logger: logging.Logg ) offline_ingestors.pop(job_id) + return len(offline_ingestors) + def build_online_config(logger: logging.Logger | None = None) -> OnlineIngestorConfig: arg_parser = build_arg_parser( @@ -169,16 +189,26 @@ def main() -> None: if config.ingestion.dry_run: logger.info("Dry run mode enabled. Skipping background ingestor.") else: + logger.info("Checking number of offline ingestor") + offline_ingestor_runnings: int = _check_offline_ingestors( + offline_ingestors, + consumer, + config, + logger) + while offline_ingestor_runnings >= config.ingestion.max_offline_ingestors: + sleep(config.ingestion.offline_ingestors_wait_time) + offline_ingestor_runnings = _check_offline_ingestors( + offline_ingestors, + consumer, + config, + logger) + + logger.info("Offline ingestors currently running {}".format(offline_ingestor_runnings)) logger.info("Running background ingestor with command above") proc = subprocess.Popen(cmd) # noqa: S603 # save info about the background process offline_ingestors[job_id] = {"proc": proc, "message": message} - # if background process is successful - # check if we need to commit the individual message - if config.kafka.individual_message_commit: - _individual_message_commit(offline_ingestors, consumer, logger) - if __name__ == "__main__": main() From a792337ff034f7b2568f015e47f19ad63ea192ca Mon Sep 17 00:00:00 2001 From: "pre-commit-ci-lite[bot]" <117423508+pre-commit-ci-lite[bot]@users.noreply.github.com> Date: Thu, 9 Jan 2025 16:15:01 +0000 Subject: [PATCH 2/2] Apply automatic formatting --- src/scicat_configuration.py | 2 +- src/scicat_online_ingestor.py | 55 +++++++++++++++++------------------ 2 files changed, 28 insertions(+), 29 deletions(-) diff --git a/src/scicat_configuration.py b/src/scicat_configuration.py index 39bf726..a57a282 100644 --- a/src/scicat_configuration.py +++ b/src/scicat_configuration.py @@ -225,7 +225,7 @@ class FileHandlingOptions: @dataclass(kw_only=True) class IngestionOptions: dry_run: bool = False - offline_ingestor_executable: str | list[str]= "background_ingestor" + offline_ingestor_executable: str | list[str] = "background_ingestor" max_offline_ingestors: int = 10 offline_ingestors_wait_time: int = 10 schemas_directory: str = "schemas" diff --git a/src/scicat_online_ingestor.py b/src/scicat_online_ingestor.py index 230d74e..6ea8ff9 100644 --- a/src/scicat_online_ingestor.py +++ b/src/scicat_online_ingestor.py @@ -38,11 +38,11 @@ def dump_message_to_file_if_needed( - *, - logger: logging.Logger, - message_file_path: pathlib.Path, - file_handling_options: FileHandlingOptions, - message: WritingFinished, + *, + logger: logging.Logger, + message_file_path: pathlib.Path, + file_handling_options: FileHandlingOptions, + message: WritingFinished, ) -> None: """Dump the message to a file according to the configuration.""" if not file_handling_options.message_to_file: @@ -60,21 +60,13 @@ def dump_message_to_file_if_needed( logger.info("Message file saved") -def _individual_message_commit( - job_id, - message, - consumer, - logger: logging.Logger -): +def _individual_message_commit(job_id, message, consumer, logger: logging.Logger): logger.info("Executing commit for message with job id %s", job_id) consumer.commit(message=message) def _check_offline_ingestors( - offline_ingestors, - consumer, - config, - logger: logging.Logger + offline_ingestors, consumer, config, logger: logging.Logger ) -> int: logger.info("%s offline ingestors running", len(offline_ingestors)) for job_id, job_item in offline_ingestors.items(): @@ -88,7 +80,9 @@ def _check_offline_ingestors( # if background process is successful # check if we need to commit the individual message if config.kafka.individual_message_commit: - _individual_message_commit(job_id,job_item["message"], consumer, logger) + _individual_message_commit( + job_id, job_item["message"], consumer, logger + ) else: logger.error("Offline ingestor error for job id %s", job_id) logger.info( @@ -127,7 +121,7 @@ def main() -> None: logger.info('Starting the Scicat online Ingestor with the following configuration:') logger.info(config.to_dict()) - with ((handle_daemon_loop_exceptions(logger=logger))): + with handle_daemon_loop_exceptions(logger=logger): # Kafka consumer if (consumer := build_consumer(config.kafka, logger)) is None: raise RuntimeError("Failed to build the Kafka consumer") @@ -159,7 +153,9 @@ def main() -> None: --done-writing-message-file message_file_path # optional depending on the message_saving_options.message_output """ - cmd = _pre_executable_offline_ingestor(config.ingestion.offline_ingestor_executable) + [ + cmd = _pre_executable_offline_ingestor( + config.ingestion.offline_ingestor_executable + ) + [ "-c", config.config_file, "--nexus-file", @@ -191,19 +187,22 @@ def main() -> None: else: logger.info("Checking number of offline ingestor") offline_ingestor_runnings: int = _check_offline_ingestors( - offline_ingestors, - consumer, - config, - logger) - while offline_ingestor_runnings >= config.ingestion.max_offline_ingestors: + offline_ingestors, consumer, config, logger + ) + while ( + offline_ingestor_runnings + >= config.ingestion.max_offline_ingestors + ): sleep(config.ingestion.offline_ingestors_wait_time) offline_ingestor_runnings = _check_offline_ingestors( - offline_ingestors, - consumer, - config, - logger) + offline_ingestors, consumer, config, logger + ) - logger.info("Offline ingestors currently running {}".format(offline_ingestor_runnings)) + logger.info( + "Offline ingestors currently running {}".format( + offline_ingestor_runnings + ) + ) logger.info("Running background ingestor with command above") proc = subprocess.Popen(cmd) # noqa: S603 # save info about the background process