Skip to content

Commit

Permalink
disconnect memory file from process only if there are no more subscri…
Browse files Browse the repository at this point in the history
…ptions from this process
  • Loading branch information
rex-schilasky committed Nov 14, 2024
1 parent ffd3625 commit 54a7c78
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 2 deletions.
39 changes: 37 additions & 2 deletions ecal/core/src/readwrite/shm/ecal_writer_shm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,18 @@ namespace eCAL
return sent;
}

void CDataWriterSHM::ApplySubscription(const std::string& host_name_, const int32_t process_id_, const std::string& /*topic_id_*/, const std::string& /*conn_par_*/)
void CDataWriterSHM::ApplySubscription(const std::string& host_name_, const int32_t process_id_, const std::string& topic_id_, const std::string& /*conn_par_*/)
{
// we accept local connections only
if (host_name_ != m_attributes.host_name) return;

// add or update the map with process id's and sets of topic ids
{
const std::lock_guard<std::mutex> lock(m_process_id_topic_id_set_map_sync);
auto& topic_set = m_process_id_topic_id_set_map[process_id_];
topic_set.insert(topic_id_);
}

for (auto& memory_file : m_memory_file_vec)
{
memory_file->Connect(std::to_string(process_id_));
Expand All @@ -97,11 +104,39 @@ namespace eCAL
}
}

void CDataWriterSHM::RemoveSubscription(const std::string& host_name_, const int32_t process_id_, const std::string& /*topic_id_*/)
void CDataWriterSHM::RemoveSubscription(const std::string& host_name_, const int32_t process_id_, const std::string& topic_id_)
{
// we accept local disconnections only
if (host_name_ != m_attributes.host_name) return;

// remove topic id from the set for the given process id
bool memfile_has_subscriptions(true);
{
const std::lock_guard<std::mutex> lock(m_process_id_topic_id_set_map_sync);
auto process_it = m_process_id_topic_id_set_map.find(process_id_);

// this process id is connected the memory file
if (process_it != m_process_id_topic_id_set_map.end())
{
// this topic id is in the set and will be removed now
process_it->second.erase(topic_id_);

// this was the last connected topic id for this process id
// that means this process id has no more connection to this memory file
if (process_it->second.empty())
{
// we can remove the empty topic id set
m_process_id_topic_id_set_map.erase(process_it);
// memory file has no more subscriptions from process id
memfile_has_subscriptions = false;
}
}
}

// if memory file is still connected to at least one topic id of this process id
// we return and do not call Disconnect for this process id
if (memfile_has_subscriptions) return;

for (auto& memory_file : m_memory_file_vec)
{
memory_file->Disconnect(std::to_string(process_id_));
Expand Down
7 changes: 7 additions & 0 deletions ecal/core/src/readwrite/shm/ecal_writer_shm.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
#include "readwrite/ecal_writer_base.h"

#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>

Expand Down Expand Up @@ -59,5 +62,9 @@ namespace eCAL
size_t m_write_idx = 0;
std::vector<std::shared_ptr<CSyncMemoryFile>> m_memory_file_vec;
static const std::string m_memfile_base_name;

using ProcessIDTopicIDSetT = std::map<int32_t, std::set<std::string>>;
std::mutex m_process_id_topic_id_set_map_sync;
ProcessIDTopicIDSetT m_process_id_topic_id_set_map;
};
}

0 comments on commit 54a7c78

Please sign in to comment.