Skip to content

Commit

Permalink
fix/time_sync (#2221)
Browse files Browse the repository at this point in the history
* address pr comments

* address PR
  • Loading branch information
MishkaMN authored Dec 16, 2023
1 parent 4eca5ae commit 83ac8a0
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 53 deletions.
4 changes: 2 additions & 2 deletions carma_wm/include/carma_wm/CARMAWorldModel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,11 @@ class CARMAWorldModel : public WorldModel

/*! \brief Set current Ros1 clock (only used in simulation runs)
*/
void setRos1Clock(rclcpp::Time time_now);
void setRos1Clock(const rclcpp::Time& time_now);

/*! \brief Set simulation clock clock (only used in simulation runs)
*/
void setSimulationClock(rclcpp::Time time_now);
void setSimulationClock(const rclcpp::Time& time_now);

/*! \brief helper for traffic signal Id
*/
Expand Down
2 changes: 1 addition & 1 deletion carma_wm/include/carma_wm/WMListener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class WMListener

private:
// Callback function that uses lock to edit the map
void mapUpdateCallback(autoware_lanelet2_msgs::msg::MapBin::UniquePtr geofence_msg);
void mapUpdateCallback(autoware_lanelet2_msgs::msg::MapBin::SharedPtr geofence_msg);
carma_ros2_utils::SubPtr<carma_perception_msgs::msg::RoadwayObstacleList> roadway_objects_sub_;
carma_ros2_utils::SubPtr<autoware_lanelet2_msgs::msg::MapBin> map_update_sub_;

Expand Down
6 changes: 3 additions & 3 deletions carma_wm/src/CARMAWorldModel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -613,12 +613,12 @@ namespace carma_wm
return semantic_map_;
}

void CARMAWorldModel::setRos1Clock(rclcpp::Time time_now)
void CARMAWorldModel::setRos1Clock(const rclcpp::Time& time_now)
{
ros1_clock_ = time_now;
}

void CARMAWorldModel::setSimulationClock(rclcpp::Time time_now)
void CARMAWorldModel::setSimulationClock(const rclcpp::Time& time_now)
{
simulation_clock_ = time_now;
}
Expand Down Expand Up @@ -1434,7 +1434,7 @@ namespace carma_wm
// NOTE: In simulation, ROS1 clock (often coming from CARLA) can have a large time ahead.
// the timing calculated here is in Simulation time, which is behind. Therefore, the world model adds the offset to make it meaningful to carma-platform:
// https://github.com/usdot-fhwa-stol/carma-platform/issues/2217
if (is_simulation && ros1_clock_.has_value() && simulation_clock_.has_value())
if (is_simulation && ros1_clock_ && simulation_clock_)
{
simulation_time_difference_in_seconds = ros1_clock_.value().seconds() - simulation_clock_.value().seconds();
}
Expand Down
68 changes: 44 additions & 24 deletions carma_wm/src/WMListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,44 +80,56 @@ WMListener::WMListener(
{
RCLCPP_DEBUG_STREAM(node_logging_->get_logger(), "WMListener: Using a multi-threaded subscription");

auto map_update_cb_group = node_base_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
map_update_options.callback_group = map_update_cb_group;
map_update_options.callback_group = node_base_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);

auto map_cb_group = node_base_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
map_options.callback_group = map_cb_group;
map_options.callback_group = node_base_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);

auto route_cb_group = node_base_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
route_options.callback_group = route_cb_group;
route_options.callback_group = node_base_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);

auto roadway_objects_group = node_base_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
roadway_objects_options.callback_group = roadway_objects_group;
roadway_objects_options.callback_group = node_base_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);

auto traffic_spat_group = node_base_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
traffic_spat_options.callback_group = traffic_spat_group;
traffic_spat_options.callback_group = node_base_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);

auto ros1_clock_group = node_base_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
ros1_clock_options.callback_group = ros1_clock_group;

auto sim_clock_group = node_base_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
sim_clock_options.callback_group = sim_clock_group;
ros1_clock_options.callback_group = node_base_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);

