Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Events #103

Merged
merged 40 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
7e1b571
Set callbacks
Yadunund Jan 30, 2024
ef11bb2
Implement EventsBase class
Yadunund Jan 31, 2024
eef9b2e
Declare certain event types to be compatible although mechanisms are …
Yadunund Jan 31, 2024
9c79e5a
Call event callbacks once an event is queued
Yadunund Jan 31, 2024
ab592b8
Fix typo preventing setting of user callbacks
Yadunund Jan 31, 2024
9ecfd72
Resolve merge conflict with rolling
Yadunund Feb 8, 2024
d2398e6
Refactor graph cache to track qos (#107)
Yadunund Feb 9, 2024
586af20
Move eventbase definition into event.hpp
Yadunund Feb 9, 2024
cc195f8
Update rmw_zenoh_cpp/src/rmw_event.cpp
Yadunund Feb 19, 2024
ab2877d
Merge branch 'rolling' into yadu/events
Yadunund Feb 19, 2024
1f38007
Store current session id in graph cache
Yadunund Feb 20, 2024
70a6c68
Allow event callback functions to be registered with graph cache
Yadunund Feb 20, 2024
33a2160
Store liveliness::Entity in custom types
Yadunund Feb 20, 2024
f573053
Register qos event callbacks
Yadunund Feb 20, 2024
b4e645a
Remove all lambdas in graph cache
Yadunund Feb 20, 2024
395bd31
Implement matched events
Yadunund Feb 23, 2024
360cc8f
Make liveliness token keyexprs globally unique
Yadunund Feb 23, 2024
e182420
Add node id to all liveliness tokens
Yadunund Feb 26, 2024
5e3df35
Debug race condition
Yadunund Feb 26, 2024
07c27f3
Update qos compatibility check to reflect zenoh's behavior
Yadunund Feb 26, 2024
a5b5acf
Get rid of TopicStats
Yadunund Feb 26, 2024
29f3a79
Replace EventsBase inheritance with DataCallbackManager and EventsMan…
Yadunund Feb 26, 2024
534d9f1
Merge pull request #118 from ros2/yadu/qos_events
Yadunund Feb 26, 2024
e4c4ff7
Comment print statements for now. They will be deleted before merge
Yadunund Feb 26, 2024
6f647bc
Merge branch 'rolling' into yadu/events
Yadunund Feb 28, 2024
fdb1050
Merge branch 'rolling' into yadu/events
Yadunund Feb 28, 2024
0072abf
Log reason for bad reply
Yadunund Feb 28, 2024
e4e60b0
Merge branch 'rolling' into yadu/events
Yadunund Feb 29, 2024
bfd409c
Resolve merge conflicts after gid merge
Yadunund Mar 1, 2024
2054a58
Resolve merge conflicts with main
Yadunund Mar 11, 2024
d723fb6
Remove debug printf statements
Yadunund Mar 12, 2024
e51e475
event_mutex_ is a regular mutex
Yadunund Mar 12, 2024
f768e32
Update todo for TopicData
Yadunund Mar 12, 2024
00e8e1c
Construct TopicData from Entity
Yadunund Mar 12, 2024
79ec141
Restrict query and reply queue sizes
Yadunund Mar 14, 2024
61925b6
Merge branch 'rolling' into yadu/events
Yadunund Mar 21, 2024
47973be
Simplify codepaths for matched events
Yadunund Mar 21, 2024
1f507f7
Avoid copying liveliness::Entity
Yadunund Mar 21, 2024
40ad1fb
Merge branch 'rolling' into yadu/events
Yadunund Mar 21, 2024
d93ad02
Handle 0 history depth for publishers too
Yadunund Mar 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/style.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,7 @@ jobs:
timeout-minutes: 30
steps:
- uses: actions/checkout@v2
- name: style
- name: uncrustify
run: /ros_entrypoint.sh ament_uncrustify rmw_zenoh_cpp/
- name: cpplint
run: /ros_entrypoint.sh ament_cpplint rmw_zenoh_cpp/
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ find_package(zenoh_c_vendor REQUIRED)
find_package(zenohc REQUIRED)

add_library(rmw_zenoh_cpp SHARED
src/detail/event.cpp
src/detail/identifier.cpp
src/detail/graph_cache.cpp
src/detail/guard_condition.cpp
Expand Down
229 changes: 229 additions & 0 deletions rmw_zenoh_cpp/src/detail/event.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <utility>

#include "event.hpp"

#include "rcutils/logging_macros.h"

#include "rmw/error_handling.h"


///=============================================================================
void EventsBase::set_user_callback(
const void * user_data, rmw_event_callback_t callback)
{
std::lock_guard<std::recursive_mutex> lock_mutex(event_mutex_);
Yadunund marked this conversation as resolved.
Show resolved Hide resolved

if (callback) {
// Push events arrived before setting the the executor callback.
if (unread_count_) {
callback(user_data, unread_count_);
unread_count_ = 0;
}
user_data_ = user_data;
callback_ = callback;
} else {
user_data_ = nullptr;
callback_ = nullptr;
}
}

///=============================================================================
void EventsBase::trigger_user_callback()
{
// Trigger the user provided event callback if available.
std::lock_guard<std::recursive_mutex> lock_mutex(event_mutex_);
if (callback_ != nullptr) {
callback_(user_data_, 1);
} else {
++unread_count_;
}
}

///=============================================================================
void EventsBase::event_set_callback(
rmw_zenoh_event_type_t event_id,
rmw_event_callback_t callback,
const void * user_data)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::recursive_mutex> lock(event_mutex_);
Yadunund marked this conversation as resolved.
Show resolved Hide resolved

// Set the user callback data
event_callback_[event_id] = callback;
event_data_[event_id] = user_data;

if (callback && event_unread_count_[event_id]) {
// Push events happened before having assigned a callback
callback(user_data, event_unread_count_[event_id]);
event_unread_count_[event_id] = 0;
}
return;
}

///=============================================================================
void EventsBase::trigger_event_callback(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::recursive_mutex> lock(event_mutex_);

if (event_callback_[event_id] != nullptr) {
event_callback_[event_id](event_data_[event_id], 1);
} else {
++event_unread_count_[event_id];
}
return;
}

///=============================================================================
bool EventsBase::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return true;
}

