Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/eclipse-ecal/ecal into fe…
Browse files Browse the repository at this point in the history
…ature/server-client-protocol-version-2
  • Loading branch information
FlorianReimold committed Sep 19, 2023
2 parents 1bd7c4f + fdffcbe commit 7b52bc2
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 55 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -485,5 +485,5 @@ jobs:
if: env.IS_DOWNLOAD_AVAILABLE == 'true'
uses: actions/upload-artifact@v3
with:
name: windows-setup-eclipse
name: windows-setup-signed
path: ${{ runner.workspace }}/_build/complete/_deploy/${{ env.ASSET_NAME }}
5 changes: 4 additions & 1 deletion app/mon/mon_tui/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ target_include_directories(${PROJECT_NAME}
PRIVATE $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src>)

target_compile_definitions(${PROJECT_NAME}
PRIVATE $<$<BOOL:${MSVC}>:PCRE_STATIC;_UNICODE>)
PRIVATE
$<$<BOOL:${MSVC}>:PCRE_STATIC;_UNICODE>
FTXUI_VERSION_MAJOR=${ftxui_VERSION_MAJOR}
)

create_targets_protobuf()

Expand Down
22 changes: 20 additions & 2 deletions app/mon/mon_tui/src/tui/style_sheet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@ struct StyleSheet
default: return nothing;
}
};
sheet.tab.entries.transform = [](EntryState state)
#if FTXUI_VERSION_MAJOR >= 5
sheet.tab.entries_option.transform =
#else
sheet.tab.entries.transform =
#endif
[](EntryState state)
{
if(state.active)
{
Expand All @@ -110,7 +115,11 @@ struct StyleSheet
}
return text(state.label);
};
#if FTXUI_VERSION_MAJOR >= 5
sheet.tab.direction = Direction::Right;
#else
sheet.tab.direction = MenuOption::Right;
#endif
sheet.tab.elements_prefix = [] {
return text(" ");
};
Expand Down Expand Up @@ -202,14 +211,23 @@ struct StyleSheet
default: return nothing;
}
};
sheet.tab.entries.transform = [](EntryState state) {
#if FTXUI_VERSION_MAJOR >= 5
sheet.tab.entries_option.transform =
#else
sheet.tab.entries.transform =
#endif
[](EntryState state) {
if(state.active)
{
return text(state.label) | inverted;
}
return text(state.label);
};
#if FTXUI_VERSION_MAJOR >= 5
sheet.tab.direction = Direction::Right;
#else
sheet.tab.direction = MenuOption::Right;
#endif
sheet.tab.elements_prefix = [] {
return text(" ");
};
Expand Down
2 changes: 1 addition & 1 deletion doc/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ empy
semantic-version
exhale
Jinja2
sphinxcontrib-youtube
sphinxcontrib-youtube<=1.2
sphinxcontrib-apidoc

11 changes: 8 additions & 3 deletions doc/rst/advanced/ecal_in_docker.rst
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ In eCAL, you are able to set host belonging over network borders by utilizing th

.. code-block:: bash
sudo docker network create --driver=bridge --subnet=192.100.0.0/24 my_network
sudo docker network create --driver=bridge --subnet=10.0.10.0/24 my_network
#. Edit your :file:`ecal.ini` and run your Container within the newly created docker network

Expand All @@ -225,7 +225,7 @@ In eCAL, you are able to set host belonging over network borders by utilizing th

.. code-block:: bash
sudo docker run --rm -it --ipc=host --pid=host --network=my_network --name=container1 --h=container1 --ip=192.168.100.2 -v /etc/ecal/ecal.ini:/etc/ecal/ecal.ini ecal-runtime
sudo docker run --rm -it --ipc=host --pid=host --network=my_network --name=container1 -h=container1 --ip=10.0.10.10 -v /etc/ecal/ecal.ini:/etc/ecal/ecal.ini ecal-runtime
- You should now be inside the root shell of your Container.
Check if your :file:`ecal.ini` file is correct.
Expand All @@ -250,6 +250,11 @@ In eCAL, you are able to set host belonging over network borders by utilizing th
sudo ip route add 239.0.0.0/24 dev <br-xxx> metric 1
- Review your network configuration. Your eCAL-Monitor should resemble this example:

.. image:: img_documentation/doku_ecal_docker_mon.png


#. (optional) After adding the route, you register the Container with IP address and name in /etc/hosts for DNS resolution, enabling easy access to it by hostname within the network.

.. code-block:: bash
Expand All @@ -258,4 +263,4 @@ In eCAL, you are able to set host belonging over network borders by utilizing th
.. image:: img_documentation/vscode_etc_hosts.png

After all steps are done, all eCAL nodes can communicate seamlessly from docker to the host and vice versa.
When you are done, all eCAL nodes can communicate seamlessly from docker to the host and vice versa.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
32 changes: 18 additions & 14 deletions ecal/core/src/pubsub/ecal_pubgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,10 @@ namespace eCAL

const auto& ecal_sample = ecal_sample_.topic();
const std::string& topic_name = ecal_sample.tname();
const std::string& topic_id = ecal_sample.tid();
CDataWriter::SLocalSubscriptionInfo subscription_info;
subscription_info.topic_id = ecal_sample.tid();
subscription_info.process_id = std::to_string(ecal_sample.pid());
SDataTypeInformation topic_information{ eCALSampleToTopicInformation(ecal_sample_) };
const std::string process_id = std::to_string(ecal_sample.pid());

std::string reader_par;
for (const auto& layer : ecal_sample.tlayer())
Expand All @@ -135,7 +136,7 @@ namespace eCAL
auto res = m_topic_name_datawriter_map.equal_range(topic_name);
for(TopicNameDataWriterMapT::const_iterator iter = res.first; iter != res.second; ++iter)
{
iter->second->ApplyLocSubscription(process_id, topic_id, topic_information, reader_par);
iter->second->ApplyLocSubscription(subscription_info, topic_information, reader_par);
}
}

