diff --git a/communications/comms_bridge/CMakeLists.txt b/communications/comms_bridge/CMakeLists.txt index d4de1bfdce..a9f6aaf995 100644 --- a/communications/comms_bridge/CMakeLists.txt +++ b/communications/comms_bridge/CMakeLists.txt @@ -92,18 +92,13 @@ include_directories( ${Boost_INCLUDE_DIRS} ) +file(GLOB cc_files + "src/*.cpp" +) # Declare C++ libraries add_library(comms_bridge - src/bridge_publisher.cpp - src/bridge_subscriber.cpp - src/generic_rapid_msg_ros_pub.cpp - src/generic_ros_sub_rapid_pub.cpp - src/comms_bridge_nodelet.cpp - src/generic_rapid_sub.cpp - src/rapid_advertisement_info.cpp - #src/rapid_content.cpp - src/util.cpp + ${cc_files} ) target_compile_definitions(comms_bridge PUBLIC ${RTIDDS_DEFINE_FLAGS}) add_dependencies(comms_bridge ${catkin_EXPORTED_TARGETS}) diff --git a/communications/comms_bridge/include/comms_bridge/generic_rapid_msg_ros_pub.h b/communications/comms_bridge/include/comms_bridge/generic_rapid_msg_ros_pub.h index 11a9aff3e1..16e46f9c05 100644 --- a/communications/comms_bridge/include/comms_bridge/generic_rapid_msg_ros_pub.h +++ b/communications/comms_bridge/include/comms_bridge/generic_rapid_msg_ros_pub.h @@ -38,8 +38,8 @@ class GenericRapidMsgRosPub : public BridgePublisher { explicit GenericRapidMsgRosPub(double ad2pub_delay = DEFAULT_ADVERTISE_TO_PUB_DELAY); virtual ~GenericRapidMsgRosPub(); - void ConvertAdvertisementInfo(rapid::ext::astrobee::GenericCommsAdvertisementInfo const* data); - void ConvertContent(rapid::ext::astrobee::GenericCommsContent const* data); + void ConvertData(rapid::ext::astrobee::GenericCommsAdvertisementInfo const* data); + void ConvertData(rapid::ext::astrobee::GenericCommsContent const* data); }; } // end namespace ff diff --git a/communications/comms_bridge/include/comms_bridge/generic_rapid_sub.h b/communications/comms_bridge/include/comms_bridge/generic_rapid_sub.h index 19033d1cb7..fcd09522de 100644 --- a/communications/comms_bridge/include/comms_bridge/generic_rapid_sub.h +++ b/communications/comms_bridge/include/comms_bridge/generic_rapid_sub.h @@ -32,43 +32,75 @@ #include "knShare/Time.h" +#include "dds_msgs/GenericCommsAdvertisementInfoSupport.h" +#include "dds_msgs/GenericCommsContentSupport.h" + namespace ff { -/** - * @brief base class for rapid subscriber to ros publisher - * @details base class for rapid subscriber to ros publisher. - * A kn::DdsEventLoop is run within its own thread of execution. - * Child classes must connect requeseted messege and callback - * to m_ddsEventLoop and call startThread() - */ +template class GenericRapidSub { - protected: + public: GenericRapidSub(const std::string& entity_name, const std::string& subscribe_topic, - GenericRapidMsgRosPub* rapid_msg_ros_pub); - ~GenericRapidSub(); + const std::string& subscriber_partition, + GenericRapidMsgRosPub* rapid_msg_ros_pub) + : dds_event_loop_(entity_name), + subscribe_topic_(subscribe_topic), + subscriber_partition_(subscriber_partition), + ros_pub_(rapid_msg_ros_pub) { + // connect to ddsEventLoop + try { + dds_event_loop_.connect(this, + subscribe_topic, // topic + subscriber_partition, // name + entity_name, // profile + ""); // library + } catch (std::exception& e) { + ROS_ERROR_STREAM("Rapid exception: " << e.what()); + throw; + } catch (...) { + ROS_ERROR("Rapid exception unknown"); + throw; + } - /** - * Will start thread execution by calling threadExec() - */ - virtual void StartThread(); + // start joinable thread + thread_ = std::thread(&GenericRapidSub::ThreadExec, this); + } + + ~GenericRapidSub() { + alive_ = false; // Notify thread to exit + thread_.join(); + } + void operator() (T const* data) { + ros_pub_->ConvertData(data); + } + + private: GenericRapidMsgRosPub* ros_pub_; std::string subscribe_topic_; + std::string subscriber_partition_; std::atomic alive_; std::thread thread_; kn::DdsEventLoop dds_event_loop_; - private: /** * Function to execute within seperate thread * process DdsEventLoop at 10Hz */ - virtual void ThreadExec(); + void ThreadExec() { + while (alive_) { + // process events at 10hz + dds_event_loop_.processEvents(kn::milliseconds(100)); + } + } }; -typedef std::shared_ptr GenericRapidSubPtr; +typedef std::shared_ptr> + AdvertisementInfoRapidSubPtr; +typedef std::shared_ptr> + ContentRapidSubPtr; } // end namespace ff diff --git a/communications/comms_bridge/include/comms_bridge/generic_ros_sub_rapid_pub.h b/communications/comms_bridge/include/comms_bridge/generic_ros_sub_rapid_pub.h index 3b3f559fd5..42d8ee0205 100644 --- a/communications/comms_bridge/include/comms_bridge/generic_ros_sub_rapid_pub.h +++ b/communications/comms_bridge/include/comms_bridge/generic_ros_sub_rapid_pub.h @@ -46,7 +46,8 @@ class GenericROSSubRapidPub : public BridgeSubscriber { GenericROSSubRapidPub(); ~GenericROSSubRapidPub(); - void InitializeDDS(std::string agent_name); + void InitializeDDS(); + void SizeCheck(unsigned int &size, const int size_in, const int max_size, diff --git a/communications/comms_bridge/include/comms_bridge/rapid_advertisement_info.h b/communications/comms_bridge/include/comms_bridge/rapid_advertisement_info.h deleted file mode 100644 index 059cebd28d..0000000000 --- a/communications/comms_bridge/include/comms_bridge/rapid_advertisement_info.h +++ /dev/null @@ -1,58 +0,0 @@ -/* Copyright (c) 2017, United States Government, as represented by the - * Administrator of the National Aeronautics and Space Administration. - * - * All rights reserved. - * - * The Astrobee platform is licensed under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -#ifndef COMMS_BRIDGE_RAPID_ADVERTISEMENT_INFO_H_ -#define COMMS_BRIDGE_RAPID_ADVERTISEMENT_INFO_H_ - -#include -#include -#include - -#include "comms_bridge/generic_rapid_msg_ros_pub.h" -#include "comms_bridge/generic_rapid_sub.h" -#include "comms_bridge/util.h" - -#include "dds_msgs/AstrobeeConstants.h" -#include "dds_msgs/GenericCommsAdvertisementInfoSupport.h" - -#include "knDds/DdsTypedSupplier.h" - -#include "rapidDds/RapidConstants.h" - -#include "rapidUtil/RapidHelper.h" - -namespace ff { - -class RapidAdvertisementInfo : public GenericRapidSub { - public: - RapidAdvertisementInfo(const std::string& subscribe_topic, - const std::string& subscriber_partition, - GenericRapidMsgRosPub* rapid_msg_ros_pub); - - /** - * call back for ddsEventLoop - */ - void operator() (rapid::ext::astrobee::GenericCommsAdvertisementInfo const* data); - - private: - std::string subscriber_partition_; -}; - -} // end namespace ff - -#endif // COMMS_BRIDGE_RAPID_ADVERTISEMENT_INFO_H_ diff --git a/communications/comms_bridge/include/comms_bridge/ros_ros_bridge_publisher.h b/communications/comms_bridge/include/comms_bridge/ros_ros_bridge_publisher.h deleted file mode 100644 index be1e5bca6a..0000000000 --- a/communications/comms_bridge/include/comms_bridge/ros_ros_bridge_publisher.h +++ /dev/null @@ -1,64 +0,0 @@ -/* Copyright (c) 2017, United States Government, as represented by the - * Administrator of the National Aeronautics and Space Administration. - * - * All rights reserved. - * - * The Astrobee platform is licensed under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -#ifndef COMMS_BRIDGE_ROS_ROS_BRIDGE_PUBLISHER_H_ -#define COMMS_BRIDGE_ROS_ROS_BRIDGE_PUBLISHER_H_ - -/* This is a specialization of ROSBridgePublisher using a ROS conduit - that is another topic on the same ROS master - ie, it's a ROS-ROS bridge over ROS -*/ - -#include -#include -#include - -#include - -#include - -#define DEFAULT_ROSROSBRIDGE_PUB_META_TOPIC_PREFIX "/polymorph_relay" -// default time to delay between advertising and publishing on that topic [sec] -#define DEFAULT_ROSROSBRIDGE_PUB_ADVERTISE_DELAY 3.0 - -class ROSROSBridgePublisher : public BridgePublisher { - public: - ROSROSBridgePublisher(const std::string& meta_topic_prefix = DEFAULT_ROSROSBRIDGE_PUB_META_TOPIC_PREFIX, - double ad2pub_delay = DEFAULT_ROSROSBRIDGE_PUB_ADVERTISE_DELAY); - virtual ~ROSROSBridgePublisher(); - - protected: - void setupMetaChannels(); - void setupReverseMetaChannels(); - - void requestTopicInfo(std::string const& output_topic); - - void handleContentMessage(const ff_msgs::RelayContent::ConstPtr& msg); - void handleAdMessage(const ff_msgs::RelayAdvertisement::ConstPtr& msg); - - std::string m_meta_topic_prefix; - ros::Subscriber m_advert_sub; - ros::Subscriber m_content_sub; - ros::Publisher m_reset_pub; - - // prohibit shallow copy or assignment - ROSROSBridgePublisher(const ROSROSBridgePublisher&) : BridgePublisher(m_ad2pub_delay) {} - void operator=(const ROSROSBridgePublisher&) {} -}; - -#endif // COMMS_BRIDGE_ROS_ROS_BRIDGE_PUBLISHER_H_ diff --git a/communications/comms_bridge/include/comms_bridge/ros_ros_bridge_subscriber.h b/communications/comms_bridge/include/comms_bridge/ros_ros_bridge_subscriber.h deleted file mode 100644 index c53650d2be..0000000000 --- a/communications/comms_bridge/include/comms_bridge/ros_ros_bridge_subscriber.h +++ /dev/null @@ -1,70 +0,0 @@ -/* Copyright (c) 2017, United States Government, as represented by the - * Administrator of the National Aeronautics and Space Administration. - * - * All rights reserved. - * - * The Astrobee platform is licensed under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -#ifndef COMMS_BRIDGE_ROS_ROS_BRIDGE_SUBSCRIBER_H_ -#define COMMS_BRIDGE_ROS_ROS_BRIDGE_SUBSCRIBER_H_ - -/* This is a specialization of ROSBridgeSubscriber using a ROS conduit - that is another topic on the same ROS master - ie, it's a ROS-ROS bridge over ROS -*/ - -#include -#include -#include - -#include - -#include -#include - -#define DEFAULT_ROSROSBRIDGE_PUB_META_TOPIC_PREFIX "/polymorph_relay" - -class ROSROSBridgeSubscriber : public BridgeSubscriber { - public: - explicit ROSROSBridgeSubscriber(const std::string& meta_topic_prefix = DEFAULT_ROSROSBRIDGE_PUB_META_TOPIC_PREFIX); - virtual ~ROSROSBridgeSubscriber(); - - // Called with the mutex held - virtual void subscribeTopic(std::string const& in_topic, const RelayTopicInfo& info); - - // Called with the mutex held - virtual void advertiseTopic(const RelayTopicInfo& info); - - // Called with the mutex held - virtual void relayMessage(const RelayTopicInfo& topic_info, ContentInfo const& content_info); - - protected: - void setupMetaChannels(); - void setupReverseMetaChannels(); - void handleResetMessage(const ff_msgs::RelayReset::ConstPtr& msg); - - std::string m_meta_topic_prefix; - unsigned int m_n_advertised; - ros::Publisher m_advertiser_pub; - ros::Publisher m_relayer_pub; - ros::Subscriber m_reset_sub; - - std::map requested_resets; // topics requiring re-advert - - // prohibit shallow copy or assignment - ROSROSBridgeSubscriber(const ROSROSBridgeSubscriber&) {} - void operator=(const ROSROSBridgeSubscriber&) {} -}; - -#endif // COMMS_BRIDGE_ROS_ROS_BRIDGE_SUBSCRIBER_H_ diff --git a/communications/comms_bridge/src/comms_bridge_nodelet.cpp b/communications/comms_bridge/src/comms_bridge_nodelet.cpp index 8684ba8a86..dc3d462088 100644 --- a/communications/comms_bridge/src/comms_bridge_nodelet.cpp +++ b/communications/comms_bridge/src/comms_bridge_nodelet.cpp @@ -35,8 +35,11 @@ #include #include "comms_bridge/generic_rapid_msg_ros_pub.h" +#include "comms_bridge/generic_rapid_sub.h" #include "comms_bridge/generic_ros_sub_rapid_pub.h" +#include "dds_msgs/AstrobeeConstants.h" + // SoraCore #include "knDds/DdsSupport.h" #include "knDds/DdsEntitiesFactory.h" @@ -182,9 +185,31 @@ class CommsBridgeNodelet : public ff_util::FreeFlyerNodelet { dds_entities_factory_.reset(new kn::DdsEntitiesFactorySvc()); dds_entities_factory_->init(dds_params); - ros_sub_.InitializeDDS(agent_name_); + InitializeDDS(); + } - // TODO(Katie): Add more publisher stuff here + void InitializeDDS() { + std::string connection; + ff::AdvertisementInfoRapidSubPtr advertisement_info_sub; + ff::ContentRapidSubPtr content_sub; + ros_sub_.InitializeDDS(); + for (size_t i = 0; i < rapid_connections_.size(); i++) { + // Lower case the external agent name to use it like a namespace + connection = rapid_connections_[i]; + connection[0] = tolower(connection[0]); + advertisement_info_sub = + std::make_shared>( + "AstrobeeGenericCommsAdvertisementInfoProfile", rapid::ext::astrobee::GENERIC_COMMS_ADVERTISEMENT_INFO_TOPIC, + connection, ros_pub_.get()); + advertisement_info_rapid_subs_.push_back(advertisement_info_sub); + + content_sub = std::make_shared>( + "AstrobeeGenericCommsContentProfile", + rapid::ext::astrobee::GENERIC_COMMS_CONTENT_TOPIC, + connection, + ros_pub_.get()); + content_rapid_subs_.push_back(content_sub); + } } bool ReadParams() { @@ -241,7 +266,7 @@ class CommsBridgeNodelet : public ff_util::FreeFlyerNodelet { // This should be very quick since we shouldn't have more than 2 connections bool found = false; - for (int i = 0; i < rapid_connections_.size() && !found; i++) { + for (size_t i = 0; i < rapid_connections_.size() && !found; i++) { if (connection == rapid_connections_[i]) { found = true; } @@ -258,7 +283,7 @@ class CommsBridgeNodelet : public ff_util::FreeFlyerNodelet { config_reader::ConfigReader::Table relay_table, relay_item; std::string topic_name; if (link_table.GetTable(table_name.c_str(), &relay_table)) { - for (int i = 1; i <= relay_table.GetSize(); i++) { + for (size_t i = 1; i <= relay_table.GetSize(); i++) { relay_table.GetTable(i, &relay_item); if (!relay_item.GetStr("name", &topic_name)) { NODELET_ERROR("Comms Bridge Nodelet: Agent topic name not specified!"); @@ -272,6 +297,8 @@ class CommsBridgeNodelet : public ff_util::FreeFlyerNodelet { private: config_reader::ConfigReader config_params_; ff::GenericROSSubRapidPub ros_sub_; + std::vector content_rapid_subs_; + std::vector advertisement_info_rapid_subs_; std::shared_ptr dds_entities_factory_; std::shared_ptr ros_pub_; std::string agent_name_, participant_name_; diff --git a/communications/comms_bridge/src/generic_rapid_msg_ros_pub.cpp b/communications/comms_bridge/src/generic_rapid_msg_ros_pub.cpp index 27f0d886bd..f981dca3fc 100644 --- a/communications/comms_bridge/src/generic_rapid_msg_ros_pub.cpp +++ b/communications/comms_bridge/src/generic_rapid_msg_ros_pub.cpp @@ -29,7 +29,7 @@ GenericRapidMsgRosPub::GenericRapidMsgRosPub(double ad2pub_delay) : GenericRapidMsgRosPub::~GenericRapidMsgRosPub() {} -void GenericRapidMsgRosPub::ConvertAdvertisementInfo( +void GenericRapidMsgRosPub::ConvertData( rapid::ext::astrobee::GenericCommsAdvertisementInfo const* data) { const std::lock_guard lock(m_mutex_); @@ -50,7 +50,7 @@ void GenericRapidMsgRosPub::ConvertAdvertisementInfo( } } -void GenericRapidMsgRosPub::ConvertContent( +void GenericRapidMsgRosPub::ConvertData( rapid::ext::astrobee::GenericCommsContent const* data) { const std::lock_guard lock(m_mutex_); diff --git a/communications/comms_bridge/src/generic_rapid_sub.cpp b/communications/comms_bridge/src/generic_rapid_sub.cpp deleted file mode 100644 index 61f1889c65..0000000000 --- a/communications/comms_bridge/src/generic_rapid_sub.cpp +++ /dev/null @@ -1,46 +0,0 @@ -/* Copyright (c) 2017, United States Government, as represented by the - * Administrator of the National Aeronautics and Space Administration. - * - * All rights reserved. - * - * The Astrobee platform is licensed under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -#include "comms_bridge/generic_rapid_sub.h" - -#include - -namespace ff { - -GenericRapidSub::GenericRapidSub(const std::string& entity_name, const std::string& subscribe_topic, - GenericRapidMsgRosPub* rapid_msg_ros_pub) - : dds_event_loop_(entity_name), subscribe_topic_(subscribe_topic), ros_pub_(rapid_msg_ros_pub) {} - -GenericRapidSub::~GenericRapidSub() { - alive_ = false; // Notify thread to exit - thread_.join(); -} - -void GenericRapidSub::StartThread() { - // start joinable thread - thread_ = std::thread(&GenericRapidSub::ThreadExec, this); -} - -void GenericRapidSub::ThreadExec() { - while (alive_) { - // process events at 10hz - dds_event_loop_.processEvents(kn::milliseconds(100)); - } -} - -} // end namespace ff diff --git a/communications/comms_bridge/src/generic_ros_sub_rapid_pub.cpp b/communications/comms_bridge/src/generic_ros_sub_rapid_pub.cpp index 4d1666effd..5df2cd0625 100644 --- a/communications/comms_bridge/src/generic_ros_sub_rapid_pub.cpp +++ b/communications/comms_bridge/src/generic_ros_sub_rapid_pub.cpp @@ -27,14 +27,14 @@ GenericROSSubRapidPub::GenericROSSubRapidPub() : GenericROSSubRapidPub::~GenericROSSubRapidPub() {} -void GenericROSSubRapidPub::InitializeDDS(std::string agent_name) { +void GenericROSSubRapidPub::InitializeDDS() { advertisement_info_supplier_.reset( new GenericROSSubRapidPub::AdvertisementInfoSupplier( - rapid::ext::astrobee::GENERIC_COMMS_ADVERTISEMENT_INFO_TOPIC + agent_name, + rapid::ext::astrobee::GENERIC_COMMS_ADVERTISEMENT_INFO_TOPIC, "", "AstrobeeGenericCommsAdvertisementInfoProfile", "")); content_supplier_.reset(new GenericROSSubRapidPub::ContentSupplier( - rapid::ext::astrobee::GENERIC_COMMS_CONTENT_TOPIC + agent_name, + rapid::ext::astrobee::GENERIC_COMMS_CONTENT_TOPIC, "", "AstrobeeGenericCommsContentProfile", "")); rapid::RapidHelper::initHeader(advertisement_info_supplier_->event().hdr); diff --git a/communications/comms_bridge/src/rapid_advertisement_info.cpp b/communications/comms_bridge/src/rapid_advertisement_info.cpp deleted file mode 100644 index 2d3a2a42f9..0000000000 --- a/communications/comms_bridge/src/rapid_advertisement_info.cpp +++ /dev/null @@ -1,55 +0,0 @@ -/* Copyright (c) 2017, United States Government, as represented by the - * Administrator of the National Aeronautics and Space Administration. - * - * All rights reserved. - * - * The Astrobee platform is licensed under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -#include "comms_bridge/rapid_advertisement_info.h" - -#include - -namespace ff { - -RapidAdvertisementInfo::RapidAdvertisementInfo(const std::string& subscribe_topic, - const std::string& subscriber_partition, - GenericRapidMsgRosPub* rapid_msg_ros_pub) - : GenericRapidSub("AstrobeeGenericCommsAdvertisementInfoProfile", subscribe_topic, rapid_msg_ros_pub), - subscriber_partition_(subscriber_partition) { - // connect to ddsEventLoop - try { - dds_event_loop_.connect( - this, - subscribe_topic, // topic - subscriber_partition, // name - "AstrobeeGenericCommsAdvertisementInfo", // profile - ""); // library - } catch (std::exception& e) { - ROS_ERROR_STREAM("RapidAdvertisementInfo exception: " << e.what()); - throw; - } catch (...) { - ROS_ERROR("RapidAdvertisementInfo exception unknown"); - throw; - } - - // start thread - StartThread(); -} - -void RapidAdvertisementInfo::operator() ( - rapid::ext::astrobee::GenericCommsAdvertisementInfo const* data) { - ros_pub_->ConvertAdvertisementInfo(data); -} - -} // end namespace ff diff --git a/communications/comms_bridge/src/ros_ros_bridge_publisher.cpp b/communications/comms_bridge/src/ros_ros_bridge_publisher.cpp deleted file mode 100644 index 40c018622c..0000000000 --- a/communications/comms_bridge/src/ros_ros_bridge_publisher.cpp +++ /dev/null @@ -1,121 +0,0 @@ -/* Copyright (c) 2017, United States Government, as represented by the - * Administrator of the National Aeronautics and Space Administration. - * - * All rights reserved. - * - * The Astrobee platform is licensed under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -#include -#include -#include - -#include "comms_bridge/ros_ros_bridge_publisher.h" - -// ROS internal subscriber message queue size -#define SUBSCRIBER_QUEUE_SIZE 10 -// ROS internal publisher message queue size -#define PUBLISHER_QUEUE_SIZE 10 - -ROSROSBridgePublisher::ROSROSBridgePublisher(const std::string& meta_topic_prefix, double ad2pub_delay) - : BridgePublisher(ad2pub_delay), m_meta_topic_prefix(meta_topic_prefix) { - setupMetaChannels(); - setupReverseMetaChannels(); -} - -ROSROSBridgePublisher::~ROSROSBridgePublisher() {} - -void ROSROSBridgePublisher::setupMetaChannels() { - ros::NodeHandle nh; - - std::string advertisement_topic = m_meta_topic_prefix + "/advert"; - if (m_verbose > 0) - printf("advert topic = %s\n", advertisement_topic.c_str()); - m_advert_sub = - nh.subscribe(advertisement_topic, SUBSCRIBER_QUEUE_SIZE, &ROSROSBridgePublisher::handleAdMessage, this); - - std::string content_topic = m_meta_topic_prefix + "/content"; - if (m_verbose > 0) - printf("content topic = %s\n", content_topic.c_str()); - m_content_sub = - nh.subscribe(content_topic, SUBSCRIBER_QUEUE_SIZE, &ROSROSBridgePublisher::handleContentMessage, this); -} - -void ROSROSBridgePublisher::setupReverseMetaChannels() { - ros::NodeHandle nh; - - std::string reset_topic = m_meta_topic_prefix + "/reset"; - m_reset_pub = nh.advertise(reset_topic, PUBLISHER_QUEUE_SIZE); -} - -void ROSROSBridgePublisher::requestTopicInfo(std::string const& output_topic) { - ff_msgs::RelayReset reset_msg; - - reset_msg.header.seq = 0; - reset_msg.header.stamp = ros::Time::now(); - - reset_msg.topic = output_topic; - - m_reset_pub.publish(reset_msg); -} - -void ROSROSBridgePublisher::handleContentMessage(const ff_msgs::RelayContent::ConstPtr& msg) { - const std::lock_guard lock(m_mutex); - - const std::string output_topic = msg->output_topic; - - if (m_verbose) - printf("Received content message for topic %s\n", output_topic.c_str()); - - std::map::iterator iter = m_relay_topics.find(output_topic); - if (iter == m_relay_topics.end()) { - // first time seeing this topic - // The advertisement that preceded this was either lost, delayed out of - // order, or we the publisher were restarted after it was sent - if (m_verbose) - printf(" first seeing topic\n"); - - RelayTopicInfo topic_info; - topic_info.out_topic = output_topic; - topic_info.ad_info.md5_sum = msg->type_md5_sum; - iter = m_relay_topics.emplace(output_topic, topic_info).first; - - requestTopicInfo(output_topic); - } - - RelayTopicInfo &topic_info = iter->second; - - ContentInfo content_info; - content_info.type_md5_sum = msg->type_md5_sum; - content_info.data = msg->msg; - - if (!relayMessage(topic_info, content_info)) - printf(" error relaying message\n"); -} - -void ROSROSBridgePublisher::handleAdMessage(const ff_msgs::RelayAdvertisement::ConstPtr& msg) { - const std::lock_guard lock(m_mutex); - - const std::string output_topic = msg->output_topic; - - if (m_verbose) - printf("Received advertisement message for topic %s\n", output_topic.c_str()); - - AdvertisementInfo ad_info; - ad_info.md5_sum = msg->md5_sum; - ad_info.data_type = msg->data_type; - ad_info.definition = msg->definition; - - if (!advertiseTopic(msg->output_topic, ad_info)) - printf(" error advertising topic\n"); -} diff --git a/communications/comms_bridge/src/ros_ros_bridge_subscriber.cpp b/communications/comms_bridge/src/ros_ros_bridge_subscriber.cpp deleted file mode 100644 index ef53a7e5b9..0000000000 --- a/communications/comms_bridge/src/ros_ros_bridge_subscriber.cpp +++ /dev/null @@ -1,124 +0,0 @@ -/* Copyright (c) 2017, United States Government, as represented by the - * Administrator of the National Aeronautics and Space Administration. - * - * All rights reserved. - * - * The Astrobee platform is licensed under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -#include -#include - -#include "comms_bridge/ros_ros_bridge_subscriber.h" - -// ROS internal subscriber message queue size -#define SUBSCRIBER_QUEUE_SIZE 10 -// ROS internal publisher message queue size -#define PUBLISHER_QUEUE_SIZE 10 - -ROSROSBridgeSubscriber::ROSROSBridgeSubscriber(const std::string& meta_topic_prefix) - : m_meta_topic_prefix(meta_topic_prefix) { - m_n_advertised = 0; - - setupMetaChannels(); - setupReverseMetaChannels(); -} - -ROSROSBridgeSubscriber::~ROSROSBridgeSubscriber() {} - -void ROSROSBridgeSubscriber::setupMetaChannels() { - ros::NodeHandle nh; - - std::string advertisement_topic = m_meta_topic_prefix + "/advert"; - m_advertiser_pub = nh.advertise(advertisement_topic, PUBLISHER_QUEUE_SIZE); - - std::string content_topic = m_meta_topic_prefix + "/content"; - m_relayer_pub = nh.advertise(content_topic, PUBLISHER_QUEUE_SIZE); -} - -void ROSROSBridgeSubscriber::setupReverseMetaChannels() { - ros::NodeHandle nh; - - std::string reset_topic = m_meta_topic_prefix + "/reset"; - m_reset_sub = nh.subscribe(reset_topic, SUBSCRIBER_QUEUE_SIZE, &ROSROSBridgeSubscriber::handleResetMessage, this); -} - -void ROSROSBridgeSubscriber::handleResetMessage(const ff_msgs::RelayReset::ConstPtr& msg) { - const std::string out_topic = msg->topic; - - const std::lock_guard lock(m_mutex); - - std::map::iterator iter = m_relay_topics.begin(); - while (iter != m_relay_topics.end()) { - if (iter->second.out_topic == out_topic) - break; - iter++; - } - - if (iter == m_relay_topics.end()) { - if (m_verbose) - printf("Received reset on unknown topic %s\n", out_topic.c_str()); - return; - } - - if (m_verbose) - printf("Received reset for topic %s\n", out_topic.c_str()); - - requested_resets[out_topic] = true; -} - -void ROSROSBridgeSubscriber::subscribeTopic(std::string const& in_topic, const RelayTopicInfo& info) { - // this is just the base subscriber letting us know it's adding a topic - // nothing more we need to do -} - -void ROSROSBridgeSubscriber::advertiseTopic(const RelayTopicInfo& info) { - const AdvertisementInfo &ad_info = info.ad_info; - - ff_msgs::RelayAdvertisement ad; - - ad.header.seq = ++m_n_advertised; - ad.header.stamp = ros::Time::now(); - - ad.output_topic = info.out_topic; - - ad.latching = ad_info.latching; - - ad.data_type = ad_info.data_type; - ad.md5_sum = ad_info.md5_sum; - ad.definition = ad_info.definition; - - m_advertiser_pub.publish(ad); -} - -void ROSROSBridgeSubscriber::relayMessage(const RelayTopicInfo& topic_info, ContentInfo const& content_info) { - std::map::iterator iter = requested_resets.find(topic_info.out_topic); - if (iter != requested_resets.end()) { - advertiseTopic(topic_info); - requested_resets.erase(iter); - } - - ff_msgs::RelayContent content; - - content.header.seq = topic_info.relay_seqnum; - content.header.stamp = ros::Time::now(); - - content.output_topic = topic_info.out_topic; - - content.type_md5_sum = content_info.type_md5_sum; - - content.msg.resize(content_info.data_size); - std::memcpy(static_cast(content.msg.data()), static_cast(content_info.data), content.msg.size()); - - m_relayer_pub.publish(content); -} diff --git a/communications/dds_msgs/idl/AstrobeeConstants.idl b/communications/dds_msgs/idl/AstrobeeConstants.idl index 8e8cd972f1..9d022e663e 100644 --- a/communications/dds_msgs/idl/AstrobeeConstants.idl +++ b/communications/dds_msgs/idl/AstrobeeConstants.idl @@ -36,7 +36,6 @@ module rapid { const String64 FAULT_STATE_TOPIC = "astrobee_fault_state"; const String64 GENERIC_COMMS_ADVERTISEMENT_INFO_TOPIC = "astrobee_generic_comms_advertisement_info"; const String64 GENERIC_COMMS_CONTENT_TOPIC = "astrobee_generic_comms_content"; - const String64 GENERIC_COMMS_RESET_TOPIC = "astrobee_generic_comms_reset"; const String64 GNC_CONTROL_STATE_TOPIC = "astrobee_gnc_control_state"; const String64 GNC_FAM_CMD_STATE_TOPIC = "astrobee_gnc_fam_cmd_state"; const String64 GUEST_SCIENCE_CONFIG_TOPIC = "astrobee_guest_science_config"; diff --git a/communications/dds_msgs/idl/GenericCommsReset.idl b/communications/dds_msgs/idl/GenericCommsReset.idl deleted file mode 100644 index b295a5afff..0000000000 --- a/communications/dds_msgs/idl/GenericCommsReset.idl +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2023 (c) 2023 Intelligent Robotics Group, NASA ARC - */ - -#include "Message.idl" - -module rapid { - module ext { - module astrobee { - - //@copy-c-declaration class GenericCommsResetTypeSupport; - //@copy-c-declaration class GenericCommsResetDataWriter; - //@copy-c-declaration class GenericCommsResetDataReader; - //@copy-c-declaration struct GenericConmmsResetSeq; - - //@copy-declaration /** - //@copy-declaration * Reset message in case publisher didn't receive initial advertisement info message for this topic - //@copy-declaration */ - valuetype GenericCommsReset : Message { - //@copy-c-declaration #if RTI_DDS_VERSION_MAJOR < 4 || (RTI_DDS_VERSION_MAJOR == 4 && RTI_DDS_VERSION_MINOR < 5) || (RTI_DDS_VERSION_MAJOR == 4 && RTI_DDS_VERSION_MINOR == 5 && RTI_DDS_VERSION_RELEASE != 'f' ) - //@copy-c-declaration typedef GenericCommsResetTypeSupport TypeSupport; - //@copy-c-declaration typedef GenericCommsResetDataWriter DataWriter; - //@copy-c-declaration typedef GenericCommsResetDataReader DataReader; - //@copy-c-declaration typedef GenericCommsResetSeq Seq; - //@copy-c-declaration #endif - //@copy-c-declaration typedef GenericCommsReset Type; - - //@copy-declaration // Topic for which metadata retransmission is requested - public String128 topic; - }; - }; - }; -};