Skip to content
This repository has been archived by the owner on Aug 25, 2021. It is now read-only.

Send reason of WebSocket close by server #58

Merged
merged 9 commits into from
Oct 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ All user visible changes to this project will be documented in this file. This p
- Signalling:
- Dynamic `Peer`s creation when client connects ([#28]);
- Auto-removing `Peer`s when `Member` disconnects ([#28]);
- Filter `SetIceCandidate` messages without `candidate` ([#50](/../../pull/50)).
- Filter `SetIceCandidate` messages without `candidate` ([#50](/../../pull/50));
- Send reason of closing WebSocket connection as [Close](https://tools.ietf.org/html/rfc4566#section-5.14) frame's description ([#58](/../../pull/58)).
- Testing:
- E2E tests for signalling ([#28]).

Expand Down
3 changes: 2 additions & 1 deletion proto/client-api/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ All user visible changes to this project will be documented in this file. This p
### Added

- `TrackId` and `PeerId` types ([#28]);
- `Incrementable` trait ([#28]).
- `Incrementable` trait ([#28]);
- `CloseReason` and `CloseDescription` types ([#58](/../../pull/58)).

[#28]: /../../pull/28

Expand Down
40 changes: 37 additions & 3 deletions proto/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::collections::HashMap;

use derive_more::Display;
use derive_more::{Constructor, Display};
use medea_macro::dispatchable;
use serde::{de::Deserializer, ser::Serializer, Deserialize, Serialize};

Expand Down Expand Up @@ -99,6 +99,40 @@ pub enum Command {
},
}

/// Reason of disconnecting Web Client from Media Server.
#[derive(Debug, Deserialize, Serialize)]
pub enum CloseReason {
/// Client session was finished on a server side.
Finished,

/// Old connection was closed due to a client reconnection.
Reconnected,

/// Connection has been inactive for a while and thus considered idle
/// by a server.
Idle,

/// Establishing of connection with a server was rejected on server side.
///
/// Most likely because of incorrect Member credentials.
Rejected,

/// Server internal error has occurred while connecting.
///
/// This close reason is similar to 500 HTTP status code.
InternalError,
}

/// Description which is sent in [Close] WebSocket frame from Media Server
/// to Web Client.
///
/// [Close]: https://tools.ietf.org/html/rfc6455#section-5.5.1
#[derive(Constructor, Debug, Deserialize, Serialize)]
pub struct CloseDescription {
/// Reason of why WebSocket connection has been closed.
pub reason: CloseReason,
}

/// WebSocket message from Medea to Jason.
#[allow(dead_code)]
#[dispatchable]
Expand Down Expand Up @@ -323,8 +357,8 @@ mod test {
\"command\":\"MakeSdpOffer\",\
\"data\":{\
\"peer_id\":77,\
\"sdp_offer\":\"offer\",\
\"mids\":{\"0\":\"1\"}\
\"sdp_offer\":\"offer\",\
\"mids\":{\"0\":\"1\"}\
}\
}";

Expand Down
17 changes: 13 additions & 4 deletions src/api/client/rpc_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::fmt;
use actix::Message;
use derive_more::{From, Into};
use futures::Future;
use medea_client_api_proto::{Command, Event};
use medea_client_api_proto::{CloseDescription, Command, Event};

use crate::api::control::MemberId;

Expand All @@ -24,10 +24,19 @@ pub struct EventMessage(Event);
///
/// [`Member`]: crate::signalling::elements::member::Member
pub trait RpcConnection: fmt::Debug + Send {
/// Closes [`RpcConnection`].
/// Closes [`RpcConnection`] and sends [`CloseDescription`] to the client
/// (in WebSocket implementation description will be sent in a [Close]
/// frame).
///
/// No [`RpcConnectionClosed`] signals should be emitted.
/// Always returns success.
fn close(&mut self) -> Box<dyn Future<Item = (), Error = ()>>;
///
/// Always succeeds.
///
/// [Close]: https://tools.ietf.org/html/rfc6455#section-5.5.1
fn close(
&mut self,
close_description: CloseDescription,
) -> Box<dyn Future<Item = (), Error = ()>>;

/// Sends [`Event`] to remote [`Member`].
///
Expand Down
16 changes: 15 additions & 1 deletion src/api/client/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ mod test {
use actix_http::{ws::Message, HttpService};
use actix_http_test::{TestServer, TestServerRuntime};
use futures::{future::IntoFuture as _, sink::Sink as _, Stream as _};
use medea_client_api_proto::{CloseDescription, CloseReason};

use crate::{
api::control, conf::Conf, signalling::Room,
Expand Down Expand Up @@ -230,9 +231,22 @@ mod test {
read.into_future()
.map_err(|(e, _)| panic!("{:?}", e))
.map(|(item, _)| {
let description = CloseDescription::new(
CloseReason::Idle,
);
let close_reason = ws::CloseReason {
code: ws::CloseCode::Normal,
description: Some(
serde_json::to_string(
&description,
)
.unwrap(),
),
};

assert_eq!(
Some(ws::Frame::Close(Some(
ws::CloseCode::Normal.into()
close_reason
))),
item
);
Expand Down
56 changes: 36 additions & 20 deletions src/api/client/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ use actix::{
fut::wrap_future, Actor, ActorContext, ActorFuture, Addr, AsyncContext,
Handler, Message, StreamHandler,
};
use actix_web_actors::ws::{self, CloseReason, WebsocketContext};
use actix_web_actors::ws;
use futures::future::Future;
use medea_client_api_proto::{ClientMsg, ServerMsg};
use medea_client_api_proto::{
ClientMsg, CloseDescription, CloseReason, ServerMsg,
};

use crate::{
api::{
Expand Down Expand Up @@ -63,13 +65,7 @@ impl WsSession {
}
}

fn close_normal(&self, ctx: &mut WebsocketContext<Self>) {
ctx.notify(Close {
reason: Some(ws::CloseCode::Normal.into()),
});
}

/// Start watchdog which will drop connection if now-last_activity >
/// Starts watchdog which will drop connection if now-last_activity >
/// idle_timeout.
fn start_watchdog(&mut self, ctx: &mut <Self as Actor>::Context) {
ctx.run_interval(Duration::new(1, 0), |session, ctx| {
Expand All @@ -87,7 +83,9 @@ impl WsSession {
session.member_id, err,
)
}
session.close_normal(ctx);
ctx.notify(Close::with_normal_code(&CloseDescription::new(
CloseReason::Idle,
)))
}
});
}
Expand Down Expand Up @@ -120,7 +118,9 @@ impl Actor for WsSession {
{:?}",
session.member_id, e
);
session.close_normal(ctx);
ctx.notify(Close::with_normal_code(
&CloseDescription::new(CloseReason::Rejected),
));
}
},
)
Expand All @@ -133,7 +133,9 @@ impl Actor for WsSession {
{:?}",
session.member_id, send_err,
);
session.close_normal(ctx);
ctx.notify(Close::with_normal_code(
&CloseDescription::new(CloseReason::InternalError),
));
},
),
);
Expand All @@ -145,15 +147,20 @@ impl Actor for WsSession {
}

impl RpcConnection for Addr<WsSession> {
/// Closes [`WsSession`] by sending itself "normal closure" close message.
/// Closes [`WsSession`] by sending itself "normal closure" close message
/// with [`CloseDescription`] as description of [Close] frame.
///
/// Never returns error.
fn close(&mut self) -> Box<dyn Future<Item = (), Error = ()>> {
///
/// [Close]: https://tools.ietf.org/html/rfc6455#section-5.5.1
fn close(
&mut self,
close_description: CloseDescription,
) -> Box<dyn Future<Item = (), Error = ()>> {
let fut = self
.send(Close {
reason: Some(ws::CloseCode::Normal.into()),
})
.send(Close::with_normal_code(&close_description))
.or_else(|_| Ok(()));

Box::new(fut)
}

Expand All @@ -173,8 +180,17 @@ impl RpcConnection for Addr<WsSession> {

/// Message for closing [`WsSession`].
#[derive(Message)]
pub struct Close {
reason: Option<CloseReason>,
pub struct Close(ws::CloseReason);

impl Close {
/// Creates [`Close`] message with [`ws::CloseCode::Normal`] and provided
/// [`CloseDescription`] as serialized description.
fn with_normal_code(description: &CloseDescription) -> Self {
Self(ws::CloseReason {
code: ws::CloseCode::Normal,
description: Some(serde_json::to_string(&description).unwrap()),
})
}
}

impl Handler<Close> for WsSession {
Expand All @@ -184,7 +200,7 @@ impl Handler<Close> for WsSession {
fn handle(&mut self, close: Close, ctx: &mut Self::Context) {
debug!("Closing WsSession for member {}", self.member_id);
self.closed_by_server = true;
ctx.close(close.reason);
ctx.close(Some(close.0));
ctx.stop();
}
}
Expand Down
13 changes: 10 additions & 3 deletions src/signalling/participants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use futures::{
Future,
};

use medea_client_api_proto::Event;
use medea_client_api_proto::{CloseDescription, CloseReason, Event};

use crate::{
api::{
Expand Down Expand Up @@ -195,7 +195,11 @@ impl ParticipantService {
{
ctx.cancel_future(handler);
}
Box::new(wrap_future(connection.close().then(move |_| Ok(member))))
Box::new(wrap_future(
connection
.close(CloseDescription::new(CloseReason::Reconnected))
.then(move |_| Ok(member)),
))
} else {
Box::new(
wrap_future(self.turn.create(
Expand Down Expand Up @@ -299,7 +303,10 @@ impl ParticipantService {
let mut close_fut = self.connections.drain().fold(
vec![],
|mut futures, (_, mut connection)| {
futures.push(connection.close());
futures.push(
connection
.close(CloseDescription::new(CloseReason::Finished)),
);
futures
},
);
Expand Down