Expand All @@ -145,15 +146,16 @@ namespace eCAL

const auto& ecal_sample = ecal_sample_.topic();
const std::string& topic_name = ecal_sample.tname();
const std::string& topic_id = ecal_sample.tid();
const std::string process_id = std::to_string(ecal_sample.pid());
CDataWriter::SLocalSubscriptionInfo subscription_info;
subscription_info.topic_id = ecal_sample.tid();
subscription_info.process_id = std::to_string(ecal_sample.pid());

// unregister local subscriber
const std::shared_lock<std::shared_timed_mutex> lock(m_topic_name_datawriter_sync);
auto res = m_topic_name_datawriter_map.equal_range(topic_name);
for (TopicNameDataWriterMapT::const_iterator iter = res.first; iter != res.second; ++iter)
{
iter->second->RemoveLocSubscription(process_id, topic_id);
iter->second->RemoveLocSubscription(subscription_info);
}
}

Expand All @@ -162,11 +164,12 @@ namespace eCAL
if(!m_created) return;

const auto& ecal_sample = ecal_sample_.topic();
const std::string& host_name = ecal_sample.hname();
const std::string& topic_name = ecal_sample.tname();
const std::string& topic_id = ecal_sample.tid();
CDataWriter::SExternalSubscriptionInfo subscription_info;
subscription_info.host_name = ecal_sample.hname();
subscription_info.topic_id = ecal_sample.tid();
subscription_info.process_id = std::to_string(ecal_sample.pid());
SDataTypeInformation topic_information{ eCALSampleToTopicInformation(ecal_sample_) };
const std::string process_id = std::to_string(ecal_sample.pid());

std::string reader_par;
for (const auto& layer : ecal_sample.tlayer())
Expand All @@ -185,7 +188,7 @@ namespace eCAL
auto res = m_topic_name_datawriter_map.equal_range(topic_name);
for(TopicNameDataWriterMapT::const_iterator iter = res.first; iter != res.second; ++iter)
{
iter->second->ApplyExtSubscription(host_name, process_id, topic_id, topic_information, reader_par);
iter->second->ApplyExtSubscription(subscription_info, topic_information, reader_par);
}
}

Expand All @@ -194,17 +197,18 @@ namespace eCAL
if (!m_created) return;

const auto& ecal_sample = ecal_sample_.topic();
const std::string& host_name = ecal_sample.hname();
const std::string& topic_name = ecal_sample.tname();
const std::string& topic_id = ecal_sample.tid();
const std::string process_id = std::to_string(ecal_sample.pid());
CDataWriter::SExternalSubscriptionInfo subscription_info;
subscription_info.host_name = ecal_sample.hname();
subscription_info.topic_id = ecal_sample.tid();
subscription_info.process_id = std::to_string(ecal_sample.pid());

