Skip to content

Commit

Permalink
[core] refactor shm registration.
Browse files Browse the repository at this point in the history
  • Loading branch information
KerstinKeller committed Jul 17, 2024
1 parent 4981561 commit 788ab7f
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 58 deletions.
13 changes: 5 additions & 8 deletions ecal/core/src/registration/ecal_registration_receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
#include "registration/ecal_registration_receiver.h"

#include "registration/ecal_registration_receiver_udp.h"
#if ECAL_CORE_REGISTRATION_SHM
#include "registration/ecal_registration_receiver_shm.h"
#endif
#include "ecal_global_accessors.h"

#include "pubsub/ecal_subgate.h"
Expand Down Expand Up @@ -83,11 +86,7 @@ namespace eCAL
#if ECAL_CORE_REGISTRATION_SHM
if (m_use_registration_shm)
{
m_memfile_broadcast.Create(Config::Experimental::GetShmMonitoringDomain(), Config::Experimental::GetShmMonitoringQueueSize());
m_memfile_broadcast.FlushLocalEventQueue();
m_memfile_broadcast_reader.Bind(&m_memfile_broadcast);

m_memfile_reg_rcv.Create(&m_memfile_broadcast_reader);
m_registration_receiver_shm = std::make_unique<CRegistrationReceiverSHM>([this](const Registration::Sample& sample_) {return this->ApplySample(sample_); });
}
#endif

Expand All @@ -107,9 +106,7 @@ namespace eCAL
#if ECAL_CORE_REGISTRATION_SHM
if (m_use_registration_shm)
{
// stop memfile registration receive thread and unbind reader
m_memfile_broadcast_reader.Unbind();
m_memfile_broadcast.Destroy();
m_registration_receiver_shm = nullptr;
}
#endif

Expand Down
11 changes: 2 additions & 9 deletions ecal/core/src/registration/ecal_registration_receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@

#include "serialization/ecal_struct_sample_registration.h"

#if ECAL_CORE_REGISTRATION_SHM
#include "ecal_registration_receiver_shm.h"
#endif

