Skip to content

Commit

Permalink
feat: remove MtQueue, add debug publishers for decode latency
Browse files Browse the repository at this point in the history
Signed-off-by: Max SCHMELLER <[email protected]>
  • Loading branch information
mojomex committed Oct 8, 2024
1 parent 668c8f7 commit 1bde107
Show file tree
Hide file tree
Showing 28 changed files with 393 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ const uint16_t MTU_SIZE = 1500;
/// small to bridge scheduling and processing delays, packets will be dropped. This corresponds to
/// the net.core.rmem_default setting in Linux. The current value is hardcoded to accommodate one
/// pointcloud worth of OT128 packets (currently the highest data rate sensor supported).
const size_t UDP_SOCKET_BUFFER_SIZE = MTU_SIZE * 3600;
const size_t UDP_SOCKET_BUFFER_SIZE = 0x10000000; // 250 MiB

// Time interval between Announce messages, in units of log seconds (default: 1)
const int PTP_LOG_ANNOUNCE_INTERVAL = 1;
Expand Down
3 changes: 3 additions & 0 deletions nebula_ros/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ find_package(robosense_msgs REQUIRED)
find_package(tf2_ros REQUIRED)
find_package(visualization_msgs REQUIRED)
find_package(yaml-cpp REQUIRED)
find_package(tier4_debug_msgs REQUIRED)

include_directories(
include
Expand All @@ -36,12 +37,14 @@ include_directories(
${YAML_CPP_INCLUDE_DIRS}
${PCL_INCLUDE_DIRS}
${rclcpp_components_INCLUDE_DIRS}
${tier4_debug_msgs_INCLUDE_DIRS}
)

link_libraries(
${nebula_common_TARGETS}
${YAML_CPP_LIBRARIES}
${PCL_LIBRARIES}
${tier4_debug_msgs_TARGETS}
)

## Hesai
Expand Down
65 changes: 65 additions & 0 deletions nebula_ros/include/nebula_ros/common/counters.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2024 TIER IV, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "nebula_ros/common/debug_publisher.hpp"

#include <tier4_debug_msgs/msg/int64_stamped.hpp>

#include <chrono>
#include <cstdint>
#include <map>
#include <string>
namespace nebula::ros
{

class Counter
{
public:
using clock = std::chrono::steady_clock;

explicit Counter(rclcpp::Node * node) : debug_pub_(node, node->get_fully_qualified_name()) {}

void contribute(const std::string & key, clock::duration duration)
{
measurements_.try_emplace(key, 0);
measurements_[key] += duration;
}

bool try_publish(const std::string & key)
{
if (measurements_.find(key) == measurements_.end()) {
return false;
}

debug_pub_.publish<tier4_debug_msgs::msg::Int64Stamped>(key, measurements_[key].count());
measurements_.erase(key);
return true;
}

void publish_all()
{
for (const auto & [key, value] : measurements_) {
debug_pub_.publish<tier4_debug_msgs::msg::Int64Stamped>(key, value.count());
}
measurements_.clear();
}

private:
autoware::universe_utils::DebugPublisher debug_pub_;
std::map<std::string, clock::duration> measurements_;
};

} // namespace nebula::ros
68 changes: 68 additions & 0 deletions nebula_ros/include/nebula_ros/common/debug_publisher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2020 Tier IV, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef AUTOWARE__UNIVERSE_UTILS__ROS__DEBUG_PUBLISHER_HPP_
#define AUTOWARE__UNIVERSE_UTILS__ROS__DEBUG_PUBLISHER_HPP_

#include <rclcpp/publisher_base.hpp>
#include <rclcpp/rclcpp.hpp>
#include <rosidl_runtime_cpp/traits.hpp>

#include <memory>
#include <string>
#include <unordered_map>

namespace autoware::universe_utils
{
namespace debug_publisher
{
template <class T_msg, class T>
T_msg to_debug_msg(const T & data, const rclcpp::Time & stamp)
{
T_msg msg;
msg.stamp = stamp;
msg.data = data;
return msg;
}
} // namespace debug_publisher

class DebugPublisher
{
public:
explicit DebugPublisher(rclcpp::Node * node, const char * ns) : node_(node), ns_(ns) {}

template <class T>
void publish(const std::string & name, const T & data, const rclcpp::QoS & qos = rclcpp::QoS(1))
{
if (pub_map_.count(name) == 0) {
pub_map_[name] = node_->create_publisher<T>(std::string(ns_) + "/" + name, qos);
}

std::dynamic_pointer_cast<rclcpp::Publisher<T>>(pub_map_.at(name))->publish(data);
}

template <class T_msg, class T>
void publish(const std::string & name, const T & data, const rclcpp::QoS & qos = rclcpp::QoS(1))
{
publish(name, debug_publisher::to_debug_msg<T_msg>(data, node_->now()), qos);
}

private:
rclcpp::Node * node_;
const char * ns_;
std::unordered_map<std::string, std::shared_ptr<rclcpp::PublisherBase>> pub_map_;
};
} // namespace autoware::universe_utils

#endif // AUTOWARE__UNIVERSE_UTILS__ROS__DEBUG_PUBLISHER_HPP_
4 changes: 3 additions & 1 deletion nebula_ros/include/nebula_ros/hesai/decoder_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "nebula_decoders/nebula_decoders_hesai/hesai_driver.hpp"
#include "nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_hw_interface.hpp"
#include "nebula_ros/common/counters.hpp"
#include "nebula_ros/common/watchdog_timer.hpp"

#include <nebula_common/hesai/hesai_common.hpp>
Expand All @@ -39,7 +40,8 @@ class HesaiDecoderWrapper
const std::shared_ptr<const nebula::drivers::HesaiCalibrationConfigurationBase> & calibration,
bool publish_packets);

