From fe4492e160941dd3679164aae0f7e4bcb55a284f Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Fri, 26 Jul 2024 17:54:38 +0200 Subject: [PATCH 1/5] registration provider single threaded sending Register(false) calls removed finally --- ecal/core/src/readwrite/ecal_reader.cpp | 20 +--- ecal/core/src/readwrite/ecal_reader.h | 2 +- ecal/core/src/readwrite/ecal_writer.cpp | 45 ++++----- ecal/core/src/readwrite/ecal_writer.h | 2 +- .../ecal_registration_provider.cpp | 99 ++++++++++--------- .../registration/ecal_registration_provider.h | 15 +-- .../ecal_registration_sample_applier.cpp | 32 +++++- .../ecal_registration_sample_applier.h | 1 + .../src/service/ecal_service_client_impl.cpp | 8 +- .../src/service/ecal_service_client_impl.h | 2 +- .../src/service/ecal_service_server_impl.cpp | 15 +-- .../src/service/ecal_service_server_impl.h | 2 +- .../cpp/util_test/src/util_getclients.cpp | 31 +++++- .../cpp/util_test/src/util_getservices.cpp | 31 +++++- .../cpp/util_test/src/util_gettopics.cpp | 11 ++- 15 files changed, 183 insertions(+), 133 deletions(-) diff --git a/ecal/core/src/readwrite/ecal_reader.cpp b/ecal/core/src/readwrite/ecal_reader.cpp index a2a450206d..ff0ae7fb8d 100644 --- a/ecal/core/src/readwrite/ecal_reader.cpp +++ b/ecal/core/src/readwrite/ecal_reader.cpp @@ -98,9 +98,6 @@ namespace eCAL // mark as created m_created = true; - - // register - Register(false); } CDataReader::~CDataReader() @@ -256,9 +253,6 @@ namespace eCAL bool CDataReader::SetAttribute(const std::string& attr_name_, const std::string& attr_value_) { - auto current_val = m_attr.find(attr_name_); - - const bool force = current_val == m_attr.end() || current_val->second != attr_value_; m_attr[attr_name_] = attr_value_; #ifndef NDEBUG @@ -266,16 +260,11 @@ namespace eCAL Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::SetAttribute"); #endif - // register it - Register(force); - return(true); } bool CDataReader::ClearAttribute(const std::string& attr_name_) { - auto force = m_attr.find(attr_name_) != m_attr.end(); - m_attr.erase(attr_name_); #ifndef NDEBUG @@ -283,9 +272,6 @@ namespace eCAL Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::ClearAttribute"); #endif - // register it - Register(force); - return(true); } @@ -533,10 +519,10 @@ namespace eCAL return(out.str()); } - void CDataReader::Register(bool force_) + void CDataReader::Register() { #if ECAL_CORE_REGISTRATION - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetRegistrationSample(), force_); + if (g_registration_provider() != nullptr) g_registration_provider()->RegisterSample(GetRegistrationSample()); #ifndef NDEBUG // log it @@ -548,7 +534,7 @@ namespace eCAL void CDataReader::Unregister() { #if ECAL_CORE_REGISTRATION - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetUnregistrationSample(), false); + if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterSample(GetUnregistrationSample()); #ifndef NDEBUG // log it diff --git a/ecal/core/src/readwrite/ecal_reader.h b/ecal/core/src/readwrite/ecal_reader.h index 5716610dd6..eebff4d34c 100644 --- a/ecal/core/src/readwrite/ecal_reader.h +++ b/ecal/core/src/readwrite/ecal_reader.h @@ -125,7 +125,7 @@ namespace eCAL std::string Dump(const std::string& indent_ = ""); protected: - void Register(bool force_); + void Register(); void Unregister(); void CheckConnections(); diff --git a/ecal/core/src/readwrite/ecal_writer.cpp b/ecal/core/src/readwrite/ecal_writer.cpp index 5f00b7f33f..6f4164c1b4 100644 --- a/ecal/core/src/readwrite/ecal_writer.cpp +++ b/ecal/core/src/readwrite/ecal_writer.cpp @@ -120,9 +120,6 @@ namespace eCAL // mark as created m_created = true; - - // register - Register(false); } CDataWriter::~CDataWriter() @@ -227,7 +224,7 @@ namespace eCAL if (m_writer_shm->PrepareWrite(wattr)) { // register new to update listening subscribers and rematch - Register(true); + Register(); Process::SleepMS(5); } @@ -291,7 +288,7 @@ namespace eCAL if (m_writer_udp->PrepareWrite(wattr)) { // register new to update listening subscribers and rematch - Register(true); + Register(); Process::SleepMS(5); } @@ -364,8 +361,6 @@ namespace eCAL bool CDataWriter::SetDataTypeInformation(const SDataTypeInformation& topic_info_) { - // Does it even make sense to register if the info is the same??? - const bool force = m_topic_info != topic_info_; m_topic_info = topic_info_; #ifndef NDEBUG @@ -373,17 +368,11 @@ namespace eCAL Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::SetDescription"); #endif - // register it - Register(force); - return(true); } bool CDataWriter::SetAttribute(const std::string& attr_name_, const std::string& attr_value_) { - auto current_val = m_attr.find(attr_name_); - - const bool force = current_val == m_attr.end() || current_val->second != attr_value_; m_attr[attr_name_] = attr_value_; #ifndef NDEBUG @@ -391,16 +380,11 @@ namespace eCAL Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::SetAttribute"); #endif - // register it - Register(force); - return(true); } bool CDataWriter::ClearAttribute(const std::string& attr_name_) { - auto force = m_attr.find(attr_name_) != m_attr.end(); - m_attr.erase(attr_name_); #ifndef NDEBUG @@ -408,9 +392,6 @@ namespace eCAL Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::ClearAttribute"); #endif - // register it - Register(force); - return(true); } @@ -484,10 +465,10 @@ namespace eCAL StartUdpLayer(); break; case tl_ecal_shm: - if (StartShmLayer()) Register(true); + StartShmLayer(); break; case tl_ecal_tcp: - if (StartTcpLayer()) Register(true); + StartTcpLayer(); break; default: break; @@ -585,10 +566,10 @@ namespace eCAL return(out.str()); } - void CDataWriter::Register(bool force_) + void CDataWriter::Register() { #if ECAL_CORE_REGISTRATION - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetRegistrationSample(), force_); + if (g_registration_provider() != nullptr) g_registration_provider()->RegisterSample(GetRegistrationSample()); #ifndef NDEBUG // log it @@ -600,7 +581,7 @@ namespace eCAL void CDataWriter::Unregister() { #if ECAL_CORE_REGISTRATION - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetUnregistrationSample(), false); + if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterSample(GetUnregistrationSample()); #ifndef NDEBUG // log it @@ -625,8 +606,7 @@ namespace eCAL // check connection timeouts CheckConnections(); - if (m_created) return GetRegistrationSample(); - else return GetUnregistrationSample(); + return GetRegistrationSample(); } Registration::Sample CDataWriter::GetRegistrationSample() @@ -816,6 +796,9 @@ namespace eCAL // create writer m_writer_udp = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.layer.udp); + // register activated layer + Register(); + #ifndef NDEBUG Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::ActivateUdpLayer::WRITER_CREATED"); #endif @@ -839,6 +822,9 @@ namespace eCAL // create writer m_writer_shm = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.layer.shm); + // register activated layer + Register(); + #ifndef NDEBUG Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::ActivateShmLayer::WRITER_CREATED"); #endif @@ -862,6 +848,9 @@ namespace eCAL // create writer m_writer_tcp = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.layer.tcp); + // register activated layer + Register(); + #ifndef NDEBUG Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::ActivateTcpLayer::WRITER_CREATED"); #endif diff --git a/ecal/core/src/readwrite/ecal_writer.h b/ecal/core/src/readwrite/ecal_writer.h index c1108de17e..1ea35da0d0 100644 --- a/ecal/core/src/readwrite/ecal_writer.h +++ b/ecal/core/src/readwrite/ecal_writer.h @@ -128,7 +128,7 @@ namespace eCAL std::string Dump(const std::string& indent_ = ""); protected: - void Register(bool force_); + void Register(); void Unregister(); void CheckConnections(); diff --git a/ecal/core/src/registration/ecal_registration_provider.cpp b/ecal/core/src/registration/ecal_registration_provider.cpp index 49f75c6041..f920848e5c 100644 --- a/ecal/core/src/registration/ecal_registration_provider.cpp +++ b/ecal/core/src/registration/ecal_registration_provider.cpp @@ -22,7 +22,7 @@ * * All process internal publisher/subscriber, server/clients register here with all their attributes. * - * These information will be send cyclic (registration refresh) via UDP to external eCAL processes. + * These information will be send cyclic (registration refresh) via UDP or SHM to external eCAL processes. * **/ #include "ecal_registration_provider.h" @@ -84,7 +84,7 @@ namespace eCAL // start cyclic registration thread m_reg_sample_snd_thread = std::make_shared(std::bind(&CRegistrationProvider::RegisterSendThread, this)); - m_reg_sample_snd_thread->start(std::chrono::milliseconds(Config::GetRegistrationRefreshMs())); + m_reg_sample_snd_thread->start(std::chrono::milliseconds(0)); m_created = true; } @@ -97,37 +97,27 @@ namespace eCAL m_reg_sample_snd_thread->stop(); // send process unregistration sample - SendSample(Registration::GetProcessUnregisterSample()); + //m_reg_sender->SendSample(Registration::GetProcessUnregisterSample()); + // delete registration sender m_reg_sender.reset(); m_created = false; } - bool CRegistrationProvider::ApplySample(const Registration::Sample& sample_, const bool force_) + // register single sample (currently we do not differ between registration/unregistration) + bool CRegistrationProvider::RegisterSample(const Registration::Sample& sample_) { if (!m_created) return(false); + ProcessSingleSample(sample_); + return(true); + } - // forward all registration samples to outside "customer" (e.g. monitoring, descgate) - { - const std::lock_guard lock(m_callback_custom_apply_sample_map_mtx); - for (const auto& iter : m_callback_custom_apply_sample_map) - { - iter.second(sample_); - } - } - - if (force_) - { - // send sample - SendSample(sample_); - } - else - { - // add sample to sample list and send it later - AddSample2SampleList(sample_); - } - + // unregister single sample (currently we do not differ between registration/unregistration) + bool CRegistrationProvider::UnregisterSample(const Registration::Sample& sample_) + { + if (!m_created) return(false); + ProcessSingleSample(sample_); return(true); } @@ -147,55 +137,70 @@ namespace eCAL } } - void CRegistrationProvider::AddSample2SampleList(const Registration::Sample& sample_) + void CRegistrationProvider::ForwardSample(const Registration::Sample& sample_) + { + const std::lock_guard lock(m_callback_custom_apply_sample_map_mtx); + for (const auto& iter : m_callback_custom_apply_sample_map) + { + iter.second(sample_); + } + } + + void CRegistrationProvider::ProcessSingleSample(const Registration::Sample& sample_) { - const std::lock_guard lock(m_sample_list_mtx); - m_sample_list.samples.push_back(sample_); + // forward registration sample to outside "customer" (currently monitoring and descgate) + ForwardSample(sample_); + + // force rgistration thread to send + TriggerRegisterSendThread(); } - void CRegistrationProvider::SendSample(const Registration::Sample& sample_) + void CRegistrationProvider::TriggerRegisterSendThread() { - Registration::SampleList sample_list; - sample_list.samples.push_back(sample_); - m_reg_sender->SendSampleList(sample_list); + { + std::lock_guard lock(m_reg_sample_snd_thread_cv_mtx); + m_reg_sample_snd_thread_trigger = true; + } + m_reg_sample_snd_thread_cv.notify_one(); } void CRegistrationProvider::RegisterSendThread() { - // collect all registrations and send them out - // the internal list already contain elements here: - // one process registration sample - // one or more registration/unregistration samples added by AddSample2SampleList + // collect all registrations and send them out cyclic { - // lock sample list - std::lock_guard lock(m_sample_list_mtx); + // create sample list + Registration::SampleList sample_list; + + // and add process registration sample + sample_list.samples.push_back(Registration::GetProcessRegisterSample()); #if ECAL_CORE_SUBSCRIBER // add subscriber registrations - if (g_subgate() != nullptr) g_subgate()->GetRegistrations(m_sample_list); + if (g_subgate() != nullptr) g_subgate()->GetRegistrations(sample_list); #endif #if ECAL_CORE_PUBLISHER // add publisher registrations - if (g_pubgate() != nullptr) g_pubgate()->GetRegistrations(m_sample_list); + if (g_pubgate() != nullptr) g_pubgate()->GetRegistrations(sample_list); #endif #if ECAL_CORE_SERVICE // add server registrations - if (g_servicegate() != nullptr) g_servicegate()->GetRegistrations(m_sample_list); + if (g_servicegate() != nullptr) g_servicegate()->GetRegistrations(sample_list); // add client registrations - if (g_clientgate() != nullptr) g_clientgate()->GetRegistrations(m_sample_list); + if (g_clientgate() != nullptr) g_clientgate()->GetRegistrations(sample_list); #endif // send registration sample list - m_reg_sender->SendSampleList(m_sample_list); + m_reg_sender->SendSampleList(sample_list); - // clear it - m_sample_list.samples.clear(); - - // and add process registration sample to internal sample list as first sample (for next registration loop) - m_sample_list.samples.push_back(Registration::GetProcessRegisterSample()); + // wait for trigger or registration refresh timeout + { + std::unique_lock lock(m_reg_sample_snd_thread_cv_mtx); + m_reg_sample_snd_thread_cv.wait_for(lock, std::chrono::milliseconds(Config::GetRegistrationRefreshMs()), [this] { return m_reg_sample_snd_thread_trigger; }); + m_reg_sample_snd_thread_trigger = false; + } } } } diff --git a/ecal/core/src/registration/ecal_registration_provider.h b/ecal/core/src/registration/ecal_registration_provider.h index 587be88dec..85bdccd708 100644 --- a/ecal/core/src/registration/ecal_registration_provider.h +++ b/ecal/core/src/registration/ecal_registration_provider.h @@ -31,6 +31,7 @@ #include "io/udp/ecal_udp_sample_sender.h" #include +#include #include #include #include @@ -50,26 +51,28 @@ namespace eCAL void Start(); void Stop(); - bool ApplySample(const Registration::Sample& sample_, bool force_); + bool RegisterSample(const Registration::Sample& sample_); + bool UnregisterSample(const Registration::Sample& sample_); using ApplySampleCallbackT = std::function; void SetCustomApplySampleCallback(const std::string& customer_, const ApplySampleCallbackT& callback_); void RemCustomApplySampleCallback(const std::string& customer_); protected: - void AddSample2SampleList(const Registration::Sample& sample_); - void SendSample(const Registration::Sample& sample_); + void ForwardSample(const Registration::Sample& sample_); + void ProcessSingleSample(const Registration::Sample& sample_); + void TriggerRegisterSendThread(); void RegisterSendThread(); static std::atomic m_created; std::unique_ptr m_reg_sender; - std::shared_ptr m_reg_sample_snd_thread; - std::mutex m_sample_list_mtx; - Registration::SampleList m_sample_list; + std::condition_variable m_reg_sample_snd_thread_cv; + std::mutex m_reg_sample_snd_thread_cv_mtx; + bool m_reg_sample_snd_thread_trigger; bool m_use_registration_udp; bool m_use_registration_shm; diff --git a/ecal/core/src/registration/ecal_registration_sample_applier.cpp b/ecal/core/src/registration/ecal_registration_sample_applier.cpp index 41b7a4aa17..4f3b65282f 100644 --- a/ecal/core/src/registration/ecal_registration_sample_applier.cpp +++ b/ecal/core/src/registration/ecal_registration_sample_applier.cpp @@ -96,6 +96,36 @@ namespace eCAL return true; } + bool CSampleApplier::IsSameProcess(const Registration::Sample& sample_) + { + int32_t pid(0); + switch (sample_.cmd_type) + { + case bct_reg_process: + case bct_unreg_process: + pid = sample_.process.pid; + break; + case bct_reg_publisher: + case bct_unreg_publisher: + case bct_reg_subscriber: + case bct_unreg_subscriber: + pid = sample_.topic.pid; + break; + case bct_reg_service: + case bct_unreg_service: + pid = sample_.service.pid; + break; + case bct_reg_client: + case bct_unreg_client: + pid = sample_.client.pid; + break; + default: + break; + } + + return pid == m_pid; + } + bool CSampleApplier::AcceptRegistrationSample(const Registration::Sample& sample_) { // check if the sample is from the same host group @@ -103,7 +133,7 @@ namespace eCAL { // register if the sample is from another process // or if loopback mode is enabled - return m_loopback || (sample_.topic.pid != m_pid); + return !IsSameProcess(sample_) || m_loopback; } else { diff --git a/ecal/core/src/registration/ecal_registration_sample_applier.h b/ecal/core/src/registration/ecal_registration_sample_applier.h index 354f9c982c..401fdf4820 100644 --- a/ecal/core/src/registration/ecal_registration_sample_applier.h +++ b/ecal/core/src/registration/ecal_registration_sample_applier.h @@ -54,6 +54,7 @@ namespace eCAL void RemCustomApplySampleCallback(const std::string& customer_); private: + bool IsSameProcess(const Registration::Sample& sample_); bool IsHostGroupMember(const eCAL::Registration::Sample& sample_); bool AcceptRegistrationSample(const Registration::Sample& sample_); diff --git a/ecal/core/src/service/ecal_service_client_impl.cpp b/ecal/core/src/service/ecal_service_client_impl.cpp index 96f48f280d..abce7327eb 100644 --- a/ecal/core/src/service/ecal_service_client_impl.cpp +++ b/ecal/core/src/service/ecal_service_client_impl.cpp @@ -89,7 +89,7 @@ namespace eCAL m_created = true; // register this client - Register(true); + Register(); return(true); } @@ -687,13 +687,13 @@ namespace eCAL return ecal_reg_sample; } - void CServiceClientImpl::Register(const bool force_) + void CServiceClientImpl::Register() { if (!m_created) return; if (m_service_name.empty()) return; // register entity - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetRegistrationSample(), force_); + if (g_registration_provider() != nullptr) g_registration_provider()->RegisterSample(GetRegistrationSample()); } void CServiceClientImpl::Unregister() @@ -701,7 +701,7 @@ namespace eCAL if (m_service_name.empty()) return; // unregister entity - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetUnregistrationSample(), false); + if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterSample(GetUnregistrationSample()); } void CServiceClientImpl::CheckForNewServices() diff --git a/ecal/core/src/service/ecal_service_client_impl.h b/ecal/core/src/service/ecal_service_client_impl.h index 1c75b75ab3..cb9059d124 100644 --- a/ecal/core/src/service/ecal_service_client_impl.h +++ b/ecal/core/src/service/ecal_service_client_impl.h @@ -99,7 +99,7 @@ namespace eCAL Registration::Sample GetRegistrationSample(); Registration::Sample GetUnregistrationSample(); - void Register(bool force_); + void Register(); void Unregister(); void CheckForNewServices(); diff --git a/ecal/core/src/service/ecal_service_server_impl.cpp b/ecal/core/src/service/ecal_service_server_impl.cpp index 7aab9c31f4..10d2ff6450 100644 --- a/ecal/core/src/service/ecal_service_server_impl.cpp +++ b/ecal/core/src/service/ecal_service_server_impl.cpp @@ -128,9 +128,6 @@ namespace eCAL // mark as created m_created = true; - // register this service - Register(false); - return(true); } @@ -200,9 +197,6 @@ namespace eCAL } } - // register this service - Register(false); - return true; } @@ -238,9 +232,6 @@ namespace eCAL } } - // register this service - Register(false); - return true; } @@ -377,10 +368,10 @@ namespace eCAL return ecal_reg_sample; } - void CServiceServerImpl::Register(bool force_) + void CServiceServerImpl::Register() { #if ECAL_CORE_REGISTRATION - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetRegistrationSample(), force_); + if (g_registration_provider() != nullptr) g_registration_provider()->RegisterSample(GetRegistrationSample()); #ifndef NDEBUG // log it @@ -392,7 +383,7 @@ namespace eCAL void CServiceServerImpl::Unregister() { #if ECAL_CORE_REGISTRATION - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetUnregistrationSample(), false); + if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterSample(GetUnregistrationSample()); #ifndef NDEBUG // log it diff --git a/ecal/core/src/service/ecal_service_server_impl.h b/ecal/core/src/service/ecal_service_server_impl.h index 22eef8a9ec..ccf741bef2 100644 --- a/ecal/core/src/service/ecal_service_server_impl.h +++ b/ecal/core/src/service/ecal_service_server_impl.h @@ -85,7 +85,7 @@ namespace eCAL std::string GetServiceName() { return m_service_name; }; protected: - void Register(bool force_); + void Register(); void Unregister(); Registration::Sample GetRegistrationSample(); diff --git a/ecal/tests/cpp/util_test/src/util_getclients.cpp b/ecal/tests/cpp/util_test/src/util_getclients.cpp index bd70c9dbe0..8a938cf16e 100644 --- a/ecal/tests/cpp/util_test/src/util_getclients.cpp +++ b/ecal/tests/cpp/util_test/src/util_getclients.cpp @@ -22,8 +22,8 @@ #include enum { - CMN_MONITORING_TIMEOUT_MS = (5000), - CMN_REGISTRATION_REFRESH_MS = (1000 * 2) + CMN_MONITORING_TIMEOUT_MS = (5000 + 100), + CMN_REGISTRATION_REFRESH_MS = (1000) }; TEST(core_cpp_util, ClientExpiration) @@ -31,6 +31,9 @@ TEST(core_cpp_util, ClientExpiration) // initialize eCAL API eCAL::Initialize(0, nullptr, "core_cpp_util"); + // enable loop back communication in the same process + eCAL::Util::EnableLoopback(true); + std::map client_info_map; // create simple client and let it expire @@ -43,6 +46,9 @@ TEST(core_cpp_util, ClientExpiration) service_method_info.response_type.descriptor = "foo::resp_desc"; const eCAL::CServiceClient client("foo::service", { {"foo::method", service_method_info} }); + // let's register + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); + // get all clients eCAL::Util::GetClients(client_info_map); @@ -70,7 +76,7 @@ TEST(core_cpp_util, ClientExpiration) } // let's unregister - eCAL::Process::SleepMS(CMN_REGISTRATION_REFRESH_MS); + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); // get all clients again, all clients // should be removed from the map @@ -88,6 +94,9 @@ TEST(core_cpp_util, ClientEqualQualities) // initialize eCAL API eCAL::Initialize(0, nullptr, "core_cpp_util"); + // enable loop back communication in the same process + eCAL::Util::EnableLoopback(true); + std::map client_info_map; // create 2 clients with the same quality of data type information @@ -100,6 +109,9 @@ TEST(core_cpp_util, ClientEqualQualities) service_method_info1.response_type.descriptor = "foo::resp_desc1"; eCAL::CServiceClient client1("foo::service", { {"foo::method", service_method_info1} }); + // let's register + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); + // get all clients eCAL::Util::GetClients(client_info_map); @@ -159,7 +171,7 @@ TEST(core_cpp_util, ClientEqualQualities) } // let's unregister - eCAL::Process::SleepMS(CMN_REGISTRATION_REFRESH_MS); + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); // get all clients again, all clients // should be removed from the map @@ -177,6 +189,9 @@ TEST(core_cpp_util, ClientDifferentQualities) // initialize eCAL API eCAL::Initialize(0, nullptr, "core_cpp_util"); + // enable loop back communication in the same process + eCAL::Util::EnableLoopback(true); + std::map client_info_map; // create 2 clients with different qualities of data type information @@ -189,6 +204,9 @@ TEST(core_cpp_util, ClientDifferentQualities) service_method_info1.response_type.descriptor = ""; eCAL::CServiceClient client1("foo::service", { {"foo::method", service_method_info1} }); + // let's register + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); + // get all clients eCAL::Util::GetClients(client_info_map); @@ -214,6 +232,9 @@ TEST(core_cpp_util, ClientDifferentQualities) service_method_info2.response_type.descriptor = "foo::resp_desc2"; eCAL::CServiceClient client2("foo::service", { {"foo::method", service_method_info2} }); + // let's register + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); + // check attributes, we expect attributes from client 2 here eCAL::Util::GetClientTypeNames("foo::service", "foo::method", req_type, resp_type); EXPECT_EQ(req_type, "foo::req_type2"); @@ -227,7 +248,7 @@ TEST(core_cpp_util, ClientDifferentQualities) } // let's unregister - eCAL::Process::SleepMS(CMN_REGISTRATION_REFRESH_MS); + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); // get all clients again, all clients // should be removed from the map diff --git a/ecal/tests/cpp/util_test/src/util_getservices.cpp b/ecal/tests/cpp/util_test/src/util_getservices.cpp index fbf2239d43..7ad633d5c1 100644 --- a/ecal/tests/cpp/util_test/src/util_getservices.cpp +++ b/ecal/tests/cpp/util_test/src/util_getservices.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +23,7 @@ enum { CMN_MONITORING_TIMEOUT_MS = (5000 + 100), - CMN_REGISTRATION_REFRESH_MS = (1000 + 100) + CMN_REGISTRATION_REFRESH_MS = (1000) }; TEST(core_cpp_util, ServiceExpiration) @@ -31,6 +31,9 @@ TEST(core_cpp_util, ServiceExpiration) // initialize eCAL API eCAL::Initialize(0, nullptr, "core_cpp_util"); + // enable loop back communication in the same process + eCAL::Util::EnableLoopback(true); + std::map service_info_map; // create simple service and let it expire @@ -39,6 +42,9 @@ TEST(core_cpp_util, ServiceExpiration) eCAL::CServiceServer service("foo::service"); service.AddDescription("foo::method", "foo::req_type", "foo::req_desc", "foo::resp_type", "foo::resp_desc"); + // let's register + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); + // get all services eCAL::Util::GetServices(service_info_map); @@ -66,7 +72,7 @@ TEST(core_cpp_util, ServiceExpiration) } // let's unregister - eCAL::Process::SleepMS(CMN_REGISTRATION_REFRESH_MS); + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); // get all services again, all services // should be removed from the map @@ -84,6 +90,9 @@ TEST(core_cpp_util, ServiceEqualQualities) // initialize eCAL API eCAL::Initialize(0, nullptr, "core_cpp_util"); + // enable loop back communication in the same process + eCAL::Util::EnableLoopback(true); + std::map service_info_map; // create 2 services with the same quality of data type information @@ -92,6 +101,9 @@ TEST(core_cpp_util, ServiceEqualQualities) eCAL::CServiceServer service1("foo::service"); service1.AddDescription("foo::method", "foo::req_type1", "foo::req_desc1", "foo::resp_type1", "foo::resp_desc1"); + // let's register + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); + // get all services eCAL::Util::GetServices(service_info_map); @@ -147,7 +159,7 @@ TEST(core_cpp_util, ServiceEqualQualities) } // let's unregister - eCAL::Process::SleepMS(CMN_REGISTRATION_REFRESH_MS); + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); // get all services again, all services // should be removed from the map @@ -165,6 +177,9 @@ TEST(core_cpp_util, ServiceDifferentQualities) // initialize eCAL API eCAL::Initialize(0, nullptr, "core_cpp_util"); + // enable loop back communication in the same process + eCAL::Util::EnableLoopback(true); + std::map service_info_map; // create 2 services with different qualities of data type information @@ -173,6 +188,9 @@ TEST(core_cpp_util, ServiceDifferentQualities) eCAL::CServiceServer service1("foo::service"); service1.AddDescription("foo::method", "foo::req_type1", "foo::req_desc1", "", ""); + // let's register + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); + // get all services eCAL::Util::GetServices(service_info_map); @@ -194,6 +212,9 @@ TEST(core_cpp_util, ServiceDifferentQualities) eCAL::CServiceServer service2("foo::service"); service2.AddDescription("foo::method", "foo::req_type2", "foo::req_desc2", "foo::resp_type2", "foo::resp_desc2"); + // let's register + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); + // check attributes, we expect attributes from service 2 here eCAL::Util::GetServiceTypeNames("foo::service", "foo::method", req_type, resp_type); EXPECT_EQ(req_type, "foo::req_type2"); @@ -207,7 +228,7 @@ TEST(core_cpp_util, ServiceDifferentQualities) } // let's unregister - eCAL::Process::SleepMS(CMN_REGISTRATION_REFRESH_MS); + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); // get all services again, all services // should be removed from the map diff --git a/ecal/tests/cpp/util_test/src/util_gettopics.cpp b/ecal/tests/cpp/util_test/src/util_gettopics.cpp index 495a9dcfea..19b27429da 100644 --- a/ecal/tests/cpp/util_test/src/util_gettopics.cpp +++ b/ecal/tests/cpp/util_test/src/util_gettopics.cpp @@ -27,8 +27,8 @@ #include enum { - CMN_MONITORING_TIMEOUT_MS = (5000), - CMN_REGISTRATION_REFRESH_MS = (1000 * 2) + CMN_MONITORING_TIMEOUT_MS = (5000 + 100), + CMN_REGISTRATION_REFRESH_MS = (1000) }; TEST(core_cpp_util, GetTopics) @@ -69,6 +69,9 @@ TEST(core_cpp_util, GetTopics) // this should trigger a warning but not increase map size eCAL::CSubscriber sub12("B1", info_B1_2); + // let's register + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); + // get all topics eCAL::Util::GetTopics(topic_info_map); @@ -116,7 +119,7 @@ TEST(core_cpp_util, GetTopics) } // let's unregister - eCAL::Process::SleepMS(CMN_REGISTRATION_REFRESH_MS); + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); // get all topics again, now all topics // should be removed from the map @@ -132,7 +135,7 @@ TEST(core_cpp_util, GetTopics) TEST(core_cpp_util, GetTopicsParallel) { constexpr const int max_publisher_count(2000); - constexpr const int waiting_time_thread(1000); + constexpr const int waiting_time_thread(4000); constexpr const int parallel_threads(1); // initialize eCAL API From c7008f2159f5fe03fc868e11c79680f9e1f55189 Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Fri, 26 Jul 2024 20:26:22 +0200 Subject: [PATCH 2/5] monitoring default in ecal.yaml corrected to 5000 ms clang-tidy --- ecal/core/cfg/ecal.yaml | 34 +++++++++---------- .../ecal_registration_provider.cpp | 2 +- .../ecal_registration_sample_applier.cpp | 4 +-- .../ecal_registration_sample_applier.h | 5 +-- 4 files changed, 23 insertions(+), 22 deletions(-) diff --git a/ecal/core/cfg/ecal.yaml b/ecal/core/cfg/ecal.yaml index 480be706ac..0ee9c9a569 100644 --- a/ecal/core/cfg/ecal.yaml +++ b/ecal/core/cfg/ecal.yaml @@ -1,8 +1,8 @@ -# _____ _ _ ____ _ _ -# | ____|___| (_)_ __ ___ ___ ___ / ___| / \ | | -# | _| / __| | | '_ \/ __|/ _ \ _____ / _ \ | / _ \ | | -# | |__| (__| | | |_) \__ \ __/ |_____| | __/ |___ / ___ \| |___ -# |_____\___|_|_| .__/|___/\___| \___|\____/_/ \_\_____| +# _____ _ _ ____ _ _ +# | ____|___| (_)_ __ ___ ___ ___ / ___| / \ | | +# | _| / __| | | '_ \/ __|/ _ \ _____ / _ \ | / _ \ | | +# | |__| (__| | | |_) \__ \ __/ |_____| | __/ |___ / ___ \| |___ +# |_____\___|_|_| .__/|___/\___| \___|\____/_/ \_\_____| # |_| # _ _ _ __ _ _ _ # __ _| | ___ | |__ __ _| | ___ ___ _ __ / _(_) __ _ _ _ _ __ __ _| |_(_) ___ _ __ @@ -15,7 +15,7 @@ # Registration layer configuration registration: # Topic registration refresh cylce (has to be smaller then registration timeout! Default: 1000) - registration_refresh: 1000 + registration_refresh: 1000 # Timeout for topic registration in ms (internal, Default: 60000) registration_timeout: 60000 # Enable to receive registration information on the same local machine @@ -38,12 +38,12 @@ registration: udp: enable: true port: 14000 - + # Monitoring configuration monitoring: - # Timeout for topic monitoring in ms (Default: 1000), increase in 1000er steps - timeout: 1000 + # Timeout for topic monitoring in ms (Default: 5000), increase in 1000er steps + timeout: 5000 # Topics blacklist as regular expression (will not be monitored) filter_excl: "^__.*$" # Topics whitelist as regular expression (will be monitored only) (Default: "") @@ -74,7 +74,7 @@ transport_layer: join_all_interfaces: false # Windows specific setting to enable receiving UDP traffic with the Npcap based receiver npcap_enabled: false - + # In local mode multicast group and ttl are set by default and are not adjustable local: # Multicast group base. All registration and logging is sent on this address @@ -86,9 +86,9 @@ transport_layer: # Multicast group base. All registration and logging is sent on this address group: "239.0.0.1" # TTL (hop limit) is used to determine the amount of routers being traversed towards the destination - ttl: 3 + ttl: 3 - tcp: + tcp: # Reader amount of threads that shall execute workload number_executor_reader: 4 # Writer amount of threads that shall execute workload @@ -96,12 +96,12 @@ transport_layer: # Reconnection attemps the session will try to reconnect in case of an issue max_reconnections: 5 - shm: + shm: # Default memory file size for new publisher memfile_min_size_bytes: 4096 # Dynamic file size reserve before recreating memory file if topic size changes memfile_reserve_percent: 50 - + # Publisher specific base settings publisher: @@ -117,12 +117,12 @@ publisher: acknowledge_timeout_ms: 0 # Maximum number of used buffers (needs to be greater than 1, default = 1) memfile_buffer_count: 1 - + # Base configuration for UDP publisher udp: # Enable layer enable: true - + # Base configuration for TCP publisher tcp: # Enable layer @@ -157,7 +157,7 @@ subscriber: # Enable dropping of payload messages that arrive out of order drop_out_of_order_messages: true - + # Time configuration time: diff --git a/ecal/core/src/registration/ecal_registration_provider.cpp b/ecal/core/src/registration/ecal_registration_provider.cpp index f920848e5c..b0fafcb1c2 100644 --- a/ecal/core/src/registration/ecal_registration_provider.cpp +++ b/ecal/core/src/registration/ecal_registration_provider.cpp @@ -158,7 +158,7 @@ namespace eCAL void CRegistrationProvider::TriggerRegisterSendThread() { { - std::lock_guard lock(m_reg_sample_snd_thread_cv_mtx); + const std::lock_guard lock(m_reg_sample_snd_thread_cv_mtx); m_reg_sample_snd_thread_trigger = true; } m_reg_sample_snd_thread_cv.notify_one(); diff --git a/ecal/core/src/registration/ecal_registration_sample_applier.cpp b/ecal/core/src/registration/ecal_registration_sample_applier.cpp index 4f3b65282f..5dba33dbb4 100644 --- a/ecal/core/src/registration/ecal_registration_sample_applier.cpp +++ b/ecal/core/src/registration/ecal_registration_sample_applier.cpp @@ -59,7 +59,7 @@ namespace eCAL return true; } - bool CSampleApplier::IsHostGroupMember(const Registration::Sample& sample_) + bool CSampleApplier::IsHostGroupMember(const Registration::Sample& sample_) const { std::string host_group_name; std::string host_name; @@ -96,7 +96,7 @@ namespace eCAL return true; } - bool CSampleApplier::IsSameProcess(const Registration::Sample& sample_) + bool CSampleApplier::IsSameProcess(const Registration::Sample& sample_) const { int32_t pid(0); switch (sample_.cmd_type) diff --git a/ecal/core/src/registration/ecal_registration_sample_applier.h b/ecal/core/src/registration/ecal_registration_sample_applier.h index 401fdf4820..bd2fcf889e 100644 --- a/ecal/core/src/registration/ecal_registration_sample_applier.h +++ b/ecal/core/src/registration/ecal_registration_sample_applier.h @@ -54,8 +54,9 @@ namespace eCAL void RemCustomApplySampleCallback(const std::string& customer_); private: - bool IsSameProcess(const Registration::Sample& sample_); - bool IsHostGroupMember(const eCAL::Registration::Sample& sample_); + bool IsSameProcess(const Registration::Sample& sample_) const; + bool IsHostGroupMember(const eCAL::Registration::Sample& sample_) const; + bool AcceptRegistrationSample(const Registration::Sample& sample_); bool m_network; From e5ca5c42b7d387e43f20c92078f2ae8a91d6b50c Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Sun, 28 Jul 2024 19:53:53 +0200 Subject: [PATCH 3/5] registration provider logic reduced to just send registration samples (connection to descgate, monitoring cut) --- ecal/core/src/config/default_config.h | 24 +++++----- ecal/core/src/ecal_globals.cpp | 6 +-- .../src/monitoring/ecal_monitoring_impl.cpp | 6 +-- ecal/core/src/readwrite/ecal_reader.cpp | 5 ++- .../ecal_registration_provider.cpp | 44 +++++++------------ .../registration/ecal_registration_provider.h | 12 ++--- .../cpp/util_test/src/util_getclients.cpp | 3 ++ .../cpp/util_test/src/util_getservices.cpp | 3 ++ .../cpp/util_test/src/util_gettopics.cpp | 3 ++ 9 files changed, 49 insertions(+), 57 deletions(-) diff --git a/ecal/core/src/config/default_config.h b/ecal/core/src/config/default_config.h index abe4bf28bb..321c90736a 100644 --- a/ecal/core/src/config/default_config.h +++ b/ecal/core/src/config/default_config.h @@ -19,7 +19,7 @@ const std::string default_config = R"(# _____ _ _ # Registration layer configuration registration: # Topic registration refresh cylce (has to be smaller then registration timeout! Default: 1000) - registration_refresh: 1000 + registration_refresh: 1000 # Timeout for topic registration in ms (internal, Default: 60000) registration_timeout: 60000 # Enable to receive registration information on the same local machine @@ -42,12 +42,12 @@ const std::string default_config = R"(# _____ _ _ udp: enable: true port: 14000 - + # Monitoring configuration monitoring: - # Timeout for topic monitoring in ms (Default: 1000), increase in 1000er steps - timeout: 1000 + # Timeout for topic monitoring in ms (Default: 5000), increase in 1000er steps + timeout: 5000 # Topics blacklist as regular expression (will not be monitored) filter_excl: "^__.*$" # Topics whitelist as regular expression (will be monitored only) (Default: "") @@ -78,7 +78,7 @@ const std::string default_config = R"(# _____ _ _ join_all_interfaces: false # Windows specific setting to enable receiving UDP traffic with the Npcap based receiver npcap_enabled: false - + # In local mode multicast group and ttl are set by default and are not adjustable local: # Multicast group base. All registration and logging is sent on this address @@ -90,9 +90,9 @@ const std::string default_config = R"(# _____ _ _ # Multicast group base. All registration and logging is sent on this address group: "239.0.0.1" # TTL (hop limit) is used to determine the amount of routers being traversed towards the destination - ttl: 3 + ttl: 3 - tcp: + tcp: # Reader amount of threads that shall execute workload number_executor_reader: 4 # Writer amount of threads that shall execute workload @@ -100,12 +100,12 @@ const std::string default_config = R"(# _____ _ _ # Reconnection attemps the session will try to reconnect in case of an issue max_reconnections: 5 - shm: + shm: # Default memory file size for new publisher memfile_min_size_bytes: 4096 # Dynamic file size reserve before recreating memory file if topic size changes memfile_reserve_percent: 50 - + # Publisher specific base settings publisher: @@ -121,12 +121,12 @@ const std::string default_config = R"(# _____ _ _ acknowledge_timeout_ms: 0 # Maximum number of used buffers (needs to be greater than 1, default = 1) memfile_buffer_count: 1 - + # Base configuration for UDP publisher udp: # Enable layer enable: true - + # Base configuration for TCP publisher tcp: # Enable layer @@ -161,7 +161,7 @@ const std::string default_config = R"(# _____ _ _ # Enable dropping of payload messages that arrive out of order drop_out_of_order_messages: true - + # Time configuration time: diff --git a/ecal/core/src/ecal_globals.cpp b/ecal/core/src/ecal_globals.cpp index 5954bd017b..56634748d4 100644 --- a/ecal/core/src/ecal_globals.cpp +++ b/ecal/core/src/ecal_globals.cpp @@ -204,8 +204,7 @@ namespace eCAL if (descgate_instance) { #if ECAL_CORE_REGISTRATION - // utilize registration provider and receiver to get descriptions - g_registration_provider()->SetCustomApplySampleCallback("descgate", [](const auto& sample_) {g_descgate()->ApplySample(sample_, tl_none); }); + // utilize registration receiver to get descriptions g_registration_receiver()->SetCustomApplySampleCallback("descgate", [](const auto& sample_) {g_descgate()->ApplySample(sample_, tl_none); }); #endif } @@ -303,8 +302,7 @@ namespace eCAL if (descgate_instance) { #if ECAL_CORE_REGISTRATION - // stop registration provider and receiver utilization to get descriptions - g_registration_provider()->RemCustomApplySampleCallback("descgate"); + // stop registration receiver utilization to get descriptions g_registration_receiver()->RemCustomApplySampleCallback("descgate"); #endif } diff --git a/ecal/core/src/monitoring/ecal_monitoring_impl.cpp b/ecal/core/src/monitoring/ecal_monitoring_impl.cpp index caea4b5441..e0cc7c13a7 100644 --- a/ecal/core/src/monitoring/ecal_monitoring_impl.cpp +++ b/ecal/core/src/monitoring/ecal_monitoring_impl.cpp @@ -58,8 +58,7 @@ namespace eCAL // get name of this host m_host_name = Process::GetHostName(); - // utilize registration provider and receiver to enrich monitor information - g_registration_provider()->SetCustomApplySampleCallback("monitoring", [this](const auto& sample_) {this->ApplySample(sample_, tl_none); }); + // utilize registration receiver to enrich monitor information g_registration_receiver()->SetCustomApplySampleCallback("monitoring", [this](const auto& sample_){this->ApplySample(sample_, tl_none);}); // setup blacklist and whitelist filter strings# @@ -74,8 +73,7 @@ namespace eCAL void CMonitoringImpl::Destroy() { - // stop registration provider and receiver utilization to enrich monitor information - g_registration_provider()->RemCustomApplySampleCallback("monitoring"); + // stop registration receiver utilization to enrich monitor information g_registration_receiver()->RemCustomApplySampleCallback("monitoring"); m_init = false; } diff --git a/ecal/core/src/readwrite/ecal_reader.cpp b/ecal/core/src/readwrite/ecal_reader.cpp index ff0ae7fb8d..f3b1127821 100644 --- a/ecal/core/src/readwrite/ecal_reader.cpp +++ b/ecal/core/src/readwrite/ecal_reader.cpp @@ -357,7 +357,10 @@ namespace eCAL // initialize tcp layer #if ECAL_CORE_TRANSPORT_TCP - if (Config::IsTcpRecEnabled()) + // TODO: use configuration here + // currently we have a missmatch between global Config::IsTcpRecEnabled() and m_config.tcp.enable + //if (Config::IsTcpRecEnabled()) + if (1) { CTCPReaderLayer::Get()->Initialize(); } diff --git a/ecal/core/src/registration/ecal_registration_provider.cpp b/ecal/core/src/registration/ecal_registration_provider.cpp index b0fafcb1c2..d232ba024e 100644 --- a/ecal/core/src/registration/ecal_registration_provider.cpp +++ b/ecal/core/src/registration/ecal_registration_provider.cpp @@ -97,7 +97,7 @@ namespace eCAL m_reg_sample_snd_thread->stop(); // send process unregistration sample - //m_reg_sender->SendSample(Registration::GetProcessUnregisterSample()); + ProcessSingleSample(Registration::GetProcessUnregisterSample()); // delete registration sender m_reg_sender.reset(); @@ -121,40 +121,21 @@ namespace eCAL return(true); } - void CRegistrationProvider::SetCustomApplySampleCallback(const std::string& customer_, const ApplySampleCallbackT& callback_) - { - const std::lock_guard lock(m_callback_custom_apply_sample_map_mtx); - m_callback_custom_apply_sample_map[customer_] = callback_; - } - - void CRegistrationProvider::RemCustomApplySampleCallback(const std::string& customer_) - { - const std::lock_guard lock(m_callback_custom_apply_sample_map_mtx); - auto iter = m_callback_custom_apply_sample_map.find(customer_); - if (iter != m_callback_custom_apply_sample_map.end()) - { - m_callback_custom_apply_sample_map.erase(iter); - } - } - - void CRegistrationProvider::ForwardSample(const Registration::Sample& sample_) - { - const std::lock_guard lock(m_callback_custom_apply_sample_map_mtx); - for (const auto& iter : m_callback_custom_apply_sample_map) - { - iter.second(sample_); - } - } - void CRegistrationProvider::ProcessSingleSample(const Registration::Sample& sample_) { - // forward registration sample to outside "customer" (currently monitoring and descgate) - ForwardSample(sample_); + // add registration sample to registration loop + AddSingleSample(sample_); // force rgistration thread to send TriggerRegisterSendThread(); } + void CRegistrationProvider::AddSingleSample(const Registration::Sample& sample_) + { + const std::lock_guard lock(m_applied_sample_list_mtx); + m_applied_sample_list.samples.push_back(sample_); + } + void CRegistrationProvider::TriggerRegisterSendThread() { { @@ -195,6 +176,13 @@ namespace eCAL // send registration sample list m_reg_sender->SendSampleList(sample_list); + // send active applied samples at the end of the registration loop + { + const std::lock_guard lock(m_applied_sample_list_mtx); + m_reg_sender->SendSampleList(m_applied_sample_list); + m_applied_sample_list.samples.clear(); + } + // wait for trigger or registration refresh timeout { std::unique_lock lock(m_reg_sample_snd_thread_cv_mtx); diff --git a/ecal/core/src/registration/ecal_registration_provider.h b/ecal/core/src/registration/ecal_registration_provider.h index 85bdccd708..70364570d8 100644 --- a/ecal/core/src/registration/ecal_registration_provider.h +++ b/ecal/core/src/registration/ecal_registration_provider.h @@ -54,13 +54,9 @@ namespace eCAL bool RegisterSample(const Registration::Sample& sample_); bool UnregisterSample(const Registration::Sample& sample_); - using ApplySampleCallbackT = std::function; - void SetCustomApplySampleCallback(const std::string& customer_, const ApplySampleCallbackT& callback_); - void RemCustomApplySampleCallback(const std::string& customer_); - protected: - void ForwardSample(const Registration::Sample& sample_); void ProcessSingleSample(const Registration::Sample& sample_); + void AddSingleSample(const Registration::Sample& sample_); void TriggerRegisterSendThread(); void RegisterSendThread(); @@ -74,10 +70,10 @@ namespace eCAL std::mutex m_reg_sample_snd_thread_cv_mtx; bool m_reg_sample_snd_thread_trigger; + std::mutex m_applied_sample_list_mtx; + Registration::SampleList m_applied_sample_list; + bool m_use_registration_udp; bool m_use_registration_shm; - - std::mutex m_callback_custom_apply_sample_map_mtx; - std::map m_callback_custom_apply_sample_map; }; } diff --git a/ecal/tests/cpp/util_test/src/util_getclients.cpp b/ecal/tests/cpp/util_test/src/util_getclients.cpp index 8a938cf16e..ee93e2efc3 100644 --- a/ecal/tests/cpp/util_test/src/util_getclients.cpp +++ b/ecal/tests/cpp/util_test/src/util_getclients.cpp @@ -161,6 +161,9 @@ TEST(core_cpp_util, ClientEqualQualities) // destroy client 1 client1.Destroy(); + // let's register + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); + // check attributes, client 1 attributes should be replaced by client 2 attributes now eCAL::Util::GetClientTypeNames("foo::service", "foo::method", req_type, resp_type); EXPECT_EQ(req_type, "foo::req_type2"); diff --git a/ecal/tests/cpp/util_test/src/util_getservices.cpp b/ecal/tests/cpp/util_test/src/util_getservices.cpp index 7ad633d5c1..2e1aef8c5b 100644 --- a/ecal/tests/cpp/util_test/src/util_getservices.cpp +++ b/ecal/tests/cpp/util_test/src/util_getservices.cpp @@ -149,6 +149,9 @@ TEST(core_cpp_util, ServiceEqualQualities) // destroy service 1 service1.Destroy(); + // let's register + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); + // check attributes, service 1 attributes should be replaced by service 2 attributes now eCAL::Util::GetServiceTypeNames("foo::service", "foo::method", req_type, resp_type); EXPECT_EQ(req_type, "foo::req_type2"); diff --git a/ecal/tests/cpp/util_test/src/util_gettopics.cpp b/ecal/tests/cpp/util_test/src/util_gettopics.cpp index 19b27429da..3d4466a331 100644 --- a/ecal/tests/cpp/util_test/src/util_gettopics.cpp +++ b/ecal/tests/cpp/util_test/src/util_gettopics.cpp @@ -102,6 +102,9 @@ TEST(core_cpp_util, GetTopics) pub1.Destroy(); sub1.Destroy(); + // let's register + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); + // size should be 5 again (because of pub12 and sub12 should have replaced pub1 and sub1 attributes now) EXPECT_EQ(topic_info_map.size(), 5); From a2d054ba8ddd5adc665f793fc4e07103cae58a74 Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Mon, 29 Jul 2024 10:56:25 +0200 Subject: [PATCH 4/5] use subscriber config (m_config) to initialize needed data layer --- ecal/core/src/pubsub/ecal_subgate.cpp | 4 -- ecal/core/src/readwrite/ecal_reader.cpp | 10 ++--- ecal/core/src/readwrite/ecal_reader.h | 2 +- .../src/readwrite/tcp/ecal_reader_tcp.cpp | 5 ++- ecal/core/src/readwrite/tcp/ecal_reader_tcp.h | 2 + .../ecal_registration_provider.cpp | 39 +++++++++++-------- .../registration/ecal_registration_provider.h | 2 - 7 files changed, 33 insertions(+), 31 deletions(-) diff --git a/ecal/core/src/pubsub/ecal_subgate.cpp b/ecal/core/src/pubsub/ecal_subgate.cpp index 61c254a37c..cdac6281b8 100644 --- a/ecal/core/src/pubsub/ecal_subgate.cpp +++ b/ecal/core/src/pubsub/ecal_subgate.cpp @@ -52,10 +52,6 @@ namespace eCAL void CSubGate::Start() { if(m_created) return; - - // initialize data reader layers - CDataReader::InitializeLayers(); - m_created = true; } diff --git a/ecal/core/src/readwrite/ecal_reader.cpp b/ecal/core/src/readwrite/ecal_reader.cpp index f3b1127821..460742f3cf 100644 --- a/ecal/core/src/readwrite/ecal_reader.cpp +++ b/ecal/core/src/readwrite/ecal_reader.cpp @@ -94,6 +94,7 @@ namespace eCAL m_pub_map.set_expiration(registration_timeout); // start transport layers + InitializeLayers(); StartTransportLayer(); // mark as created @@ -341,7 +342,7 @@ namespace eCAL { // initialize udp layer #if ECAL_CORE_TRANSPORT_UDP - if (Config::IsUdpMulticastRecEnabled()) + if (m_config.udp.enable) { CUDPReaderLayer::Get()->Initialize(); } @@ -349,7 +350,7 @@ namespace eCAL // initialize shm layer #if ECAL_CORE_TRANSPORT_SHM - if (Config::IsShmRecEnabled()) + if (m_config.shm.enable) { CSHMReaderLayer::Get()->Initialize(); } @@ -357,10 +358,7 @@ namespace eCAL // initialize tcp layer #if ECAL_CORE_TRANSPORT_TCP - // TODO: use configuration here - // currently we have a missmatch between global Config::IsTcpRecEnabled() and m_config.tcp.enable - //if (Config::IsTcpRecEnabled()) - if (1) + if (m_config.tcp.enable) { CTCPReaderLayer::Get()->Initialize(); } diff --git a/ecal/core/src/readwrite/ecal_reader.h b/ecal/core/src/readwrite/ecal_reader.h index eebff4d34c..c6955e2d69 100644 --- a/ecal/core/src/readwrite/ecal_reader.h +++ b/ecal/core/src/readwrite/ecal_reader.h @@ -119,7 +119,7 @@ namespace eCAL std::string GetTopicID() const { return(m_topic_id); } SDataTypeInformation GetDataTypeInformation() const { return(m_topic_info); } - static void InitializeLayers(); + void InitializeLayers(); size_t ApplySample(const std::string& tid_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_); std::string Dump(const std::string& indent_ = ""); diff --git a/ecal/core/src/readwrite/tcp/ecal_reader_tcp.cpp b/ecal/core/src/readwrite/tcp/ecal_reader_tcp.cpp index ed82df0fd1..e916fcdc11 100644 --- a/ecal/core/src/readwrite/tcp/ecal_reader_tcp.cpp +++ b/ecal/core/src/readwrite/tcp/ecal_reader_tcp.cpp @@ -125,10 +125,13 @@ namespace eCAL //////////////// // LAYER //////////////// - CTCPReaderLayer::CTCPReaderLayer() = default; + CTCPReaderLayer::CTCPReaderLayer() : m_initialized(false) {} void CTCPReaderLayer::Initialize() { + if (m_initialized) return; + m_initialized = true; + const tcp_pubsub::logger::logger_t tcp_pubsub_logger = std::bind(TcpPubsubLogger, std::placeholders::_1, std::placeholders::_2); m_executor = std::make_shared(Config::GetTcpPubsubReaderThreadpoolSize(), tcp_pubsub_logger); } diff --git a/ecal/core/src/readwrite/tcp/ecal_reader_tcp.h b/ecal/core/src/readwrite/tcp/ecal_reader_tcp.h index 83fd64f557..2eae0b2a5e 100644 --- a/ecal/core/src/readwrite/tcp/ecal_reader_tcp.h +++ b/ecal/core/src/readwrite/tcp/ecal_reader_tcp.h @@ -30,6 +30,7 @@ #include "serialization/ecal_struct_sample_payload.h" +#include #include #include @@ -73,6 +74,7 @@ namespace eCAL void SetConnectionParameter(SReaderLayerPar& /*par_*/) override; private: + std::atomic m_initialized; std::shared_ptr m_executor; using DataReaderTCPMapT = std::unordered_map>; diff --git a/ecal/core/src/registration/ecal_registration_provider.cpp b/ecal/core/src/registration/ecal_registration_provider.cpp index d232ba024e..74936dec78 100644 --- a/ecal/core/src/registration/ecal_registration_provider.cpp +++ b/ecal/core/src/registration/ecal_registration_provider.cpp @@ -30,7 +30,6 @@ #include #include #include -#include #include #include #include @@ -96,8 +95,14 @@ namespace eCAL // stop cyclic registration thread m_reg_sample_snd_thread->stop(); - // send process unregistration sample - ProcessSingleSample(Registration::GetProcessUnregisterSample()); + // add unregistration sample to registration loop + AddSingleSample(Registration::GetProcessUnregisterSample()); + + // wake up registration thread the last time + TriggerRegisterSendThread(); + + // stop registration thread + m_reg_sample_snd_thread.reset(); // delete registration sender m_reg_sender.reset(); @@ -105,29 +110,29 @@ namespace eCAL m_created = false; } - // register single sample (currently we do not differ between registration/unregistration) + // (re)register single sample bool CRegistrationProvider::RegisterSample(const Registration::Sample& sample_) { if (!m_created) return(false); - ProcessSingleSample(sample_); + + // add registration sample to registration loop + AddSingleSample(sample_); + + // wake up registration thread + TriggerRegisterSendThread(); + return(true); } - // unregister single sample (currently we do not differ between registration/unregistration) + // unregister single sample bool CRegistrationProvider::UnregisterSample(const Registration::Sample& sample_) { if (!m_created) return(false); - ProcessSingleSample(sample_); - return(true); - } - void CRegistrationProvider::ProcessSingleSample(const Registration::Sample& sample_) - { - // add registration sample to registration loop + // add registration sample to registration loop, no need to force registration thread to send AddSingleSample(sample_); - // force rgistration thread to send - TriggerRegisterSendThread(); + return(true); } void CRegistrationProvider::AddSingleSample(const Registration::Sample& sample_) @@ -173,17 +178,17 @@ namespace eCAL if (g_clientgate() != nullptr) g_clientgate()->GetRegistrations(sample_list); #endif - // send registration sample list + // send collected registration sample list m_reg_sender->SendSampleList(sample_list); - // send active applied samples at the end of the registration loop + // send asynchronously applied samples at the end of the registration loop { const std::lock_guard lock(m_applied_sample_list_mtx); m_reg_sender->SendSampleList(m_applied_sample_list); m_applied_sample_list.samples.clear(); } - // wait for trigger or registration refresh timeout + // wait for external trigger or until registration refresh timeout { std::unique_lock lock(m_reg_sample_snd_thread_cv_mtx); m_reg_sample_snd_thread_cv.wait_for(lock, std::chrono::milliseconds(Config::GetRegistrationRefreshMs()), [this] { return m_reg_sample_snd_thread_trigger; }); diff --git a/ecal/core/src/registration/ecal_registration_provider.h b/ecal/core/src/registration/ecal_registration_provider.h index 70364570d8..a7c49d45da 100644 --- a/ecal/core/src/registration/ecal_registration_provider.h +++ b/ecal/core/src/registration/ecal_registration_provider.h @@ -55,9 +55,7 @@ namespace eCAL bool UnregisterSample(const Registration::Sample& sample_); protected: - void ProcessSingleSample(const Registration::Sample& sample_); void AddSingleSample(const Registration::Sample& sample_); - void TriggerRegisterSendThread(); void RegisterSendThread(); From 0fccaa21023bc3d32d0e77bb5359288181513d17 Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Mon, 29 Jul 2024 13:01:32 +0200 Subject: [PATCH 5/5] CThread class extented with trigger function --- .../ecal_registration_provider.cpp | 29 +++------------- .../registration/ecal_registration_provider.h | 30 ++++++----------- ecal/core/src/util/ecal_thread.h | 33 ++++++++++++++++--- 3 files changed, 43 insertions(+), 49 deletions(-) diff --git a/ecal/core/src/registration/ecal_registration_provider.cpp b/ecal/core/src/registration/ecal_registration_provider.cpp index 74936dec78..b8816c2f1a 100644 --- a/ecal/core/src/registration/ecal_registration_provider.cpp +++ b/ecal/core/src/registration/ecal_registration_provider.cpp @@ -83,7 +83,7 @@ namespace eCAL // start cyclic registration thread m_reg_sample_snd_thread = std::make_shared(std::bind(&CRegistrationProvider::RegisterSendThread, this)); - m_reg_sample_snd_thread->start(std::chrono::milliseconds(0)); + m_reg_sample_snd_thread->start(std::chrono::milliseconds(Config::GetRegistrationRefreshMs())); m_created = true; } @@ -92,17 +92,14 @@ namespace eCAL { if(!m_created) return; - // stop cyclic registration thread - m_reg_sample_snd_thread->stop(); - // add unregistration sample to registration loop AddSingleSample(Registration::GetProcessUnregisterSample()); // wake up registration thread the last time - TriggerRegisterSendThread(); + m_reg_sample_snd_thread->trigger(); - // stop registration thread - m_reg_sample_snd_thread.reset(); + // stop cyclic registration thread + m_reg_sample_snd_thread->stop(); // delete registration sender m_reg_sender.reset(); @@ -119,7 +116,7 @@ namespace eCAL AddSingleSample(sample_); // wake up registration thread - TriggerRegisterSendThread(); + m_reg_sample_snd_thread->trigger(); return(true); } @@ -141,15 +138,6 @@ namespace eCAL m_applied_sample_list.samples.push_back(sample_); } - void CRegistrationProvider::TriggerRegisterSendThread() - { - { - const std::lock_guard lock(m_reg_sample_snd_thread_cv_mtx); - m_reg_sample_snd_thread_trigger = true; - } - m_reg_sample_snd_thread_cv.notify_one(); - } - void CRegistrationProvider::RegisterSendThread() { // collect all registrations and send them out cyclic @@ -187,13 +175,6 @@ namespace eCAL m_reg_sender->SendSampleList(m_applied_sample_list); m_applied_sample_list.samples.clear(); } - - // wait for external trigger or until registration refresh timeout - { - std::unique_lock lock(m_reg_sample_snd_thread_cv_mtx); - m_reg_sample_snd_thread_cv.wait_for(lock, std::chrono::milliseconds(Config::GetRegistrationRefreshMs()), [this] { return m_reg_sample_snd_thread_trigger; }); - m_reg_sample_snd_thread_trigger = false; - } } } } diff --git a/ecal/core/src/registration/ecal_registration_provider.h b/ecal/core/src/registration/ecal_registration_provider.h index a7c49d45da..5fe6652763 100644 --- a/ecal/core/src/registration/ecal_registration_provider.h +++ b/ecal/core/src/registration/ecal_registration_provider.h @@ -28,17 +28,12 @@ #pragma once -#include "io/udp/ecal_udp_sample_sender.h" -#include -#include +#include "registration/ecal_registration_sender.h" +#include "util/ecal_thread.h" + #include #include -#include -#include - -#include -#include "util/ecal_thread.h" namespace eCAL { @@ -56,22 +51,17 @@ namespace eCAL protected: void AddSingleSample(const Registration::Sample& sample_); - void TriggerRegisterSendThread(); void RegisterSendThread(); - static std::atomic m_created; - - std::unique_ptr m_reg_sender; - std::shared_ptr m_reg_sample_snd_thread; + static std::atomic m_created; - std::condition_variable m_reg_sample_snd_thread_cv; - std::mutex m_reg_sample_snd_thread_cv_mtx; - bool m_reg_sample_snd_thread_trigger; + std::unique_ptr m_reg_sender; + std::shared_ptr m_reg_sample_snd_thread; - std::mutex m_applied_sample_list_mtx; - Registration::SampleList m_applied_sample_list; + std::mutex m_applied_sample_list_mtx; + Registration::SampleList m_applied_sample_list; - bool m_use_registration_udp; - bool m_use_registration_shm; + bool m_use_registration_udp; + bool m_use_registration_shm; }; } diff --git a/ecal/core/src/util/ecal_thread.h b/ecal/core/src/util/ecal_thread.h index 49aec75437..7aa1d13fba 100644 --- a/ecal/core/src/util/ecal_thread.h +++ b/ecal/core/src/util/ecal_thread.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -84,12 +84,28 @@ namespace eCAL } } + /** + * @brief Trigger the callback thread to interrupt the current sleep without stopping it. + * The callback function will be executed immediately. + */ + void trigger() + { + { + const std::unique_lock lock(mtx_); + // Set the flag to signal the callback thread to trigger + triggerThread_ = true; + // Notify the callback thread to wake up and check the flag + cv_.notify_one(); + } + } + private: std::thread callbackThread_; /**< The callback thread object. */ std::function callback_; /**< The callback function to be executed in the callback thread. */ std::mutex mtx_; /**< Mutex for thread synchronization. */ std::condition_variable cv_; /**< Condition variable for signaling between threads. */ - bool stopThread_{false}; /**< Flag to indicate whether the callback thread should stop. */ + bool stopThread_{ false }; /**< Flag to indicate whether the callback thread should stop. */ + bool triggerThread_{ false }; /**< Flag to indicate whether the callback thread should be triggered. */ /** * @brief Callback function that runs in the callback thread. @@ -105,10 +121,17 @@ namespace eCAL { std::unique_lock lock(mtx_); // Wait for a signal or a timeout - if (cv_.wait_for(lock, timeout, [this] { return stopThread_; })) + if (cv_.wait_for(lock, timeout, [this] { return stopThread_ || triggerThread_; })) { - // If the stopThread flag is true, break out of the loop - break; + if (stopThread_) { + // If the stopThread flag is true, break out of the loop + break; + } + + if (triggerThread_) { + // If the triggerThread flag is true, reset it and proceed + triggerThread_ = false; + } } }