Skip to content

Commit

Permalink
Update geofence control sub to transient_local (#2289)
Browse files Browse the repository at this point in the history
  • Loading branch information
JonSmet authored Jan 23, 2024
2 parents 18cc015 + fbc3e61 commit 1dce661
Showing 1 changed file with 30 additions and 25 deletions.
55 changes: 30 additions & 25 deletions carma_wm_ctrl/src/WMBroadcasterNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,27 +50,27 @@ void WMBroadcasterNode::publishTCMACK(const carma_v2x_msgs::msg::MobilityOperati
}

WMBroadcasterNode::WMBroadcasterNode(const rclcpp::NodeOptions &options)
: carma_ros2_utils::CarmaLifecycleNode(options)
: carma_ros2_utils::CarmaLifecycleNode(options)
{
// Create initial config
config_ = Config();

config_.ack_pub_times = declare_parameter<int>("ack_pub_times", config_.ack_pub_times);
config_.max_lane_width = declare_parameter<double>("max_lane_width", config_.max_lane_width);
config_.traffic_control_request_period = declare_parameter<double>("traffic_control_request_period", config_.traffic_control_request_period);
config_.vehicle_id = declare_parameter<std::string>("vehicle_id", config_.vehicle_id);
config_.participant = declare_parameter<std::string>("vehicle_participant_type", config_.participant);
config_.participant = declare_parameter<double>("config_speed_limit", config_.config_limit);

declare_parameter("intersection_ids_for_correction");
declare_parameter("intersection_coord_correction");
};

void WMBroadcasterNode::initializeWorker(std::weak_ptr<carma_ros2_utils::CarmaLifecycleNode> weak_node_pointer)
{
wmb_ = std::make_unique<carma_wm_ctrl::WMBroadcaster>(std::bind(&WMBroadcasterNode::publishMap, this, std_ph::_1), std::bind(&WMBroadcasterNode::publishMapUpdate, this, std_ph::_1),
wmb_ = std::make_unique<carma_wm_ctrl::WMBroadcaster>(std::bind(&WMBroadcasterNode::publishMap, this, std_ph::_1), std::bind(&WMBroadcasterNode::publishMapUpdate, this, std_ph::_1),
std::bind(&WMBroadcasterNode::publishCtrlReq, this, std_ph::_1), std::bind(&WMBroadcasterNode::publishActiveGeofence, this, std_ph::_1),
std::make_unique<carma_ros2_utils::timers::ROSTimerFactory>(weak_node_pointer),
std::make_unique<carma_ros2_utils::timers::ROSTimerFactory>(weak_node_pointer),
std::bind(&WMBroadcasterNode::publishTCMACK, this, std_ph::_1));
}

Expand All @@ -91,7 +91,7 @@ carma_ros2_utils::CallbackReturn WMBroadcasterNode::handle_on_configure(const rc
get_parameter<std::string>("vehicle_id", config_.vehicle_id);
get_parameter<std::string>("vehicle_participant_type", config_.participant);
get_parameter<double>("config_speed_limit", config_.config_limit);

wmb_->setConfigACKPubTimes(config_.ack_pub_times);
wmb_->setMaxLaneWidth(config_.max_lane_width);
wmb_->setConfigSpeedLimit(config_.config_limit);
Expand All @@ -103,30 +103,30 @@ carma_ros2_utils::CallbackReturn WMBroadcasterNode::handle_on_configure(const rc

rclcpp::Parameter intersection_ids_for_correction_param = get_parameter("intersection_ids_for_correction");
config_.intersection_ids_for_correction = intersection_ids_for_correction_param.as_integer_array();

wmb_->setIntersectionCoordCorrection(config_.intersection_ids_for_correction, config_.intersection_coord_correction);

RCLCPP_INFO_STREAM(rclcpp::get_logger("carma_mw_ctrl"),"Done loading parameters: " << config_);

/////////////
// PUBLISHERS
/////////////

// NOTE: Currently, intra-process comms must be disabled for the following two publishers that are transient_local: https://github.com/ros2/rclcpp/issues/1753
rclcpp::PublisherOptions intra_proc_disabled;
rclcpp::PublisherOptions intra_proc_disabled;
intra_proc_disabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable; // Disable intra-process comms for this PublisherOptions object

// Create a publisher that will send all previously published messages to late-joining subscribers ONLY If the subscriber is transient_local too
auto pub_qos_transient_local = rclcpp::QoS(rclcpp::KeepAll()); // A publisher with this QoS will store all messages that it has sent on the topic
pub_qos_transient_local.transient_local(); // A publisher with this QoS will re-send all (when KeepAll is used) messages to all late-joining subscribers
pub_qos_transient_local.transient_local(); // A publisher with this QoS will re-send all (when KeepAll is used) messages to all late-joining subscribers
// NOTE: The subscriber's QoS must be set to transient_local() as well for earlier messages to be resent to the later-joiner.

// Map Update Publisher
map_update_pub_ = create_publisher<autoware_lanelet2_msgs::msg::MapBin>("map_update", pub_qos_transient_local, intra_proc_disabled);

// Map Publisher
map_pub_ = create_publisher<autoware_lanelet2_msgs::msg::MapBin>("semantic_map", pub_qos_transient_local, intra_proc_disabled);

//Route Message Publisher
control_msg_pub_= create_publisher<carma_v2x_msgs::msg::TrafficControlRequest>("outgoing_geofence_request", 1);

Expand All @@ -142,11 +142,11 @@ carma_ros2_utils::CallbackReturn WMBroadcasterNode::handle_on_configure(const rc
//TCR Visualizer pub (visualized on UI)
tcr_visualizer_pub_ = create_publisher<carma_v2x_msgs::msg::TrafficControlRequestPolygon>("tcr_bounding_points",1);

//Upcoming intersection and group id of traffic light
//Upcoming intersection and group id of traffic light
upcoming_intersection_ids_pub_ = create_publisher<std_msgs::msg::Int32MultiArray>("intersection_signal_group_ids", 1);

// Return success if everything initialized successfully

return CallbackReturn::SUCCESS;
}

Expand All @@ -156,30 +156,35 @@ carma_ros2_utils::CallbackReturn WMBroadcasterNode::handle_on_activate(const rcl
timer_ = create_timer(get_clock(),
std::chrono::milliseconds((int)(config_.traffic_control_request_period * 1000)),
std::bind(&WMBroadcasterNode::spin_callback, this));

/////////////
//SUBSCRIBERS
///////////// NOTE: subscriber declaration delayed until here so that when map is received, publisher is already activated to immediately publish back

// Base Map Sub
base_map_sub_ = create_subscription<autoware_lanelet2_msgs::msg::MapBin>("base_map", 1, std::bind(&WMBroadcaster::baseMapCallback, wmb_.get(), std_ph::_1));

// Base Map Georeference Sub
georef_sub_ = create_subscription<std_msgs::msg::String>("georeference", 1, std::bind(&WMBroadcaster::geoReferenceCallback, wmb_.get(), std_ph::_1));

// Geofence Sub
geofence_sub_ = create_subscription<carma_v2x_msgs::msg::TrafficControlMessage>("geofence", 200, std::bind(&WMBroadcaster::geofenceCallback, wmb_.get(), std_ph::_1));

rclcpp::SubscriptionOptions geofence_sub_options;
// NOTE: Currently, intra-process comms must be disabled for subscribers that are transient_local: https://github.com/ros2/rclcpp/issues/1753
geofence_sub_options.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable;
auto sub_qos_transient_local = rclcpp::QoS(rclcpp::KeepAll());
sub_qos_transient_local.transient_local();
geofence_sub_ = create_subscription<carma_v2x_msgs::msg::TrafficControlMessage>("geofence", sub_qos_transient_local, std::bind(&WMBroadcaster::geofenceCallback, wmb_.get(), std_ph::_1), geofence_sub_options);

// External Map Msg Sub
incoming_map_sub_ = create_subscription<carma_v2x_msgs::msg::MapData>("incoming_map", 20, std::bind(&WMBroadcaster::externalMapMsgCallback, wmb_.get(), std_ph::_1));

//Route Message Sub
route_callmsg_sub_ = create_subscription<carma_planning_msgs::msg::Route>("route", 1, std::bind(&WMBroadcaster::routeCallbackMessage, wmb_.get(), std_ph::_1));

//Current Location Sub
curr_location_sub_ = create_subscription<geometry_msgs::msg::PoseStamped>("current_pose", 1, std::bind(&WMBroadcaster::currentLocationCallback, wmb_.get(), std_ph::_1));


return CallbackReturn::SUCCESS;
}

Expand Down

0 comments on commit 1dce661

Please sign in to comment.