Skip to content

Commit

Permalink
implementation logging with line numbers #44 (#55)
Browse files Browse the repository at this point in the history
* switching to lxml
* a way to log that work and is minimally invasive
* all errors for toplevel xml use the new logger now.

---------

Signed-off-by: Christian Henkel <[email protected]>
  • Loading branch information
ct2034 authored Oct 8, 2024
1 parent c8e5f6f commit f8cc7d2
Show file tree
Hide file tree
Showing 26 changed files with 295 additions and 86 deletions.
12 changes: 12 additions & 0 deletions src/as2fm/as2fm_common/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from array import array
from typing import MutableSequence, Type, Union, get_args, get_origin

from lxml.etree import _Comment, _Element

"""
Set of basic types that are supported by the Jani language.
Expand Down Expand Up @@ -57,6 +59,16 @@ def remove_namespace(tag: str) -> str:
return tag_wo_ns


def is_comment(element: _Element) -> bool:
"""
Check if an element is a comment.
:param element: The element to check.
:return: True if the element is a comment, False otherwise.
"""
return isinstance(element, _Comment) or "function Comment" in str(element)


def get_default_expression_for_type(field_type: Type[ValidTypes]) -> ValidTypes:
"""Generate a default expression for a field type."""
assert field_type in get_args(ValidTypes), f"Error: Unsupported data type {field_type}."
Expand Down
114 changes: 114 additions & 0 deletions src/as2fm/as2fm_common/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Copyright (c) 2024 - for information on the respective copyright owner
# see the NOTICE file

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Modules that help produce error messages with references to the right line of the (sc)XML file
that caused the error.
"""
import os
from enum import Enum, auto

import lxml.etree

from as2fm.as2fm_common.common import is_comment

# This is the name of an internal attribute that is used to store the filepath of an element.
INTERNAL_FILEPATH_ATTR = "_filepath"


class Severity(Enum):
"""
Enum to represent the severity of the error.
"""

ERROR = auto()
WARNING = auto()
INFO = auto()


def set_filepath_for_all_elements(root: "lxml.etree._Element", filepath: str) -> None:
"""
Set the filepath for all elements in the XML tree.
:param root: The root element
:param filepath: The filepath
"""
# make a relative path for better readability (shorter)
rel_path = os.path.relpath(filepath, os.getcwd())
if not rel_path.startswith("./"):
rel_path = "./" + rel_path
# set the filepath for all elements
for element in root.iter():
try:
element.attrib[INTERNAL_FILEPATH_ATTR] = rel_path
except KeyError as e:
if is_comment(element):
continue
raise e


def _assemble_message(severity: Severity, element: "lxml.etree._Element", message: str) -> str:
"""
Produce an logging message with the line number of the element.
:param severity: The severity of the error
:param element: The element that caused the error
:param message: The message
:return: The message with path and line number
"""
assert hasattr(element, "sourceline"), (
"The element must have a sourceline attribute. This is set by the parser, "
"i. e. when `lxml.etree.ElementTree` is used."
)
assert INTERNAL_FILEPATH_ATTR in element.attrib.keys(), (
"The element must have a filepath attribute. This is set by "
"`as2fm_common.logging.set_filepath_for_all_elements`."
)
severity_initial = severity.name[0]
path = element.attrib[INTERNAL_FILEPATH_ATTR]
return f"{severity_initial} ({path}:{element.sourceline}) {message}"


def error(element: "lxml.etree._Element", message: str) -> str:
"""
Log an error message.
:param element: The element that caused the error
:param message: The message
:return: The message with the line number
"""
return _assemble_message(Severity.ERROR, element, message)


def warn(element: "lxml.etree._Element", message: str) -> str:
"""
Log a warning message.
:param element: The element that caused the warning
:param message: The message
:return: The message with the line number
"""
return _assemble_message(Severity.WARNING, element, message)


