Skip to content
This repository has been archived by the owner on Aug 25, 2021. It is now read-only.

Implement commands validation #86

Merged
merged 10 commits into from
Feb 25, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ All user visible changes to this project will be documented in this file. This p



## TBD [0.2.0] · 2019-??-??
## TBD [0.2.0] · 2020-??-??
[0.2.0]: /../../tree/medea-0.2.0

[Milestone](/../../milestone/2) | [Roadmap](/../../issues/27)
Expand Down Expand Up @@ -46,6 +46,11 @@ All user visible changes to this project will be documented in this file. This p
- `rpc.ping_interval` option to configure `Ping`s sending interval ([#75]).
- Testing:
- E2E tests for signalling ([#28]).

### Fixed

- Signalling:
- Room crashing when handling commands with non existent `peer_id` ([#86](/../../pull/86)).

[#28]: /../../pull/28
[#33]: /../../pull/33
Expand Down
191 changes: 94 additions & 97 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions crates/medea-reactive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ mod spec {
#[tokio::test]
async fn when_eq_doesnt_resolve_if_value_is_not_eq() {
let field = Observable::new(9i32);
await_future_with_timeout(
let _ = await_future_with_timeout(
field.when_eq(0i32),
Duration::from_millis(50),
)
Expand Down Expand Up @@ -789,7 +789,7 @@ mod spec {
let mut field = Observable::new(0i32);
let mut subscription_on_changes = field.subscribe();

task::spawn_local(async move {
let _ = task::spawn_local(async move {
for _ in 0..100 {
*field.borrow_mut() += 1;
}
Expand All @@ -814,7 +814,7 @@ mod spec {
let mut field = Observable::new(0i32);
let subscription = field.when(|change| change == &100);

task::spawn_local(async move {
let _ = task::spawn_local(async move {
for _ in 0..100 {
*field.borrow_mut() += 1;
}
Expand All @@ -838,7 +838,7 @@ mod spec {
let mut field = Observable::new(0i32);
let subscription = field.when_eq(100);

task::spawn_local(async move {
let _ = task::spawn_local(async move {
for _ in 0..100 {
*field.borrow_mut() += 1;
}
Expand All @@ -860,15 +860,15 @@ mod spec {
let field = Observable::new(0i32);
let subscription = field.when(|change| change == &100);
drop(field);
subscription.await.err().unwrap();
let _ = subscription.await.err().unwrap();
}

#[tokio::test]
async fn when_eq_returns_dropped_error_on_drop() {
let field = Observable::new(0i32);
let subscription = field.when_eq(100);
drop(field);
subscription.await.err().unwrap();
let _ = subscription.await.err().unwrap();
}

#[tokio::test]
Expand All @@ -884,7 +884,7 @@ mod spec {
let mut field = Observable::new(0i32);
let subscription = field.subscribe();
*field.borrow_mut() = 0;
await_future_with_timeout(
let _ = await_future_with_timeout(
Box::pin(subscription.skip(1).next()),
Duration::from_millis(50),
)
Expand Down Expand Up @@ -969,7 +969,7 @@ mod spec {
let mut subscription = field.subscribe();
assert_eq!(subscription.next().await.unwrap(), 0);

await_future_with_timeout(
let _ = await_future_with_timeout(
Box::pin(subscription.next()),
Duration::from_millis(10),
)
Expand All @@ -982,7 +982,7 @@ mod spec {
let field = ObservableCell::new(0i32);
let when_will_be_5 = field.when_eq(5);

await_future_with_timeout(
let _ = await_future_with_timeout(
Box::pin(when_will_be_5),
Duration::from_millis(10),
)
Expand Down
16 changes: 8 additions & 8 deletions proto/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ pub struct CloseDescription {

/// WebSocket message from Medea to Jason.
#[dispatchable]
#[cfg_attr(feature = "medea", derive(Serialize, Debug, Clone, PartialEq))]
#[cfg_attr(feature = "medea", derive(Clone, Debug, Eq, PartialEq, Serialize))]
#[cfg_attr(feature = "jason", derive(Deserialize))]
#[serde(tag = "event", content = "data")]
pub enum Event {
Expand Down Expand Up @@ -243,15 +243,15 @@ pub enum Event {
/// Represents [RTCIceCandidateInit][1] object.
///
/// [1]: https://www.w3.org/TR/webrtc/#dom-rtcicecandidateinit
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct IceCandidate {
pub candidate: String,
pub sdp_m_line_index: Option<u16>,
pub sdp_mid: Option<String>,
}

/// [`Track`] with specified direction.
#[cfg_attr(feature = "medea", derive(Serialize, Debug, Clone, PartialEq))]
#[cfg_attr(feature = "medea", derive(Clone, Debug, Eq, PartialEq, Serialize))]
#[cfg_attr(feature = "jason", derive(Deserialize))]
pub struct Track {
pub id: TrackId,
Expand All @@ -274,7 +274,7 @@ pub struct TrackPatch {
/// [1]: https://developer.mozilla.org/en-US/docs/Web/API/RTCIceServer
/// [2]: https://developer.mozilla.org/en-US/docs/Web/API/RTCConfiguration
#[derive(Clone, Debug)]
#[cfg_attr(feature = "medea", derive(Serialize, PartialEq))]
#[cfg_attr(feature = "medea", derive(Eq, PartialEq, Serialize))]
#[cfg_attr(feature = "jason", derive(Deserialize))]
pub struct IceServer {
pub urls: Vec<String>,
Expand All @@ -285,7 +285,7 @@ pub struct IceServer {
}

/// Direction of [`Track`].
#[cfg_attr(feature = "medea", derive(Serialize, Debug, Clone, PartialEq))]
#[cfg_attr(feature = "medea", derive(Clone, Debug, Eq, PartialEq, Serialize))]
#[cfg_attr(feature = "jason", derive(Deserialize))]
// TODO: Use different struct without mids in TracksApplied event.
pub enum Direction {
Expand All @@ -300,18 +300,18 @@ pub enum Direction {
}

/// Type of [`Track`].
#[cfg_attr(feature = "medea", derive(Serialize, Debug, PartialEq, Clone))]
#[cfg_attr(feature = "medea", derive(Clone, Debug, Eq, PartialEq, Serialize))]
#[cfg_attr(feature = "jason", derive(Deserialize))]
pub enum MediaType {
Audio(AudioSettings),
Video(VideoSettings),
}

#[cfg_attr(feature = "medea", derive(Serialize, Clone, Debug, PartialEq))]
#[cfg_attr(feature = "medea", derive(Clone, Debug, Eq, PartialEq, Serialize))]
#[cfg_attr(feature = "jason", derive(Deserialize))]
pub struct AudioSettings {}

#[cfg_attr(feature = "medea", derive(Serialize, Clone, Debug, PartialEq))]
#[cfg_attr(feature = "medea", derive(Clone, Debug, Eq, PartialEq, Serialize))]
#[cfg_attr(feature = "jason", derive(Deserialize))]
pub struct VideoSettings {}

Expand Down
14 changes: 12 additions & 2 deletions src/api/client/rpc_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,19 @@ use medea_client_api_proto::{CloseDescription, Command, Event};
use crate::api::control::MemberId;

/// Newtype for [`Command`] with actix [`Message`] implementation.
#[derive(From, Into, Message)]
#[derive(Message)]
#[rtype(result = "()")]
pub struct CommandMessage(Command);
pub struct CommandMessage {
pub member_id: MemberId,
pub command: Command,
}

impl CommandMessage {
/// Creates [`CommandMessage`].
pub fn new(member_id: MemberId, command: Command) -> Self {
Self { member_id, command }
}
}

/// Newtype for [`Event`] with actix [`Message`] implementation.
#[derive(Debug, From, Into, Message)]
Expand Down
8 changes: 4 additions & 4 deletions src/api/client/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use actix::{
AsyncContext, ContextFutureSpawner as _, Handler, Message, StreamHandler,
};
use actix_web_actors::ws::{self, CloseCode};
use futures::future::{self, FutureExt as _, LocalBoxFuture};
use futures::future::{FutureExt as _, LocalBoxFuture};
use medea_client_api_proto::{
ClientMsg, CloseDescription, CloseReason, Event, RpcSettings, ServerMsg,
};
Expand Down Expand Up @@ -179,7 +179,7 @@ impl Actor for WsSession {
));
}
};
future::ready(()).into_actor(this)
actix::fut::ready(())
})
.wait(ctx);
}
Expand Down Expand Up @@ -313,7 +313,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsSession {
}
Ok(ClientMsg::Command(command)) => {
self.room
.send_command(command)
.send_command(self.member_id.clone(), command)
.into_actor(self)
.spawn(ctx);
}
Expand Down Expand Up @@ -555,7 +555,7 @@ mod test {
.expect_connection_closed()
.returning(|_, _| future::ready(()).boxed_local());

rpc_server.expect_send_command().return_once(|command| {
rpc_server.expect_send_command().return_once(|_, command| {
let _ = CHAN.0.lock().unwrap().take().unwrap().send(command);
future::ready(()).boxed_local()
});
Expand Down
15 changes: 15 additions & 0 deletions src/api/control/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,21 @@ impl Into<RoomElement> for MemberSpec {
}

impl MemberSpec {
/// Creates [`MemberSpec`].
pub fn new(
pipeline: Pipeline<EndpointId, MemberElement>,
credentials: String,
on_join: Option<CallbackUrl>,
on_leave: Option<CallbackUrl>,
) -> Self {
Self {
pipeline,
credentials,
on_join,
on_leave,
}
}

/// Returns all [`WebRtcPlayEndpoint`]s of this [`MemberSpec`].
pub fn play_endpoints(
&self,
Expand Down
6 changes: 5 additions & 1 deletion src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ pub trait RpcServer: Debug + Send {
/// Sends [`Command`].
///
/// [`Command`]:
fn send_command(&self, msg: Command) -> LocalBoxFuture<'static, ()>;
fn send_command(
&self,
member_id: MemberId,
msg: Command,
) -> LocalBoxFuture<'static, ()>;
}

#[cfg(test)]
Expand Down
Loading