Skip to content

Commit

Permalink
renamings, layer info added to internal pub/sub maps
Browse files Browse the repository at this point in the history
  • Loading branch information
rex-schilasky committed May 8, 2024
1 parent 37b6fdc commit 0f79cd1
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 184 deletions.
24 changes: 23 additions & 1 deletion ecal/core/src/pubsub/ecal_pubgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,28 @@ namespace eCAL
subscription_info.process_id = ecal_topic.pid;
const SDataTypeInformation topic_information = ecal_topic.tdatatype;

CDataWriter::SLayerStates layer_states;
for (const auto& layer : ecal_topic.tlayer)
{
if (layer.confirmed)
{
switch (layer.type)
{
case TLayer::tlayer_udp_mc:
layer_states.udp = true;
break;
case TLayer::tlayer_shm:
layer_states.shm = true;
break;
case TLayer::tlayer_tcp:
layer_states.tcp = true;
break;
default:
break;
}
}
}

std::string reader_par;
#if 0
for (const auto& layer : ecal_sample.tlayer())
Expand All @@ -138,7 +160,7 @@ namespace eCAL
auto res = m_topic_name_datawriter_map.equal_range(topic_name);
for(TopicNameDataWriterMapT::const_iterator iter = res.first; iter != res.second; ++iter)
{
iter->second->ApplySubscription(subscription_info, topic_information, reader_par);
iter->second->ApplySubscription(subscription_info, topic_information, layer_states, reader_par);
}
}

Expand Down
24 changes: 23 additions & 1 deletion ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,28 @@ namespace eCAL
publication_info.process_id = ecal_topic.pid;
const SDataTypeInformation topic_information = ecal_topic.tdatatype;

CDataReader::SLayerStates layer_states;
for (const auto& layer : ecal_topic.tlayer)
{
if (layer.confirmed)
{
switch (layer.type)
{
case TLayer::tlayer_udp_mc:
layer_states.udp = true;
break;
case TLayer::tlayer_shm:
layer_states.shm = true;
break;
case TLayer::tlayer_tcp:
layer_states.tcp = true;
break;
default:
break;
}
}
}

// register publisher
const std::shared_lock<std::shared_timed_mutex> lock(m_topic_name_datareader_sync);
auto res = m_topic_name_datareader_map.equal_range(topic_name);
Expand All @@ -239,7 +261,7 @@ namespace eCAL
iter->second->ApplyLayerParameter(publication_info, tlayer.type, tlayer.par_layer);
}
// inform for publisher connection
iter->second->ApplyPublication(publication_info, topic_information);
iter->second->ApplyPublication(publication_info, topic_information, layer_states);
}
}

Expand Down
71 changes: 27 additions & 44 deletions ecal/core/src/readwrite/ecal_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,7 @@ namespace eCAL
m_host_group_name(Process::GetHostGroupName()),
m_pid(Process::GetProcessID()),
m_pname(Process::GetProcessName()),
m_topic_size(0),
m_connected(false),
m_read_buf_received(false),
m_read_time(0),
m_receive_time(0),
m_clock(0),
m_frequency_calculator(3.0f),
m_message_drops(0),
m_share_ttype(true),
m_share_tdesc(true),
m_use_udp_confirmed(false),
m_use_shm_confirmed(false),
m_use_tcp_confirmed(false),
m_created(false)
m_frequency_calculator(0.0f)
{
}

Expand Down Expand Up @@ -142,13 +129,13 @@ namespace eCAL

// reset receive callback
{
const std::lock_guard<std::mutex> lock(m_receive_callback_sync);
const std::lock_guard<std::mutex> lock(m_receive_callback_mtx);
m_receive_callback = nullptr;
}

// reset event callback map
{
const std::lock_guard<std::mutex> lock(m_event_callback_map_sync);
const std::lock_guard<std::mutex> lock(m_event_callback_map_mtx);
m_event_callback_map.clear();
}

Expand All @@ -162,10 +149,6 @@ namespace eCAL
m_clock = 0;
m_message_drops = 0;

m_use_udp_confirmed = false;
m_use_shm_confirmed = false;
m_use_tcp_confirmed = false;

