diff --git a/.env b/.env index 307d41263..14d016efa 100644 --- a/.env +++ b/.env @@ -1,2 +1,8 @@ RUST_LOG=debug MEDEA_CONF=config.toml + +COMPOSE_PROJECT_NAME=medea +COMPOSE_IMAGE_NAME=hub.instrumentisto.com/medea +COMPOSE_IMAGE_VER=dev + +COMPOSE_EXTERNAL_IP=127.0.0.1 diff --git a/Cargo.lock b/Cargo.lock index ce17bec56..04385dea6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -167,6 +167,11 @@ dependencies = [ "nodrop 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "ascii" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "atty" version = "0.2.11" @@ -184,13 +189,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "backtrace" +<<<<<<< HEAD version = "0.3.30" +======= +version = "0.3.26" +>>>>>>> orig-dockerize source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "autocfg 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "backtrace-sys 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", +<<<<<<< HEAD "libc 0.2.58 (registry+https://github.com/rust-lang/crates.io-index)", +======= + "libc 0.2.55 (registry+https://github.com/rust-lang/crates.io-index)", +>>>>>>> orig-dockerize "rustc-demangle 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -220,6 +233,27 @@ dependencies = [ "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "bb8" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "bb8-redis" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bb8 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", + "redis 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "bitflags" version = "1.0.4" @@ -304,6 +338,18 @@ dependencies = [ "bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "combine" +version = "3.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "ascii 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "either 1.5.2 (registry+https://github.com/rust-lang/crates.io-index)", + "memchr 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "config" version = "0.9.3" @@ -462,6 +508,11 @@ name = "dtoa" version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "either" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "encoding" version = "0.2.33" @@ -536,7 +587,11 @@ name = "error-chain" version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ +<<<<<<< HEAD "backtrace 0.3.30 (registry+https://github.com/rust-lang/crates.io-index)", +======= + "backtrace 0.3.26 (registry+https://github.com/rust-lang/crates.io-index)", +>>>>>>> orig-dockerize ] [[package]] @@ -544,7 +599,11 @@ name = "failure" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ +<<<<<<< HEAD "backtrace 0.3.30 (registry+https://github.com/rust-lang/crates.io-index)", +======= + "backtrace 0.3.26 (registry+https://github.com/rust-lang/crates.io-index)", +>>>>>>> orig-dockerize "failure_derive 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -565,7 +624,11 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", +<<<<<<< HEAD "libc 0.2.58 (registry+https://github.com/rust-lang/crates.io-index)", +======= + "libc 0.2.55 (registry+https://github.com/rust-lang/crates.io-index)", +>>>>>>> orig-dockerize "miniz-sys 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", "miniz_oxide_c_api 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -608,6 +671,11 @@ dependencies = [ "num_cpus 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "gcc" +version = "0.3.55" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "h2" version = "0.1.22" @@ -850,6 +918,8 @@ version = "0.1.0-dev" dependencies = [ "actix 0.7.9 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web 0.7.19 (registry+https://github.com/rust-lang/crates.io-index)", + "bb8 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "bb8-redis 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "config 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -861,7 +931,14 @@ dependencies = [ "medea-client-api-proto 0.1.0-dev", "medea-macro 0.1.0-dev", "newtype_derive 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", +<<<<<<< HEAD "serde 1.0.92 (registry+https://github.com/rust-lang/crates.io-index)", +======= + "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "redis 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", +>>>>>>> orig-dockerize "serde-humantime 0.1.1 (git+https://github.com/tailhook/serde-humantime?branch=serde_wrapper)", "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", "serial_test 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -989,7 +1066,11 @@ version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", +<<<<<<< HEAD "libc 0.2.58 (registry+https://github.com/rust-lang/crates.io-index)", +======= + "libc 0.2.55 (registry+https://github.com/rust-lang/crates.io-index)", +>>>>>>> orig-dockerize "mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1164,6 +1245,27 @@ dependencies = [ "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rand" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.55 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "fuchsia-cprng 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.55 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "rdrand 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rand" version = "0.5.6" @@ -1182,7 +1284,11 @@ version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "autocfg 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", +<<<<<<< HEAD "libc 0.2.58 (registry+https://github.com/rust-lang/crates.io-index)", +======= + "libc 0.2.55 (registry+https://github.com/rust-lang/crates.io-index)", +>>>>>>> orig-dockerize "rand_chacha 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand_hc 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1280,6 +1386,22 @@ dependencies = [ "rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "redis" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "combine 3.8.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", + "sha1 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-tcp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "redox_syscall" version = "0.1.54" @@ -1361,6 +1483,18 @@ dependencies = [ "untrusted 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rust-crypto" +version = "0.2.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "gcc 0.3.55 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.55 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.23 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc-serialize 0.3.24 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rust-ini" version = "0.13.0" @@ -1369,6 +1503,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "rustc-demangle" version = "0.1.15" +<<<<<<< HEAD +======= +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "rustc-serialize" +version = "0.3.24" +>>>>>>> orig-dockerize source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -1856,7 +1998,11 @@ version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", +<<<<<<< HEAD "libc 0.2.58 (registry+https://github.com/rust-lang/crates.io-index)", +======= + "libc 0.2.55 (registry+https://github.com/rust-lang/crates.io-index)", +>>>>>>> orig-dockerize "mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)", "mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)", "signal-hook 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2389,12 +2535,19 @@ dependencies = [ "checksum arc-swap 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "bc4662175ead9cd84451d5c35070517777949a2ed84551764129cedb88384841" "checksum argon2rs 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3f67b0b6a86dae6e67ff4ca2b6201396074996379fba2b92ff649126f37cb392" "checksum arrayvec 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "92c7fb76bc8826a8b33b4ee5bb07a247a81e76764ab4d55e8f73e3a4d8808c71" +"checksum ascii 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a5fc969a8ce2c9c0c4b0429bb8431544f6658283c8326ba5ff8c762b75369335" "checksum atty 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "9a7d5b8723950951411ee34d271d99dddcc2035a16ab25310ea2c8cfd4369652" "checksum autocfg 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "0e49efa51329a5fd37e7c79db4621af617cd4e3e5bc224939808d076077077bf" +<<<<<<< HEAD "checksum backtrace 0.3.30 (registry+https://github.com/rust-lang/crates.io-index)" = "ada4c783bb7e7443c14e0480f429ae2cc99da95065aeab7ee1b81ada0419404f" +======= +"checksum backtrace 0.3.26 (registry+https://github.com/rust-lang/crates.io-index)" = "1a13fc43f04daf08ab4f71e3d27e1fc27fc437d3e95ac0063a796d92fb40f39b" +>>>>>>> orig-dockerize "checksum backtrace-sys 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)" = "797c830ac25ccc92a7f8a7b9862bde440715531514594a6154e3d4a54dd769b6" "checksum base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0b25d992356d2eb0ed82172f5248873db5560c4721f564b13cb5193bda5e668e" "checksum base64 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)" = "489d6c0ed21b11d038c31b6ceccca973e65d73ba3bd8ecb9a2babf5546164643" +"checksum bb8 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6ac04c3b2d3327a583c9a93b6c5ab4316e6609f5ec84b71b89ebe518e0edbad2" +"checksum bb8-redis 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9817f38c173f0da1581b923b90e66750a090413ad67a20980d5ad64141bab476" "checksum bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "228047a76f468627ca71776ecdebd732a3423081fcf5125585bcd7c49886ce12" "checksum blake2-rfc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)" = "5d6d530bdd2d52966a6d03b7a964add7ae1a288d25214066fd4b600f0f796400" "checksum brotli-sys 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4445dea95f4c2b41cde57cc9fee236ae4dbae88d8fcbdb4750fc1bb5d86aaecd" @@ -2407,6 +2560,7 @@ dependencies = [ "checksum cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "b486ce3ccf7ffd79fdeb678eac06a9e6c09fc88d33836340becb8fffe87c5e33" "checksum chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "45912881121cb26fad7c38c17ba7daa18764771836b34fab7d3fbd93ed633878" "checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" +"checksum combine 3.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "da3da6baa321ec19e1cc41d31bf599f00c783d0517095cdaf0332e3fe8d20680" "checksum config 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)" = "f9107d78ed62b3fa5a86e7d18e647abed48cfd8f8fab6c72f4cdb982d196f7e6" "checksum console_error_panic_hook 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "b8d976903543e0c48546a91908f21588a680a8c8f984df9a5d69feccb2b2a211" "checksum constant_time_eq 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8ff012e225ce166d4422e0e78419d901719760f62ae2b7969ca6b564d1b54a9e" @@ -2424,6 +2578,7 @@ dependencies = [ "checksum dirs 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3fd78930633bd1c6e35c4b42b1df7b0cbc6bc191146e512bb3bedf243fcc3901" "checksum dotenv 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c0d0a1279c96732bc6800ce6337b6a614697b0e74ae058dc03c62ebeb78b4d86" "checksum dtoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "ea57b42383d091c85abcc2706240b94ab2a8fa1fc81c10ff23c4de06e2a90b5e" +"checksum either 1.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "5527cfe0d098f36e3f8839852688e63c8fff1c90b2b405aef730615f9a7bcf7b" "checksum encoding 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "6b0d943856b990d12d3b55b359144ff341533e516d94098b1d3fc1ac666d36ec" "checksum encoding-index-japanese 1.20141219.5 (registry+https://github.com/rust-lang/crates.io-index)" = "04e8b2ff42e9a05335dbf8b5c6f7567e5591d0d916ccef4e0b1710d32a0d0c91" "checksum encoding-index-korean 1.20141219.5 (registry+https://github.com/rust-lang/crates.io-index)" = "4dc33fb8e6bcba213fe2f14275f0963fd16f0a02c878e3095ecfdf5bee529d81" @@ -2442,7 +2597,12 @@ dependencies = [ "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" "checksum futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)" = "a2037ec1c6c1c4f79557762eab1f7eae1f64f6cb418ace90fae88f0942b60139" "checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4" +<<<<<<< HEAD "checksum h2 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)" = "09ae3ecbdc15e379c7356430d7c0c6a44d3a937324999429dcf89e970297e54f" +======= +"checksum gcc 0.3.55 (registry+https://github.com/rust-lang/crates.io-index)" = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" +"checksum h2 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "2b53def7bb0253af7718036fe9338c15defd209136819464384f3a553e07481b" +>>>>>>> orig-dockerize "checksum hashbrown 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3bae29b6653b3412c2e71e9d486db9f9df5d701941d86683005efb9f2d28e3da" "checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205" "checksum hostname 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "21ceb46a83a85e824ef93669c8b390009623863b5c195d1ba747292c0c72f94e" @@ -2502,6 +2662,8 @@ dependencies = [ "checksum proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)" = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759" "checksum quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9274b940887ce9addde99c4eee6b5c44cc494b182b97e73dc8ffdcb3397fd3f0" "checksum quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)" = "faf4799c5d274f3868a4aae320a0a182cbd2baee377b378f080e16a23e9d80db" +"checksum rand 0.3.23 (registry+https://github.com/rust-lang/crates.io-index)" = "64ac302d8f83c0c1974bf758f6b041c6c8ada916fbb44a609158ca8b064cc76c" +"checksum rand 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" "checksum rand 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c618c47cd3ebd209790115ab837de41425723956ad3ce2e6a7f09890947cacb9" "checksum rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca" "checksum rand_chacha 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "556d3a1ca6600bfcbab7c7c91ccb085ac7fbbcd70e008a98742e7847f4f7bcef" @@ -2514,6 +2676,7 @@ dependencies = [ "checksum rand_pcg 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "abf9b09b01790cfe0364f52bf32995ea3c39f4d2dd011eac241d2914146d0b44" "checksum rand_xorshift 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cbf7e9e623549b0e21f6e97cf8ecf247c1a8fd2e8a992ae265314300b2455d5c" "checksum rdrand 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +"checksum redis 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b543b95de413ac964ca609e91fd9fd58419312e69988fb197f3ff8770312a1af" "checksum redox_syscall 0.1.54 (registry+https://github.com/rust-lang/crates.io-index)" = "12229c14a0f65c4f1cb046a3b52047cdd9da1f4b30f8a39c5063c8bae515e252" "checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76" "checksum redox_users 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3fe5204c3a17e97dde73f285d49be585df59ed84b50a872baf416e73b62c3828" @@ -2523,8 +2686,13 @@ dependencies = [ "checksum regex-syntax 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)" = "dcfd8681eebe297b81d98498869d4aae052137651ad7b96822f09ceb690d0a96" "checksum resolv-conf 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b263b4aa1b5de9ffc0054a2386f96992058bb6870aab516f8cdeb8a667d56dcb" "checksum ring 0.13.5 (registry+https://github.com/rust-lang/crates.io-index)" = "2c4db68a2e35f3497146b7e4563df7d4773a2433230c5e4b448328e31740458a" +"checksum rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)" = "f76d05d3993fd5f4af9434e8e436db163a12a9d40e1a58a726f27a01dfd12a2a" "checksum rust-ini 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3e52c148ef37f8c375d49d5a73aa70713125b7f19095948a923f80afdeb22ec2" "checksum rustc-demangle 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)" = "a7f4dccf6f4891ebcc0c39f9b6eb1a83b9bf5d747cb439ec6fba4f3b977038af" +<<<<<<< HEAD +======= +"checksum rustc-serialize 0.3.24 (registry+https://github.com/rust-lang/crates.io-index)" = "dcf128d1287d2ea9d80910b5f1120d0b8eede3fbf1abe91c40d39ea7d51e6fda" +>>>>>>> orig-dockerize "checksum rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "c5f5376ea5e30ce23c03eb77cbe4962b988deead10910c372b226388b594c084" "checksum rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" "checksum ryu 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "b96a9549dc8d48f2c283938303c4b5a77aa29bfbc5b54b084fb1630408899a8f" diff --git a/Cargo.toml b/Cargo.toml index ea36a59f9..a88bdf411 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,8 @@ lto = "thin" [dependencies] actix = "0.7" actix-web = "0.7" +bb8 = "0.3.0" +bb8-redis = "0.3.0" config = "0.9" chrono = "0.4" dotenv = "0.13" @@ -33,7 +35,10 @@ macro-attr = "0.2" medea-client-api-proto = { path = "proto/client-api", features = ["medea"] } medea-macro = { path = "crates/medea-macro" } newtype_derive = "0.1" -serde = { version = "1.0", features = ['derive'] } +rand = "0.6" +rust-crypto = "0.2" +redis = "0.10.0" +serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" slog = "2.4" slog-envlogger = "2.1" @@ -43,6 +48,7 @@ slog-json = "2.3" slog-scope = "4.1" smart-default = "0.5" toml = "0.4" +tokio = "0.1" [dependencies.serde-humantime] git = "https://github.com/tailhook/serde-humantime" branch = "serde_wrapper" @@ -50,4 +56,3 @@ toml = "0.4" [dev-dependencies] serial_test = "0.2" serial_test_derive = "0.2" -tokio = "0.1" diff --git a/Makefile b/Makefile index fc5961bab..f08b7e0e4 100644 --- a/Makefile +++ b/Makefile @@ -38,7 +38,7 @@ fmt: cargo.fmt # make up up: - $(MAKE) -j2 up.jason up.medea + $(MAKE) -j3 up.coturn up.jason up.medea test: test.unit @@ -170,6 +170,15 @@ up.medea: cargo run --bin medea +# Run Coturn and Redis. +# +# Usage: +# make up.coturn + +up.coturn: + docker-compose up + + ################## diff --git a/_dev/coturn/redis.conf b/_dev/coturn/redis.conf new file mode 100644 index 000000000..0d351d675 --- /dev/null +++ b/_dev/coturn/redis.conf @@ -0,0 +1,5 @@ +maxmemory 16mb +maxmemory-policy allkeys-lfu +requirepass turn +timeout 0 +tcp-keepalive 60 diff --git a/_dev/coturn/turnserver.conf b/_dev/coturn/turnserver.conf new file mode 100644 index 000000000..71a5b85eb --- /dev/null +++ b/_dev/coturn/turnserver.conf @@ -0,0 +1,10 @@ +lt-cred-mech +fingerprint +no-cli +no-tls +no-dtls +min-port=49160 +max-port=49200 +realm=medea +redis-userdb="ip=127.0.0.1 port=6379 dbname=0 password=turn" +user=USER:PASS diff --git a/config.toml b/config.toml index 500fa0dd8..ed5d3a53b 100644 --- a/config.toml +++ b/config.toml @@ -9,9 +9,6 @@ # Default: # bind_port = 8080 - - - [rpc] # Duration, after which remote RPC client will be considered idle if no # heartbeat messages received. @@ -24,3 +21,45 @@ # # Default: # reconnect_timeout = "10s" + +[turn] +# Turn server IP address. +# +# Default: +# ip = "0.0.0.0" + +# Turn server port. +# +# Default: +# port = 3478 + +# Static user on Turn server. +# +# Default: +# user = "USER" + +# Static user password on Turn server. +# +# Default: +# pass = "PASS" + +[turn.db.redis] +# IP address to bind Redis server to. +# +# Default: +# ip = "0.0.0.0" + +# Redis database port. +# +# Default: +# port = 6379 + +# Redis database password. +# +# Default: +# pass = "turn" + +# Redis database number. +# +# Default: +# db_number = 0 \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 000000000..ed9d6ac7a --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,21 @@ +version: '3.4' + +services: + coturn-db: + container_name: ${COMPOSE_PROJECT_NAME}-coturn-db + image: redis:alpine + command: ["redis-server", "/etc/redis.conf"] + ports: + - "6379:6379" # coturn redis + volumes: + - ./_dev/coturn/redis.conf:/etc/redis.conf:ro + coturn: + container_name: ${COMPOSE_PROJECT_NAME}-coturn + image: instrumentisto/coturn:4.5 + depends_on: ["coturn-db"] + command: + - --log-file=stdout + volumes: + - ./_dev/coturn/turnserver.conf:/etc/coturn/turnserver.conf:ro + - ./.cache/coturn/data:/var/lib/coturn + network_mode: host diff --git a/proto/client-api/src/lib.rs b/proto/client-api/src/lib.rs index 3ae48b263..0fddeef4c 100644 --- a/proto/client-api/src/lib.rs +++ b/proto/client-api/src/lib.rs @@ -57,6 +57,7 @@ pub enum Event { peer_id: u64, sdp_offer: Option, tracks: Vec, + ice_servers: Vec, }, /// Media Server notifies Web Client about necessity to apply specified SDP /// Answer to Web Client's RTCPeerConnection. @@ -92,6 +93,17 @@ pub struct Track { pub media_type: MediaType, } +/// Representation of [`iceServers`] field of [`RTCConfiguration`] dictionary. +#[derive(Clone, Debug, Deserialize, Serialize)] +#[cfg_attr(test, derive(PartialEq))] +pub struct IceServer { + pub urls: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub username: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub credential: Option, +} + /// Direction of [`Track`]. #[cfg_attr(feature = "medea", derive(Serialize))] #[cfg_attr(feature = "jason", derive(Deserialize))] diff --git a/signaling_test.html b/signaling_test.html index db8c8f7af..d29321d8e 100644 --- a/signaling_test.html +++ b/signaling_test.html @@ -65,7 +65,8 @@ } }); - pcs[msg.peer_id] = new RTCPeerConnection(); + var configuration = { iceServers: msg.ice_servers }; + pcs[msg.peer_id] = new RTCPeerConnection(configuration); var pc = pcs[msg.peer_id]; pc.ontrack = function (e) { diff --git a/src/api/client/rpc_connection.rs b/src/api/client/rpc_connection.rs index d0f343775..8d40a3713 100644 --- a/src/api/client/rpc_connection.rs +++ b/src/api/client/rpc_connection.rs @@ -1,6 +1,6 @@ //! [`RpcConnection`] with related messages. -use std::fmt; +use core::fmt; use actix::Message; use futures::Future; @@ -162,6 +162,7 @@ pub mod test { peer_id, sdp_offer, tracks: _, + ice_servers: _, } => { match sdp_offer { Some(_) => self.room.do_send(CommandMessage::from( diff --git a/src/api/client/server.rs b/src/api/client/server.rs index 604976eb7..8b27d0675 100644 --- a/src/api/client/server.rs +++ b/src/api/client/server.rs @@ -81,7 +81,7 @@ pub struct Context { /// Starts HTTP server for handling WebSocket connections of Client API. pub fn run(rooms: RoomsRepository, config: Conf) { - let server_addr = config.server.get_bind_addr(); + let server_addr = config.server.bind_addr(); server::new(move || { App::with_state(Context { @@ -110,9 +110,10 @@ mod test { use crate::{ api::control::Member, - conf::{Conf, Server}, + conf::{Conf, Server, Turn}, media::create_peers, signalling::Room, + turn::new_turn_auth_service_mock, }; use super::*; @@ -120,11 +121,25 @@ mod test { /// Creates [`RoomsRepository`] for tests filled with a single [`Room`]. fn room(conf: Rpc) -> RoomsRepository { let members = hashmap! { - 1 => Member{id: 1, credentials: "caller_credentials".into()}, - 2 => Member{id: 2, credentials: "responder_credentials".into()}, + 1 => Member{ + id: 1, + credentials: "caller_credentials".into(), + ice_user: None + }, + 2 => Member{ + id: 2, + credentials: "responder_credentials".into(), + ice_user: None + }, }; let room = Arbiter::start(move |_| { - Room::new(1, members, create_peers(1, 2), conf.reconnect_timeout) + Room::new( + 1, + members, + create_peers(1, 2), + conf.reconnect_timeout, + new_turn_auth_service_mock(), + ) }); let rooms = hashmap! {1 => room}; RoomsRepository::new(rooms) @@ -161,6 +176,7 @@ mod test { idle_timeout: Duration::new(2, 0), reconnect_timeout: Default::default(), }, + turn: Turn::default(), server: Server::default(), }; diff --git a/src/api/control/member.rs b/src/api/control/member.rs index 880e666aa..318483547 100644 --- a/src/api/control/member.rs +++ b/src/api/control/member.rs @@ -1,5 +1,7 @@ //! Member definitions and implementations. +use crate::media::IceUser; + /// ID of [`Member`]. pub type Id = u64; @@ -11,4 +13,18 @@ pub struct Member { /// Credentials to authorize [`Member`] with. pub credentials: String, + + /// Turn server credentials. + pub ice_user: Option, +} + +impl Member { + /// Returns new instance of [`Memebr`] with given credentials. + pub fn new(id: Id, credentials: String) -> Self { + Self { + id, + credentials, + ice_user: None, + } + } } diff --git a/src/conf/mod.rs b/src/conf/mod.rs index c509f56a9..e1cd303ec 100644 --- a/src/conf/mod.rs +++ b/src/conf/mod.rs @@ -2,6 +2,7 @@ pub mod rpc; pub mod server; +pub mod turn; use std::env; @@ -9,7 +10,11 @@ use config::{Config, Environment, File}; use failure::Error; use serde::{Deserialize, Serialize}; -pub use self::{rpc::Rpc, server::Server}; +pub use self::{ + rpc::Rpc, + server::Server, + turn::{Redis, Turn}, +}; /// CLI argument that is responsible for holding application configuration /// file path. @@ -22,11 +27,12 @@ static APP_CONF_PATH_ENV_VAR_NAME: &str = "MEDEA_CONF"; #[derive(Clone, Debug, Deserialize, Serialize, Default)] #[serde(default)] pub struct Conf { - /// RPC connection settings. - pub rpc: rpc::Rpc, - /// HTTP server settings. - pub server: server::Server, + pub rpc: Rpc, + /// RPC connection settings. + pub server: Server, + /// TURN server settings. + pub turn: Turn, } impl Conf { @@ -71,8 +77,9 @@ where #[cfg(test)] mod tests { + use std::{fs, net::Ipv4Addr, time::Duration}; + use serial_test_derive::serial; - use std::{fs, time::Duration}; use super::*; @@ -186,4 +193,46 @@ mod tests { assert_eq!(file_env_config.rpc.idle_timeout, Duration::from_secs(48)); } + + #[test] + #[serial] + fn redis_conf_test() { + let default_conf = Conf::default(); + + env::set_var("MEDEA_TURN.DB.REDIS.IP", "5.5.5.5"); + env::set_var("MEDEA_TURN.DB.REDIS.PORT", "1234"); + + let env_conf = Conf::parse().unwrap(); + + assert_ne!(default_conf.turn.db.redis.ip, env_conf.turn.db.redis.ip); + assert_ne!( + default_conf.turn.db.redis.port, + env_conf.turn.db.redis.port + ); + + assert_eq!(env_conf.turn.db.redis.ip, Ipv4Addr::new(5, 5, 5, 5)); + assert_eq!(env_conf.turn.db.redis.port, 1234); + assert_eq!( + env_conf.turn.db.redis.addr(), + "5.5.5.5:1234".parse().unwrap() + ); + } + + #[test] + #[serial] + fn turn_conf_test() { + let default_conf = Conf::default(); + + env::set_var("MEDEA_TURN.IP", "5.5.5.5"); + env::set_var("MEDEA_TURN.PORT", "1234"); + + let env_conf = Conf::parse().unwrap(); + + assert_ne!(default_conf.turn.ip, env_conf.turn.ip); + assert_ne!(default_conf.turn.port, env_conf.turn.port); + + assert_eq!(env_conf.turn.ip, Ipv4Addr::new(5, 5, 5, 5)); + assert_eq!(env_conf.turn.port, 1234); + assert_eq!(env_conf.turn.addr(), "5.5.5.5:1234".parse().unwrap()); + } } diff --git a/src/conf/server.rs b/src/conf/server.rs index bd89c3965..ebba4a977 100644 --- a/src/conf/server.rs +++ b/src/conf/server.rs @@ -20,7 +20,7 @@ pub struct Server { impl Server { /// Builds [`SocketAddr`] from `bind_ip` and `bind_port`. #[inline] - pub fn get_bind_addr(&self) -> SocketAddr { + pub fn bind_addr(&self) -> SocketAddr { (self.bind_ip, self.bind_port) .to_socket_addrs() .unwrap() @@ -58,7 +58,7 @@ mod server_spec { assert_eq!(env_conf.server.bind_ip, Ipv4Addr::new(5, 5, 5, 5)); assert_eq!(env_conf.server.bind_port, 1234); assert_eq!( - env_conf.server.get_bind_addr(), + env_conf.server.bind_addr(), "5.5.5.5:1234".parse().unwrap(), ); } diff --git a/src/conf/turn.rs b/src/conf/turn.rs new file mode 100644 index 000000000..806ee65ad --- /dev/null +++ b/src/conf/turn.rs @@ -0,0 +1,74 @@ +//! STUN/TURN server settings. + +use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs as _}; + +use serde::{Deserialize, Serialize}; +use smart_default::*; + +/// STUN/TURN server settings. +#[derive(Clone, Debug, Deserialize, Serialize, SmartDefault)] +#[serde(default)] +pub struct Turn { + /// Database settings + pub db: Db, + /// IP address STUN/TURN server. Defaults to `0.0.0.0`. + #[default(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))] + pub ip: IpAddr, + /// Port to connect TURN server. Defaults to `3478`. + #[default(3478)] + pub port: u16, + /// Username for authorize on TURN server. + #[default(String::from("USER"))] + pub user: String, + /// Password for authorize on TURN server. + #[default(String::from("PASS"))] + pub pass: String, +} + +impl Turn { + /// Builds [`SocketAddr`] from `ip` and `port`. + #[inline] + pub fn addr(&self) -> SocketAddr { + (self.ip, self.port) + .to_socket_addrs() + .unwrap() + .next() + .unwrap() + } +} + +#[derive(Clone, Debug, Deserialize, Serialize, SmartDefault)] +#[serde(default)] +pub struct Db { + /// Redis server settings. + pub redis: Redis, +} + +#[derive(Clone, Debug, Deserialize, Serialize, SmartDefault)] +#[serde(default)] +pub struct Redis { + /// IP address Redis server. Defaults to `0.0.0.0`. + #[default(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))] + pub ip: IpAddr, + /// Port to connect Redis server. Defaults to `6379`. + #[default(6379)] + pub port: u16, + /// Password for authorize on Redis server. + #[default(String::from("turn"))] + pub pass: String, + /// The database number to use. This is usually 0. + #[default(0)] + pub db_number: i64, +} + +impl Redis { + /// Builds [`SocketAddr`] from `ip` and `port`. + #[inline] + pub fn addr(&self) -> SocketAddr { + (self.ip, self.port) + .to_socket_addrs() + .unwrap() + .next() + .unwrap() + } +} diff --git a/src/main.rs b/src/main.rs index aab1c2a74..8009c9f05 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,11 +7,13 @@ pub mod conf; pub mod log; pub mod media; pub mod signalling; +pub mod turn; use actix::prelude::*; use dotenv::dotenv; use log::prelude::*; +use crate::turn::new_turn_auth_service; use crate::{ api::{client::server, control::Member}, conf::Conf, @@ -32,11 +34,20 @@ fn main() { info!("{:?}", config); let members = hashmap! { - 1 => Member{id: 1, credentials: "caller_credentials".to_owned()}, - 2 => Member{id: 2, credentials: "responder_credentials".to_owned()}, + 1 => Member::new(1, "caller_credentials".to_owned()), + 2 => Member::new(2, "responder_credentials".to_owned()), }; let peers = create_peers(1, 2); - let room = Room::new(1, members, peers, config.rpc.reconnect_timeout); + + let turn_auth_service = + new_turn_auth_service(&config).expect("Unable to start turn service"); + let room = Room::new( + 1, + members, + peers, + config.rpc.reconnect_timeout, + turn_auth_service, + ); let room = Arbiter::start(move |_| room); let rooms = hashmap! {1 => room}; let rooms_repo = RoomsRepository::new(rooms); diff --git a/src/media/ice_user.rs b/src/media/ice_user.rs new file mode 100644 index 000000000..7055bed1f --- /dev/null +++ b/src/media/ice_user.rs @@ -0,0 +1,35 @@ +use std::net::SocketAddr; + +use medea_client_api_proto::IceServer; + +/// Credentials on Turn server. +#[derive(Clone, Debug)] +pub struct IceUser { + /// Address of Turn server. + pub address: SocketAddr, + /// Username for authorization. + pub name: String, + /// Password for authorization. + pub pass: String, +} + +impl IceUser { + pub fn to_servers_list(&self) -> Vec { + let stun_url = vec![format!("stun:{}", self.address)]; + let stun = IceServer { + urls: stun_url, + username: None, + credential: None, + }; + let turn_urls = vec![ + format!("turn:{}", self.address), + format!("turn:{}?transport=tcp", self.address), + ]; + let turn = IceServer { + urls: turn_urls, + username: Some(self.name.clone()), + credential: Some(self.pass.clone()), + }; + vec![stun, turn] + } +} diff --git a/src/media/mod.rs b/src/media/mod.rs index 3d7e8a4df..7b4962a5c 100644 --- a/src/media/mod.rs +++ b/src/media/mod.rs @@ -1,8 +1,10 @@ //! Representations of media and media connection establishment objects. +pub mod ice_user; pub mod peer; pub mod track; pub use self::{ + ice_user::IceUser, peer::{ create_peers, Id as PeerId, New, Peer, PeerStateError, PeerStateMachine, WaitLocalHaveRemote, WaitLocalSdp, WaitRemoteSdp, diff --git a/src/signalling/participants.rs b/src/signalling/participants.rs index 3f0294028..6c722bb13 100644 --- a/src/signalling/participants.rs +++ b/src/signalling/participants.rs @@ -1,17 +1,23 @@ //! Participant is [`Member`] with [`RpcConnection`]. [`ParticipantService`] //! stores [`Members`] and associated [`RpcConnection`]s, handles -//! [`RpcConnection`] authorization, establishment, message sending. +//! [`RpcConnection`] authorization, establishment, message sending, Turn +//! credentials management. use std::time::{Duration, Instant}; -use actix::{fut::wrap_future, AsyncContext, Context, SpawnHandle}; +use actix::{ + fut::wrap_future, ActorFuture, AsyncContext, Context, MailboxError, + SpawnHandle, +}; use futures::{ future::{self, join_all, Either}, Future, }; use hashbrown::HashMap; + use medea_client_api_proto::Event; +use crate::signalling::RoomId; use crate::{ api::{ client::rpc_connection::{ @@ -21,20 +27,47 @@ use crate::{ control::{Member, MemberId}, }, log::prelude::*, + media::IceUser, signalling::{ - room::{CloseRoom, RoomError}, + room::{ActFuture, CloseRoom, RoomError}, Room, }, + turn::{TurnAuthService, TurnServiceErr, UnreachablePolicy}, }; +#[derive(Debug)] +#[allow(clippy::module_name_repetitions)] +pub enum ParticipantServiceErr { + TurnServiceErr(TurnServiceErr), + MailBoxErr(MailboxError), +} + +impl From for ParticipantServiceErr { + fn from(err: TurnServiceErr) -> Self { + ParticipantServiceErr::TurnServiceErr(err) + } +} + +impl From for ParticipantServiceErr { + fn from(err: MailboxError) -> Self { + ParticipantServiceErr::MailBoxErr(err) + } +} + /// Participant is [`Member`] with [`RpcConnection`]. [`ParticipantService`] /// stores [`Members`] and associated [`RpcConnection`]s, handles /// [`RpcConnection`] authorization, establishment, message sending. -#[derive(Debug)] +#[cfg_attr(not(test), derive(Debug))] pub struct ParticipantService { + /// [`Room`]s id from which this [`ParticipantService`] was created. + room_id: RoomId, + /// [`Member`]s which currently are present in this [`Room`]. members: HashMap, + /// Service for managing authorization on Turn server. + turn: Box, + /// Established [`RpcConnection`]s of [`Member`]s in this [`Room`]. // TODO: Replace Box> with enum, // as the set of all possible RpcConnection types is not closed. @@ -52,11 +85,15 @@ pub struct ParticipantService { impl ParticipantService { pub fn new( + room_id: RoomId, members: HashMap, + turn: Box, reconnect_timeout: Duration, ) -> Self { Self { + room_id, members, + turn, connections: HashMap::new(), reconnect_timeout, drop_connection_tasks: HashMap::new(), @@ -84,6 +121,18 @@ impl ParticipantService { } } + pub fn get_member(&self, member_id: MemberId) -> Option<&Member> { + self.members.get(&member_id) + } + + pub fn take_member(&mut self, member_id: MemberId) -> Option { + self.members.remove(&member_id) + } + + pub fn insert_member(&mut self, member: Member) { + self.members.insert(member.id, member); + } + /// Checks if [`Member`] has **active** [`RcpConnection`]. pub fn member_has_connection(&self, member_id: MemberId) -> bool { self.connections.contains_key(&member_id) @@ -107,12 +156,58 @@ impl ParticipantService { } } + /// Saves provided [`RpcConnection`], registers [`ICEUser`]. + /// If [`Member`] already has any other [`RpcConnection`], + /// then it will be closed. + pub fn connection_established( + &mut self, + ctx: &mut Context, + member_id: MemberId, + con: Box, + ) -> ActFuture<(), ParticipantServiceErr> { + // lookup previous member connection + if let Some(mut connection) = self.connections.remove(&member_id) { + debug!("Closing old RpcConnection for member {}", member_id); + + // cancel RpcConnection close task, since connection is + // reestablished + if let Some(handler) = self.drop_connection_tasks.remove(&member_id) + { + ctx.cancel_future(handler); + } + Box::new(wrap_future(connection.close().then(|_| Ok(())))) + } else { + self.connections.insert(member_id, con); + Box::new( + wrap_future(self.turn.create_user( + member_id, + self.room_id, + UnreachablePolicy::default(), + )) + .map_err(|err, _: &mut Room, _| { + ParticipantServiceErr::from(err) + }) + .and_then( + move |ice: IceUser, room: &mut Room, _| { + if let Some(mut member) = + room.participants.take_member(member_id) + { + member.ice_user.replace(ice); + room.participants.insert_member(member); + }; + wrap_future(future::ok(())) + }, + ), + ) + } + } + /// If [`ClosedReason::Closed`], then removes [`RpcConnection`] associated /// with specified user [`Member`] from the storage and closes the room. /// If [`ClosedReason::Lost`], then creates delayed task that emits /// [`ClosedReason::Closed`]. // TODO: Dont close the room. It is being closed atm, because we have - // no way to handle absence of RtcPeerConnection when. + // no way to handle absence of RtcPeerConnection. pub fn connection_closed( &mut self, ctx: &mut Context, @@ -123,6 +218,7 @@ impl ParticipantService { match reason { ClosedReason::Closed => { self.connections.remove(&member_id); + self.delete_ice_user(member_id); ctx.notify(CloseRoom {}) } ClosedReason::Lost => { @@ -144,41 +240,28 @@ impl ParticipantService { } } - /// Stores provided [`RpcConnection`] for given [`Member`] in the [`Room`]. - /// If [`Member`] already has any other [`RpcConnection`], - /// then it will be closed. - pub fn connection_established( - &mut self, - ctx: &mut Context, - member_id: MemberId, - con: Box, - ) { - // lookup previous member connection - if let Some(mut connection) = self.connections.remove(&member_id) { - debug!("Closing old RpcConnection for member {}", member_id); - - // cancel RpcConnection close task, since connection is - // reestablished - if let Some(handler) = self.drop_connection_tasks.remove(&member_id) - { - ctx.cancel_future(handler); + /// Deletes [`IceUser`] associated with provided [`Member`]. + fn delete_ice_user(&mut self, member_id: MemberId) { + if let Some(mut member) = self.members.remove(&member_id) { + if let Some(ice_user) = member.ice_user.take() { + self.turn.delete_user(ice_user, self.room_id); } - ctx.spawn(wrap_future(connection.close())); - } else { - self.connections.insert(member_id, con); + self.members.insert(member_id, member); } } - /// Cancels all connection close tasks, closes all [`RpcConnection`]s. + /// Cancels all connection close tasks, closes all [`RpcConnection`]s, pub fn drop_connections( &mut self, ctx: &mut Context, ) -> impl Future { + // canceling all drop_connection_tasks self.drop_connection_tasks.drain().for_each(|(_, handle)| { ctx.cancel_future(handle); }); - let close_fut = self.connections.drain().fold( + // closing all RpcConnection's + let mut close_fut = self.connections.drain().fold( vec![], |mut futures, (_, mut connection)| { futures.push(connection.close()); @@ -186,6 +269,23 @@ impl ParticipantService { }, ); + // removing all users from room + let remove_all_users_fut = Box::new( + self.turn + .delete_multiple_users( + self.room_id, + self.members.iter().map(|(id, _)| *id).collect(), + ) + .map_err(|_| ()), + ); + close_fut.push(remove_all_users_fut); + join_all(close_fut).map(|_| ()) } } + +#[cfg(test)] + +pub mod test { + use super::*; +} diff --git a/src/signalling/peers.rs b/src/signalling/peers.rs index 710de7909..506c0ac04 100644 --- a/src/signalling/peers.rs +++ b/src/signalling/peers.rs @@ -18,8 +18,9 @@ pub struct PeerRepository { impl PeerRepository { /// Store [`Peer`] in [`Room`]. - pub fn add_peer>(&mut self, id: PeerId, peer: S) { - self.peers.insert(id, peer.into()); + pub fn add_peer>(&mut self, peer: S) { + let peer = peer.into(); + self.peers.insert(peer.id(), peer); } /// Returns borrowed [`PeerStateMachine`] by its ID. diff --git a/src/signalling/room.rs b/src/signalling/room.rs index 01dfcf63d..c2a1d1c02 100644 --- a/src/signalling/room.rs +++ b/src/signalling/room.rs @@ -1,6 +1,8 @@ //! Room definitions and implementations. Room is responsible for media //! connection establishment between concrete [`Member`]s. +use std::time::Duration; + use actix::{ fut::wrap_future, Actor, ActorFuture, AsyncContext, Context, Handler, Message, @@ -8,9 +10,8 @@ use actix::{ use failure::Fail; use futures::future; use hashbrown::HashMap; -use medea_client_api_proto::{Command, Event, IceCandidate}; -use std::time::Duration; +use medea_client_api_proto::{Command, Event, IceCandidate}; use crate::{ api::{ @@ -26,27 +27,35 @@ use crate::{ WaitLocalHaveRemote, WaitLocalSdp, WaitRemoteSdp, }, signalling::{participants::ParticipantService, peers::PeerRepository}, + turn::TurnAuthService, }; /// ID of [`Room`]. pub type Id = u64; /// Ergonomic type alias for using [`ActorFuture`] for [`Room`]. -type ActFuture = Box>; +pub type ActFuture = + Box>; #[derive(Fail, Debug)] #[allow(clippy::module_name_repetitions)] pub enum RoomError { #[fail(display = "Couldn't find Peer with [id = {}]", _0)] PeerNotFound(PeerId), + #[fail(display = "Couldn't find Member with [id = {}]", _0)] + MemberNotFound(MemberId), + #[fail(display = "Member [id = {}] does not have Turn credentials", _0)] + NoTurnCredentials(MemberId), #[fail(display = "Couldn't find RpcConnection with Member [id = {}]", _0)] ConnectionNotExists(MemberId), #[fail(display = "Unable to send event to Member [id = {}]", _0)] UnableToSendEvent(MemberId), #[fail(display = "PeerError: {}", _0)] PeerStateError(PeerStateError), - #[fail(display = "Generic room error {}", _0)] + #[fail(display = "Generic room error: {}", _0)] BadRoomSpec(String), + #[fail(display = "Turn service error: {}", _0)] + TurnServiceError(String), } impl From for RoomError { @@ -61,7 +70,7 @@ pub struct Room { id: Id, /// [`RpcConnection`]s of [`Member`]s in this [`Room`]. - participants: ParticipantService, + pub participants: ParticipantService, /// [`Peer`]s of [`Member`]s in this [`Room`]. peers: PeerRepository, @@ -74,11 +83,17 @@ impl Room { members: HashMap, peers: HashMap, reconnect_timeout: Duration, + turn: Box, ) -> Self { Self { id, peers: PeerRepository::from(peers), - participants: ParticipantService::new(members, reconnect_timeout), + participants: ParticipantService::new( + id, + members, + turn, + reconnect_timeout, + ), } } @@ -101,24 +116,33 @@ impl Room { } else if peer2.is_sender() { (peer2, peer1) } else { - self.peers.add_peer(peer1.id(), peer1); - self.peers.add_peer(peer2.id(), peer2); + self.peers.add_peer(peer1); + self.peers.add_peer(peer2); return Err(RoomError::BadRoomSpec(format!( "Error while trying to connect Peer [id = {}] and Peer [id = \ {}] cause neither of peers are senders", peer1_id, peer2_id ))); }; - self.peers.add_peer(receiver.id(), receiver); + self.peers.add_peer(receiver); let sender = sender.start(); let member_id = sender.member_id(); + let ice_servers = self + .participants + .get_member(member_id) + .ok_or_else(|| RoomError::MemberNotFound(member_id))? + .ice_user + .as_ref() + .ok_or_else(|| RoomError::NoTurnCredentials(member_id))? + .to_servers_list(); let peer_created = Event::PeerCreated { peer_id: sender.id(), sdp_offer: None, tracks: sender.tracks(), + ice_servers, }; - self.peers.add_peer(sender.id(), sender); + self.peers.add_peer(sender); Ok(Box::new(wrap_future( self.participants .send_event_to_member(member_id, peer_created), @@ -143,14 +167,25 @@ impl Room { let to_peer = to_peer.set_remote_sdp(sdp_offer.clone()); let to_member_id = to_peer.member_id(); + let ice_servers = self + .participants + .get_member(to_member_id) + .ok_or_else(|| RoomError::MemberNotFound(to_member_id))? + .ice_user + .as_ref() + .ok_or_else(|| RoomError::NoTurnCredentials(to_member_id))? + .to_servers_list(); + let event = Event::PeerCreated { peer_id: to_peer_id, sdp_offer: Some(sdp_offer), tracks: to_peer.tracks(), + ice_servers, }; - self.peers.add_peer(from_peer_id, from_peer); - self.peers.add_peer(to_peer_id, to_peer); + self.peers.add_peer(from_peer); + self.peers.add_peer(to_peer); + Ok(Box::new(wrap_future( self.participants.send_event_to_member(to_member_id, event), ))) @@ -180,8 +215,8 @@ impl Room { sdp_answer, }; - self.peers.add_peer(from_peer_id, from_peer); - self.peers.add_peer(to_peer_id, to_peer); + self.peers.add_peer(from_peer); + self.peers.add_peer(to_peer); Ok(Box::new(wrap_future( self.participants.send_event_to_member(to_member_id, event), @@ -341,35 +376,33 @@ impl Handler for Room { msg: RpcConnectionEstablished, ctx: &mut Self::Context, ) -> Self::Result { + let member_id = msg.member_id; info!("RpcConnectionEstablished for member {}", msg.member_id); - // save new connection - self.participants.connection_established( - ctx, - msg.member_id, - msg.connection, - ); - - // get connected member Peers - self.peers - .get_peers_by_member_id(msg.member_id) - .into_iter() - .for_each(|peer| { - // only New peers should be connected - if let PeerStateMachine::New(peer) = peer { - if self - .participants - .member_has_connection(peer.partner_member_id()) - { - ctx.notify(ConnectPeers( - peer.id(), - peer.partner_peer_id(), - )); - } - } - }); - - Box::new(wrap_future(future::ok(()))) + let fut = + self.participants + .connection_established(ctx, msg.member_id, msg.connection) + .map_err(|err, _, _| { + error!("RpcConnectionEstablished error {:?}", err) + }) + .map(move |_, room, ctx| { + room.peers + .get_peers_by_member_id(member_id) + .into_iter() + .for_each(|peer| { + if let PeerStateMachine::New(peer) = peer { + if room.participants.member_has_connection( + peer.partner_member_id(), + ) { + ctx.notify(ConnectPeers( + peer.id(), + peer.partner_peer_id(), + )); + } + } + }); + }); + Box::new(fut) } } @@ -415,22 +448,39 @@ mod test { use std::sync::{atomic::AtomicUsize, Arc, Mutex}; use actix::{Addr, Arbiter, System}; + use medea_client_api_proto::{ - AudioSettings, Direction, MediaType, Track, VideoSettings, + AudioSettings, Direction, IceServer, MediaType, Track, VideoSettings, }; - use crate::api::client::rpc_connection::test::TestConnection; - use crate::media::create_peers; + use crate::{ + api::client::rpc_connection::test::TestConnection, media::create_peers, + turn::new_turn_auth_service_mock, + }; use super::*; fn start_room() -> Addr { let members = hashmap! { - 1 => Member{id: 1, credentials: "caller_credentials".to_owned()}, - 2 => Member{id: 2, credentials: "responder_credentials".to_owned()}, + 1 => Member{ + id: 1, + credentials: "caller_credentials".to_owned(), + ice_user: None + }, + 2 => Member{ + id: 2, + credentials: "responder_credentials".to_owned(), + ice_user: None + }, }; Arbiter::start(move |_| { - Room::new(1, members, create_peers(1, 2), Duration::from_secs(10)) + Room::new( + 1, + members, + create_peers(1, 2), + Duration::from_secs(10), + new_turn_auth_service_mock(), + ) }) } @@ -480,6 +530,21 @@ mod test { media_type: MediaType::Video(VideoSettings {}), }, ], + ice_servers: vec![ + IceServer { + urls: vec!["stun:5.5.5.5:1234".to_string()], + username: None, + credential: None, + }, + IceServer { + urls: vec![ + "turn:5.5.5.5:1234".to_string(), + "turn:5.5.5.5:1234?transport=tcp".to_string() + ], + username: Some("username".to_string()), + credential: Some("password".to_string()), + }, + ], }) .unwrap(), serde_json::to_string(&Event::SdpAnswerMade { @@ -517,6 +582,21 @@ mod test { media_type: MediaType::Video(VideoSettings {}), }, ], + ice_servers: vec![ + IceServer { + urls: vec!["stun:5.5.5.5:1234".to_string()], + username: None, + credential: None, + }, + IceServer { + urls: vec![ + "turn:5.5.5.5:1234".to_string(), + "turn:5.5.5.5:1234?transport=tcp".to_string() + ], + username: Some("username".to_string()), + credential: Some("password".to_string()), + }, + ], }) .unwrap(), serde_json::to_string(&Event::IceCandidateDiscovered { diff --git a/src/turn/mod.rs b/src/turn/mod.rs new file mode 100644 index 000000000..aa4b70c18 --- /dev/null +++ b/src/turn/mod.rs @@ -0,0 +1,9 @@ +pub mod repo; +pub mod service; + +pub use self::service::{ + new_turn_auth_service, TurnAuthService, TurnServiceErr, UnreachablePolicy, +}; + +#[cfg(test)] +pub use self::service::test::new_turn_auth_service_mock; diff --git a/src/turn/repo.rs b/src/turn/repo.rs new file mode 100644 index 000000000..6e5c3339b --- /dev/null +++ b/src/turn/repo.rs @@ -0,0 +1,135 @@ +//! Abstraction over remote Redis database used to store Turn server +//! credentials. +extern crate tokio; + +use bb8::{Pool, RunError}; +use bb8_redis::{RedisConnectionManager, RedisPool}; +use crypto::{digest::Digest, md5::Md5}; +use failure::Fail; +use futures::future::Future; +use redis::{ConnectionInfo, RedisError}; +use tokio::prelude::*; + +use crate::{ + api::control::MemberId, log::prelude::*, media::IceUser, signalling::RoomId, +}; + +#[derive(Fail, Debug)] +pub enum TurnDatabaseErr { + #[fail(display = "Redis returned error: {}", _0)] + RedisError(RedisError), +} + +impl From for TurnDatabaseErr { + fn from(err: RedisError) -> Self { + TurnDatabaseErr::RedisError(err) + } +} + +#[derive(Debug)] +pub struct TurnDatabaseInsertableUser { + pub ice_user: IceUser, + pub room_id: RoomId, +} + +// Abstraction over remote Redis database used to store Turn server +// credentials. +#[allow(clippy::module_name_repetitions)] +#[derive(Debug)] +pub struct TurnDatabase { + pool: RedisPool, + info: ConnectionInfo, +} + +// TODO: Auth after reconnect. +impl TurnDatabase { + /// New TurnDatabase + pub fn new + Clone>( + connection_info: S, + ) -> Result { + let client = redis::Client::open(connection_info.clone().into())?; + let connection_manager = RedisConnectionManager::new(client)?; + let mut runtime = + tokio::runtime::Runtime::new().expect("Unable to create a runtime"); + let pool = runtime.block_on(future::lazy(|| { + Pool::builder().build(connection_manager) + }))?; + let redis_pool = RedisPool::new(pool); + + Ok(Self { + pool: redis_pool, + info: connection_info.into(), + }) + } + + /// Inserts provided [`IceUser`] into remote Redis database. + pub fn insert( + &mut self, + user: &TurnDatabaseInsertableUser, + ) -> impl Future> { + debug!("Store ICE user: {:?}", user); + let key = format!( + "turn/realm/medea/user/{}/key", + format!("{}:{}", user.room_id, user.ice_user.name) + ); + let value = + format!("{}:medea:{}", user.ice_user.name, user.ice_user.pass); + let mut hasher = Md5::new(); + hasher.input_str(&value); + let result = hasher.result_str(); + + self.pool.run(|connection| { + redis::cmd("SET") + .arg(key) + .arg(result) + .query_async(connection) + .map_err(TurnDatabaseErr::RedisError) + }) + } + + /// Deletes provided [`IceUser`] from remote Redis database. + pub fn remove( + &mut self, + user: &TurnDatabaseInsertableUser, + ) -> impl Future> { + debug!("Delete ICE user: {:?}", user); + let key = format!( + "turn/realm/medea/user/{}/key", + format!("{}:{}", user.room_id, user.ice_user.name) + ); + + self.pool.run(|connection| { + redis::cmd("DEL") + .arg(key) + .query_async(connection) + .map_err(TurnDatabaseErr::RedisError) + }) + } + + pub fn remove_users( + &mut self, + room_id: RoomId, + users_ids: &[MemberId], + ) -> impl Future> { + debug!( + "Delete ICE users_ids {{ {:?} }} from room {:?}", + users_ids, room_id + ); + let mut delete_keys = Vec::with_capacity(users_ids.len()); + + for user_id in users_ids { + delete_keys.push(format!( + "turn/realm/medea/user/{}/key", + format!("{}:{}", room_id, user_id) + )); + } + + self.pool.run(|connection| { + redis::cmd("DEL") + .arg(delete_keys) + .to_owned() + .query_async(connection) + .map_err(TurnDatabaseErr::RedisError) + }) + } +} diff --git a/src/turn/service.rs b/src/turn/service.rs new file mode 100644 index 000000000..19cc85338 --- /dev/null +++ b/src/turn/service.rs @@ -0,0 +1,371 @@ +use core::fmt; +use std::net::SocketAddr; + +use actix::{ + fut::wrap_future, Actor, ActorFuture, Addr, Arbiter, Context, Handler, + MailboxError, Message, WrapFuture, +}; +use bb8::RunError; +use failure::Fail; +use futures::future::{err, ok, Future}; +use rand::{distributions::Alphanumeric, Rng}; +use redis::ConnectionInfo; +use smart_default::*; + +use crate::turn::repo::TurnDatabaseInsertableUser; +use crate::{ + api::control::MemberId, + conf::Conf, + media::IceUser, + signalling::RoomId, + turn::repo::{TurnDatabase, TurnDatabaseErr}, +}; + +static TURN_PASS_LEN: usize = 16; + +#[allow(clippy::module_name_repetitions)] +/// Manages Turn server credentials. +pub trait TurnAuthService: fmt::Debug + Send { + /// Generates and registers Turn credentials. + fn create_user( + &self, + member_id: MemberId, + room_id: RoomId, + policy: UnreachablePolicy, + ) -> Box>; + + /// Deletes provided Turn credentials. + fn delete_user( + &self, + user: IceUser, + room_id: RoomId, + ) -> Box>; + + /// Deletes [`users`] from redis with provided [`RoomId`]. + fn delete_multiple_users( + &self, + room_id: RoomId, + users: Vec, + ) -> Box>; +} + +impl TurnAuthService for Addr { + /// Sends [`CreateIceUser`] to [`Service`]. + fn create_user( + &self, + member_id: u64, + room_id: RoomId, + policy: UnreachablePolicy, + ) -> Box> { + Box::new( + self.send(CreateIceUser { + member_id, + room_id, + policy, + }) + .then( + |r: Result, MailboxError>| { + match r { + Ok(Ok(ice)) => Ok(ice), + Ok(Err(err)) => Err(err), + Err(err) => Err(TurnServiceErr::from(err)), + } + }, + ), + ) + } + + /// Sends [`DeleteIceUser`] to [`Service`]. + fn delete_user( + &self, + user: IceUser, + room_id: RoomId, + ) -> Box> { + let delete_user = TurnDatabaseInsertableUser { + ice_user: user, + room_id, + }; + Box::new(self.send(DeleteIceUser(delete_user)).then( + |r: Result, MailboxError>| match r { + Ok(Err(err)) => Err(err), + Err(err) => Err(TurnServiceErr::from(err)), + _ => Ok(()), + }, + )) + } + + /// Sends [`DeleteRoom`] to [`Service`]. + fn delete_multiple_users( + &self, + room_id: RoomId, + users: Vec, + ) -> Box> { + Box::new(self.send(DeleteMultipleUsers { room_id, users }).then( + |r: Result, MailboxError>| match r { + Ok(Err(err)) => Err(err), + Err(err) => Err(TurnServiceErr::from(err)), + _ => Ok(()), + }, + )) + } +} + +/// Ergonomic type alias for using [`ActorFuture`] for [`AuthService`]. +type ActFuture = + Box>; + +#[derive(Debug, Fail)] +pub enum TurnServiceErr { + #[fail(display = "Error accessing TurnAuthRepo: {}", _0)] + TurnAuthRepoErr(TurnDatabaseErr), + #[fail(display = "Mailbox error when accessing TurnAuthRepo: {}", _0)] + MailboxErr(MailboxError), + #[fail(display = "Timeout exceeded while trying to insert/delete IceUser")] + TimedOut, +} + +impl From for TurnServiceErr { + fn from(err: TurnDatabaseErr) -> Self { + TurnServiceErr::TurnAuthRepoErr(err) + } +} + +impl From> for TurnServiceErr { + fn from(err: bb8::RunError) -> Self { + match err { + RunError::User(error) => TurnServiceErr::TurnAuthRepoErr(error), + RunError::TimedOut => TurnServiceErr::TimedOut, + } + } +} + +impl From for TurnServiceErr { + fn from(err: MailboxError) -> Self { + TurnServiceErr::MailboxErr(err) + } +} + +/// Defines [`TurnAuthService`] behaviour if remote database is unreachable +#[derive(Debug, SmartDefault)] +pub enum UnreachablePolicy { + /// Error will be propagated if request to db fails cause it is + /// unreachable. + #[default] + ReturnErr, + /// Static member credentials will be returned if request to db fails cause + /// it is unreachable. + ReturnStatic, +} + +/// [`TurnAuthService`] implementation backed by Redis database. +#[derive(Debug)] +struct Service { + /// Turn credentials repository. + turn_db: TurnDatabase, + /// TurnAuthRepo password. + db_pass: String, + /// Turn server address. + turn_address: SocketAddr, + /// Turn server static user. + turn_username: String, + /// Turn server static user password. + turn_password: String, + /// Lazy static [`ICEUser`]. + static_user: Option, +} + +/// Create new instance [`TurnAuthService`]. +#[allow(clippy::module_name_repetitions)] +pub fn new_turn_auth_service( + config: &Conf, +) -> Result, TurnServiceErr> { + let turn_db = TurnDatabase::new(ConnectionInfo { + addr: Box::new(redis::ConnectionAddr::Tcp( + config.turn.db.redis.ip.to_string(), + config.turn.db.redis.port, + )), + db: config.turn.db.redis.db_number, + passwd: if config.turn.db.redis.pass.is_empty() { + None + } else { + Some(config.turn.db.redis.pass.clone()) + }, + })?; + + let service = Service { + turn_db, + db_pass: config.turn.db.redis.pass.clone(), + turn_address: config.turn.addr(), + turn_username: config.turn.user.clone(), + turn_password: config.turn.db.redis.pass.clone(), + static_user: None, + }; + + Ok(Box::new(Arbiter::start(|_| service))) +} + +impl Service { + /// Generates random alphanumeric string of specified length. + fn new_password(&self, n: usize) -> String { + rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(n) + .collect() + } + + /// Returns [`ICEUser`] with static credentials. + fn static_user(&mut self) -> IceUser { + if self.static_user.is_none() { + self.static_user.replace(IceUser { + address: self.turn_address, + name: self.turn_username.clone(), + pass: self.turn_password.clone(), + }); + }; + + self.static_user.clone().unwrap() + } +} + +impl Actor for Service { + type Context = Context; +} + +/// Request for delete [`ICEUser`] for [`Member`] from Turn database. +#[derive(Debug, Message)] +#[rtype(result = "Result<(), TurnServiceErr>")] +struct DeleteIceUser(pub TurnDatabaseInsertableUser); + +impl Handler for Service { + type Result = Box>; + + /// Deletes provided [`TurnDatabaseInsertableUser`] from [`TurnDatabase`]. + fn handle( + &mut self, + msg: DeleteIceUser, + _ctx: &mut Self::Context, + ) -> Self::Result { + Box::new(self.turn_db.remove(&msg.0).map_err(TurnServiceErr::from)) + } +} + +/// Creates credentials on Turn server for specified member. +#[derive(Debug, Message)] +#[rtype(result = "Result")] +struct CreateIceUser { + pub member_id: MemberId, + pub room_id: RoomId, + pub policy: UnreachablePolicy, +} + +impl Handler for Service { + type Result = ActFuture; + + /// Generates [`IceUser`] with saved Turn address, provided [`MemberId`] and + /// random password. Inserts created [`IceUser`] into [`TurnDatabase`]. + fn handle( + &mut self, + msg: CreateIceUser, + _ctx: &mut Self::Context, + ) -> Self::Result { + let ice_user = IceUser { + address: self.turn_address, + name: msg.member_id.to_string(), + pass: self.new_password(TURN_PASS_LEN), + }; + + let turn_db_user = TurnDatabaseInsertableUser { + ice_user, + room_id: msg.room_id, + }; + + Box::new(self.turn_db.insert(&turn_db_user).into_actor(self).then( + move |result, act, _| { + wrap_future(match result { + Ok(_) => ok(turn_db_user.ice_user), + Err(e) => match msg.policy { + UnreachablePolicy::ReturnErr => err(e.into()), + UnreachablePolicy::ReturnStatic => { + ok(act.static_user()) + } + }, + }) + }, + )) + } +} + +/// Deletes all users from given room in redis. +#[derive(Debug, Message)] +#[rtype(result = "Result<(), TurnServiceErr>")] +struct DeleteMultipleUsers { + room_id: RoomId, + users: Vec, +} + +impl Handler for Service { + type Result = ActFuture<(), TurnServiceErr>; + + /// Deletes all users with provided [`RoomId`] + fn handle( + &mut self, + msg: DeleteMultipleUsers, + _ctx: &mut Self::Context, + ) -> Self::Result { + Box::new( + self.turn_db + .remove_users(msg.room_id, &msg.users) + .map_err(TurnServiceErr::from) + .into_actor(self), + ) + } +} + +#[cfg(test)] +pub mod test { + use futures::future; + + use crate::media::IceUser; + + use super::*; + + #[derive(Debug)] + struct TurnAuthServiceMock {} + + impl TurnAuthService for TurnAuthServiceMock { + fn create_user( + &self, + _: u64, + _: RoomId, + _: UnreachablePolicy, + ) -> Box> { + Box::new(future::ok(IceUser { + address: "5.5.5.5:1234".parse().unwrap(), + name: "username".to_string(), + pass: "password".to_string(), + })) + } + + fn delete_user( + &self, + _: IceUser, + _: RoomId, + ) -> Box> { + Box::new(future::ok(())) + } + + fn delete_multiple_users( + &self, + _: RoomId, + _: Vec, + ) -> Box> { + Box::new(future::ok(())) + } + } + + #[allow(clippy::module_name_repetitions)] + pub fn new_turn_auth_service_mock() -> Box { + Box::new(TurnAuthServiceMock {}) + } + +}