Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of official ROSBag2 fix for silently stopping subscription #3

Merged
merged 10 commits into from
Jan 17, 2025
4 changes: 4 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/typesupport_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ ROSBAG2_CPP_PUBLIC
std::shared_ptr<rcpputils::SharedLibrary>
get_typesupport_library(const std::string & type, const std::string & typesupport_identifier);

ROSBAG2_CPP_PUBLIC
std::string get_typesupport_library_path(
const std::string & package_name, const std::string & typesupport_identifier);

ROSBAG2_CPP_PUBLIC
const rosidl_message_type_support_t *
get_typesupport_handle(
Expand Down
13 changes: 11 additions & 2 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ add_library(${PROJECT_NAME} SHARED
src/rosbag2_transport/generic_subscription.cpp
src/rosbag2_transport/qos.cpp
src/rosbag2_transport/recorder.cpp
src/rosbag2_transport/topic_filter.cpp
src/rosbag2_transport/rosbag2_node.cpp
src/rosbag2_transport/rosbag2_transport.cpp)
target_include_directories(${PROJECT_NAME} PUBLIC
Expand Down Expand Up @@ -132,7 +133,8 @@ function(create_tests_for_rmw_implementation)
test_msgs
yaml_cpp_vendor
rosbag2_compression
shared_queues_vendor)
shared_queues_vendor
LINK_LIBS rosbag2_transport)

rosbag2_transport_add_gmock(test_qos
src/rosbag2_transport/qos.cpp
Expand All @@ -144,7 +146,8 @@ function(create_tests_for_rmw_implementation)
AMENT_DEPS
rclcpp
rosbag2_test_common
yaml_cpp_vendor)
yaml_cpp_vendor
LINK_LIBS rosbag2_transport)

# disable the following tests for connext
# due to slower discovery of nodes
Expand All @@ -168,6 +171,12 @@ function(create_tests_for_rmw_implementation)
AMENT_DEPS test_msgs rosbag2_test_common rosbag2_compression shared_queues_vendor rosbag2_cpp rosbag2_storage
${SKIP_TEST})

rosbag2_transport_add_gmock(test_topic_filter
test/rosbag2_transport/test_topic_filter.cpp
INCLUDE_DIRS
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src/rosbag2_transport>
LINK_LIBS rosbag2_transport)

