diff --git a/resources/config.sample.json b/resources/config.sample.json index 3d6c956..1615a09 100644 --- a/resources/config.sample.json +++ b/resources/config.sample.json @@ -1,4 +1,6 @@ { + "nexus_file": "", + "done_writing_message_file": "", "config_file": "", "id": "", "dataset": { @@ -17,6 +19,8 @@ "offline_ingestor_executable": [ "background_ingestor" ], + "max_offline_ingestors": 10, + "offline_ingestors_wait_time": 10, "schemas_directory": "schemas", "check_if_dataset_exists_by_pid": true, "check_if_dataset_exists_by_metadata": true, diff --git a/src/scicat_configuration.py b/src/scicat_configuration.py index 687fe78..7d9ee18 100644 --- a/src/scicat_configuration.py +++ b/src/scicat_configuration.py @@ -232,6 +232,8 @@ class IngestionOptions: offline_ingestor_executable: list[str] = field( default_factory=default_offline_ingestor_executable ) + max_offline_ingestors: int = 10 + offline_ingestors_wait_time: int = 10 schemas_directory: str = "schemas" check_if_dataset_exists_by_pid: bool = True check_if_dataset_exists_by_metadata: bool = True @@ -309,6 +311,8 @@ class OnlineIngestorConfig: # original_dict: Mapping """Original configuration dictionary in the json file.""" + nexus_file: str = "" + done_writing_message_file: str = "" config_file: str id: str = "" dataset: DatasetOptions = field(default_factory=DatasetOptions) diff --git a/src/scicat_online_ingestor.py b/src/scicat_online_ingestor.py index fa00314..0ac6723 100644 --- a/src/scicat_online_ingestor.py +++ b/src/scicat_online_ingestor.py @@ -6,6 +6,7 @@ import logging import pathlib import subprocess +from time import sleep try: __version__ = importlib.metadata.version(__package__ or __name__) @@ -59,7 +60,14 @@ def dump_message_to_file_if_needed( logger.info("Message file saved") -def _individual_message_commit(offline_ingestors, consumer, logger: logging.Logger): +def _individual_message_commit(job_id, message, consumer, logger: logging.Logger): + logger.info("Executing commit for message with job id %s", job_id) + consumer.commit(message=message) + + +def _check_offline_ingestors( + offline_ingestors, consumer, config, logger: logging.Logger +) -> int: logger.info("%s offline ingestors running", len(offline_ingestors)) for job_id, job_item in offline_ingestors.items(): result = job_item["proc"].poll() @@ -69,8 +77,12 @@ def _individual_message_commit(offline_ingestors, consumer, logger: logging.Logg ) if result == 0: logger.info("Offline ingestor successful for job id %s", job_id) - logger.info("Executing commit for message with job id %s", job_id) - consumer.commit(message=job_item["message"]) + # if background process is successful + # check if we need to commit the individual message + if config.kafka.individual_message_commit: + _individual_message_commit( + job_id, job_item["message"], consumer, logger + ) else: logger.error("Offline ingestor error for job id %s", job_id) logger.info( @@ -78,6 +90,8 @@ def _individual_message_commit(offline_ingestors, consumer, logger: logging.Logg ) offline_ingestors.pop(job_id) + return len(offline_ingestors) + def build_online_config(logger: logging.Logger | None = None) -> OnlineIngestorConfig: arg_parser = build_arg_parser( @@ -166,16 +180,28 @@ def main() -> None: if config.ingestion.dry_run: logger.info("Dry run mode enabled. Skipping background ingestor.") else: + logger.info("Checking number of offline ingestor") + offline_ingestor_runnings: int = _check_offline_ingestors( + offline_ingestors, consumer, config, logger + ) + while ( + offline_ingestor_runnings + >= config.ingestion.max_offline_ingestors + ): + sleep(config.ingestion.offline_ingestors_wait_time) + offline_ingestor_runnings = _check_offline_ingestors( + offline_ingestors, consumer, config, logger + ) + + logger.info( + "Offline ingestors currently running %s", + offline_ingestor_runnings, + ) logger.info("Running background ingestor with command above") proc = subprocess.Popen(cmd) # noqa: S603 # save info about the background process offline_ingestors[job_id] = {"proc": proc, "message": message} - # if background process is successful - # check if we need to commit the individual message - if config.kafka.individual_message_commit: - _individual_message_commit(offline_ingestors, consumer, logger) - if __name__ == "__main__": main()