From e0a850adca0d1de6f59f1099d459e28b07f152ec Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Wed, 19 Jun 2024 18:08:04 +0200 Subject: [PATCH 1/4] new CSubscriber method "IsPublished" to be aligned with CPublishers "IsSubscribed" (we may rename both functions to "IsConnected" in the future) new additional check for IsPublished using the connection state of the matching subscriber (should ensure that IsPublished is flagged as true only if publisher is able to send data) test added to test new behavior --- ecal/core/include/ecal/ecal_subscriber.h | 9 +- ecal/core/src/pubsub/ecal_subgate.cpp | 18 +++- ecal/core/src/pubsub/ecal_subscriber.cpp | 8 +- ecal/core/src/readwrite/ecal_reader.h | 3 +- .../pubsub_test/src/pubsub_receive_test.cpp | 97 ++++++++++++++++++- 5 files changed, 124 insertions(+), 11 deletions(-) diff --git a/ecal/core/include/ecal/ecal_subscriber.h b/ecal/core/include/ecal/ecal_subscriber.h index 9eff791399..96350e5695 100644 --- a/ecal/core/include/ecal/ecal_subscriber.h +++ b/ecal/core/include/ecal/ecal_subscriber.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -298,6 +298,13 @@ namespace eCAL **/ ECAL_API bool IsCreated() const {return(m_created);} + /** + * @brief Query if the subscriber is published. + * + * @return true if published, false if not. + **/ + ECAL_API bool IsPublished() const; + /** * @brief Query the number of publishers. * diff --git a/ecal/core/src/pubsub/ecal_subgate.cpp b/ecal/core/src/pubsub/ecal_subgate.cpp index 031cd0f93d..2d4cf66af3 100644 --- a/ecal/core/src/pubsub/ecal_subgate.cpp +++ b/ecal/core/src/pubsub/ecal_subgate.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -295,8 +295,12 @@ namespace eCAL iter->second->ApplyLocLayerParameter(process_id, topic_id, tlayer.type(), writer_par); } - // inform for local publisher connection - iter->second->ApplyLocPublication(process_id, topic_id, topic_info); + // we only inform the subscriber when the publisher has already recognized at least on local subscriber + // this should avoid to set the "IsPublished" state before the publisher is able to send data + if (ecal_sample_.topic().connections_loc() > 0) + { + iter->second->ApplyLocPublication(process_id, topic_id, topic_info); + } } } @@ -345,8 +349,12 @@ namespace eCAL const std::string writer_par = tlayer.par_layer().SerializeAsString(); iter->second->ApplyExtLayerParameter(host_name, tlayer.type(), writer_par); } - // inform for external publisher connection - iter->second->ApplyExtPublication(host_name, process_id, topic_id, topic_info); + // we only inform the subscriber when the publisher has already recognized at least on external subscriber + // this should avoid to set the "IsPublished" state before the publisher is able to send data + if (ecal_sample_.topic().connections_ext() > 0) + { + iter->second->ApplyExtPublication(host_name, process_id, topic_id, topic_info); + } } } diff --git a/ecal/core/src/pubsub/ecal_subscriber.cpp b/ecal/core/src/pubsub/ecal_subscriber.cpp index a5d92f9adc..dcd3b8f7f1 100644 --- a/ecal/core/src/pubsub/ecal_subscriber.cpp +++ b/ecal/core/src/pubsub/ecal_subscriber.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -250,6 +250,12 @@ namespace eCAL return(m_datareader->RemEventCallback(type_)); } + bool CSubscriber::IsPublished() const + { + if (m_datareader == nullptr) return(false); + return(m_datareader->IsPublished()); + } + size_t CSubscriber::GetPublisherCount() const { if(m_datareader == nullptr) return(0); diff --git a/ecal/core/src/readwrite/ecal_reader.h b/ecal/core/src/readwrite/ecal_reader.h index ed84808463..61a973f55c 100644 --- a/ecal/core/src/readwrite/ecal_reader.h +++ b/ecal/core/src/readwrite/ecal_reader.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -96,6 +96,7 @@ namespace eCAL bool IsCreated() const {return(m_created);} + bool IsPublished() const { return(m_loc_published || m_ext_published); } size_t GetPublisherCount() const { const std::lock_guard lock(m_pub_map_sync); diff --git a/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp b/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp index d7059015a1..df5e9c0ef1 100644 --- a/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp +++ b/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -161,8 +161,6 @@ TEST(PubSub, TimingSubscriberReceive) EXPECT_EQ(0, eCAL::Finalize()); } - - // This tests test for sporadically received empty messages which were a problem. TEST(PubSub, SporadicEmptyReceives) { @@ -219,3 +217,96 @@ TEST(PubSub, SporadicEmptyReceives) // finalize eCAL API EXPECT_EQ(0, eCAL::Finalize()); } + +TEST(PubSub, TestSubscriberSeen) +{ + // initialize eCAL API + EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "subscriber_seen")); + + // enable data loopback + eCAL::Util::EnableLoopback(true); + + std::atomic subscriber_seen_at_publication_start = false; + std::atomic subscriber_seen_at_publication_end = false; + + std::atomic do_start_publication = false; + std::atomic publication_finished = false; + + // publishing thread + auto publisher_thread = [&]() { + eCAL::CPublisher pub("blob"); + pub.ShmSetAcknowledgeTimeout(500); + + int cnt(0); + const auto max_runs(1000); + while (eCAL::Ok()) + { + if (do_start_publication && cnt < max_runs) + { + if (cnt == 0) + { + subscriber_seen_at_publication_start = pub.IsSubscribed(); + } + + pub.Send(std::to_string(cnt)); + cnt++; + + if (cnt == max_runs) + { + subscriber_seen_at_publication_end = pub.IsSubscribed(); + publication_finished = true; + break; + } + } + } + }; + + // subscribing thread + auto subscriber_thread = [&]() { + eCAL::CSubscriber sub("blob"); + bool received(false); + auto max_lines(10); + auto receive_lambda = [&received, &max_lines](const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_) + { + if (max_lines) + { + // the final log should look like this + // ----------------------------------- + // Receiving 0 + // Receiving 1 + // Receiving 2 + // Receiving 3 + // Receiving 4 + // Receiving 5 + // Receiving 6 + // Receiving 7 + // Receiving 8 + // Receiving 9 + // ----------------------------------- + std::cout << "Receiving " << std::string(static_cast(data_->buf), data_->size) << std::endl; + max_lines--; + } + }; + sub.AddReceiveCallback(receive_lambda); + + while (eCAL::Ok() && !publication_finished) + { + if (sub.IsPublished()) do_start_publication = true; + } + }; + + // create threads for publisher and subscriber + std::thread pub_thread(publisher_thread); + std::thread sub_thread(subscriber_thread); + + // join threads to the main thread + pub_thread.join(); + sub_thread.join(); + + // finalize eCAL API + eCAL::Finalize(); + + // check if the publisher has seen the subscriber + EXPECT_TRUE(subscriber_seen_at_publication_start); + EXPECT_TRUE(subscriber_seen_at_publication_end); +} From 1fd13a08fc21017a0f22076c88c0168591b5593b Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Wed, 19 Jun 2024 18:29:51 +0200 Subject: [PATCH 2/4] atomic boolean Initialization fixed --- testing/ecal/pubsub_test/src/pubsub_receive_test.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp b/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp index df5e9c0ef1..ce63f1576a 100644 --- a/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp +++ b/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp @@ -226,11 +226,11 @@ TEST(PubSub, TestSubscriberSeen) // enable data loopback eCAL::Util::EnableLoopback(true); - std::atomic subscriber_seen_at_publication_start = false; - std::atomic subscriber_seen_at_publication_end = false; + std::atomic subscriber_seen_at_publication_start(false); + std::atomic subscriber_seen_at_publication_end(false); - std::atomic do_start_publication = false; - std::atomic publication_finished = false; + std::atomic do_start_publication(false); + std::atomic publication_finished(false); // publishing thread auto publisher_thread = [&]() { From 7c9edea5e3fc7fc7ce2441b5335c8743c858d9b1 Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Thu, 20 Jun 2024 10:05:01 +0200 Subject: [PATCH 3/4] new API CSubscriber function "IsPublished" removed again to keep ABI compatibility within 5.13.x --- ecal/core/include/ecal/ecal_subscriber.h | 7 ------- ecal/core/src/pubsub/ecal_subscriber.cpp | 10 +++++----- testing/ecal/pubsub_test/src/pubsub_receive_test.cpp | 5 +++-- 3 files changed, 8 insertions(+), 14 deletions(-) diff --git a/ecal/core/include/ecal/ecal_subscriber.h b/ecal/core/include/ecal/ecal_subscriber.h index 96350e5695..147b1b3a3e 100644 --- a/ecal/core/include/ecal/ecal_subscriber.h +++ b/ecal/core/include/ecal/ecal_subscriber.h @@ -298,13 +298,6 @@ namespace eCAL **/ ECAL_API bool IsCreated() const {return(m_created);} - /** - * @brief Query if the subscriber is published. - * - * @return true if published, false if not. - **/ - ECAL_API bool IsPublished() const; - /** * @brief Query the number of publishers. * diff --git a/ecal/core/src/pubsub/ecal_subscriber.cpp b/ecal/core/src/pubsub/ecal_subscriber.cpp index dcd3b8f7f1..0883739c87 100644 --- a/ecal/core/src/pubsub/ecal_subscriber.cpp +++ b/ecal/core/src/pubsub/ecal_subscriber.cpp @@ -250,11 +250,11 @@ namespace eCAL return(m_datareader->RemEventCallback(type_)); } - bool CSubscriber::IsPublished() const - { - if (m_datareader == nullptr) return(false); - return(m_datareader->IsPublished()); - } + //bool CSubscriber::IsPublished() const + //{ + // if (m_datareader == nullptr) return(false); + // return(m_datareader->IsPublished()); + //} size_t CSubscriber::GetPublisherCount() const { diff --git a/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp b/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp index ce63f1576a..fa10df9ead 100644 --- a/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp +++ b/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp @@ -268,7 +268,7 @@ TEST(PubSub, TestSubscriberSeen) auto max_lines(10); auto receive_lambda = [&received, &max_lines](const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_) { - if (max_lines) + if (max_lines != 0) { // the final log should look like this // ----------------------------------- @@ -291,7 +291,8 @@ TEST(PubSub, TestSubscriberSeen) while (eCAL::Ok() && !publication_finished) { - if (sub.IsPublished()) do_start_publication = true; + //if (sub.IsPublished()) do_start_publication = true; + if (sub.GetPublisherCount() > 0) do_start_publication = true; } }; From 9f6179620d94b499db8e057d8fdfb3b676521f20 Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Thu, 20 Jun 2024 10:06:15 +0200 Subject: [PATCH 4/4] 2 typos --- ecal/core/src/pubsub/ecal_subgate.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ecal/core/src/pubsub/ecal_subgate.cpp b/ecal/core/src/pubsub/ecal_subgate.cpp index 2d4cf66af3..6d08f7c306 100644 --- a/ecal/core/src/pubsub/ecal_subgate.cpp +++ b/ecal/core/src/pubsub/ecal_subgate.cpp @@ -295,7 +295,7 @@ namespace eCAL iter->second->ApplyLocLayerParameter(process_id, topic_id, tlayer.type(), writer_par); } - // we only inform the subscriber when the publisher has already recognized at least on local subscriber + // we only inform the subscriber when the publisher has already recognized at least one local subscriber // this should avoid to set the "IsPublished" state before the publisher is able to send data if (ecal_sample_.topic().connections_loc() > 0) { @@ -349,7 +349,7 @@ namespace eCAL const std::string writer_par = tlayer.par_layer().SerializeAsString(); iter->second->ApplyExtLayerParameter(host_name, tlayer.type(), writer_par); } - // we only inform the subscriber when the publisher has already recognized at least on external subscriber + // we only inform the subscriber when the publisher has already recognized at least one external subscriber // this should avoid to set the "IsPublished" state before the publisher is able to send data if (ecal_sample_.topic().connections_ext() > 0) {