sim_clock_options.callback_group = node_base_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
}

// Setup subscribers
route_sub_ = rclcpp::create_subscription<carma_planning_msgs::msg::Route>(node_topics_, "route", 1,
std::bind(&WMListenerWorker::routeCallback, worker_.get(), std::placeholders::_1), route_options);
[this](const carma_planning_msgs::msg::Route::SharedPtr msg)
{
this->worker_->routeCallback(msg);
}
, route_options);

ros1_clock_sub_ = rclcpp::create_subscription<rosgraph_msgs::msg::Clock>(node_topics_, "/clock", 1,
std::bind(&WMListenerWorker::ros1ClockCallback, worker_.get(), std::placeholders::_1), ros1_clock_options);
[this](const rosgraph_msgs::msg::Clock::SharedPtr msg)
{
this->worker_->ros1ClockCallback(msg);
}
, ros1_clock_options);

sim_clock_sub_ = rclcpp::create_subscription<rosgraph_msgs::msg::Clock>(node_topics_, "/sim_clock", 1,
std::bind(&WMListenerWorker::simClockCallback, worker_.get(), std::placeholders::_1), sim_clock_options);
[this](const rosgraph_msgs::msg::Clock::SharedPtr msg)
{
this->worker_->simClockCallback(msg);
}
, sim_clock_options);

roadway_objects_sub_ = rclcpp::create_subscription<carma_perception_msgs::msg::RoadwayObstacleList>(node_topics_, "roadway_objects", 1,
std::bind(&WMListenerWorker::roadwayObjectListCallback, worker_.get(), std::placeholders::_1), roadway_objects_options);
[this](const carma_perception_msgs::msg::RoadwayObstacleList::SharedPtr msg)
{
this->worker_->roadwayObjectListCallback(msg);
}
, roadway_objects_options);

traffic_spat_sub_ = rclcpp::create_subscription<carma_v2x_msgs::msg::SPAT>(node_topics_, "incoming_spat", 1,
std::bind(&WMListenerWorker::incomingSpatCallback, worker_.get(), std::placeholders::_1), traffic_spat_options);
[this](const carma_v2x_msgs::msg::SPAT::SharedPtr msg)
{
this->worker_->incomingSpatCallback(msg);
}
, traffic_spat_options);

// NOTE: Currently, intra-process comms must be disabled for subscribers that are transient_local: https://github.com/ros2/rclcpp/issues/1753
map_update_options.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable; // Disable intra-process comms for the map update subscriber
Expand All @@ -127,7 +139,11 @@ WMListener::WMListener(

// Create map update subscriber that will receive earlier messages that were missed ONLY if the publisher is transient_local too
map_update_sub_ = rclcpp::create_subscription<autoware_lanelet2_msgs::msg::MapBin>(node_topics_, "map_update", map_update_sub_qos,
std::bind(&WMListener::mapUpdateCallback, this, std::placeholders::_1), map_update_options);
[this](const autoware_lanelet2_msgs::msg::MapBin::SharedPtr msg)
{
this->mapUpdateCallback(msg);
}
, map_update_options);

map_options.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable; // Disable intra-process comms for the semantic map subscriber
auto map_sub_qos = rclcpp::QoS(rclcpp::KeepLast(2)); // Set the queue size for the semantic map subscriber
Expand All @@ -136,7 +152,11 @@ WMListener::WMListener(

// Create semantic mab subscriber that will receive earlier messages that were missed ONLY if the publisher is transient_local too
map_sub_ = rclcpp::create_subscription<autoware_lanelet2_msgs::msg::MapBin>(node_topics_, "semantic_map", map_sub_qos,
std::bind(&WMListenerWorker::mapCallback, worker_.get(), std::placeholders::_1), map_options);
[this](const autoware_lanelet2_msgs::msg::MapBin::SharedPtr msg)
{
this->worker_->mapCallback(msg);
}
, map_options);
}

WMListener::~WMListener() {}
Expand All @@ -159,13 +179,13 @@ WorldModelConstPtr WMListener::getWorldModel()
return worker_->getWorldModel();
}

