Skip to content
This repository has been archived by the owner on Aug 25, 2021. It is now read-only.

Fix RpcClient reference leak in Jason (#27) #140

Merged
merged 4 commits
Sep 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 32 additions & 27 deletions jason/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ mod room;

use std::{cell::RefCell, rc::Rc};

use futures::FutureExt as _;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::spawn_local;

use crate::{
media::{MediaManager, MediaManagerHandle},
peer,
rpc::{
ClientDisconnect, RpcClient as _, RpcTransport, WebSocketRpcClient,
ClientDisconnect, RpcClient, RpcTransport, WebSocketRpcClient,
WebSocketRpcTransport,
},
set_panic_hook,
Expand Down Expand Up @@ -59,32 +60,7 @@ impl Jason {
Ok(Rc::new(ws) as Rc<dyn RpcTransport>)
})
})));
let peer_repository = Box::new(peer::Repository::new(Rc::clone(
&self.0.borrow().media_manager,
)));

spawn_local({
let rpc = Rc::clone(&rpc);
let inner = Rc::clone(&self.0);
async move {
let reason = rpc.on_normal_close().await.unwrap_or_else(|_| {
ClientDisconnect::RpcClientUnexpectedlyDropped.into()
});
// TODO: Don't close all rooms when multiple RPC connections
// will be supported.
inner
.borrow_mut()
.rooms
.drain(..)
.for_each(|room| room.close(reason));
inner.borrow_mut().media_manager = Rc::default();
}
});

let room = Room::new(rpc, peer_repository);
let handle = room.new_handle();
self.0.borrow_mut().rooms.push(room);
handle
self.inner_init_room(rpc)
}

/// Returns handle to [`MediaManager`].
Expand All @@ -101,3 +77,32 @@ impl Jason {
});
}
}

impl Jason {
    /// Creates a new [`Room`] bound to `rpc` and returns a [`RoomHandle`]
    /// for it.
    ///
    /// The [`Room`] is registered in this [`Jason`] instance and will be
    /// closed (together with every other room) once `rpc` reports a normal
    /// close or is unexpectedly dropped.
    pub fn inner_init_room(&self, rpc: Rc<dyn RpcClient>) -> RoomHandle {
        let media_manager = Rc::clone(&self.0.borrow().media_manager);
        let peer_repository = Box::new(peer::Repository::new(media_manager));

        // Obtain the close-notification future *before* spawning, so the
        // spawned task holds only this future — not another strong `Rc` to
        // the `RpcClient` (which would leak the client).
        let on_close = rpc.on_normal_close();
        let inner = Rc::clone(&self.0);
        spawn_local(async move {
            let res = on_close.await;
            // TODO: Don't close all rooms when multiple rpc connections
            //       will be supported.
            let reason = res.unwrap_or_else(|_| {
                ClientDisconnect::RpcClientUnexpectedlyDropped.into()
            });
            inner
                .borrow_mut()
                .rooms
                .drain(..)
                .for_each(|room| room.close(reason));
            // Reset the media manager so its resources are released too.
            inner.borrow_mut().media_manager = Rc::default();
        });

        let room = Room::new(rpc, peer_repository);
        let handle = room.new_handle();
        self.0.borrow_mut().rooms.push(room);
        handle
    }
}
10 changes: 7 additions & 3 deletions jason/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,9 @@ pub trait RpcClient {
/// abnormal close [`RpcClient::on_connection_loss`] will be thrown.
///
/// [`Future`]: std::future::Future
async fn on_normal_close(&self) -> Result<CloseReason, oneshot::Canceled>;
fn on_normal_close(
&self,
) -> LocalBoxFuture<'static, Result<CloseReason, oneshot::Canceled>>;

/// Sets reason, that will be passed to underlying transport when this
/// client will be dropped.
Expand Down Expand Up @@ -640,10 +642,12 @@ impl RpcClient for WebSocketRpcClient {
}
}

async fn on_normal_close(&self) -> Result<CloseReason, oneshot::Canceled> {
fn on_normal_close(
&self,
) -> LocalBoxFuture<'static, Result<CloseReason, oneshot::Canceled>> {
let (tx, rx) = oneshot::channel();
self.0.borrow_mut().on_close_subscribers.push(tx);
rx.await
Box::pin(rx)
}

fn on_connection_loss(&self) -> LocalBoxStream<'static, ()> {
Expand Down
156 changes: 156 additions & 0 deletions jason/tests/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,158 @@
mod connection;
mod room;

use std::rc::Rc;

use futures::{
channel::{mpsc, oneshot},
stream, StreamExt,
};
use medea_client_api_proto::{CloseReason, RpcSettings, ServerMsg};
use medea_jason::{
rpc::{
websocket::{MockRpcTransport, TransportState},
CloseMsg, RpcTransport, WebSocketRpcClient,
},
Jason,
};
use medea_reactive::ObservableCell;
use wasm_bindgen::closure::Closure;
use wasm_bindgen_futures::JsFuture;
use wasm_bindgen_test::*;

use crate::timeout;

wasm_bindgen_test_configure!(run_in_browser);

/// [`ServerMsg::RpcSettings`] which will be sent in all tests from this
/// module as the server's initial settings message.
const RPC_SETTINGS: ServerMsg = ServerMsg::RpcSettings(RpcSettings {
idle_timeout_ms: 5_000,
ping_interval_ms: 2_000,
});

