Skip to content

Commit

Permalink
eCAL Core: Callback after zero length send contains stale message
Browse files Browse the repository at this point in the history
2333d20 introduced a bug when sending zero length messages over SHM - the callback would contain stale non-zero length messages from the previous call. (#1203)
This commit fixes the introduced bug, and adds a testcase for this scenario.
  • Loading branch information
KerstinKeller authored and FlorianReimold committed Oct 27, 2023
1 parent e797af4 commit 24ed1ef
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 7 deletions.
13 changes: 6 additions & 7 deletions ecal/core/src/io/ecal_memfile_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,19 +206,18 @@ namespace eCAL
// and close the file immediately
else
{
// need to resize the buffer especially if data_size = 0, otherwise it might contain stale data.
receive_buffer.resize((size_t)mfile_hdr.data_size);

// read payload
// if data length == 0, there is no need to further read data
// we just flag to process the empty buffer
if (mfile_hdr.data_size == 0)
{
post_process_buffer = true;
}
else
if (mfile_hdr.data_size != 0)
{
receive_buffer.resize((size_t)mfile_hdr.data_size);
m_memfile.Read(receive_buffer.data(), (size_t)mfile_hdr.data_size, mfile_hdr.hdr_size);
post_process_buffer = true;
}

post_process_buffer = true;
}

// store clock
Expand Down
112 changes: 112 additions & 0 deletions testing/ecal/pubsub_test/src/pubsub_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/

#include <ecal/ecal.h>
#include <ecal/msg/string/publisher.h>
#include <ecal/msg/string/subscriber.h>

#include <atomic>
#include <thread>
Expand Down Expand Up @@ -761,6 +763,116 @@ TEST(IO, ZeroPayloadMessageUDP)
eCAL::Finalize();
}


TEST(IO, MultipleSendsSHM)
{
// default send string
std::vector<std::string> send_vector{ "this", "is", "a", "", "testtest" };
std::string last_received_msg;
long long last_received_timestamp;

// initialize eCAL API
eCAL::Initialize(0, nullptr, "pubsub_test");

// publish / subscribe match in the same process
eCAL::Util::EnableLoopback(true);

// create subscriber for topic "A"
eCAL::string::CSubscriber<std::string> sub("A");

// create publisher for topic "A"
eCAL::string::CPublisher<std::string> pub("A");
pub.SetLayerMode(eCAL::TLayer::tlayer_all, eCAL::TLayer::smode_off);
pub.SetLayerMode(eCAL::TLayer::tlayer_shm, eCAL::TLayer::smode_on);
pub.ShmSetAcknowledgeTimeout(10); // Make sure we receive the data

// add callback
auto save_data = [&last_received_msg, &last_received_timestamp](const char* /*topic_name_*/, const std::string& msg_, long long time_, long long /*clock_*/, long long /*id_*/)
{
last_received_msg = msg_;
last_received_timestamp = time_;
};
EXPECT_TRUE(sub.AddReceiveCallback(save_data));

// let's match them
eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH);
long long timestamp = 1;
for (const auto elem : send_vector)
{
pub.Send(elem, timestamp);
eCAL::Process::SleepMS(DATA_FLOW_TIME);
EXPECT_EQ(last_received_msg, elem);
EXPECT_EQ(last_received_timestamp, timestamp);
++timestamp;
}

// destroy subscriber
sub.Destroy();

// destroy publisher
pub.Destroy();

// finalize eCAL API
eCAL::Finalize();
}


TEST(IO, MultipleSendsUDP)
{
// default send string
std::vector<std::string> send_vector{ "this", "is", "a", "", "testtest" };
std::string last_received_msg;
long long last_received_timestamp;

// initialize eCAL API
eCAL::Initialize(0, nullptr, "pubsub_test");

// publish / subscribe match in the same process
eCAL::Util::EnableLoopback(true);

// create subscriber for topic "A"
eCAL::string::CSubscriber<std::string> sub("A");

// create publisher for topic "A"
eCAL::string::CPublisher<std::string> pub("A");
pub.SetLayerMode(eCAL::TLayer::tlayer_all, eCAL::TLayer::smode_off);
pub.SetLayerMode(eCAL::TLayer::tlayer_udp_mc, eCAL::TLayer::smode_on);
pub.ShmSetAcknowledgeTimeout(10); // Make sure we receive the data

// add callback
auto save_data = [&last_received_msg, &last_received_timestamp](const char* /*topic_name_*/, const std::string& msg_, long long time_, long long /*clock_*/, long long /*id_*/)
{
last_received_msg = msg_;
last_received_timestamp = time_;
};
EXPECT_TRUE(sub.AddReceiveCallback(save_data));

// let's match them
eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH);
long long timestamp = 1;
for (const auto elem : send_vector)
{
pub.Send(elem, timestamp);
eCAL::Process::SleepMS(DATA_FLOW_TIME);
EXPECT_EQ(last_received_msg, elem);
EXPECT_EQ(last_received_timestamp, timestamp);
++timestamp;
}

// destroy subscriber
sub.Destroy();

// destroy publisher
pub.Destroy();

// finalize eCAL API
eCAL::Finalize();
}





#if 0
TEST(IO, ZeroPayloadMessageTCP)
{
Expand Down

0 comments on commit 24ed1ef

Please sign in to comment.