Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new internal event functions with ownership handling (ownership used for linux implementation only) #1222

Merged
merged 8 commits into from
Oct 25, 2023
73 changes: 65 additions & 8 deletions ecal/core/src/ecal_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/

/**
* @brief eCAL handle helper class - windows platform
* @brief eCAL handle helper class
**/

#include <ecal/ecal.h>
Expand All @@ -35,12 +35,12 @@

#include "ecal_win_main.h"

namespace eCAL
namespace
{
bool gOpenEvent(EventHandleT* event_, const std::string& event_name_)
bool OpenEvent(eCAL::EventHandleT* event_, const std::string& event_name_)
{
if(event_ == nullptr) return(false);
EventHandleT event;
eCAL::EventHandleT event;
event.name = event_name_;
event.handle = ::CreateEvent(nullptr, false, false, event_name_.c_str());
if(event.handle != nullptr)
Expand All @@ -50,6 +50,25 @@ namespace eCAL
}
return(false);
}
}

namespace eCAL
{
bool gOpenNamedEvent(eCAL::EventHandleT* event_, const std::string& event_name_, bool /*ownership_*/)
{
return OpenEvent(event_, event_name_);
}

bool gOpenUnnamedEvent(eCAL::EventHandleT* event_)
{
return OpenEvent(event_, "");
}

// deprecated
bool gOpenEvent(EventHandleT* event_, const std::string& event_name_)
{
return OpenEvent(event_, event_name_);
}

bool gCloseEvent(const EventHandleT& event_)
{
Expand Down Expand Up @@ -302,9 +321,10 @@ namespace eCAL
class CNamedEvent
{
public:
explicit CNamedEvent(const std::string& name_) :
explicit CNamedEvent(const std::string& name_, bool ownership_) :
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't have to change it, but I would like to make a remark. I strongly dislike the bool for ownership vs non-ownership.
This should be two different classed, e.g. OwnedNamedEvent and NonOwnedNamedEvent/WeakNamedEvent.
This has two advantages:
a) To everyone reading the code that is using the events, it's clear from the type, if the event is the owner / creator or not
b) You can use inheritance. You don't need to do stuff like if(m_owner) ... which makes code clearer and more readable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, shouldn't the owner always create the event? In the code this is not tied together?

Copy link
Contributor Author

@rex-schilasky rex-schilasky Oct 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just wanted to "just fix the issue" and introduce as little change as possible. Let's redesign this in a separate issue.

The same for the creation. The behavior is now exact the same as before. The event is created by the first call of OpenEvent independently from the new ownership attribute. The ownership only plays a role in the destruction phase.

m_name(name_ + "_evt"),
m_event(nullptr)
m_event(nullptr),
m_owner(ownership_)
{
m_name = (m_name[0] != '/') ? "/" + m_name : m_name; // make memory file path compatible for all posix systems
m_event = named_event_open(m_name.c_str());
Expand All @@ -318,7 +338,10 @@ namespace eCAL
{
if(m_event == nullptr) return;
named_event_close(m_event);
named_event_destroy(m_name.c_str());
if(m_owner)
{
named_event_destroy(m_name.c_str());
}
}

void set()
Expand Down Expand Up @@ -371,8 +394,42 @@ namespace eCAL

std::string m_name;
named_event_t* m_event;
bool m_owner;
};

bool gOpenNamedEvent(EventHandleT* event_, const std::string& event_name_, bool ownership_)
{
if(event_ == nullptr) return(false);

EventHandleT event;
event.name = event_name_;
event.handle = new CNamedEvent(event.name, ownership_);

if(event.handle != nullptr)
{
*event_ = event;
return true;
}
return false;
}

bool gOpenUnnamedEvent(EventHandleT* event_)
{
if(event_ == nullptr) return(false);

EventHandleT event;
event.name = "";
event.handle = new CEvent();

if(event.handle != nullptr)
{
*event_ = event;
return true;
}
return false;
}

// deprecated
bool gOpenEvent(EventHandleT* event_, const std::string& event_name_)
{
if(event_ == nullptr) return(false);
Expand All @@ -386,7 +443,7 @@ namespace eCAL
}
else
{
event.handle = new CNamedEvent(event.name);
event.handle = new CNamedEvent(event.name, true);
}

if(event.handle != nullptr)
Expand Down
54 changes: 54 additions & 0 deletions ecal/core/src/ecal_event_internal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* ========================= eCAL LICENSE =================================
*/

/**
* @file ecal_event_internal.h
* @brief eCAL event interface (internal)
*
* This file will be renamed back to ecal_event.h after removing event API from eCAL's public API.
**/

#pragma once

#include <ecal/ecal_eventhandle.h>

#include <string>

namespace eCAL
{
/**
* @brief Open a named event with ownership.
*
* @param [out] event_ Returned event struct.
* @param event_name_ Event name.
* @param ownership_ Event is owned by the caller and will be destroyed on CloseEvent
*
* @return True if succeeded.
**/
bool gOpenNamedEvent(eCAL::EventHandleT* event_, const std::string& event_name_, bool ownership_);

/**
* @brief Open an unnamed event.
*
* @param [out] event_ Returned event struct.
*
* @return True if succeeded.
**/
bool gOpenUnnamedEvent(eCAL::EventHandleT* event_);
}
3 changes: 2 additions & 1 deletion ecal/core/src/ecal_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <ecal/ecal_event.h>

#include "ecal_event_internal.h"
#include "ecal_thread.h"
#include <thread>

Expand All @@ -41,7 +42,7 @@ namespace eCAL
{
if(m_tdata.is_started) return(0);

gOpenEvent(&m_tdata.event);
gOpenUnnamedEvent(&m_tdata.event);
m_tdata.do_stop = false;
m_tdata.period = period_;
m_tdata.ext_caller = ext_caller_;
Expand Down
3 changes: 2 additions & 1 deletion ecal/core/src/ecal_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <ecal/ecal_types.h>

#include "ecal_def.h"
#include "ecal_event_internal.h"
#include "ecal_descgate.h"
#include "ecal_process.h"
#include "ecal_registration_receiver.h"
Expand Down Expand Up @@ -91,7 +92,7 @@ namespace eCAL
{
const std::string event_name = EVENT_SHUTDOWN_PROC + std::string("_") + std::to_string(process_id_);
EventHandleT event;
if (gOpenEvent(&event, event_name))
if (gOpenNamedEvent(&event, event_name, true))
{
std::cout << "Shutdown local eCAL process " << process_id_ << std::endl;
gSetEvent(event);
Expand Down
5 changes: 3 additions & 2 deletions ecal/core/src/io/ecal_memfile_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
**/

#include "ecal_def.h"
#include "ecal_event_internal.h"
#include "ecal_memfile_pool.h"

#include <chrono>
Expand Down Expand Up @@ -54,8 +55,8 @@ namespace eCAL
if (m_created) return false;

// open memory file events
gOpenEvent(&m_event_snd, memfile_event_);
gOpenEvent(&m_event_ack, memfile_event_ + "_ack");
gOpenNamedEvent(&m_event_snd, memfile_event_, false);
gOpenNamedEvent(&m_event_ack, memfile_event_ + "_ack", false);

// create memory file access
m_memfile.Create(memfile_name_.c_str(), false);
Expand Down
7 changes: 4 additions & 3 deletions ecal/core/src/io/ecal_memfile_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <ecal/ecal_event.h>
#include <ecal/ecal_log.h>

#include "ecal_event_internal.h"
#include "ecal_memfile_header.h"
#include "ecal_memfile_naming.h"
#include "ecal_memfile_sync.h"
Expand Down Expand Up @@ -65,8 +66,8 @@ namespace eCAL
if (iter == m_event_handle_map.end())
{
SEventHandlePair event_pair;
gOpenEvent(&event_pair.event_snd, event_snd_name);
gOpenEvent(&event_pair.event_ack, event_ack_name);
gOpenNamedEvent(&event_pair.event_snd, event_snd_name, true);
gOpenNamedEvent(&event_pair.event_ack, event_ack_name, true);
m_event_handle_map.insert(std::pair<std::string, SEventHandlePair>(process_id_, event_pair));
return true;
}
Expand All @@ -77,7 +78,7 @@ namespace eCAL
// event was deactivated by a sync timeout in SendSyncEvents
if (!gEventIsValid(iter->second.event_ack))
{
gOpenEvent(&iter->second.event_ack, event_ack_name);
gOpenNamedEvent(&iter->second.event_ack, event_ack_name, true);
}

// Set the ack event to valid again, so we will wait for the subscriber
Expand Down
3 changes: 2 additions & 1 deletion ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#endif

#include "ecal_def.h"
#include "ecal_event_internal.h"
#include "ecal_descgate.h"

#include "pubsub/ecal_subgate.h"
Expand All @@ -47,7 +48,7 @@ namespace eCAL
static const std::string event_name(EVENT_SHUTDOWN_PROC + std::string("_") + std::to_string(Process::GetProcessID()));
if (!gEventIsValid(evt))
{
gOpenEvent(&evt, event_name);
gOpenNamedEvent(&evt, event_name, true);
}
return(evt);
}
Expand Down
3 changes: 3 additions & 0 deletions ecal/core/src/readwrite/ecal_writer_shm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ namespace eCAL
for (auto& memory_file : m_memory_file_vec)
{
memory_file->Connect(process_id_);
#ifndef NDEBUG
Logging::Log(log_level_debug1, std::string("CDataWriterSHM::AddLocConnection - Memory FileName: ") + memory_file->GetName() + " to ProcessId " + process_id_);
#endif
}
}

Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/readwrite/ecal_writer_shm.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ namespace eCAL

bool Write(CPayloadWriter& payload_, const SWriterAttr& attr_) override;

void AddLocConnection(const std::string& process_id_, const std::string& /*topic_id_*/, const std::string& conn_par_) override;
void AddLocConnection(const std::string& process_id_, const std::string& topic_id_, const std::string& conn_par_) override;

std::string GetConnectionParameter() override;

Expand Down
79 changes: 72 additions & 7 deletions testing/ecal/pubsub_test/src/pubsub_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -869,10 +869,6 @@ TEST(IO, MultipleSendsUDP)
eCAL::Finalize();
}





#if 0
TEST(IO, ZeroPayloadMessageTCP)
{
Expand Down Expand Up @@ -923,8 +919,6 @@ TEST(IO, ZeroPayloadMessageTCP)
}
#endif

#include <ecal/msg/string/publisher.h>
#include <ecal/msg/string/subscriber.h>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These includes should be needed but maybe we can move them to the top?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These includes should be needed but maybe we can move them to the top?

They are there and have been there before ..

TEST(IO, DestroyInCallback)
{
/* Test setup :
Expand Down Expand Up @@ -988,4 +982,75 @@ TEST(IO, DestroyInCallback)
// finalize eCAL API
// without destroying any pub / sub
eCAL::Finalize();
}
}

TEST(IO, SubscriberReconnection)
{
/* Test setup :
* publisher runs permanently in a thread
* subscriber start reading
* subscriber gets out of scope (destruction)
* subscriber starts again in a new scope
* Test ensures that subscriber is reconnecting and all sync mechanism are working properly again.
*/

// initialize eCAL API
eCAL::Initialize(0, nullptr, "SubscriberReconnection");

// enable loop back communication in the same thread
eCAL::Util::EnableLoopback(true);

// start publishing thread
std::atomic<bool> stop_publishing(false);
eCAL::string::CPublisher<std::string> pub_foo("foo");
std::thread pub_foo_t([&pub_foo, &stop_publishing]() {
while (!stop_publishing)
{
pub_foo.Send("Hello World");
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
std::cout << "Stopped publishing" << std::endl;
});

// scope 1
{
size_t callback_received_count(0);

eCAL::string::CSubscriber<std::string> sub_foo("foo");
auto receive_lambda = [&sub_foo, &callback_received_count](const char* /*topic_*/, const std::string& /*msg*/, long long /*time_*/, long long /*clock_*/, long long /*id_*/) {
std::cout << "Receiving in scope 1" << std::endl;
callback_received_count++;
};
sub_foo.AddReceiveCallback(receive_lambda);

// sleep for 2 seconds, we should receive something
std::this_thread::sleep_for(std::chrono::seconds(2));

EXPECT_TRUE(callback_received_count > 0);
}

// scope 2
{
size_t callback_received_count(0);

eCAL::string::CSubscriber<std::string> sub_foo("foo");
auto receive_lambda = [&sub_foo, &callback_received_count](const char* /*topic_*/, const std::string& /*msg*/, long long /*time_*/, long long /*clock_*/, long long /*id_*/) {
std::cout << "Receiving in scope 2" << std::endl;
callback_received_count++;
};
sub_foo.AddReceiveCallback(receive_lambda);

// sleep for 2 seconds, we should receive something
std::this_thread::sleep_for(std::chrono::seconds(2));

EXPECT_TRUE(callback_received_count > 0);
}

// stop publishing and join thread
stop_publishing = true;
pub_foo_t.join();

// finalize eCAL API
// without destroying any pub / sub
eCAL::Finalize();
}
Loading