Skip to content
This repository has been archived by the owner on Aug 25, 2021. It is now read-only.

Implement configuration #15

Merged
merged 21 commits into from
Apr 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
RUST_LOG=debug
MEDEA_CONF=config.toml
149 changes: 149 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 12 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ repository = "https://github.com/instrumentisto/medea"
[dependencies]
actix = "0.7"
actix-web = "0.7"
config = "0.9"
chrono = "0.4"
dotenv = "0.13"
failure = "0.1"
futures = "0.1"
hashbrown = "0.1"
humantime = "1.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
slog = "2.4"
Expand All @@ -24,4 +27,12 @@ slog-stdlog = "3.0"
slog-async = "2.3"
slog-json = "2.3"
slog-scope = "4.1"
failure = "0.1"
smart-default = "0.5"
toml = "0.4"
[dependencies.serde-humantime]
git = "https://github.com/tailhook/serde-humantime"
branch = "serde_wrapper"

[dev-dependencies]
serial_test = "0.2"
serial_test_derive = "0.2"
105 changes: 105 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
###############################
# Common defaults/definitions #
###############################

comma := ,

# Checks two given strings for equality.
eq = $(if $(or $(1),$(2)),$(and $(findstring $(1),$(2)),\
$(findstring $(2),$(1))),1)




######################
# Project parameters #
######################

CARGO_HOME ?= $(strip $(shell dirname $$(dirname $$(which cargo))))
RUST_VER ?= 1.33




###########
# Aliases #
###########

docs: docs.rust


lint: cargo.lint


fmt: cargo.fmt


test: test.unit




##################
# Cargo commands #
##################

# Format Rust sources with rustfmt.
#
# Usage:
# make cargo.fmt [check=(no|yes)]

cargo.fmt:
cargo +nightly fmt --all $(if $(call eq,$(check),yes),-- --check,)


# Lint Rust sources with clippy.
#
# Usage:
# make cargo.lint

cargo.lint:
cargo clippy -- -D clippy::pedantic -D warnings




##########################
# Documentation commands #
##########################

# Generate project documentation of Rust sources.
#
# Usage:
# make docs.rust [open=(yes|no)] [clean=(no|yes)]

docs.rust:
ifeq ($(clean),yes)
@rm -rf target/doc/
endif
cargo +nightly doc $(if $(call eq,$(open),no),,--open)




####################
# Testing commands #
####################

# Run Rust unit tests of project.
#
# Usage:
# make test.unit

test.unit:
cargo test --all




##################
# .PHONY section #
##################

.PHONY: cargo cargo.fmt cargo.lint \
docs docs.rust \
test test.unit
6 changes: 6 additions & 0 deletions config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[server]
# Duration, after which remote RPC client will be considered idle if no
# heartbeat messages received.
#
# Default:
# idle_timeout = "10s"
49 changes: 35 additions & 14 deletions src/api/client/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{
},
control::Id as MemberId,
},
conf::{Conf, Rpc},
log::prelude::*,
};

