From 6e336bcdee17cd74cb6a0d5c872315944b4a4df0 Mon Sep 17 00:00:00 2001 From: Brendon Costa <126128098+bcostdolby@users.noreply.github.com> Date: Tue, 1 Oct 2024 12:56:13 +1000 Subject: [PATCH] refactor: [DIOS-6937] Use safer async wrapper with weak_ptr object lifetimes (#49) --- package.json | 2 +- src/IncomingStreamBridge.i | 12 +++---- src/MediaFrameListenerBridge.i | 3 +- src/rtmp-server_wrap.cxx | 66 +++++++++------------------------- 4 files changed, 25 insertions(+), 58 deletions(-) diff --git a/package.json b/package.json index eebc4f1..8c26485 100644 --- a/package.json +++ b/package.json @@ -35,7 +35,7 @@ "uuid": "^3.3.2" }, "peerDependencies": { - "medooze-media-server-src": "^2.4.2" + "medooze-media-server-src": "^3.0.0" }, "devDependencies": { "@types/node": "^20.8.6", diff --git a/src/IncomingStreamBridge.i b/src/IncomingStreamBridge.i index 2134b88..da23ce2 100644 --- a/src/IncomingStreamBridge.i +++ b/src/IncomingStreamBridge.i @@ -8,10 +8,11 @@ class IncomingStreamBridge : private: static constexpr size_t BaseVideoSSRC = 2; public: + IncomingStreamBridge(v8::Local object, int maxLateOffset = 200, int maxBufferingTime = 400) : - audio(new MediaFrameListenerBridge(loop, 1, false, true)), + audio(MediaFrameListenerBridge::Create(loop, 1, false, true)), videos({ - {0, std::make_shared(loop, BaseVideoSSRC, false, true)} + {0, MediaFrameListenerBridge::Create(loop, BaseVideoSSRC, false, true)} }), mutex(true), maxLateOffset(maxLateOffset), @@ -23,7 +24,7 @@ public: loop.Start(-1); //Create dispatch timer - dispatch = loop.CreateTimer([this](std::chrono::milliseconds now){ + dispatch = loop.CreateTimerUnsafe([this](std::chrono::milliseconds now){ //Iterate over the enqueued packets for(auto it = queue.begin(); it!=queue.end(); it = queue.erase(it)) @@ -55,7 +56,6 @@ public: }); } - virtual ~IncomingStreamBridge() { Log("IncomingStreamBridge::~IncomingStreamBridge()\n"); @@ -134,7 +134,7 @@ public: frame->SetTime(now); //Run on thread - loop.Async([=](...) { + loop.AsyncUnsafe([=](...) { //Check if it is the first time we see the video track if (frame->GetType() == MediaFrame::Video && videos.find(frame->GetSSRC())==videos.end()) { @@ -144,7 +144,7 @@ public: //Log Error("-IncomingStreamBridge::Enqueue() | New multivideotrack received [id:%d,ssrc:%d]\n", id, ssrc); //Add it - videos[id] = std::make_shared(loop, ssrc, false, true); + videos[id] = MediaFrameListenerBridge::Create(loop, ssrc, false, true); //Fire event on main node thread RTMPServerModule::Async([=,cloned=persistent](){ diff --git a/src/MediaFrameListenerBridge.i b/src/MediaFrameListenerBridge.i index cdfd241..68776d9 100644 --- a/src/MediaFrameListenerBridge.i +++ b/src/MediaFrameListenerBridge.i @@ -16,7 +16,6 @@ struct MediaFrameListenerBridge : public MediaFrameListener, public MediaFrameProducer { - MediaFrameListenerBridge(TimeService& timeService, int ssrc); QWORD numFrames; QWORD numPackets; @@ -80,7 +79,7 @@ SHARED_PTR_BEGIN(MediaFrameListenerBridge) { MediaFrameListenerBridgeShared(TimeService& timeService, int ssrc) { - return new std::shared_ptr(new MediaFrameListenerBridge(timeService, ssrc)); + return new std::shared_ptr(MediaFrameListenerBridge::Create(timeService, ssrc)); } SHARED_PTR_TO(RTPIncomingMediaStream) SHARED_PTR_TO(RTPReceiver) diff --git a/src/rtmp-server_wrap.cxx b/src/rtmp-server_wrap.cxx index f67c31f..0196391 100644 --- a/src/rtmp-server_wrap.cxx +++ b/src/rtmp-server_wrap.cxx @@ -2296,7 +2296,7 @@ MediaFrameListenerBridgeShared* MediaFrameListenerBridgeShared_from_proxy(const SWIGINTERN MediaFrameListenerBridgeShared *new_MediaFrameListenerBridgeShared(TimeService &timeService,int ssrc){ - return new std::shared_ptr(new MediaFrameListenerBridge(timeService, ssrc)); + return new std::shared_ptr(MediaFrameListenerBridge::Create(timeService, ssrc)); } SWIGINTERN RTPIncomingMediaStreamShared MediaFrameListenerBridgeShared_toRTPIncomingMediaStream__SWIG(MediaFrameListenerBridgeShared *self){ return std::static_pointer_cast(*self); @@ -2323,10 +2323,11 @@ class IncomingStreamBridge : private: static constexpr size_t BaseVideoSSRC = 2; public: + IncomingStreamBridge(v8::Local object, int maxLateOffset = 200, int maxBufferingTime = 400) : - audio(new MediaFrameListenerBridge(loop, 1, false, true)), + audio(MediaFrameListenerBridge::Create(loop, 1, false, true)), videos({ - {0, std::make_shared(loop, BaseVideoSSRC, false, true)} + {0, MediaFrameListenerBridge::Create(loop, BaseVideoSSRC, false, true)} }), mutex(true), maxLateOffset(maxLateOffset), @@ -2338,7 +2339,7 @@ class IncomingStreamBridge : loop.Start(-1); //Create dispatch timer - dispatch = loop.CreateTimer([this](std::chrono::milliseconds now){ + dispatch = loop.CreateTimerUnsafe([this](std::chrono::milliseconds now){ //Iterate over the enqueued packets for(auto it = queue.begin(); it!=queue.end(); it = queue.erase(it)) @@ -2370,7 +2371,6 @@ class IncomingStreamBridge : }); } - virtual ~IncomingStreamBridge() { Log("IncomingStreamBridge::~IncomingStreamBridge()\n"); @@ -2449,7 +2449,7 @@ class IncomingStreamBridge : frame->SetTime(now); //Run on thread - loop.Async([=](...) { + loop.AsyncUnsafe([=](...) { //Check if it is the first time we see the video track if (frame->GetType() == MediaFrame::Video && videos.find(frame->GetSSRC())==videos.end()) { @@ -2459,7 +2459,7 @@ class IncomingStreamBridge : //Log Error("-IncomingStreamBridge::Enqueue() | New multivideotrack received [id:%d,ssrc:%d]\n", id, ssrc); //Add it - videos[id] = std::make_shared(loop, ssrc, false, true); + videos[id] = MediaFrameListenerBridge::Create(loop, ssrc, false, true); //Fire event on main node thread RTMPServerModule::Async([=,cloned=persistent](){ @@ -7344,47 +7344,6 @@ static void _wrap_delete_FrameDispatchCoordinatorShared(const v8::WeakCallbackIn } -static SwigV8ReturnValue _wrap_new_MediaFrameListenerBridge(const SwigV8Arguments &args) { - SWIGV8_HANDLESCOPE(); - - SWIGV8_OBJECT self = args.Holder(); - TimeService *arg1 = 0 ; - int arg2 ; - void *argp1 = 0 ; - int res1 = 0 ; - int val2 ; - int ecode2 = 0 ; - MediaFrameListenerBridge *result; - if(self->InternalFieldCount() < 1) SWIG_exception_fail(SWIG_ERROR, "Illegal call of constructor _wrap_new_MediaFrameListenerBridge."); - if(args.Length() != 2) SWIG_exception_fail(SWIG_ERROR, "Illegal number of arguments for _wrap_new_MediaFrameListenerBridge."); - res1 = SWIG_ConvertPtr(args[0], &argp1, SWIGTYPE_p_TimeService, 0 ); - if (!SWIG_IsOK(res1)) { - SWIG_exception_fail(SWIG_ArgError(res1), "in method '" "new_MediaFrameListenerBridge" "', argument " "1"" of type '" "TimeService &""'"); - } - if (!argp1) { - SWIG_exception_fail(SWIG_ValueError, "invalid null reference " "in method '" "new_MediaFrameListenerBridge" "', argument " "1"" of type '" "TimeService &""'"); - } - arg1 = reinterpret_cast< TimeService * >(argp1); - ecode2 = SWIG_AsVal_int(args[1], &val2); - if (!SWIG_IsOK(ecode2)) { - SWIG_exception_fail(SWIG_ArgError(ecode2), "in method '" "new_MediaFrameListenerBridge" "', argument " "2"" of type '" "int""'"); - } - arg2 = static_cast< int >(val2); - result = (MediaFrameListenerBridge *)new MediaFrameListenerBridge(*arg1,arg2); - - - - - - SWIGV8_SetPrivateData(self, result, SWIGTYPE_p_MediaFrameListenerBridge, SWIG_POINTER_OWN); - SWIGV8_RETURN(self); - - goto fail; -fail: - SWIGV8_RETURN(SWIGV8_UNDEFINED()); -} - - static void _wrap_MediaFrameListenerBridge_numFrames_set(v8::Local property, v8::Local value, const SwigV8PropertyCallbackInfoVoid &info) { SWIGV8_HANDLESCOPE(); @@ -8627,6 +8586,15 @@ static void _wrap_delete_MediaFrameListenerBridge(const v8::WeakCallbackInfo _exports_FrameDispatchCoordinatorShared_obj = _exports_Fra #endif /* Class: MediaFrameListenerBridge (_exports_MediaFrameListenerBridge) */ SWIGV8_FUNCTION_TEMPLATE _exports_MediaFrameListenerBridge_class_0 = SWIGV8_CreateClassTemplate("MediaFrameListenerBridge"); -_exports_MediaFrameListenerBridge_class_0->SetCallHandler(_wrap_new_MediaFrameListenerBridge); +_exports_MediaFrameListenerBridge_class_0->SetCallHandler(_wrap_new_veto_MediaFrameListenerBridge); _exports_MediaFrameListenerBridge_class_0->Inherit(_exports_MediaFrameListenerBridge_class); #if (SWIG_V8_VERSION < 0x0704) _exports_MediaFrameListenerBridge_class_0->SetHiddenPrototype(true);