Skip to content

Commit

Permalink
add "rosx_introspection" to support JSON encoding (#307)
Browse files Browse the repository at this point in the history
Clients to the ROS 2 bridge can publish messages as JSON instead of cdr. JSON payloads will be re-serialized to cdr by the bridge before publishing to the ROS 2 graph

---------
Co-authored-by: Hans-Joachim Krauch <[email protected]>
  • Loading branch information
facontidavide authored Jul 29, 2024
1 parent f49c950 commit efbcaef
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 31 deletions.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ elseif("$ENV{ROS_VERSION}" STREQUAL "2")
find_package(rclcpp REQUIRED)
find_package(rclcpp_components REQUIRED)
find_package(resource_retriever REQUIRED)
find_package(rosx_introspection REQUIRED)

add_library(foxglove_bridge_component SHARED
ros2_foxglove_bridge/src/message_definition_cache.cpp
Expand All @@ -168,7 +169,7 @@ elseif("$ENV{ROS_VERSION}" STREQUAL "2")
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/ros2_foxglove_bridge/include>
$<INSTALL_INTERFACE:include>
)
ament_target_dependencies(foxglove_bridge_component ament_index_cpp rclcpp rclcpp_components resource_retriever rosgraph_msgs)
ament_target_dependencies(foxglove_bridge_component ament_index_cpp rclcpp rclcpp_components resource_retriever rosgraph_msgs rosx_introspection)
target_link_libraries(foxglove_bridge_component foxglove_bridge_base)
rclcpp_components_register_nodes(foxglove_bridge_component "foxglove_bridge::FoxgloveBridge")
enable_strict_compiler_warnings(foxglove_bridge_component)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -761,10 +761,10 @@ inline void Server<ServerConfiguration>::handleBinaryMessage(ConnHandle hdl, Mes
length,
data};
_handlers.clientMessageHandler(clientMessage, hdl);
} catch (const ServiceError& e) {
} catch (const ClientChannelError& e) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
} catch (...) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, "callService: Failed to execute handler");
sendStatusAndLogMsg(hdl, StatusLevel::Error, "clientMessage: Failed to execute handler");
}
} break;
case ClientBinaryOpcode::SERVICE_CALL_REQUEST: {
Expand Down
3 changes: 3 additions & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
<depend condition="$ROS_VERSION == 2">rclcpp</depend>
<depend condition="$ROS_VERSION == 2">rclcpp_components</depend>

<depend condition="$ROS_VERSION == 2">rosx_introspection</depend>


<!-- Test dependencies -->
<test_depend condition="$ROS_VERSION == 1">gtest</test_depend>
<test_depend condition="$ROS_VERSION == 1">rostest</test_depend>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <rclcpp/rclcpp.hpp>
#include <rosgraph_msgs/msg/clock.hpp>
#include <rosx_introspection/ros_parser.hpp>
#include <websocketpp/common/connection_hdl.hpp>

#include <foxglove_bridge/callback_queue.hpp>
Expand Down Expand Up @@ -83,6 +84,7 @@ class FoxgloveBridge : public rclcpp::Node {
bool _includeHidden = false;
bool _disableLoanMessage = true;
std::unique_ptr<foxglove::CallbackQueue> _fetchAssetQueue;
std::unordered_map<std::string, std::shared_ptr<RosMsgParser::Parser>> _jsonParsers;
std::atomic<bool> _shuttingDown = false;

void subscribeConnectionGraph(bool subscribe);
Expand Down
102 changes: 89 additions & 13 deletions ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ FoxgloveBridge::FoxgloveBridge(const rclcpp::NodeOptions& options)
if (_useSimTime) {
serverOptions.capabilities.push_back(foxglove::CAPABILITY_TIME);
}
serverOptions.supportedEncodings = {"cdr"};
serverOptions.supportedEncodings = {"cdr", "json"};
serverOptions.metadata = {{"ROS_DISTRO", rosDistro}};
serverOptions.sendBufferLimitBytes = send_buffer_limit;
serverOptions.sessionId = std::to_string(std::time(nullptr));
Expand Down Expand Up @@ -266,7 +266,6 @@ void FoxgloveBridge::updateAdvertisedTopics(
topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str(), err.what());
continue;
}

channelsToAdd.push_back(newChannel);
}

