Skip to content

Commit

Permalink
fix tokio conflict (#168)
Browse files Browse the repository at this point in the history
* fix tokio conflict

* fix tests

Co-authored-by: Tim Wilson <[email protected]>
  • Loading branch information
tjwilson90 and twilson-palantir authored Mar 8, 2021
1 parent e90dbcc commit c2fcc91
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 66 deletions.
68 changes: 25 additions & 43 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2018"
[dependencies]
base64 = "*"
env_logger = { version = "*", default-features = false, features = ["humantime"] }
futures-util = "*"
http = "*"
log = "*"
once_cell = "*"
Expand All @@ -20,7 +21,7 @@ rusqlite = { version = "*", features = ["bundled"] }
serde = { version = "*", features = ["derive"] }
serde_json = "*"
thiserror = "*"
tokio = { version = "*", features = ["blocking", "macros", "parking_lot", "rt-threaded", "stream", "sync", "time"] }
tokio = { version = "1", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
tokio-stream = "*"
turbo-hearts-api = { path = "../api" }
turbo-hearts-bot = { path = "../bot" }
Expand Down
17 changes: 7 additions & 10 deletions server/src/bot.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use crate::{CardsError, Games};
use futures_util::FutureExt;
use log::debug;
use rand::distributions::Distribution;
use rand_distr::Gamma;
use std::time::Instant;
use tokio::{
sync::mpsc::{error::TryRecvError, UnboundedReceiver},
time,
time::Duration,
};
use tokio::{sync::mpsc::UnboundedReceiver, time, time::Duration};
use turbo_hearts_api::{
can_claim, BotState, BotStrategy, Card, Cards, GameEvent, GameId, GameState, Seat, UserId,
};
Expand Down Expand Up @@ -58,12 +55,12 @@ impl BotRunner {
loop {
let now = Instant::now();
loop {
match rx.try_recv() {
Ok((event, _)) => {
match rx.recv().now_or_never() {
Some(Some((event, _))) => {
action = self.handle(event);
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Closed) => return Ok(()),
Some(None) => return Ok(()),
None => break,
}
}
let delay =
Expand Down Expand Up @@ -165,7 +162,7 @@ impl BotRunner {
async fn delay(delay: Option<Duration>, start: Instant) {
let delay = delay.and_then(|delay| delay.checked_sub(start.elapsed()));
if let Some(delay) = delay {
time::delay_for(delay).await;
time::sleep(delay).await;
}
}

Expand Down
3 changes: 2 additions & 1 deletion server/src/game_endpoints.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{auth_redirect, Games, Lobby};
use tokio::sync::mpsc::UnboundedReceiver;
use tokio_stream::{Stream, StreamExt};
use tokio_stream::{wrappers::UnboundedReceiverStream, Stream, StreamExt};
use turbo_hearts_api::{
AcceptClaimRequest, ChargeRequest, ClaimRequest, GameChatRequest, GameEvent, GameId,
PassRequest, PlayRequest, RejectClaimRequest, UserId,
Expand Down Expand Up @@ -49,6 +49,7 @@ fn subscribe<'a>(games: infallible!(&'a Games), user_id: rejection!(UserId)) ->
fn stream(
rx: UnboundedReceiver<(GameEvent, usize)>,
) -> impl Stream<Item = Result<Event, warp::Error>> {
let rx = UnboundedReceiverStream::new(rx);
rx.map(|(event, id)| {
if event.is_ping() {
return Ok(Event::default().comment(String::new()));
Expand Down
3 changes: 2 additions & 1 deletion server/src/lobby_endpoints.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{auth_redirect, Games, Lobby};
use tokio::sync::mpsc::UnboundedReceiver;
use tokio_stream::{Stream, StreamExt};
use tokio_stream::{wrappers::UnboundedReceiverStream, Stream, StreamExt};
use turbo_hearts_api::{
AddBotRequest, JoinGameRequest, LeaveGameRequest, LobbyChatRequest, LobbyEvent, NewGameRequest,
Player, PlayerWithOptions, RemovePlayerRequest, StartGameRequest, UserId,
Expand Down Expand Up @@ -42,6 +42,7 @@ fn subscribe<'a>(lobby: infallible!(&'a Lobby), user_id: rejection!(UserId)) ->
}

fn stream(rx: UnboundedReceiver<LobbyEvent>) -> impl Stream<Item = Result<Event, warp::Error>> {
let rx = UnboundedReceiverStream::new(rx);
rx.map(|event| {
Ok(if event.is_ping() {
Event::default().comment(String::new())
Expand Down
11 changes: 6 additions & 5 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::CardsError;
use http::header;
use reqwest::Client;
use tokio::{time, time::Duration};
use tokio_stream::StreamExt;
use turbo_hearts_api::UserId;
use warp::{Filter, Rejection};

Expand Down Expand Up @@ -49,8 +48,9 @@ fn user_id<'a>(users: infallible!(&'a Users)) -> rejection!(UserId) {

fn start_stale_game_cleanup(lobby: &'static Lobby) {
tokio::task::spawn(async move {
let mut stream = time::interval(Duration::from_secs(60 * 60));
while let Some(_) = stream.next().await {
let mut ticker = time::interval(Duration::from_secs(60 * 60));
loop {
ticker.tick().await;
if let Err(e) = lobby.delete_stale_games().await {
log::error!("Failed to delete stale games {:?}", e);
}
Expand All @@ -60,8 +60,9 @@ fn start_stale_game_cleanup(lobby: &'static Lobby) {

fn start_background_pings(lobby: &'static Lobby, games: &'static Games) {
tokio::task::spawn(async move {
let mut stream = time::interval(Duration::from_secs(15));
while let Some(_) = stream.next().await {
let mut ticker = time::interval(Duration::from_secs(15));
loop {
ticker.tick().await;
lobby.ping().await;
games.ping().await;
}
Expand Down
10 changes: 5 additions & 5 deletions server/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl TestRunner {
}
}

#[tokio::test(threaded_scheduler)]
#[tokio::test(flavor = "multi_thread")]
async fn test_lobby() -> Result<(), CardsError> {
async fn test(_: &Database, lobby: &Lobby, _: &Games) -> Result<(), CardsError> {
let mut twilson = lobby.subscribe(*TWILSON).await?;
Expand Down Expand Up @@ -183,7 +183,7 @@ async fn test_lobby() -> Result<(), CardsError> {
TestRunner::new().run(test).await
}

#[tokio::test(threaded_scheduler)]
#[tokio::test(flavor = "multi_thread")]
async fn test_new_game() -> Result<(), CardsError> {
async fn test(_: &Database, lobby: &Lobby, games: &Games) -> Result<(), CardsError> {
let game_id = lobby
Expand Down Expand Up @@ -252,7 +252,7 @@ async fn test_new_game() -> Result<(), CardsError> {
TestRunner::new().run(test).await
}

#[tokio::test(threaded_scheduler)]
#[tokio::test(flavor = "multi_thread")]
async fn test_pass() -> Result<(), CardsError> {
async fn test(db: &Database, _lobby: &Lobby, games: &Games) -> Result<(), CardsError> {
let game_id = GameId::new();
Expand Down Expand Up @@ -314,7 +314,7 @@ async fn test_pass() -> Result<(), CardsError> {
TestRunner::new().run(test).await
}

#[tokio::test(threaded_scheduler)]
#[tokio::test(flavor = "multi_thread")]
async fn test_seeded_game() -> Result<(), CardsError> {
async fn test(_db: &Database, lobby: &Lobby, games: &Games) -> Result<(), CardsError> {
let game_id = lobby
Expand Down Expand Up @@ -429,7 +429,7 @@ async fn test_seeded_game() -> Result<(), CardsError> {
TestRunner::new().run(test).await
}

#[tokio::test(threaded_scheduler)]
#[tokio::test(flavor = "multi_thread")]
async fn test_bot_game() -> Result<(), CardsError> {
async fn test(_: &Database, lobby: &Lobby, games: &Games) -> Result<(), CardsError> {
let game_id = lobby
Expand Down

0 comments on commit c2fcc91

Please sign in to comment.