Skip to content

Commit

Permalink
mavsdk_server: fix crash on mavsdk_server init
Browse files Browse the repository at this point in the history
I saw quite a few crashes when starting up mavsdk_server with many gRPC
subscriptions called at once.

Presumably, the `stream_stop_promises` vector is being re-allocated in
memory when insertions happen, so the iterator where deletions happen is
invalidated. We obviously should not do both at the same time, otherwise
bad things happen (TM).

Signed-off-by: Julian Oes <[email protected]>
  • Loading branch information
julianoes committed May 12, 2024
1 parent 68c5dc2 commit d018094
Show file tree
Hide file tree
Showing 35 changed files with 175 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/mavsdk_server/src/plugins/action/action_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <future>
#include <limits>
#include <memory>
#include <mutex>
#include <sstream>
#include <vector>

Expand Down Expand Up @@ -747,6 +748,7 @@ class ActionServiceImpl final : public rpc::action::ActionService::Service {
void stop()
{
_stopped.store(true);
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
for (auto& prom : _stream_stop_promises) {
if (auto handle = prom.lock()) {
handle->set_value();
Expand All @@ -763,12 +765,14 @@ class ActionServiceImpl final : public rpc::action::ActionService::Service {
handle->set_value();
}
} else {
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
_stream_stop_promises.push_back(prom);
}
}

void unregister_stream_stop_promise(std::shared_ptr<std::promise<void>> prom)
{
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end();
/* ++it */) {
if (it->lock() == prom) {
Expand All @@ -782,6 +786,7 @@ class ActionServiceImpl final : public rpc::action::ActionService::Service {
LazyPlugin& _lazy_plugin;

std::atomic<bool> _stopped{false};
std::mutex _stream_stop_mutex{};
std::vector<std::weak_ptr<std::promise<void>>> _stream_stop_promises{};
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <future>
#include <limits>
#include <memory>
#include <mutex>
#include <sstream>
#include <vector>

Expand Down Expand Up @@ -789,6 +790,7 @@ class ActionServerServiceImpl final : public rpc::action_server::ActionServerSer
void stop()
{
_stopped.store(true);
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
for (auto& prom : _stream_stop_promises) {
if (auto handle = prom.lock()) {
handle->set_value();
Expand All @@ -805,12 +807,14 @@ class ActionServerServiceImpl final : public rpc::action_server::ActionServerSer
handle->set_value();
}
} else {
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
_stream_stop_promises.push_back(prom);
}
}

void unregister_stream_stop_promise(std::shared_ptr<std::promise<void>> prom)
{
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end();
/* ++it */) {
if (it->lock() == prom) {
Expand All @@ -824,6 +828,7 @@ class ActionServerServiceImpl final : public rpc::action_server::ActionServerSer
LazyServerPlugin& _lazy_plugin;

std::atomic<bool> _stopped{false};
std::mutex _stream_stop_mutex{};
std::vector<std::weak_ptr<std::promise<void>>> _stream_stop_promises{};
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <future>
#include <limits>
#include <memory>
#include <mutex>
#include <sstream>
#include <vector>

Expand Down Expand Up @@ -233,6 +234,7 @@ class ArmAuthorizerServerServiceImpl final
void stop()
{
_stopped.store(true);
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
for (auto& prom : _stream_stop_promises) {
if (auto handle = prom.lock()) {
handle->set_value();
Expand All @@ -249,12 +251,14 @@ class ArmAuthorizerServerServiceImpl final
handle->set_value();
}
} else {
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
_stream_stop_promises.push_back(prom);
}
}

void unregister_stream_stop_promise(std::shared_ptr<std::promise<void>> prom)
{
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end();
/* ++it */) {
if (it->lock() == prom) {
Expand All @@ -268,6 +272,7 @@ class ArmAuthorizerServerServiceImpl final
LazyServerPlugin& _lazy_plugin;

std::atomic<bool> _stopped{false};
std::mutex _stream_stop_mutex{};
std::vector<std::weak_ptr<std::promise<void>>> _stream_stop_promises{};
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <future>
#include <limits>
#include <memory>
#include <mutex>
#include <sstream>
#include <vector>

Expand Down Expand Up @@ -432,6 +433,7 @@ class CalibrationServiceImpl final : public rpc::calibration::CalibrationService
void stop()
{
_stopped.store(true);
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
for (auto& prom : _stream_stop_promises) {
if (auto handle = prom.lock()) {
handle->set_value();
Expand All @@ -448,12 +450,14 @@ class CalibrationServiceImpl final : public rpc::calibration::CalibrationService
handle->set_value();
}
} else {
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
_stream_stop_promises.push_back(prom);
}
}

void unregister_stream_stop_promise(std::shared_ptr<std::promise<void>> prom)
{
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end();
/* ++it */) {
if (it->lock() == prom) {
Expand All @@ -467,6 +471,7 @@ class CalibrationServiceImpl final : public rpc::calibration::CalibrationService
LazyPlugin& _lazy_plugin;

std::atomic<bool> _stopped{false};
std::mutex _stream_stop_mutex{};
std::vector<std::weak_ptr<std::promise<void>>> _stream_stop_promises{};
};

Expand Down
5 changes: 5 additions & 0 deletions src/mavsdk_server/src/plugins/camera/camera_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <future>
#include <limits>
#include <memory>
#include <mutex>
#include <sstream>
#include <vector>

Expand Down Expand Up @@ -1687,6 +1688,7 @@ class CameraServiceImpl final : public rpc::camera::CameraService::Service {
void stop()
{
_stopped.store(true);
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
for (auto& prom : _stream_stop_promises) {
if (auto handle = prom.lock()) {
handle->set_value();
Expand All @@ -1703,12 +1705,14 @@ class CameraServiceImpl final : public rpc::camera::CameraService::Service {
handle->set_value();
}
} else {
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
_stream_stop_promises.push_back(prom);
}
}

void unregister_stream_stop_promise(std::shared_ptr<std::promise<void>> prom)
{
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end();
/* ++it */) {
if (it->lock() == prom) {
Expand All @@ -1722,6 +1726,7 @@ class CameraServiceImpl final : public rpc::camera::CameraService::Service {
LazyPlugin& _lazy_plugin;

std::atomic<bool> _stopped{false};
std::mutex _stream_stop_mutex{};
std::vector<std::weak_ptr<std::promise<void>>> _stream_stop_promises{};
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <future>
#include <limits>
#include <memory>
#include <mutex>
#include <sstream>
#include <vector>

Expand Down Expand Up @@ -2017,6 +2018,7 @@ class CameraServerServiceImpl final : public rpc::camera_server::CameraServerSer
void stop()
{
_stopped.store(true);
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
for (auto& prom : _stream_stop_promises) {
if (auto handle = prom.lock()) {
handle->set_value();
Expand All @@ -2033,12 +2035,14 @@ class CameraServerServiceImpl final : public rpc::camera_server::CameraServerSer
handle->set_value();
}
} else {
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
_stream_stop_promises.push_back(prom);
}
}

void unregister_stream_stop_promise(std::shared_ptr<std::promise<void>> prom)
{
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end();
/* ++it */) {
if (it->lock() == prom) {
Expand All @@ -2052,6 +2056,7 @@ class CameraServerServiceImpl final : public rpc::camera_server::CameraServerSer
LazyServerPlugin& _lazy_plugin;

std::atomic<bool> _stopped{false};
std::mutex _stream_stop_mutex{};
std::vector<std::weak_ptr<std::promise<void>>> _stream_stop_promises{};
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <future>
#include <limits>
#include <memory>
#include <mutex>
#include <sstream>
#include <vector>

Expand Down Expand Up @@ -297,6 +298,7 @@ class ComponentMetadataServiceImpl final
void stop()
{
_stopped.store(true);
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
for (auto& prom : _stream_stop_promises) {
if (auto handle = prom.lock()) {
handle->set_value();
Expand All @@ -313,12 +315,14 @@ class ComponentMetadataServiceImpl final
handle->set_value();
}
} else {
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
_stream_stop_promises.push_back(prom);
}
}

void unregister_stream_stop_promise(std::shared_ptr<std::promise<void>> prom)
{
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end();
/* ++it */) {
if (it->lock() == prom) {
Expand All @@ -332,6 +336,7 @@ class ComponentMetadataServiceImpl final
LazyPlugin& _lazy_plugin;

std::atomic<bool> _stopped{false};
std::mutex _stream_stop_mutex{};
std::vector<std::weak_ptr<std::promise<void>>> _stream_stop_promises{};
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <future>
#include <limits>
#include <memory>
#include <mutex>
#include <sstream>
#include <vector>

Expand Down Expand Up @@ -114,6 +115,7 @@ class ComponentMetadataServerServiceImpl final
void stop()
{
_stopped.store(true);
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
for (auto& prom : _stream_stop_promises) {
if (auto handle = prom.lock()) {
handle->set_value();
Expand All @@ -130,12 +132,14 @@ class ComponentMetadataServerServiceImpl final
handle->set_value();
}
} else {
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
_stream_stop_promises.push_back(prom);
}
}

void unregister_stream_stop_promise(std::shared_ptr<std::promise<void>> prom)
{
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end();
/* ++it */) {
if (it->lock() == prom) {
Expand All @@ -149,6 +153,7 @@ class ComponentMetadataServerServiceImpl final
LazyServerPlugin& _lazy_plugin;

std::atomic<bool> _stopped{false};
std::mutex _stream_stop_mutex{};
std::vector<std::weak_ptr<std::promise<void>>> _stream_stop_promises{};
};

Expand Down
5 changes: 5 additions & 0 deletions src/mavsdk_server/src/plugins/failure/failure_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <future>
#include <limits>
#include <memory>
#include <mutex>
#include <sstream>
#include <vector>

Expand Down Expand Up @@ -259,6 +260,7 @@ class FailureServiceImpl final : public rpc::failure::FailureService::Service {
void stop()
{
_stopped.store(true);
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
for (auto& prom : _stream_stop_promises) {
if (auto handle = prom.lock()) {
handle->set_value();
Expand All @@ -275,12 +277,14 @@ class FailureServiceImpl final : public rpc::failure::FailureService::Service {
handle->set_value();
}
} else {
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
_stream_stop_promises.push_back(prom);
}
}

void unregister_stream_stop_promise(std::shared_ptr<std::promise<void>> prom)
{
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end();
/* ++it */) {
if (it->lock() == prom) {
Expand All @@ -294,6 +298,7 @@ class FailureServiceImpl final : public rpc::failure::FailureService::Service {
LazyPlugin& _lazy_plugin;

std::atomic<bool> _stopped{false};
std::mutex _stream_stop_mutex{};
std::vector<std::weak_ptr<std::promise<void>>> _stream_stop_promises{};
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <future>
#include <limits>
#include <memory>
#include <mutex>
#include <sstream>
#include <vector>

Expand Down Expand Up @@ -371,6 +372,7 @@ class FollowMeServiceImpl final : public rpc::follow_me::FollowMeService::Servic
void stop()
{
_stopped.store(true);
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
for (auto& prom : _stream_stop_promises) {
if (auto handle = prom.lock()) {
handle->set_value();
Expand All @@ -387,12 +389,14 @@ class FollowMeServiceImpl final : public rpc::follow_me::FollowMeService::Servic
handle->set_value();
}
} else {
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
_stream_stop_promises.push_back(prom);
}
}

void unregister_stream_stop_promise(std::shared_ptr<std::promise<void>> prom)
{
std::lock_guard<std::mutex> lock(_stream_stop_mutex);
for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end();
/* ++it */) {
if (it->lock() == prom) {
Expand All @@ -406,6 +410,7 @@ class FollowMeServiceImpl final : public rpc::follow_me::FollowMeService::Servic
LazyPlugin& _lazy_plugin;

std::atomic<bool> _stopped{false};
std::mutex _stream_stop_mutex{};
std::vector<std::weak_ptr<std::promise<void>>> _stream_stop_promises{};
};

Expand Down
Loading

0 comments on commit d018094

Please sign in to comment.