Expand Down Expand Up @@ -602,6 +601,42 @@ void FoxgloveBridge::clientAdvertise(const foxglove::ClientAdvertisement& advert
std::to_string(advertisement.channelId) + " it had already advertised");
}

if (advertisement.schemaName.empty()) {
throw foxglove::ClientChannelError(
advertisement.channelId,
"Received client advertisement from " + _server->remoteEndpointString(hdl) + " for channel " +
std::to_string(advertisement.channelId) + " with empty schema name");
}

if (advertisement.encoding == "json") {
// register the JSON parser for this schemaName
auto parserIt = _jsonParsers.find(advertisement.schemaName);
if (parserIt == _jsonParsers.end()) {
const auto& schemaName = advertisement.schemaName;
std::string schema = "";

if (!advertisement.schema.empty()) {
// Schema is given by the advertisement
schema = std::string(reinterpret_cast<const char*>(advertisement.schema.data()),
advertisement.schema.size());
} else {
// Schema not given, look it up.
auto [format, msgDefinition] = _messageDefinitionCache.get_full_text(schemaName);
if (format != foxglove::MessageDefinitionFormat::MSG) {
throw foxglove::ClientChannelError(
advertisement.channelId,
"Message definition (.msg) for schema " + schemaName + " not found.");
}

schema = msgDefinition;
}

auto parser = std::make_shared<RosMsgParser::Parser>(
advertisement.topic, RosMsgParser::ROSType(schemaName), schema);
_jsonParsers.insert({schemaName, parser});
}
}

