Skip to content

Commit

Permalink
Add condition for topic and rate callbacks
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Lampacrescia <[email protected]>
  • Loading branch information
MarcoLm993 committed Jul 3, 2024
1 parent d547a1c commit da05638
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def as_xml(self) -> ET.Element:
class RosRateCallback(ScxmlTransition):
"""Callback that triggers each time the associated timer ticks."""

def __init__(self, timer: Union[RosTimeRate, str], target: str,
def __init__(self, timer: Union[RosTimeRate, str], target: str, condition: Optional[str] = None,
body: Optional[ScxmlExecutionBody] = None):
"""
Generate a new rate timer and callback.
Expand All @@ -200,6 +200,7 @@ def __init__(self, timer: Union[RosTimeRate, str], target: str,
assert isinstance(timer, str), "Error: SCXML rate callback: invalid timer type."
self._timer_name = timer
self._target = target
self._condition = condition
self._body = body
assert self.check_validity(), "Error: SCXML rate callback: invalid parameters."

Expand All @@ -214,21 +215,27 @@ def from_xml_tree(xml_tree: ET.Element) -> "RosRateCallback":
target = xml_tree.attrib.get("target")
assert timer_name is not None and target is not None, \
"Error: SCXML rate callback: 'name' or 'target' attribute not found in input xml."
condition = xml_tree.get("cond")
condition = condition if condition is not None and len(condition) > 0 else None
exec_body = execution_body_from_xml(xml_tree)
exec_body = exec_body if exec_body is not None else None
return RosRateCallback(timer_name, target, exec_body)
return RosRateCallback(timer_name, target, condition, exec_body)

def check_validity(self) -> bool:
valid_timer = isinstance(self._timer_name, str) and len(self._timer_name) > 0
valid_target = isinstance(self._target, str) and len(self._target) > 0
valid_cond = self._condition is None or (
isinstance(self._condition, str) and len(self._condition) > 0)
valid_body = self._body is None or valid_execution_body(self._body)
if not valid_timer:
print("Error: SCXML rate callback: timer name is not valid.")
if not valid_target:
print("Error: SCXML rate callback: target is not valid.")
if not valid_cond:
print("Error: SCXML rate callback: condition is not valid.")
if not valid_body:
print("Error: SCXML rate callback: body is not valid.")
return valid_timer and valid_target and valid_body
return valid_timer and valid_target and valid_cond and valid_body

def check_valid_ros_instantiations(self, ros_declarations: HelperRosDeclarations) -> bool:
"""Check if the ros instantiations have been declared."""
Expand All @@ -246,13 +253,16 @@ def check_valid_ros_instantiations(self, ros_declarations: HelperRosDeclarations
def as_plain_scxml(self, ros_declarations: HelperRosDeclarations) -> ScxmlTransition:
event_name = "ros_time_rate." + self._timer_name
target = self._target
cond = self._condition
body = as_plain_execution_body(self._body, ros_declarations)
return ScxmlTransition(target, [event_name], None, body)
return ScxmlTransition(target, [event_name], cond, body)

def as_xml(self) -> ET.Element:
assert self.check_validity(), "Error: SCXML rate callback: invalid parameters."
xml_rate_callback = ET.Element(
"ros_rate_callback", {"name": self._timer_name, "target": self._target})
if self._condition is not None:
xml_rate_callback.set("cond", self._condition)
if self._body is not None:
for entry in self._body:
xml_rate_callback.append(entry.as_xml())
Expand All @@ -264,7 +274,7 @@ class RosTopicCallback(ScxmlTransition):

def __init__(
self, topic: Union[RosTopicSubscriber, str], target: str,
body: Optional[ScxmlExecutionBody] = None):
condition: Optional[str] = None, body: Optional[ScxmlExecutionBody] = None):
"""
Create a new ros_topic_callback object instance.
Expand All @@ -279,6 +289,7 @@ def __init__(
assert isinstance(topic, str), "Error: SCXML topic callback: invalid topic type."
self._topic = topic
self._target = target
self._condition = condition
self._body = body
assert self.check_validity(), "Error: SCXML topic callback: invalid parameters."

Expand All @@ -293,21 +304,27 @@ def from_xml_tree(xml_tree: ET.Element) -> "RosTopicCallback":
target = xml_tree.attrib.get("target")
assert topic_name is not None and target is not None, \
"Error: SCXML topic callback: 'topic' or 'target' attribute not found in input xml."
condition = xml_tree.get("cond")
condition = condition if condition is not None and len(condition) > 0 else None
exec_body = execution_body_from_xml(xml_tree)
exec_body = exec_body if exec_body is not None else None
return RosTopicCallback(topic_name, target, exec_body)
return RosTopicCallback(topic_name, target, condition, exec_body)

def check_validity(self) -> bool:
valid_topic = isinstance(self._topic, str) and len(self._topic) > 0
valid_target = isinstance(self._target, str) and len(self._target) > 0
valid_cond = self._condition is None or (
isinstance(self._condition, str) and len(self._condition) > 0)
valid_body = self._body is None or valid_execution_body(self._body)
if not valid_topic:
print("Error: SCXML topic callback: topic name is not valid.")
if not valid_target:
print("Error: SCXML topic callback: target is not valid.")
if not valid_cond:
print("Error: SCXML topic callback: condition is not valid.")
if not valid_body:
print("Error: SCXML topic callback: body is not valid.")
return valid_topic and valid_target and valid_body
return valid_topic and valid_target and valid_cond and valid_body

def check_valid_ros_instantiations(self, ros_declarations: HelperRosDeclarations) -> bool:
"""Check if the ros instantiations have been declared."""
Expand All @@ -325,13 +342,16 @@ def check_valid_ros_instantiations(self, ros_declarations: HelperRosDeclarations
def as_plain_scxml(self, ros_declarations: HelperRosDeclarations) -> ScxmlTransition:
event_name = "ros_topic." + self._topic
target = self._target
cond = self._condition
body = as_plain_execution_body(self._body, ros_declarations)
return ScxmlTransition(target, [event_name], None, body)
return ScxmlTransition(target, [event_name], cond, body)

def as_xml(self) -> ET.Element:
assert self.check_validity(), "Error: SCXML topic callback: invalid parameters."
xml_topic_callback = ET.Element(
"ros_topic_callback", {"topic": self._topic, "target": self._target})
if self._condition is not None:
xml_topic_callback.set("cond", self._condition)
if self._body is not None:
for entry in self._body:
xml_topic_callback.append(entry.as_xml())
Expand Down
4 changes: 2 additions & 2 deletions scxml_converter/test/test_systemtest_scxml_entries.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ def test_battery_drainer_ros_from_code():
use_battery_state.append_on_entry(
RosTopicPublish(ros_topic_pub, [RosField("data", "battery_percent")]))
use_battery_state.add_transition(
RosRateCallback(ros_timer, "use_battery",
RosRateCallback(ros_timer, "use_battery", None,
[ScxmlAssign("battery_percent", "battery_percent - 1")]))
use_battery_state.add_transition(
RosTopicCallback(ros_topic_sub, "use_battery",
RosTopicCallback(ros_topic_sub, "use_battery", None,
[ScxmlAssign("battery_percent", "100")]))
battery_drainer_scxml.add_state(use_battery_state, initial=True)

Expand Down

0 comments on commit da05638

Please sign in to comment.