From 4f8607f12e9cb88401cf5f3746d5ceb6a1f9b5df Mon Sep 17 00:00:00 2001 From: Simon Stelter Date: Tue, 3 Sep 2024 18:03:10 +0200 Subject: [PATCH] added motion_graph_inspector.py --- scripts/tools/motion_graph_inspector.py | 229 ++++++++++++++++++ .../tree/behaviors/plot_motion_graph.py | 176 ++++++-------- 2 files changed, 307 insertions(+), 98 deletions(-) create mode 100755 scripts/tools/motion_graph_inspector.py diff --git a/scripts/tools/motion_graph_inspector.py b/scripts/tools/motion_graph_inspector.py new file mode 100755 index 0000000000..75dbf36188 --- /dev/null +++ b/scripts/tools/motion_graph_inspector.py @@ -0,0 +1,229 @@ +import sys +import rospy +import pydot +from PyQt5.QtGui import QPainter +from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QComboBox, QHBoxLayout, QPushButton, QSizePolicy, QLabel +from PyQt5.QtSvg import QSvgWidget +from PyQt5.QtCore import Qt, QTimer, QRectF +from giskard_msgs.msg import ExecutionState +from giskardpy.tree.behaviors.plot_motion_graph import execution_state_to_dot_graph +from PyQt5.QtCore import QMutex, QMutexLocker + + +class MySvgWidget(QSvgWidget): + + def __init__(self, *args): + QSvgWidget.__init__(self, *args) + self.mutex = QMutex() # Mutex for synchronizing access to the widget + + def paintEvent(self, event): + with QMutexLocker(self.mutex): # Lock the mutex to prevent concurrent access + renderer = self.renderer() + if renderer is not None: + painter = QPainter(self) + size = renderer.defaultSize() + ratio = size.height() / size.width() + frame_ratio = self.height() / self.width() + if frame_ratio > ratio: + new_width, new_height = self.width(), self.width() * ratio + else: + new_width, new_height = self.height() / ratio, self.height() + if new_width < self.width(): + left = (self.width() - new_width) / 2 + else: + left = 0 + renderer.render(painter, QRectF(left, 0, new_width, new_height)) + painter.end() + + +class DotGraphViewer(QWidget): + def __init__(self): + super().__init__() + + # Initialize the ROS node + rospy.init_node('dot_graph_viewer', anonymous=True) + + # Set up the GUI components + self.svg_widget = MySvgWidget(self) + self.svg_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding) + self.svg_widget.setMinimumSize(600, 400) + + self.topic_selector = QComboBox(self) + self.topic_selector.activated.connect(self.on_topic_selector_clicked) + + # Navigation buttons + self.first_button = QPushButton('First') + self.prev_goal_button = QPushButton('Prev Goal') + self.prev_button = QPushButton('<') + self.next_button = QPushButton('>') + self.next_goal_button = QPushButton('Next Goal') + self.latest_button = QPushButton('Last') + + # Position label + self.position_label = QLabel(self) + + # Connect navigation buttons to their functions + self.first_button.clicked.connect(self.show_first_image) + self.prev_goal_button.clicked.connect(self.show_prev_goal) + self.prev_button.clicked.connect(self.show_previous_image) + self.next_button.clicked.connect(self.show_next_image) + self.next_goal_button.clicked.connect(self.show_next_goal) + self.latest_button.clicked.connect(self.show_latest_image) + + # Layout for topic selection + top_layout = QHBoxLayout() + top_layout.addWidget(self.topic_selector) + + # Layout for navigation buttons and position label + nav_layout = QHBoxLayout() + nav_layout.addWidget(self.first_button) + nav_layout.addWidget(self.prev_goal_button) + nav_layout.addWidget(self.prev_button) + nav_layout.addWidget(self.position_label) + nav_layout.addWidget(self.next_button) + nav_layout.addWidget(self.next_goal_button) + nav_layout.addWidget(self.latest_button) + + # Main layout + layout = QVBoxLayout() + layout.addLayout(top_layout) + layout.addWidget(self.svg_widget) + layout.addLayout(nav_layout) + self.setLayout(layout) + + self.setWindowTitle('DOT Graph Viewer') + self.resize(800, 600) + + # Initialize graph history and goal tracking + self.graphs_by_goal = {} + self.goals = [] + self.current_goal_index = -1 + self.current_message_index = -1 + + # Timer for periodically refreshing topics + self.topic_refresh_timer = QTimer(self) + self.topic_refresh_timer.timeout.connect(self.refresh_topics) + self.topic_refresh_timer.start(1000) # Refresh every 5 seconds + + # Populate the dropdown with available topics if none is selected + self.refresh_topics() + + def refresh_topics(self) -> None: + if self.topic_selector.currentText() == '': + # Find all topics of type ExecutionState + topics = rospy.get_published_topics() + execution_state_topics = [topic for topic, msg_type in topics if msg_type == 'giskard_msgs/ExecutionState'] + + self.topic_selector.clear() + self.topic_selector.addItems(execution_state_topics) + if len(execution_state_topics) > 0: + self.on_topic_selected(0) + + def on_topic_selector_clicked(self) -> None: + # Stop refreshing topics once a topic is selected + if self.topic_selector.currentIndex() != -1: + self.topic_refresh_timer.stop() + self.on_topic_selected(self.topic_selector.currentIndex()) + + def on_topic_selected(self, index: int) -> None: + topic_name = self.topic_selector.currentText() + if topic_name: + rospy.Subscriber(topic_name, ExecutionState, self.on_new_message_received) + + def on_new_message_received(self, msg: ExecutionState) -> None: + if len(self.goals) > 0: + navigator_at_end = (self.current_goal_index == len(self.goals) - 1 + and self.current_message_index == len(self.graphs_by_goal[self.goals[-1]]) - 1) + else: + navigator_at_end = True + # Extract goal_id and group graphs by goal_id + goal_id = msg.goal_id # Adjust based on the actual structure of goal_id + graph = execution_state_to_dot_graph(msg, use_state_color=True) + + if goal_id not in self.graphs_by_goal: + self.graphs_by_goal[goal_id] = [] + self.goals.append(goal_id) + + self.graphs_by_goal[goal_id].append(graph) + + # Update the display to show the latest graph + if navigator_at_end: + self.current_goal_index = len(self.goals) - 1 + self.current_message_index = len(self.graphs_by_goal[goal_id]) - 1 + self.display_graph(self.current_goal_index, self.current_message_index, update_position_label=False) + self.update_position_label() + + def display_graph(self, goal_index: int, message_index: int, update_position_label: bool = True) -> None: + # Display the graph based on goal and message index + goal_id = self.goals[goal_index] + graph = self.graphs_by_goal[goal_id][message_index] + svg_path = '../../../rqt_giskard/scripts/graph.svg' + graph.write_svg(svg_path) + with QMutexLocker(self.svg_widget.mutex): # Lock the mutex during SVG loading + self.svg_widget.load(svg_path) + + # Update the position label + if update_position_label: + self.update_position_label() + + def update_position_label(self) -> None: + goal_count = len(self.goals) + if goal_count == 0: + self.position_label.setText('goal 0/0, update 0/0') + return + + goal_id = self.goals[self.current_goal_index] + message_count = len(self.graphs_by_goal[goal_id]) + position_text = f'goal {self.current_goal_index + 1}/{goal_count}, update {self.current_message_index + 1}/{message_count}' + self.position_label.setText(position_text) + + def show_first_image(self) -> None: + if self.goals: + self.current_goal_index = 0 + self.current_message_index = 0 + self.display_graph(self.current_goal_index, self.current_message_index) + + def show_previous_image(self) -> None: + if self.goals: + if self.current_message_index > 0: + self.current_message_index -= 1 + else: + if self.current_goal_index > 0: + self.current_goal_index -= 1 + self.current_message_index = len(self.graphs_by_goal[self.goals[self.current_goal_index]]) - 1 + self.display_graph(self.current_goal_index, self.current_message_index) + + def show_next_image(self) -> None: + if self.goals: + if self.current_message_index < len(self.graphs_by_goal[self.goals[self.current_goal_index]]) - 1: + self.current_message_index += 1 + else: + if self.current_goal_index < len(self.goals) - 1: + self.current_goal_index += 1 + self.current_message_index = 0 + self.display_graph(self.current_goal_index, self.current_message_index) + + def show_prev_goal(self) -> None: + if self.goals and self.current_goal_index > 0: + self.current_goal_index -= 1 + self.current_message_index = 0 + self.display_graph(self.current_goal_index, self.current_message_index) + + def show_next_goal(self) -> None: + if self.goals and self.current_goal_index < len(self.goals) - 1: + self.current_goal_index += 1 + self.current_message_index = 0 + self.display_graph(self.current_goal_index, self.current_message_index) + + def show_latest_image(self) -> None: + if self.goals: + self.current_goal_index = len(self.goals) - 1 + self.current_message_index = len(self.graphs_by_goal[self.goals[self.current_goal_index]]) - 1 + self.display_graph(self.current_goal_index, self.current_message_index) + + +if __name__ == '__main__': + app = QApplication(sys.argv) + viewer = DotGraphViewer() + viewer.show() + sys.exit(app.exec_()) diff --git a/src/giskardpy/tree/behaviors/plot_motion_graph.py b/src/giskardpy/tree/behaviors/plot_motion_graph.py index c8e27239ab..57bfbdcead 100644 --- a/src/giskardpy/tree/behaviors/plot_motion_graph.py +++ b/src/giskardpy/tree/behaviors/plot_motion_graph.py @@ -16,6 +16,19 @@ from giskardpy.utils.decorators import record_time, catch_and_raise_to_blackboard from giskardpy.utils.utils import create_path, json_str_to_kwargs +MyBLUE = '#0000B4' +MyGREEN = '#006600' +MyORANGE = '#996900' +MyRED = '#993000' +MyGRAY = '#E0E0E0' +MonitorTrueGreen = '#CDEACD' +MonitorFalseRed = '#EACDCD' +FONT = 'sans-serif' +LineWidth = 6 +ArrowSize = 1.5 +Fontsize = 25 +ConditionFont = 'monospace' + def extract_monitor_names_from_condition(condition: str) -> List[str]: return re.findall(r"'(.*?)'", condition) @@ -25,26 +38,26 @@ def search_for_monitor(monitor_name: str, execution_state: ExecutionState) -> gi return [m for m in execution_state.monitors if m.name == monitor_name][0] -task_state_to_color: Dict[TaskState, str] = { - TaskState.not_started: 'gray', - TaskState.running: 'green', - TaskState.on_hold: 'orange', - TaskState.succeeded: 'palegreen', - TaskState.failed: 'red' +task_state_to_color: Dict[TaskState, Tuple[str, str]] = { + TaskState.not_started: ('black', MyGRAY), + TaskState.running: (MyBLUE, MyGRAY), + TaskState.on_hold: (MyORANGE, MyGRAY), + TaskState.succeeded: (MyGREEN, MyGRAY), + TaskState.failed: ('red', MyGRAY) } -monitor_state_to_color: Dict[Tuple[TaskState, int], str] = { - (TaskState.not_started, 1): 'darkgreen', - (TaskState.running, 1): 'green', - (TaskState.on_hold, 1): 'turquoise', - (TaskState.succeeded, 1): 'palegreen', - (TaskState.failed, 1): 'gray', - - (TaskState.not_started, 0): 'darkred', - (TaskState.running, 0): 'red', - (TaskState.on_hold, 0): 'orange', - (TaskState.succeeded, 0): 'lightpink', - (TaskState.failed, 0): 'black' +monitor_state_to_color: Dict[Tuple[TaskState, int], Tuple[str, str]] = { + (TaskState.not_started, 1): ('black', MonitorTrueGreen), + (TaskState.running, 1): (MyBLUE, MonitorTrueGreen), + (TaskState.on_hold, 1): (MyORANGE, MonitorTrueGreen), + (TaskState.succeeded, 1): (MyGREEN, MonitorTrueGreen), + (TaskState.failed, 1): ('red', MonitorTrueGreen), + + (TaskState.not_started, 0): ('black', MonitorFalseRed), + (TaskState.running, 0): (MyBLUE, MonitorFalseRed), + (TaskState.on_hold, 0): (MyORANGE, MonitorFalseRed), + (TaskState.succeeded, 0): (MyGREEN, MonitorFalseRed), + (TaskState.failed, 0): ('red', MonitorFalseRed), } @@ -56,7 +69,7 @@ def format_condition(condition: str) -> str: return condition -def format_monitor_msg(msg: giskard_msgs.Monitor, color: str, name_bg_color: str) -> str: +def format_monitor_msg(msg: giskard_msgs.Monitor, color: str) -> str: start_condition = format_condition(msg.start_condition) kwargs = json_str_to_kwargs(msg.kwargs) hold_condition = format_condition(kwargs['hold_condition']) @@ -64,18 +77,18 @@ def format_monitor_msg(msg: giskard_msgs.Monitor, color: str, name_bg_color: str if msg.monitor_class in {EndMotion.__name__, CancelMotion.__name__}: hold_condition = None end_condition = None - return conditions_to_str(msg.name, start_condition, hold_condition, end_condition, color, name_bg_color) + return conditions_to_str(msg.name, start_condition, hold_condition, end_condition, color) -def format_task_msg(msg: giskard_msgs.MotionGoal, color: str, name_bg_color: str) -> str: +def format_task_msg(msg: giskard_msgs.MotionGoal, color: str) -> str: start_condition = format_condition(msg.start_condition) hold_condition = format_condition(msg.hold_condition) end_condition = format_condition(msg.end_condition) - return conditions_to_str(msg.name, start_condition, hold_condition, end_condition, color, name_bg_color) + return conditions_to_str(msg.name, start_condition, hold_condition, end_condition, color) def conditions_to_str(name: str, start_condition: str, pause_condition: Optional[str], end_condition: Optional[str], - color: str, name_bg_color: str) -> str: + color: str) -> str: label = (f'<' f'' f'' @@ -91,22 +104,10 @@ def conditions_to_str(name: str, start_condition: str, pause_condition: Optional return label -MyBLUE = '#003399' -MyGREEN = '#006600' -MyORANGE = '#996900' -MyRED = '#993000' -MyGRAY = '#E0E0E0' -FONT = 'sans-serif' -LineWidth = 6 -ArrowSize = 1.5 -Fontsize = 25 -ConditionFont = 'monospace' - - -def format_msg(msg: Union[giskard_msgs.Monitor, giskard_msgs.MotionGoal], color: str, name_bg_color: str) -> str: +def format_msg(msg: Union[giskard_msgs.Monitor, giskard_msgs.MotionGoal], color: str) -> str: if isinstance(msg, giskard_msgs.MotionGoal): - return format_task_msg(msg, color, name_bg_color) - return format_monitor_msg(msg, color, name_bg_color) + return format_task_msg(msg, color) + return format_monitor_msg(msg, color) def add_boarder_to_node(graph: pydot.Graph, node: pydot.Node, num: int, color: str, style: str) -> None: @@ -122,101 +123,80 @@ def add_boarder_to_node(graph: pydot.Graph, node: pydot.Node, num: int, color: s graph.add_subgraph(c) -def execution_state_to_dot_graph(execution_state: ExecutionState) -> pydot.Dot: - graph = pydot.Dot(graph_type='digraph', ranksep=1.5) +def execution_state_to_dot_graph(execution_state: ExecutionState, use_state_color: bool = False) -> pydot.Dot: + graph = pydot.Dot(graph_type='digraph', ranksep=1.) - # graph.add_node(make_legend_node()) - - def add_or_get_node(thing: Union[giskard_msgs.Monitor, giskard_msgs.MotionGoal]): - bgcolor = 'white' - name_bg_color = 'white' + def add_node(thing: Union[giskard_msgs.Monitor, giskard_msgs.MotionGoal], color: str, bg_color: str) \ + -> pydot.Node: num_extra_boarders = 0 node_id = str(thing.name) - color = 'black' boarder_style = 'rounded' if isinstance(thing, giskard_msgs.Monitor): style = 'filled, rounded' - # color = MyBLUE if thing.monitor_class == EndMotion.__name__: - # color = MyGREEN num_extra_boarders = 1 boarder_style = 'rounded' elif thing.monitor_class == CancelMotion.__name__: - # color = MyRED num_extra_boarders = 1 boarder_style = 'dashed, rounded' else: # isinstance(thing, Task) style = 'filled, diagonals' - # color = 'black' - bgcolor = MyGRAY - label = format_msg(thing, color, name_bg_color) - nodes = graph.get_node(label) - if not nodes: - node = pydot.Node(node_id, - label=label, - shape='rectangle', - color=color, - style=style, - margin=0, - fontname=FONT, - fillcolor=bgcolor, - fontsize=Fontsize, - penwidth=LineWidth) - add_boarder_to_node(graph=graph, node=node, num=num_extra_boarders, color=color, style=boarder_style) - graph.add_node(node) - else: - node = nodes[0] # Get the first node from the list + label = format_msg(thing, color) + node = pydot.Node(node_id, + label=label, + shape='rectangle', + color=color, + style=style, + margin=0, + fontname=FONT, + fillcolor=bg_color, + fontsize=Fontsize, + penwidth=LineWidth) + add_boarder_to_node(graph=graph, node=node, num=num_extra_boarders, color=color, style=boarder_style) + graph.add_node(node) return node - # Process monitors and their start_condition + # Process monitors and their conditions for i, monitor in enumerate(execution_state.monitors): + if use_state_color: + color, bg_color = monitor_state_to_color[(execution_state.monitor_life_cycle_state[i], + execution_state.monitor_state[i])] + else: + color, bg_color = 'black', 'white' kwargs = json_str_to_kwargs(monitor.kwargs) hold_condition = format_condition(kwargs['hold_condition']) end_condition = format_condition(kwargs['end_condition']) - monitor_node = add_or_get_node(monitor) - # monitor_node.obj_dict['attributes']['color'] = monitor_state_to_color[ - # (execution_state.monitor_life_cycle_state[i], - # execution_state.monitor_state[i])] + monitor_node = add_node(monitor, color, bg_color) free_symbols = extract_monitor_names_from_condition(monitor.start_condition) for sub_monitor_name in free_symbols: - sub_monitor = search_for_monitor(sub_monitor_name, execution_state) - sub_monitor_node = add_or_get_node(sub_monitor) - graph.add_edge( - pydot.Edge(sub_monitor_node, monitor_node, penwidth=LineWidth, color=MyGREEN, arrowsize=ArrowSize)) + graph.add_edge(pydot.Edge(sub_monitor_name, monitor_node, penwidth=LineWidth, color=MyGREEN, + arrowsize=ArrowSize)) free_symbols = extract_monitor_names_from_condition(hold_condition) for sub_monitor_name in free_symbols: - sub_monitor = search_for_monitor(sub_monitor_name, execution_state) - sub_monitor_node = add_or_get_node(sub_monitor) - graph.add_edge( - pydot.Edge(sub_monitor_node, monitor_node, penwidth=LineWidth, color=MyORANGE, arrowsize=ArrowSize)) + graph.add_edge(pydot.Edge(sub_monitor_name, monitor_node, penwidth=LineWidth, color=MyORANGE, + arrowsize=ArrowSize)) free_symbols = extract_monitor_names_from_condition(end_condition) for sub_monitor_name in free_symbols: - sub_monitor = search_for_monitor(sub_monitor_name, execution_state) - sub_monitor_node = add_or_get_node(sub_monitor) - graph.add_edge( - pydot.Edge(sub_monitor_node, monitor_node, color=MyRED, penwidth=LineWidth, arrowhead='none', - arrowtail='normal', - dir='both', arrowsize=ArrowSize)) + graph.add_edge(pydot.Edge(monitor_node, sub_monitor_name, color=MyRED, penwidth=LineWidth, arrowhead='none', + arrowtail='normal', + dir='both', arrowsize=ArrowSize)) # Process goals and their connections for i, task in enumerate(execution_state.tasks): # TODO add one collision avoidance task? - goal_node = add_or_get_node(task) - # goal_node.obj_dict['attributes']['color'] = task_state_to_color[execution_state.task_state[i]] + if use_state_color: + color, bg_color = task_state_to_color[execution_state.task_state[i]] + else: + color, bg_color = 'black', MyGRAY + goal_node = add_node(task, color, bg_color) for monitor_name in extract_monitor_names_from_condition(task.start_condition): - monitor = search_for_monitor(monitor_name, execution_state) - monitor_node = add_or_get_node(monitor) - graph.add_edge(pydot.Edge(monitor_node, goal_node, penwidth=LineWidth, color=MyGREEN, arrowsize=ArrowSize)) + graph.add_edge(pydot.Edge(monitor_name, goal_node, penwidth=LineWidth, color=MyGREEN, arrowsize=ArrowSize)) for monitor_name in extract_monitor_names_from_condition(task.hold_condition): - monitor = search_for_monitor(monitor_name, execution_state) - monitor_node = add_or_get_node(monitor) - graph.add_edge(pydot.Edge(monitor_node, goal_node, penwidth=LineWidth, color=MyORANGE, arrowsize=ArrowSize)) + graph.add_edge(pydot.Edge(monitor_name, goal_node, penwidth=LineWidth, color=MyORANGE, arrowsize=ArrowSize)) for monitor_name in extract_monitor_names_from_condition(task.end_condition): - monitor = search_for_monitor(monitor_name, execution_state) - monitor_node = add_or_get_node(monitor) - graph.add_edge(pydot.Edge(goal_node, monitor_node, color=MyRED, penwidth=LineWidth, arrowhead='none', + graph.add_edge(pydot.Edge(goal_node, monitor_name, color=MyRED, penwidth=LineWidth, arrowhead='none', arrowtail='normal', dir='both', arrowsize=ArrowSize))
{name}