Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Lampacrescia <[email protected]>
  • Loading branch information
MarcoLm993 committed Jun 27, 2024
1 parent 1be3385 commit f5e0e9d
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
from .scxml_param import ScxmlParam # noqa: F401
from .scxml_executable_entries import ScxmlAssign, ScxmlIf, ScxmlSend # noqa: F401
from .scxml_executable_entries import ScxmlExecutableEntry, ScxmlExecutionBody # noqa: F401
from .scxml_executable_entries import valid_execution_body # noqa: F401
from .scxml_transition import ScxmlTransition # noqa: F401
from .scxml_ros_entries import (RosTimeRate, RosTopicPublisher, RosTopicSubscriber, # noqa: F401
RosRateCallback, RosTopicCallback, RosTopicPublish, # noqa: F401
RosField, ScxmlRosDeclarations) # noqa: F401
from .utils import execution_body_from_xml, valid_execution_body # noqa: F401
from .scxml_state import ScxmlState # noqa: F401
from .scxml_root import ScxmlRoot # noqa: F401
Original file line number Diff line number Diff line change
Expand Up @@ -140,35 +140,3 @@ def check_validity(self) -> bool:
def as_xml(self) -> ET.Element:
assert self.check_validity(), "SCXML: found invalid assign object."
return ET.Element('assign', {"location": self.name, "expr": self.expr})


# Get the resolved types from the forward references in ScxmlExecutableEntry
_ResolvedScxmlExecutableEntry = \
tuple(entry._evaluate(globals(), locals(), frozenset())
for entry in get_args(ScxmlExecutableEntry))


print(_ResolvedScxmlExecutableEntry)


def valid_execution_body(execution_body: ScxmlExecutionBody) -> bool:
"""
Check if an execution body is valid.
:param execution_body: The execution body to check
:return: True if the execution body is valid, False otherwise
"""
valid = isinstance(execution_body, list)
if not valid:
print("Error: SCXML execution body: invalid type found: expected a list.")
for entry in execution_body:
if not isinstance(entry, _ResolvedScxmlExecutableEntry):
valid = False
print(f"Error: SCXML execution body: entry type {type(entry)} not in valid set "
f" {_ResolvedScxmlExecutableEntry}.")
break
if not entry.check_validity():
valid = False
print("Error: SCXML execution body: invalid entry content found.")
break
return valid
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def from_xml_tree(xml_tree: ET.Element) -> "ScxmlRoot":
# Data Model
datamodel_elements = xml_tree.findall(ScxmlDataModel.get_tag_name())
assert datamodel_elements is None or len(datamodel_elements) == 1, \
"Error: SCXML root: multiple datamodels found in input xml."
"Error: SCXML root: multiple datamodels found in input xml, up to 1 are allowed."
# ROS Declaration
ros_time_rates = xml_tree.findall(RosTimeRate.get_tag_name())
ros_topic_subs = xml_tree.findall(RosTopicSubscriber.get_tag_name())
Expand Down
27 changes: 24 additions & 3 deletions scxml_converter/src/scxml_converter/scxml_entries/scxml_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from typing import List, Optional
from xml.etree import ElementTree as ET

from scxml_converter.scxml_entries import (ScxmlExecutableEntry,
ScxmlExecutionBody, ScxmlTransition,
from scxml_converter.scxml_entries import (ScxmlExecutableEntry, ScxmlExecutionBody,
ScxmlTransition, execution_body_from_xml,
valid_execution_body)


Expand All @@ -44,7 +44,28 @@ def from_xml_tree(xml_tree: ET.Element) -> "ScxmlState":
"""Create a ScxmlState object from an XML tree."""
assert xml_tree.tag == ScxmlState.get_tag_name(), \
f"Error: SCXML state: XML tag name is not {ScxmlState.get_tag_name()}."
# TODO
id = xml_tree.attrib.get("id")
assert id is not None and len(id) > 0, "Error: SCXML state: id is not valid."
scxml_state = ScxmlState(id)
# Get the onentry and onexit execution bodies
on_entry = xml_tree.findall("onentry")
assert on_entry is None or len(on_entry) == 1, \
"Error: SCXML state: multiple onentry tags found, up to 1 allowed."
on_exit = xml_tree.findall("onexit")
assert on_exit is None or len(on_exit) == 1, \
"Error: SCXML state: multiple onexit tags found, up to 1 allowed."
if on_entry is not None:
for exec_entry in execution_body_from_xml(on_entry[0]):
scxml_state.append_on_entry(exec_entry)
if on_exit is not None:
for exec_entry in execution_body_from_xml(on_exit[0]):
scxml_state.append_on_exit(exec_entry)
# Get the transitions in the state body
transitions_xml = xml_tree.findall(ScxmlTransition.get_tag_name())
if transitions_xml is not None:
for transition_xml in transitions_xml:
scxml_state.add_transition(ScxmlTransition.from_xml_tree(transition_xml))
return scxml_state

def get_id(self) -> str:
return self._id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ def __init__(self,
self._events = events
self._condition = condition

def get_tag_name() -> str:
return "transition"

def from_xml_tree(xml_tree: ET.Element) -> "ScxmlTransition":
raise NotImplementedError("Not implemented yet.")

def add_event(self, event: str):
if self._events is None:
self._events = []
Expand Down Expand Up @@ -85,7 +91,7 @@ def check_validity(self) -> bool:

def as_xml(self) -> ET.Element:
assert self.check_validity(), "SCXML: found invalid transition."
xml_transition = ET.Element('transition', {"target": self._target})
xml_transition = ET.Element(ScxmlTransition.get_tag_name(), {"target": self._target})
if self._events is not None:
xml_transition.set("event", " ".join(self._events))
if self._condition is not None:
Expand Down
37 changes: 37 additions & 0 deletions scxml_converter/src/scxml_converter/scxml_entries/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Get the resolved types from the forward references in ScxmlExecutableEntry
_ResolvedScxmlExecutableEntry = \
tuple(entry._evaluate(globals(), locals(), frozenset())
for entry in get_args(ScxmlExecutableEntry))


def valid_execution_body(execution_body: ScxmlExecutionBody) -> bool:
"""
Check if an execution body is valid.
:param execution_body: The execution body to check
:return: True if the execution body is valid, False otherwise
"""
valid = isinstance(execution_body, list)
if not valid:
print("Error: SCXML execution body: invalid type found: expected a list.")
for entry in execution_body:
if not isinstance(entry, _ResolvedScxmlExecutableEntry):
valid = False
print(f"Error: SCXML execution body: entry type {type(entry)} not in valid set "
f" {_ResolvedScxmlExecutableEntry}.")
break
if not entry.check_validity():
valid = False
print("Error: SCXML execution body: invalid entry content found.")
break
return valid


def execution_body_from_xml(xml_tree: ET.ElementTree) -> ScxmlExecutionBody:
"""
Create an execution body from an XML tree.
:param xml_tree: The XML tree to create the execution body from
:return: The execution body
"""
raise NotImplementedError("Not implemented yet.")

0 comments on commit f5e0e9d

Please sign in to comment.