Skip to content

Commit

Permalink
Stop function introduced (former Destroy) to shutdown (publisher, sub…
Browse files Browse the repository at this point in the history
…scriber, client, server)

Create/Destroy -> Start/Stop
  • Loading branch information
rex-schilasky committed May 10, 2024
1 parent c2970dd commit 1190753
Show file tree
Hide file tree
Showing 24 changed files with 133 additions and 136 deletions.
7 changes: 3 additions & 4 deletions ecal/core/include/ecal/ecal_publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,8 @@ namespace eCAL

protected:
// class members
std::shared_ptr<CDataWriter> m_datawriter;
long long m_id;
bool m_created;
bool m_initialized;
std::shared_ptr<CDataWriter> m_datawriter;
long long m_id;
bool m_created;
};
}
5 changes: 2 additions & 3 deletions ecal/core/include/ecal/ecal_subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,7 @@ namespace eCAL

protected:
// class members
std::shared_ptr<CDataReader> m_datareader;
bool m_created;
bool m_initialized;
std::shared_ptr<CDataReader> m_datareader;
bool m_created;
};
}
6 changes: 3 additions & 3 deletions ecal/core/src/ecal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,16 +244,16 @@ namespace eCAL
/**
* @brief Finalize eCAL API.
*
* @param components_ Defines which component to finalize.
* @param components_ deprecated.
*
* @return Zero if succeeded, 1 if already finalized, -1 if failed.
**/
int Finalize(unsigned int components_)
int Finalize(unsigned int /*components_*/)
{
if (g_globals_ctx == nullptr) return 1;
g_globals_ctx_ref_cnt--;
if (g_globals_ctx_ref_cnt > 0) return 0;
int const ret = g_globals()->Finalize(components_);
int const ret = g_globals()->Finalize();
delete g_globals_ctx;
g_globals_ctx = nullptr;
return(ret);
Expand Down
16 changes: 8 additions & 8 deletions ecal/core/src/ecal_globals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace eCAL

CGlobals::~CGlobals()
{
Finalize(Init::All);
Finalize();
}

int CGlobals::Initialize(unsigned int components_, std::vector<std::string>* config_keys_ /*= nullptr*/)
Expand Down Expand Up @@ -245,14 +245,14 @@ namespace eCAL
if (memfile_pool_instance) memfile_pool_instance->Create();
#endif
#if ECAL_CORE_SUBSCRIBER
if (subgate_instance && ((components_ & Init::Subscriber) != 0u)) subgate_instance->Create();
if (subgate_instance && ((components_ & Init::Subscriber) != 0u)) subgate_instance->Start();
#endif
#if ECAL_CORE_PUBLISHER
if (pubgate_instance && ((components_ & Init::Publisher) != 0u)) pubgate_instance->Create();
#endif
#if ECAL_CORE_SERVICE
if (servicegate_instance && ((components_ & Init::Service) != 0u)) servicegate_instance->Create();
if (clientgate_instance && ((components_ & Init::Service) != 0u)) clientgate_instance->Create();
if (servicegate_instance && ((components_ & Init::Service) != 0u)) servicegate_instance->Start();
if (clientgate_instance && ((components_ & Init::Service) != 0u)) clientgate_instance->Start();
#endif
#if ECAL_CORE_TIMEPLUGIN
if (timegate_instance && ((components_ & Init::TimeSync) != 0u)) timegate_instance->Create(CTimeGate::eTimeSyncMode::realtime);
Expand Down Expand Up @@ -305,7 +305,7 @@ namespace eCAL
}
}