return(true);
}

Expand Down Expand Up @@ -262,7 +245,7 @@ namespace eCAL
Registration::TLayer udp_tlayer;
udp_tlayer.type = tl_ecal_udp;
udp_tlayer.version = 1;
udp_tlayer.confirmed = m_use_udp_confirmed;
udp_tlayer.confirmed = m_confirmed_layers.udp;
ecal_reg_sample_topic.tlayer.push_back(udp_tlayer);
}
#endif
Expand All @@ -273,7 +256,7 @@ namespace eCAL
Registration::TLayer shm_tlayer;
shm_tlayer.type = tl_ecal_shm;
shm_tlayer.version = 1;
shm_tlayer.confirmed = m_use_shm_confirmed;
shm_tlayer.confirmed = m_confirmed_layers.shm;
ecal_reg_sample_topic.tlayer.push_back(shm_tlayer);
}
#endif
Expand All @@ -284,7 +267,7 @@ namespace eCAL
Registration::TLayer tcp_tlayer;
tcp_tlayer.type = tl_ecal_tcp;
tcp_tlayer.version = 1;
tcp_tlayer.confirmed = m_use_tcp_confirmed;
tcp_tlayer.confirmed = m_confirmed_layers.tcp;
ecal_reg_sample_topic.tlayer.push_back(tcp_tlayer);
}
#endif
Expand Down Expand Up @@ -378,7 +361,7 @@ namespace eCAL
{
if (!m_created) return(false);

std::unique_lock<std::mutex> read_buffer_lock(m_read_buf_mutex);
std::unique_lock<std::mutex> read_buffer_lock(m_read_buf_mtx);

// No need to wait (for whatever time) if something has been received
if (!m_read_buf_received)
Expand Down Expand Up @@ -418,13 +401,13 @@ namespace eCAL
size_t CDataReader::AddSample(const std::string& tid_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_)
{
// ensure thread safety
const std::lock_guard<std::mutex> lock(m_receive_callback_sync);
const std::lock_guard<std::mutex> lock(m_receive_callback_mtx);
if (!m_created) return(0);

// store receive layer
m_use_udp_confirmed |= layer_ == tl_ecal_udp;
m_use_shm_confirmed |= layer_ == tl_ecal_shm;
m_use_tcp_confirmed |= layer_ == tl_ecal_tcp;
m_confirmed_layers.udp |= layer_ == tl_ecal_udp;
m_confirmed_layers.shm |= layer_ == tl_ecal_shm;
m_confirmed_layers.tcp |= layer_ == tl_ecal_tcp;

// number of hash values to track for duplicates
constexpr int hash_queue_size(64);
Expand Down Expand Up @@ -474,7 +457,7 @@ namespace eCAL
// Update frequency calculation
{
const auto receive_time = std::chrono::steady_clock::now();
const std::lock_guard<std::mutex> freq_lock(m_frequency_calculator_mutex);
const std::lock_guard<std::mutex> freq_lock(m_frequency_calculator_mtx);
m_frequency_calculator.addTick(receive_time);
}

Expand Down Expand Up @@ -511,7 +494,7 @@ namespace eCAL
if(!processed)
{
// push sample into read buffer
const std::lock_guard<std::mutex> read_buffer_lock(m_read_buf_mutex);
const std::lock_guard<std::mutex> read_buffer_lock(m_read_buf_mtx);
m_read_buf.clear();
m_read_buf.assign(payload_, payload_ + size_);
m_read_time = time_;
Expand All @@ -534,7 +517,7 @@ namespace eCAL

// store receive callback
{
const std::lock_guard<std::mutex> lock(m_receive_callback_sync);
const std::lock_guard<std::mutex> lock(m_receive_callback_mtx);
#ifndef NDEBUG
// log it
Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::AddReceiveCallback");
Expand All @@ -551,7 +534,7 @@ namespace eCAL

// reset receive callback
{
const std::lock_guard<std::mutex> lock(m_receive_callback_sync);
const std::lock_guard<std::mutex> lock(m_receive_callback_mtx);
#ifndef NDEBUG
// log it
Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::RemReceiveCallback");
Expand All @@ -572,7 +555,7 @@ namespace eCAL
// log it
Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::AddEventCallback");
#endif
const std::lock_guard<std::mutex> lock(m_event_callback_map_sync);
const std::lock_guard<std::mutex> lock(m_event_callback_map_mtx);
m_event_callback_map[type_] = std::move(callback_);
}

Expand All @@ -589,7 +572,7 @@ namespace eCAL
// log it
Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::RemEventCallback");
#endif
const std::lock_guard<std::mutex> lock(m_event_callback_map_sync);
const std::lock_guard<std::mutex> lock(m_event_callback_map_mtx);
m_event_callback_map[type_] = nullptr;
}

Expand All @@ -601,22 +584,22 @@ namespace eCAL
m_id_set = id_set_;
}

void CDataReader::ApplyPublication(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_)
void CDataReader::ApplyPublication(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& layer_states_)
{
Connect(publication_info_.topic_id, data_type_info_);

// add key to publisher map
{
const std::lock_guard<std::mutex> lock(m_pub_map_sync);
m_pub_map[publication_info_] = data_type_info_;
const std::lock_guard<std::mutex> lock(m_pub_map_mtx);
m_pub_map[publication_info_] = std::make_tuple(data_type_info_, layer_states_);
}
}

void CDataReader::RemovePublication(const SPublicationInfo& publication_info_)
{
// remove key from publisher map
{
const std::lock_guard<std::mutex> lock(m_pub_map_sync);
const std::lock_guard<std::mutex> lock(m_pub_map_mtx);
m_pub_map.erase(publication_info_);
}
}
Expand Down Expand Up @@ -659,7 +642,7 @@ namespace eCAL

// fire sub_event_connected
{
const std::lock_guard<std::mutex> lock(m_event_callback_map_sync);
const std::lock_guard<std::mutex> lock(m_event_callback_map_mtx);
auto iter = m_event_callback_map.find(sub_event_connected);
if (iter != m_event_callback_map.end() && iter->second)
{
Expand All @@ -673,7 +656,7 @@ namespace eCAL

// fire sub_event_update_connection
{
const std::lock_guard<std::mutex> lock(m_event_callback_map_sync);
const std::lock_guard<std::mutex> lock(m_event_callback_map_mtx);
auto iter = m_event_callback_map.find(sub_event_update_connection);
if (iter != m_event_callback_map.end() && iter->second)
{
Expand All @@ -693,7 +676,7 @@ namespace eCAL

// fire sub_event_disconnected
{
const std::lock_guard<std::mutex> lock(m_event_callback_map_sync);
const std::lock_guard<std::mutex> lock(m_event_callback_map_mtx);
auto iter = m_event_callback_map.find(sub_event_disconnected);
if (iter != m_event_callback_map.end() && iter->second)
{
Expand Down Expand Up @@ -763,7 +746,7 @@ namespace eCAL
#endif
// we fire the message drop event
{
const std::lock_guard<std::mutex> lock(m_event_callback_map_sync);
const std::lock_guard<std::mutex> lock(m_event_callback_map_mtx);
auto citer = m_event_callback_map.find(sub_event_dropped);
if (citer != m_event_callback_map.end() && citer->second)
{
Expand Down Expand Up @@ -829,7 +812,7 @@ namespace eCAL
int32_t CDataReader::GetFrequency()
{
const auto frequency_time = std::chrono::steady_clock::now();
const std::lock_guard<std::mutex> lock(m_frequency_calculator_mutex);
const std::lock_guard<std::mutex> lock(m_frequency_calculator_mtx);
return static_cast<int32_t>(m_frequency_calculator.getFrequency(frequency_time) * 1000);
}

Expand All @@ -845,7 +828,7 @@ namespace eCAL

// check connection timeouts
{
const std::lock_guard<std::mutex> lock(m_pub_map_sync);
const std::lock_guard<std::mutex> lock(m_pub_map_mtx);
m_pub_map.remove_deprecated();

if (m_pub_map.empty())
Expand Down
Loading

0 comments on commit 0f79cd1

Please sign in to comment.