Skip to content

Commit

Permalink
Create one callback_group per subscription (#19)
Browse files Browse the repository at this point in the history
**Public-Facing Changes**
- [ROS2] Each subscription uses its own mutually exclusive callback_group

**Description**
Fixes #16
  • Loading branch information
jhurliman authored Nov 9, 2022
1 parent 7c3ecb2 commit 644d9fc
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 20 deletions.
11 changes: 9 additions & 2 deletions .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
clang-format \
curl \
git \
git-lfs \
gnupg \
libasio-dev \
libssl-dev \
libwebsocketpp-dev \
lldb \
lsb-release \
nlohmann-json3-dev \
python3-colcon-common-extensions \
Expand All @@ -18,6 +20,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
python3-rosinstall \
python3-rosinstall-generator \
python3-wstool \
strace \
&& rm -rf /var/lib/apt/lists/*

# Authorize the ROS 2 GPG key and add the ROS 2 apt repository
Expand All @@ -26,7 +29,11 @@ RUN echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/r

# Install ROS 2
RUN apt-get update && apt-get install -y --no-install-recommends \
ros-galactic-foxglove-msgs \
ros-galactic-ros-core \
ros-galactic-rosbag2 \
ros-galactic-rosbag2-storage-mcap \
ros-galactic-tf2-msgs \
&& rm -rf /var/lib/apt/lists/*

SHELL ["/bin/bash", "-c"]
Expand All @@ -38,6 +45,6 @@ RUN mkdir -p /ros_ws/src/ros-foxglove-bridge
RUN echo $'\
unset ROS_DISTRO\n\
alias ros2_build_debug="colcon build --symlink-install --cmake-args -DCMAKE_BUILD_TYPE=Debug"\n\
alias ros2_build_release="colcon build --symlink-install --cmake-args -DCMAKE_BUILD_TYPE=Release"\n\
alias ros2_foxglove_bridge="source /ros_ws/install/setup.bash && ros2 run foxglove_bridge foxglove_bridge --ros-args --log-level debug"\n\
alias ros2_build_release="colcon build --symlink-install --cmake-args -DCMAKE_BUILD_TYPE=RelWithDebInfo"\n\
alias ros2_foxglove_bridge="source /ros_ws/install/setup.bash && ros2 run foxglove_bridge foxglove_bridge --ros-args --log-level debug --log-level rcl:=INFO"\n\
' >> ~/.bashrc
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,6 @@
build/
install/
log/

# Data cache
data/
4 changes: 2 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/build/foxglove_bridge/foxglove_bridge",
"args": [],
"args": ["--ros-args", "--log-level", "debug"],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"environment": [],
"externalConsole": false,
"MIMode": "gdb",
"MIMode": "lldb",
}]
}
5 changes: 5 additions & 0 deletions download_test_data.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/usr/bin/env bash

mkdir -p data
cd data
git clone https://github.com/foxglove/ros-foxglove-bridge-benchmark-assets.git .
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,11 @@ inline void Server<ServerConfiguration>::handleConnectionClosed(ConnHandle hdl)

const auto oldSubscriptionsByChannel = std::move(client->second.subscriptionsByChannel);
_clients.erase(client);
for (const auto& [chanId, subs] : oldSubscriptionsByChannel) {
if (!anySubscribed(chanId) && _unsubscribeHandler) {
_unsubscribeHandler(chanId);
if (_unsubscribeHandler) {
for (const auto& [chanId, subs] : oldSubscriptionsByChannel) {
if (!anySubscribed(chanId)) {
_unsubscribeHandler(chanId);
}
}
}
}
Expand Down Expand Up @@ -462,7 +464,7 @@ inline void Server<ServerConfiguration>::handleMessage(ConnHandle hdl, MessagePt
});
continue;
}
ChannelId chanId = sub->second;
ChannelId chanId = sub->first;
clientInfo.subscriptionsByChannel.erase(sub);
if (!anySubscribed(chanId) && _unsubscribeHandler) {
_unsubscribeHandler(chanId);
Expand Down
26 changes: 14 additions & 12 deletions ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ constexpr int DEFAULT_NUM_THREADS = 0;
using namespace std::chrono_literals;
using namespace std::placeholders;
using LogLevel = foxglove::WebSocketLogLevel;
using Subscription = std::pair<rclcpp::GenericSubscription::SharedPtr, rclcpp::SubscriptionOptions>;

class FoxgloveBridge : public rclcpp::Node {
public:
Expand Down Expand Up @@ -151,15 +152,15 @@ class FoxgloveBridge : public rclcpp::Node {
// Stop tracking this channel in the WebSocket server
_server.removeChannel(channel.id);

// Remove this topic+datatype tuple
_advertisedTopics.erase(topicAndDatatype);
_channelToTopicAndDatatype.erase(channel.id);

// Remove the subscription for this topic, if any
_subscriptions.erase(channel.id);

// Remove this topic+datatype tuple
_channelToTopicAndDatatype.erase(channel.id);
_advertisedTopics.erase(topicAndDatatype);

RCLCPP_DEBUG(this->get_logger(), "Removed channel %d for topic \"%s\" (%s)", channel.id,
channel.topic.c_str(), channel.schemaName.c_str());
topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str());
}

// Advertise new topics
Expand Down Expand Up @@ -229,8 +230,7 @@ class FoxgloveBridge : public rclcpp::Node {
foxglove::MessageDefinitionCache _messageDefinitionCache;
std::unordered_map<TopicAndDatatype, foxglove::Channel, PairHash> _advertisedTopics;
std::unordered_map<foxglove::ChannelId, TopicAndDatatype> _channelToTopicAndDatatype;
std::unordered_map<foxglove::ChannelId, std::shared_ptr<rclcpp::GenericSubscription>>
_subscriptions;
std::unordered_map<foxglove::ChannelId, Subscription> _subscriptions;
std::mutex _subscriptionsMutex;
rclcpp::node_interfaces::OnSetParametersCallbackHandle::SharedPtr _parametersUpdateHandle;
rclcpp::TimerBase::SharedPtr _updateTimer;
Expand Down Expand Up @@ -284,6 +284,8 @@ class FoxgloveBridge : public rclcpp::Node {

rclcpp::SubscriptionOptions subscriptionOptions;
subscriptionOptions.event_callbacks = eventCallbacks;
subscriptionOptions.callback_group =
this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);

constexpr size_t QUEUE_LENGTH = 10;
try {
Expand All @@ -293,9 +295,9 @@ class FoxgloveBridge : public rclcpp::Node {
rosMessageHandler(channel, msg);
},
subscriptionOptions);
_subscriptions.emplace(channelId, std::move(subscriber));
RCLCPP_INFO(this->get_logger(), "Subscribed to topic \"%s\" (%s)", topic.c_str(),
datatype.c_str());
_subscriptions.emplace(channelId, std::make_pair(std::move(subscriber), subscriptionOptions));
RCLCPP_INFO(this->get_logger(), "Subscribed to topic \"%s\" (%s) on channel %d",
topic.c_str(), datatype.c_str(), channelId);
} catch (const std::exception& ex) {
RCLCPP_ERROR(this->get_logger(), "Failed to subscribe to topic \"%s\" (%s): %s",
topic.c_str(), datatype.c_str(), ex.what());
Expand All @@ -318,8 +320,8 @@ class FoxgloveBridge : public rclcpp::Node {
return;
}

RCLCPP_INFO(this->get_logger(), "Unsubscribing from topic \"%s\" (%s)",
topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str());
RCLCPP_INFO(this->get_logger(), "Unsubscribing from topic \"%s\" (%s) on channel %d",
topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str(), channelId);
_subscriptions.erase(it2);
}

Expand Down

0 comments on commit 644d9fc

Please sign in to comment.