Skip to content

Commit

Permalink
Merge pull request #97 from SciCatProject/fixes-for-deployment
Browse files Browse the repository at this point in the history
Fixes for deployment
  • Loading branch information
YooSunYoung authored Dec 11, 2024
2 parents 64ea0b8 + 9a593e8 commit 64cd307
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 102 deletions.
1 change: 1 addition & 0 deletions resources/config.sample.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"topics": "KAFKA_TOPIC_1,KAFKA_TOPIC_2",
"group_id": "GROUP_ID",
"bootstrap_servers": "localhost:9093",
"security_protocol": "sasl_ssl",
"sasl_mechanism": "SCRAM-SHA-256",
"sasl_username": "USERNAME",
"sasl_password": "PASSWORD",
Expand Down
29 changes: 21 additions & 8 deletions src/scicat_configuration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject)
import argparse
import logging
from collections.abc import Mapping
from dataclasses import asdict, dataclass, field, is_dataclass
from functools import partial
Expand Down Expand Up @@ -190,6 +191,8 @@ class KafkaOptions:
"""Kafka consumer group ID."""
bootstrap_servers: str = "localhost:9093"
"""List of Kafka bootstrap servers. Multiple servers can be separated by commas."""
security_protocol: str = "sasl_ssl"
"""Security protocol """
sasl_mechanism: str = "SCRAM-SHA-256"
"""Kafka SASL mechanism."""
sasl_username: str = "USERNAME"
Expand Down Expand Up @@ -336,20 +339,33 @@ def to_dict(self) -> dict:
T = TypeVar("T")


def build_dataclass(tp: type[T], data: dict, prefixes: tuple[str, ...] = ()) -> T:
def build_dataclass(
*,
tp: type[T],
data: dict,
prefixes: tuple[str, ...] = (),
logger: logging.Logger | None = None,
strict: bool = False,
) -> T:
type_hints = get_annotations(tp)
if unused_keys := (set(data.keys()) - set(type_hints.keys())):
unused_keys = set(data.keys()) - set(type_hints.keys())
if unused_keys:
# If ``data`` contains unnecessary fields.
unused_keys_repr = "\n\t\t- ".join(
".".join((*prefixes, unused_key)) for unused_key in unused_keys
)
raise ValueError(f"Invalid argument found: \n\t\t- {unused_keys_repr}")
error_message = f"Invalid argument found: \n\t\t- {unused_keys_repr}"
if logger is not None:
logger.warning(error_message)
if strict:
raise ValueError(error_message)
return tp(
**{
key: build_dataclass(sub_tp, value, (*prefixes, key))
key: build_dataclass(tp=sub_tp, data=value, prefixes=(*prefixes, key))
if is_dataclass(sub_tp := type_hints.get(key))
else value
for key, value in data.items()
if key not in unused_keys
}
)