Expand Down Expand Up @@ -52,7 +53,11 @@ fn ws_index(
.and_then(move |res| match res {
Ok(_) => ws::start(
&r.drop_state(),
WsSession::new(info.member_id, room),
WsSession::new(
info.member_id,
room,
state.config.idle_timeout,
),
),
Err(MemberNotExists) => Ok(HttpResponse::NotFound().into()),
Err(InvalidCredentials) => Ok(HttpResponse::Forbidden().into()),
Expand All @@ -66,20 +71,26 @@ fn ws_index(
pub struct Context {
/// Repository of all currently existing [`Room`]s in application.
pub rooms: RoomsRepository,

/// Settings of application.
pub config: Rpc,
}

/// Starts HTTP server for handling WebSocket connections of Client API.
pub fn run(rooms: RoomsRepository) {
pub fn run(rooms: RoomsRepository, config: Conf) {
let server_addr = config.server.get_bind_addr();

server::new(move || {
App::with_state(Context {
rooms: rooms.clone(),
config: config.rpc.clone(),
})
.middleware(middleware::Logger::default())
.resource("/ws/{room_id}/{member_id}/{credentials}", |r| {
r.method(http::Method::GET).with(ws_index)
})
})
.bind("0.0.0.0:8080")
.bind(server_addr)
.unwrap()
.start();

Expand All @@ -95,9 +106,9 @@ mod test {
use futures::Stream;
use hashbrown::HashMap;

use crate::api::{
client::{session, Room},
control::Member,
use crate::{
api::{client::Room, control::Member},
conf::{Conf, Server},
};

use super::*;
Expand All @@ -118,18 +129,21 @@ mod test {
}

/// Creates test WebSocket server of Client API which can handle requests.
fn ws_server() -> test::TestServer {
fn ws_server(conf: Conf) -> test::TestServer {
test::TestServer::with_factory(move || {
App::with_state(Context { rooms: room() })
.resource("/ws/{room_id}/{member_id}/{credentials}", |r| {
r.method(http::Method::GET).with(ws_index)
})
App::with_state(Context {
rooms: room(),
config: conf.rpc.clone(),
})
.resource("/ws/{room_id}/{member_id}/{credentials}", |r| {
r.method(http::Method::GET).with(ws_index)
})
})
}

#[test]
fn responses_with_pong() {
let mut server = ws_server();
let mut server = ws_server(Conf::default());
let (read, mut write) =
server.ws_at("/ws/1/1/caller_credentials").unwrap();

Expand All @@ -140,15 +154,22 @@ mod test {

#[test]
fn disconnects_on_idle() {
let mut server = ws_server();
let conf = Conf {
rpc: Rpc {
idle_timeout: Duration::new(1, 0),
},
server: Server::default(),
};

let mut server = ws_server(conf.clone());
let (read, mut write) =
server.ws_at("/ws/1/1/caller_credentials").unwrap();

write.text(r#"{"ping":33}"#);
let (item, read) = server.execute(read.into_future()).unwrap();
assert_eq!(item, Some(ws::Message::Text(r#"{"pong":33}"#.into())));

thread::sleep(session::CLIENT_IDLE_TIMEOUT.add(Duration::from_secs(1)));
thread::sleep(conf.rpc.idle_timeout.add(Duration::from_secs(1)));

let (item, _) = server.execute(read.into_future()).unwrap();
assert_eq!(
Expand Down
24 changes: 13 additions & 11 deletions src/api/client/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ use crate::{
log::prelude::*,
};

// TODO: via conf
/// Timeout of receiving any WebSocket messages from client.
pub const CLIENT_IDLE_TIMEOUT: Duration = Duration::from_secs(10);

/// Long-running WebSocket connection of Client API.
#[derive(Debug)]
#[allow(clippy::module_name_repetitions)]
Expand All @@ -33,11 +29,12 @@ pub struct WsSession {
room: Addr<Room>,

/// Handle for watchdog which checks whether WebSocket client became
/// idle (no `ping` messages received during [`CLIENT_IDLE_TIMEOUT`]).
/// idle (no `ping` messages received during [`idle_timeout`]).
///
/// This one should be renewed on any received WebSocket message
/// from client.
/// This one should be renewed on received `ping` message from client.
idle_handler: Option<SpawnHandle>,
/// Timeout of receiving `ping` messages from client.
idle_timeout: Duration,

/// Indicates whether WebSocket connection is closed by server ot by
/// client.
Expand All @@ -46,11 +43,16 @@ pub struct WsSession {

impl WsSession {
/// Creates new [`WsSession`] for specified [`Member`].
pub fn new(member_id: MemberId, room: Addr<Room>) -> Self {
pub fn new(
member_id: MemberId,
room: Addr<Room>,
idle_timeout: Duration,
) -> Self {
Self {
member_id,
room,
idle_handler: None,
idle_timeout,
closed_by_server: false,
}
}
Expand All @@ -62,7 +64,7 @@ impl WsSession {
}

self.idle_handler =
Some(ctx.run_later(CLIENT_IDLE_TIMEOUT, |sess, ctx| {
Some(ctx.run_later(self.idle_timeout, |sess, ctx| {
info!("WsConnection with member {} is idle", sess.member_id);

let member_id = sess.member_id;
Expand Down Expand Up @@ -110,8 +112,8 @@ impl Actor for WsSession {
.map(|_| ())
.map_err(move |err| {
error!(
"WsSession of member {} failed to join Room, \
because: {:?}",
"WsSession of member {} failed to join Room, because: \
{:?}",
member_id, err,
)
}),
Expand Down
Loading