Skip to content

Commit

Permalink
Apply automatic formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
pre-commit-ci-lite[bot] authored Jan 9, 2025
1 parent fdbe9c0 commit a792337
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/scicat_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class FileHandlingOptions:
@dataclass(kw_only=True)
class IngestionOptions:
dry_run: bool = False
offline_ingestor_executable: str | list[str]= "background_ingestor"
offline_ingestor_executable: str | list[str] = "background_ingestor"
max_offline_ingestors: int = 10
offline_ingestors_wait_time: int = 10
schemas_directory: str = "schemas"
Expand Down
55 changes: 27 additions & 28 deletions src/scicat_online_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@


def dump_message_to_file_if_needed(
*,
logger: logging.Logger,
message_file_path: pathlib.Path,
file_handling_options: FileHandlingOptions,
message: WritingFinished,
*,
logger: logging.Logger,
message_file_path: pathlib.Path,
file_handling_options: FileHandlingOptions,
message: WritingFinished,
) -> None:
"""Dump the message to a file according to the configuration."""
if not file_handling_options.message_to_file:
Expand All @@ -60,21 +60,13 @@ def dump_message_to_file_if_needed(
logger.info("Message file saved")


def _individual_message_commit(
job_id,
message,
consumer,
logger: logging.Logger
):
def _individual_message_commit(job_id, message, consumer, logger: logging.Logger):
logger.info("Executing commit for message with job id %s", job_id)
consumer.commit(message=message)


def _check_offline_ingestors(
offline_ingestors,
consumer,
config,
logger: logging.Logger
offline_ingestors, consumer, config, logger: logging.Logger
) -> int:
logger.info("%s offline ingestors running", len(offline_ingestors))
for job_id, job_item in offline_ingestors.items():
Expand All @@ -88,7 +80,9 @@ def _check_offline_ingestors(
# if background process is successful
# check if we need to commit the individual message
if config.kafka.individual_message_commit:
_individual_message_commit(job_id,job_item["message"], consumer, logger)
_individual_message_commit(
job_id, job_item["message"], consumer, logger
)
else:
logger.error("Offline ingestor error for job id %s", job_id)
logger.info(
Expand Down Expand Up @@ -127,7 +121,7 @@ def main() -> None:
logger.info('Starting the Scicat online Ingestor with the following configuration:')
logger.info(config.to_dict())

with ((handle_daemon_loop_exceptions(logger=logger))):
with handle_daemon_loop_exceptions(logger=logger):
# Kafka consumer
if (consumer := build_consumer(config.kafka, logger)) is None:
raise RuntimeError("Failed to build the Kafka consumer")
Expand Down Expand Up @@ -159,7 +153,9 @@ def main() -> None:
--done-writing-message-file message_file_path
# optional depending on the message_saving_options.message_output
"""
cmd = _pre_executable_offline_ingestor(config.ingestion.offline_ingestor_executable) + [
cmd = _pre_executable_offline_ingestor(
config.ingestion.offline_ingestor_executable
) + [
"-c",
config.config_file,
"--nexus-file",
Expand Down Expand Up @@ -191,19 +187,22 @@ def main() -> None:
else:
logger.info("Checking number of offline ingestor")
offline_ingestor_runnings: int = _check_offline_ingestors(
offline_ingestors,
consumer,
config,
logger)
while offline_ingestor_runnings >= config.ingestion.max_offline_ingestors:
offline_ingestors, consumer, config, logger
)
while (
offline_ingestor_runnings
>= config.ingestion.max_offline_ingestors
):
sleep(config.ingestion.offline_ingestors_wait_time)
offline_ingestor_runnings = _check_offline_ingestors(
offline_ingestors,
consumer,
config,
logger)
offline_ingestors, consumer, config, logger
)

logger.info("Offline ingestors currently running {}".format(offline_ingestor_runnings))
logger.info(
"Offline ingestors currently running {}".format(
offline_ingestor_runnings
)
)
logger.info("Running background ingestor with command above")
proc = subprocess.Popen(cmd) # noqa: S603
# save info about the background process
Expand Down

0 comments on commit a792337

Please sign in to comment.