Skip to content

Commit

Permalink
Core: Fixed ack timeout issue (#1207)
Browse files Browse the repository at this point in the history
* Core: Fixed ack timeout issue

This fixes the issue that 1 SHM Ack timeout destroyed the entire ack timeout feature. Now, the internal events are neither closed nor invalidated on timeout, so they can actually be reused later on. It is stored however that this particular event had timeouted, and it will only be waited on again, after the subscriber requested that via registration layer.

* Added test
  • Loading branch information
FlorianReimold committed Oct 26, 2023
1 parent 7ba32dc commit 6e43100
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 6 deletions.
22 changes: 16 additions & 6 deletions ecal/core/src/io/ecal_memfile_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ namespace eCAL
{
gOpenEvent(&iter->second.event_ack, event_ack_name);
}

// Set the ack event to valid again, so we will wait for the subscriber
iter->second.event_ack_is_invalid = false;

return true;
}
}
Expand Down Expand Up @@ -360,14 +364,20 @@ namespace eCAL
long time_to_wait_ms = static_cast<long>(std::chrono::duration_cast<std::chrono::milliseconds>(time_to_wait).count());
if (time_to_wait_ms <= 0) time_to_wait_ms = 0;

if (event_handle.second.event_ack_is_invalid)
{
// The ack event has timeouted before. Thus, we don't wait for it
// anymore, until the subscriber notifies us via registration layer
// that it is still alive.
continue;
}

if (!gWaitForEvent(event_handle.second.event_ack, time_to_wait_ms))
{
// we close the event immediately to not waste time in the next
// write call, the event will be reopened later
// in ApplyLocSubscription if the connection still exists
gCloseEvent(event_handle.second.event_ack);
// invalidate it
gInvalidateEvent(&event_handle.second.event_ack);
// Remember that this event has timeouted. This will not cause the
// publisher to wait for it anymore, until the subscriber actively
// requests that via registration layer again.
event_handle.second.event_ack_is_invalid = true;
#ifndef NDEBUG
Logging::Log(log_level_debug2, m_base_name + "::CSyncMemoryFile::SignalWritten - ACK event timeout");
#endif
Expand Down
1 change: 1 addition & 0 deletions ecal/core/src/io/ecal_memfile_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ namespace eCAL
{
EventHandleT event_snd;
EventHandleT event_ack;
bool event_ack_is_invalid = false; //!< The ack event has timeouted. Thus, we don't wait for it anymore, until the subscriber notifies us via registration layer that it is still alive.
};
typedef std::unordered_map<std::string, SEventHandlePair> EventHandleMapT;
std::mutex m_event_handle_map_sync;
Expand Down
1 change: 1 addition & 0 deletions testing/ecal/pubsub_test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ find_package(Threads REQUIRED)
find_package(GTest REQUIRED)

set(pubsub_test_src
src/pubsub_acknowledge.cpp
src/pubsub_gettopics.cpp
src/pubsub_multibuffer.cpp
src/pubsub_test.cpp
Expand Down
112 changes: 112 additions & 0 deletions testing/ecal/pubsub_test/src/pubsub_acknowledge.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* ========================= eCAL LICENSE =================================
*/

#include <ecal/ecal.h>
#include <ecal/msg/string/publisher.h>
#include <ecal/msg/string/subscriber.h>
#include <algorithm>
#include <atomic>
#include <thread>
#include <vector>
#include <gtest/gtest.h>
#include <ecal/ecal_monitoring.h>

namespace
{
std::chrono::nanoseconds TimeOperation(std::function<void()> func)
{
auto start = std::chrono::steady_clock::now();
func();
auto end = std::chrono::steady_clock::now();
return end - start;
}

template <typename Rep, typename Period>
void AssertOperationExecutionTimeInRange(std::function<void()> func, std::chrono::duration<Rep, Period> min, std::chrono::duration<Rep, Period> max)
{
auto operation_time = TimeOperation(func);
EXPECT_GE(operation_time.count(), std::chrono::duration_cast<std::chrono::nanoseconds>(min).count()) << "Timed operation less than minimum threshold";
EXPECT_LE(operation_time.count(), std::chrono::duration_cast<std::chrono::nanoseconds>(max).count()) << "Timed operation greater than maximum threshold";
}
}

// This test asserts that a timeouted acknowledge does not break subsequent calls
TEST(Core, TimeoutAcknowledgment)
{
// initialize eCAL API
EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "TimeoutAcknowledgment", eCAL::Init::All));

// enable loop back communication in the same thread
eCAL::Util::EnableLoopback(true);

eCAL::string::CPublisher<std::string> pub("topic");
pub.ShmSetAcknowledgeTimeout(500);
auto sub1 = std::make_shared< eCAL::string::CSubscriber<std::string>>("topic");
auto sleeper_variable_time = [](const char* /*topic_name_*/, const std::string& msg_, long long /*time_*/, long long /*clock_*/, long long /*id_*/)
{
int sleep = std::stoi(msg_);
std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
};

sub1->AddReceiveCallback(sleeper_variable_time);

// Registration activities
std::this_thread::sleep_for(std::chrono::seconds(2));

// Regular call with acknowledge should take between 9 and 12 ms.
for (int i = 0; i < 5; ++i)
{
AssertOperationExecutionTimeInRange([&pub]()
{
auto send = pub.Send("100");
EXPECT_TRUE(send);
}
, std::chrono::milliseconds(99)
, std::chrono::milliseconds(120)
);
}

AssertOperationExecutionTimeInRange([&pub]()
{
auto send = pub.Send("600");
EXPECT_TRUE(send);
}
, std::chrono::milliseconds(499)
, std::chrono::milliseconds(550)
);

for (int i = 0; i < 5; ++i)
{
auto now = std::chrono::steady_clock::now();
AssertOperationExecutionTimeInRange([&pub]()
{
auto send = pub.Send("100");
EXPECT_TRUE(send);
}
, std::chrono::milliseconds(0)
, std::chrono::milliseconds(120)
);
std::this_thread::sleep_until(now + std::chrono::milliseconds(200));
}

// finalize eCAL API
// without destroying any pub / sub
EXPECT_EQ(0, eCAL::Finalize());

}

0 comments on commit 6e43100

Please sign in to comment.