From 644d9fc8fea97bf3625723084974790cdf63c891 Mon Sep 17 00:00:00 2001 From: John Hurliman Date: Wed, 9 Nov 2022 14:34:32 -0800 Subject: [PATCH] Create one callback_group per subscription (#19) **Public-Facing Changes** - [ROS2] Each subscription uses its own mutually exclusive callback_group **Description** Fixes #16 --- .devcontainer/Dockerfile | 11 ++++++-- .gitignore | 3 +++ .vscode/launch.json | 4 +-- download_test_data.sh | 5 ++++ .../foxglove_bridge/websocket_server.hpp | 10 ++++--- .../src/ros2_foxglove_bridge.cpp | 26 ++++++++++--------- 6 files changed, 39 insertions(+), 20 deletions(-) create mode 100755 download_test_data.sh diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 2ec2f37..d866ffd 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -6,10 +6,12 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ clang-format \ curl \ git \ + git-lfs \ gnupg \ libasio-dev \ libssl-dev \ libwebsocketpp-dev \ + lldb \ lsb-release \ nlohmann-json3-dev \ python3-colcon-common-extensions \ @@ -18,6 +20,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ python3-rosinstall \ python3-rosinstall-generator \ python3-wstool \ + strace \ && rm -rf /var/lib/apt/lists/* # Authorize the ROS 2 GPG key and add the ROS 2 apt repository @@ -26,7 +29,11 @@ RUN echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/r # Install ROS 2 RUN apt-get update && apt-get install -y --no-install-recommends \ + ros-galactic-foxglove-msgs \ ros-galactic-ros-core \ + ros-galactic-rosbag2 \ + ros-galactic-rosbag2-storage-mcap \ + ros-galactic-tf2-msgs \ && rm -rf /var/lib/apt/lists/* SHELL ["/bin/bash", "-c"] @@ -38,6 +45,6 @@ RUN mkdir -p /ros_ws/src/ros-foxglove-bridge RUN echo $'\ unset ROS_DISTRO\n\ alias ros2_build_debug="colcon build --symlink-install --cmake-args -DCMAKE_BUILD_TYPE=Debug"\n\ -alias ros2_build_release="colcon build --symlink-install --cmake-args -DCMAKE_BUILD_TYPE=Release"\n\ -alias ros2_foxglove_bridge="source /ros_ws/install/setup.bash && ros2 run foxglove_bridge foxglove_bridge --ros-args --log-level debug"\n\ +alias ros2_build_release="colcon build --symlink-install --cmake-args -DCMAKE_BUILD_TYPE=RelWithDebInfo"\n\ +alias ros2_foxglove_bridge="source /ros_ws/install/setup.bash && ros2 run foxglove_bridge foxglove_bridge --ros-args --log-level debug --log-level rcl:=INFO"\n\ ' >> ~/.bashrc diff --git a/.gitignore b/.gitignore index 77aae37..c899296 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,6 @@ build/ install/ log/ + +# Data cache +data/ diff --git a/.vscode/launch.json b/.vscode/launch.json index e30048e..5a38bec 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -9,11 +9,11 @@ "type": "cppdbg", "request": "launch", "program": "${workspaceFolder}/build/foxglove_bridge/foxglove_bridge", - "args": [], + "args": ["--ros-args", "--log-level", "debug"], "stopAtEntry": false, "cwd": "${workspaceFolder}", "environment": [], "externalConsole": false, - "MIMode": "gdb", + "MIMode": "lldb", }] } diff --git a/download_test_data.sh b/download_test_data.sh new file mode 100755 index 0000000..94b84c1 --- /dev/null +++ b/download_test_data.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +mkdir -p data +cd data +git clone https://github.com/foxglove/ros-foxglove-bridge-benchmark-assets.git . diff --git a/foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp b/foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp index 0a79a48..36a083c 100644 --- a/foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp +++ b/foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp @@ -231,9 +231,11 @@ inline void Server::handleConnectionClosed(ConnHandle hdl) const auto oldSubscriptionsByChannel = std::move(client->second.subscriptionsByChannel); _clients.erase(client); - for (const auto& [chanId, subs] : oldSubscriptionsByChannel) { - if (!anySubscribed(chanId) && _unsubscribeHandler) { - _unsubscribeHandler(chanId); + if (_unsubscribeHandler) { + for (const auto& [chanId, subs] : oldSubscriptionsByChannel) { + if (!anySubscribed(chanId)) { + _unsubscribeHandler(chanId); + } } } } @@ -462,7 +464,7 @@ inline void Server::handleMessage(ConnHandle hdl, MessagePt }); continue; } - ChannelId chanId = sub->second; + ChannelId chanId = sub->first; clientInfo.subscriptionsByChannel.erase(sub); if (!anySubscribed(chanId) && _unsubscribeHandler) { _unsubscribeHandler(chanId); diff --git a/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp b/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp index fb8167f..d5cae09 100644 --- a/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp +++ b/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp @@ -18,6 +18,7 @@ constexpr int DEFAULT_NUM_THREADS = 0; using namespace std::chrono_literals; using namespace std::placeholders; using LogLevel = foxglove::WebSocketLogLevel; +using Subscription = std::pair; class FoxgloveBridge : public rclcpp::Node { public: @@ -151,15 +152,15 @@ class FoxgloveBridge : public rclcpp::Node { // Stop tracking this channel in the WebSocket server _server.removeChannel(channel.id); - // Remove this topic+datatype tuple - _advertisedTopics.erase(topicAndDatatype); - _channelToTopicAndDatatype.erase(channel.id); - // Remove the subscription for this topic, if any _subscriptions.erase(channel.id); + // Remove this topic+datatype tuple + _channelToTopicAndDatatype.erase(channel.id); + _advertisedTopics.erase(topicAndDatatype); + RCLCPP_DEBUG(this->get_logger(), "Removed channel %d for topic \"%s\" (%s)", channel.id, - channel.topic.c_str(), channel.schemaName.c_str()); + topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str()); } // Advertise new topics @@ -229,8 +230,7 @@ class FoxgloveBridge : public rclcpp::Node { foxglove::MessageDefinitionCache _messageDefinitionCache; std::unordered_map _advertisedTopics; std::unordered_map _channelToTopicAndDatatype; - std::unordered_map> - _subscriptions; + std::unordered_map _subscriptions; std::mutex _subscriptionsMutex; rclcpp::node_interfaces::OnSetParametersCallbackHandle::SharedPtr _parametersUpdateHandle; rclcpp::TimerBase::SharedPtr _updateTimer; @@ -284,6 +284,8 @@ class FoxgloveBridge : public rclcpp::Node { rclcpp::SubscriptionOptions subscriptionOptions; subscriptionOptions.event_callbacks = eventCallbacks; + subscriptionOptions.callback_group = + this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); constexpr size_t QUEUE_LENGTH = 10; try { @@ -293,9 +295,9 @@ class FoxgloveBridge : public rclcpp::Node { rosMessageHandler(channel, msg); }, subscriptionOptions); - _subscriptions.emplace(channelId, std::move(subscriber)); - RCLCPP_INFO(this->get_logger(), "Subscribed to topic \"%s\" (%s)", topic.c_str(), - datatype.c_str()); + _subscriptions.emplace(channelId, std::make_pair(std::move(subscriber), subscriptionOptions)); + RCLCPP_INFO(this->get_logger(), "Subscribed to topic \"%s\" (%s) on channel %d", + topic.c_str(), datatype.c_str(), channelId); } catch (const std::exception& ex) { RCLCPP_ERROR(this->get_logger(), "Failed to subscribe to topic \"%s\" (%s): %s", topic.c_str(), datatype.c_str(), ex.what()); @@ -318,8 +320,8 @@ class FoxgloveBridge : public rclcpp::Node { return; } - RCLCPP_INFO(this->get_logger(), "Unsubscribing from topic \"%s\" (%s)", - topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str()); + RCLCPP_INFO(this->get_logger(), "Unsubscribing from topic \"%s\" (%s) on channel %d", + topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str(), channelId); _subscriptions.erase(it2); }