Skip to content

Commit

Permalink
eCAL Core: Callback after zero length send contains stale message
Browse files Browse the repository at this point in the history
2333d20 introduced a bug when sending zero length messages over SHM - the callback would contain stale non-zero length messages from the previous call. (#1203)
This commit fixes the introduced bug, and adds a testcase for this scenario.
  • Loading branch information
KerstinKeller authored and FlorianReimold committed Oct 27, 2023
1 parent e16950f commit f0d7b24
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 9 deletions.
4 changes: 2 additions & 2 deletions ecal/core/include/ecal/msg/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ namespace eCAL
// counting and frequency calculation for the monitoring layer
if (!IsSubscribed())
{
return(CPublisher::Send(nullptr, 0));
return(CPublisher::Send(nullptr, 0, time_));
}

// if we have a subscription allocate memory for the
Expand All @@ -143,7 +143,7 @@ namespace eCAL
else
{
// see !IsSubscribed()
return(CPublisher::Send(nullptr, 0));
return(CPublisher::Send(nullptr, 0, time_));
}
return(0);
}
Expand Down
13 changes: 6 additions & 7 deletions ecal/core/src/io/ecal_memfile_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,19 +203,18 @@ namespace eCAL
// and close the file immediately
else
{
// need to resize the buffer especially if data_size = 0, otherwise it might contain stale data.
m_ecal_buffer.resize((size_t)mfile_hdr.data_size);

// read payload
// if data length == 0, there is no need to further read data
// we just flag to process the empty buffer
if (mfile_hdr.data_size == 0)
{
post_process_buffer = true;
}
else
if (mfile_hdr.data_size != 0)
{
m_ecal_buffer.resize((size_t)mfile_hdr.data_size);
m_memfile.Read(m_ecal_buffer.data(), (size_t)mfile_hdr.data_size, mfile_hdr.hdr_size);
post_process_buffer = true;
}

post_process_buffer = true;
}

// store clock
Expand Down
112 changes: 112 additions & 0 deletions testing/ecal/pubsub_test/src/pubsub_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/

#include <ecal/ecal.h>
#include <ecal/msg/string/publisher.h>
#include <ecal/msg/string/subscriber.h>

#include <atomic>

Expand Down Expand Up @@ -694,6 +696,116 @@ TEST(IO, ZeroPayloadMessageUDP)
eCAL::Finalize();
}


TEST(IO, MultipleSendsSHM)
{
// default send string
std::vector<std::string> send_vector{ "this", "is", "a", "", "testtest" };
std::string last_received_msg;
long long last_received_timestamp;

// initialize eCAL API
eCAL::Initialize(0, nullptr, "pubsub_test");

// publish / subscribe match in the same process
eCAL::Util::EnableLoopback(true);

// create subscriber for topic "A"
eCAL::string::CSubscriber<std::string> sub("A");

// create publisher for topic "A"
eCAL::string::CPublisher<std::string> pub("A");
pub.SetLayerMode(eCAL::TLayer::tlayer_all, eCAL::TLayer::smode_off);
pub.SetLayerMode(eCAL::TLayer::tlayer_shm, eCAL::TLayer::smode_on);
pub.ShmSetAcknowledgeTimeout(10); // Make sure we receive the data

// add callback
auto save_data = [&last_received_msg, &last_received_timestamp](const char* /*topic_name_*/, const std::string& msg_, long long time_, long long /*clock_*/, long long /*id_*/)
{
last_received_msg = msg_;
last_received_timestamp = time_;
};
EXPECT_TRUE(sub.AddReceiveCallback(save_data));

// let's match them
eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH);
long long timestamp = 1;
for (const auto elem : send_vector)
{
pub.Send(elem, timestamp);
eCAL::Process::SleepMS(DATA_FLOW_TIME);
EXPECT_EQ(last_received_msg, elem);
EXPECT_EQ(last_received_timestamp, timestamp);
++timestamp;
}

// destroy subscriber
sub.Destroy();

// destroy publisher
pub.Destroy();

// finalize eCAL API
eCAL::Finalize();
}


TEST(IO, MultipleSendsUDP)
{
// default send string
std::vector<std::string> send_vector{ "this", "is", "a", "", "testtest" };
std::string last_received_msg;
long long last_received_timestamp;

// initialize eCAL API
eCAL::Initialize(0, nullptr, "pubsub_test");

// publish / subscribe match in the same process
eCAL::Util::EnableLoopback(true);

// create subscriber for topic "A"
eCAL::string::CSubscriber<std::string> sub("A");

// create publisher for topic "A"
eCAL::string::CPublisher<std::string> pub("A");
pub.SetLayerMode(eCAL::TLayer::tlayer_all, eCAL::TLayer::smode_off);
pub.SetLayerMode(eCAL::TLayer::tlayer_udp_mc, eCAL::TLayer::smode_on);
pub.ShmSetAcknowledgeTimeout(10); // Make sure we receive the data

// add callback
auto save_data = [&last_received_msg, &last_received_timestamp](const char* /*topic_name_*/, const std::string& msg_, long long time_, long long /*clock_*/, long long /*id_*/)
{
last_received_msg = msg_;
last_received_timestamp = time_;
};
EXPECT_TRUE(sub.AddReceiveCallback(save_data));

// let's match them
eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH);
long long timestamp = 1;
for (const auto elem : send_vector)
{
pub.Send(elem, timestamp);
eCAL::Process::SleepMS(DATA_FLOW_TIME);
EXPECT_EQ(last_received_msg, elem);
EXPECT_EQ(last_received_timestamp, timestamp);
++timestamp;
}

// destroy subscriber
sub.Destroy();

// destroy publisher
pub.Destroy();

// finalize eCAL API
eCAL::Finalize();
}





#if 0
TEST(IO, ZeroPayloadMessageTCP)
{
Expand Down

0 comments on commit f0d7b24

Please sign in to comment.