try {
// Create a new topic advertisement
const auto& topicName = advertisement.topic;
Expand Down Expand Up @@ -629,9 +664,10 @@ void FoxgloveBridge::clientAdvertise(const foxglove::ClientAdvertisement& advert
publisherOptions.callback_group = _clientPublishCallbackGroup;
auto publisher = this->create_generic_publisher(topicName, topicType, qos, publisherOptions);

RCLCPP_INFO(this->get_logger(), "Client %s is advertising \"%s\" (%s) on channel %d",
RCLCPP_INFO(this->get_logger(),
"Client %s is advertising \"%s\" (%s) on channel %d with encoding \"%s\"",
_server->remoteEndpointString(hdl).c_str(), topicName.c_str(), topicType.c_str(),
advertisement.channelId);
advertisement.channelId, advertisement.encoding.c_str());

// Store the new topic advertisement
clientPublications.emplace(advertisement.channelId, std::move(publisher));
Expand Down Expand Up @@ -704,17 +740,57 @@ void FoxgloveBridge::clientMessage(const foxglove::ClientMessage& message, Conne
publisher = it2->second;
}

// Copy the message payload into a SerializedMessage object
rclcpp::SerializedMessage serializedMessage{message.getLength()};
auto& rclSerializedMsg = serializedMessage.get_rcl_serialized_message();
std::memcpy(rclSerializedMsg.buffer, message.getData(), message.getLength());
rclSerializedMsg.buffer_length = message.getLength();
auto publishMessage = [publisher, this](const void* data, size_t size) {
// Copy the message payload into a SerializedMessage object
rclcpp::SerializedMessage serializedMessage{size};
auto& rclSerializedMsg = serializedMessage.get_rcl_serialized_message();
std::memcpy(rclSerializedMsg.buffer, data, size);
rclSerializedMsg.buffer_length = size;
// Publish the message
if (_disableLoanMessage || !publisher->can_loan_messages()) {
publisher->publish(serializedMessage);
} else {
publisher->publish_as_loaned_msg(serializedMessage);
}
};

// Publish the message
if (_disableLoanMessage || !publisher->can_loan_messages()) {
publisher->publish(serializedMessage);
if (message.advertisement.encoding == "cdr") {
publishMessage(message.getData(), message.getLength());
} else if (message.advertisement.encoding == "json") {
// get the specific parser for this schemaName
std::shared_ptr<RosMsgParser::Parser> parser;
{
std::lock_guard<std::mutex> lock(_clientAdvertisementsMutex);
auto parserIt = _jsonParsers.find(message.advertisement.schemaName);
if (parserIt != _jsonParsers.end()) {
parser = parserIt->second;
}
}
if (!parser) {
throw foxglove::ClientChannelError(message.advertisement.channelId,
"Dropping client message from " +
_server->remoteEndpointString(hdl) +
" with encoding \"json\": no parser found");
} else {
thread_local RosMsgParser::ROS2_Serializer serializer;
serializer.reset();
const std::string jsonMessage(reinterpret_cast<const char*>(message.getData()),
message.getLength());
try {
parser->serializeFromJson(jsonMessage, &serializer);
publishMessage(serializer.getBufferData(), serializer.getBufferSize());
} catch (const std::exception& ex) {
throw foxglove::ClientChannelError(message.advertisement.channelId,
"Dropping client message from " +
_server->remoteEndpointString(hdl) +
" with encoding \"json\": " + ex.what());
}
}
} else {
publisher->publish_as_loaned_msg(serializedMessage);
throw foxglove::ClientChannelError(
message.advertisement.channelId,
"Dropping client message from " + _server->remoteEndpointString(hdl) +
" with unknown encoding \"" + message.advertisement.encoding + "\"");
}
}

Expand Down
63 changes: 48 additions & 15 deletions ros2_foxglove_bridge/tests/smoke_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
constexpr char URI[] = "ws://localhost:8765";

// Binary representation of std_msgs/msg/String for "hello world"
constexpr uint8_t HELLO_WORLD_BINARY[] = {0, 1, 0, 0, 12, 0, 0, 0, 104, 101,
108, 108, 111, 32, 119, 111, 114, 108, 100, 0};
constexpr uint8_t HELLO_WORLD_CDR[] = {0, 1, 0, 0, 12, 0, 0, 0, 104, 101,
108, 108, 111, 32, 119, 111, 114, 108, 100, 0};
constexpr char HELLO_WORLD_JSON[] = "{\"data\": \"hello world\"}";
constexpr char STD_MSGS_STRING_SCHEMA[] = "data string";

constexpr auto ONE_SECOND = std::chrono::seconds(1);
constexpr auto DEFAULT_TIMEOUT = std::chrono::seconds(10);
Expand Down Expand Up @@ -124,7 +126,11 @@ class ServiceTest : public TestWithExecutor {
std::shared_ptr<foxglove::Client<websocketpp::config::asio_client>> _wsClient;
};

class ExistingPublisherTest : public TestWithExecutor {
class PublisherTest
: public TestWithExecutor,
public testing::WithParamInterface<std::pair<std::string, std::vector<uint8_t>>> {};

class ExistingPublisherTest : public PublisherTest {
public:
inline static const std::string TOPIC_NAME = "/some_topic";

Expand Down Expand Up @@ -198,8 +204,8 @@ TEST(SmokeTest, testSubscription) {
client->subscribe({{subscriptionId, channel.id}});
ASSERT_EQ(std::future_status::ready, msgFuture.wait_for(ONE_SECOND));
const auto msgData = msgFuture.get();
ASSERT_EQ(sizeof(HELLO_WORLD_BINARY), msgData.size());
EXPECT_EQ(0, std::memcmp(HELLO_WORLD_BINARY, msgData.data(), msgData.size()));
ASSERT_EQ(sizeof(HELLO_WORLD_CDR), msgData.size());
EXPECT_EQ(0, std::memcmp(HELLO_WORLD_CDR, msgData.data(), msgData.size()));

// Unsubscribe from the channel again.
client->unsubscribe({subscriptionId});
Expand Down Expand Up @@ -243,21 +249,25 @@ TEST(SmokeTest, testSubscriptionParallel) {
for (auto& future : futures) {
ASSERT_EQ(std::future_status::ready, future.wait_for(DEFAULT_TIMEOUT));
auto msgData = future.get();
ASSERT_EQ(sizeof(HELLO_WORLD_BINARY), msgData.size());
EXPECT_EQ(0, std::memcmp(HELLO_WORLD_BINARY, msgData.data(), msgData.size()));
ASSERT_EQ(sizeof(HELLO_WORLD_CDR), msgData.size());
EXPECT_EQ(0, std::memcmp(HELLO_WORLD_CDR, msgData.data(), msgData.size()));
}

for (auto client : clients) {
client->unsubscribe({subscriptionId});
}
}

TEST_F(TestWithExecutor, testPublishing) {
TEST_P(PublisherTest, testPublishing) {
const auto& [encoding, message] = GetParam();

foxglove::ClientAdvertisement advertisement;
advertisement.channelId = 1;
advertisement.topic = "/foo";
advertisement.encoding = "cdr";
advertisement.schemaName = "std_msgs/String";
advertisement.encoding = encoding;
advertisement.schemaName = "std_msgs/msg/String";
advertisement.schema =
std::vector<uint8_t>(STD_MSGS_STRING_SCHEMA, std::end(STD_MSGS_STRING_SCHEMA));

// Set up a ROS node with a subscriber
std::promise<std::string> msgPromise;
Expand All @@ -279,7 +289,7 @@ TEST_F(TestWithExecutor, testPublishing) {
ASSERT_EQ(std::future_status::ready, channelFuture.wait_for(ONE_SECOND));

// Publish the message and unadvertise again
client->publish(advertisement.channelId, HELLO_WORLD_BINARY, sizeof(HELLO_WORLD_BINARY));
client->publish(advertisement.channelId, message.data(), message.size());
client->unadvertise({advertisement.channelId});

// Ensure that we have received the correct message via our ROS subscriber
Expand All @@ -288,12 +298,25 @@ TEST_F(TestWithExecutor, testPublishing) {
EXPECT_EQ("hello world", msgFuture.get());
}

TEST_F(ExistingPublisherTest, testPublishingWithExistingPublisher) {
INSTANTIATE_TEST_SUITE_P(
TestPublishingCDR, PublisherTest,
testing::Values(std::make_pair("cdr", std::vector<uint8_t>(HELLO_WORLD_CDR,
std::end(HELLO_WORLD_CDR)))));

INSTANTIATE_TEST_SUITE_P(
TestPublishingJSON, PublisherTest,
testing::Values(std::make_pair("json", std::vector<uint8_t>(HELLO_WORLD_JSON,
std::end(HELLO_WORLD_JSON)))));

TEST_P(ExistingPublisherTest, testPublishingWithExistingPublisher) {
const auto& [encoding, message] = GetParam();

foxglove::ClientAdvertisement advertisement;
advertisement.channelId = 1;
advertisement.topic = TOPIC_NAME;
advertisement.encoding = "cdr";
advertisement.schemaName = "std_msgs/String";
advertisement.encoding = encoding;
advertisement.schemaName = "std_msgs/msg/String";
advertisement.schema = {}; // Schema intentionally left empty.

// Set up a ROS node with a subscriber
std::promise<std::string> msgPromise;
Expand All @@ -316,7 +339,7 @@ TEST_F(ExistingPublisherTest, testPublishingWithExistingPublisher) {
ASSERT_EQ(std::future_status::ready, channelFuture.wait_for(ONE_SECOND));

// Publish the message and unadvertise again
client->publish(advertisement.channelId, HELLO_WORLD_BINARY, sizeof(HELLO_WORLD_BINARY));
client->publish(advertisement.channelId, message.data(), message.size());
client->unadvertise({advertisement.channelId});

// Ensure that we have received the correct message via our ROS subscriber
Expand All @@ -325,6 +348,16 @@ TEST_F(ExistingPublisherTest, testPublishingWithExistingPublisher) {
EXPECT_EQ("hello world", msgFuture.get());
}

INSTANTIATE_TEST_SUITE_P(
ExistingPublisherTestCDR, ExistingPublisherTest,
testing::Values(std::make_pair("cdr", std::vector<uint8_t>(HELLO_WORLD_CDR,
std::end(HELLO_WORLD_CDR)))));

INSTANTIATE_TEST_SUITE_P(
ExistingPublisherTestJSON, ExistingPublisherTest,
testing::Values(std::make_pair("json", std::vector<uint8_t>(HELLO_WORLD_JSON,
std::end(HELLO_WORLD_JSON)))));

TEST_F(ParameterTest, testGetAllParams) {
const std::string requestId = "req-testGetAllParams";
auto future = foxglove::waitForParameters(_wsClient, requestId);
Expand Down

0 comments on commit efbcaef

Please sign in to comment.