diff --git a/scxml_converter/src/scxml_converter/scxml_entries/scxml_root.py b/scxml_converter/src/scxml_converter/scxml_entries/scxml_root.py index 6373f0e9..a778bdfd 100644 --- a/scxml_converter/src/scxml_converter/scxml_entries/scxml_root.py +++ b/scxml_converter/src/scxml_converter/scxml_entries/scxml_root.py @@ -27,7 +27,8 @@ RosTimeRate, RosTopicPublisher, RosTopicSubscriber, ScxmlBase, ScxmlDataModel, ScxmlRosDeclarations, ScxmlRosDeclarationsContainer, ScxmlState) -from scxml_converter.scxml_entries.xml_utils import get_children_as_scxml +from scxml_converter.scxml_entries.xml_utils import ( + assert_xml_tag_ok, get_children_as_scxml, get_xml_argument) from scxml_converter.scxml_entries.scxml_bt import BtPortDeclarations from scxml_converter.scxml_entries.bt_utils import BtPortsHandler @@ -44,15 +45,15 @@ def get_tag_name() -> str: def from_xml_tree(xml_tree: ET.Element) -> "ScxmlRoot": """Create a ScxmlRoot object from an XML tree.""" # --- Get the ElementTree objects - assert xml_tree.tag == ScxmlRoot.get_tag_name(), \ - f"Error: SCXML root: XML root tag {xml_tree.tag} is not {ScxmlRoot.get_tag_name()}." - assert "name" in xml_tree.attrib, \ - "Error: SCXML root: 'name' attribute not found in input xml." - assert "version" in xml_tree.attrib and xml_tree.attrib["version"] == "1.0", \ - "Error: SCXML root: 'version' attribute not found or invalid in input xml." + assert_xml_tag_ok(ScxmlRoot, xml_tree) + scxml_name = get_xml_argument(ScxmlRoot, xml_tree, "name") + scxml_version = get_xml_argument(ScxmlRoot, xml_tree, "version") + assert scxml_version == "1.0", \ + f"Error: SCXML root: expected version 1.0, found {scxml_version}." + scxml_init_state = get_xml_argument(ScxmlRoot, xml_tree, "initial") # Data Model - datamodel_elements = xml_tree.findall(ScxmlDataModel.get_tag_name()) - assert datamodel_elements is None or len(datamodel_elements) <= 1, \ + datamodel_elements = get_children_as_scxml(xml_tree, (ScxmlDataModel,)) + assert len(datamodel_elements) <= 1, \ f"Error: SCXML root: {len(datamodel_elements)} datamodels found, max 1 allowed." # ROS Declarations ros_declarations: List[ScxmlRosDeclarations] = get_children_as_scxml( @@ -61,26 +62,21 @@ def from_xml_tree(xml_tree: ET.Element) -> "ScxmlRoot": bt_port_declarations: List[BtPortDeclarations] = get_children_as_scxml( xml_tree, get_args(BtPortDeclarations)) # States - assert "initial" in xml_tree.attrib, \ - "Error: SCXML root: 'initial' attribute not found in input xml." - initial_state = xml_tree.attrib["initial"] - state_elements = xml_tree.findall(ScxmlState.get_tag_name()) - assert state_elements is not None and len(state_elements) > 0, \ - "Error: SCXML root: no state found in input xml." + scxml_states: List[ScxmlState] = get_children_as_scxml(xml_tree, (ScxmlState,)) + assert len(scxml_states) > 0, "Error: SCXML root: no state found in input xml." # --- Fill Data in the ScxmlRoot object - scxml_root = ScxmlRoot(xml_tree.attrib["name"]) + scxml_root = ScxmlRoot(scxml_name) # Data Model - if datamodel_elements is not None and len(datamodel_elements) > 0: - scxml_root.set_data_model(ScxmlDataModel.from_xml_tree(datamodel_elements[0])) + if len(datamodel_elements) > 0: + scxml_root.set_data_model(datamodel_elements[0]) # ROS Declarations scxml_root._ros_declarations = ros_declarations # BT Declarations for bt_port_declaration in bt_port_declarations: scxml_root.add_bt_port_declaration(bt_port_declaration) # States - for state_element in state_elements: - scxml_state = ScxmlState.from_xml_tree(state_element) - is_initial = scxml_state.get_id() == initial_state + for scxml_state in scxml_states: + is_initial = scxml_state.get_id() == scxml_init_state scxml_root.add_state(scxml_state, initial=is_initial) return scxml_root diff --git a/scxml_converter/src/scxml_converter/scxml_entries/scxml_ros_action_server_thread.py b/scxml_converter/src/scxml_converter/scxml_entries/scxml_ros_action_server_thread.py index 0e398ec6..190879c5 100644 --- a/scxml_converter/src/scxml_converter/scxml_entries/scxml_ros_action_server_thread.py +++ b/scxml_converter/src/scxml_converter/scxml_entries/scxml_ros_action_server_thread.py @@ -23,7 +23,7 @@ from xml.etree import ElementTree as ET from scxml_converter.scxml_entries import ( - RosField, ScxmlRoot, ScxmlExecutionBody, ScxmlSend, ScxmlTransition, BtGetValueInputPort, + ScxmlRoot, ScxmlDataModel, ScxmlExecutionBody, ScxmlSend, ScxmlTransition, BtGetValueInputPort, as_plain_execution_body, execution_body_from_xml, valid_execution_body, ScxmlRosDeclarationsContainer) @@ -31,7 +31,7 @@ from scxml_converter.scxml_entries.ros_utils import ( is_action_type_known) from scxml_converter.scxml_entries.xml_utils import ( - assert_xml_tag_ok, get_xml_argument, read_value_from_xml_arg_or_child) + assert_xml_tag_ok, get_xml_argument, read_value_from_xml_arg_or_child, get_children_as_scxml) from scxml_converter.scxml_entries.utils import is_non_empty_string @@ -42,13 +42,20 @@ class RosActionThread(ScxmlRoot): @staticmethod def get_tag_name() -> str: - return "ros_action_client" + return "ros_action_thread" @staticmethod def from_xml_tree(xml_tree: ET.Element) -> "RosActionThread": """Create a RosActionThread object from an XML tree.""" assert_xml_tag_ok(RosActionThread, xml_tree) - action_alias = get_xml_argument(RosActionThread, xml_tree, "name", none_allowed=True) + action_alias = get_xml_argument(RosActionThread, xml_tree, "name") + n_threads = get_xml_argument(RosActionThread, xml_tree, "n_threads") + initial_state = get_xml_argument(RosActionThread, xml_tree, "initial_state") + datamodel = get_children_as_scxml(xml_tree, (ScxmlDataModel,)) + ros_declarations: List[ScxmlRosDeclarations] = get_children_as_scxml( + xml_tree, get_args(ScxmlRosDeclarations)) + # TODO: Append the action server to the ROS declarations in the thread, somehow + action_name = read_value_from_xml_arg_or_child(RosActionThread, xml_tree, "action_name", (BtGetValueInputPort, str)) action_type = get_xml_argument(RosActionClient, xml_tree, "type")