Skip to content

Commit

Permalink
Manually fix a set of Topic QoS, and implement the max-tx-rate (#86)
Browse files Browse the repository at this point in the history
* Make the DdsPipeConfiguration required

Signed-off-by: tempate <[email protected]>

* Rename max-reception-rate to max-rx-rate

Signed-off-by: tempate <[email protected]>

* Support max-tx-rate in the replayer

Signed-off-by: tempate <[email protected]>

* Support manual topics configuration

Signed-off-by: tempate <[email protected]>

* Move the allowed & builtin topics to the pipe

Signed-off-by: tempate <[email protected]>

* Add Topic QoS to the McapReaderParticipant

Signed-off-by: tempate <[email protected]>

* Apply suggestions

Signed-off-by: tempate <[email protected]>

* Support the new Topic QoS

Signed-off-by: tempate <[email protected]>

* Minor fixes to the documentation

Signed-off-by: tempate <[email protected]>

* Add missing quotes to wildcard entries

Signed-off-by: tempate <[email protected]>

* Fix downsampling test

Signed-off-by: tempate <[email protected]>

* Apply suggestions

Signed-off-by: tempate <[email protected]>

* Apply DdsPipe suggestions

Signed-off-by: tempate <[email protected]>

* Apply suggestions

Signed-off-by: tempate <[email protected]>

* Remove the built-in topics from the DdsReplayer

Signed-off-by: tempate <[email protected]>

* Apply suggestions

Signed-off-by: tempate <[email protected]>

* Apply more suggestions

Signed-off-by: tempate <[email protected]>

* Apply the manual topics to the dynamic writer

Signed-off-by: tempate <[email protected]>

* Fix compilation error

Signed-off-by: tempate <[email protected]>

* Remove Topic Filtering outdated note

Signed-off-by: tempate <[email protected]>

* Fix Windows tag

Signed-off-by: tempate <[email protected]>

* Fix forthcoming notes

Signed-off-by: tempate <[email protected]>

* Uncrustify

Signed-off-by: tempate <[email protected]>

---------

Signed-off-by: tempate <[email protected]>
  • Loading branch information
Tempate authored Nov 8, 2023
1 parent 9270ac5 commit 0d0c59a
Show file tree
Hide file tree
Showing 28 changed files with 412 additions and 402 deletions.
12 changes: 2 additions & 10 deletions ddsrecorder/src/cpp/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,7 @@ std::unique_ptr<eprosima::utils::event::FileWatcherHandler> create_filewatcher(
try
{
eprosima::ddsrecorder::yaml::RecorderConfiguration new_configuration(file_path);
// Create new allowed topics list
auto new_allowed_topics = std::make_shared<core::AllowedTopicList>(
new_configuration.allowlist,
new_configuration.blocklist);
recorder->reload_allowed_topics(new_allowed_topics);
recorder->reload_configuration(new_configuration);
}
catch (const std::exception& e)
{
Expand Down Expand Up @@ -102,11 +98,7 @@ std::unique_ptr<eprosima::utils::event::PeriodicEventHandler> create_periodic_ha
try
{
eprosima::ddsrecorder::yaml::RecorderConfiguration new_configuration(file_path);
// Create new allowed topics list
auto new_allowed_topics = std::make_shared<core::AllowedTopicList>(
new_configuration.allowlist,
new_configuration.blocklist);
recorder->reload_allowed_topics(new_allowed_topics);
recorder->reload_configuration(new_configuration);
}
catch (const std::exception& e)
{
Expand Down
17 changes: 5 additions & 12 deletions ddsrecorder/src/cpp/tool/DdsRecorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@ DdsRecorder::DdsRecorder(
const DdsRecorderStateCode& init_state,
const std::string& file_name)
{
// Create allowed topics list
auto allowed_topics = std::make_shared<AllowedTopicList>(
configuration.allowlist,
configuration.blocklist);

// Create Discovery Database
discovery_database_ =
std::make_shared<DiscoveryDatabase>();
Expand Down Expand Up @@ -115,19 +110,17 @@ DdsRecorder::DdsRecorder(

// Create DDS Pipe
pipe_ = std::make_unique<DdsPipe>(
allowed_topics,
configuration.ddspipe_configuration,
discovery_database_,
payload_pool_,
participants_database_,
thread_pool_,
configuration.builtin_topics,
true);
thread_pool_);
}

utils::ReturnCode DdsRecorder::reload_allowed_topics(
const std::shared_ptr<AllowedTopicList>& allowed_topics)
utils::ReturnCode DdsRecorder::reload_configuration(
const yaml::RecorderConfiguration& new_configuration)
{
return pipe_->reload_allowed_topics(allowed_topics);
return pipe_->reload_configuration(new_configuration.ddspipe_configuration);
}

void DdsRecorder::start()
Expand Down
4 changes: 2 additions & 2 deletions ddsrecorder/src/cpp/tool/DdsRecorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ class DdsRecorder
* @return \c RETCODE_OK if allowed topics list has been updated correctly
* @return \c RETCODE_NO_DATA if new allowed topics list is the same as the previous one
*/
utils::ReturnCode reload_allowed_topics(
const std::shared_ptr<ddspipe::core::AllowedTopicList>& allowed_topics);
utils::ReturnCode reload_configuration(
const yaml::RecorderConfiguration& new_configuration);

//! Start recorder (\c mcap_handler_)
void start();
Expand Down
4 changes: 2 additions & 2 deletions ddsrecorder/test/blackbox/mcap/McapFileCreationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ std::unique_ptr<DdsRecorder> create_recorder(
YAML::Node yml;

eprosima::ddsrecorder::yaml::RecorderConfiguration configuration(yml);
configuration.downsampling = downsampling;
configuration.topic_qos.downsampling = downsampling;
// Set default value for downsampling
// TODO: Change mechanism setting topic qos' default values from specs
eprosima::ddspipe::core::types::TopicQoS::default_downsampling.store(downsampling);
eprosima::ddspipe::core::types::TopicQoS::default_topic_qos.set_value(configuration.topic_qos);
configuration.event_window = event_window;
eprosima::ddspipe::core::types::DomainId domainId;
domainId.domain_id = test::DOMAIN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ class McapReaderParticipant : public ddspipe::core::IParticipant
DDSRECORDER_PARTICIPANTS_DllAPI
bool is_rtps_kind() const noexcept override;

//! Override topic_qos() IParticipant method
DDSRECORDER_PARTICIPANTS_DllAPI
ddspipe::core::types::TopicQoS topic_qos() const noexcept override;

//! Override create_writer_() IParticipant method
DDSRECORDER_PARTICIPANTS_DllAPI
std::shared_ptr<ddspipe::core::IWriter> create_writer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ bool McapReaderParticipant::is_rtps_kind() const noexcept
return false;
}

TopicQoS McapReaderParticipant::topic_qos() const noexcept
{
return configuration_->topic_qos;
}

std::shared_ptr<IWriter> McapReaderParticipant::create_writer(
const ITopic& /* topic */)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#include <cpp_utils/memory/Heritable.hpp>

#include <ddspipe_core/configuration/DdsPipeConfiguration.hpp>
#include <ddspipe_core/types/dds/TopicQoS.hpp>
#include <ddspipe_core/types/topic/dds/DistributedTopic.hpp>
#include <ddspipe_core/types/topic/filter/IFilterTopic.hpp>

Expand Down Expand Up @@ -52,15 +54,13 @@ class DDSRECORDER_YAML_DllAPI RecorderConfiguration
RecorderConfiguration(
const std::string& file_path);

// DDS Pipe Configuration
ddspipe::core::DdsPipeConfiguration ddspipe_configuration;

// Participants configurations
std::shared_ptr<ddspipe::participants::SimpleParticipantConfiguration> simple_configuration;
std::shared_ptr<ddspipe::participants::ParticipantConfiguration> recorder_configuration;

// Topic filtering
std::set<utils::Heritable<ddspipe::core::types::IFilterTopic>> allowlist{};
std::set<utils::Heritable<ddspipe::core::types::IFilterTopic>> blocklist{};
std::set<utils::Heritable<ddspipe::core::types::DistributedTopic>> builtin_topics{};

// Output file params
std::string output_filepath = ".";
std::string output_filename = "output";
Expand All @@ -71,8 +71,6 @@ class DDSRECORDER_YAML_DllAPI RecorderConfiguration
unsigned int buffer_size = 100;
unsigned int event_window = 20;
bool log_publish_time = false;
unsigned int downsampling = 1;
float max_reception_rate = 0;
bool only_with_type = false;
mcap::McapWriterOptions mcap_writer_options{"ros2"};
bool record_types = true;
Expand All @@ -86,9 +84,9 @@ class DDSRECORDER_YAML_DllAPI RecorderConfiguration

// Specs
unsigned int n_threads = 12;
unsigned int max_history_depth = 5000;
int max_pending_samples = 5000; // -1 <-> no limit || 0 <-> no pending samples
unsigned int cleanup_period;
ddspipe::core::types::TopicQoS topic_qos{};

protected:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <cpp_utils/time/time_utils.hpp>
#include <cpp_utils/types/Fuzzy.hpp>

#include <ddspipe_core/configuration/DdsPipeConfiguration.hpp>
#include <ddspipe_core/types/dds/TopicQoS.hpp>
#include <ddspipe_core/types/topic/dds/DistributedTopic.hpp>
#include <ddspipe_core/types/topic/filter/IFilterTopic.hpp>

Expand Down Expand Up @@ -52,15 +54,13 @@ class DDSRECORDER_YAML_DllAPI ReplayerConfiguration
ReplayerConfiguration(
const std::string& file_path);

// DDS Pipe Configuration
ddspipe::core::DdsPipeConfiguration ddspipe_configuration;

// Participants configurations
std::shared_ptr<ddsrecorder::participants::McapReaderParticipantConfiguration> mcap_reader_configuration;
std::shared_ptr<ddspipe::participants::SimpleParticipantConfiguration> replayer_configuration;

// Topic filtering
std::set<utils::Heritable<ddspipe::core::types::IFilterTopic>> allowlist{};
std::set<utils::Heritable<ddspipe::core::types::IFilterTopic>> blocklist{};
std::set<utils::Heritable<ddspipe::core::types::DistributedTopic>> builtin_topics{};

// Replay params
std::string input_file;
utils::Fuzzy<utils::Timestamp> begin_time{};
Expand All @@ -71,7 +71,7 @@ class DDSRECORDER_YAML_DllAPI ReplayerConfiguration

// Specs
unsigned int n_threads = 12;
unsigned int max_history_depth = 5000;
ddspipe::core::types::TopicQoS topic_qos{};

protected:

Expand Down
57 changes: 28 additions & 29 deletions ddsrecorder_yaml/src/cpp/recorder/YamlReaderConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cpp_utils/utils.hpp>

#include <ddspipe_core/types/dynamic_types/types.hpp>
#include <ddspipe_core/types/topic/filter/ManualTopic.hpp>
#include <ddspipe_core/types/topic/filter/WildcardDdsFilterTopic.hpp>
#include <ddspipe_participants/types/address/Address.hpp>

Expand Down Expand Up @@ -115,11 +116,16 @@ void RecorderConfiguration::load_ddsrecorder_configuration_(
WildcardDdsFilterTopic rpc_request_topic, rpc_response_topic;
rpc_request_topic.topic_name.set_value("rq/*");
rpc_response_topic.topic_name.set_value("rr/*");
blocklist.insert(

ddspipe_configuration.blocklist.insert(
utils::Heritable<WildcardDdsFilterTopic>::make_heritable(rpc_request_topic));
blocklist.insert(

ddspipe_configuration.blocklist.insert(
utils::Heritable<WildcardDdsFilterTopic>::make_heritable(rpc_response_topic));

// The DDS Pipe should be enabled on start up.
ddspipe_configuration.init_enabled = true;

// Initialize controller domain with the same as the one being recorded
// WARNING: dds tag must have been parsed beforehand
controller_domain = simple_configuration->domain;
Expand Down Expand Up @@ -198,24 +204,6 @@ void RecorderConfiguration::load_recorder_configuration_(
log_publish_time = YamlReader::get<bool>(yml, RECORDER_LOG_PUBLISH_TIME_TAG, version);
}

/////
// Get optional downsampling
if (YamlReader::is_tag_present(yml, DOWNSAMPLING_TAG))
{
downsampling = YamlReader::get_positive_int(yml, DOWNSAMPLING_TAG);
// Set default value for downsampling
TopicQoS::default_downsampling.store(downsampling);
}

/////
// Get optional max reception rate
if (YamlReader::is_tag_present(yml, MAX_RECEPTION_RATE_TAG))
{
// Set default value for max reception rate
TopicQoS::default_max_reception_rate.store(YamlReader::get_nonnegative_float(yml,
MAX_RECEPTION_RATE_TAG));
}

/////
// Get optional only_with_type
if (YamlReader::is_tag_present(yml, RECORDER_ONLY_WITH_TYPE_TAG))
Expand Down Expand Up @@ -290,12 +278,12 @@ void RecorderConfiguration::load_specs_configuration_(
n_threads = YamlReader::get_positive_int(yml, NUMBER_THREADS_TAG);
}

// Get maximum history depth
if (YamlReader::is_tag_present(yml, MAX_HISTORY_DEPTH_TAG))
/////
// Get optional Topic QoS
if (YamlReader::is_tag_present(yml, SPECS_QOS_TAG))
{
max_history_depth = YamlReader::get_positive_int(yml, MAX_HISTORY_DEPTH_TAG);
// Set default value for history
TopicQoS::default_history_depth.store(max_history_depth);
YamlReader::fill<TopicQoS>(topic_qos, YamlReader::get_value_in_tag(yml, SPECS_QOS_TAG), version);
TopicQoS::default_topic_qos.set_value(topic_qos);
}

// Get max pending samples
Expand Down Expand Up @@ -363,28 +351,39 @@ void RecorderConfiguration::load_dds_configuration_(
// Get optional allowlist
if (YamlReader::is_tag_present(yml, ALLOWLIST_TAG))
{
allowlist = YamlReader::get_set<utils::Heritable<IFilterTopic>>(yml, ALLOWLIST_TAG, version);
ddspipe_configuration.allowlist = YamlReader::get_set<utils::Heritable<IFilterTopic>>(yml, ALLOWLIST_TAG,
version);

// Add to allowlist always the type object topic
WildcardDdsFilterTopic internal_topic;
internal_topic.topic_name.set_value(TYPE_OBJECT_TOPIC_NAME);
allowlist.insert(
ddspipe_configuration.allowlist.insert(
utils::Heritable<WildcardDdsFilterTopic>::make_heritable(internal_topic));
}

/////
// Get optional blocklist
if (YamlReader::is_tag_present(yml, BLOCKLIST_TAG))
{
blocklist = YamlReader::get_set<utils::Heritable<IFilterTopic>>(yml, BLOCKLIST_TAG, version);
ddspipe_configuration.blocklist = YamlReader::get_set<utils::Heritable<IFilterTopic>>(yml, BLOCKLIST_TAG,
version);
}

/////
// Get optional topics
if (YamlReader::is_tag_present(yml, TOPICS_TAG))
{
const auto& manual_topics = YamlReader::get_list<ManualTopic>(yml, TOPICS_TAG, version);
ddspipe_configuration.manual_topics =
std::vector<ManualTopic>(manual_topics.begin(), manual_topics.end());
}

/////
// Get optional builtin topics
if (YamlReader::is_tag_present(yml, BUILTIN_TAG))
{
// WARNING: Parse builtin topics AFTER specs and recorder, as some topic-specific default values are set there
builtin_topics = YamlReader::get_set<utils::Heritable<DistributedTopic>>(yml, BUILTIN_TAG,
ddspipe_configuration.builtin_topics = YamlReader::get_set<utils::Heritable<DistributedTopic>>(yml, BUILTIN_TAG,
version);
}
}
Expand Down
Loading

0 comments on commit 0d0c59a

Please sign in to comment.