Skip to content

Commit

Permalink
Start looking into the server thread functionalities
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Lampacrescia <[email protected]>
  • Loading branch information
MarcoLm993 committed Aug 21, 2024
1 parent eb98056 commit 9b23b35
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 25 deletions.
38 changes: 17 additions & 21 deletions scxml_converter/src/scxml_converter/scxml_entries/scxml_root.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
RosTimeRate, RosTopicPublisher, RosTopicSubscriber, ScxmlBase, ScxmlDataModel,
ScxmlRosDeclarations, ScxmlRosDeclarationsContainer, ScxmlState)

from scxml_converter.scxml_entries.xml_utils import get_children_as_scxml
from scxml_converter.scxml_entries.xml_utils import (
assert_xml_tag_ok, get_children_as_scxml, get_xml_argument)

from scxml_converter.scxml_entries.scxml_bt import BtPortDeclarations
from scxml_converter.scxml_entries.bt_utils import BtPortsHandler
Expand All @@ -44,15 +45,15 @@ def get_tag_name() -> str:
def from_xml_tree(xml_tree: ET.Element) -> "ScxmlRoot":
"""Create a ScxmlRoot object from an XML tree."""
# --- Get the ElementTree objects
assert xml_tree.tag == ScxmlRoot.get_tag_name(), \
f"Error: SCXML root: XML root tag {xml_tree.tag} is not {ScxmlRoot.get_tag_name()}."
assert "name" in xml_tree.attrib, \
"Error: SCXML root: 'name' attribute not found in input xml."
assert "version" in xml_tree.attrib and xml_tree.attrib["version"] == "1.0", \
"Error: SCXML root: 'version' attribute not found or invalid in input xml."
assert_xml_tag_ok(ScxmlRoot, xml_tree)
scxml_name = get_xml_argument(ScxmlRoot, xml_tree, "name")
scxml_version = get_xml_argument(ScxmlRoot, xml_tree, "version")
assert scxml_version == "1.0", \
f"Error: SCXML root: expected version 1.0, found {scxml_version}."
scxml_init_state = get_xml_argument(ScxmlRoot, xml_tree, "initial")
# Data Model
datamodel_elements = xml_tree.findall(ScxmlDataModel.get_tag_name())
assert datamodel_elements is None or len(datamodel_elements) <= 1, \
datamodel_elements = get_children_as_scxml(xml_tree, (ScxmlDataModel,))
assert len(datamodel_elements) <= 1, \
f"Error: SCXML root: {len(datamodel_elements)} datamodels found, max 1 allowed."
# ROS Declarations
ros_declarations: List[ScxmlRosDeclarations] = get_children_as_scxml(
Expand All @@ -61,26 +62,21 @@ def from_xml_tree(xml_tree: ET.Element) -> "ScxmlRoot":
bt_port_declarations: List[BtPortDeclarations] = get_children_as_scxml(
xml_tree, get_args(BtPortDeclarations))
# States
assert "initial" in xml_tree.attrib, \
"Error: SCXML root: 'initial' attribute not found in input xml."
initial_state = xml_tree.attrib["initial"]
state_elements = xml_tree.findall(ScxmlState.get_tag_name())
assert state_elements is not None and len(state_elements) > 0, \
"Error: SCXML root: no state found in input xml."
scxml_states: List[ScxmlState] = get_children_as_scxml(xml_tree, (ScxmlState,))
assert len(scxml_states) > 0, "Error: SCXML root: no state found in input xml."
# --- Fill Data in the ScxmlRoot object
scxml_root = ScxmlRoot(xml_tree.attrib["name"])
scxml_root = ScxmlRoot(scxml_name)
# Data Model
if datamodel_elements is not None and len(datamodel_elements) > 0:
scxml_root.set_data_model(ScxmlDataModel.from_xml_tree(datamodel_elements[0]))
if len(datamodel_elements) > 0:
scxml_root.set_data_model(datamodel_elements[0])
# ROS Declarations
scxml_root._ros_declarations = ros_declarations
# BT Declarations
for bt_port_declaration in bt_port_declarations:
scxml_root.add_bt_port_declaration(bt_port_declaration)
# States
for state_element in state_elements:
scxml_state = ScxmlState.from_xml_tree(state_element)
is_initial = scxml_state.get_id() == initial_state
for scxml_state in scxml_states:
is_initial = scxml_state.get_id() == scxml_init_state
scxml_root.add_state(scxml_state, initial=is_initial)
return scxml_root

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
from xml.etree import ElementTree as ET

from scxml_converter.scxml_entries import (
RosField, ScxmlRoot, ScxmlExecutionBody, ScxmlSend, ScxmlTransition, BtGetValueInputPort,
ScxmlRoot, ScxmlDataModel, ScxmlExecutionBody, ScxmlSend, ScxmlTransition, BtGetValueInputPort,
as_plain_execution_body, execution_body_from_xml, valid_execution_body,
ScxmlRosDeclarationsContainer)

from scxml_converter.scxml_entries.bt_utils import BtPortsHandler
from scxml_converter.scxml_entries.ros_utils import (
is_action_type_known)
from scxml_converter.scxml_entries.xml_utils import (
assert_xml_tag_ok, get_xml_argument, read_value_from_xml_arg_or_child)
assert_xml_tag_ok, get_xml_argument, read_value_from_xml_arg_or_child, get_children_as_scxml)
from scxml_converter.scxml_entries.utils import is_non_empty_string