int CGlobals::Finalize(unsigned int /*components_*/)
int CGlobals::Finalize()
{
if (!initialized) return(1);

Expand All @@ -323,14 +323,14 @@ namespace eCAL
// raw pointers to the gate's functions, so we must make sure that everything
// has been executed, before we delete the gates.
eCAL::service::ServiceManager::instance()->stop();
if (clientgate_instance) clientgate_instance->Destroy();
if (servicegate_instance) servicegate_instance->Destroy();
if (clientgate_instance) clientgate_instance->Stop();
if (servicegate_instance) servicegate_instance->Stop();
#endif
#if ECAL_CORE_PUBLISHER
if (pubgate_instance) pubgate_instance->Destroy();
#endif
#if ECAL_CORE_SUBSCRIBER
if (subgate_instance) subgate_instance->Destroy();
if (subgate_instance) subgate_instance->Stop();
#endif
if (descgate_instance)
{
Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/ecal_globals.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ namespace eCAL

unsigned int GetComponents() const { return(components); };

int Finalize(unsigned int components_);
int Finalize();

const std::unique_ptr<CConfig>& config() { return config_instance; };
const std::unique_ptr<CLog>& log() { return log_instance; };
Expand Down
6 changes: 5 additions & 1 deletion ecal/core/src/pubsub/ecal_pubgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,12 @@ namespace eCAL
{
if(!m_created) return;

// destroy all remaining publisher
// stop & destroy all remaining publisher
const std::unique_lock<std::shared_timed_mutex> lock(m_topic_name_datawriter_sync);
for (const auto& datawriter : m_topic_name_datawriter_map)
{
datawriter.second->Stop();
}
m_topic_name_datawriter_map.clear();

m_created = false;
Expand Down
54 changes: 16 additions & 38 deletions ecal/core/src/pubsub/ecal_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ namespace eCAL
CPublisher::CPublisher() :
m_datawriter(nullptr),
m_id(0),
m_created(false),
m_initialized(false)
m_created(false)
{
}

Expand All @@ -64,11 +63,9 @@ namespace eCAL
CPublisher::CPublisher(CPublisher&& rhs) noexcept :
m_datawriter(std::move(rhs.m_datawriter)),
m_id(rhs.m_id),
m_created(rhs.m_created),
m_initialized(rhs.m_initialized)
m_created(rhs.m_created)
{
rhs.m_created = false;
rhs.m_initialized = false;
rhs.m_created = false;
}

/**
Expand All @@ -79,39 +76,28 @@ namespace eCAL
// Call destroy, to clean up the current state, then afterwards move all elements
Destroy();

m_datawriter = std::move(rhs.m_datawriter);
m_id = rhs.m_id;
m_created = rhs.m_created;
m_initialized = rhs.m_initialized;
m_datawriter = std::move(rhs.m_datawriter);
m_id = rhs.m_id;
m_created = rhs.m_created;

rhs.m_created = false;
rhs.m_initialized = false;
rhs.m_created = false;

return *this;
}

bool CPublisher::Create(const std::string& topic_name_, const SDataTypeInformation& data_type_info_, const Publisher::Configuration& config_)
{
if (m_created) return(false);
if (topic_name_.empty()) return(false);
if (g_globals() == nullptr) return(false);

// initialize globals
if (g_globals()->IsInitialized(Init::Publisher) == 0)
{
g_globals()->Initialize(Init::Publisher);
m_initialized = true;
}

// create data writer
if (m_created) return(false);
if (topic_name_.empty()) return(false);

// create datawriter
m_datawriter = std::make_shared<CDataWriter>(topic_name_, data_type_info_, config_);

// register publisher gateway (for publisher memory file and event name)
// register datawriter
g_pubgate()->Register(topic_name_, m_datawriter);

// we made it :-)
m_created = true;

return(m_created);
}

Expand All @@ -122,30 +108,22 @@ namespace eCAL

bool CPublisher::Destroy()
{
if(!m_created) return(false);
if(g_globals() == nullptr) return(false);
if(!m_created) return(false);

// unregister data writer
// unregister datawriter
if(g_pubgate() != nullptr) g_pubgate()->Unregister(m_datawriter->GetTopicName(), m_datawriter);
#ifndef NDEBUG
// log it
if (g_log() != nullptr) g_log()->Log(log_level_debug1, std::string(m_datawriter->GetTopicName() + "::CPublisher::Destroy"));
#endif

// destroy datawriter
// stop & destroy datawriter
m_datawriter->Stop();
m_datawriter.reset();

// we made it :-)
m_created = false;

// if we initialize the globals then we finalize
// here to decrease reference counter
if (m_initialized)
{
g_globals()->Finalize(Init::Publisher);
m_initialized = false;
}

return(true);
}

Expand Down
12 changes: 8 additions & 4 deletions ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ namespace eCAL

CSubGate::~CSubGate()
{
Destroy();
Stop();
}

void CSubGate::Create()
void CSubGate::Start()
{
if(m_created) return;

Expand All @@ -59,12 +59,16 @@ namespace eCAL
m_created = true;
}

void CSubGate::Destroy()
void CSubGate::Stop()
{
if(!m_created) return;

// destroy all remaining subscriber
// stop & destroy all remaining subscriber
const std::unique_lock<std::shared_timed_mutex> lock(m_topic_name_datareader_sync);
for (const auto& datareader : m_topic_name_datareader_map)
{
datareader.second->Stop();
}
m_topic_name_datareader_map.clear();

m_created = false;
Expand Down
4 changes: 2 additions & 2 deletions ecal/core/src/pubsub/ecal_subgate.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ namespace eCAL
CSubGate();
~CSubGate();

void Create();
void Destroy();
void Start();
void Stop();

bool Register(const std::string& topic_name_, const std::shared_ptr<CDataReader>& datareader_);
bool Unregister(const std::string& topic_name_, const std::shared_ptr<CDataReader>& datareader_);
Expand Down
51 changes: 15 additions & 36 deletions ecal/core/src/pubsub/ecal_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ namespace eCAL
{
CSubscriber::CSubscriber() :
m_datareader(nullptr),
m_created(false),
m_initialized(false)
m_created(false)
{
}

Expand All @@ -58,24 +57,20 @@ namespace eCAL

CSubscriber::CSubscriber(CSubscriber&& rhs) noexcept :
m_datareader(std::move(rhs.m_datareader)),
m_created(rhs.m_created),
m_initialized(rhs.m_initialized)
m_created(rhs.m_created)
{
rhs.m_created = false;
rhs.m_initialized = false;
rhs.m_created = false;
}

CSubscriber& CSubscriber::operator=(CSubscriber&& rhs) noexcept
{
// Call destroy, to clean up the current state, then afterwards move all elements
Destroy();

m_datareader = std::move(rhs.m_datareader);
m_created = rhs.m_created;
m_initialized = rhs.m_initialized;
m_datareader = std::move(rhs.m_datareader);
m_created = rhs.m_created;

rhs.m_created = false;
rhs.m_initialized = false;
rhs.m_created = false;

return *this;
}
Expand All @@ -87,58 +82,42 @@ namespace eCAL

bool CSubscriber::Create(const std::string& topic_name_, const SDataTypeInformation& topic_info_)
{
if (m_created) return(false);
if (g_globals() == nullptr) return(false);
if (topic_name_.empty()) return(false);
if (m_created) return(false);
if (topic_name_.empty()) return(false);

// initialize globals
if (g_globals()->IsInitialized(Init::Subscriber) == 0)
{
g_globals()->Initialize(Init::Subscriber);
m_initialized = true;
}

// create data reader
// create datareader
m_datareader = std::make_shared<CDataReader>(topic_name_, topic_info_);

// register to subscriber gateway for publisher memory file receive thread
// register datareader
g_subgate()->Register(topic_name_, m_datareader);

// we made it :-)
m_created = true;

return(m_created);
}

bool CSubscriber::Destroy()
{
if(!m_created) return(false);
if(g_globals() == nullptr) return(false);
if(!m_created) return(false);

// remove receive callback
RemReceiveCallback();

// first unregister data reader
// unregister datareader
if(g_subgate() != nullptr) g_subgate()->Unregister(m_datareader->GetTopicName(), m_datareader);

#ifndef NDEBUG
// log it
if (g_log() != nullptr) g_log()->Log(log_level_debug1, std::string(m_datareader->GetTopicName() + "::CSubscriber::Destroy"));
#endif

// destroy data reader
// stop & destroy datareader
m_datareader->Stop();
m_datareader.reset();

// we made it :-)
m_created = false;

// if we initialize the globals then we finalize
// here to decrease reference counter
if (m_initialized)
{
g_globals()->Finalize(Init::Subscriber);
m_initialized = false;
}

return(true);
}

Expand Down
Loading

0 comments on commit 1190753

Please sign in to comment.