Skip to content

Commit

Permalink
Move utils to separate file for easier dependency tree
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Lampacrescia <[email protected]>
  • Loading branch information
MarcoLm993 committed Jul 2, 2024
1 parent 364a001 commit 3d7849c
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .scxml_base import ScxmlBase # noqa: F401
from .utils import as_plain_scxml_msg_expression, HelperRosDeclarations # noqa: F401
from .scxml_data_model import ScxmlDataModel # noqa: F401
from .scxml_param import ScxmlParam # noqa: F401
from .scxml_executable_entries import ScxmlAssign, ScxmlIf, ScxmlSend # noqa: F401
Expand All @@ -9,6 +10,6 @@
from .scxml_transition import ScxmlTransition # noqa: F401
from .scxml_ros_entries import (RosTimeRate, RosTopicPublisher, RosTopicSubscriber, # noqa: F401
RosRateCallback, RosTopicCallback, RosTopicPublish, # noqa: F401
RosField, ScxmlRosDeclarations, HelperRosDeclarations) # noqa: F401
RosField, ScxmlRosDeclarations) # noqa: F401
from .scxml_state import ScxmlState # noqa: F401
from .scxml_root import ScxmlRoot # noqa: F401
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from typing import List, Optional, Union, Tuple, get_args
from xml.etree import ElementTree as ET

from scxml_converter.scxml_entries import ScxmlBase, ScxmlParam
from scxml_converter.scxml_entries import (ScxmlBase, ScxmlParam, HelperRosDeclarations,
as_plain_scxml_msg_expression)

# Use delayed type evaluation: https://peps.python.org/pep-0484/#forward-references
ScxmlExecutableEntry = Union['ScxmlAssign', 'ScxmlIf', 'ScxmlSend']
Expand Down Expand Up @@ -94,9 +95,8 @@ def check_validity(self) -> bool:
print("Error: SCXML if: invalid else execution body found.")
return valid_conditional_executions and valid_else_execution

def check_valid_ros_instantiations(self, ros_declarations) -> bool:
def check_valid_ros_instantiations(self, ros_declarations: HelperRosDeclarations) -> bool:
"""Check if the ros instantiations have been declared."""
from .scxml_ros_entries import HelperRosDeclarations
# Check the executable content
assert isinstance(ros_declarations, HelperRosDeclarations), \
"Error: SCXML if: invalid ROS declarations type provided."
Expand All @@ -110,8 +110,15 @@ def check_valid_ros_instantiations(self, ros_declarations) -> bool:
return False
return True

def as_plain_scxml(self, ros_declarations) -> "ScxmlIf":
raise NotImplementedError
def as_plain_scxml(self, ros_declarations: HelperRosDeclarations) -> "ScxmlIf":
condional_executions = []
for condition, execution in self._conditional_executions:
condional_executions.append(
(as_plain_scxml_msg_expression(condition), as_plain_execution_body(execution)))
else_execution = None
if self._else_execution is not None:
else_execution = as_plain_execution_body(self._else_execution, ros_declarations)
return ScxmlIf(condional_executions, else_execution)

def as_xml(self) -> ET.Element:
# Based on example in https://www.w3.org/TR/scxml/#if
Expand Down Expand Up @@ -190,8 +197,8 @@ class ScxmlAssign(ScxmlBase):
"""This class represents a variable assignment."""

def __init__(self, name: str, expr: str):
self.name = name
self.expr = expr
self._name = name
self._expr = expr

def get_tag_name() -> str:
return "assign"
Expand All @@ -208,8 +215,8 @@ def from_xml_tree(xml_tree: ET.Element) -> "ScxmlAssign":

def check_validity(self) -> bool:
# TODO: Check that the location to assign exists in the data-model
valid_name = isinstance(self.name, str) and len(self.name) > 0
valid_expr = isinstance(self.expr, str) and len(self.expr) > 0
valid_name = isinstance(self._name, str) and len(self._name) > 0
valid_expr = isinstance(self._expr, str) and len(self._expr) > 0
if not valid_name:
print("Error: SCXML assign: name is not valid.")
if not valid_expr:
Expand All @@ -222,11 +229,13 @@ def check_valid_ros_instantiations(self, _) -> bool:
return True

def as_plain_scxml(self, _) -> "ScxmlAssign":
return self
# TODO: Might make sense to check if the assignment happens in a topic callback
expr = as_plain_scxml_msg_expression(self._expr)
return ScxmlAssign(self._name, expr)