// unregister external subscriber
const std::shared_lock<std::shared_timed_mutex> lock(m_topic_name_datawriter_sync);
auto res = m_topic_name_datawriter_map.equal_range(topic_name);
for (TopicNameDataWriterMapT::const_iterator iter = res.first; iter != res.second; ++iter)
{
iter->second->RemoveExtSubscription(host_name, process_id, topic_id);
iter->second->RemoveExtSubscription(subscription_info);
}
}

Expand Down
47 changes: 22 additions & 25 deletions ecal/core/src/readwrite/ecal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -655,83 +655,79 @@ namespace eCAL
else return 0;
}

void CDataWriter::ApplyLocSubscription(const std::string& process_id_, const std::string& tid_, const SDataTypeInformation& tinfo_, const std::string& reader_par_)
void CDataWriter::ApplyLocSubscription(const SLocalSubscriptionInfo& local_info_, const SDataTypeInformation& tinfo_, const std::string& reader_par_)
{
Connect(tid_, tinfo_);
Connect(local_info_.topic_id, tinfo_);

// add key to local subscriber map
const std::string topic_key = process_id_ + tid_;
{
const std::lock_guard<std::mutex> lock(m_sub_map_sync);
m_loc_sub_map[topic_key] = true;
m_loc_sub_map[local_info_] = true;
}

m_loc_subscribed = true;

// add a new local subscription
m_writer.udp_mc.AddLocConnection (process_id_, reader_par_);
m_writer.shm.AddLocConnection (process_id_, reader_par_);
m_writer.udp_mc.AddLocConnection (local_info_.process_id, reader_par_);
m_writer.shm.AddLocConnection (local_info_.process_id, reader_par_);

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug3, m_topic_name + "::CDataWriter::ApplyLocSubscription");
#endif
}

void CDataWriter::RemoveLocSubscription(const std::string& process_id_, const std::string& tid_)
void CDataWriter::RemoveLocSubscription(const SLocalSubscriptionInfo& local_info_)
{
// remove key from local subscriber map
const std::string topic_key = process_id_ + tid_;
{
const std::lock_guard<std::mutex> lock(m_sub_map_sync);
m_loc_sub_map.erase(topic_key);
m_loc_sub_map.erase(local_info_);
}

// remove a local subscription
m_writer.udp_mc.RemLocConnection (process_id_);
m_writer.shm.RemLocConnection (process_id_);
m_writer.udp_mc.RemLocConnection (local_info_.process_id);
m_writer.shm.RemLocConnection (local_info_.process_id);

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug3, m_topic_name + "::CDataWriter::RemoveLocSubscription");
#endif
}

void CDataWriter::ApplyExtSubscription(const std::string& host_name_, const std::string& process_id_, const std::string& tid_, const SDataTypeInformation& tinfo_, const std::string& reader_par_)
void CDataWriter::ApplyExtSubscription(const SExternalSubscriptionInfo& external_info_, const SDataTypeInformation& tinfo_, const std::string& reader_par_)
{
Connect(tid_, tinfo_);
Connect(external_info_.topic_id, tinfo_);

// add key to external subscriber map
const std::string topic_key = host_name_ + process_id_ + tid_;
{
const std::lock_guard<std::mutex> lock(m_sub_map_sync);
m_ext_sub_map[topic_key] = true;
m_ext_sub_map[external_info_] = true;
}

m_ext_subscribed = true;

// add a new external subscription
m_writer.udp_mc.AddExtConnection (host_name_, process_id_, reader_par_);
m_writer.shm.AddExtConnection (host_name_, process_id_, reader_par_);
m_writer.udp_mc.AddExtConnection (external_info_.host_name, external_info_.process_id, reader_par_);
m_writer.shm.AddExtConnection (external_info_.host_name, external_info_.process_id, reader_par_);

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug3, m_topic_name + "::CDataWriter::ApplyExtSubscription");
#endif
}

void CDataWriter::RemoveExtSubscription(const std::string& host_name_, const std::string& process_id_, const std::string& tid_)
void CDataWriter::RemoveExtSubscription(const SExternalSubscriptionInfo& external_info_)
{
// remove key from external subscriber map
const std::string topic_key = host_name_ + process_id_ + tid_;
{
const std::lock_guard<std::mutex> lock(m_sub_map_sync);
m_ext_sub_map.erase(topic_key);
m_ext_sub_map.erase(external_info_);
}

// remove external subscription
m_writer.udp_mc.RemExtConnection (host_name_, process_id_);
m_writer.shm.RemExtConnection (host_name_, process_id_);
m_writer.udp_mc.RemExtConnection (external_info_.host_name, external_info_.process_id);
m_writer.shm.RemExtConnection (external_info_.host_name, external_info_.process_id);
}