void WMListener::mapUpdateCallback(autoware_lanelet2_msgs::msg::MapBin::UniquePtr geofence_msg)
void WMListener::mapUpdateCallback(autoware_lanelet2_msgs::msg::MapBin::SharedPtr geofence_msg)
{
const std::lock_guard<std::mutex> lock(mw_mutex_);

RCLCPP_INFO_STREAM(node_logging_->get_logger(), "New Map Update Received. SeqNum: " << geofence_msg->seq_id);

worker_->mapUpdateCallback(std::move(geofence_msg));
worker_->mapUpdateCallback(geofence_msg);
}

void WMListener::setMapCallback(std::function<void()> callback)
Expand Down
30 changes: 15 additions & 15 deletions carma_wm/src/WMListenerWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ WorldModelConstPtr WMListenerWorker::getWorldModel() const
}


void WMListenerWorker::mapCallback(const autoware_lanelet2_msgs::msg::MapBin::UniquePtr map_msg)
void WMListenerWorker::mapCallback(const autoware_lanelet2_msgs::msg::MapBin::SharedPtr map_msg)
{
current_map_version_ = map_msg->map_version;

Expand All @@ -67,15 +67,15 @@ void WMListenerWorker::mapCallback(const autoware_lanelet2_msgs::msg::MapBin::Un
bool more_updates_to_apply = true;
while(!map_update_queue_.empty() && more_updates_to_apply) {

auto update = std::move(map_update_queue_.front()); // Get first update
auto update = map_update_queue_.front(); // Get first update
map_update_queue_.pop(); // Remove update from queue

if (update->map_version < current_map_version_) { // Drop any so far unapplied updates for the previous map
RCLCPP_WARN_STREAM(rclcpp::get_logger("carma_wm::WMListenerWorker"), "There were unapplied updates in carma_wm when a new map was received.");
continue;
}
if (update->map_version == current_map_version_) { // Current update goes with current map
mapUpdateCallback(std::move(update)); // Apply the update
mapUpdateCallback(update); // Apply the update
} else {
RCLCPP_INFO_STREAM(rclcpp::get_logger("carma_wm::WMListenerWorker"), "Done applying updates for new map. However, more updates are waiting for a future map.");
more_updates_to_apply = false; // If there is more updates queued that are not for this map version assume they are for a future map version
Expand All @@ -101,7 +101,7 @@ void WMListenerWorker::mapCallback(const autoware_lanelet2_msgs::msg::MapBin::Un
}
}

void WMListenerWorker::incomingSpatCallback(const carma_v2x_msgs::msg::SPAT::UniquePtr spat_msg)
void WMListenerWorker::incomingSpatCallback(const carma_v2x_msgs::msg::SPAT::SharedPtr spat_msg)
{
world_model_->processSpatFromMsg(*spat_msg, use_sim_time_);
}
Expand Down Expand Up @@ -144,28 +144,28 @@ void logSignalizedIntersectionManager(const carma_wm::SignalizedIntersectionMana
}
}

void WMListenerWorker::mapUpdateCallback(autoware_lanelet2_msgs::msg::MapBin::UniquePtr geofence_msg)
void WMListenerWorker::mapUpdateCallback(autoware_lanelet2_msgs::msg::MapBin::SharedPtr geofence_msg)
{
RCLCPP_INFO_STREAM(rclcpp::get_logger("carma_wm::WMListenerWorker"), "Map Update Being Evaluated. SeqNum: " << geofence_msg->seq_id);
if (rerouting_flag_) // no update should be applied if rerouting
{
RCLCPP_INFO_STREAM(rclcpp::get_logger("carma_wm::WMListenerWorker"), "Currently new route is being processed. Queueing this update. Received seq: " << geofence_msg->seq_id << " prev seq: " << most_recent_update_msg_seq_);
map_update_queue_.emplace(std::move(geofence_msg));
map_update_queue_.emplace(geofence_msg);
return;
}
if (geofence_msg->seq_id <= most_recent_update_msg_seq_) {
RCLCPP_DEBUG_STREAM(rclcpp::get_logger("carma_wm::WMListenerWorker"), "Dropping map update which has already been processed. Received seq: " << geofence_msg->seq_id << " prev seq: " << most_recent_update_msg_seq_);
return;
} else if(!world_model_->getMap() || current_map_version_ < geofence_msg->map_version) { // If our current map version is older than the version target by this update
RCLCPP_DEBUG_STREAM(rclcpp::get_logger("carma_wm::WMListenerWorker"), "Update received for newer map version than available. Queueing update until map is available.");
map_update_queue_.emplace(std::move(geofence_msg));
map_update_queue_.emplace(geofence_msg);
return;
} else if (current_map_version_ > geofence_msg->map_version) { // If this update is for an older map
RCLCPP_WARN_STREAM(rclcpp::get_logger("carma_wm::WMListenerWorker"), "Dropping old map update as newer map is already available.");
return;
} else if (most_recent_update_msg_seq_ + 1 < geofence_msg->seq_id) {
RCLCPP_INFO_STREAM(rclcpp::get_logger("carma_wm::WMListenerWorker"), "Queuing map update as we are waiting on an earlier update to be applied. most_recent_update_msg_seq_: " << most_recent_update_msg_seq_ << "geofence_msg->seq_id: " << geofence_msg->seq_id);
map_update_queue_.emplace(std::move(geofence_msg));
map_update_queue_.emplace(geofence_msg);
return;
}

Expand All @@ -180,7 +180,7 @@ void WMListenerWorker::mapUpdateCallback(autoware_lanelet2_msgs::msg::MapBin::Un
if(route_node_flag_!=true)
{
RCLCPP_INFO_STREAM(rclcpp::get_logger("carma_wm::WMListenerWorker"), "Route is not yet available. Therefore queueing the update");
map_update_queue_.emplace(std::move(geofence_msg));
map_update_queue_.emplace(geofence_msg);
return;
}
}
Expand Down Expand Up @@ -570,23 +570,23 @@ std::string WMListenerWorker::getVehicleParticipationType() const
return world_model_->getVehicleParticipationType();
}

void WMListenerWorker::roadwayObjectListCallback(const carma_perception_msgs::msg::RoadwayObstacleList::UniquePtr msg)
void WMListenerWorker::roadwayObjectListCallback(const carma_perception_msgs::msg::RoadwayObstacleList::SharedPtr msg)
{
// this topic publishes only the objects that are on the road
world_model_->setRoadwayObjects(msg->roadway_obstacles);
}

void WMListenerWorker::ros1ClockCallback(const rosgraph_msgs::msg::Clock::UniquePtr clock_msg)
void WMListenerWorker::ros1ClockCallback(const rosgraph_msgs::msg::Clock::SharedPtr clock_msg)
{
world_model_->setRos1Clock(rclcpp::Time(clock_msg->clock));
}

void WMListenerWorker::simClockCallback(const rosgraph_msgs::msg::Clock::UniquePtr clock_msg)
void WMListenerWorker::simClockCallback(const rosgraph_msgs::msg::Clock::SharedPtr clock_msg)
{
world_model_->setSimulationClock(rclcpp::Time(clock_msg->clock));
}

void WMListenerWorker::routeCallback(const carma_planning_msgs::msg::Route::UniquePtr route_msg)
void WMListenerWorker::routeCallback(const carma_planning_msgs::msg::Route::SharedPtr route_msg)
{
if (route_msg->map_version < current_map_version_) {
RCLCPP_WARN_STREAM(rclcpp::get_logger("carma_wm::WMListenerWorker"), "Route message rejected as it is for an older map");
Expand All @@ -611,7 +611,7 @@ void WMListenerWorker::routeCallback(const carma_planning_msgs::msg::Route::Uniq
bool more_updates_to_apply = true;
while(!map_update_queue_.empty() && more_updates_to_apply) {

auto update = std::move(map_update_queue_.front()); // Get first update
auto update = map_update_queue_.front(); // Get first update
map_update_queue_.pop(); // Remove update from queue

if (update->map_version < current_map_version_) { // Drop any so far unapplied updates for the current map
Expand All @@ -628,7 +628,7 @@ void WMListenerWorker::routeCallback(const carma_planning_msgs::msg::Route::Uniq

update->invalidates_route=false; // Do not trigger recomputation of routing graph in mapUpdateCallback; recomputation of routing graph will occur outside of this loop

mapUpdateCallback(std::move(update)); // Apply the update
mapUpdateCallback(update); // Apply the update
} else {
RCLCPP_INFO_STREAM(rclcpp::get_logger("carma_wm::WMListenerWorker"), "Apply from reroute: Done applying updates for new map. However, more updates are waiting for a future map.");
more_updates_to_apply = false; // If there is more updates queued that are not for this map version assume they are for a future map version
Expand Down
16 changes: 8 additions & 8 deletions carma_wm/src/WMListenerWorker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,34 +49,34 @@ class WMListenerWorker
*
* \param map_msg The new map messages to generate the map from
*/
void mapCallback(const autoware_lanelet2_msgs::msg::MapBin::UniquePtr map_msg);
void mapCallback(const autoware_lanelet2_msgs::msg::MapBin::SharedPtr map_msg);

/*!
* \brief Callback for new map update messages (geofence). Updates the underlying map
*
* \param geofence_msg The new map update messages to generate the map edits from
*/
void mapUpdateCallback(autoware_lanelet2_msgs::msg::MapBin::UniquePtr geofence_msg);
void mapUpdateCallback(autoware_lanelet2_msgs::msg::MapBin::SharedPtr geofence_msg);

/*!
* \brief Callback for route message.
*/
void routeCallback(const carma_planning_msgs::msg::Route::UniquePtr route_msg);
void routeCallback(const carma_planning_msgs::msg::Route::SharedPtr route_msg);

/*!
* \brief Callback for ROS1 clock message (used in Simulation runs)
*/
void ros1ClockCallback(const rosgraph_msgs::msg::Clock::UniquePtr clock_msg);
void ros1ClockCallback(const rosgraph_msgs::msg::Clock::SharedPtr clock_msg);

/*!
* \brief Callback for Simulation clock message (used in Simulation runs)
*/
void simClockCallback(const rosgraph_msgs::msg::Clock::UniquePtr clock_msg);
void simClockCallback(const rosgraph_msgs::msg::Clock::SharedPtr clock_msg);

/*!
* \brief Callback for roadway objects msg
*/
void roadwayObjectListCallback(const carma_perception_msgs::msg::RoadwayObstacleList::UniquePtr msg);
void roadwayObjectListCallback(const carma_perception_msgs::msg::RoadwayObstacleList::SharedPtr msg);

/*!
* \brief Allows user to set a callback to be triggered when a map update is received
Expand Down Expand Up @@ -144,7 +144,7 @@ class WMListenerWorker
/**
* \brief incoming spat message
*/
void incomingSpatCallback(const carma_v2x_msgs::msg::SPAT::UniquePtr spat_msg);
void incomingSpatCallback(const carma_v2x_msgs::msg::SPAT::SharedPtr spat_msg);

/**
* \brief set true if simulation_mode is on
Expand All @@ -160,7 +160,7 @@ class WMListenerWorker
double config_speed_limit_;

size_t current_map_version_ = 0; // Current map version based on recived map messages
std::queue<autoware_lanelet2_msgs::msg::MapBin::UniquePtr> map_update_queue_; // Update queue used to cache map updates when they cannot be immeadiatly applied due to waiting for rerouting
std::queue<autoware_lanelet2_msgs::msg::MapBin::SharedPtr> map_update_queue_; // Update queue used to cache map updates when they cannot be immeadiatly applied due to waiting for rerouting
boost::optional<carma_planning_msgs::msg::Route> delayed_route_msg_;

bool recompute_route_flag_=false; // indicates whether if this node should recompute its route based on invalidated msg
Expand Down

0 comments on commit 83ac8a0

Please sign in to comment.