From d0180943a7bb39c214cbfc0c5567e767dece8d9c Mon Sep 17 00:00:00 2001 From: Julian Oes Date: Mon, 13 May 2024 10:00:21 +1200 Subject: [PATCH] mavsdk_server: fix crash on mavsdk_server init I saw quite a few crashes when starting up mavsdk_server with many gRPC subscriptions called at once. Presumably, the `stream_stop_promises` vector is being re-allocated in memory when insertions happen, so the iterator where deletions happen is invalidated. We obviously should not do both at the same time, otherwise bad things happen (TM). Signed-off-by: Julian Oes --- src/mavsdk_server/src/plugins/action/action_service_impl.h | 5 +++++ .../src/plugins/action_server/action_server_service_impl.h | 5 +++++ .../arm_authorizer_server_service_impl.h | 5 +++++ .../src/plugins/calibration/calibration_service_impl.h | 5 +++++ src/mavsdk_server/src/plugins/camera/camera_service_impl.h | 5 +++++ .../src/plugins/camera_server/camera_server_service_impl.h | 5 +++++ .../component_metadata/component_metadata_service_impl.h | 5 +++++ .../component_metadata_server_service_impl.h | 5 +++++ src/mavsdk_server/src/plugins/failure/failure_service_impl.h | 5 +++++ .../src/plugins/follow_me/follow_me_service_impl.h | 5 +++++ src/mavsdk_server/src/plugins/ftp/ftp_service_impl.h | 5 +++++ .../src/plugins/ftp_server/ftp_server_service_impl.h | 5 +++++ .../src/plugins/geofence/geofence_service_impl.h | 5 +++++ src/mavsdk_server/src/plugins/gimbal/gimbal_service_impl.h | 5 +++++ src/mavsdk_server/src/plugins/gripper/gripper_service_impl.h | 5 +++++ src/mavsdk_server/src/plugins/info/info_service_impl.h | 5 +++++ .../src/plugins/log_files/log_files_service_impl.h | 5 +++++ .../src/plugins/log_streaming/log_streaming_service_impl.h | 5 +++++ .../src/plugins/manual_control/manual_control_service_impl.h | 5 +++++ src/mavsdk_server/src/plugins/mission/mission_service_impl.h | 5 +++++ .../src/plugins/mission_raw/mission_raw_service_impl.h | 5 +++++ .../mission_raw_server/mission_raw_server_service_impl.h | 5 +++++ src/mavsdk_server/src/plugins/mocap/mocap_service_impl.h | 5 +++++ .../src/plugins/offboard/offboard_service_impl.h | 5 +++++ src/mavsdk_server/src/plugins/param/param_service_impl.h | 5 +++++ .../src/plugins/param_server/param_server_service_impl.h | 5 +++++ src/mavsdk_server/src/plugins/rtk/rtk_service_impl.h | 5 +++++ .../src/plugins/server_utility/server_utility_service_impl.h | 5 +++++ src/mavsdk_server/src/plugins/shell/shell_service_impl.h | 5 +++++ .../src/plugins/telemetry/telemetry_service_impl.h | 5 +++++ .../plugins/telemetry_server/telemetry_server_service_impl.h | 5 +++++ .../src/plugins/transponder/transponder_service_impl.h | 5 +++++ src/mavsdk_server/src/plugins/tune/tune_service_impl.h | 5 +++++ src/mavsdk_server/src/plugins/winch/winch_service_impl.h | 5 +++++ templates/mavsdk_server/file.j2 | 5 +++++ 35 files changed, 175 insertions(+) diff --git a/src/mavsdk_server/src/plugins/action/action_service_impl.h b/src/mavsdk_server/src/plugins/action/action_service_impl.h index 5d087ea43d..f37369cca0 100644 --- a/src/mavsdk_server/src/plugins/action/action_service_impl.h +++ b/src/mavsdk_server/src/plugins/action/action_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -747,6 +748,7 @@ class ActionServiceImpl final : public rpc::action::ActionService::Service { void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -763,12 +765,14 @@ class ActionServiceImpl final : public rpc::action::ActionService::Service { handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -782,6 +786,7 @@ class ActionServiceImpl final : public rpc::action::ActionService::Service { LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/action_server/action_server_service_impl.h b/src/mavsdk_server/src/plugins/action_server/action_server_service_impl.h index 8c4d16c91b..6498a760f3 100644 --- a/src/mavsdk_server/src/plugins/action_server/action_server_service_impl.h +++ b/src/mavsdk_server/src/plugins/action_server/action_server_service_impl.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -789,6 +790,7 @@ class ActionServerServiceImpl final : public rpc::action_server::ActionServerSer void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -805,12 +807,14 @@ class ActionServerServiceImpl final : public rpc::action_server::ActionServerSer handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -824,6 +828,7 @@ class ActionServerServiceImpl final : public rpc::action_server::ActionServerSer LazyServerPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/arm_authorizer_server/arm_authorizer_server_service_impl.h b/src/mavsdk_server/src/plugins/arm_authorizer_server/arm_authorizer_server_service_impl.h index aa625dc8dc..ad9a9b11dd 100644 --- a/src/mavsdk_server/src/plugins/arm_authorizer_server/arm_authorizer_server_service_impl.h +++ b/src/mavsdk_server/src/plugins/arm_authorizer_server/arm_authorizer_server_service_impl.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -233,6 +234,7 @@ class ArmAuthorizerServerServiceImpl final void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -249,12 +251,14 @@ class ArmAuthorizerServerServiceImpl final handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -268,6 +272,7 @@ class ArmAuthorizerServerServiceImpl final LazyServerPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/calibration/calibration_service_impl.h b/src/mavsdk_server/src/plugins/calibration/calibration_service_impl.h index 83eed8cab4..e001b0fa18 100644 --- a/src/mavsdk_server/src/plugins/calibration/calibration_service_impl.h +++ b/src/mavsdk_server/src/plugins/calibration/calibration_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -432,6 +433,7 @@ class CalibrationServiceImpl final : public rpc::calibration::CalibrationService void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -448,12 +450,14 @@ class CalibrationServiceImpl final : public rpc::calibration::CalibrationService handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -467,6 +471,7 @@ class CalibrationServiceImpl final : public rpc::calibration::CalibrationService LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/camera/camera_service_impl.h b/src/mavsdk_server/src/plugins/camera/camera_service_impl.h index 368b0fd14c..f0b600c36a 100644 --- a/src/mavsdk_server/src/plugins/camera/camera_service_impl.h +++ b/src/mavsdk_server/src/plugins/camera/camera_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -1687,6 +1688,7 @@ class CameraServiceImpl final : public rpc::camera::CameraService::Service { void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -1703,12 +1705,14 @@ class CameraServiceImpl final : public rpc::camera::CameraService::Service { handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -1722,6 +1726,7 @@ class CameraServiceImpl final : public rpc::camera::CameraService::Service { LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/camera_server/camera_server_service_impl.h b/src/mavsdk_server/src/plugins/camera_server/camera_server_service_impl.h index 947281fe84..b594f2df21 100644 --- a/src/mavsdk_server/src/plugins/camera_server/camera_server_service_impl.h +++ b/src/mavsdk_server/src/plugins/camera_server/camera_server_service_impl.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -2017,6 +2018,7 @@ class CameraServerServiceImpl final : public rpc::camera_server::CameraServerSer void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -2033,12 +2035,14 @@ class CameraServerServiceImpl final : public rpc::camera_server::CameraServerSer handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -2052,6 +2056,7 @@ class CameraServerServiceImpl final : public rpc::camera_server::CameraServerSer LazyServerPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/component_metadata/component_metadata_service_impl.h b/src/mavsdk_server/src/plugins/component_metadata/component_metadata_service_impl.h index 45839657aa..ae5860966d 100644 --- a/src/mavsdk_server/src/plugins/component_metadata/component_metadata_service_impl.h +++ b/src/mavsdk_server/src/plugins/component_metadata/component_metadata_service_impl.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -297,6 +298,7 @@ class ComponentMetadataServiceImpl final void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -313,12 +315,14 @@ class ComponentMetadataServiceImpl final handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -332,6 +336,7 @@ class ComponentMetadataServiceImpl final LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/component_metadata_server/component_metadata_server_service_impl.h b/src/mavsdk_server/src/plugins/component_metadata_server/component_metadata_server_service_impl.h index b00211d453..4ff8a369fa 100644 --- a/src/mavsdk_server/src/plugins/component_metadata_server/component_metadata_server_service_impl.h +++ b/src/mavsdk_server/src/plugins/component_metadata_server/component_metadata_server_service_impl.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -114,6 +115,7 @@ class ComponentMetadataServerServiceImpl final void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -130,12 +132,14 @@ class ComponentMetadataServerServiceImpl final handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -149,6 +153,7 @@ class ComponentMetadataServerServiceImpl final LazyServerPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/failure/failure_service_impl.h b/src/mavsdk_server/src/plugins/failure/failure_service_impl.h index d104cb6f22..accc8eaff4 100644 --- a/src/mavsdk_server/src/plugins/failure/failure_service_impl.h +++ b/src/mavsdk_server/src/plugins/failure/failure_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -259,6 +260,7 @@ class FailureServiceImpl final : public rpc::failure::FailureService::Service { void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -275,12 +277,14 @@ class FailureServiceImpl final : public rpc::failure::FailureService::Service { handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -294,6 +298,7 @@ class FailureServiceImpl final : public rpc::failure::FailureService::Service { LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/follow_me/follow_me_service_impl.h b/src/mavsdk_server/src/plugins/follow_me/follow_me_service_impl.h index a72bd3eb2b..bd7441f5f8 100644 --- a/src/mavsdk_server/src/plugins/follow_me/follow_me_service_impl.h +++ b/src/mavsdk_server/src/plugins/follow_me/follow_me_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -371,6 +372,7 @@ class FollowMeServiceImpl final : public rpc::follow_me::FollowMeService::Servic void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -387,12 +389,14 @@ class FollowMeServiceImpl final : public rpc::follow_me::FollowMeService::Servic handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -406,6 +410,7 @@ class FollowMeServiceImpl final : public rpc::follow_me::FollowMeService::Servic LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/ftp/ftp_service_impl.h b/src/mavsdk_server/src/plugins/ftp/ftp_service_impl.h index a818afd9b5..885f834185 100644 --- a/src/mavsdk_server/src/plugins/ftp/ftp_service_impl.h +++ b/src/mavsdk_server/src/plugins/ftp/ftp_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -451,6 +452,7 @@ class FtpServiceImpl final : public rpc::ftp::FtpService::Service { void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -467,12 +469,14 @@ class FtpServiceImpl final : public rpc::ftp::FtpService::Service { handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -486,6 +490,7 @@ class FtpServiceImpl final : public rpc::ftp::FtpService::Service { LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/ftp_server/ftp_server_service_impl.h b/src/mavsdk_server/src/plugins/ftp_server/ftp_server_service_impl.h index 2d12734ec0..f34cb777e1 100644 --- a/src/mavsdk_server/src/plugins/ftp_server/ftp_server_service_impl.h +++ b/src/mavsdk_server/src/plugins/ftp_server/ftp_server_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -110,6 +111,7 @@ class FtpServerServiceImpl final : public rpc::ftp_server::FtpServerService::Ser void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -126,12 +128,14 @@ class FtpServerServiceImpl final : public rpc::ftp_server::FtpServerService::Ser handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -145,6 +149,7 @@ class FtpServerServiceImpl final : public rpc::ftp_server::FtpServerService::Ser LazyServerPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/geofence/geofence_service_impl.h b/src/mavsdk_server/src/plugins/geofence/geofence_service_impl.h index 5520e52e17..30577e8b00 100644 --- a/src/mavsdk_server/src/plugins/geofence/geofence_service_impl.h +++ b/src/mavsdk_server/src/plugins/geofence/geofence_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -291,6 +292,7 @@ class GeofenceServiceImpl final : public rpc::geofence::GeofenceService::Service void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -307,12 +309,14 @@ class GeofenceServiceImpl final : public rpc::geofence::GeofenceService::Service handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -326,6 +330,7 @@ class GeofenceServiceImpl final : public rpc::geofence::GeofenceService::Service LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/gimbal/gimbal_service_impl.h b/src/mavsdk_server/src/plugins/gimbal/gimbal_service_impl.h index 96257cf9ba..5c4f8aad59 100644 --- a/src/mavsdk_server/src/plugins/gimbal/gimbal_service_impl.h +++ b/src/mavsdk_server/src/plugins/gimbal/gimbal_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -595,6 +596,7 @@ class GimbalServiceImpl final : public rpc::gimbal::GimbalService::Service { void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -611,12 +613,14 @@ class GimbalServiceImpl final : public rpc::gimbal::GimbalService::Service { handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -630,6 +634,7 @@ class GimbalServiceImpl final : public rpc::gimbal::GimbalService::Service { LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/gripper/gripper_service_impl.h b/src/mavsdk_server/src/plugins/gripper/gripper_service_impl.h index 4f113b7b5f..30d0bf2be5 100644 --- a/src/mavsdk_server/src/plugins/gripper/gripper_service_impl.h +++ b/src/mavsdk_server/src/plugins/gripper/gripper_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -178,6 +179,7 @@ class GripperServiceImpl final : public rpc::gripper::GripperService::Service { void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -194,12 +196,14 @@ class GripperServiceImpl final : public rpc::gripper::GripperService::Service { handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -213,6 +217,7 @@ class GripperServiceImpl final : public rpc::gripper::GripperService::Service { LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/info/info_service_impl.h b/src/mavsdk_server/src/plugins/info/info_service_impl.h index cf0155fcbc..c86cf4e5da 100644 --- a/src/mavsdk_server/src/plugins/info/info_service_impl.h +++ b/src/mavsdk_server/src/plugins/info/info_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -450,6 +451,7 @@ class InfoServiceImpl final : public rpc::info::InfoService::Service { void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -466,12 +468,14 @@ class InfoServiceImpl final : public rpc::info::InfoService::Service { handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -485,6 +489,7 @@ class InfoServiceImpl final : public rpc::info::InfoService::Service { LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/log_files/log_files_service_impl.h b/src/mavsdk_server/src/plugins/log_files/log_files_service_impl.h index 4850e43e79..e3b287a51f 100644 --- a/src/mavsdk_server/src/plugins/log_files/log_files_service_impl.h +++ b/src/mavsdk_server/src/plugins/log_files/log_files_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -249,6 +250,7 @@ class LogFilesServiceImpl final : public rpc::log_files::LogFilesService::Servic void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -265,12 +267,14 @@ class LogFilesServiceImpl final : public rpc::log_files::LogFilesService::Servic handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -284,6 +288,7 @@ class LogFilesServiceImpl final : public rpc::log_files::LogFilesService::Servic LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/log_streaming/log_streaming_service_impl.h b/src/mavsdk_server/src/plugins/log_streaming/log_streaming_service_impl.h index e60aa80e78..ea9baab5a8 100644 --- a/src/mavsdk_server/src/plugins/log_streaming/log_streaming_service_impl.h +++ b/src/mavsdk_server/src/plugins/log_streaming/log_streaming_service_impl.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -205,6 +206,7 @@ class LogStreamingServiceImpl final : public rpc::log_streaming::LogStreamingSer void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -221,12 +223,14 @@ class LogStreamingServiceImpl final : public rpc::log_streaming::LogStreamingSer handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -240,6 +244,7 @@ class LogStreamingServiceImpl final : public rpc::log_streaming::LogStreamingSer LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/manual_control/manual_control_service_impl.h b/src/mavsdk_server/src/plugins/manual_control/manual_control_service_impl.h index 7d7027d227..a0ead913ef 100644 --- a/src/mavsdk_server/src/plugins/manual_control/manual_control_service_impl.h +++ b/src/mavsdk_server/src/plugins/manual_control/manual_control_service_impl.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -176,6 +177,7 @@ class ManualControlServiceImpl final : public rpc::manual_control::ManualControl void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -192,12 +194,14 @@ class ManualControlServiceImpl final : public rpc::manual_control::ManualControl handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -211,6 +215,7 @@ class ManualControlServiceImpl final : public rpc::manual_control::ManualControl LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/mission/mission_service_impl.h b/src/mavsdk_server/src/plugins/mission/mission_service_impl.h index 9df64925ae..fc09e99d58 100644 --- a/src/mavsdk_server/src/plugins/mission/mission_service_impl.h +++ b/src/mavsdk_server/src/plugins/mission/mission_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -823,6 +824,7 @@ class MissionServiceImpl final : public rpc::mission::MissionService::Service { void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -839,12 +841,14 @@ class MissionServiceImpl final : public rpc::mission::MissionService::Service { handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -858,6 +862,7 @@ class MissionServiceImpl final : public rpc::mission::MissionService::Service { LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/mission_raw/mission_raw_service_impl.h b/src/mavsdk_server/src/plugins/mission_raw/mission_raw_service_impl.h index c8a88b4495..d66427f3a1 100644 --- a/src/mavsdk_server/src/plugins/mission_raw/mission_raw_service_impl.h +++ b/src/mavsdk_server/src/plugins/mission_raw/mission_raw_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -696,6 +697,7 @@ class MissionRawServiceImpl final : public rpc::mission_raw::MissionRawService:: void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -712,12 +714,14 @@ class MissionRawServiceImpl final : public rpc::mission_raw::MissionRawService:: handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -731,6 +735,7 @@ class MissionRawServiceImpl final : public rpc::mission_raw::MissionRawService:: LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/mission_raw_server/mission_raw_server_service_impl.h b/src/mavsdk_server/src/plugins/mission_raw_server/mission_raw_server_service_impl.h index f4b26c86a8..b921a79921 100644 --- a/src/mavsdk_server/src/plugins/mission_raw_server/mission_raw_server_service_impl.h +++ b/src/mavsdk_server/src/plugins/mission_raw_server/mission_raw_server_service_impl.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -403,6 +404,7 @@ class MissionRawServerServiceImpl final void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -419,12 +421,14 @@ class MissionRawServerServiceImpl final handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -438,6 +442,7 @@ class MissionRawServerServiceImpl final LazyServerPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/mocap/mocap_service_impl.h b/src/mavsdk_server/src/plugins/mocap/mocap_service_impl.h index 00740cafad..b7b9604213 100644 --- a/src/mavsdk_server/src/plugins/mocap/mocap_service_impl.h +++ b/src/mavsdk_server/src/plugins/mocap/mocap_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -492,6 +493,7 @@ class MocapServiceImpl final : public rpc::mocap::MocapService::Service { void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -508,12 +510,14 @@ class MocapServiceImpl final : public rpc::mocap::MocapService::Service { handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -527,6 +531,7 @@ class MocapServiceImpl final : public rpc::mocap::MocapService::Service { LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/offboard/offboard_service_impl.h b/src/mavsdk_server/src/plugins/offboard/offboard_service_impl.h index e34095ff11..97ffdaa802 100644 --- a/src/mavsdk_server/src/plugins/offboard/offboard_service_impl.h +++ b/src/mavsdk_server/src/plugins/offboard/offboard_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -763,6 +764,7 @@ class OffboardServiceImpl final : public rpc::offboard::OffboardService::Service void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -779,12 +781,14 @@ class OffboardServiceImpl final : public rpc::offboard::OffboardService::Service handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -798,6 +802,7 @@ class OffboardServiceImpl final : public rpc::offboard::OffboardService::Service LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/param/param_service_impl.h b/src/mavsdk_server/src/plugins/param/param_service_impl.h index d0d6ee145d..a3bc87e8aa 100644 --- a/src/mavsdk_server/src/plugins/param/param_service_impl.h +++ b/src/mavsdk_server/src/plugins/param/param_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -469,6 +470,7 @@ class ParamServiceImpl final : public rpc::param::ParamService::Service { void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -485,12 +487,14 @@ class ParamServiceImpl final : public rpc::param::ParamService::Service { handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -504,6 +508,7 @@ class ParamServiceImpl final : public rpc::param::ParamService::Service { LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/param_server/param_server_service_impl.h b/src/mavsdk_server/src/plugins/param_server/param_server_service_impl.h index 1474dd53bf..abd5931988 100644 --- a/src/mavsdk_server/src/plugins/param_server/param_server_service_impl.h +++ b/src/mavsdk_server/src/plugins/param_server/param_server_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -545,6 +546,7 @@ class ParamServerServiceImpl final : public rpc::param_server::ParamServerServic void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -561,12 +563,14 @@ class ParamServerServiceImpl final : public rpc::param_server::ParamServerServic handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -580,6 +584,7 @@ class ParamServerServiceImpl final : public rpc::param_server::ParamServerServic LazyServerPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/rtk/rtk_service_impl.h b/src/mavsdk_server/src/plugins/rtk/rtk_service_impl.h index d04655e771..bce0229392 100644 --- a/src/mavsdk_server/src/plugins/rtk/rtk_service_impl.h +++ b/src/mavsdk_server/src/plugins/rtk/rtk_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -130,6 +131,7 @@ class RtkServiceImpl final : public rpc::rtk::RtkService::Service { void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -146,12 +148,14 @@ class RtkServiceImpl final : public rpc::rtk::RtkService::Service { handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -165,6 +169,7 @@ class RtkServiceImpl final : public rpc::rtk::RtkService::Service { LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/server_utility/server_utility_service_impl.h b/src/mavsdk_server/src/plugins/server_utility/server_utility_service_impl.h index e4c95aa259..aad18aeead 100644 --- a/src/mavsdk_server/src/plugins/server_utility/server_utility_service_impl.h +++ b/src/mavsdk_server/src/plugins/server_utility/server_utility_service_impl.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -168,6 +169,7 @@ class ServerUtilityServiceImpl final : public rpc::server_utility::ServerUtility void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -184,12 +186,14 @@ class ServerUtilityServiceImpl final : public rpc::server_utility::ServerUtility handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -203,6 +207,7 @@ class ServerUtilityServiceImpl final : public rpc::server_utility::ServerUtility LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/shell/shell_service_impl.h b/src/mavsdk_server/src/plugins/shell/shell_service_impl.h index 03be94ad5f..4049bd3cbb 100644 --- a/src/mavsdk_server/src/plugins/shell/shell_service_impl.h +++ b/src/mavsdk_server/src/plugins/shell/shell_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -155,6 +156,7 @@ class ShellServiceImpl final : public rpc::shell::ShellService::Service { void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -171,12 +173,14 @@ class ShellServiceImpl final : public rpc::shell::ShellService::Service { handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -190,6 +194,7 @@ class ShellServiceImpl final : public rpc::shell::ShellService::Service { LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/telemetry/telemetry_service_impl.h b/src/mavsdk_server/src/plugins/telemetry/telemetry_service_impl.h index 4e776b70d4..d9c650c74d 100644 --- a/src/mavsdk_server/src/plugins/telemetry/telemetry_service_impl.h +++ b/src/mavsdk_server/src/plugins/telemetry/telemetry_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -3473,6 +3474,7 @@ class TelemetryServiceImpl final : public rpc::telemetry::TelemetryService::Serv void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -3489,12 +3491,14 @@ class TelemetryServiceImpl final : public rpc::telemetry::TelemetryService::Serv handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -3508,6 +3512,7 @@ class TelemetryServiceImpl final : public rpc::telemetry::TelemetryService::Serv LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/telemetry_server/telemetry_server_service_impl.h b/src/mavsdk_server/src/plugins/telemetry_server/telemetry_server_service_impl.h index 22cb738787..b3373204ac 100644 --- a/src/mavsdk_server/src/plugins/telemetry_server/telemetry_server_service_impl.h +++ b/src/mavsdk_server/src/plugins/telemetry_server/telemetry_server_service_impl.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -1636,6 +1637,7 @@ class TelemetryServerServiceImpl final void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -1652,12 +1654,14 @@ class TelemetryServerServiceImpl final handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -1671,6 +1675,7 @@ class TelemetryServerServiceImpl final LazyServerPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/transponder/transponder_service_impl.h b/src/mavsdk_server/src/plugins/transponder/transponder_service_impl.h index e777b93798..c3d831c681 100644 --- a/src/mavsdk_server/src/plugins/transponder/transponder_service_impl.h +++ b/src/mavsdk_server/src/plugins/transponder/transponder_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -358,6 +359,7 @@ class TransponderServiceImpl final : public rpc::transponder::TransponderService void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -374,12 +376,14 @@ class TransponderServiceImpl final : public rpc::transponder::TransponderService handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -393,6 +397,7 @@ class TransponderServiceImpl final : public rpc::transponder::TransponderService LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/tune/tune_service_impl.h b/src/mavsdk_server/src/plugins/tune/tune_service_impl.h index 04559b25ff..940e015724 100644 --- a/src/mavsdk_server/src/plugins/tune/tune_service_impl.h +++ b/src/mavsdk_server/src/plugins/tune/tune_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -248,6 +249,7 @@ class TuneServiceImpl final : public rpc::tune::TuneService::Service { void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -264,12 +266,14 @@ class TuneServiceImpl final : public rpc::tune::TuneService::Service { handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -283,6 +287,7 @@ class TuneServiceImpl final : public rpc::tune::TuneService::Service { LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/src/mavsdk_server/src/plugins/winch/winch_service_impl.h b/src/mavsdk_server/src/plugins/winch/winch_service_impl.h index f2f4ccdd91..fe09af981a 100644 --- a/src/mavsdk_server/src/plugins/winch/winch_service_impl.h +++ b/src/mavsdk_server/src/plugins/winch/winch_service_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -593,6 +594,7 @@ class WinchServiceImpl final : public rpc::winch::WinchService::Service { void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -609,12 +611,14 @@ class WinchServiceImpl final : public rpc::winch::WinchService::Service { handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { @@ -628,6 +632,7 @@ class WinchServiceImpl final : public rpc::winch::WinchService::Service { LazyPlugin& _lazy_plugin; std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises{}; }; diff --git a/templates/mavsdk_server/file.j2 b/templates/mavsdk_server/file.j2 index 2adeb83fb8..e6b43fccff 100644 --- a/templates/mavsdk_server/file.j2 +++ b/templates/mavsdk_server/file.j2 @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -68,6 +69,7 @@ public: {% endfor %} void stop() { _stopped.store(true); + std::lock_guard lock(_stream_stop_mutex); for (auto& prom : _stream_stop_promises) { if (auto handle = prom.lock()) { handle->set_value(); @@ -83,11 +85,13 @@ private: handle->set_value(); } } else { + std::lock_guard lock(_stream_stop_mutex); _stream_stop_promises.push_back(prom); } } void unregister_stream_stop_promise(std::shared_ptr> prom) { + std::lock_guard lock(_stream_stop_mutex); for (auto it = _stream_stop_promises.begin(); it != _stream_stop_promises.end(); /* ++it */) { if (it->lock() == prom) { it = _stream_stop_promises.erase(it); @@ -103,6 +107,7 @@ private: LazyPlugin& _lazy_plugin; {% endif %} std::atomic _stopped{false}; + std::mutex _stream_stop_mutex{}; std::vector>> _stream_stop_promises {}; };