std::lock_guard<std::recursive_mutex> lock(event_mutex_);

return event_queues_[event_id].empty();
}

///=============================================================================
std::unique_ptr<rmw_zenoh_event_status_t> EventsBase::pop_next_event(
rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return nullptr;
}

std::lock_guard<std::recursive_mutex> lock(event_mutex_);

if (event_queues_[event_id].empty()) {
// This tells rcl that the check for a new events was done, but no events have come in yet.
return nullptr;
}

std::unique_ptr<rmw_zenoh_event_status_t> event_status =
std::move(event_queues_[event_id].front());
event_queues_[event_id].pop_front();

return event_status;
}

///=============================================================================
void EventsBase::add_new_event(
rmw_zenoh_event_type_t event_id,
std::unique_ptr<rmw_zenoh_event_status_t> event)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::recursive_mutex> lock(event_mutex_);

std::deque<std::unique_ptr<rmw_zenoh_event_status_t>> & event_queue = event_queues_[event_id];
if (event_queue.size() >= event_queue_depth_) {
// Log warning if message is discarded due to hitting the queue depth
RCUTILS_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"Event queue depth of %ld reached, discarding oldest message "
"for event type %d",
event_queue_depth_,
event_id);

event_queue.pop_front();
}

event_queue.emplace_back(std::move(event));

// Since we added new data, trigger event callback and guard condition if they are available
trigger_event_callback(event_id);
notify_event(event_id);
}

///=============================================================================
void EventsBase::attach_event_condition(
rmw_zenoh_event_type_t event_id,
std::condition_variable * condition_variable)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::mutex> lock(event_condition_mutex_);
event_conditions_[event_id] = condition_variable;
}

///=============================================================================
void EventsBase::detach_event_condition(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::mutex> lock(event_condition_mutex_);
event_conditions_[event_id] = nullptr;
}

///=============================================================================
void EventsBase::notify_event(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::mutex> lock(event_condition_mutex_);
if (event_conditions_[event_id] != nullptr) {
event_conditions_[event_id]->notify_one();
}
}
150 changes: 150 additions & 0 deletions rmw_zenoh_cpp/src/detail/event.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef DETAIL__EVENT_HPP_
#define DETAIL__EVENT_HPP_

#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

#include "rmw/event.h"
#include "rmw/event_callback_type.h"


///=============================================================================
// A struct that represents an event status in rmw_zenoh.
enum rmw_zenoh_event_type_t
{
// sentinel value
ZENOH_EVENT_INVALID,

// subscription events
ZENOH_EVENT_REQUESTED_QOS_INCOMPATIBLE,
ZENOH_EVENT_MESSAGE_LOST,
// RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE,
ZENOH_EVENT_SUBSCRIPTION_MATCHED,

// publisher events
// RMW_EVENT_LIVELINESS_LOST,
// RMW_EVENT_OFFERED_DEADLINE_MISSED,
ZENOH_EVENT_OFFERED_QOS_INCOMPATIBLE,
// RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE,
ZENOH_EVENT_PUBLICATION_MATCHED,
};

/// Helper value to indicate the maximum index of events supported.
#define ZENOH_EVENT_ID_MAX rmw_zenoh_event_type_t::ZENOH_EVENT_PUBLICATION_MATCHED
Comment on lines +48 to +51
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty minor, but our typical style here is to add a "SENTINEL" value at the end of the enum, and check against that instead. So this would change to:

