Skip to content

Commit

Permalink
initial draft for formula parsing in multiformatreader
Browse files Browse the repository at this point in the history
  • Loading branch information
lukaspie committed Jan 17, 2025
1 parent 9ca60a3 commit c7cb0cf
Showing 1 changed file with 85 additions and 8 deletions.
93 changes: 85 additions & 8 deletions src/pynxtools/dataconverter/readers/multi/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import re
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np

from pynxtools.dataconverter.readers.base.reader import BaseReader
from pynxtools.dataconverter.readers.utils import (
is_boolean,
Expand All @@ -36,6 +38,59 @@
logger = logging.getLogger("pynxtools")


def evaluate_expression(expression: str, data: Dict[str, Any]) -> Any:
    """
    Evaluate a formula string whose path-like tokens refer to entries of ``data``.

    Every token that starts with "/" (e.g. ``/data/value``) is replaced by a
    lookup into ``data``. The rewritten expression is then evaluated with only
    ``mean``, ``min`` and ``max`` (the NumPy implementations) available as
    functions.

    Args:
        expression (str): The formula to evaluate,
            e.g. '/data/value + /someothervalue'.
        data (Dict[str, Any]): Mapping the path tokens are resolved against.
            A token is looked up verbatim first and, if that fails, once more
            with its leading slashes removed.

    Returns:
        Any: The result of the evaluated expression, or None if the expression
        is empty or its evaluation failed (a warning is logged in both cases).
    """
    if not expression:
        logger.warning("Empty formula provided.")
        return None

    def resolve_key(key: str) -> Any:
        """Return the entry of ``data`` addressed by ``key``."""
        # Accept both dictionaries keyed by the full path ("/data/value") and
        # dictionaries keyed without the leading slash ("data/value").
        for candidate in (key, key.lstrip("/")):
            if candidate in data:
                return data[candidate]
        raise KeyError(f"Key '{key.lstrip('/')}' not found in data.")

    def replace_keys(match: re.Match) -> str:
        """Turn a matched path token into a call to ``resolve_key``."""
        return f"resolve_key({match.group(0)!r})"

    # A key is a "/" followed by word characters, brackets, hyphens or further
    # slashes. NOTE: a division written without spaces ("a/b") is therefore
    # also picked up as a key; write divisions as "a / b".
    pattern = r"(\/[\w\[\]\_\-/]+)"
    resolved_expression = re.sub(pattern, replace_keys, expression)

    # Everything the formula may use lives in one globals dict, so the names
    # are also visible inside generator expressions within the formula.
    namespace: Dict[str, Any] = {
        "mean": np.mean,
        "min": np.min,
        "max": np.max,
        # "unit_conversion": ??
        "resolve_key": resolve_key,
        "__builtins__": {},  # Disable built-ins
    }

    # SECURITY NOTE: emptying ``__builtins__`` does not make ``eval`` a sandbox;
    # formulas must only come from trusted configuration files.
    try:
        return eval(resolved_expression, namespace)
    except Exception as exc:
        # Best effort: a broken formula must not abort the whole conversion.
        logger.warning(
            "Formula '%s' could not be evaluated due to: %s", expression, exc
        )
        return None


def fill_wildcard_data_indices(config_file_dict, key, value, dims):
"""
Replaces the wildcard data indices (*) with the respective dimension entries.
Expand Down Expand Up @@ -75,6 +130,7 @@ class ParseJsonCallbacks:
"@link": used for linking (meta)data
"@data": measurement data
"@eln": ELN data not provided within the experiment file
"@formula": To calculate values based on the presence of other (meta)data
Args:
attrs_callback (Callable[[str], Any]):
Expand All @@ -85,6 +141,8 @@ class ParseJsonCallbacks:
The callback to retrieve links under the specified key.
eln_callback (Callable[[str], Any]):
The callback to retrieve eln values under the specified key.
formula_callback (Callable[[str], Any]):
The callback to control formula calculations.
dims (List[str]):
The dimension labels of the data. Defaults to None.
entry_name (str):
Expand All @@ -101,6 +159,7 @@ def __init__(
data_callback: Optional[Callable[[str, str], Any]] = None,
link_callback: Optional[Callable[[str, str], Any]] = None,
eln_callback: Optional[Callable[[str, str], Any]] = None,
formula_callback: Optional[Callable[[str, str], Any]] = None,
dims: Optional[Callable[[str, str], List[str]]] = None,
entry_name: str = "entry",
):
Expand All @@ -109,17 +168,28 @@ def __init__(
"@link": link_callback if link_callback is not None else self.link_callback,
"@data": data_callback if data_callback is not None else self.identity,
"@eln": eln_callback if eln_callback is not None else self.identity,
"@formula": formula_callback
if link_callback is not None
else self.formula_callback,
}

self.dims = dims if dims is not None else lambda *_, **__: []
self.entry_name = entry_name

def link_callback(self, key: str, value: str) -> Dict[str, Any]:
def link_callback(self, _: str, value: str) -> Dict[str, Any]:
    """
    Build the link dictionary for a config value, pointing it at this entry.

    Every "/entry/" in ``value`` is replaced by "/<entry_name>/". The first
    argument is ignored.
    """
    entry_segment = f"/{self.entry_name}/"
    target = value.replace("/entry/", entry_segment)
    return {"link": target}

def formula_callback(self, _: str, value: str) -> Dict[str, Any]:
    """
    Wrap a formula string in a dictionary under the "formula" key.

    Every "/ENTRY[entry]/" in ``value`` is replaced by "/ENTRY[<entry_name>]/".
    The first argument is ignored.
    """
    entry_prefix = f"/ENTRY[{self.entry_name}]/"
    formula = value.replace("/ENTRY[entry]/", entry_prefix)
    return {"formula": formula}

def identity(self, _: str, value: str) -> str:
"""
Returns the input value unchanged.
Expand Down Expand Up @@ -230,10 +300,13 @@ def parse_config_value(value: str) -> Tuple[str, Any]:
)

# after filling, resolve links again:
if isinstance(new_entry_dict.get(key), str) and new_entry_dict[key].startswith(
"@link:"
):
new_entry_dict[key] = {"link": new_entry_dict[key][6:]}
if isinstance(new_entry_dict.get(key), str):
if new_entry_dict[key].startswith("@link:"):
new_entry_dict[key] = {"link": new_entry_dict[key][6:]}

if isinstance(new_entry_dict.get(key), dict) and "formula" in new_entry_dict[key]:
if formula := new_entry_dict[key]["formula"]:
new_entry_dict[key] = evaluate_expression(formula, new_entry_dict)


def fill_from_config(
def dict_sort_key(keyval: Tuple[str, Any]) -> Tuple[int, str]:
    """
    The function to sort the dict by.

    Entries are ranked by their value:
    values starting with "!" come first (rank 0),
    values starting with "@formula:" or "!@formula:" come last (rank 2),
    and everything else, including non-string values, sits in between (rank 1).
    Entries with the same rank are ordered by their key.

    Args:
        keyval (Tuple[str, Any]): A (key, value) pair of the dictionary.

    Returns:
        Tuple[int, str]: The (rank, key) tuple to sort by.
    """
    key, value = keyval
    # Inspect the value, not the (key, value) pair itself.
    if isinstance(value, str):
        # Check the formula prefixes first: "!@formula:" also starts with "!".
        if value.startswith(("!@formula:", "@formula:")):
            return (2, key)  # Last
        if value.startswith("!"):
            return (0, key)  # First
    return (1, key)  # Middle

if callbacks is None:
# Use default callbacks if none are explicitly provided
Expand Down

0 comments on commit c7cb0cf

Please sign in to comment.