Skip to content
This repository has been archived by the owner on Aug 25, 2021. It is now read-only.

Implement commands validation #86

Merged
merged 10 commits into from
Feb 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ All user visible changes to this project will be documented in this file. This p



## TBD [0.2.0] · 2019-??-??
## TBD [0.2.0] · 2020-??-??
[0.2.0]: /../../tree/medea-0.2.0

[Milestone](/../../milestone/2) | [Roadmap](/../../issues/27)
Expand All @@ -32,11 +32,11 @@ All user visible changes to this project will be documented in this file. This p
- Signalling:
- Dynamic `Peer`s creation when client connects ([#28]);
- Auto-removing `Peer`s when `Member` disconnects ([#28]);
- Filter `SetIceCandidate` messages without `candidate` ([#50](/../../pull/50));
- Send reason of closing WebSocket connection as [Close](https://tools.ietf.org/html/rfc4566#section-5.14) frame's description ([#58](/../../pull/58));
- Filter `SetIceCandidate` messages without `candidate` ([#50]);
- Send reason of closing WebSocket connection as [Close](https://tools.ietf.org/html/rfc4566#section-5.14) frame's description ([#58]);
- Send `Event::RpcSettingsUpdated` when `Member` connects ([#75]);
- Send relay mode in `Event::PeerCreated` which is used for configuring client's `RtcIceTransportPolicy` ([#79](/../../pull/79));
- Send `Command::UpdateTracks` on `Event::TracksUpdated` ([#81](/../../pull/81)).
- Send relay mode in `Event::PeerCreated` which is used for configuring client's `RtcIceTransportPolicy` ([#79]);
- Send `Command::UpdateTracks` on `Event::TracksUpdated` ([#81]).
- [Coturn] integration:
- [Coturn] sessions destroying ([#84]).
- Configuration:
Expand All @@ -47,11 +47,21 @@ All user visible changes to this project will be documented in this file. This p
- Testing:
- E2E tests for signalling ([#28]).

### Fixed

- Signalling:
- Room crashing when handling commands with non-existent `peer_id` ([#86]).

[#28]: /../../pull/28
[#33]: /../../pull/33
[#50]: /../../pull/50
[#58]: /../../pull/58
[#63]: /../../pull/63
[#75]: /../../pull/75
[#79]: /../../pull/79
[#81]: /../../pull/81
[#84]: /../../pull/84
[#86]: /../../pull/86



Expand Down
191 changes: 94 additions & 97 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions crates/medea-reactive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ mod spec {
#[tokio::test]
async fn when_eq_doesnt_resolve_if_value_is_not_eq() {
let field = Observable::new(9i32);
await_future_with_timeout(
let _ = await_future_with_timeout(
field.when_eq(0i32),
Duration::from_millis(50),
)
Expand Down Expand Up @@ -789,7 +789,7 @@ mod spec {
let mut field = Observable::new(0i32);
let mut subscription_on_changes = field.subscribe();

task::spawn_local(async move {
let _ = task::spawn_local(async move {
for _ in 0..100 {
*field.borrow_mut() += 1;
}
Expand All @@ -814,7 +814,7 @@ mod spec {
let mut field = Observable::new(0i32);
let subscription = field.when(|change| change == &100);

task::spawn_local(async move {
let _ = task::spawn_local(async move {
for _ in 0..100 {
*field.borrow_mut() += 1;
}
Expand All @@ -838,7 +838,7 @@ mod spec {
let mut field = Observable::new(0i32);
let subscription = field.when_eq(100);

task::spawn_local(async move {
let _ = task::spawn_local(async move {
for _ in 0..100 {
*field.borrow_mut() += 1;
}
Expand All @@ -860,15 +860,15 @@ mod spec {
let field = Observable::new(0i32);
let subscription = field.when(|change| change == &100);
drop(field);
subscription.await.err().unwrap();
let _ = subscription.await.err().unwrap();
}

#[tokio::test]
async fn when_eq_returns_dropped_error_on_drop() {
let field = Observable::new(0i32);
let subscription = field.when_eq(100);
drop(field);
subscription.await.err().unwrap();
let _ = subscription.await.err().unwrap();
}

#[tokio::test]
Expand All @@ -884,7 +884,7 @@ mod spec {
let mut field = Observable::new(0i32);
let subscription = field.subscribe();
*field.borrow_mut() = 0;
await_future_with_timeout(
let _ = await_future_with_timeout(
Box::pin(subscription.skip(1).next()),
Duration::from_millis(50),
)
Expand Down Expand Up @@ -969,7 +969,7 @@ mod spec {
let mut subscription = field.subscribe();
assert_eq!(subscription.next().await.unwrap(), 0);

await_future_with_timeout(
let _ = await_future_with_timeout(
Box::pin(subscription.next()),
Duration::from_millis(10),
)
Expand All @@ -982,7 +982,7 @@ mod spec {
let field = ObservableCell::new(0i32);
let when_will_be_5 = field.when_eq(5);

await_future_with_timeout(
let _ = await_future_with_timeout(
Box::pin(when_will_be_5),
Duration::from_millis(10),
)
Expand Down
16 changes: 8 additions & 8 deletions proto/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ pub struct CloseDescription {

/// WebSocket message from Medea to Jason.
#[dispatchable]
#[cfg_attr(feature = "medea", derive(Serialize, Debug, Clone, PartialEq))]
#[cfg_attr(feature = "medea", derive(Clone, Debug, Eq, PartialEq, Serialize))]
#[cfg_attr(feature = "jason", derive(Deserialize))]
#[serde(tag = "event", content = "data")]
pub enum Event {
Expand Down Expand Up @@ -243,15 +243,15 @@ pub enum Event {
/// Represents [RTCIceCandidateInit][1] object.
///
/// [1]: https://www.w3.org/TR/webrtc/#dom-rtcicecandidateinit
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct IceCandidate {
pub candidate: String,
pub sdp_m_line_index: Option<u16>,
pub sdp_mid: Option<String>,
}

/// [`Track`] with specified direction.
#[cfg_attr(feature = "medea", derive(Serialize, Debug, Clone, PartialEq))]
#[cfg_attr(feature = "medea", derive(Clone, Debug, Eq, PartialEq, Serialize))]
#[cfg_attr(feature = "jason", derive(Deserialize))]
pub struct Track {
pub id: TrackId,
Expand All @@ -274,7 +274,7 @@ pub struct TrackPatch {
/// [1]: https://developer.mozilla.org/en-US/docs/Web/API/RTCIceServer
/// [2]: https://developer.mozilla.org/en-US/docs/Web/API/RTCConfiguration
#[derive(Clone, Debug)]
#[cfg_attr(feature = "medea", derive(Serialize, PartialEq))]
#[cfg_attr(feature = "medea", derive(Eq, PartialEq, Serialize))]
#[cfg_attr(feature = "jason", derive(Deserialize))]
pub struct IceServer {
pub urls: Vec<String>,
Expand All @@ -285,7 +285,7 @@ pub struct IceServer {
}

/// Direction of [`Track`].
#[cfg_attr(feature = "medea", derive(Serialize, Debug, Clone, PartialEq))]
#[cfg_attr(feature = "medea", derive(Clone, Debug, Eq, PartialEq, Serialize))]
#[cfg_attr(feature = "jason", derive(Deserialize))]
// TODO: Use different struct without mids in TracksApplied event.
pub enum Direction {
Expand All @@ -300,18 +300,18 @@ pub enum Direction {
}

/// Type of [`Track`].
#[cfg_attr(feature = "medea", derive(Serialize, Debug, PartialEq, Clone))]
#[cfg_attr(feature = "medea", derive(Clone, Debug, Eq, PartialEq, Serialize))]
#[cfg_attr(feature = "jason", derive(Deserialize))]
pub enum MediaType {
Audio(AudioSettings),
Video(VideoSettings),
}

#[cfg_attr(feature = "medea", derive(Serialize, Clone, Debug, PartialEq))]
#[cfg_attr(feature = "medea", derive(Clone, Debug, Eq, PartialEq, Serialize))]
#[cfg_attr(feature = "jason", derive(Deserialize))]
pub struct AudioSettings {}

#[cfg_attr(feature = "medea", derive(Serialize, Clone, Debug, PartialEq))]
#[cfg_attr(feature = "medea", derive(Clone, Debug, Eq, PartialEq, Serialize))]
#[cfg_attr(feature = "jason", derive(Deserialize))]
pub struct VideoSettings {}

Expand Down
20 changes: 18 additions & 2 deletions src/api/client/rpc_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,25 @@ use medea_client_api_proto::{CloseDescription, Command, Event};
use crate::api::control::MemberId;

/// Newtype for [`Command`] with actix [`Message`] implementation.
#[derive(From, Into, Message)]
#[derive(Message)]
#[rtype(result = "()")]
pub struct CommandMessage(Command);
pub struct CommandMessage {
/// ID of [`Member`] that sent this [`Command`] to the server.
///
/// [`Member`]: crate::signalling::elements::member::Member
pub member_id: MemberId,

/// Actual [`Command`] being issued.
pub command: Command,
}

impl CommandMessage {
/// Creates new [`CommandMessage`].
#[inline]
pub fn new(member_id: MemberId, command: Command) -> Self {
Self { member_id, command }
}
}

/// Newtype for [`Event`] with actix [`Message`] implementation.
#[derive(Debug, From, Into, Message)]
Expand Down
8 changes: 4 additions & 4 deletions src/api/client/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use actix::{
AsyncContext, ContextFutureSpawner as _, Handler, Message, StreamHandler,
};
use actix_web_actors::ws::{self, CloseCode};
use futures::future::{self, FutureExt as _, LocalBoxFuture};
use futures::future::{FutureExt as _, LocalBoxFuture};
use medea_client_api_proto::{
ClientMsg, CloseDescription, CloseReason, Event, RpcSettings, ServerMsg,
};
Expand Down Expand Up @@ -179,7 +179,7 @@ impl Actor for WsSession {
));
}
};
future::ready(()).into_actor(this)
actix::fut::ready(())
})
.wait(ctx);
}
Expand Down Expand Up @@ -313,7 +313,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsSession {
}
Ok(ClientMsg::Command(command)) => {
self.room
.send_command(command)
.send_command(self.member_id.clone(), command)
.into_actor(self)
.spawn(ctx);
}
Expand Down Expand Up @@ -555,7 +555,7 @@ mod test {
.expect_connection_closed()
.returning(|_, _| future::ready(()).boxed_local());

rpc_server.expect_send_command().return_once(|command| {
rpc_server.expect_send_command().return_once(|_, command| {
let _ = CHAN.0.lock().unwrap().take().unwrap().send(command);
future::ready(()).boxed_local()
});
Expand Down
16 changes: 16 additions & 0 deletions src/api/control/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,22 @@ impl Into<RoomElement> for MemberSpec {
}

impl MemberSpec {
/// Creates new [`MemberSpec`] with the given parameters.
#[inline]
pub fn new(
pipeline: Pipeline<EndpointId, MemberElement>,
credentials: String,
on_join: Option<CallbackUrl>,
on_leave: Option<CallbackUrl>,
) -> Self {
Self {
pipeline,
credentials,
on_join,
on_leave,
}
}

/// Returns all [`WebRtcPlayEndpoint`]s of this [`MemberSpec`].
pub fn play_endpoints(
&self,
Expand Down
6 changes: 5 additions & 1 deletion src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ pub trait RpcServer: Debug + Send {
/// Sends [`Command`].
///
/// [`Command`]:
fn send_command(&self, msg: Command) -> LocalBoxFuture<'static, ()>;
fn send_command(
&self,
member_id: MemberId,
msg: Command,
) -> LocalBoxFuture<'static, ()>;
}

#[cfg(test)]
Expand Down
Loading