Skip to content

Commit

Permalink
Refactor bt_converter. Tests to be fixed
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Lampacrescia <[email protected]>
  • Loading branch information
MarcoLm993 committed Aug 15, 2024
1 parent 8e32886 commit 7fa1f05
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
import json
import os
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import List, Optional, Tuple
from xml.etree import ElementTree as ET

from as2fm_common.common import remove_namespace
from jani_generator.jani_entries import JaniModel
from jani_generator.ros_helpers.ros_services import RosService, RosServices
from jani_generator.ros_helpers.ros_timer import RosTimer
from jani_generator.scxml_helpers.scxml_to_jani import \
Expand Down Expand Up @@ -129,22 +128,21 @@ def generate_plain_scxml_models_and_timers(
"""
Generate plain SCXML models and ROS timers from the full model dictionary.
"""
# Convert behavior tree and plugins to ROS-scxml
# Load the skills and components scxml files (ROS-SCXML)
scxml_files_to_convert: list = model.skills + model.components
ros_scxmls: List[ScxmlRoot] = []
for fname in scxml_files_to_convert:
ros_scxmls.append(ScxmlRoot.from_scxml_file(fname))
# Convert behavior tree and plugins to ROS-SCXML
if model.bt is not None:
bt_out_dir = os.path.join(os.path.dirname(model.bt), "generated_bt_scxml")
os.makedirs(bt_out_dir, exist_ok=True)
expanded_bt_plugin_scxmls = bt_converter(
model.bt, model.plugins, bt_out_dir)
scxml_files_to_convert.extend(expanded_bt_plugin_scxmls)

# Convert ROS-SCXML FSMs to plain SCXML
ros_scxmls.extend(bt_converter(model.bt, model.plugins))
# Convert the loaded entries to plain SCXML
plain_scxml_models = []
all_timers: List[RosTimer] = []
all_services: RosServices = {}
for fname in scxml_files_to_convert:
for scxml_entry in ros_scxmls:
plain_scxml, ros_declarations = \
ScxmlRoot.from_scxml_file(fname).to_plain_scxml_and_declarations()
scxml_entry.to_plain_scxml_and_declarations()
# Handle ROS timers
for timer_name, timer_rate in ros_declarations._timers.items():
assert timer_name not in all_timers, \
Expand Down
92 changes: 35 additions & 57 deletions scxml_converter/src/scxml_converter/bt_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
"""

from copy import deepcopy
import os
import xml.etree.ElementTree as ET
from enum import Enum, auto
from typing import List, Tuple, Dict
from typing import List
import re

from btlib.bt_to_fsm.bt_to_fsm import Bt2FSM
from btlib.bts import xml_to_networkx
Expand All @@ -33,14 +34,6 @@
ScxmlTransition)


# A pair containing a node ID and the value for each of its ports
NodeAndPorts = Tuple[str, Dict[str, str]]


# A list of all the nodes in a BT and their ports values
BtNodes = List[NodeAndPorts]


class BT_EVENT_TYPE(Enum):
"""Event types for Behavior Tree."""
TICK = auto()
Expand All @@ -64,72 +57,61 @@ def bt_event_name(node_id: str, event_type: BT_EVENT_TYPE) -> str:
def bt_converter(
bt_xml_path: str,
bt_plugins_scxml_paths: List[str],
output_folder: str
) -> Tuple[ScxmlRoot, BtNodes]:
) -> List[ScxmlRoot]:
"""
Convert a Behavior Tree (BT) in XML format to SCXML.
Args:
bt_xml_path: The path to the Behavior Tree in XML format.
bt_plugins_scxml_paths: The paths to the SCXML files of BT plugins.
output_folder: The folder where the SCXML files will be saved.
Returns:
A list of the generated SCXML files.
A list of the generated SCXML objects.
"""
bt_graph, xpi = xml_to_networkx(bt_xml_path)
generated_files = []
bt_graph, _ = xml_to_networkx(bt_xml_path)

bt_plugins_scxml_paths_by_plugin = {}
bt_plugins_scxmls = {}
for path in bt_plugins_scxml_paths:
assert os.path.exists(path), f'SCXML must exist. {path} not found.'
with open(path, 'r', encoding='utf-8') as f:
content = f.read()
xml = ET.fromstring(content)
name = xml.attrib['name']
assert name not in bt_plugins_scxml_paths_by_plugin, \
f'Plugin name must be unique. {name} already exists.'
bt_plugins_scxml_paths_by_plugin[name] = path
bt_plugin_scxml = ScxmlRoot.from_scxml_file(path)
bt_plugin_name = bt_plugin_scxml.get_name()
assert bt_plugin_name not in bt_plugins_scxmls, \
f'Plugin name must be unique. {bt_plugin_name} already exists.'
bt_plugins_scxmls[bt_plugin_name] = bt_plugin_scxml

leaf_node_ids = []
generated_scxmls: List[ScxmlRoot] = []
# Generate the instances of the plugins used in the BT
for node in bt_graph.nodes:
assert 'category' in bt_graph.nodes[node], 'Node must have a category.'
if bt_graph.nodes[node]['category'] == NODE_CAT.LEAF:
leaf_node_ids.append(node)
assert 'ID' in bt_graph.nodes[node], 'Leaf node must have a type.'
node_type = bt_graph.nodes[node]['ID']
node_id = node
assert node_type in bt_plugins_scxml_paths_by_plugin, \
assert node_type in bt_plugins_scxmls, \
f'Leaf node must have a plugin. {node_type} not found.'
instance_name = f'{node_id}_{node_type}'
output_fname = os.path.join(
output_folder, f'{instance_name}.scxml')
generated_files.append(output_fname)
this_plugin_scxml = ScxmlRoot.from_scxml_file(
bt_plugins_scxml_paths_by_plugin[node_type])
this_plugin_scxml.instantiate_bt_events(node_id)
scxml_plugin_instance: ScxmlRoot = deepcopy(bt_plugins_scxmls[node_type])
scxml_plugin_instance.set_name(instance_name)
scxml_plugin_instance.instantiate_bt_events(node_id)
# TODO: Replace arguments from the BT xml file.
# TODO: Change name to instance name
with open(output_fname, 'w', encoding='utf-8') as f:
f.write(this_plugin_scxml.as_xml_string())
generated_scxmls.append(scxml_plugin_instance)
# Generate the BT SCXML
fsm_graph = Bt2FSM(bt_graph).convert()
output_file_bt = os.path.join(output_folder, 'bt.scxml')
generated_files.append(output_file_bt)

root_tag = ScxmlRoot("bt")
bt_scxml_root = ScxmlRoot("bt")
name_with_id_pattern = re.compile(r"[0-9]+_.+")
for node in fsm_graph.nodes:
state = ScxmlState(node)
if '_' in node:
node_id = None
if name_with_id_pattern.match(node):
node_id = int(node.split('_')[0])
else:
node_id = None
if node_id and node_id in leaf_node_ids:
state.append_on_entry(ScxmlSend(
bt_event_name(node_id, BT_EVENT_TYPE.TICK)))
if node_id in leaf_node_ids:
state.append_on_entry(ScxmlSend(bt_event_name(node_id, BT_EVENT_TYPE.TICK)))
for edge in fsm_graph.edges(node):
target = edge[1]
transition = ScxmlTransition(target)
if node_id and node_id in leaf_node_ids:
if node_id is not None and node_id in leaf_node_ids:
if 'label' not in fsm_graph.edges[edge]:
continue
label = fsm_graph.edges[edge]['label']
Expand All @@ -145,21 +127,17 @@ def bt_converter(
transition.add_event(event_name)
state.add_transition(transition)
if node in ['success', 'failure', 'running']:
state.add_transition(
ScxmlTransition("wait_for_tick"))
root_tag.add_state(state)

state.add_transition(ScxmlTransition("wait_for_tick"))
bt_scxml_root.add_state(state)
# TODO: Make BT rate configurable, e.g. from main.xml
rtr = RosTimeRate("bt_tick", 1.0)
root_tag.add_ros_declaration(rtr)
bt_scxml_root.add_ros_declaration(rtr)

wait_for_tick = ScxmlState("wait_for_tick")
wait_for_tick.add_transition(
RosRateCallback(rtr, "tick"))
root_tag.add_state(wait_for_tick, initial=True)

assert root_tag.check_validity(), "Error: SCXML root tag is not valid."

with open(output_file_bt, 'w', encoding='utf-8') as f:
f.write(root_tag.as_xml_string())
bt_scxml_root.add_state(wait_for_tick, initial=True)
assert bt_scxml_root.check_validity(), "Error: SCXML root tag is not valid."
generated_scxmls.append(bt_scxml_root)

return generated_files
return generated_scxmls
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ def get_name(self) -> str:
"""Get the name of the automaton represented by this SCXML model."""
return self._name

def set_name(self, name: str) -> None:
"""Rename the automaton represented by this SCXML model."""
assert isinstance(name, str) and len(name) > 0, "Error: SCXML root: invalid name."
self._name = name

def get_initial_state_id(self) -> str:
"""Get the ID of the initial state of the SCXML model."""
assert self._initial_state is not None, "Error: SCXML root: Initial state not set."
Expand Down

0 comments on commit 7fa1f05

Please sign in to comment.