rosbag2_transport_add_gmock(test_play
src/rosbag2_transport/qos.cpp
test/rosbag2_transport/test_play.cpp
Expand Down
49 changes: 27 additions & 22 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "generic_subscription.hpp"
#include "qos.hpp"
#include "topic_filter.hpp"
#include "rosbag2_node.hpp"

#ifdef _WIN32
Expand Down Expand Up @@ -99,32 +100,36 @@ void Recorder::topics_discovery(const RecordOptions & record_options)
std::unordered_map<std::string, std::string>
Recorder::get_requested_or_available_topics(const RecordOptions & record_options)
{
auto unfiltered_topics = record_options.topics.empty() ?
node_->get_all_topics_with_types(record_options.include_hidden_topics) :
node_->get_topics_with_types(record_options.topics);
auto all_topics_and_types = node_->get_topic_names_and_types();
auto filtered_topics_and_types = topic_filter::filter_topics_with_more_than_one_type(
all_topics_and_types, record_options.include_hidden_topics);

if (record_options.regex.empty() && record_options.exclude.empty()) {
return unfiltered_topics;
}

std::unordered_map<std::string, std::string> filtered_by_regex;
filtered_topics_and_types = topic_filter::filter_topics_with_known_type(
filtered_topics_and_types, topic_unknown_types_);

std::regex topic_regex(record_options.regex);
std::regex exclude_regex(record_options.exclude);
bool take = record_options.all;
for (const auto & kv : unfiltered_topics) {
// regex_match returns false for 'empty' regex
if (!record_options.regex.empty()) {
take = std::regex_match(kv.first, topic_regex);
}
if (take) {
take = !std::regex_match(kv.first, exclude_regex);
}
if (take) {
filtered_by_regex.insert(kv);
if (!record_options.topics.empty()) {
// expand specified topics
std::vector<std::string> expanded_topics;
expanded_topics.reserve(record_options.topics.size());
for (const auto & topic : record_options.topics) {
expanded_topics.push_back(
rclcpp::expand_topic_or_service_name(
topic, node_->get_name(), node_->get_namespace(), false));
}
filtered_topics_and_types = topic_filter::filter_topics(
expanded_topics, filtered_topics_and_types);
}
return filtered_by_regex;

if (record_options.regex.empty() && record_options.exclude.empty()) {
return filtered_topics_and_types;
}

return topic_filter::filter_topics_using_regex(
filtered_topics_and_types,
record_options.regex,
record_options.exclude,
record_options.all
);
}

std::unordered_map<std::string, std::string>
Expand Down
1 change: 1 addition & 0 deletions rosbag2_transport/src/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class Recorder
std::unordered_set<std::string> topics_warned_about_incompatibility_;
std::string serialization_format_;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
std::unordered_set<std::string> topic_unknown_types_;
};

} // namespace rosbag2_transport
Expand Down
153 changes: 153 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/topic_filter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright 2021, Bosch Software Innovations GmbH.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <map>
#include <regex>
#include <string>
#include <vector>
#include <unordered_map>
#include <unordered_set>

#include "rclcpp/logging.hpp"

#include "rcpputils/split.hpp"

#include "./topic_filter.hpp"

namespace rosbag2_transport
{
namespace topic_filter
{

std::unordered_map<std::string, std::string> filter_topics(
const std::vector<std::string> & selected_topic_names,
const std::unordered_map<std::string, std::string> & all_topic_names_and_types)
{
std::unordered_map<std::string, std::string> filtered_topics_and_types;

auto topic_name_matches = [&selected_topic_names](const auto & topic_and_type) -> bool
{
return std::find(
selected_topic_names.begin(),
selected_topic_names.end(), topic_and_type.first) != selected_topic_names.end();
};

for (const auto & topic_and_type : all_topic_names_and_types) {
if (topic_name_matches(topic_and_type)) {
filtered_topics_and_types.insert(topic_and_type);
}
}

return filtered_topics_and_types;
}

std::unordered_map<std::string, std::string> filter_topics_with_more_than_one_type(
const std::map<std::string, std::vector<std::string>> & topics_and_types,
bool include_hidden_topics)
{
std::unordered_map<std::string, std::string> filtered_topics_and_types;

auto logger = rclcpp::get_logger("rosbag2_transport");

for (const auto & topic_and_type : topics_and_types) {
if (topic_and_type.second.size() > 1) {
RCLCPP_ERROR_STREAM(
logger,
"Topic '" << topic_and_type.first <<
"' has several types associated. Only topics with one type are supported");
continue;
}

// According to rclpy's implementation, the indicator for a hidden topic is a leading '_'
// https://github.com/ros2/rclpy/blob/master/rclpy/rclpy/topic_or_service_is_hidden.py#L15
if (!include_hidden_topics) {
auto tokens = rcpputils::split(topic_and_type.first, '/', true); // skip empty
auto is_hidden = std::find_if(
tokens.begin(), tokens.end(), [](const auto & token) -> bool {
return token[0] == '_';
});
if (is_hidden != tokens.end()) {
RCLCPP_WARN_ONCE(
logger,
"Hidden topics are not recorded. Enable them with --include-hidden-topics");
continue;
}
}

filtered_topics_and_types.insert({topic_and_type.first, topic_and_type.second[0]});
}
return filtered_topics_and_types;
}

std::unordered_map<std::string, std::string>
filter_topics_using_regex(
const std::unordered_map<std::string, std::string> & topics_and_types,
const std::string & filter_regex_string,
const std::string & exclude_regex_string,
bool all_flag
)
{
std::unordered_map<std::string, std::string> filtered_by_regex;

std::regex filter_regex(filter_regex_string);
std::regex exclude_regex(exclude_regex_string);

for (const auto & kv : topics_and_types) {
bool take = all_flag;
// regex_match returns false for 'empty' regex
if (!all_flag && !filter_regex_string.empty()) {
take = std::regex_match(kv.first, filter_regex);
}
if (take) {
take = !std::regex_match(kv.first, exclude_regex);
}
if (take) {
filtered_by_regex.insert(kv);
}
}
return filtered_by_regex;
}

std::unordered_map<std::string, std::string>
filter_topics_with_known_type(
const std::unordered_map<std::string, std::string> & topics_and_types,
std::unordered_set<std::string> & topic_unknown_types)
{
std::unordered_map<std::string, std::string> filtered_topics_and_types;

for (const auto & topic_and_type : topics_and_types) {
try {
auto package_name = std::get<0>(rosbag2_cpp::extract_type_identifier(topic_and_type.second));
rosbag2_cpp::get_typesupport_library_path(package_name, "rosidl_typesupport_cpp");
} catch (std::runtime_error & e) {
std::unordered_set<std::string>::const_iterator got = topic_unknown_types.find(
topic_and_type.second);
if (got == topic_unknown_types.end()) {
topic_unknown_types.emplace(topic_and_type.second);
RCLCPP_WARN_STREAM(
rclcpp::get_logger("rosbag2_transport"),
"Topic '" << topic_and_type.first <<
"' has unknown type '" << topic_and_type.second <<
"' associated. Only topics with known type are supported. Reason: '" << e.what());
}
continue;
}
filtered_topics_and_types.insert(topic_and_type);
}
return filtered_topics_and_types;
}

} // namespace topic_filter
} // namespace rosbag2_transport
62 changes: 62 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/topic_filter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2021, Bosch Software Innovations GmbH.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_TRANSPORT__TOPIC_FILTER_HPP_
#define ROSBAG2_TRANSPORT__TOPIC_FILTER_HPP_

#include <map>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "rosbag2_cpp/typesupport_helpers.hpp"
#include "rosbag2_transport/visibility_control.hpp"

namespace rosbag2_transport
{
namespace topic_filter
{

ROSBAG2_TRANSPORT_PUBLIC
std::unordered_map<std::string, std::string>
filter_topics(
const std::vector<std::string> & selected_topic_names,
const std::unordered_map<std::string, std::string> & all_topic_names_and_types);

ROSBAG2_TRANSPORT_PUBLIC
std::unordered_map<std::string, std::string>
filter_topics_with_more_than_one_type(
const std::map<std::string, std::vector<std::string>> & topics_and_types,
bool include_hidden_topics = false);

ROSBAG2_TRANSPORT_PUBLIC
std::unordered_map<std::string, std::string>
filter_topics_using_regex(
const std::unordered_map<std::string, std::string> & topics_and_types,
const std::string & filter_regex_string,
const std::string & exclude_regex_string,
bool all_flag
);

ROSBAG2_TRANSPORT_PUBLIC
std::unordered_map<std::string, std::string>
filter_topics_with_known_type(
const std::unordered_map<std::string, std::string> & topics_and_types,
std::unordered_set<std::string> & topic_unknown_types);

} // namespace topic_filter
} // namespace rosbag2_transport

#endif // ROSBAG2_TRANSPORT__TOPIC_FILTER_HPP_
Loading