Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/registration receiver logic #1662

Merged
merged 4 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions ecal/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -285,15 +285,17 @@ if (ECAL_CORE_REGISTRATION)
src/registration/ecal_registration_receiver.cpp
src/registration/ecal_registration_receiver.h
src/registration/ecal_registration_sender.h
src/registration/ecal_registration_sender_udp.cpp
src/registration/ecal_registration_sender_udp.h
src/registration/udp/ecal_registration_receiver_udp.cpp
src/registration/udp/ecal_registration_receiver_udp.h
src/registration/udp/ecal_registration_sender_udp.cpp
src/registration/udp/ecal_registration_sender_udp.h
)
if(ECAL_CORE_REGISTRATION_SHM)
list(APPEND ecal_registration_src
src/registration/ecal_registration_receiver_shm.cpp
src/registration/ecal_registration_receiver_shm.h
src/registration/ecal_registration_sender_shm.cpp
src/registration/ecal_registration_sender_shm.h
src/registration/shm/ecal_registration_receiver_shm.cpp
src/registration/shm/ecal_registration_receiver_shm.h
src/registration/shm/ecal_registration_sender_shm.cpp
src/registration/shm/ecal_registration_sender_shm.h
src/registration/shm/ecal_memfile_broadcast.cpp
src/registration/shm/ecal_memfile_broadcast.h
src/registration/shm/ecal_memfile_broadcast_reader.cpp
Expand Down
4 changes: 2 additions & 2 deletions ecal/core/src/registration/ecal_registration_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@
#include "ecal_def.h"

#include <registration/ecal_process_registration.h>
#include <registration/ecal_registration_sender_udp.h>
#include <registration/udp/ecal_registration_sender_udp.h>
#if ECAL_CORE_REGISTRATION_SHM
#include <registration/ecal_registration_sender_shm.h>
#include <registration/shm/ecal_registration_sender_shm.h>
#endif

namespace eCAL
Expand Down
43 changes: 10 additions & 33 deletions ecal/core/src/registration/ecal_registration_receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
*
**/

#include "ecal_registration_receiver.h"
#include "registration/ecal_registration_receiver.h"

#include "registration/udp/ecal_registration_receiver_udp.h"
#if ECAL_CORE_REGISTRATION_SHM
#include "registration/shm/ecal_registration_receiver_shm.h"
#endif
#include "ecal_global_accessors.h"

#include "pubsub/ecal_subgate.h"
Expand Down Expand Up @@ -75,26 +80,13 @@ namespace eCAL

if (m_use_registration_udp)
{
// set network attributes
eCAL::UDP::SReceiverAttr attr;
attr.address = UDP::GetRegistrationAddress();
attr.port = UDP::GetRegistrationPort();
attr.broadcast = UDP::IsBroadcast();
attr.loopback = true;
attr.rcvbuf = Config::GetUdpMulticastRcvBufSizeBytes();

// start registration sample receiver
m_registration_receiver = std::make_shared<UDP::CSampleReceiver>(attr, std::bind(&CRegistrationReceiver::HasSample, this, std::placeholders::_1), std::bind(&CRegistrationReceiver::ApplySerializedSample, this, std::placeholders::_1, std::placeholders::_2));
m_registration_receiver_udp = std::make_unique<CRegistrationReceiverUDP>([this](const Registration::Sample& sample_) {return this->ApplySample(sample_); });
}

#if ECAL_CORE_REGISTRATION_SHM
if (m_use_registration_shm)
{
m_memfile_broadcast.Create(Config::Experimental::GetShmMonitoringDomain(), Config::Experimental::GetShmMonitoringQueueSize());
m_memfile_broadcast.FlushLocalEventQueue();
m_memfile_broadcast_reader.Bind(&m_memfile_broadcast);

m_memfile_reg_rcv.Create(&m_memfile_broadcast_reader);
m_registration_receiver_shm = std::make_unique<CRegistrationReceiverSHM>([this](const Registration::Sample& sample_) {return this->ApplySample(sample_); });
}
#endif

Expand All @@ -105,21 +97,16 @@ namespace eCAL
{
if(!m_created) return;

// stop network registration receive thread
m_registration_receiver = nullptr;

// stop network registration receive thread
if (m_use_registration_udp)
{
m_registration_receiver = nullptr;
m_registration_receiver_udp = nullptr;
}

#if ECAL_CORE_REGISTRATION_SHM
if (m_use_registration_shm)
{
// stop memfile registration receive thread and unbind reader
m_memfile_broadcast_reader.Unbind();
m_memfile_broadcast.Destroy();
m_registration_receiver_shm = nullptr;
}
#endif

Expand All @@ -139,16 +126,6 @@ namespace eCAL
m_loopback = state_;
}

bool CRegistrationReceiver::ApplySerializedSample(const char* serialized_sample_data_, size_t serialized_sample_size_)
{
if(!m_created) return false;

Registration::Sample sample;
if (!DeserializeFromBuffer(serialized_sample_data_, serialized_sample_size_, sample)) return false;

return ApplySample(sample);
}

bool CRegistrationReceiver::ApplySample(const Registration::Sample& sample_)
{
if (!m_created) return false;
Expand Down
19 changes: 5 additions & 14 deletions ecal/core/src/registration/ecal_registration_receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,8 @@
#include <cstddef>
#include <ecal/ecal.h>

#include "io/udp/ecal_udp_sample_receiver.h"
#include "serialization/ecal_struct_sample_registration.h"

#if ECAL_CORE_REGISTRATION_SHM
#include "ecal_registration_receiver_shm.h"
#endif

#include <atomic>
#include <functional>
#include <map>
Expand All @@ -47,6 +42,9 @@

namespace eCAL
{
class CRegistrationReceiverUDP;
class CRegistrationReceiverSHM;

class CRegistrationReceiver
{
public:
Expand All @@ -58,7 +56,6 @@ namespace eCAL

void EnableLoopback(bool state_);

bool HasSample(const std::string& /*sample_name_*/) { return(true); };
bool ApplySample(const Registration::Sample& sample_);

bool AddRegistrationCallback(enum eCAL_Registration_Event event_, const RegistrationCallbackT& callback_);
Expand All @@ -69,8 +66,6 @@ namespace eCAL
void RemCustomApplySampleCallback(const std::string& customer_);

protected:
bool ApplySerializedSample(const char* serialized_sample_data_, size_t serialized_sample_size_);

void ApplySubscriberRegistration(const eCAL::Registration::Sample& sample_);
void ApplyPublisherRegistration(const eCAL::Registration::Sample& sample_);

Expand All @@ -85,14 +80,10 @@ namespace eCAL
RegistrationCallbackT m_callback_service;
RegistrationCallbackT m_callback_client;
RegistrationCallbackT m_callback_process;

std::shared_ptr<UDP::CSampleReceiver> m_registration_receiver;

std::unique_ptr<CRegistrationReceiverUDP> m_registration_receiver_udp;
#if ECAL_CORE_REGISTRATION_SHM
CMemoryFileBroadcast m_memfile_broadcast;
CMemoryFileBroadcastReader m_memfile_broadcast_reader;

CMemfileRegistrationReceiver m_memfile_reg_rcv;
std::unique_ptr<CRegistrationReceiverSHM> m_registration_receiver_shm;
#endif

bool m_use_registration_udp;
Expand Down
33 changes: 33 additions & 0 deletions ecal/core/src/registration/ecal_registration_types.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* ========================= eCAL LICENSE =================================
*/

#pragma once

#include <functional>
#include <serialization/ecal_struct_sample_registration.h>

namespace eCAL {
/**
* @brief Apply sample callback type.
*
* @param sample_ The sample protocol buffer registration payload buffer.
* @param sample_size_ The payload buffer size.
**/
using RegistrationApplySampleCallbackT = std::function<bool(const Registration::Sample&)>;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -27,47 +27,48 @@

#include "ecal_globals.h"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check if include is necessary / instead include config?


#include "ecal_registration_receiver_shm.h"
#include "registration/shm/ecal_registration_receiver_shm.h"
#include "serialization/ecal_serialize_sample_registration.h"
#include <chrono>
#include <functional>
#include <memory>

#include "registration/shm/ecal_memfile_broadcast.h"
#include "registration/shm/ecal_memfile_broadcast_reader.h"
#include "util/ecal_thread.h"

namespace eCAL
{
//////////////////////////////////////////////////////////////////
// CMemfileRegistrationReceiver
//////////////////////////////////////////////////////////////////

CMemfileRegistrationReceiver::~CMemfileRegistrationReceiver()
{
Destroy();
}

void CMemfileRegistrationReceiver::Create(eCAL::CMemoryFileBroadcastReader* memfile_broadcast_reader_)
CRegistrationReceiverSHM::CRegistrationReceiverSHM(RegistrationApplySampleCallbackT apply_sample_callback)
: m_apply_sample_callback(apply_sample_callback)
{
if (m_created) return;
m_memfile_broadcast = std::make_unique<CMemoryFileBroadcast>();
m_memfile_broadcast->Create(Config::Experimental::GetShmMonitoringDomain(), Config::Experimental::GetShmMonitoringQueueSize());
m_memfile_broadcast->FlushLocalEventQueue();

// start memfile broadcast receive thread
m_memfile_broadcast_reader = memfile_broadcast_reader_;
m_memfile_broadcast_reader_thread = std::make_shared<CCallbackThread>(std::bind(&CMemfileRegistrationReceiver::Receive, this));
m_memfile_broadcast_reader_thread->start(std::chrono::milliseconds(Config::GetRegistrationRefreshMs()/2));
m_memfile_broadcast_reader = std::make_unique<CMemoryFileBroadcastReader>();
// This is a bit unclean to take the raw adress of the reader here.
m_memfile_broadcast_reader->Bind(m_memfile_broadcast.get());

m_created = true;
m_memfile_broadcast_reader_thread = std::make_unique<CCallbackThread>(std::bind(&CRegistrationReceiverSHM::Receive, this));
m_memfile_broadcast_reader_thread->start(std::chrono::milliseconds(Config::GetRegistrationRefreshMs() / 2));
}

void CMemfileRegistrationReceiver::Destroy()
CRegistrationReceiverSHM::~CRegistrationReceiverSHM()
{
if (!m_created) return;

// stop memfile broadcast receive thread
m_memfile_broadcast_reader_thread->stop();
m_memfile_broadcast_reader_thread = nullptr;

// stop memfile registration receive thread and unbind reader
m_memfile_broadcast_reader->Unbind();
m_memfile_broadcast_reader = nullptr;

m_created = false;
m_memfile_broadcast->Destroy();
m_memfile_broadcast = nullptr;
}

void CMemfileRegistrationReceiver::Receive()
void CRegistrationReceiverSHM::Receive()
{
MemfileBroadcastMessageListT message_list;
if (m_memfile_broadcast_reader->Read(message_list, 0))
Expand All @@ -79,7 +80,7 @@ namespace eCAL
{
for (const auto& sample : sample_list.samples)
{
if (g_registration_receiver() != nullptr) g_registration_receiver()->ApplySample(sample);
m_apply_sample_callback(sample);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -27,38 +27,37 @@

#pragma once

#include "shm/ecal_memfile_broadcast.h"
#include "shm/ecal_memfile_broadcast_reader.h"

#include "util/ecal_thread.h"
#include <memory>
#include <registration/ecal_registration_types.h>

namespace eCAL
{
class CMemfileRegistrationReceiver
class CCallbackThread;
class CMemoryFileBroadcast;
class CMemoryFileBroadcastReader;

class CRegistrationReceiverSHM
{
public:
CMemfileRegistrationReceiver() = default;
~CMemfileRegistrationReceiver();
CRegistrationReceiverSHM(RegistrationApplySampleCallbackT apply_sample_callback);
~CRegistrationReceiverSHM();

// default copy constructor
CMemfileRegistrationReceiver(const CMemfileRegistrationReceiver& other) = delete;
CRegistrationReceiverSHM(const CRegistrationReceiverSHM& other) = delete;
// default copy assignment operator
CMemfileRegistrationReceiver& operator=(const CMemfileRegistrationReceiver& other) = delete;
CRegistrationReceiverSHM& operator=(const CRegistrationReceiverSHM& other) = delete;
// default move constructor
CMemfileRegistrationReceiver(CMemfileRegistrationReceiver&& other) noexcept = delete;
CRegistrationReceiverSHM(CRegistrationReceiverSHM&& other) noexcept = delete;
// default move assignment operator
CMemfileRegistrationReceiver& operator=(CMemfileRegistrationReceiver&& other) noexcept = delete;

void Create(CMemoryFileBroadcastReader* memfile_broadcast_reader_);
void Destroy();
CRegistrationReceiverSHM& operator=(CRegistrationReceiverSHM&& other) noexcept = delete;

private:
void Receive();

CMemoryFileBroadcastReader* m_memfile_broadcast_reader = nullptr;
std::shared_ptr<CCallbackThread> m_memfile_broadcast_reader_thread;
std::unique_ptr<CMemoryFileBroadcast> m_memfile_broadcast;
std::unique_ptr<CMemoryFileBroadcastReader> m_memfile_broadcast_reader;
std::unique_ptr<CCallbackThread> m_memfile_broadcast_reader_thread;

bool m_created = false;
RegistrationApplySampleCallbackT m_apply_sample_callback;
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*
**/

#include "registration/ecal_registration_sender_shm.h"
#include "registration/shm/ecal_registration_sender_shm.h"
#include "serialization/ecal_serialize_sample_registration.h"


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@

#include "registration/ecal_registration_sender.h"

#include "shm/ecal_memfile_broadcast.h"
#include "shm/ecal_memfile_broadcast_writer.h"
#include "registration/shm/ecal_memfile_broadcast.h"
#include "registration/shm/ecal_memfile_broadcast_writer.h"

namespace eCAL
{
Expand Down
Loading
Loading