void process_cloud_packet(std::unique_ptr<nebula_msgs::msg::NebulaPacket> packet_msg);
bool process_cloud_packet(
std::unique_ptr<nebula_msgs::msg::NebulaPacket> packet_msg, Counter & counter);

void on_config_change(
const std::shared_ptr<const nebula::drivers::HesaiSensorConfiguration> & new_config);
Expand Down
9 changes: 4 additions & 5 deletions nebula_ros/include/nebula_ros/hesai/hesai_ros_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "nebula_common/hesai/hesai_common.hpp"
#include "nebula_common/nebula_common.hpp"
#include "nebula_common/nebula_status.hpp"
#include "nebula_ros/common/counters.hpp"
#include "nebula_ros/common/debug_publisher.hpp"
#include "nebula_ros/common/mt_queue.hpp"
#include "nebula_ros/hesai/decoder_wrapper.hpp"
#include "nebula_ros/hesai/hw_interface_wrapper.hpp"
Expand Down Expand Up @@ -94,15 +96,12 @@ class HesaiRosWrapper final : public rclcpp::Node
get_calibration_result_t get_calibration_data(
const std::string & calibration_file_path, bool ignore_others = false);

Counter counter_;

Status wrapper_status_;

std::shared_ptr<const nebula::drivers::HesaiSensorConfiguration> sensor_cfg_ptr_{};

/// @brief Stores received packets that have not been processed yet by the decoder thread
MtQueue<std::unique_ptr<nebula_msgs::msg::NebulaPacket>> packet_queue_;
/// @brief Thread to isolate decoding from receiving
std::thread decoder_thread_;

rclcpp::Subscription<pandar_msgs::msg::PandarScan>::SharedPtr packets_sub_{};

bool launch_hw_;
Expand Down
1 change: 1 addition & 0 deletions nebula_ros/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
<depend>rclcpp_components</depend>
<depend>robosense_msgs</depend>
<depend>tf2_ros</depend>
<depend>tier4_debug_msgs</depend>
<depend>velodyne_msgs</depend>
<depend>visualization_msgs</depend>
<depend>yaml-cpp</depend>
Expand Down
25 changes: 22 additions & 3 deletions nebula_ros/src/hesai/decoder_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

#include "nebula_ros/hesai/decoder_wrapper.hpp"

#include "nebula_ros/common/counters.hpp"

#include <nebula_common/hesai/hesai_common.hpp>
#include <rclcpp/logging.hpp>
#include <rclcpp/time.hpp>

#include <chrono>
#include <memory>

#pragma clang diagnostic ignored "-Wbitwise-instead-of-logical"
Expand Down Expand Up @@ -92,9 +95,11 @@ void HesaiDecoderWrapper::on_calibration_change(
calibration_cfg_ptr_ = new_calibration;
}