def as_xml(self) -> ET.Element:
assert self.check_validity(), "SCXML: found invalid assign object."
return ET.Element(ScxmlAssign.get_tag_name(), {"location": self.name, "expr": self.expr})
return ET.Element(ScxmlAssign.get_tag_name(), {"location": self._name, "expr": self._expr})


# Get the resolved types from the forward references in ScxmlExecutableEntry
Expand Down Expand Up @@ -303,3 +312,15 @@ def append_execution_body_to_xml(xml_parent: ET.Element, exec_body: ScxmlExecuti
"""
for exec_entry in exec_body:
xml_parent.append(exec_entry.as_xml())


def as_plain_execution_body(exec_body: ScxmlExecutionBody,
ros_declarations: HelperRosDeclarations) -> ScxmlExecutionBody:
"""
Convert the execution body to plain SCXML.
:param exec_body: The execution body to convert
:param ros_declarations: The ROS declarations
:return: The converted execution body
"""
return [entry.as_plain_scxml(ros_declarations) for entry in exec_body]
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@

"""Declaration of ROS-Specific SCXML tags extensions."""

from typing import Dict, List, Optional, Union
from typing import List, Optional, Union
from scxml_converter.scxml_entries import (ScxmlBase, ScxmlSend, ScxmlParam, ScxmlTransition,
ScxmlExecutionBody, valid_execution_body,
execution_body_from_xml)
ScxmlExecutionBody, HelperRosDeclarations,
valid_execution_body, execution_body_from_xml)
from xml.etree import ElementTree as ET


Expand All @@ -39,47 +39,6 @@ def _check_topic_type_known(topic_definition: str) -> bool:
return True


class HelperRosDeclarations:
"""Object that contains a description of the ROS declarations in the SCXML root."""

def __init__(self):
# Dict of publishers and subscribers: topic name -> type
self._publishers: Dict[str, str] = {}
self._subscribers: Dict[str, str] = {}
self._timers: Dict[str, float] = {}

def append_publisher(self, topic_name: str, topic_type: str) -> None:
assert isinstance(topic_name, str) and isinstance(topic_type, str), \
"Error: ROS declarations: topic name and type must be strings."
assert topic_name not in self._publishers, \
f"Error: ROS declarations: topic publisher {topic_name} already declared."
self._publishers[topic_name] = topic_type

def append_subscriber(self, topic_name: str, topic_type: str) -> None:
assert isinstance(topic_name, str) and isinstance(topic_type, str), \
"Error: ROS declarations: topic name and type must be strings."
assert topic_name not in self._subscribers, \
f"Error: ROS declarations: topic subscriber {topic_name} already declared."
self._subscribers[topic_name] = topic_type

def append_timer(self, timer_name: str, timer_rate: float) -> None:
assert isinstance(timer_name, str), "Error: ROS declarations: timer name must be a string."
assert isinstance(timer_rate, float) and timer_rate > 0, \
"Error: ROS declarations: timer rate must be a positive number."
assert timer_name not in self._timers, \
f"Error: ROS declarations: timer {timer_name} already declared."
self._timers[timer_name] = timer_rate

def is_publisher_defined(self, topic_name: str) -> bool:
return topic_name in self._publishers

def is_subscriber_defined(self, topic_name: str) -> bool:
return topic_name in self._subscribers

def is_timer_defined(self, timer_name: str) -> bool:
return timer_name in self._timers


class RosTimeRate(ScxmlBase):
"""Object used in the SCXML root to declare a new timer with its related tick rate."""

Expand Down
10 changes: 7 additions & 3 deletions scxml_converter/src/scxml_converter/scxml_entries/scxml_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,13 @@ def _convert_ros_instantiations_to_plain_scxml(

def as_plain_scxml(self, ros_declarations: HelperRosDeclarations) -> "ScxmlState":
"""Convert the ROS-specific entries to be plain SCXML"""
ScxmlState._convert_ros_instantiations_to_plain_scxml(self._on_entry, ros_declarations)
ScxmlState._convert_ros_instantiations_to_plain_scxml(self._on_exit, ros_declarations)
ScxmlState._convert_ros_instantiations_to_plain_scxml(self._body, ros_declarations)
plain_entry = ScxmlState._convert_ros_instantiations_to_plain_scxml(
self._on_entry, ros_declarations)
plain_exit = ScxmlState._convert_ros_instantiations_to_plain_scxml(
self._on_exit, ros_declarations)
plain_body = ScxmlState._convert_ros_instantiations_to_plain_scxml(
self._body, ros_declarations)
return ScxmlState(self._id, on_entry=plain_entry, on_exit=plain_exit, body=plain_body)

def as_xml(self) -> ET.Element:
assert self.check_validity(), "SCXML: found invalid state object."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

from typing import List, Optional
from scxml_converter.scxml_entries import (ScxmlBase, ScxmlExecutionBody, ScxmlExecutableEntry,
valid_execution_body, execution_body_from_xml)
HelperRosDeclarations, valid_execution_body,
execution_body_from_xml)

from xml.etree import ElementTree as ET

Expand Down Expand Up @@ -99,17 +100,17 @@ def check_validity(self) -> bool:
print("Error: SCXML transition: executable content is not valid.")
return valid_target and valid_events and valid_condition and valid_body

def check_valid_ros_instantiations(self, ros_declarations) -> bool:
def check_valid_ros_instantiations(self, ros_declarations: HelperRosDeclarations) -> bool:
"""Check if the ros instantiations have been declared."""
# Check the executable content
valid_body = self._check_valid_ros_instantiations_exec_body(ros_declarations)
if not valid_body:
print("Error: SCXML transition: executable content has invalid ROS instantiations.")
return valid_body

def _check_valid_ros_instantiations_exec_body(self, ros_declarations) -> bool:
def _check_valid_ros_instantiations_exec_body(self,
ros_declarations: HelperRosDeclarations) -> bool:
"""Check if the ros instantiations have been declared in the executable body."""
from .scxml_ros_entries import HelperRosDeclarations
assert isinstance(ros_declarations, HelperRosDeclarations), \
"Error: SCXML transition: invalid ROS declarations container."
if self._body is None:
Expand All @@ -119,8 +120,7 @@ def _check_valid_ros_instantiations_exec_body(self, ros_declarations) -> bool:
return False
return True

def as_plain_scxml(self, ros_declarations) -> "ScxmlTransition":
from .scxml_ros_entries import HelperRosDeclarations
def as_plain_scxml(self, ros_declarations: HelperRosDeclarations) -> "ScxmlTransition":
assert isinstance(ros_declarations, HelperRosDeclarations), \
"Error: SCXML transition: invalid ROS declarations container."
new_body = None
Expand Down
65 changes: 65 additions & 0 deletions scxml_converter/src/scxml_converter/scxml_entries/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Copyright (c) 2024 - for information on the respective copyright owner
# see the NOTICE file

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Collection of various utilities for scxml entries."""

from typing import Dict


def as_plain_scxml_msg_expression(msg_expr: str) -> str:
"""Convert a ROS message expression (referring to ROS msg entries) to plain SCXML."""
prefix = "_event." if msg_expr.startswith("_msg.") else ""
return prefix + msg_expr.removeprefix("_msg.")


class HelperRosDeclarations:
"""Object that contains a description of the ROS declarations in the SCXML root."""

def __init__(self):
# Dict of publishers and subscribers: topic name -> type
self._publishers: Dict[str, str] = {}
self._subscribers: Dict[str, str] = {}
self._timers: Dict[str, float] = {}

def append_publisher(self, topic_name: str, topic_type: str) -> None:
assert isinstance(topic_name, str) and isinstance(topic_type, str), \
"Error: ROS declarations: topic name and type must be strings."
assert topic_name not in self._publishers, \
f"Error: ROS declarations: topic publisher {topic_name} already declared."
self._publishers[topic_name] = topic_type

def append_subscriber(self, topic_name: str, topic_type: str) -> None:
assert isinstance(topic_name, str) and isinstance(topic_type, str), \
"Error: ROS declarations: topic name and type must be strings."
assert topic_name not in self._subscribers, \
f"Error: ROS declarations: topic subscriber {topic_name} already declared."
self._subscribers[topic_name] = topic_type

def append_timer(self, timer_name: str, timer_rate: float) -> None:
assert isinstance(timer_name, str), "Error: ROS declarations: timer name must be a string."
assert isinstance(timer_rate, float) and timer_rate > 0, \
"Error: ROS declarations: timer rate must be a positive number."
assert timer_name not in self._timers, \
f"Error: ROS declarations: timer {timer_name} already declared."
self._timers[timer_name] = timer_rate

def is_publisher_defined(self, topic_name: str) -> bool:
return topic_name in self._publishers

def is_subscriber_defined(self, topic_name: str) -> bool:
return topic_name in self._subscribers

def is_timer_defined(self, timer_name: str) -> bool:
return timer_name in self._timers

0 comments on commit 3d7849c

Please sign in to comment.