#include <atomic>
#include <functional>
#include <map>
Expand All @@ -47,6 +43,7 @@
namespace eCAL
{
class CRegistrationReceiverUDP;
class CRegistrationReceiverSHM;

class CRegistrationReceiver
{
Expand Down Expand Up @@ -85,12 +82,8 @@ namespace eCAL
RegistrationCallbackT m_callback_process;

std::unique_ptr<CRegistrationReceiverUDP> m_registration_receiver_udp;

#if ECAL_CORE_REGISTRATION_SHM
CMemoryFileBroadcast m_memfile_broadcast;
CMemoryFileBroadcastReader m_memfile_broadcast_reader;

CMemfileRegistrationReceiver m_memfile_reg_rcv;
std::unique_ptr<CRegistrationReceiverSHM> m_registration_receiver_shm;
#endif

bool m_use_registration_udp;
Expand Down
47 changes: 24 additions & 23 deletions ecal/core/src/registration/ecal_registration_receiver_shm.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,45 +29,46 @@

#include "ecal_registration_receiver_shm.h"
#include "serialization/ecal_serialize_sample_registration.h"
#include <chrono>
#include <functional>
#include <memory>

#include "shm/ecal_memfile_broadcast.h"
#include "shm/ecal_memfile_broadcast_reader.h"
#include "util/ecal_thread.h"

namespace eCAL
{
//////////////////////////////////////////////////////////////////
// CMemfileRegistrationReceiver
//////////////////////////////////////////////////////////////////

CMemfileRegistrationReceiver::~CMemfileRegistrationReceiver()
{
Destroy();
}

void CMemfileRegistrationReceiver::Create(eCAL::CMemoryFileBroadcastReader* memfile_broadcast_reader_)
CRegistrationReceiverSHM::CRegistrationReceiverSHM(RegistrationApplySampleCallbackT apply_sample_callback)
: m_apply_sample_callback(apply_sample_callback)
{
if (m_created) return;
m_memfile_broadcast = std::make_unique<CMemoryFileBroadcast>();
m_memfile_broadcast->Create(Config::Experimental::GetShmMonitoringDomain(), Config::Experimental::GetShmMonitoringQueueSize());
m_memfile_broadcast->FlushLocalEventQueue();

// start memfile broadcast receive thread
m_memfile_broadcast_reader = memfile_broadcast_reader_;
m_memfile_broadcast_reader_thread = std::make_shared<CCallbackThread>(std::bind(&CMemfileRegistrationReceiver::Receive, this));
m_memfile_broadcast_reader_thread->start(std::chrono::milliseconds(Config::GetRegistrationRefreshMs()/2));
m_memfile_broadcast_reader = std::make_unique<CMemoryFileBroadcastReader>();
// This is a bit unclean to take the raw adress of the reader here.
m_memfile_broadcast_reader->Bind(m_memfile_broadcast.get());

m_created = true;
m_memfile_broadcast_reader_thread = std::make_unique<CCallbackThread>(std::bind(&CRegistrationReceiverSHM::Receive, this));
m_memfile_broadcast_reader_thread->start(std::chrono::milliseconds(Config::GetRegistrationRefreshMs() / 2));
}

void CMemfileRegistrationReceiver::Destroy()
CRegistrationReceiverSHM::~CRegistrationReceiverSHM()
{
if (!m_created) return;

// stop memfile broadcast receive thread
m_memfile_broadcast_reader_thread->stop();
m_memfile_broadcast_reader_thread = nullptr;

// stop memfile registration receive thread and unbind reader
m_memfile_broadcast_reader->Unbind();
m_memfile_broadcast_reader = nullptr;

m_created = false;
m_memfile_broadcast->Destroy();
m_memfile_broadcast = nullptr;
}

void CMemfileRegistrationReceiver::Receive()
void CRegistrationReceiverSHM::Receive()
{
MemfileBroadcastMessageListT message_list;
if (m_memfile_broadcast_reader->Read(message_list, 0))
Expand All @@ -79,7 +80,7 @@ namespace eCAL
{
for (const auto& sample : sample_list.samples)
{
if (g_registration_receiver() != nullptr) g_registration_receiver()->ApplySample(sample);
m_apply_sample_callback(sample);
}
}
}
Expand Down
35 changes: 17 additions & 18 deletions ecal/core/src/registration/ecal_registration_receiver_shm.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -27,38 +27,37 @@

#pragma once

#include "shm/ecal_memfile_broadcast.h"
#include "shm/ecal_memfile_broadcast_reader.h"

#include "util/ecal_thread.h"
#include <memory>
#include <registration/ecal_registration_types.h>

namespace eCAL
{
class CMemfileRegistrationReceiver
class CCallbackThread;
class CMemoryFileBroadcast;
class CMemoryFileBroadcastReader;

class CRegistrationReceiverSHM
{
public:
CMemfileRegistrationReceiver() = default;
~CMemfileRegistrationReceiver();
CRegistrationReceiverSHM(RegistrationApplySampleCallbackT apply_sample_callback);
~CRegistrationReceiverSHM();

// default copy constructor
CMemfileRegistrationReceiver(const CMemfileRegistrationReceiver& other) = delete;
CRegistrationReceiverSHM(const CRegistrationReceiverSHM& other) = delete;
// default copy assignment operator
CMemfileRegistrationReceiver& operator=(const CMemfileRegistrationReceiver& other) = delete;
CRegistrationReceiverSHM& operator=(const CRegistrationReceiverSHM& other) = delete;
// default move constructor
CMemfileRegistrationReceiver(CMemfileRegistrationReceiver&& other) noexcept = delete;
CRegistrationReceiverSHM(CRegistrationReceiverSHM&& other) noexcept = delete;
// default move assignment operator
CMemfileRegistrationReceiver& operator=(CMemfileRegistrationReceiver&& other) noexcept = delete;

void Create(CMemoryFileBroadcastReader* memfile_broadcast_reader_);
void Destroy();
CRegistrationReceiverSHM& operator=(CRegistrationReceiverSHM&& other) noexcept = delete;

private:
void Receive();

CMemoryFileBroadcastReader* m_memfile_broadcast_reader = nullptr;
std::shared_ptr<CCallbackThread> m_memfile_broadcast_reader_thread;
std::unique_ptr<CMemoryFileBroadcast> m_memfile_broadcast;
std::unique_ptr<CMemoryFileBroadcastReader> m_memfile_broadcast_reader;
std::unique_ptr<CCallbackThread> m_memfile_broadcast_reader_thread;

bool m_created = false;
RegistrationApplySampleCallbackT m_apply_sample_callback;
};
}

0 comments on commit 788ab7f

Please sign in to comment.