/// Checks that after a [`Room`] is initialized and joined, exactly one
/// extra strong [`Rc`] to the [`RpcClient`] exists (the one the [`Room`]
/// holds) — i.e. the client is not leaked.
#[wasm_bindgen_test]
async fn only_one_strong_rpc_rc_exists() {
    let jason = Jason::default();
    let rpc = Rc::new(WebSocketRpcClient::new(Box::new(move |_| {
        Box::pin(async move {
            // Stub transport: emits RPC settings, accepts sends, reports
            // an `Open` state and expects exactly one close reason.
            let mut mock = MockRpcTransport::new();
            mock.expect_on_message().times(3).returning_st(move || {
                Box::pin(stream::once(async { RPC_SETTINGS }))
            });
            mock.expect_send().return_once(|_| Ok(()));
            mock.expect_set_close_reason()
                .times(1)
                .return_once(|_| ());
            mock.expect_on_state_change().return_once_st(move || {
                Box::pin(stream::once(async { TransportState::Open }))
            });
            Ok(Rc::new(mock) as Rc<dyn RpcTransport>)
        })
    })));

    let room = jason.inner_init_room(Rc::clone(&rpc));
    room.on_failed_local_stream(Closure::once_into_js(|| {}).into())
        .unwrap();
    room.on_connection_loss(Closure::once_into_js(|| {}).into())
        .unwrap();
    JsFuture::from(room.join("ws://example.com".to_string()))
        .await
        .unwrap();

    // Our local `rpc` plus the `Room`'s copy — and nothing else.
    assert_eq!(Rc::strong_count(&rpc), 2);
}

/// Checks that the [`RpcClient`] is dropped on a [`JasonHandle::dispose`]
/// call: disposing must propagate a close reason down to the transport.
#[wasm_bindgen_test]
async fn rpc_dropped_on_jason_dispose() {
    let jason = Jason::default();
    // The mock transport forwards its close reason here; receiving one
    // proves the client (and its transport) was actually torn down.
    let (reason_tx, mut reason_rx) = mpsc::unbounded();
    let client = Rc::new(WebSocketRpcClient::new(Box::new(move |_| {
        let reason_tx = reason_tx.clone();
        Box::pin(async move {
            let mut mock = MockRpcTransport::new();
            mock.expect_on_message().times(3).returning_st(move || {
                Box::pin(stream::once(async { RPC_SETTINGS }))
            });
            mock.expect_send().return_once(|_| Ok(()));
            mock.expect_set_close_reason().times(1).return_once(
                move |reason| reason_tx.unbounded_send(reason).unwrap(),
            );
            mock.expect_on_state_change().return_once_st(move || {
                Box::pin(stream::once(async { TransportState::Open }))
            });
            Ok(Rc::new(mock) as Rc<dyn RpcTransport>)
        })
    })));

    let room = jason.inner_init_room(client);
    room.on_failed_local_stream(Closure::once_into_js(|| {}).into())
        .unwrap();
    room.on_connection_loss(Closure::once_into_js(|| {}).into())
        .unwrap();
    JsFuture::from(room.join("ws://example.com".to_string()))
        .await
        .unwrap();

    jason.dispose();
    // Fail if no close reason arrives within 300 ms.
    timeout(300, reason_rx.next()).await.unwrap();
}

/// Tests that [`Room`] will trigger [`RoomHandle::on_close`] callback on
/// [`RpcTransport`] close.
#[wasm_bindgen_test]
async fn room_closes_on_rpc_transport_close() {
let jason = Jason::default();
// Shared observable transport state: flipping it to `Closed` at the end
// of the test simulates the underlying transport shutting down.
let on_state_change_mock =
Rc::new(ObservableCell::new(TransportState::Open));
let ws = Rc::new(WebSocketRpcClient::new(Box::new({
// Clone once for the factory closure, then once more per invocation,
// so each created transport subscribes to the same cell.
let on_state_change_mock = on_state_change_mock.clone();
move |_| {
let on_state_change_mock = on_state_change_mock.clone();
Box::pin(async move {
// Stub transport: emits RPC settings, accepts sends, and exposes
// the shared cell as its state-change stream.
let mut transport = MockRpcTransport::new();
transport.expect_on_message().times(3).returning_st(
move || Box::pin(stream::once(async { RPC_SETTINGS })),
);
transport.expect_send().return_once(|_| Ok(()));
transport.expect_set_close_reason().return_once(|_| ());
transport
.expect_on_state_change()
.return_once_st(move || on_state_change_mock.subscribe());
let transport = Rc::new(transport);
Ok(transport as Rc<dyn RpcTransport>)
})
}
})));

let mut room = jason.inner_init_room(ws);
room.on_failed_local_stream(Closure::once_into_js(|| {}).into())
.unwrap();
room.on_connection_loss(Closure::once_into_js(|| {}).into())
.unwrap();
JsFuture::from(room.join("ws://example.com".to_string()))
.await
.unwrap();

// `on_close` must fire when the transport state turns `Closed`;
// the oneshot channel signals the callback was invoked.
let (test_tx, test_rx) = oneshot::channel();
let closure = wasm_bindgen::closure::Closure::once_into_js(move || {
test_tx.send(()).unwrap();
});
room.on_close(closure.into()).unwrap();

// Simulate a normal close of the underlying transport.
on_state_change_mock.set(TransportState::Closed(CloseMsg::Normal(
1200,
CloseReason::Finished,
)));

// Fail if the `on_close` callback does not fire within 300 ms.
timeout(300, test_rx).await.unwrap().unwrap();
}