diff --git a/ecal/core/src/pubsub/ecal_pubgate.cpp b/ecal/core/src/pubsub/ecal_pubgate.cpp index b8837fd6a7..00c1c37a56 100644 --- a/ecal/core/src/pubsub/ecal_pubgate.cpp +++ b/ecal/core/src/pubsub/ecal_pubgate.cpp @@ -122,6 +122,28 @@ namespace eCAL subscription_info.process_id = ecal_topic.pid; const SDataTypeInformation topic_information = ecal_topic.tdatatype; + CDataWriter::SLayerStates layer_states; + for (const auto& layer : ecal_topic.tlayer) + { + if (layer.confirmed) + { + switch (layer.type) + { + case TLayer::tlayer_udp_mc: + layer_states.udp = true; + break; + case TLayer::tlayer_shm: + layer_states.shm = true; + break; + case TLayer::tlayer_tcp: + layer_states.tcp = true; + break; + default: + break; + } + } + } + std::string reader_par; #if 0 for (const auto& layer : ecal_sample.tlayer()) @@ -138,7 +160,7 @@ namespace eCAL auto res = m_topic_name_datawriter_map.equal_range(topic_name); for(TopicNameDataWriterMapT::const_iterator iter = res.first; iter != res.second; ++iter) { - iter->second->ApplySubscription(subscription_info, topic_information, reader_par); + iter->second->ApplySubscription(subscription_info, topic_information, layer_states, reader_par); } } diff --git a/ecal/core/src/pubsub/ecal_subgate.cpp b/ecal/core/src/pubsub/ecal_subgate.cpp index 6dfdbbfb8f..2c40e9ebd7 100644 --- a/ecal/core/src/pubsub/ecal_subgate.cpp +++ b/ecal/core/src/pubsub/ecal_subgate.cpp @@ -228,6 +228,28 @@ namespace eCAL publication_info.process_id = ecal_topic.pid; const SDataTypeInformation topic_information = ecal_topic.tdatatype; + CDataReader::SLayerStates layer_states; + for (const auto& layer : ecal_topic.tlayer) + { + if (layer.confirmed) + { + switch (layer.type) + { + case TLayer::tlayer_udp_mc: + layer_states.udp = true; + break; + case TLayer::tlayer_shm: + layer_states.shm = true; + break; + case TLayer::tlayer_tcp: + layer_states.tcp = true; + break; + default: + break; + } + } + } + // register publisher const std::shared_lock lock(m_topic_name_datareader_sync); auto res = m_topic_name_datareader_map.equal_range(topic_name); @@ -239,7 +261,7 @@ namespace eCAL iter->second->ApplyLayerParameter(publication_info, tlayer.type, tlayer.par_layer); } // inform for publisher connection - iter->second->ApplyPublication(publication_info, topic_information); + iter->second->ApplyPublication(publication_info, topic_information, layer_states); } } diff --git a/ecal/core/src/readwrite/ecal_reader.cpp b/ecal/core/src/readwrite/ecal_reader.cpp index fe968c46bf..2507690852 100644 --- a/ecal/core/src/readwrite/ecal_reader.cpp +++ b/ecal/core/src/readwrite/ecal_reader.cpp @@ -64,20 +64,7 @@ namespace eCAL m_host_group_name(Process::GetHostGroupName()), m_pid(Process::GetProcessID()), m_pname(Process::GetProcessName()), - m_topic_size(0), - m_connected(false), - m_read_buf_received(false), - m_read_time(0), - m_receive_time(0), - m_clock(0), - m_frequency_calculator(3.0f), - m_message_drops(0), - m_share_ttype(true), - m_share_tdesc(true), - m_use_udp_confirmed(false), - m_use_shm_confirmed(false), - m_use_tcp_confirmed(false), - m_created(false) + m_frequency_calculator(0.0f) { } @@ -142,13 +129,13 @@ namespace eCAL // reset receive callback { - const std::lock_guard lock(m_receive_callback_sync); + const std::lock_guard lock(m_receive_callback_mtx); m_receive_callback = nullptr; } // reset event callback map { - const std::lock_guard lock(m_event_callback_map_sync); + const std::lock_guard lock(m_event_callback_map_mtx); m_event_callback_map.clear(); } @@ -162,10 +149,6 @@ namespace eCAL m_clock = 0; m_message_drops = 0; - m_use_udp_confirmed = false; - m_use_shm_confirmed = false; - m_use_tcp_confirmed = false; - return(true); } @@ -262,7 +245,7 @@ namespace eCAL Registration::TLayer udp_tlayer; udp_tlayer.type = tl_ecal_udp; udp_tlayer.version = 1; - udp_tlayer.confirmed = m_use_udp_confirmed; + udp_tlayer.confirmed = m_confirmed_layers.udp; ecal_reg_sample_topic.tlayer.push_back(udp_tlayer); } #endif @@ -273,7 +256,7 @@ namespace eCAL Registration::TLayer shm_tlayer; shm_tlayer.type = tl_ecal_shm; shm_tlayer.version = 1; - shm_tlayer.confirmed = m_use_shm_confirmed; + shm_tlayer.confirmed = m_confirmed_layers.shm; ecal_reg_sample_topic.tlayer.push_back(shm_tlayer); } #endif @@ -284,7 +267,7 @@ namespace eCAL Registration::TLayer tcp_tlayer; tcp_tlayer.type = tl_ecal_tcp; tcp_tlayer.version = 1; - tcp_tlayer.confirmed = m_use_tcp_confirmed; + tcp_tlayer.confirmed = m_confirmed_layers.tcp; ecal_reg_sample_topic.tlayer.push_back(tcp_tlayer); } #endif @@ -378,7 +361,7 @@ namespace eCAL { if (!m_created) return(false); - std::unique_lock read_buffer_lock(m_read_buf_mutex); + std::unique_lock read_buffer_lock(m_read_buf_mtx); // No need to wait (for whatever time) if something has been received if (!m_read_buf_received) @@ -418,13 +401,13 @@ namespace eCAL size_t CDataReader::AddSample(const std::string& tid_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_) { // ensure thread safety - const std::lock_guard lock(m_receive_callback_sync); + const std::lock_guard lock(m_receive_callback_mtx); if (!m_created) return(0); // store receive layer - m_use_udp_confirmed |= layer_ == tl_ecal_udp; - m_use_shm_confirmed |= layer_ == tl_ecal_shm; - m_use_tcp_confirmed |= layer_ == tl_ecal_tcp; + m_confirmed_layers.udp |= layer_ == tl_ecal_udp; + m_confirmed_layers.shm |= layer_ == tl_ecal_shm; + m_confirmed_layers.tcp |= layer_ == tl_ecal_tcp; // number of hash values to track for duplicates constexpr int hash_queue_size(64); @@ -474,7 +457,7 @@ namespace eCAL // Update frequency calculation { const auto receive_time = std::chrono::steady_clock::now(); - const std::lock_guard freq_lock(m_frequency_calculator_mutex); + const std::lock_guard freq_lock(m_frequency_calculator_mtx); m_frequency_calculator.addTick(receive_time); } @@ -511,7 +494,7 @@ namespace eCAL if(!processed) { // push sample into read buffer - const std::lock_guard read_buffer_lock(m_read_buf_mutex); + const std::lock_guard read_buffer_lock(m_read_buf_mtx); m_read_buf.clear(); m_read_buf.assign(payload_, payload_ + size_); m_read_time = time_; @@ -534,7 +517,7 @@ namespace eCAL // store receive callback { - const std::lock_guard lock(m_receive_callback_sync); + const std::lock_guard lock(m_receive_callback_mtx); #ifndef NDEBUG // log it Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::AddReceiveCallback"); @@ -551,7 +534,7 @@ namespace eCAL // reset receive callback { - const std::lock_guard lock(m_receive_callback_sync); + const std::lock_guard lock(m_receive_callback_mtx); #ifndef NDEBUG // log it Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::RemReceiveCallback"); @@ -572,7 +555,7 @@ namespace eCAL // log it Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::AddEventCallback"); #endif - const std::lock_guard lock(m_event_callback_map_sync); + const std::lock_guard lock(m_event_callback_map_mtx); m_event_callback_map[type_] = std::move(callback_); } @@ -589,7 +572,7 @@ namespace eCAL // log it Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::RemEventCallback"); #endif - const std::lock_guard lock(m_event_callback_map_sync); + const std::lock_guard lock(m_event_callback_map_mtx); m_event_callback_map[type_] = nullptr; } @@ -601,14 +584,14 @@ namespace eCAL m_id_set = id_set_; } - void CDataReader::ApplyPublication(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_) + void CDataReader::ApplyPublication(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& layer_states_) { Connect(publication_info_.topic_id, data_type_info_); // add key to publisher map { - const std::lock_guard lock(m_pub_map_sync); - m_pub_map[publication_info_] = data_type_info_; + const std::lock_guard lock(m_pub_map_mtx); + m_pub_map[publication_info_] = std::make_tuple(data_type_info_, layer_states_); } } @@ -616,7 +599,7 @@ namespace eCAL { // remove key from publisher map { - const std::lock_guard lock(m_pub_map_sync); + const std::lock_guard lock(m_pub_map_mtx); m_pub_map.erase(publication_info_); } } @@ -659,7 +642,7 @@ namespace eCAL // fire sub_event_connected { - const std::lock_guard lock(m_event_callback_map_sync); + const std::lock_guard lock(m_event_callback_map_mtx); auto iter = m_event_callback_map.find(sub_event_connected); if (iter != m_event_callback_map.end() && iter->second) { @@ -673,7 +656,7 @@ namespace eCAL // fire sub_event_update_connection { - const std::lock_guard lock(m_event_callback_map_sync); + const std::lock_guard lock(m_event_callback_map_mtx); auto iter = m_event_callback_map.find(sub_event_update_connection); if (iter != m_event_callback_map.end() && iter->second) { @@ -693,7 +676,7 @@ namespace eCAL // fire sub_event_disconnected { - const std::lock_guard lock(m_event_callback_map_sync); + const std::lock_guard lock(m_event_callback_map_mtx); auto iter = m_event_callback_map.find(sub_event_disconnected); if (iter != m_event_callback_map.end() && iter->second) { @@ -763,7 +746,7 @@ namespace eCAL #endif // we fire the message drop event { - const std::lock_guard lock(m_event_callback_map_sync); + const std::lock_guard lock(m_event_callback_map_mtx); auto citer = m_event_callback_map.find(sub_event_dropped); if (citer != m_event_callback_map.end() && citer->second) { @@ -829,7 +812,7 @@ namespace eCAL int32_t CDataReader::GetFrequency() { const auto frequency_time = std::chrono::steady_clock::now(); - const std::lock_guard lock(m_frequency_calculator_mutex); + const std::lock_guard lock(m_frequency_calculator_mtx); return static_cast(m_frequency_calculator.getFrequency(frequency_time) * 1000); } @@ -845,7 +828,7 @@ namespace eCAL // check connection timeouts { - const std::lock_guard lock(m_pub_map_sync); + const std::lock_guard lock(m_pub_map_mtx); m_pub_map.remove_deprecated(); if (m_pub_map.empty()) diff --git a/ecal/core/src/readwrite/ecal_reader.h b/ecal/core/src/readwrite/ecal_reader.h index 8490fdb13f..c8c6b19371 100644 --- a/ecal/core/src/readwrite/ecal_reader.h +++ b/ecal/core/src/readwrite/ecal_reader.h @@ -40,6 +40,7 @@ #include #include #include +#include #include #include @@ -52,6 +53,13 @@ namespace eCAL class CDataReader { public: + struct SLayerStates + { + bool udp = false; + bool shm = false; + bool tcp = false; + }; + struct SPublicationInfo { std::string host_name; @@ -86,7 +94,7 @@ namespace eCAL void SetID(const std::set& id_set_); - void ApplyPublication(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_); + void ApplyPublication(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& layer_states_); void RemovePublication(const SPublicationInfo& publication_info_); void ApplyLayerParameter(const SPublicationInfo& publication_info_, eTLayerType type_, const Registration::ConnectionPar& parameter_); @@ -97,13 +105,13 @@ namespace eCAL bool IsPublished() const { - std::lock_guard const lock(m_pub_map_sync); + std::lock_guard const lock(m_pub_map_mtx); return(!m_pub_map.empty()); } size_t GetPublisherCount() const { - const std::lock_guard lock(m_pub_map_sync); + const std::lock_guard lock(m_pub_map_mtx); return(m_pub_map.size()); } @@ -136,47 +144,44 @@ namespace eCAL std::string m_topic_id; SDataTypeInformation m_topic_info; std::map m_attr; - std::atomic m_topic_size; + std::atomic m_topic_size = 0; - std::atomic m_connected; - using PublicationMapT = Util::CExpMap; - mutable std::mutex m_pub_map_sync; + std::atomic m_connected = false; + using PublicationMapT = Util::CExpMap>; + mutable std::mutex m_pub_map_mtx; PublicationMapT m_pub_map; - mutable std::mutex m_read_buf_mutex; + mutable std::mutex m_read_buf_mtx; std::condition_variable m_read_buf_cv; - bool m_read_buf_received; + bool m_read_buf_received = false; std::string m_read_buf; - long long m_read_time; + long long m_read_time = 0; - std::mutex m_receive_callback_sync; + std::mutex m_receive_callback_mtx; ReceiveCallbackT m_receive_callback; - std::atomic m_receive_time; + std::atomic m_receive_time = 0; std::deque m_sample_hash_queue; using EventCallbackMapT = std::map; - std::mutex m_event_callback_map_sync; + std::mutex m_event_callback_map_mtx; EventCallbackMapT m_event_callback_map; - std::atomic m_clock; + std::atomic m_clock = 0; - std::mutex m_frequency_calculator_mutex; + std::mutex m_frequency_calculator_mtx; ResettableFrequencyCalculator m_frequency_calculator; std::set m_id_set; using WriterCounterMapT = std::unordered_map; WriterCounterMapT m_writer_counter_map; - long long m_message_drops; - - bool m_share_ttype; - bool m_share_tdesc; + long long m_message_drops = 0; - bool m_use_udp_confirmed; - bool m_use_shm_confirmed; - bool m_use_tcp_confirmed; + bool m_share_ttype = false; + bool m_share_tdesc = false; - std::atomic m_created; + SLayerStates m_confirmed_layers; + std::atomic m_created = false; }; } diff --git a/ecal/core/src/readwrite/ecal_writer.cpp b/ecal/core/src/readwrite/ecal_writer.cpp index 2450981d70..a7da2f9e25 100644 --- a/ecal/core/src/readwrite/ecal_writer.cpp +++ b/ecal/core/src/readwrite/ecal_writer.cpp @@ -74,27 +74,9 @@ namespace eCAL m_pname(Process::GetProcessName()), m_topic_name(topic_name_), m_topic_info(topic_info_), - m_topic_size(0), m_config(config_), - m_connected(false), - m_id(0), - m_clock(0), - m_frequency_calculator(0.0f), - m_created(false) + m_frequency_calculator(0.0f) { - // shm config -#if ECAL_CORE_TRANSPORT_SHM - m_writer.shm_mode.activated = config_.shm.activate; -#endif - // udp config -#if ECAL_CORE_TRANSPORT_UDP - m_writer.udp_mode.activated = config_.udp.activate; -#endif - // tcp config -#if ECAL_CORE_TRANSPORT_TCP - m_writer.tcp_mode.activated = config_.tcp.activate; -#endif - // build topic id std::stringstream counter; counter << std::chrono::steady_clock::now().time_since_epoch().count(); @@ -111,13 +93,13 @@ namespace eCAL Register(false); // create udp multicast layer - ActivateUdpLayer(m_writer.udp_mode.activated); + ActivateUdpLayer(config_.udp.activate); // create shm layer - ActivateShmLayer(m_writer.shm_mode.activated); + ActivateShmLayer(config_.shm.activate); // create tcp layer - ActivateTcpLayer(m_writer.tcp_mode.activated); + ActivateTcpLayer(config_.tcp.activate); #ifndef NDEBUG // log it @@ -134,28 +116,28 @@ namespace eCAL // destroy udp multicast writer #if ECAL_CORE_TRANSPORT_UDP - m_writer.udp.reset(); + m_writer_udp.reset(); #endif // destroy memory file writer #if ECAL_CORE_TRANSPORT_SHM - m_writer.shm.reset(); + m_writer_shm.reset(); #endif // destroy tcp writer #if ECAL_CORE_TRANSPORT_TCP - m_writer.tcp.reset(); + m_writer_tcp.reset(); #endif // clear subscriber maps { - const std::lock_guard lock(m_sub_map_sync); + const std::lock_guard lock(m_sub_map_mtx); m_sub_map.clear(); } // clear event callback map { - const std::lock_guard lock(m_event_callback_map_sync); + const std::lock_guard lock(m_event_callback_map_mtx); m_event_callback_map.clear(); } @@ -228,7 +210,7 @@ namespace eCAL // log it Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::AddEventCallback"); #endif - const std::lock_guard lock(m_event_callback_map_sync); + const std::lock_guard lock(m_event_callback_map_mtx); m_event_callback_map[type_] = std::move(callback_); } @@ -245,7 +227,7 @@ namespace eCAL // log it Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::RemEventCallback"); #endif - const std::lock_guard lock(m_event_callback_map_sync); + const std::lock_guard lock(m_event_callback_map_mtx); m_event_callback_map[type_] = nullptr; } @@ -257,7 +239,7 @@ namespace eCAL { // we should think about if we would like to potentially use the `time_` variable to tick with (but we would need the same base for checking incoming samples then.... const auto send_time = std::chrono::steady_clock::now(); - const std::lock_guard lock(m_frequency_calculator_mutex); + const std::lock_guard lock(m_frequency_calculator_mtx); m_frequency_calculator.addTick(send_time); } @@ -266,10 +248,10 @@ namespace eCAL // can we do a zero copy write ? const bool allow_zero_copy = - m_config.shm.zero_copy_mode // zero copy mode activated by user - && m_writer.shm_mode.activated // shm layer active - && !m_writer.udp_mode.activated // udp layer inactive - && !m_writer.tcp_mode.activated; // tcp layer inactive + m_config.shm.zero_copy_mode // zero copy mode activated by user + && m_config.shm.activate // shm layer active + && !m_config.udp.activate // udp layer inactive + && !m_config.tcp.activate; // tcp layer inactive // create a payload copy for all layer if (!allow_zero_copy) @@ -288,7 +270,7 @@ namespace eCAL // SHM //////////////////////////////////////////////////////////////////////////// #if ECAL_CORE_TRANSPORT_SHM - if (m_writer.shm && m_writer.shm_mode.activated) + if (m_writer_shm && m_config.shm.activate) { #ifndef NDEBUG // log it @@ -310,7 +292,7 @@ namespace eCAL wattr.acknowledge_timeout_ms = m_config.shm.acknowledge_timeout_ms; // prepare send - if (m_writer.shm->PrepareWrite(wattr)) + if (m_writer_shm->PrepareWrite(wattr)) { // register new to update listening subscribers and rematch Register(true); @@ -321,7 +303,7 @@ namespace eCAL if (allow_zero_copy) { // write to shm layer (write content into the opened memory file without additional copy) - shm_sent = m_writer.shm->Write(payload_, wattr); + shm_sent = m_writer_shm->Write(payload_, wattr); } // multiple layer are active -> we make a copy and use that one else @@ -329,10 +311,10 @@ namespace eCAL // wrap the buffer into a payload object CBufferPayloadWriter payload_buf(m_payload_buffer.data(), m_payload_buffer.size()); // write to shm layer (write content into the opened memory file without additional copy) - shm_sent = m_writer.shm->Write(payload_buf, wattr); + shm_sent = m_writer_shm->Write(payload_buf, wattr); } - m_writer.shm_mode.confirmed = true; + m_confirmed_layers.shm = true; } written |= shm_sent; @@ -354,7 +336,7 @@ namespace eCAL // UDP (MC) //////////////////////////////////////////////////////////////////////////// #if ECAL_CORE_TRANSPORT_UDP - if (m_writer.udp && m_writer.udp_mode.activated) + if (m_writer_udp && m_config.udp.activate) { #ifndef NDEBUG // log it @@ -367,7 +349,7 @@ namespace eCAL #if ECAL_CORE_TRANSPORT_SHM // if shared memory layer for local communication is not activated // we activate udp message loopback to communicate with local processes too - const bool loopback = !m_writer.shm_mode.activated; + const bool loopback = !m_config.shm.activate; #else const bool loopback = true; #endif @@ -382,7 +364,7 @@ namespace eCAL wattr.loopback = loopback; // prepare send - if (m_writer.udp->PrepareWrite(wattr)) + if (m_writer_udp->PrepareWrite(wattr)) { // register new to update listening subscribers and rematch Register(true); @@ -390,8 +372,8 @@ namespace eCAL } // write to udp multicast layer - udp_sent = m_writer.udp->Write(m_payload_buffer.data(), wattr); - m_writer.udp_mode.confirmed = true; + udp_sent = m_writer_udp->Write(m_payload_buffer.data(), wattr); + m_confirmed_layers.udp = true; } written |= udp_sent; @@ -413,7 +395,7 @@ namespace eCAL // TCP //////////////////////////////////////////////////////////////////////////// #if ECAL_CORE_TRANSPORT_TCP - if (m_writer.tcp && m_writer.tcp_mode.activated) + if (m_writer_tcp && m_config.tcp.activate) { #ifndef NDEBUG // log it @@ -432,8 +414,8 @@ namespace eCAL wattr.time = time_; // write to tcp layer - tcp_sent = m_writer.tcp->Write(m_payload_buffer.data(), wattr); - m_writer.tcp_mode.confirmed = true; + tcp_sent = m_writer_tcp->Write(m_payload_buffer.data(), wattr); + m_confirmed_layers.tcp = true; } written |= tcp_sent; @@ -456,25 +438,25 @@ namespace eCAL else return 0; } - void CDataWriter::ApplySubscription(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_, const std::string& reader_par_) + void CDataWriter::ApplySubscription(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& layer_states_, const std::string& reader_par_) { Connect(subscription_info_.topic_id, data_type_info_); // add key to subscriber map { - const std::lock_guard lock(m_sub_map_sync); - m_sub_map[subscription_info_] = data_type_info_; + const std::lock_guard lock(m_sub_map_mtx); + m_sub_map[subscription_info_] = std::make_tuple(data_type_info_, layer_states_); } // add a new subscription #if ECAL_CORE_TRANSPORT_UDP - if (m_writer.udp) m_writer.udp->ApplySubscription(subscription_info_.host_name, subscription_info_.process_id, subscription_info_.topic_id, reader_par_); + if (m_writer_udp) m_writer_udp->ApplySubscription(subscription_info_.host_name, subscription_info_.process_id, subscription_info_.topic_id, reader_par_); #endif #if ECAL_CORE_TRANSPORT_SHM - if (m_writer.shm) m_writer.shm->ApplySubscription(subscription_info_.host_name, subscription_info_.process_id, subscription_info_.topic_id, reader_par_); + if (m_writer_shm) m_writer_shm->ApplySubscription(subscription_info_.host_name, subscription_info_.process_id, subscription_info_.topic_id, reader_par_); #endif #if ECAL_CORE_TRANSPORT_TCP - if (m_writer.tcp) m_writer.tcp->ApplySubscription(subscription_info_.host_name, subscription_info_.process_id, subscription_info_.topic_id, reader_par_); + if (m_writer_tcp) m_writer_tcp->ApplySubscription(subscription_info_.host_name, subscription_info_.process_id, subscription_info_.topic_id, reader_par_); #endif #ifndef NDEBUG @@ -487,19 +469,19 @@ namespace eCAL { // remove key from subscriber map { - const std::lock_guard lock(m_sub_map_sync); + const std::lock_guard lock(m_sub_map_mtx); m_sub_map.erase(subscription_info_); } // remove subscription #if ECAL_CORE_TRANSPORT_UDP - if (m_writer.udp) m_writer.udp->RemoveSubscription(subscription_info_.host_name, subscription_info_.process_id, subscription_info_.topic_id); + if (m_writer_udp) m_writer_udp->RemoveSubscription(subscription_info_.host_name, subscription_info_.process_id, subscription_info_.topic_id); #endif #if ECAL_CORE_TRANSPORT_SHM - if (m_writer.shm) m_writer.shm->RemoveSubscription(subscription_info_.host_name, subscription_info_.process_id, subscription_info_.topic_id); + if (m_writer_shm) m_writer_shm->RemoveSubscription(subscription_info_.host_name, subscription_info_.process_id, subscription_info_.topic_id); #endif #if ECAL_CORE_TRANSPORT_TCP - if (m_writer.tcp) m_writer.tcp->RemoveSubscription(subscription_info_.host_name, subscription_info_.process_id, subscription_info_.topic_id); + if (m_writer_tcp) m_writer_tcp->RemoveSubscription(subscription_info_.host_name, subscription_info_.process_id, subscription_info_.topic_id); #endif #ifndef NDEBUG @@ -517,7 +499,7 @@ namespace eCAL // check connection timeouts { - const std::lock_guard lock(m_sub_map_sync); + const std::lock_guard lock(m_sub_map_mtx); m_sub_map.remove_deprecated(); if (m_sub_map.empty()) @@ -590,39 +572,39 @@ namespace eCAL #if ECAL_CORE_TRANSPORT_UDP // udp multicast layer - if (m_writer.udp) + if (m_writer_udp) { eCAL::Registration::TLayer udp_tlayer; udp_tlayer.type = tl_ecal_udp; udp_tlayer.version = 1; - udp_tlayer.confirmed = m_writer.udp_mode.confirmed; - udp_tlayer.par_layer.layer_par_udpmc = m_writer.udp->GetConnectionParameter().layer_par_udpmc; + udp_tlayer.confirmed = m_confirmed_layers.udp; + udp_tlayer.par_layer.layer_par_udpmc = m_writer_udp->GetConnectionParameter().layer_par_udpmc; ecal_reg_sample_topic.tlayer.push_back(udp_tlayer); } #endif #if ECAL_CORE_TRANSPORT_SHM // shm layer - if (m_writer.shm) + if (m_writer_shm) { eCAL::Registration::TLayer shm_tlayer; shm_tlayer.type = tl_ecal_shm; shm_tlayer.version = 1; - shm_tlayer.confirmed = m_writer.shm_mode.confirmed; - shm_tlayer.par_layer.layer_par_shm = m_writer.shm->GetConnectionParameter().layer_par_shm; + shm_tlayer.confirmed = m_confirmed_layers.shm; + shm_tlayer.par_layer.layer_par_shm = m_writer_shm->GetConnectionParameter().layer_par_shm; ecal_reg_sample_topic.tlayer.push_back(shm_tlayer); } #endif #if ECAL_CORE_TRANSPORT_TCP // tcp layer - if (m_writer.tcp) + if (m_writer_tcp) { eCAL::Registration::TLayer tcp_tlayer; tcp_tlayer.type = tl_ecal_tcp; tcp_tlayer.version = 1; - tcp_tlayer.confirmed = m_writer.tcp_mode.confirmed; - tcp_tlayer.par_layer.layer_par_tcp = m_writer.tcp->GetConnectionParameter().layer_par_tcp; + tcp_tlayer.confirmed = m_confirmed_layers.tcp; + tcp_tlayer.par_layer.layer_par_tcp = m_writer_tcp->GetConnectionParameter().layer_par_tcp; ecal_reg_sample_topic.tlayer.push_back(tcp_tlayer); } #endif @@ -637,7 +619,7 @@ namespace eCAL size_t loc_connections(0); size_t ext_connections(0); { - const std::lock_guard lock(m_sub_map_sync); + const std::lock_guard lock(m_sub_map_mtx); for (const auto& sub : m_sub_map) { if (sub.first.host_name == m_host_name) @@ -704,7 +686,7 @@ namespace eCAL // fire pub_event_connected { - const std::lock_guard lock(m_event_callback_map_sync); + const std::lock_guard lock(m_event_callback_map_mtx); auto iter = m_event_callback_map.find(pub_event_connected); if (iter != m_event_callback_map.end() && iter->second) { @@ -718,7 +700,7 @@ namespace eCAL // fire pub_event_update_connection { - const std::lock_guard lock(m_event_callback_map_sync); + const std::lock_guard lock(m_event_callback_map_mtx); auto iter = m_event_callback_map.find(pub_event_update_connection); if (iter != m_event_callback_map.end() && iter->second) { @@ -738,7 +720,7 @@ namespace eCAL // fire pub_event_disconnected { - const std::lock_guard lock(m_event_callback_map_sync); + const std::lock_guard lock(m_event_callback_map_mtx); auto iter = m_event_callback_map.find(pub_event_disconnected); if (iter != m_event_callback_map.end() && iter->second) { @@ -755,7 +737,7 @@ namespace eCAL void CDataWriter::ActivateUdpLayer(bool state_) { #if ECAL_CORE_TRANSPORT_UDP - m_writer.udp_mode.activated = state_; + m_config.udp.activate = state_; if (!m_created) return; // log state @@ -763,14 +745,14 @@ namespace eCAL if (state_) { - m_writer.udp = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.udp); + m_writer_udp = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.udp); #ifndef NDEBUG Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateUdpLayer::UDP_WRITER"); #endif } else { - m_writer.udp.reset(); + m_writer_udp.reset(); } #else // ECAL_CORE_TRANSPORT_UDP (void)state_; @@ -780,7 +762,7 @@ namespace eCAL void CDataWriter::ActivateShmLayer(bool state_) { #if ECAL_CORE_TRANSPORT_SHM - m_writer.shm_mode.activated = state_; + m_config.shm.activate = state_; if (!m_created) return; // log state @@ -788,15 +770,15 @@ namespace eCAL if (state_) { - m_writer.shm = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.shm); - m_writer.shm->SetBufferCount(m_config.shm.memfile_buffer_count); + m_writer_shm = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.shm); + m_writer_shm->SetBufferCount(m_config.shm.memfile_buffer_count); #ifndef NDEBUG Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateShmLayer::SHM_WRITER"); #endif } else { - m_writer.shm.reset(); + m_writer_shm.reset(); } #else // ECAL_CORE_TRANSPORT_SHM (void)state_; @@ -806,7 +788,7 @@ namespace eCAL void CDataWriter::ActivateTcpLayer(bool state_) { #if ECAL_CORE_TRANSPORT_TCP - m_writer.tcp_mode.activated = state_; + m_config.tcp.activate = state_; if (!m_created) return; // log state @@ -814,14 +796,14 @@ namespace eCAL if (state_) { - m_writer.tcp = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.tcp); + m_writer_tcp = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.tcp); #ifndef NDEBUG Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateTcpLayer::TCP_WRITER - SUCCESS"); #endif } else { - m_writer.tcp.reset(); + m_writer_tcp.reset(); } #else // ECAL_CORE_TRANSPORT_TCP (void)state_; @@ -851,7 +833,7 @@ namespace eCAL { const int32_t process_id = static_cast(Process::GetProcessID()); bool is_internal_only(true); - const std::lock_guard lock(m_sub_map_sync); + const std::lock_guard lock(m_sub_map_mtx); for (auto sub : m_sub_map) { if (sub.first.process_id != process_id) @@ -883,7 +865,7 @@ namespace eCAL int32_t CDataWriter::GetFrequency() { const auto frequency_time = std::chrono::steady_clock::now(); - const std::lock_guard lock(m_frequency_calculator_mutex); + const std::lock_guard lock(m_frequency_calculator_mtx); return static_cast(m_frequency_calculator.getFrequency(frequency_time) * 1000); } } diff --git a/ecal/core/src/readwrite/ecal_writer.h b/ecal/core/src/readwrite/ecal_writer.h index 7c5ab832fb..c53d8b7ea0 100644 --- a/ecal/core/src/readwrite/ecal_writer.h +++ b/ecal/core/src/readwrite/ecal_writer.h @@ -52,6 +52,7 @@ #include #include #include +#include #include namespace eCAL @@ -59,11 +60,18 @@ namespace eCAL class CDataWriter { public: + struct SLayerStates + { + bool udp = false; + bool shm = false; + bool tcp = false; + }; + struct SSubscriptionInfo { - std::string host_name; - int32_t process_id = 0; - std::string topic_id; + std::string host_name; + int32_t process_id = 0; + std::string topic_id; friend bool operator<(const SSubscriptionInfo& l, const SSubscriptionInfo& r) { @@ -85,7 +93,7 @@ namespace eCAL size_t Write(CPayloadWriter& payload_, long long time_, long long id_); - void ApplySubscription(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_, const std::string& reader_par_); + void ApplySubscription(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& layer_states_, const std::string& reader_par_); void RemoveSubscription(const SSubscriptionInfo& subscription_info_); void RefreshRegistration(); @@ -97,13 +105,13 @@ namespace eCAL bool IsSubscribed() const { - std::lock_guard const lock(m_sub_map_sync); + std::lock_guard const lock(m_sub_map_mtx); return(!m_sub_map.empty()); } size_t GetSubscriberCount() const { - std::lock_guard const lock(m_sub_map_sync); + std::lock_guard const lock(m_sub_map_mtx); return(m_sub_map.size()); } @@ -135,51 +143,38 @@ namespace eCAL std::string m_topic_id; SDataTypeInformation m_topic_info; std::map m_attr; - size_t m_topic_size; + size_t m_topic_size = 0; Publisher::Configuration m_config; std::vector m_payload_buffer; - std::atomic m_connected; + std::atomic m_connected = false; - using SSubscriptionMapT = Util::CExpMap; - mutable std::mutex m_sub_map_sync; + using SSubscriptionMapT = Util::CExpMap>; + mutable std::mutex m_sub_map_mtx; SSubscriptionMapT m_sub_map; using EventCallbackMapT = std::map; - std::mutex m_event_callback_map_sync; + std::mutex m_event_callback_map_mtx; EventCallbackMapT m_event_callback_map; - long long m_id; - long long m_clock; + long long m_id = 0; + long long m_clock = 0; - std::mutex m_frequency_calculator_mutex; + std::mutex m_frequency_calculator_mtx; ResettableFrequencyCalculator m_frequency_calculator; - struct SWriter - { - struct SWriterMode - { - bool activated = false; - bool confirmed = false; - }; - - SWriterMode udp_mode; - SWriterMode tcp_mode; - SWriterMode shm_mode; - #if ECAL_CORE_TRANSPORT_UDP - std::unique_ptr udp; + std::unique_ptr m_writer_udp; #endif #if ECAL_CORE_TRANSPORT_SHM - std::unique_ptr shm; + std::unique_ptr m_writer_shm; #endif #if ECAL_CORE_TRANSPORT_TCP - std::unique_ptr tcp; + std::unique_ptr m_writer_tcp; #endif - }; - SWriter m_writer; - std::atomic m_created; + SLayerStates m_confirmed_layers; + std::atomic m_created = false; }; }