Expand All @@ -42,13 +42,20 @@ class RosActionThread(ScxmlRoot):

@staticmethod
def get_tag_name() -> str:
return "ros_action_client"
return "ros_action_thread"

@staticmethod
def from_xml_tree(xml_tree: ET.Element) -> "RosActionThread":
"""Create a RosActionThread object from an XML tree."""
assert_xml_tag_ok(RosActionThread, xml_tree)
action_alias = get_xml_argument(RosActionThread, xml_tree, "name", none_allowed=True)
action_alias = get_xml_argument(RosActionThread, xml_tree, "name")
n_threads = get_xml_argument(RosActionThread, xml_tree, "n_threads")
initial_state = get_xml_argument(RosActionThread, xml_tree, "initial_state")
datamodel = get_children_as_scxml(xml_tree, (ScxmlDataModel,))
ros_declarations: List[ScxmlRosDeclarations] = get_children_as_scxml(

Check failure on line 55 in scxml_converter/src/scxml_converter/scxml_entries/scxml_ros_action_server_thread.py

View workflow job for this annotation

GitHub Actions / scxml_converter ⏩ pylint

Undefined variable 'ScxmlRosDeclarations'
xml_tree, get_args(ScxmlRosDeclarations))

Check failure on line 56 in scxml_converter/src/scxml_converter/scxml_entries/scxml_ros_action_server_thread.py

View workflow job for this annotation

GitHub Actions / scxml_converter ⏩ pylint

Undefined variable 'get_args'

Check failure on line 56 in scxml_converter/src/scxml_converter/scxml_entries/scxml_ros_action_server_thread.py

View workflow job for this annotation

GitHub Actions / scxml_converter ⏩ pylint

Undefined variable 'ScxmlRosDeclarations'
# TODO: Append the action server to the ROS declarations in the thread, somehow

action_name = read_value_from_xml_arg_or_child(RosActionThread, xml_tree, "action_name",
(BtGetValueInputPort, str))
action_type = get_xml_argument(RosActionClient, xml_tree, "type")

Check failure on line 61 in scxml_converter/src/scxml_converter/scxml_entries/scxml_ros_action_server_thread.py

View workflow job for this annotation

GitHub Actions / scxml_converter ⏩ pylint

Undefined variable 'RosActionClient'
Expand Down

0 comments on commit 9b23b35

Please sign in to comment.