diff --git a/CMakeLists.txt b/CMakeLists.txt index 45791fc1ac..ab5d704992 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,7 +16,7 @@ # # ========================= eCAL LICENSE ================================= -cmake_minimum_required(VERSION 3.13) +cmake_minimum_required(VERSION 3.15) include(CMakeDependentOption) diff --git a/app/meas_cutter/CMakeLists.txt b/app/meas_cutter/CMakeLists.txt index 159715b6e0..7b9dee4121 100644 --- a/app/meas_cutter/CMakeLists.txt +++ b/app/meas_cutter/CMakeLists.txt @@ -24,7 +24,9 @@ find_package(yaml-cpp REQUIRED) #compatibility with yaml-cpp < 0.8.0 if (NOT TARGET yaml-cpp::yaml-cpp AND TARGET yaml-cpp) - add_library(yaml-cpp::yaml-cpp ALIAS yaml-cpp) + # ALIASing a imported non-global library requires CMake 3.18 so we do this + add_library(yaml-cpp::yaml-cpp INTERFACE IMPORTED) + target_link_libraries(yaml-cpp::yaml-cpp INTERFACE yaml-cpp) endif() diff --git a/app/mon/mon_plugins/signals_plotting/src/signal_tree_model.cpp b/app/mon/mon_plugins/signals_plotting/src/signal_tree_model.cpp index 523ae596aa..2bcdf4b97a 100644 --- a/app/mon/mon_plugins/signals_plotting/src/signal_tree_model.cpp +++ b/app/mon/mon_plugins/signals_plotting/src/signal_tree_model.cpp @@ -74,10 +74,16 @@ Qt::ItemFlags SignalTreeModel::flags(const QModelIndex& index) const QVariant SignalTreeModel::data(const QModelIndex& index, int role) const { QAbstractTreeItem* tree_item = item(index); - if (tree_item) - return (index.column() >= 0 ? tree_item->data(index.column(), (Qt::ItemDataRole)role) : QVariant()); - else - return QVariant(); + if (tree_item != nullptr) + { + int const item_column = mapColumnToItem(index.column(), tree_item->type()); + if (item_column >= 0) + { + return tree_item->data(item_column, (Qt::ItemDataRole)role); + } + } + + return QVariant(); } void SignalTreeModel::setCheckedState(SignalTreeItem* item, int index_column) diff --git a/ecal/core/include/ecal/ecal_timed_cb.h b/ecal/core/include/ecal/ecal_timed_cb.h index 5581b6d518..a8cceb00f9 100644 --- a/ecal/core/include/ecal/ecal_timed_cb.h +++ b/ecal/core/include/ecal/ecal_timed_cb.h @@ -97,7 +97,10 @@ namespace eCAL { if (!m_running) return(false); m_stop = true; - m_thread.join(); + // Wait for the callback thread to finish + if (m_thread.joinable()) { + m_thread.join(); + } m_running = false; return(true); } diff --git a/ecal/core/include/ecal/msg/capnproto/subscriber.h b/ecal/core/include/ecal/msg/capnproto/subscriber.h index d330700137..02cb9edac5 100644 --- a/ecal/core/include/ecal/msg/capnproto/subscriber.h +++ b/ecal/core/include/ecal/msg/capnproto/subscriber.h @@ -68,6 +68,14 @@ namespace eCAL { } + /** + * @brief Destructor + **/ + ~CBuilderSubscriber() override + { + this->Destroy(); + } + /** * @brief Copy Constructor is not available. **/ diff --git a/ecal/core/include/ecal/msg/flatbuffers/subscriber.h b/ecal/core/include/ecal/msg/flatbuffers/subscriber.h index c0c0acf973..0b274b2f39 100644 --- a/ecal/core/include/ecal/msg/flatbuffers/subscriber.h +++ b/ecal/core/include/ecal/msg/flatbuffers/subscriber.h @@ -56,6 +56,14 @@ namespace eCAL { } + /** + * @brief Destructor + **/ + ~CSubscriber() override + { + this->Destroy(); + } + /** * @brief Copy Constructor is not available. **/ diff --git a/ecal/core/include/ecal/msg/messagepack/subscriber.h b/ecal/core/include/ecal/msg/messagepack/subscriber.h index d2334c3608..ece4e0d4ef 100644 --- a/ecal/core/include/ecal/msg/messagepack/subscriber.h +++ b/ecal/core/include/ecal/msg/messagepack/subscriber.h @@ -59,6 +59,14 @@ namespace eCAL { } + /** + * @brief Destructor + **/ + ~CSubscriber() override + { + this->Destroy(); + } + /** * @brief Copy Constructor is not available. **/ diff --git a/ecal/core/include/ecal/msg/protobuf/subscriber.h b/ecal/core/include/ecal/msg/protobuf/subscriber.h index 3596f3060b..ed057364f7 100644 --- a/ecal/core/include/ecal/msg/protobuf/subscriber.h +++ b/ecal/core/include/ecal/msg/protobuf/subscriber.h @@ -74,6 +74,14 @@ namespace eCAL { } + /** + * @brief Destructor + **/ + ~CSubscriber() override + { + this->Destroy(); + } + /** * @brief Copy Constructor is not available. **/ diff --git a/ecal/core/include/ecal/msg/string/subscriber.h b/ecal/core/include/ecal/msg/string/subscriber.h index 694d6025b3..8caccd038e 100644 --- a/ecal/core/include/ecal/msg/string/subscriber.h +++ b/ecal/core/include/ecal/msg/string/subscriber.h @@ -62,6 +62,14 @@ namespace eCAL { } + /** + * @brief Destructor + **/ + ~CSubscriber() override + { + this->Destroy(); + } + /** * @brief Copy Constructor is not available. **/ diff --git a/ecal/core/include/ecal/msg/subscriber.h b/ecal/core/include/ecal/msg/subscriber.h index 3f6980f795..0d342e0630 100644 --- a/ecal/core/include/ecal/msg/subscriber.h +++ b/ecal/core/include/ecal/msg/subscriber.h @@ -76,6 +76,8 @@ namespace eCAL { } + virtual ~CMsgSubscriber() = default; + /** * @brief Copy Constructor is not available. **/ @@ -125,8 +127,6 @@ namespace eCAL return *this; } - virtual ~CMsgSubscriber() {} - /** * @brief Creates this object. * @@ -207,7 +207,10 @@ namespace eCAL assert(IsCreated()); RemReceiveCallback(); - m_cb_callback = callback_; + { + std::lock_guard callback_lock(m_cb_callback_mutex); + m_cb_callback = callback_; + } auto callback = std::bind(&CMsgSubscriber::ReceiveCallback, this, std::placeholders::_1, std::placeholders::_2); return(CSubscriber::AddReceiveCallback(callback)); } @@ -219,9 +222,12 @@ namespace eCAL **/ bool RemReceiveCallback() { + bool ret = CSubscriber::RemReceiveCallback(); + + std::lock_guard callback_lock(m_cb_callback_mutex); if (m_cb_callback == nullptr) return(false); m_cb_callback = nullptr; - return(CSubscriber::RemReceiveCallback()); + return(ret); } protected: @@ -245,7 +251,11 @@ namespace eCAL private: void ReceiveCallback(const char* topic_name_, const struct eCAL::SReceiveCallbackData* data_) { - MsgReceiveCallbackT fn_callback(m_cb_callback); + MsgReceiveCallbackT fn_callback = nullptr; + { + std::lock_guard callback_lock(m_cb_callback_mutex); + fn_callback = m_cb_callback; + } if(fn_callback == nullptr) return; @@ -256,6 +266,7 @@ namespace eCAL } } + std::mutex m_cb_callback_mutex; MsgReceiveCallbackT m_cb_callback; }; } diff --git a/ecal/core/src/ecal_registration_receiver.cpp b/ecal/core/src/ecal_registration_receiver.cpp index 4bbb12e5cd..f2363c09c6 100644 --- a/ecal/core/src/ecal_registration_receiver.cpp +++ b/ecal/core/src/ecal_registration_receiver.cpp @@ -202,7 +202,11 @@ namespace eCAL eCAL::pb::Sample modified_ttype_sample; ModifyIncomingSampleForBackwardsCompatibility(ecal_sample_, modified_ttype_sample); - m_callback_custom_apply_sample(modified_ttype_sample); + // forward all registration samples to outside "customer" (e.g. Monitoring) + { + const std::lock_guard lock(m_callback_custom_apply_sample_mtx); + m_callback_custom_apply_sample(modified_ttype_sample); + } std::string reg_sample; if ( m_callback_pub @@ -417,11 +421,13 @@ namespace eCAL void CRegistrationReceiver::SetCustomApplySampleCallback(const ApplySampleCallbackT& callback_) { + const std::lock_guard lock(m_callback_custom_apply_sample_mtx); m_callback_custom_apply_sample = callback_; } void CRegistrationReceiver::RemCustomApplySampleCallback() { + const std::lock_guard lock(m_callback_custom_apply_sample_mtx); m_callback_custom_apply_sample = [](const auto&){}; } }; diff --git a/ecal/core/src/ecal_registration_receiver.h b/ecal/core/src/ecal_registration_receiver.h index bc320ffb28..62b7bcf752 100644 --- a/ecal/core/src/ecal_registration_receiver.h +++ b/ecal/core/src/ecal_registration_receiver.h @@ -78,6 +78,15 @@ namespace eCAL CRegistrationReceiver(); ~CRegistrationReceiver(); + // default copy constructor + CRegistrationReceiver(const CRegistrationReceiver& other) = delete; + // default copy assignment operator + CRegistrationReceiver& operator=(const CRegistrationReceiver& other) = delete; + // default move constructor + CRegistrationReceiver(CRegistrationReceiver&& other) noexcept = delete; + // default move assignment operator + CRegistrationReceiver& operator=(CRegistrationReceiver&& other) noexcept = delete; + void Create(); void Destroy(); @@ -121,6 +130,7 @@ namespace eCAL bool m_use_network_monitoring; bool m_use_shm_monitoring; + std::mutex m_callback_custom_apply_sample_mtx; ApplySampleCallbackT m_callback_custom_apply_sample; std::string m_host_group_name; diff --git a/ecal/core/src/ecal_thread.h b/ecal/core/src/ecal_thread.h index 7c5fa3c65f..135a4b1a1d 100644 --- a/ecal/core/src/ecal_thread.h +++ b/ecal/core/src/ecal_thread.h @@ -37,6 +37,11 @@ namespace eCAL CThread(); virtual ~CThread(); + CThread(const CThread&) = delete; + CThread& operator=(const CThread&) = delete; + CThread(CThread&& rhs) = delete; + CThread& operator=(CThread&& rhs) = delete; + int Start(int period, std::function ext_caller_); int Stop(); int Fire(); diff --git a/ecal/core/src/ecal_timer.cpp b/ecal/core/src/ecal_timer.cpp index 199cd15446..c5dc50aecf 100644 --- a/ecal/core/src/ecal_timer.cpp +++ b/ecal/core/src/ecal_timer.cpp @@ -38,6 +38,10 @@ namespace eCAL CTimerImpl(const int timeout_, TimerCallbackT callback_, const int delay_) : m_stop(false), m_running(false) { Start(timeout_, callback_, delay_); } virtual ~CTimerImpl() { Stop(); } + CTimerImpl(const CTimerImpl&) = delete; + CTimerImpl& operator=(const CTimerImpl&) = delete; + CTimerImpl(CTimerImpl&& rhs) = delete; + CTimerImpl& operator=(CTimerImpl&& rhs) = delete; bool Start(const int timeout_, TimerCallbackT callback_, const int delay_) { diff --git a/ecal/core/src/io/ecal_memfile_pool.cpp b/ecal/core/src/io/ecal_memfile_pool.cpp index 04ac67929a..12afb7fdb0 100644 --- a/ecal/core/src/io/ecal_memfile_pool.cpp +++ b/ecal/core/src/io/ecal_memfile_pool.cpp @@ -309,7 +309,10 @@ namespace eCAL { } - CMemFileThreadPool::~CMemFileThreadPool() = default; + CMemFileThreadPool::~CMemFileThreadPool() + { + Destroy(); + } void CMemFileThreadPool::Create() { diff --git a/ecal/core/src/io/ecal_memfile_pool.h b/ecal/core/src/io/ecal_memfile_pool.h index c09c7f6b7e..4cd6490927 100644 --- a/ecal/core/src/io/ecal_memfile_pool.h +++ b/ecal/core/src/io/ecal_memfile_pool.h @@ -50,6 +50,11 @@ namespace eCAL CMemFileObserver(); ~CMemFileObserver(); + CMemFileObserver(const CMemFileObserver&) = delete; + CMemFileObserver& operator=(const CMemFileObserver&) = delete; + CMemFileObserver(CMemFileObserver&& rhs) = delete; + CMemFileObserver& operator=(CMemFileObserver&& rhs) = delete; + bool Create(const std::string& memfile_name_, const std::string& memfile_event_); bool Destroy(); diff --git a/ecal/core/src/io/snd_sample.cpp b/ecal/core/src/io/snd_sample.cpp index b4b1ec9784..ad579057e2 100644 --- a/ecal/core/src/io/snd_sample.cpp +++ b/ecal/core/src/io/snd_sample.cpp @@ -45,6 +45,7 @@ namespace eCAL { if (!m_udp_sender) return(0); + std::lock_guard const send_lock(m_payload_mutex); // return value size_t sent_sum(0); diff --git a/ecal/core/src/io/snd_sample.h b/ecal/core/src/io/snd_sample.h index 8f2ff2be13..0d5a066d88 100644 --- a/ecal/core/src/io/snd_sample.h +++ b/ecal/core/src/io/snd_sample.h @@ -24,6 +24,7 @@ #pragma once #include +#include #include "udp_sender.h" @@ -47,6 +48,7 @@ namespace eCAL SSenderAttr m_attr; std::shared_ptr m_udp_sender; + std::mutex m_payload_mutex; std::vector m_payload; }; } diff --git a/ecal/core/src/io/udp_receiver.cpp b/ecal/core/src/io/udp_receiver.cpp index 012f596006..e479b57f36 100644 --- a/ecal/core/src/io/udp_receiver.cpp +++ b/ecal/core/src/io/udp_receiver.cpp @@ -54,6 +54,11 @@ namespace eCAL #endif //ECAL_NPCAP_SUPPORT } + CUDPReceiver::~CUDPReceiver() + { + Destroy(); + } + bool CUDPReceiver::Create(const SReceiverAttr& attr_) { if (m_socket_impl) return false; diff --git a/ecal/core/src/io/udp_receiver.h b/ecal/core/src/io/udp_receiver.h index 6a976bde6f..399a3751d2 100644 --- a/ecal/core/src/io/udp_receiver.h +++ b/ecal/core/src/io/udp_receiver.h @@ -35,6 +35,12 @@ namespace eCAL { public: CUDPReceiver(); + ~CUDPReceiver(); + + CUDPReceiver(const CUDPReceiver&) = delete; + CUDPReceiver& operator=(const CUDPReceiver&) = delete; + CUDPReceiver(CUDPReceiver&& rhs) = delete; + CUDPReceiver& operator=(CUDPReceiver&& rhs) = delete; bool Create(const SReceiverAttr& attr_) override; bool Destroy() override; diff --git a/ecal/core/src/pubsub/ecal_publisher.cpp b/ecal/core/src/pubsub/ecal_publisher.cpp index cf44221aee..7f6655dc58 100644 --- a/ecal/core/src/pubsub/ecal_publisher.cpp +++ b/ecal/core/src/pubsub/ecal_publisher.cpp @@ -72,15 +72,13 @@ namespace eCAL * @brief CPublisher are move-enabled **/ CPublisher::CPublisher(CPublisher&& rhs) noexcept : - m_datawriter(rhs.m_datawriter), + m_datawriter(std::move(rhs.m_datawriter)), m_qos(rhs.m_qos), + m_tlayer(rhs.m_tlayer), m_id(rhs.m_id), m_created(rhs.m_created), m_initialized(rhs.m_initialized) { - InitializeQOS(); - InitializeTLayer(); - rhs.m_created = false; rhs.m_initialized = false; } @@ -90,15 +88,16 @@ namespace eCAL **/ CPublisher& CPublisher::operator=(CPublisher&& rhs) noexcept { - m_datawriter = rhs.m_datawriter; + // Call destroy, to clean up the current state, then afterwards move all elements + Destroy(); + m_datawriter = std::move(rhs.m_datawriter); m_qos = rhs.m_qos; + m_tlayer = rhs.m_tlayer, m_id = rhs.m_id; m_created = rhs.m_created; m_initialized = rhs.m_initialized; - InitializeQOS(); - InitializeTLayer(); rhs.m_created = false; rhs.m_initialized = false; diff --git a/ecal/core/src/pubsub/ecal_subscriber.cpp b/ecal/core/src/pubsub/ecal_subscriber.cpp index f04edf78d4..6c1922c120 100644 --- a/ecal/core/src/pubsub/ecal_subscriber.cpp +++ b/ecal/core/src/pubsub/ecal_subscriber.cpp @@ -62,26 +62,25 @@ namespace eCAL } CSubscriber::CSubscriber(CSubscriber&& rhs) noexcept : - m_datareader(rhs.m_datareader), + m_datareader(std::move(rhs.m_datareader)), m_qos(rhs.m_qos), m_created(rhs.m_created), m_initialized(rhs.m_initialized) { - InitializeQOS(); - rhs.m_created = false; rhs.m_initialized = false; } CSubscriber& CSubscriber::operator=(CSubscriber&& rhs) noexcept { - m_datareader = std::move(rhs.m_datareader); + // Call destroy, to clean up the current state, then afterwards move all elements + Destroy(); + m_datareader = std::move(rhs.m_datareader); m_qos = rhs.m_qos; m_created = rhs.m_created; m_initialized = rhs.m_initialized; - InitializeQOS(); rhs.m_created = false; rhs.m_initialized = false; diff --git a/ecal/core/src/readwrite/ecal_reader.cpp b/ecal/core/src/readwrite/ecal_reader.cpp index 3e5db224e0..f915d5b9a1 100644 --- a/ecal/core/src/readwrite/ecal_reader.cpp +++ b/ecal/core/src/readwrite/ecal_reader.cpp @@ -726,8 +726,9 @@ namespace eCAL // fire sub_event_connected { + const std::lock_guard lock(m_event_callback_map_sync); auto iter = m_event_callback_map.find(sub_event_connected); - if (iter != m_event_callback_map.end()) + if (iter != m_event_callback_map.end() && iter->second) { data.type = sub_event_connected; data.tid = tid_; @@ -741,15 +742,18 @@ namespace eCAL } // fire sub_event_update_connection - auto iter = m_event_callback_map.find(sub_event_update_connection); - if (iter != m_event_callback_map.end()) { - data.type = sub_event_update_connection; - data.tid = tid_; - data.ttype = Util::CombinedTopicEncodingAndType(data_type_info_.encoding, data_type_info_.name); - data.tdesc = data_type_info_.descriptor; - data.tdatatype = data_type_info_; - (iter->second)(m_topic_name.c_str(), &data); + const std::lock_guard lock(m_event_callback_map_sync); + auto iter = m_event_callback_map.find(sub_event_update_connection); + if (iter != m_event_callback_map.end() && iter->second) + { + data.type = sub_event_update_connection; + data.tid = tid_; + data.ttype = Util::CombinedTopicEncodingAndType(data_type_info_.encoding, data_type_info_.name); + data.tdesc = data_type_info_.descriptor; + data.tdatatype = data_type_info_; + (iter->second)(m_topic_name.c_str(), &data); + } } } @@ -760,14 +764,17 @@ namespace eCAL m_connected = false; // fire sub_event_disconnected - auto iter = m_event_callback_map.find(sub_event_disconnected); - if (iter != m_event_callback_map.end()) { - SSubEventCallbackData data; - data.type = sub_event_disconnected; - data.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); - data.clock = 0; - (iter->second)(m_topic_name.c_str(), &data); + const std::lock_guard lock(m_event_callback_map_sync); + auto iter = m_event_callback_map.find(sub_event_disconnected); + if (iter != m_event_callback_map.end() && iter->second) + { + SSubEventCallbackData data; + data.type = sub_event_disconnected; + data.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + data.clock = 0; + (iter->second)(m_topic_name.c_str(), &data); + } } } } @@ -827,14 +834,17 @@ namespace eCAL Logging::Log(log_level_warning, msg); #endif // we fire the message drop event - auto citer = m_event_callback_map.find(sub_event_dropped); - if (citer != m_event_callback_map.end()) { - SSubEventCallbackData data; - data.type = sub_event_dropped; - data.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); - data.clock = current_clock_; - (citer->second)(m_topic_name.c_str(), &data); + const std::lock_guard lock(m_event_callback_map_sync); + auto citer = m_event_callback_map.find(sub_event_dropped); + if (citer != m_event_callback_map.end()) + { + SSubEventCallbackData data; + data.type = sub_event_dropped; + data.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + data.clock = current_clock_; + (citer->second)(m_topic_name.c_str(), &data); + } } // increase the drop counter m_message_drops += clock_difference; @@ -948,7 +958,7 @@ namespace eCAL { const std::lock_guard lock(m_event_callback_map_sync); auto iter = m_event_callback_map.find(sub_event_timeout); - if(iter != m_event_callback_map.end()) + if(iter != m_event_callback_map.end() && iter->second) { SSubEventCallbackData data; data.type = sub_event_timeout; diff --git a/ecal/core/src/readwrite/ecal_writer.cpp b/ecal/core/src/readwrite/ecal_writer.cpp index aa289ffcdb..5608b5cb32 100644 --- a/ecal/core/src/readwrite/ecal_writer.cpp +++ b/ecal/core/src/readwrite/ecal_writer.cpp @@ -1004,25 +1004,31 @@ namespace eCAL m_connected = true; // fire pub_event_connected - auto iter = m_event_callback_map.find(pub_event_connected); - if (iter != m_event_callback_map.end()) { - data.type = pub_event_connected; - (iter->second)(m_topic_name.c_str(), &data); + const std::lock_guard lock(m_event_callback_map_sync); + auto iter = m_event_callback_map.find(pub_event_connected); + if (iter != m_event_callback_map.end() && iter->second) + { + data.type = pub_event_connected; + (iter->second)(m_topic_name.c_str(), &data); + } } } // fire pub_event_update_connection - auto iter = m_event_callback_map.find(pub_event_update_connection); - if (iter != m_event_callback_map.end()) { - data.type = pub_event_update_connection; - data.tid = tid_; - // Remove with eCAL6 (next two lines) - data.ttype = Util::CombinedTopicEncodingAndType(tinfo_.encoding, tinfo_.name); - data.tdesc = tinfo_.descriptor; - data.tdatatype = tinfo_; - (iter->second)(m_topic_name.c_str(), &data); + const std::lock_guard lock(m_event_callback_map_sync); + auto iter = m_event_callback_map.find(pub_event_update_connection); + if (iter != m_event_callback_map.end() && iter->second) + { + data.type = pub_event_update_connection; + data.tid = tid_; + // Remove with eCAL6 (next two lines) + data.ttype = Util::CombinedTopicEncodingAndType(tinfo_.encoding, tinfo_.name); + data.tdesc = tinfo_.descriptor; + data.tdatatype = tinfo_; + (iter->second)(m_topic_name.c_str(), &data); + } } } @@ -1033,14 +1039,17 @@ namespace eCAL m_connected = false; // fire pub_event_disconnected - auto iter = m_event_callback_map.find(pub_event_disconnected); - if (iter != m_event_callback_map.end()) { - SPubEventCallbackData data; - data.type = pub_event_disconnected; - data.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); - data.clock = 0; - (iter->second)(m_topic_name.c_str(), &data); + const std::lock_guard lock(m_event_callback_map_sync); + auto iter = m_event_callback_map.find(pub_event_disconnected); + if (iter != m_event_callback_map.end() && iter->second) + { + SPubEventCallbackData data; + data.type = pub_event_disconnected; + data.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + data.clock = 0; + (iter->second)(m_topic_name.c_str(), &data); + } } } } diff --git a/ecal/core/src/readwrite/ecal_writer_shm.cpp b/ecal/core/src/readwrite/ecal_writer_shm.cpp index fd3efdac5f..e22781d323 100644 --- a/ecal/core/src/readwrite/ecal_writer_shm.cpp +++ b/ecal/core/src/readwrite/ecal_writer_shm.cpp @@ -88,7 +88,10 @@ namespace eCAL if (!m_created) return false; m_created = false; - m_memory_file_vec.clear(); + { + const std::lock_guard lock(m_memory_file_vec_mtx); + m_memory_file_vec.clear(); + } return true; } @@ -101,6 +104,8 @@ namespace eCAL bool CDataWriterSHM::SetBufferCount(size_t buffer_count_) { + const std::lock_guard lock(m_memory_file_vec_mtx); + // no need to adapt anything if (m_memory_file_vec.size() == buffer_count_) return true; @@ -160,11 +165,15 @@ namespace eCAL ret_state |= true; } - // adapt write index if needed - m_write_idx %= m_memory_file_vec.size(); - - // check size and reserve new if needed - ret_state |= m_memory_file_vec[m_write_idx]->CheckSize(attr_.len); + { + const std::lock_guard lock(m_memory_file_vec_mtx); + + // adapt write index if needed + m_write_idx %= m_memory_file_vec.size(); + + // check size and reserve new if needed + ret_state |= m_memory_file_vec[m_write_idx]->CheckSize(attr_.len); + } return ret_state; } @@ -173,6 +182,9 @@ namespace eCAL { if (!m_created) return false; + // protect m_memory_file_vec + const std::lock_guard lock(m_memory_file_vec_mtx); + // write content const bool force_full_write(m_memory_file_vec.size() > 1); const bool sent = m_memory_file_vec[m_write_idx]->Write(payload_, attr_, force_full_write); @@ -188,6 +200,9 @@ namespace eCAL { if (!m_created) return; + // protect m_memory_file_vec + const std::lock_guard lock(m_memory_file_vec_mtx); + for (auto& memory_file : m_memory_file_vec) { memory_file->Connect(process_id_); @@ -201,6 +216,10 @@ namespace eCAL { // starting from eCAL version > 5.8.13/5.9.0 the ConnectionParameter is defined as google protobuf eCAL::pb::ConnnectionPar connection_par; + + // protect m_memory_file_vec + const std::lock_guard lock(m_memory_file_vec_mtx); + for (auto& memory_file : m_memory_file_vec) { connection_par.mutable_layer_par_shm()->add_memory_file_list(memory_file->GetName()); diff --git a/ecal/core/src/readwrite/ecal_writer_shm.h b/ecal/core/src/readwrite/ecal_writer_shm.h index daac497abf..b3606bc4d6 100644 --- a/ecal/core/src/readwrite/ecal_writer_shm.h +++ b/ecal/core/src/readwrite/ecal_writer_shm.h @@ -27,6 +27,7 @@ #include "io/ecal_memfile_sync.h" #include +#include #include namespace eCAL @@ -59,7 +60,10 @@ namespace eCAL size_t m_write_idx = 0; size_t m_buffer_count = 1; SSyncMemoryFileAttr m_memory_file_attr = {}; + + std::mutex m_memory_file_vec_mtx; std::vector> m_memory_file_vec; + static const std::string m_memfile_base_name; }; } diff --git a/testing/ecal/core_test/src/core_test.cpp b/testing/ecal/core_test/src/core_test.cpp index 32f22ebc85..73f4af2a1b 100644 --- a/testing/ecal/core_test/src/core_test.cpp +++ b/testing/ecal/core_test/src/core_test.cpp @@ -5,9 +5,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -96,7 +96,7 @@ TEST(Core, LeakedPubSub) std::this_thread::sleep_for(std::chrono::milliseconds(100)); #endif } - }); + }); // let them work together std::this_thread::sleep_for(std::chrono::seconds(2)); @@ -111,66 +111,67 @@ TEST(Core, LeakedPubSub) TEST(Core, CallbackDestruction) { - // initialize eCAL API - EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "callback destruction")); + for (int i = 0; i < 10; ++i) + { + // initialize eCAL API + EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "callback destruction")); - // enable loop back communication in the same thread - eCAL::Util::EnableLoopback(true); + // enable loop back communication in the same thread + eCAL::Util::EnableLoopback(true); - // create subscriber and register a callback - std::shared_ptr< eCAL::string::CSubscriber> sub; + // create subscriber and register a callback + std::shared_ptr> sub; - // create publisher - eCAL::string::CPublisher pub("foo"); + // create publisher + eCAL::string::CPublisher pub("foo"); - // start publishing thread - std::atomic pub_stop(false); - std::thread pub_t([&]() { - while (!pub_stop) - { - pub.Send("Hello World"); + // start publishing thread + std::atomic pub_stop(false); + std::thread pub_t([&]() { + while (!pub_stop) { + pub.Send("Hello World"); #if 0 - // some kind of busy waiting.... - int y = 0; - for (int i = 0; i < 100000; i++) - { - y += i; - } + // some kind of busy waiting.... + int y = 0; + for (int i = 0; i < 100000; i++) + { + y += i; + } #else - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); #endif - } - }); - - std::atomic sub_stop(false); - std::thread sub_t([&]() { - while (!sub_stop) - { - sub = std::make_shared>("foo"); - sub->AddReceiveCallback(std::bind(OnReceive, std::placeholders::_4)); - std::this_thread::sleep_for(std::chrono::seconds(2)); - } - }); + } + }); + + std::atomic sub_stop(false); + std::thread sub_t([&]() { + while (!sub_stop) { + sub = std::make_shared>("foo"); + sub->AddReceiveCallback(std::bind(OnReceive, std::placeholders::_4)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + }); - // let them work together - std::this_thread::sleep_for(std::chrono::seconds(10)); + // let them work together + std::this_thread::sleep_for(std::chrono::seconds(10)); - // stop publishing thread - pub_stop = true; - pub_t.join(); + // stop publishing thread + pub_stop = true; + pub_t.join(); - sub_stop = true; - sub_t.join(); + sub_stop = true; + sub_t.join(); - // finalize eCAL API - // without destroying any pub / sub - EXPECT_EQ(0, eCAL::Finalize()); + // finalize eCAL API + // without destroying any pub / sub + EXPECT_EQ(0, eCAL::Finalize()); + } } /* excluded for now, system timer jitter too high */ #if 0 TEST(Core, TimerCallback) -{ +{ // initialize eCAL API EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "timer callback"));