def info(element: "lxml.etree._Element", message: str) -> str:
"""
Log an info message.
:param element: The element that caused the info message
:param message: The message
:return: The message with the line number
"""
return _assemble_message(Severity.INFO, element, message)
2 changes: 1 addition & 1 deletion src/as2fm/jani_generator/jani_entries/jani_variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def __init__(
if not self._transient and self._type in (float, MutableSequence[float]):
print(
f"Warning: Variable {self._name} is not transient and has type float."
"This is not supported by STORM."
" This is not supported by STORM."
)

def name(self):
Expand Down
7 changes: 7 additions & 0 deletions src/as2fm/jani_generator/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,10 @@ def main_scxml_to_jani(_args: Optional[Sequence[str]] = None) -> None:
assert len(jani_out_file) > 0, "Output file not provided."

interpret_top_level_xml(main_xml_file, jani_out_file, scxml_out_dir)


if __name__ == "__main__":
# for testing purposes only
import sys

main_scxml_to_jani(sys.argv[1:])
16 changes: 9 additions & 7 deletions src/as2fm/jani_generator/scxml_helpers/scxml_tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
Module defining SCXML tags to match against.
"""

import xml.etree.ElementTree as ET
from array import ArrayType
from hashlib import sha256
from typing import Dict, List, Optional, Set, Tuple, Union, get_args

import lxml.etree as ET
from lxml.etree import _Element as Element

from as2fm.as2fm_common.common import check_value_type_compatible, string_to_value, value_to_type
from as2fm.as2fm_common.ecmascript_interpretation import interpret_ecma_script_expr
from as2fm.jani_generator.jani_entries import (
Expand Down Expand Up @@ -68,21 +70,21 @@
ModelTupleType = Tuple[JaniAutomaton, EventsHolder]


def _hash_element(element: Union[ET.Element, ScxmlBase, List[str]]) -> str:
def _hash_element(element: Union[Element, ScxmlBase, List[str]]) -> str:
"""
Hash an ElementTree element.
:param element: The element to hash.
:return: The hash of the element.
"""
if isinstance(element, ET.Element):
s = ET.tostring(element, encoding="unicode", method="xml")
if isinstance(element, Element):
s = ET.tostring(element, encoding="utf-8", method="xml")
elif isinstance(element, ScxmlBase):
s = ET.tostring(element.as_xml(), encoding="unicode", method="xml")
s = ET.tostring(element.as_xml(), encoding="utf-8", method="xml")
elif isinstance(element, list):
s = "/".join(f"{element}")
s = ("/".join(f"{element}")).encode()
else:
raise ValueError(f"Element type {type(element)} not supported.")
return sha256(s.encode()).hexdigest()[:8]
return sha256(s).hexdigest()[:8]


def _interpret_scxml_assign(
Expand Down
58 changes: 39 additions & 19 deletions src/as2fm/jani_generator/scxml_helpers/top_level_interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import os
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
from xml.etree import ElementTree as ET

import lxml.etree as ET

from as2fm.as2fm_common.common import remove_namespace
from as2fm.as2fm_common.logging import error, set_filepath_for_all_elements
from as2fm.jani_generator.ros_helpers.ros_action_handler import RosActionHandler
from as2fm.jani_generator.ros_helpers.ros_communication_handler import (
RosCommunicationHandler,
Expand Down Expand Up @@ -76,11 +78,13 @@ def parse_main_xml(xml_path: str) -> FullModel:
"""
# Used to generate absolute paths of scxml models
folder_of_xml = os.path.dirname(xml_path)
parser_wo_comments = ET.XMLParser(remove_comments=True)
with open(xml_path, "r", encoding="utf-8") as f:
xml = ET.parse(f)
assert (
remove_namespace(xml.getroot().tag) == "convince_mc_tc"
), "The top-level XML element must be convince_mc_tc."
xml = ET.parse(f, parser=parser_wo_comments)
set_filepath_for_all_elements(xml.getroot(), xml_path)
assert remove_namespace(xml.getroot().tag) == "convince_mc_tc", error(
xml.getroot(), "The top-level XML element must be convince_mc_tc."
)
model = FullModel()
for first_level in xml.getroot():
if remove_namespace(first_level.tag) == "mc_parameters":
Expand All @@ -94,7 +98,10 @@ def parse_main_xml(xml_path: str) -> FullModel:
elif remove_namespace(mc_parameter.tag) == "bt_tick_rate":
model.bt_tick_rate = float(mc_parameter.attrib["value"])
else:
raise ValueError(f"Invalid mc_parameter tag: {mc_parameter.tag}")
raise ValueError(
error(mc_parameter, f"Invalid mc_parameter tag: {mc_parameter.tag}")
)
assert model.max_time is not None, error(first_level, "`max_time` must be defined.")
elif remove_namespace(first_level.tag) == "behavior_tree":
for child in first_level:
if remove_namespace(child.tag) == "input":
Expand All @@ -104,24 +111,38 @@ def parse_main_xml(xml_path: str) -> FullModel:
elif child.attrib["type"] == "bt-plugin-ros-scxml":
model.plugins.append(os.path.join(folder_of_xml, child.attrib["src"]))
else:
raise ValueError(f"Invalid input type: {child.attrib['type']}")
raise ValueError(
error(child, f"Invalid input type: {child.attrib['type']}")
)
else:
raise ValueError(f"Invalid behavior_tree tag: {child.tag} != input")
assert model.bt is not None, "A Behavior Tree must be defined."
raise ValueError(
error(child, f"Invalid behavior_tree tag: {child.tag} != input")
)
assert model.bt is not None, error(first_level, "A Behavior Tree must be defined.")
elif remove_namespace(first_level.tag) == "node_models":
for node_model in first_level:
assert remove_namespace(node_model.tag) == "input", "Only input tags are supported."
assert (
node_model.attrib["type"] == "ros-scxml"
), "Only ROS-SCXML node models are supported."
assert remove_namespace(node_model.tag) == "input", error(
node_model, "Only input tags are supported."
)
assert node_model.attrib["type"] == "ros-scxml", error(
node_model, "Only ROS-SCXML node models are supported."
)
model.skills.append(os.path.join(folder_of_xml, node_model.attrib["src"]))
elif remove_namespace(first_level.tag) == "properties":
for property in first_level:
assert remove_namespace(property.tag) == "input", "Only input tags are supported."
assert property.attrib["type"] == "jani", "Only Jani properties are supported."
model.properties.append(os.path.join(folder_of_xml, property.attrib["src"]))
for jani_property in first_level:
assert remove_namespace(jani_property.tag) == "input", error(
jani_property, "Only input tags are supported."
)
assert jani_property.attrib["type"] == "jani", error(
jani_property,
"Only Jani properties are supported, not {jani_property.attrib['type']}.",
)
model.properties.append(os.path.join(folder_of_xml, jani_property.attrib["src"]))
assert len(model.properties) == 1, error(
first_level, "Only exactly one Jani property is supported."
)
else:
raise ValueError(f"Invalid main point tag: {first_level.tag}")
raise ValueError(error(first_level, f"Invalid main point tag: {first_level.tag}"))
return model


Expand Down Expand Up @@ -215,7 +236,6 @@ def interpret_top_level_xml(
)

jani_dict = jani_model.as_dict()
assert len(model.properties) == 1, "Only one property is supported right now."
with open(model.properties[0], "r", encoding="utf-8") as f:
jani_dict["properties"] = json.load(f)["properties"]

Expand Down
2 changes: 1 addition & 1 deletion src/as2fm/scxml_converter/scxml_entries/scxml_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
Base SCXML class, defining the methods all SCXML entries shall implement.
"""

from xml.etree import ElementTree as ET
from lxml import etree as ET


class ScxmlBase:
Expand Down
3 changes: 2 additions & 1 deletion src/as2fm/scxml_converter/scxml_entries/scxml_bt.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
"""

from typing import Union
from xml.etree import ElementTree as ET

from lxml import etree as ET

from as2fm.scxml_converter.scxml_entries import ScxmlBase
from as2fm.scxml_converter.scxml_entries.utils import is_non_empty_string
Expand Down
7 changes: 5 additions & 2 deletions src/as2fm/scxml_converter/scxml_entries/scxml_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

import re
from typing import Any, Optional, Tuple, Union
from xml.etree import ElementTree as ET

from as2fm.as2fm_common.common import is_array_type
from lxml import etree as ET

from as2fm.as2fm_common.common import is_array_type, is_comment
from as2fm.scxml_converter.scxml_entries import BtGetValueInputPort, ScxmlBase
from as2fm.scxml_converter.scxml_entries.bt_utils import BtPortsHandler
from as2fm.scxml_converter.scxml_entries.utils import (
Expand Down Expand Up @@ -80,6 +81,8 @@ def from_xml_tree(xml_tree: ET.Element, comment_above: Optional[str] = None) ->
data_id = get_xml_argument(ScxmlData, xml_tree, "id")
data_type = get_xml_argument(ScxmlData, xml_tree, "type", none_allowed=True)
if data_type is None:
if is_comment(comment_above):
pass
comment_tuple = ScxmlData._interpret_type_from_comment_above(comment_above)
assert comment_tuple is not None, f"Error: SCXML data: type of {data_id} not found."
assert comment_tuple[0] == data_id, (
Expand Down
8 changes: 5 additions & 3 deletions src/as2fm/scxml_converter/scxml_entries/scxml_data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
"""

from typing import List, Optional
from xml.etree import ElementTree as ET

from lxml import etree as ET

from as2fm.as2fm_common.common import is_comment
from as2fm.scxml_converter.scxml_entries import ScxmlBase, ScxmlData
from as2fm.scxml_converter.scxml_entries.bt_utils import BtPortsHandler
from as2fm.scxml_converter.scxml_entries.xml_utils import assert_xml_tag_ok
Expand All @@ -43,8 +45,8 @@ def from_xml_tree(xml_tree: ET.Element) -> "ScxmlDataModel":
data_entries = []
prev_xml_comment: Optional[str] = None
for data_entry_xml in xml_tree:
if data_entry_xml.tag is ET.Comment:
prev_xml_comment = data_entry_xml.text
if is_comment(data_entry_xml):
prev_xml_comment = data_entry_xml.text.strip()
else:
data_entries.append(ScxmlData.from_xml_tree(data_entry_xml, prev_xml_comment))
prev_xml_comment = None
Expand Down
Loading

0 comments on commit f8cc7d2

Please sign in to comment.