Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added check on number of offline ingestors and relevant configuration #100

Open
wants to merge 2 commits into
base: fix-offline-command
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion resources/config.sample.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
{
"nexus_file" : "",
"done_writing_message_file": "",
"config_file": "",
"id": "",
"dataset": {
Expand All @@ -15,6 +17,8 @@
"ingestion": {
"dry_run": false,
"offline_ingestor_executable": "background_ingestor",
"max_offline_ingestors": 10,
"offline_ingestors_wait_time": 10,
"schemas_directory": "schemas",
"check_if_dataset_exists_by_pid": true,
"check_if_dataset_exists_by_metadata": true,
Expand All @@ -32,7 +36,7 @@
}
},
"kafka": {
"topics": "KAFKA_TOPIC_1,KAFKA_TOPIC_2",
"topics": ["KAFKA_TOPIC_1", "KAFKA_TOPIC_2"],
"group_id": "GROUP_ID",
"bootstrap_servers": "localhost:9093",
"security_protocol": "sasl_ssl",
Expand Down
4 changes: 3 additions & 1 deletion src/scicat_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,9 @@ class FileHandlingOptions:
@dataclass(kw_only=True)
class IngestionOptions:
dry_run: bool = False
offline_ingestor_executable: str | list[str]= "background_ingestor"
offline_ingestor_executable: str | list[str] = "background_ingestor"
max_offline_ingestors: int = 10
offline_ingestors_wait_time: int = 10
schemas_directory: str = "schemas"
check_if_dataset_exists_by_pid: bool = True
check_if_dataset_exists_by_metadata: bool = True
Expand Down
59 changes: 44 additions & 15 deletions src/scicat_online_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import pathlib
import subprocess
from time import sleep

try:
__version__ = importlib.metadata.version(__package__ or __name__)
Expand Down Expand Up @@ -37,11 +38,11 @@


def dump_message_to_file_if_needed(
*,
logger: logging.Logger,
message_file_path: pathlib.Path,
file_handling_options: FileHandlingOptions,
message: WritingFinished,
*,
logger: logging.Logger,
message_file_path: pathlib.Path,
file_handling_options: FileHandlingOptions,
message: WritingFinished,
) -> None:
"""Dump the message to a file according to the configuration."""
if not file_handling_options.message_to_file:
Expand All @@ -59,7 +60,14 @@ def dump_message_to_file_if_needed(
logger.info("Message file saved")


def _individual_message_commit(offline_ingestors, consumer, logger: logging.Logger):
def _individual_message_commit(job_id, message, consumer, logger: logging.Logger):
logger.info("Executing commit for message with job id %s", job_id)
consumer.commit(message=message)


def _check_offline_ingestors(
offline_ingestors, consumer, config, logger: logging.Logger
) -> int:
logger.info("%s offline ingestors running", len(offline_ingestors))
for job_id, job_item in offline_ingestors.items():
result = job_item["proc"].poll()
Expand All @@ -69,15 +77,21 @@ def _individual_message_commit(offline_ingestors, consumer, logger: logging.Logg
)
if result == 0:
logger.info("Offline ingestor successful for job id %s", job_id)
logger.info("Executing commit for message with job id %s", job_id)
consumer.commit(message=job_item["message"])
# if background process is successful
# check if we need to commit the individual message
if config.kafka.individual_message_commit:
_individual_message_commit(
job_id, job_item["message"], consumer, logger
)
else:
logger.error("Offline ingestor error for job id %s", job_id)
logger.info(
"Removed ingestor for message with job id %s from queue", job_id
)
offline_ingestors.pop(job_id)

return len(offline_ingestors)


def build_online_config(logger: logging.Logger | None = None) -> OnlineIngestorConfig:
arg_parser = build_arg_parser(
Expand Down Expand Up @@ -107,7 +121,7 @@ def main() -> None:
logger.info('Starting the Scicat online Ingestor with the following configuration:')
logger.info(config.to_dict())

with ((handle_daemon_loop_exceptions(logger=logger))):
with handle_daemon_loop_exceptions(logger=logger):
# Kafka consumer
if (consumer := build_consumer(config.kafka, logger)) is None:
raise RuntimeError("Failed to build the Kafka consumer")
Expand Down Expand Up @@ -139,7 +153,9 @@ def main() -> None:
--done-writing-message-file message_file_path
# optional depending on the message_saving_options.message_output
"""
cmd = _pre_executable_offline_ingestor(config.ingestion.offline_ingestor_executable) + [
cmd = _pre_executable_offline_ingestor(
config.ingestion.offline_ingestor_executable
) + [
"-c",
config.config_file,
"--nexus-file",
Expand Down Expand Up @@ -169,16 +185,29 @@ def main() -> None:
if config.ingestion.dry_run:
logger.info("Dry run mode enabled. Skipping background ingestor.")
else:
logger.info("Checking number of offline ingestor")
offline_ingestor_runnings: int = _check_offline_ingestors(
offline_ingestors, consumer, config, logger
)
while (
offline_ingestor_runnings
>= config.ingestion.max_offline_ingestors
):
sleep(config.ingestion.offline_ingestors_wait_time)
offline_ingestor_runnings = _check_offline_ingestors(
offline_ingestors, consumer, config, logger
)

logger.info(
"Offline ingestors currently running {}".format(
offline_ingestor_runnings
)
)
logger.info("Running background ingestor with command above")
proc = subprocess.Popen(cmd) # noqa: S603
# save info about the background process
offline_ingestors[job_id] = {"proc": proc, "message": message}

# if background process is successful
# check if we need to commit the individual message
if config.kafka.individual_message_commit:
_individual_message_commit(offline_ingestors, consumer, logger)


if __name__ == "__main__":
main()
Loading