Skip to content

Commit

Permalink
udp ttl 1 for all platforms in local mode
Browse files Browse the repository at this point in the history
buggy thread usage removed
  • Loading branch information
rex-schilasky committed Nov 29, 2023
1 parent aa39d7c commit 8de0a9a
Show file tree
Hide file tree
Showing 12 changed files with 290 additions and 59 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ if(BUILD_ECAL_TESTS)
# test apps
# ------------------------------------------------------
if (HAS_HDF5 AND HAS_QT5)
#add_subdirectory(app/rec/rec_tests/rec_rpc_tests) # exlude for now ..
add_subdirectory(app/rec/rec_tests/rec_rpc_tests)
endif()
endif()

Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/io/udp/udp_configurations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ namespace eCAL
if (local_only)
{
// if network is disabled, return a TTL of 0 to restrict multicast packets to the local machine
return 0;
return 1;
}

// if network is enabled, return the configured UDP multicast TTL value
Expand Down
62 changes: 28 additions & 34 deletions ecal/core/src/registration/ecal_registration_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ namespace eCAL
}

// start cyclic registration thread
m_reg_sample_snd_thread = std::thread(&CRegistrationProvider::RegisterSendThread, this);
m_reg_sample_snd_thread = std::make_shared<CallbackThread>(std::bind(&CRegistrationProvider::RegisterSendThread, this));
m_reg_sample_snd_thread->start(std::chrono::milliseconds(Config::GetRegistrationRefreshMs()));

m_created = true;
}
Expand All @@ -117,8 +118,7 @@ namespace eCAL
if(!m_created) return;

// stop cyclic registration thread
m_reg_sample_snd_thread_stop.store(true, std::memory_order_release);
m_reg_sample_snd_thread.join();
m_reg_sample_snd_thread->stop();

// send one last (un)registration message to the world
// thank you and goodbye :-)
Expand Down Expand Up @@ -470,47 +470,41 @@ namespace eCAL

void CRegistrationProvider::RegisterSendThread()
{
while (!m_reg_sample_snd_thread_stop.load(std::memory_order_acquire))
{
// calculate average receive bytes
g_process_rbytes = static_cast<long long>(((double)g_process_rbytes_sum / m_reg_refresh) * 1000.0);
g_process_rbytes_sum = 0;

// calculate average write bytes
g_process_wbytes = static_cast<long long>(((double)g_process_wbytes_sum / m_reg_refresh) * 1000.0);
g_process_wbytes_sum = 0;
// calculate average receive bytes
g_process_rbytes = static_cast<long long>(((double)g_process_rbytes_sum / m_reg_refresh) * 1000.0);
g_process_rbytes_sum = 0;

// refresh subscriber registration
if (g_subgate() != nullptr) g_subgate()->RefreshRegistrations();
// calculate average write bytes
g_process_wbytes = static_cast<long long>(((double)g_process_wbytes_sum / m_reg_refresh) * 1000.0);
g_process_wbytes_sum = 0;

// refresh publisher registration
if (g_pubgate() != nullptr) g_pubgate()->RefreshRegistrations();
// refresh subscriber registration
if (g_subgate() != nullptr) g_subgate()->RefreshRegistrations();

// refresh server registration
if (g_servicegate() != nullptr) g_servicegate()->RefreshRegistrations();
// refresh publisher registration
if (g_pubgate() != nullptr) g_pubgate()->RefreshRegistrations();

// refresh client registration
if (g_clientgate() != nullptr) g_clientgate()->RefreshRegistrations();
// refresh server registration
if (g_servicegate() != nullptr) g_servicegate()->RefreshRegistrations();

// register process
RegisterProcess();
// refresh client registration
if (g_clientgate() != nullptr) g_clientgate()->RefreshRegistrations();

// register server
RegisterServer();
// register process
RegisterProcess();

// register clients
RegisterClient();
// register server
RegisterServer();

// register topics
RegisterTopics();
// register clients
RegisterClient();

// write sample list to shared memory
SendSampleList();
// register topics
RegisterTopics();

// idle thread
std::this_thread::sleep_for(std::chrono::milliseconds(Config::GetRegistrationRefreshMs()));
}
}
// write sample list to shared memory
SendSampleList();
}

