From b57db88b5750fb69814bec3d4e8ed6b15957a45c Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Tue, 30 Apr 2024 17:12:44 +0200 Subject: [PATCH] pubsub tests reclustered (shm, udp, tcp) --- ecal/tests/cpp/pubsub_test/CMakeLists.txt | 14 + .../pubsub_test/src/pubsub_acknowledge.cpp | 2 +- .../pubsub_test/src/pubsub_multibuffer.cpp | 2 +- .../tests/cpp/pubsub_test/src/pubsub_test.cpp | 249 ++---------------- .../cpp/pubsub_test/src/pubsub_test_shm.cpp | 151 +++++++++++ .../cpp/pubsub_test/src/pubsub_test_udp.cpp | 151 +++++++++++ 6 files changed, 335 insertions(+), 234 deletions(-) create mode 100644 ecal/tests/cpp/pubsub_test/src/pubsub_test_shm.cpp create mode 100644 ecal/tests/cpp/pubsub_test/src/pubsub_test_udp.cpp diff --git a/ecal/tests/cpp/pubsub_test/CMakeLists.txt b/ecal/tests/cpp/pubsub_test/CMakeLists.txt index 5a9e74effd..2ca37d7c7c 100644 --- a/ecal/tests/cpp/pubsub_test/CMakeLists.txt +++ b/ecal/tests/cpp/pubsub_test/CMakeLists.txt @@ -21,11 +21,25 @@ project(test_pubsub) find_package(Threads REQUIRED) find_package(GTest REQUIRED) +if(ECAL_CORE_TRANSPORT_SHM) + set(pubsub_test_src_shm + src/pubsub_test_shm.cpp + ) +endif() + +if(ECAL_CORE_TRANSPORT_UDP) + set(pubsub_test_src_udp + src/pubsub_test_udp.cpp + ) +endif() + set(pubsub_test_src src/pubsub_acknowledge.cpp src/pubsub_multibuffer.cpp src/pubsub_receive_test.cpp src/pubsub_test.cpp + ${pubsub_test_src_shm} + ${pubsub_test_src_udp} ) ecal_add_gtest(${PROJECT_NAME} ${pubsub_test_src}) diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_acknowledge.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_acknowledge.cpp index 7d9d385005..3df02ad76f 100644 --- a/ecal/tests/cpp/pubsub_test/src/pubsub_acknowledge.cpp +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_acknowledge.cpp @@ -45,7 +45,7 @@ namespace } // This test asserts that a timeouted acknowledge does not break subsequent calls -TEST(PubSub, TimeoutAcknowledgment) +TEST(core_cpp_pubsub, TimeoutAcknowledgment) { // initialize eCAL API EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "TimeoutAcknowledgment", eCAL::Init::All)); diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_multibuffer.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_multibuffer.cpp index f693da7954..8243cf095c 100644 --- a/ecal/tests/cpp/pubsub_test/src/pubsub_multibuffer.cpp +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_multibuffer.cpp @@ -118,7 +118,7 @@ std::vector multibuffer_pub_sub_test(int buffer_count, bool zero_copy, int return received_content; } -TEST(PubSub, MultibufferPubSub) +TEST(core_cpp_pubsub, MultibufferPubSub) { // initialize eCAL API eCAL::Initialize(0, nullptr, "pubsub_test"); diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_test.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_test.cpp index a41669fa86..79587b1b39 100644 --- a/ecal/tests/cpp/pubsub_test/src/pubsub_test.cpp +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_test.cpp @@ -30,24 +30,27 @@ #define DATA_FLOW_TIME 50 #define PAYLOAD_SIZE 1024 -// subscriber callback function -std::atomic g_callback_received_bytes; -std::atomic g_callback_received_count; -void OnReceive(const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_) +namespace { - g_callback_received_bytes += data_->size; - g_callback_received_count++; -} + // subscriber callback function + std::atomic g_callback_received_bytes; + std::atomic g_callback_received_count; + void OnReceive(const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_) + { + g_callback_received_bytes += data_->size; + g_callback_received_count++; + } -static std::string CreatePayLoad(size_t payload_size_) -{ - std::string s = "Hello World "; - while(s.size() < payload_size_) + std::string CreatePayLoad(size_t payload_size_) { - s += s; + std::string s = "Hello World "; + while(s.size() < payload_size_) + { + s += s; + } + s.resize(payload_size_); + return(s); } - s.resize(payload_size_); - return(s); } TEST(core_cpp_pubsub, LeakedPubSub) @@ -506,224 +509,6 @@ TEST(core_cpp_pubsub, DynamicCreate) eCAL::Finalize(); } -TEST(core_cpp_pubsub, ZeroPayloadMessageSHM) -{ - // default send string - const std::string send_s; - - // initialize eCAL API - eCAL::Initialize(0, nullptr, "pubsub_test"); - - // publish / subscribe match in the same process - eCAL::Util::EnableLoopback(true); - - // create subscriber for topic "A" - eCAL::CSubscriber sub("A"); - - // create publisher config - eCAL::CPublisher::Config pub_config; - // set transport layer - pub_config.shm.send_mode = eCAL::TLayer::smode_on; - pub_config.udp.send_mode = eCAL::TLayer::smode_off; - pub_config.tcp.send_mode = eCAL::TLayer::smode_off; - - // create publisher for topic "A" - eCAL::CPublisher pub("A", pub_config); - - // add callback - EXPECT_EQ(true, sub.AddReceiveCallback(std::bind(OnReceive, std::placeholders::_1, std::placeholders::_2))); - - // let's match them - eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH); - - g_callback_received_bytes = 0; - g_callback_received_count = 0; - - EXPECT_EQ(send_s.size(), pub.Send(send_s)); - eCAL::Process::SleepMS(DATA_FLOW_TIME); - - EXPECT_EQ(send_s.size(), pub.Send(nullptr, 0)); - eCAL::Process::SleepMS(DATA_FLOW_TIME); - - // check callback receive - EXPECT_EQ(send_s.size(), g_callback_received_bytes); - EXPECT_EQ(2, g_callback_received_count); - - // destroy subscriber - sub.Destroy(); - - // destroy publisher - pub.Destroy(); - - // finalize eCAL API - eCAL::Finalize(); -} - -TEST(core_cpp_pubsub, ZeroPayloadMessageUDP) -{ - // default send string - std::string send_s; - - // initialize eCAL API - eCAL::Initialize(0, nullptr, "pubsub_test"); - - // publish / subscribe match in the same process - eCAL::Util::EnableLoopback(true); - - // create subscriber for topic "A" - eCAL::CSubscriber sub("A"); - - // create publisher config - eCAL::CPublisher::Config pub_config; - // set transport layer - pub_config.shm.send_mode = eCAL::TLayer::smode_off; - pub_config.udp.send_mode = eCAL::TLayer::smode_on; - pub_config.tcp.send_mode = eCAL::TLayer::smode_off; - - // create publisher for topic "A" - eCAL::CPublisher pub("A", pub_config); - - // add callback - EXPECT_EQ(true, sub.AddReceiveCallback(std::bind(OnReceive, std::placeholders::_1, std::placeholders::_2))); - - // let's match them - eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH); - - g_callback_received_bytes = 0; - g_callback_received_count = 0; - - EXPECT_EQ(send_s.size(), pub.Send(send_s)); - eCAL::Process::SleepMS(DATA_FLOW_TIME); - - EXPECT_EQ(send_s.size(), pub.Send(nullptr, 0)); - eCAL::Process::SleepMS(DATA_FLOW_TIME); - - // check callback receive - EXPECT_EQ(send_s.size(), g_callback_received_bytes); - EXPECT_EQ(2, g_callback_received_count); - - // destroy subscriber - sub.Destroy(); - - // destroy publisher - pub.Destroy(); - - // finalize eCAL API - eCAL::Finalize(); -} - -TEST(core_cpp_pubsub, MultipleSendsSHM) -{ - // default send string - std::vector send_vector{ "this", "is", "a", "", "testtest" }; - std::string last_received_msg; - long long last_received_timestamp; - - // initialize eCAL API - eCAL::Initialize(0, nullptr, "pubsub_test"); - - // publish / subscribe match in the same process - eCAL::Util::EnableLoopback(true); - - // create subscriber for topic "A" - eCAL::string::CSubscriber sub("A"); - - // create publisher config - eCAL::CPublisher::Config pub_config; - // set transport layer - pub_config.shm.send_mode = eCAL::TLayer::smode_on; - pub_config.udp.send_mode = eCAL::TLayer::smode_off; - pub_config.tcp.send_mode = eCAL::TLayer::smode_off; - - // create publisher for topic "A" - eCAL::string::CPublisher pub("A", pub_config); - - // add callback - auto save_data = [&last_received_msg, &last_received_timestamp](const char* /*topic_name_*/, const std::string& msg_, long long time_, long long /*clock_*/, long long /*id_*/) - { - last_received_msg = msg_; - last_received_timestamp = time_; - }; - EXPECT_TRUE(sub.AddReceiveCallback(save_data)); - - // let's match them - eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH); - long long timestamp = 1; - for (const auto& elem : send_vector) - { - pub.Send(elem, timestamp); - eCAL::Process::SleepMS(DATA_FLOW_TIME); - EXPECT_EQ(last_received_msg, elem); - EXPECT_EQ(last_received_timestamp, timestamp); - ++timestamp; - } - - // destroy subscriber - sub.Destroy(); - - // destroy publisher - pub.Destroy(); - - // finalize eCAL API - eCAL::Finalize(); -} - -TEST(core_cpp_pubsub, MultipleSendsUDP) -{ - // default send string - const std::vector send_vector{ "this", "is", "a", "", "testtest" }; - std::string last_received_msg; - long long last_received_timestamp(0); - - // initialize eCAL API - eCAL::Initialize(0, nullptr, "pubsub_test"); - - // publish / subscribe match in the same process - eCAL::Util::EnableLoopback(true); - - // create subscriber for topic "A" - eCAL::string::CSubscriber sub("A"); - - // create publisher config - eCAL::CPublisher::Config pub_config; - // set transport layer - pub_config.shm.send_mode = eCAL::TLayer::smode_off; - pub_config.udp.send_mode = eCAL::TLayer::smode_on; - pub_config.tcp.send_mode = eCAL::TLayer::smode_off; - - // create publisher for topic "A" - eCAL::string::CPublisher pub("A", pub_config); - - // add callback - auto save_data = [&last_received_msg, &last_received_timestamp](const char* /*topic_name_*/, const std::string& msg_, long long time_, long long /*clock_*/, long long /*id_*/) - { - last_received_msg = msg_; - last_received_timestamp = time_; - }; - EXPECT_TRUE(sub.AddReceiveCallback(save_data)); - - // let's match them - eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH); - long long timestamp = 1; - for (const auto& elem : send_vector) - { - pub.Send(elem, timestamp); - eCAL::Process::SleepMS(DATA_FLOW_TIME); - EXPECT_EQ(last_received_msg, elem); - EXPECT_EQ(last_received_timestamp, timestamp); - ++timestamp; - } - - // destroy subscriber - sub.Destroy(); - - // destroy publisher - pub.Destroy(); - - // finalize eCAL API - eCAL::Finalize(); -} - TEST(core_cpp_pubsub, DestroyInCallback) { /* Test setup : diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_test_shm.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_test_shm.cpp new file mode 100644 index 0000000000..a22a9182f7 --- /dev/null +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_test_shm.cpp @@ -0,0 +1,151 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2016 - 2019 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + +#include +#include +#include + +#include +#include + +#include + +#define CMN_REGISTRATION_REFRESH 1000 +#define DATA_FLOW_TIME 50 + +namespace +{ + // subscriber callback function + std::atomic g_callback_received_bytes; + std::atomic g_callback_received_count; + void OnReceive(const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_) + { + g_callback_received_bytes += data_->size; + g_callback_received_count++; + } +} + +TEST(core_cpp_pubsub, ZeroPayloadMessageSHM) +{ + // default send string + const std::string send_s; + + // initialize eCAL API + eCAL::Initialize(0, nullptr, "pubsub_test"); + + // publish / subscribe match in the same process + eCAL::Util::EnableLoopback(true); + + // create subscriber for topic "A" + eCAL::CSubscriber sub("A"); + + // create publisher config + eCAL::CPublisher::Config pub_config; + // set transport layer + pub_config.shm.send_mode = eCAL::TLayer::smode_on; + pub_config.udp.send_mode = eCAL::TLayer::smode_off; + pub_config.tcp.send_mode = eCAL::TLayer::smode_off; + + // create publisher for topic "A" + eCAL::CPublisher pub("A", pub_config); + + // add callback + EXPECT_EQ(true, sub.AddReceiveCallback(std::bind(OnReceive, std::placeholders::_1, std::placeholders::_2))); + + // let's match them + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH); + + g_callback_received_bytes = 0; + g_callback_received_count = 0; + + EXPECT_EQ(send_s.size(), pub.Send(send_s)); + eCAL::Process::SleepMS(DATA_FLOW_TIME); + + EXPECT_EQ(send_s.size(), pub.Send(nullptr, 0)); + eCAL::Process::SleepMS(DATA_FLOW_TIME); + + // check callback receive + EXPECT_EQ(send_s.size(), g_callback_received_bytes); + EXPECT_EQ(2, g_callback_received_count); + + // destroy subscriber + sub.Destroy(); + + // destroy publisher + pub.Destroy(); + + // finalize eCAL API + eCAL::Finalize(); +} + +TEST(core_cpp_pubsub, MultipleSendsSHM) +{ + // default send string + std::vector send_vector{ "this", "is", "a", "", "testtest" }; + std::string last_received_msg; + long long last_received_timestamp; + + // initialize eCAL API + eCAL::Initialize(0, nullptr, "pubsub_test"); + + // publish / subscribe match in the same process + eCAL::Util::EnableLoopback(true); + + // create subscriber for topic "A" + eCAL::string::CSubscriber sub("A"); + + // create publisher config + eCAL::CPublisher::Config pub_config; + // set transport layer + pub_config.shm.send_mode = eCAL::TLayer::smode_on; + pub_config.udp.send_mode = eCAL::TLayer::smode_off; + pub_config.tcp.send_mode = eCAL::TLayer::smode_off; + + // create publisher for topic "A" + eCAL::string::CPublisher pub("A", pub_config); + + // add callback + auto save_data = [&last_received_msg, &last_received_timestamp](const char* /*topic_name_*/, const std::string& msg_, long long time_, long long /*clock_*/, long long /*id_*/) + { + last_received_msg = msg_; + last_received_timestamp = time_; + }; + EXPECT_TRUE(sub.AddReceiveCallback(save_data)); + + // let's match them + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH); + long long timestamp = 1; + for (const auto& elem : send_vector) + { + pub.Send(elem, timestamp); + eCAL::Process::SleepMS(DATA_FLOW_TIME); + EXPECT_EQ(last_received_msg, elem); + EXPECT_EQ(last_received_timestamp, timestamp); + ++timestamp; + } + + // destroy subscriber + sub.Destroy(); + + // destroy publisher + pub.Destroy(); + + // finalize eCAL API + eCAL::Finalize(); +} diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_test_udp.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_test_udp.cpp new file mode 100644 index 0000000000..aee65c433e --- /dev/null +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_test_udp.cpp @@ -0,0 +1,151 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2016 - 2019 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + +#include +#include +#include + +#include +#include + +#include + +#define CMN_REGISTRATION_REFRESH 1000 +#define DATA_FLOW_TIME 50 + +namespace +{ + // subscriber callback function + std::atomic g_callback_received_bytes; + std::atomic g_callback_received_count; + void OnReceive(const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_) + { + g_callback_received_bytes += data_->size; + g_callback_received_count++; + } +} + +TEST(core_cpp_pubsub, ZeroPayloadMessageUDP) +{ + // default send string + std::string send_s; + + // initialize eCAL API + eCAL::Initialize(0, nullptr, "pubsub_test"); + + // publish / subscribe match in the same process + eCAL::Util::EnableLoopback(true); + + // create subscriber for topic "A" + eCAL::CSubscriber sub("A"); + + // create publisher config + eCAL::CPublisher::Config pub_config; + // set transport layer + pub_config.shm.send_mode = eCAL::TLayer::smode_off; + pub_config.udp.send_mode = eCAL::TLayer::smode_on; + pub_config.tcp.send_mode = eCAL::TLayer::smode_off; + + // create publisher for topic "A" + eCAL::CPublisher pub("A", pub_config); + + // add callback + EXPECT_EQ(true, sub.AddReceiveCallback(std::bind(OnReceive, std::placeholders::_1, std::placeholders::_2))); + + // let's match them + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH); + + g_callback_received_bytes = 0; + g_callback_received_count = 0; + + EXPECT_EQ(send_s.size(), pub.Send(send_s)); + eCAL::Process::SleepMS(DATA_FLOW_TIME); + + EXPECT_EQ(send_s.size(), pub.Send(nullptr, 0)); + eCAL::Process::SleepMS(DATA_FLOW_TIME); + + // check callback receive + EXPECT_EQ(send_s.size(), g_callback_received_bytes); + EXPECT_EQ(2, g_callback_received_count); + + // destroy subscriber + sub.Destroy(); + + // destroy publisher + pub.Destroy(); + + // finalize eCAL API + eCAL::Finalize(); +} + +TEST(core_cpp_pubsub, MultipleSendsUDP) +{ + // default send string + const std::vector send_vector{ "this", "is", "a", "", "testtest" }; + std::string last_received_msg; + long long last_received_timestamp(0); + + // initialize eCAL API + eCAL::Initialize(0, nullptr, "pubsub_test"); + + // publish / subscribe match in the same process + eCAL::Util::EnableLoopback(true); + + // create subscriber for topic "A" + eCAL::string::CSubscriber sub("A"); + + // create publisher config + eCAL::CPublisher::Config pub_config; + // set transport layer + pub_config.shm.send_mode = eCAL::TLayer::smode_off; + pub_config.udp.send_mode = eCAL::TLayer::smode_on; + pub_config.tcp.send_mode = eCAL::TLayer::smode_off; + + // create publisher for topic "A" + eCAL::string::CPublisher pub("A", pub_config); + + // add callback + auto save_data = [&last_received_msg, &last_received_timestamp](const char* /*topic_name_*/, const std::string& msg_, long long time_, long long /*clock_*/, long long /*id_*/) + { + last_received_msg = msg_; + last_received_timestamp = time_; + }; + EXPECT_TRUE(sub.AddReceiveCallback(save_data)); + + // let's match them + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH); + long long timestamp = 1; + for (const auto& elem : send_vector) + { + pub.Send(elem, timestamp); + eCAL::Process::SleepMS(DATA_FLOW_TIME); + EXPECT_EQ(last_received_msg, elem); + EXPECT_EQ(last_received_timestamp, timestamp); + ++timestamp; + } + + // destroy subscriber + sub.Destroy(); + + // destroy publisher + pub.Destroy(); + + // finalize eCAL API + eCAL::Finalize(); +}