void HesaiDecoderWrapper::process_cloud_packet(
std::unique_ptr<nebula_msgs::msg::NebulaPacket> packet_msg)
bool HesaiDecoderWrapper::process_cloud_packet(
std::unique_ptr<nebula_msgs::msg::NebulaPacket> packet_msg, Counter & counter)
{
auto t1 = std::chrono::steady_clock::now();

// Accumulate packets for recording only if someone is subscribed to the topic (for performance)
if (
packets_pub_ && (packets_pub_->get_subscription_count() > 0 ||
Expand All @@ -110,6 +115,9 @@ void HesaiDecoderWrapper::process_cloud_packet(
current_scan_msg_->packets.emplace_back(std::move(pandar_packet_msg));
}

auto t2 = std::chrono::steady_clock::now();
counter.contribute("copy_into_scanmsg_ns", t2 - t1);

std::tuple<nebula::drivers::NebulaPointCloudPtr, double> pointcloud_ts{};
nebula::drivers::NebulaPointCloudPtr pointcloud = nullptr;
{
Expand All @@ -118,13 +126,16 @@ void HesaiDecoderWrapper::process_cloud_packet(
pointcloud = std::get<0>(pointcloud_ts);
}

auto t3 = std::chrono::steady_clock::now();
counter.contribute("decode_ns", t3 - t2);

// A pointcloud is only emitted when a scan completes (e.g. 3599 packets do not emit, the 3600th
// emits one)
if (pointcloud == nullptr) {
// Since this ends the function early, the `cloud_watchdog_` will not be updated.
// Thus, if pointclouds are not emitted for too long (e.g. when decoder settings are wrong or no
// packets come in), the watchdog will log a warning automatically
return;
return false;
}

cloud_watchdog_->update();
Expand All @@ -135,6 +146,9 @@ void HesaiDecoderWrapper::process_cloud_packet(
current_scan_msg_ = std::make_unique<pandar_msgs::msg::PandarScan>();
}

auto t4 = std::chrono::steady_clock::now();
counter.contribute("publish_packets_ns", t4 - t3);

if (
nebula_points_pub_->get_subscription_count() > 0 ||
nebula_points_pub_->get_intra_process_subscription_count() > 0) {
Expand Down Expand Up @@ -166,6 +180,11 @@ void HesaiDecoderWrapper::process_cloud_packet(
rclcpp::Time(seconds_to_chrono_nano_seconds(std::get<1>(pointcloud_ts)).count());
publish_cloud(std::move(ros_pc_msg_ptr), aw_points_ex_pub_);
}

auto t5 = std::chrono::steady_clock::now();
counter.contribute("publish_points_ns", t5 - t4);

return true;
}

void HesaiDecoderWrapper::publish_cloud(
Expand Down
23 changes: 12 additions & 11 deletions nebula_ros/src/hesai/hesai_ros_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

#include "nebula_ros/hesai/hesai_ros_wrapper.hpp"

#include "nebula_ros/common/counters.hpp"
#include "nebula_ros/common/debug_publisher.hpp"
#include "nebula_ros/common/parameter_descriptors.hpp"

#include <nebula_common/hesai/hesai_common.hpp>
#include <nebula_common/nebula_common.hpp>
#include <nebula_decoders/nebula_decoders_common/angles.hpp>

#include <chrono>
#include <cstdint>
#include <filesystem>
#include <memory>
Expand All @@ -21,9 +24,9 @@ namespace nebula::ros
{
HesaiRosWrapper::HesaiRosWrapper(const rclcpp::NodeOptions & options)
: rclcpp::Node("hesai_ros_wrapper", rclcpp::NodeOptions(options).use_intra_process_comms(true)),
counter_(this),
wrapper_status_(Status::NOT_INITIALIZED),
sensor_cfg_ptr_(nullptr),
packet_queue_(3000),
hw_interface_wrapper_(),
hw_monitor_wrapper_(),
decoder_wrapper_()
Expand Down Expand Up @@ -67,12 +70,6 @@ HesaiRosWrapper::HesaiRosWrapper(const rclcpp::NodeOptions & options)

RCLCPP_DEBUG(get_logger(), "Starting stream");

decoder_thread_ = std::thread([this]() {
while (true) {
decoder_wrapper_->process_cloud_packet(packet_queue_.pop());
}
});

if (launch_hw_) {
hw_interface_wrapper_->hw_interface()->RegisterScanCallback(
std::bind(&HesaiRosWrapper::receive_cloud_packet_callback, this, std::placeholders::_1));
Expand Down Expand Up @@ -273,8 +270,7 @@ void HesaiRosWrapper::receive_scan_message_callback(
auto nebula_pkt_ptr = std::make_unique<nebula_msgs::msg::NebulaPacket>();
nebula_pkt_ptr->stamp = pkt.stamp;
std::copy(pkt.data.begin(), pkt.data.end(), std::back_inserter(nebula_pkt_ptr->data));

packet_queue_.push(std::move(nebula_pkt_ptr));
decoder_wrapper_->process_cloud_packet(std::move(nebula_pkt_ptr), counter_);
}
}

Expand Down Expand Up @@ -400,6 +396,8 @@ rcl_interfaces::msg::SetParametersResult HesaiRosWrapper::on_parameter_change(

void HesaiRosWrapper::receive_cloud_packet_callback(std::vector<uint8_t> & packet)
{
auto perf_start = std::chrono::steady_clock::now();

if (!decoder_wrapper_ || decoder_wrapper_->status() != Status::OK) {
return;
}
Expand All @@ -412,9 +410,12 @@ void HesaiRosWrapper::receive_cloud_packet_callback(std::vector<uint8_t> & packe
msg_ptr->stamp.sec = static_cast<int>(timestamp_ns / 1'000'000'000);
msg_ptr->stamp.nanosec = static_cast<int>(timestamp_ns % 1'000'000'000);
msg_ptr->data.swap(packet);
bool published = decoder_wrapper_->process_cloud_packet(std::move(msg_ptr), counter_);

if (!packet_queue_.try_push(std::move(msg_ptr))) {
RCLCPP_ERROR_THROTTLE(get_logger(), *get_clock(), 500, "Packet(s) dropped");
auto perf_stop = std::chrono::steady_clock::now();
counter_.contribute("total_ns", perf_stop - perf_start);
if (published) {
counter_.publish_all();
}
}

Expand Down
Loading

0 comments on commit 1bde107

Please sign in to comment.