Suggested change
};
/// Helper value to indicate the maximum index of events supported.
#define ZENOH_EVENT_ID_MAX rmw_zenoh_event_type_t::ZENOH_EVENT_PUBLICATION_MATCHED
ZENOH_EVENT_INVALID
};

And then the checks in event.cpp would change to >= ZENOH_EVENT_INVALID.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. If we adopt this approach, the array lengths of these members would now be ZENOH_EVENT_INVALID, eg std::condition_variable * event_conditions_[ZENOH_EVENT_INVALID]{nullptr} which presents a readability problem imo. If you think it's valid, I'm happy to make the change as suggested.


// RMW Event types that we support in rmw_zenoh.
static const std::unordered_map<rmw_event_type_t, rmw_zenoh_event_type_t> event_map{
{RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE, ZENOH_EVENT_REQUESTED_QOS_INCOMPATIBLE},
{RMW_EVENT_OFFERED_QOS_INCOMPATIBLE, ZENOH_EVENT_OFFERED_QOS_INCOMPATIBLE},
{RMW_EVENT_MESSAGE_LOST, ZENOH_EVENT_MESSAGE_LOST},
{RMW_EVENT_SUBSCRIPTION_MATCHED, ZENOH_EVENT_SUBSCRIPTION_MATCHED},
{RMW_EVENT_PUBLICATION_MATCHED, ZENOH_EVENT_PUBLICATION_MATCHED}
// TODO(clalancette): Implement remaining events
};

///=============================================================================
/// A struct to store status changes which can be mapped to rmw event statuses.
/// The data field can be used to store serialized information for more complex statuses.
struct rmw_zenoh_event_status_t
{
size_t total_count;
size_t total_count_change;
size_t current_count;
std::string data;
};

///=============================================================================
/// Base class to be inherited by entities that support events.
class EventsBase
{
public:
/// @brief Set the user defined callback that should be called when
/// a new message/response/request is received.
/// @param user_data the data that should be passed to the callback.
/// @param callback the callback to be set.
void set_user_callback(const void * user_data, rmw_event_callback_t callback);

/// Trigger the user callback.
void trigger_user_callback();

/// @brief Set the callback to be triggered when the relevant event is triggered.
/// @param event_id the id of the event
/// @param callback the callback to trigger for this event.
/// @param user_data the data to be passed to the callback.
void event_set_callback(
rmw_zenoh_event_type_t event_id,
rmw_event_callback_t callback,
const void * user_data);

/// @brief Returns true if the event queue is empty.
/// @param event_id the event id whose event queue should be checked.
bool event_queue_is_empty(rmw_zenoh_event_type_t event_id) const;

/// Pop the next event in the queue.
/// @param event_id the event id whose queue should be popped.
std::unique_ptr<rmw_zenoh_event_status_t> pop_next_event(
rmw_zenoh_event_type_t event_id);

/// Add an event status for an event.
/// @param event_id the event id queue to which the status should be added.
void add_new_event(
rmw_zenoh_event_type_t event_id,
std::unique_ptr<rmw_zenoh_event_status_t> event);

/// @brief Attach the condition variable provided by rmw_wait.
/// @param condition_variable to attach.
void attach_event_condition(
rmw_zenoh_event_type_t event_id,
std::condition_variable * condition_variable);

/// @brief Detach the condition variable provided by rmw_wait.
void detach_event_condition(rmw_zenoh_event_type_t event_id);

private:
/// @brief Trigger the callback for an event.
/// @param event_id the event id whose callback should be triggered.
void trigger_event_callback(rmw_zenoh_event_type_t event_id);

/// Notify once event is added to an event queue.
void notify_event(rmw_zenoh_event_type_t event_id);

/// Mutex to lock when read/writing members.
// The mutex is recursive as add_new_event() invokes `trigger_event_callback()`.
mutable std::recursive_mutex event_mutex_;
/// Mutex to lock for event_condition.
mutable std::mutex event_condition_mutex_;
/// Condition variable to attach for event notifications.
std::condition_variable * event_conditions_[ZENOH_EVENT_ID_MAX + 1]{nullptr};
/// User callback that can be set via set_user_callback().
rmw_event_callback_t callback_ {nullptr};
/// User data that should be passed to the user callback.
const void * user_data_ {nullptr};
/// Count for
size_t unread_count_ {0};
rmw_event_callback_t event_callback_[ZENOH_EVENT_ID_MAX + 1] {nullptr};
const void * event_data_[ZENOH_EVENT_ID_MAX + 1] {nullptr};
size_t event_unread_count_[ZENOH_EVENT_ID_MAX + 1] {0};
// A dequeue of events for each type of event this RMW supports.
std::deque<std::unique_ptr<rmw_zenoh_event_status_t>> event_queues_[ZENOH_EVENT_ID_MAX + 1] {};
const std::size_t event_queue_depth_ = 10;
};

#endif // DETAIL__EVENT_HPP_
Loading
Loading