Expand Down Expand Up @@ -385,10 +401,7 @@ def merge_config_and_input_args(

def _validate_config_file(target_type: type[T], config_file: Path) -> T:
    """Build ``target_type`` from ``config_file``, failing on unknown keys.

    The loaded configuration is augmented with the path of the file itself
    under the ``config_file`` key. ``strict=True`` makes any key that is not
    a field of ``target_type`` raise ``ValueError``, so an invalid entry in
    the file is reported immediately instead of being silently dropped.
    """
    config = {**_load_config(config_file), "config_file": config_file.as_posix()}
    return build_dataclass(tp=target_type, data=config, strict=True)


def validate_config_file() -> None:
Expand Down
22 changes: 11 additions & 11 deletions src/scicat_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ def convert_to_type(input_value: Any, dtype_desc: str) -> Any:

_OPERATOR_REGISTRY = MappingProxyType(
{
"DO_NOTHING": lambda value: value,
"join_with_space": lambda value: ", ".join(
"DO_NOTHING": lambda value, recipe: value,
"join_with_space": lambda value, recipe: ", ".join(
ast.literal_eval(value) if isinstance(value, str) else value
),
# "evaluate": lambda value: ast.literal_eval(value),
Expand All @@ -128,12 +128,15 @@ def convert_to_type(input_value: Any, dtype_desc: str) -> Any:
# It is better to use the specific converters for the types.
# However, if it is the only way to go, you can add it here.
# Please add a comment to explain why it is needed.
"filename": lambda value: os.path.basename(value),
"dirname": lambda value: os.path.dirname(value),
"dirname-2": lambda value: os.path.dirname(os.path.dirname(value)),
"getitem": lambda value, key: value[
key
"filename": lambda value, recipe: os.path.basename(value),
"dirname": lambda value, recipe: os.path.dirname(value),
"dirname-2": lambda value, recipe: os.path.dirname(os.path.dirname(value)),
"getitem": lambda value, recipe: value[
recipe.field
], # The only operator that takes an argument
"str-replace": lambda value, recipe: str(value).replace(
recipe.pattern, recipe.replacement
),
}
)

Expand Down Expand Up @@ -190,10 +193,7 @@ def extract_variables_values(
value = variable_recipe.value
value = render_variable_value(value, variable_map)
_operator = _get_operator(variable_recipe.operator)
if variable_recipe.field is not None:
value = _operator(value, variable_recipe.field)
else:
value = _operator(value)
value = _operator(value, variable_recipe)

else:
raise Exception("Invalid variable source: ", source)
Expand Down
38 changes: 19 additions & 19 deletions src/scicat_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def build_consumer(kafka_options: KafkaOptions, logger: logging.Logger) -> Consu
kafka_topics = collect_kafka_topics(kafka_options)
logger.info("Subscribing to the following Kafka topics: %s", kafka_topics)
consumer.subscribe(kafka_topics)
return Consumer(consumer_options)
return consumer


def validate_consumer(consumer: Consumer, logger: logging.Logger) -> bool:
Expand All @@ -75,38 +75,41 @@ def validate_consumer(consumer: Consumer, logger: logging.Logger) -> bool:
return True


def _validate_wrdn_message_type(message_content: bytes, logger: logging.Logger) -> bool:
    """Return True if the message carries the WRDN (writing-finished) identifier.

    The 4-byte type identifier is read from offset 4..8 of the serialized
    message; any non-WRDN message is logged and ignored rather than treated
    as an error.
    """
    message_type = message_content[4:8]
    logger.info("Message type: %s", message_type)
    if message_type == WRDN_FILE_IDENTIFIER:
        logger.info("WRDN message received.")
        return True
    logger.info("Message of type %s ignored", message_type)
    return False


def _filter_error_encountered(
    deserialized_message: WritingFinished, logger: logging.Logger
) -> WritingFinished | None:
    """Filter out messages with the ``error_encountered`` flag set to True.

    Returns the message unchanged when the flag is not set, otherwise logs
    an error and returns ``None`` so the caller skips the message.
    """
    if deserialized_message.error_encountered:
        logger.error(
            "Unable to deserialize message. ``error_encountered`` is true. Skipping the message."
        )
        return None
    logger.info("Message successfully deserialized.")
    return deserialized_message


def _deserialise_wrdn(
    message_content: bytes, logger: logging.Logger
) -> WritingFinished | None:
    """Deserialize a WRDN message.

    Returns ``None`` when the message is not a WRDN message or when its
    ``error_encountered`` flag is set; otherwise returns the deserialized
    message.
    """
    deserialized_message: WritingFinished | None = None
    if _validate_wrdn_message_type(message_content, logger):
        logger.info("Deserialising WRDN message")
        # NOTE: no re-annotation here -- ``deserialized_message`` is already
        # declared as ``WritingFinished | None`` above.
        deserialized_message = deserialise_wrdn(message_content)
        deserialized_message = _filter_error_encountered(deserialized_message, logger)
        # May log ``None`` if the message was filtered out above.
        logger.info("Deserialised WRDN message: %.5000s", deserialized_message)

    return deserialized_message


def wrdn_messages(
Expand All @@ -130,10 +133,7 @@ def wrdn_messages(
message_value = message.value()
message_type = message_value[4:8]
logger.info("Received message. Type : %s", message_type)
if message_type == b"wrdn":
yield _deserialise_wrdn(message_value, logger)
else:
yield None
yield _deserialise_wrdn(message_value, logger)


# def compose_message_path(
Expand Down
59 changes: 38 additions & 21 deletions src/scicat_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class ValueMetadataVariable(MetadataSchemaVariable):
operator: str = ""
value: str
field: str | None = None
pattern: str | None = None
replacement: str | None = None
# We only allow one field(argument) for now


Expand Down Expand Up @@ -188,6 +190,40 @@ def collect_schemas(dir_path: pathlib.Path) -> OrderedDict[str, MetadataSchema]:
return schemas


def _select_applicable_schema(
selector: str | dict, filename: str | None = None
) -> bool:
if isinstance(selector, str):
# filename:starts_with:/ess/data/coda
select_target_name, select_function_name, select_argument = selector.split(":")
if select_target_name in ["filename"]:
select_target_value = filename
else:
raise ValueError(f"Invalid target name {select_target_name}")

if select_function_name == "starts_with":
return select_target_value.startswith(select_argument)
else:
raise ValueError(f"Invalid function name {select_function_name}")

elif isinstance(selector, dict):
output = True
for key, conditions in selector.items():
if key == "or":
output = output and any(
_select_applicable_schema(item, filename) for item in conditions
)
elif key == "and":
output = output and all(
_select_applicable_schema(item, filename) for item in conditions
)
else:
raise NotImplementedError("Invalid operator")
return output
else:
raise Exception(f"Invalid type for schema selector {type(selector)}")


def select_applicable_schema(
nexus_file: pathlib.Path,
schemas: OrderedDict[str, MetadataSchema],
Expand All @@ -198,26 +234,7 @@ def select_applicable_schema(
Order of the schemas matters and first schema that is suitable is selected.
"""
for schema in schemas.values():
if isinstance(schema.selector, str):
select_target_name, select_function_name, select_argument = (
schema.selector.split(":")
)
if select_target_name in ["filename"]:
select_target_value = nexus_file.as_posix()
else:
raise ValueError(f"Invalid target name {select_target_name}")

if select_function_name == "starts_with":
if select_target_value.startswith(select_argument):
return schema
else:
raise ValueError(f"Invalid function name {select_function_name}")

elif isinstance(schema.selector, dict):
raise NotImplementedError(
"Dictionary based selector is not implemented yet"
)
else:
raise Exception(f"Invalid type for schema selector {type(schema.selector)}")
if _select_applicable_schema(schema.selector, str(nexus_file)):
return schema

raise Exception("No applicable metadata schema configuration found!!")
11 changes: 7 additions & 4 deletions src/scicat_offline_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from system_helpers import exit, handle_exceptions


def build_offline_config() -> OfflineIngestorConfig:
def build_offline_config(logger: logging.Logger | None = None) -> OfflineIngestorConfig:
arg_parser = build_arg_parser(
OfflineIngestorConfig, mandatory_args=('--config-file',)
)
Expand All @@ -47,7 +47,9 @@ def build_offline_config() -> OfflineIngestorConfig:
# with ``OnlineIngestorConfig``.
del merged_configuration["kafka"]

config = build_dataclass(OfflineIngestorConfig, merged_configuration)
config = build_dataclass(
tp=OfflineIngestorConfig, data=merged_configuration, logger=logger, strict=False
)

return config

Expand Down Expand Up @@ -115,9 +117,10 @@ def _check_if_dataset_exists_by_metadata(

def main() -> None:
"""Main entry point of the app."""
config = build_offline_config()
tmp_config = build_offline_config()
logger = build_logger(tmp_config)
config = build_offline_config(logger=logger)
fh_options = config.ingestion.file_handling
logger = build_logger(config)

# Log the configuration as dictionary so that it is easier to read from the logs
logger.info(
Expand Down
Loading

0 comments on commit 64cd307

Please sign in to comment.