Skip to content

Commit

Permalink
Apply suggestions
Browse files Browse the repository at this point in the history
Signed-off-by: tempate <[email protected]>
  • Loading branch information
Tempate authored and juanlofer-eprosima committed Sep 18, 2024
1 parent e77bab7 commit e00db76
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 87 deletions.
70 changes: 50 additions & 20 deletions ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,33 @@ class DdsBridge : public Bridge
DDSPIPE_CORE_DllAPI
void disable() noexcept override;

// TODO
/**
* Create a new endpoint in the bridge.
*
* It will call either \c create_writer_and_its_tracks_nts_ or \c create_reader_and_its_track_nts_ depending on the
* \c discovered_endpoint_kind.
*
* @param participant_id: The id of the participant who is creating the endpoint.
* @param discovered_endpoint_kind: The kind of endpoint that has been discovered.
*
* Thread safe.
*/
DDSPIPE_CORE_DllAPI
void create_endpoint(
const types::ParticipantId& participant_id,
const types::EndpointKind& discovered_endpoint_kind);

// TODO
/**
* Remove an endpoint from the bridge.
*
* It will call either \c remove_writer_and_its_tracks_nts_ or \c remove_reader_and_its_track_nts_ depending on the
* \c removed_endpoint_kind.
*
* @param participant_id: The id of the participant who is removing the endpoint.
* @param removed_endpoint_kind: The kind of endpoint that has been removed.
*
* Thread safe.
*/
DDSPIPE_CORE_DllAPI
void remove_endpoint(
const types::ParticipantId& participant_id,
Expand All @@ -105,7 +125,7 @@ class DdsBridge : public Bridge
/**
* Create an IWriter for the new participant.
* Create the IReaders in the IWriter's route.
* Create the Tracks of the IReaderes with the IWriter.
* Create the Tracks of the IReaders with the IWriter.
*
* @param participant_id: The id of the participant who is creating the writer.
*
Expand All @@ -127,45 +147,55 @@ class DdsBridge : public Bridge
const types::ParticipantId& participant_id);

/**
* Remove the IWriter from all the Tracks in the bridge.
* Remove the IReaders and Tracks that don't have any IWriters.
* Remove the IWriter from all the Tracks.
* Remove the IReaders and Tracks without IWriters.
*
* @param participant_id: The id of the participant who is removing the writer.
*/
void remove_writer_and_its_tracks_nts_(
const types::ParticipantId& participant_id) noexcept;

/**
* Remove the IReader and its Track from the bridge.
* Remove the IWriters that don't belong to a Track.
* Remove the IReader and its Track.
* Remove the IWriters that no longer belong to a Track.
*
* @param participant_id: The id of the participant who is removing the reader.
*/
void remove_reader_and_its_track_nts_(
const types::ParticipantId& participant_id) noexcept;

// TODO
/**
* @brief Create a Track for an IReader and its IWriters.
*
* @param id: The id of the participant who is creating the track.
* @param reader: The IReader of the track.
* @param writers: The IWriters of the track.
*/
void create_track_nts_(
const types::ParticipantId& id,
const std::shared_ptr<IReader>& reader,
std::map<types::ParticipantId, std::shared_ptr<IWriter>>& writers);

// TODO
/**
* @brief Create an IWriter for a participant in the topic.
*
* A tailored Topic is created for the participant, depending on the QoS configured for it.
*
* @param participant_id: The id of the participant who is creating the writer.
*/
std::shared_ptr<core::IWriter> create_writer_nts_(
const types::ParticipantId& participant_id);

// TODO
/**
* @brief Create an IReader for a participant in the topic.
*
* A tailored Topic is created for the participant, depending on the QoS configured for it.
*
* @param participant_id: The id of the participant who is creating the reader.
*/
std::shared_ptr<core::IReader> create_reader_nts_(
const types::ParticipantId& participant_id);

// TODO
std::set<types::ParticipantId> readers_in_writers_route_nts_(
const types::ParticipantId& writer_id);

// TODO
std::set<types::ParticipantId> writers_in_readers_route_nts_(
const types::ParticipantId& reader_id);

/**
* @brief Impose the Topic QoS that have been pre-configured for a participant.
*
Expand All @@ -182,8 +212,8 @@ class DdsBridge : public Bridge
utils::Heritable<types::DistributedTopic> topic_;

//! Routes associated to the Topic.
RoutesConfiguration::RoutesMap routes_of_readers_;
RoutesConfiguration::RoutesMap routes_of_writers_;
RoutesConfiguration::RoutesMap writers_in_route_;
RoutesConfiguration::RoutesMap readers_in_route_;

//! Topics that explicitally set a QoS attribute for this participant.
std::vector<types::ManualTopic> manual_topics_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,34 @@ struct RoutesConfiguration : public IConfiguration
/////////////////////////

DDSPIPE_CORE_DllAPI
virtual bool is_valid(
bool is_valid(
utils::Formatter& error_msg) const noexcept override;

DDSPIPE_CORE_DllAPI
bool is_valid(
utils::Formatter& error_msg,
const std::map<types::ParticipantId, bool>& participants) const noexcept;

/**
* @brief Returns the writers in each reader's route.
*
* It returns a map with the readers as keys and the writers in each reader's route as values.
*
* The first time this method is called, it will calculate the routes and store them in the object.
* Subsequent calls will return the stored routes.
*/
DDSPIPE_CORE_DllAPI
RoutesMap routes_of_readers(
const std::map<types::ParticipantId, bool>& participant_ids) const noexcept;

/**
* @brief Returns the readers in each writer's route.
*
* It returns a map with the writers as keys and the readers in each writer's route as values.
*
* The first time this method is called, it will calculate the routes and store them in the object.
* Subsequent calls will return the stored routes.
*/
DDSPIPE_CORE_DllAPI
RoutesMap routes_of_writers(
const std::map<types::ParticipantId, bool>& participant_ids) const noexcept;
Expand Down
92 changes: 28 additions & 64 deletions ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>

#include <cpp_utils/exception/UnsupportedException.hpp>
#include <cpp_utils/Log.hpp>

Expand All @@ -38,8 +40,10 @@ DdsBridge::DdsBridge(
{
logDebug(DDSPIPE_DDSBRIDGE, "Creating DdsBridge " << *this << ".");

routes_of_readers_ = routes_config.routes_of_readers(participants_database->get_participants_repeater_map());
routes_of_writers_ = routes_config.routes_of_writers(participants_database->get_participants_repeater_map());
const auto& participants_repeater_map = participants_database->get_participants_repeater_map();

writers_in_route_ = routes_config.routes_of_readers(participants_repeater_map);
readers_in_route_ = routes_config.routes_of_writers(participants_repeater_map);

if (remove_unused_entities && topic->topic_discoverer() != DEFAULT_PARTICIPANT_ID)
{
Expand All @@ -49,7 +53,6 @@ DdsBridge::DdsBridge(
{
for (const ParticipantId& id : participants_->get_participants_ids())
{
std::lock_guard<std::mutex> lock(mutex_);
create_reader_and_its_track_nts_(id);
}
}
Expand Down Expand Up @@ -138,17 +141,14 @@ void DdsBridge::create_writer_and_its_tracks_nts_(
{
assert(participant_id != DEFAULT_PARTICIPANT_ID);

// 1. Create the writer
// Create the writer
auto writer = create_writer_nts_(participant_id);

// Save the writer
writers_[participant_id] = writer;

// 2. Find the readers in the writer's route
const auto& readers_in_route = routes_of_writers_[participant_id];

// 3. Find or create the tracks in the writer's route
for (const auto& id : readers_in_route)
// Find or create the tracks in the writer's route
for (const auto& id : readers_in_route_[participant_id])
{
if (tracks_.count(id))
{
Expand All @@ -157,16 +157,12 @@ void DdsBridge::create_writer_and_its_tracks_nts_(
}
else
{
// The track doesn't exist

// 3.1. Create the reader
// The track doesn't exist. Create it.
auto reader = create_reader_nts_(id);

// 3.2. Create a writers set from the writer
std::map<ParticipantId, std::shared_ptr<IWriter>> writers;
writers[participant_id] = writer;

// 3.3. Create the track
create_track_nts_(id, reader, writers);
}
}
Expand All @@ -177,47 +173,29 @@ void DdsBridge::create_reader_and_its_track_nts_(
{
assert(participant_id != DEFAULT_PARTICIPANT_ID);

// 1. Create the reader
// Create the reader
auto reader = create_reader_nts_(participant_id);

// 2. Find the writers in the reader's route
const auto& writers_in_route = routes_of_readers_[participant_id];

// 3. Find or create the writers in the reader's route
std::map<ParticipantId, std::shared_ptr<IWriter>> writers;

for (const auto& id : writers_in_route)
// Create the missing writers in the reader's route
for (const auto& id : writers_in_route_[participant_id])
{
if (writers_.count(id))
{
// The writer already exists. Add it to the reader's track.
writers[id] = writers_[id];
}
else
if (writers_.count(id) == 0)
{
// The writer doesn't exist

// 3.1. Create the writer
auto writer = create_writer_nts_(id);

// 3.2. Save the writer
writers_[id] = writer;

// 3.3. Add the writer to the reader's track
writers[id] = writer;
// The writer doesn't exist. Create it.
writers_[id] = create_writer_nts_(id);
}
}

// 4. Create the track
create_track_nts_(participant_id, reader, writers);
// Create the track
create_track_nts_(participant_id, reader, writers_);
}

void DdsBridge::remove_writer_and_its_tracks_nts_(
const ParticipantId& participant_id) noexcept
{
assert(participant_id != DEFAULT_PARTICIPANT_ID);

// 1. Remove the writer from the tracks and remove the tracks without writers
// Remove the writer from the tracks and remove the tracks without writers
for (const auto& track_it : tracks_)
{
auto& track = track_it.second;
Expand All @@ -230,7 +208,7 @@ void DdsBridge::remove_writer_and_its_tracks_nts_(
}
}

// 2. Remove the writer
// Remove the writer
writers_.erase(participant_id);
}

Expand All @@ -239,35 +217,21 @@ void DdsBridge::remove_reader_and_its_track_nts_(
{
assert(participant_id != DEFAULT_PARTICIPANT_ID);

// 1. Find the writers in the reader's route
const auto& writers_in_route = routes_of_readers_[participant_id];

// 2. Remove the writers that don't belong to another track
for (const auto& writer_id : writers_in_route)
// Remove the writers that don't belong to another track
for (const auto& writer_id : writers_in_route_[participant_id])
{
bool is_writer_in_another_track = false;

for (const auto& track_it : tracks_)
{
if (track_it.first == participant_id)
{
continue;
}

if (track_it.second->has_writer(writer_id))
{
is_writer_in_another_track = true;
break;
}
}
const auto& different_track_doesnt_contain_writer = [&](const auto& track_it)
{
return track_it.first == participant_id || !track_it.second->has_writer(writer_id);
};

if (!is_writer_in_another_track)
if (std::all_of(tracks_.begin(), tracks_.end(), different_track_doesnt_contain_writer))
{
writers_.erase(writer_id);
}
}

// 3. Remove the track
// Remove the track
tracks_.erase(participant_id);
}

Expand Down
4 changes: 2 additions & 2 deletions ddspipe_core/src/cpp/configuration/RoutesConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,15 @@ RoutesConfiguration::RoutesMap RoutesConfiguration::routes_of_writers(

if (routes_it != routes.end())
{
// The reader has a route. Add only the writers in the route.
// The reader has a route. Add the reader to the route of the writers in its route.
for (const auto& writer_id : routes_it->second)
{
writers_routes[writer_id].insert(reader_id);
}
}
else
{
// The reader doesn't have a route. Add every writer (+ itself if repeater).
// The reader doesn't have a route. Add the reader to the route of every writer (+ itself if repeater).
for (const auto& it : participant_ids)
{
const auto& writer_id = it.first;
Expand Down

0 comments on commit e00db76

Please sign in to comment.