void CDataWriter::RefreshRegistration()
Expand Down Expand Up @@ -768,7 +764,8 @@ namespace eCAL
Register(false);

// check connection timeouts
const std::shared_ptr<std::list<std::string>> loc_timeouts = std::make_shared<std::list<std::string>>();
// Todo: Why are only Local connections removed, not external connections?
const std::shared_ptr<std::list<SLocalSubscriptionInfo>> loc_timeouts = std::make_shared<std::list<SLocalSubscriptionInfo>>();
{
const std::lock_guard<std::mutex> lock(m_sub_map_sync);
m_loc_sub_map.remove_deprecated(loc_timeouts.get());
Expand All @@ -780,7 +777,7 @@ namespace eCAL

for(const auto& loc_sub : *loc_timeouts)
{
m_writer.shm.RemLocConnection(loc_sub);
m_writer.shm.RemLocConnection(loc_sub.process_id);
}

if (!m_loc_subscribed && !m_ext_subscribed)
Expand Down Expand Up @@ -1275,7 +1272,7 @@ namespace eCAL
const std::lock_guard<std::mutex> lock(m_sub_map_sync);
for (auto sub : m_loc_sub_map)
{
if (sub.first != process_id)
if (sub.first.process_id != process_id)
{
is_internal_only = false;
break;
Expand Down
43 changes: 35 additions & 8 deletions ecal/core/src/readwrite/ecal_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,31 @@ namespace eCAL
class CDataWriter
{
public:
struct SExternalSubscriptionInfo
{
std::string host_name;
std::string process_id;
std::string topic_id;

friend bool operator<(const SExternalSubscriptionInfo& l, const SExternalSubscriptionInfo& r)
{
return std::tie(l.host_name, l.process_id, l.topic_id)
< std::tie(r.host_name, r.process_id, r.topic_id);
}
};

struct SLocalSubscriptionInfo
{
std::string process_id;
std::string topic_id;

friend bool operator<(const SLocalSubscriptionInfo& l, const SLocalSubscriptionInfo& r)
{
return std::tie(l.process_id, l.topic_id)
< std::tie(r.process_id, r.topic_id);
}
};

CDataWriter();
~CDataWriter();

Expand Down Expand Up @@ -77,11 +102,11 @@ namespace eCAL

size_t Write(CPayloadWriter& payload_, long long time_, long long id_);

void ApplyLocSubscription(const std::string& process_id_, const std::string& tid_, const SDataTypeInformation& tinfo_, const std::string& reader_par_);
void RemoveLocSubscription(const std::string & process_id_, const std::string& tid_);
void ApplyLocSubscription(const SLocalSubscriptionInfo& local_info_, const SDataTypeInformation& tinfo_, const std::string& reader_par_);
void RemoveLocSubscription(const SLocalSubscriptionInfo& local_info_);

void ApplyExtSubscription(const std::string& host_name_, const std::string& process_id_, const std::string& tid_, const SDataTypeInformation& tinfo_, const std::string& reader_par_);
void RemoveExtSubscription(const std::string & host_name_, const std::string & process_id_, const std::string& tid_);
void ApplyExtSubscription(const SExternalSubscriptionInfo& external_info_, const SDataTypeInformation& tinfo_, const std::string& reader_par_);
void RemoveExtSubscription(const SExternalSubscriptionInfo& external_info_);

void RefreshRegistration();
void RefreshSendCounter();
Expand Down Expand Up @@ -137,10 +162,12 @@ namespace eCAL
std::vector<char> m_payload_buffer;

std::atomic<bool> m_connected;
using ConnectedMapT = Util::CExpMap<std::string, bool>;
mutable std::mutex m_sub_map_sync;
ConnectedMapT m_loc_sub_map;
ConnectedMapT m_ext_sub_map;

using LocalConnectedMapT = Util::CExpMap<SLocalSubscriptionInfo, bool>;
using ExternalConnectedMapT = Util::CExpMap<SExternalSubscriptionInfo, bool>;
mutable std::mutex m_sub_map_sync;
LocalConnectedMapT m_loc_sub_map;
ExternalConnectedMapT m_ext_sub_map;

std::mutex m_event_callback_map_sync;
using EventCallbackMapT = std::map<eCAL_Publisher_Event, PubEventCallbackT>;
Expand Down
Loading

0 comments on commit 7b52bc2

Please sign in to comment.