bool CRegistrationProvider::ApplyTopicToDescGate(const std::string& topic_name_
, const SDataTypeInformation& topic_info_
Expand Down
5 changes: 3 additions & 2 deletions ecal/core/src/registration/ecal_registration_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
#include "io/shm/ecal_memfile_broadcast.h"
#include "io/shm/ecal_memfile_broadcast_writer.h"

#include "util/ecal_thread.h"

#include <atomic>
#include <memory>
#include <mutex>
Expand Down Expand Up @@ -99,8 +101,7 @@ namespace eCAL
bool m_reg_process;

std::shared_ptr<CSampleSender> m_reg_sample_snd;
std::thread m_reg_sample_snd_thread;
std::atomic<bool> m_reg_sample_snd_thread_stop;
std::shared_ptr<CallbackThread> m_reg_sample_snd_thread;

using SampleMapT = std::unordered_map<std::string, eCAL::pb::Sample>;
std::mutex m_topics_map_sync;
Expand Down
29 changes: 11 additions & 18 deletions ecal/core/src/registration/ecal_registration_receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ namespace eCAL

// start memfile broadcast receive thread
m_memfile_broadcast_reader = memfile_broadcast_reader_;
m_memfile_broadcast_reader_thread = std::thread(&CMemfileRegistrationReceiver::Receive, this);
m_memfile_broadcast_reader_thread = std::make_shared<CallbackThread>(std::bind(&CMemfileRegistrationReceiver::Receive, this));
m_memfile_broadcast_reader_thread->start(std::chrono::milliseconds(Config::GetRegistrationRefreshMs()/2));

m_created = true;
}
Expand All @@ -59,37 +60,29 @@ namespace eCAL
if (!m_created) return;

// stop memfile broadcast receive thread
m_memfile_broadcast_reader_thread_stop.store(true, std::memory_order_release);
m_memfile_broadcast_reader_thread.join();
m_memfile_broadcast_reader_thread->stop();
m_memfile_broadcast_reader = nullptr;

m_created = false;
}

// TODO: Discuss this with Kristof !
void CMemfileRegistrationReceiver::Receive()
{
while (!m_memfile_broadcast_reader_thread_stop.load(std::memory_order_acquire))
MemfileBroadcastMessageListT message_list;
if (m_memfile_broadcast_reader->Read(message_list, 0))
{
MemfileBroadcastMessageListT message_list;
if (m_memfile_broadcast_reader->Read(message_list, 0))
{
eCAL::pb::SampleList sample_list;
eCAL::pb::SampleList sample_list;

for (const auto& message : message_list)
for (const auto& message : message_list)
{
if (sample_list.ParseFromArray(message.data, static_cast<int>(message.size)))
{
if (sample_list.ParseFromArray(message.data, static_cast<int>(message.size)))
for (const auto& sample : sample_list.samples())
{
for (const auto& sample : sample_list.samples())
{
if (g_registration_receiver()) g_registration_receiver()->ApplySample(sample);
}
if (g_registration_receiver()) g_registration_receiver()->ApplySample(sample);
}
}
}

// idle process
std::this_thread::sleep_for(std::chrono::milliseconds(Config::GetRegistrationRefreshMs()/2));
}
}

Expand Down
5 changes: 2 additions & 3 deletions ecal/core/src/registration/ecal_registration_receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ namespace eCAL
private:
void Receive();

CMemoryFileBroadcastReader* m_memfile_broadcast_reader = nullptr;
std::thread m_memfile_broadcast_reader_thread;
std::atomic<bool> m_memfile_broadcast_reader_thread_stop;
CMemoryFileBroadcastReader* m_memfile_broadcast_reader = nullptr;
std::shared_ptr<CallbackThread> m_memfile_broadcast_reader_thread;

bool m_created = false;
};
Expand Down
1 change: 1 addition & 0 deletions samples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ add_subdirectory(cpp/pubsub/flatbuffer/monster_snd)
endif(HAS_FLATBUFFERS)
#add_subdirectory(cpp/pubsub/msgpack/address_rec)
#add_subdirectory(cpp/pubsub/msgpack/address_snd)
add_subdirectory(cpp/pubsub/protobuf/person_loopback)
add_subdirectory(cpp/pubsub/protobuf/person_rec)
add_subdirectory(cpp/pubsub/protobuf/person_rec_events)
add_subdirectory(cpp/pubsub/protobuf/person_rec_lambda_in_class)
Expand Down
49 changes: 49 additions & 0 deletions samples/cpp/pubsub/protobuf/person_loopback/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# ========================= eCAL LICENSE =================================
#
# Copyright (C) 2016 - 2019 Continental Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ========================= eCAL LICENSE =================================

