diff --git a/scxml_converter/src/scxml_converter/scxml_entries/__init__.py b/scxml_converter/src/scxml_converter/scxml_entries/__init__.py index c0faaaad..fb830a3a 100644 --- a/scxml_converter/src/scxml_converter/scxml_entries/__init__.py +++ b/scxml_converter/src/scxml_converter/scxml_entries/__init__.py @@ -2,10 +2,10 @@ from .scxml_param import ScxmlParam # noqa: F401 from .scxml_executable_entries import ScxmlAssign, ScxmlIf, ScxmlSend # noqa: F401 from .scxml_executable_entries import ScxmlExecutableEntry, ScxmlExecutionBody # noqa: F401 -from .scxml_executable_entries import valid_execution_body # noqa: F401 from .scxml_transition import ScxmlTransition # noqa: F401 from .scxml_ros_entries import (RosTimeRate, RosTopicPublisher, RosTopicSubscriber, # noqa: F401 RosRateCallback, RosTopicCallback, RosTopicPublish, # noqa: F401 RosField, ScxmlRosDeclarations) # noqa: F401 +from .utils import execution_body_from_xml, valid_execution_body # noqa: F401 from .scxml_state import ScxmlState # noqa: F401 from .scxml_root import ScxmlRoot # noqa: F401 diff --git a/scxml_converter/src/scxml_converter/scxml_entries/scxml_executable_entries.py b/scxml_converter/src/scxml_converter/scxml_entries/scxml_executable_entries.py index 546097c4..744fea78 100644 --- a/scxml_converter/src/scxml_converter/scxml_entries/scxml_executable_entries.py +++ b/scxml_converter/src/scxml_converter/scxml_entries/scxml_executable_entries.py @@ -140,35 +140,3 @@ def check_validity(self) -> bool: def as_xml(self) -> ET.Element: assert self.check_validity(), "SCXML: found invalid assign object." return ET.Element('assign', {"location": self.name, "expr": self.expr}) - - -# Get the resolved types from the forward references in ScxmlExecutableEntry -_ResolvedScxmlExecutableEntry = \ - tuple(entry._evaluate(globals(), locals(), frozenset()) - for entry in get_args(ScxmlExecutableEntry)) - - -print(_ResolvedScxmlExecutableEntry) - - -def valid_execution_body(execution_body: ScxmlExecutionBody) -> bool: - """ - Check if an execution body is valid. - - :param execution_body: The execution body to check - :return: True if the execution body is valid, False otherwise - """ - valid = isinstance(execution_body, list) - if not valid: - print("Error: SCXML execution body: invalid type found: expected a list.") - for entry in execution_body: - if not isinstance(entry, _ResolvedScxmlExecutableEntry): - valid = False - print(f"Error: SCXML execution body: entry type {type(entry)} not in valid set " - f" {_ResolvedScxmlExecutableEntry}.") - break - if not entry.check_validity(): - valid = False - print("Error: SCXML execution body: invalid entry content found.") - break - return valid diff --git a/scxml_converter/src/scxml_converter/scxml_entries/scxml_root.py b/scxml_converter/src/scxml_converter/scxml_entries/scxml_root.py index ea2bb4bd..846b1594 100644 --- a/scxml_converter/src/scxml_converter/scxml_entries/scxml_root.py +++ b/scxml_converter/src/scxml_converter/scxml_entries/scxml_root.py @@ -50,7 +50,7 @@ def from_xml_tree(xml_tree: ET.Element) -> "ScxmlRoot": # Data Model datamodel_elements = xml_tree.findall(ScxmlDataModel.get_tag_name()) assert datamodel_elements is None or len(datamodel_elements) == 1, \ - "Error: SCXML root: multiple datamodels found in input xml." + "Error: SCXML root: multiple datamodels found in input xml, up to 1 are allowed." # ROS Declaration ros_time_rates = xml_tree.findall(RosTimeRate.get_tag_name()) ros_topic_subs = xml_tree.findall(RosTopicSubscriber.get_tag_name()) diff --git a/scxml_converter/src/scxml_converter/scxml_entries/scxml_state.py b/scxml_converter/src/scxml_converter/scxml_entries/scxml_state.py index bfc92088..e017653f 100644 --- a/scxml_converter/src/scxml_converter/scxml_entries/scxml_state.py +++ b/scxml_converter/src/scxml_converter/scxml_entries/scxml_state.py @@ -20,8 +20,8 @@ from typing import List, Optional from xml.etree import ElementTree as ET -from scxml_converter.scxml_entries import (ScxmlExecutableEntry, - ScxmlExecutionBody, ScxmlTransition, +from scxml_converter.scxml_entries import (ScxmlExecutableEntry, ScxmlExecutionBody, + ScxmlTransition, execution_body_from_xml, valid_execution_body) @@ -44,7 +44,28 @@ def from_xml_tree(xml_tree: ET.Element) -> "ScxmlState": """Create a ScxmlState object from an XML tree.""" assert xml_tree.tag == ScxmlState.get_tag_name(), \ f"Error: SCXML state: XML tag name is not {ScxmlState.get_tag_name()}." - # TODO + id = xml_tree.attrib.get("id") + assert id is not None and len(id) > 0, "Error: SCXML state: id is not valid." + scxml_state = ScxmlState(id) + # Get the onentry and onexit execution bodies + on_entry = xml_tree.findall("onentry") + assert on_entry is None or len(on_entry) == 1, \ + "Error: SCXML state: multiple onentry tags found, up to 1 allowed." + on_exit = xml_tree.findall("onexit") + assert on_exit is None or len(on_exit) == 1, \ + "Error: SCXML state: multiple onexit tags found, up to 1 allowed." + if on_entry is not None: + for exec_entry in execution_body_from_xml(on_entry[0]): + scxml_state.append_on_entry(exec_entry) + if on_exit is not None: + for exec_entry in execution_body_from_xml(on_exit[0]): + scxml_state.append_on_exit(exec_entry) + # Get the transitions in the state body + transitions_xml = xml_tree.findall(ScxmlTransition.get_tag_name()) + if transitions_xml is not None: + for transition_xml in transitions_xml: + scxml_state.add_transition(ScxmlTransition.from_xml_tree(transition_xml)) + return scxml_state def get_id(self) -> str: return self._id diff --git a/scxml_converter/src/scxml_converter/scxml_entries/scxml_transition.py b/scxml_converter/src/scxml_converter/scxml_entries/scxml_transition.py index ef204b16..dc361877 100644 --- a/scxml_converter/src/scxml_converter/scxml_entries/scxml_transition.py +++ b/scxml_converter/src/scxml_converter/scxml_entries/scxml_transition.py @@ -52,6 +52,12 @@ def __init__(self, self._events = events self._condition = condition + def get_tag_name() -> str: + return "transition" + + def from_xml_tree(xml_tree: ET.Element) -> "ScxmlTransition": + raise NotImplementedError("Not implemented yet.") + def add_event(self, event: str): if self._events is None: self._events = [] @@ -85,7 +91,7 @@ def check_validity(self) -> bool: def as_xml(self) -> ET.Element: assert self.check_validity(), "SCXML: found invalid transition." - xml_transition = ET.Element('transition', {"target": self._target}) + xml_transition = ET.Element(ScxmlTransition.get_tag_name(), {"target": self._target}) if self._events is not None: xml_transition.set("event", " ".join(self._events)) if self._condition is not None: diff --git a/scxml_converter/src/scxml_converter/scxml_entries/utils.py b/scxml_converter/src/scxml_converter/scxml_entries/utils.py new file mode 100644 index 00000000..80bbb9df --- /dev/null +++ b/scxml_converter/src/scxml_converter/scxml_entries/utils.py @@ -0,0 +1,37 @@ +# Get the resolved types from the forward references in ScxmlExecutableEntry +_ResolvedScxmlExecutableEntry = \ + tuple(entry._evaluate(globals(), locals(), frozenset()) + for entry in get_args(ScxmlExecutableEntry)) + + +def valid_execution_body(execution_body: ScxmlExecutionBody) -> bool: + """ + Check if an execution body is valid. + + :param execution_body: The execution body to check + :return: True if the execution body is valid, False otherwise + """ + valid = isinstance(execution_body, list) + if not valid: + print("Error: SCXML execution body: invalid type found: expected a list.") + for entry in execution_body: + if not isinstance(entry, _ResolvedScxmlExecutableEntry): + valid = False + print(f"Error: SCXML execution body: entry type {type(entry)} not in valid set " + f" {_ResolvedScxmlExecutableEntry}.") + break + if not entry.check_validity(): + valid = False + print("Error: SCXML execution body: invalid entry content found.") + break + return valid + + +def execution_body_from_xml(xml_tree: ET.ElementTree) -> ScxmlExecutionBody: + """ + Create an execution body from an XML tree. + + :param xml_tree: The XML tree to create the execution body from + :return: The execution body + """ + raise NotImplementedError("Not implemented yet.")