cmake_minimum_required(VERSION 3.10)

set(CMAKE_FIND_PACKAGE_PREFER_CONFIG ON)

project(person_loopback)

find_package(eCAL REQUIRED)
find_package(Protobuf REQUIRED)

set(person_loopback_src
src/person_loopback.cpp
)

set(person_loopback_proto
${CMAKE_CURRENT_SOURCE_DIR}/src/protobuf/animal.proto
${CMAKE_CURRENT_SOURCE_DIR}/src/protobuf/house.proto
${CMAKE_CURRENT_SOURCE_DIR}/src/protobuf/person.proto
)
ecal_add_sample(${PROJECT_NAME} ${person_loopback_src})
PROTOBUF_TARGET_CPP(${PROJECT_NAME} ${CMAKE_CURRENT_SOURCE_DIR}/src/protobuf ${person_loopback_proto})

target_link_libraries(${PROJECT_NAME}
eCAL::core
protobuf::libprotobuf
)

target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_14)

ecal_install_sample(${PROJECT_NAME})

set_property(TARGET ${PROJECT_NAME} PROPERTY FOLDER samples/cpp/pubsub/protobuf)
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* ========================= eCAL LICENSE =================================
*/

#include <ecal/ecal.h>
#include <ecal/msg/protobuf/publisher.h>
#include <ecal/msg/protobuf/subscriber.h>

#include <iostream>

#include "person.pb.h"

int main(int argc, char **argv)
{
// initialize eCAL API
eCAL::Initialize(argc, argv, "person publisher");

// set process state
eCAL::Process::SetState(proc_sev_healthy, proc_sev_level1, "I feel good !");

// enable to receive our own messages
eCAL::Util::EnableLoopback(true);

// create a publisher (topic name "person")
eCAL::protobuf::CPublisher<pb::People::Person> pub("person");

// generate a class instance of Person
pb::People::Person person;

eCAL::protobuf::CSubscriber<pb::People::Person> sub("person");
auto receive_lambda = [&sub](const char* topic_name_, const pb::People::Person& person_, const long long time_, const long long clock_, const long long /*id_*/){
std::cout << "------------------------------------------" << std::endl;
std::cout << " RECEIVED " << std::endl;
std::cout << "------------------------------------------" << std::endl;
std::cout << "person id : " << person_.id() << std::endl;
std::cout << "person name : " << person_.name() << std::endl;
std::cout << "person stype : " << person_.stype() << std::endl;
std::cout << "person email : " << person_.email() << std::endl;
std::cout << "dog.name : " << person_.dog().name() << std::endl;
std::cout << "house.rooms : " << person_.house().rooms() << std::endl;
std::cout << "------------------------------------------" << std::endl;
std::cout << std::endl;

};
sub.AddReceiveCallback(receive_lambda);

// enter main loop
auto cnt = 0;
while(eCAL::Ok())
{
// set person object content
person.set_id(++cnt);
person.set_name("Max");
person.set_stype(pb::People::Person_SType_MALE);
person.set_email("[email protected]");
person.mutable_dog()->set_name("Brandy");
person.mutable_house()->set_rooms(4);

// send the person object
pub.Send(person);

// print content
std::cout << "------------------------------------------" << std::endl;
std::cout << " SENT " << std::endl;
std::cout << "------------------------------------------" << std::endl;
std::cout << "person id : " << person.id() << std::endl;
std::cout << "person name : " << person.name() << std::endl;
std::cout << "person stype : " << person.stype() << std::endl;
std::cout << "person email : " << person.email() << std::endl;
std::cout << "dog.name : " << person.dog().name() << std::endl;
std::cout << "house.rooms : " << person.house().rooms() << std::endl;
std::cout << std::endl;

// sleep 500 ms
eCAL::Process::SleepMS(500);
}

// finalize eCAL API
eCAL::Finalize();

return(0);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* ========================= eCAL LICENSE =================================
*/

syntax = "proto3";

package pb.Animal;

message Dog
{
string name = 1;
string colour = 2;
}
Loading

0 comments on commit 8de0a9a

Please sign in to comment.