From 7a16630f1720327843049082ec380c57a6c2efca Mon Sep 17 00:00:00 2001 From: Steph Flower Date: Sat, 2 Mar 2024 17:25:11 +0000 Subject: [PATCH] BE/ws: Set up basic websocket server --- backend/src/handlers.rs | 9 +++++ backend/src/main.rs | 40 +++++++++++++++++++- backend/src/ws.rs | 81 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 128 insertions(+), 2 deletions(-) create mode 100644 backend/src/handlers.rs create mode 100644 backend/src/ws.rs diff --git a/backend/src/handlers.rs b/backend/src/handlers.rs new file mode 100644 index 0000000..c2a2eb3 --- /dev/null +++ b/backend/src/handlers.rs @@ -0,0 +1,9 @@ +use crate::{ws, Clients, Result}; +use warp::Reply; + +pub async fn ws_handler(ws: warp::ws::Ws, clients: Clients) -> Result +{ + println!("ws_handler"); //debug + + Ok(ws.on_upgrade(move |socket| ws::client_connection(socket, clients))) +} \ No newline at end of file diff --git a/backend/src/main.rs b/backend/src/main.rs index e7a11a9..5e9410d 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -1,3 +1,39 @@ -fn main() { - println!("Hello, world!"); +use std::{collections::HashMap, convert::Infallible, sync::Arc}; +use tokio::sync::{mpsc, Mutex}; +use warp::{filters::ws::Message, Filter, Rejection}; + +mod handlers; +mod ws; + +// type that represents a connecting client +#[derive(Debug, Clone)] +pub struct Client { + pub client_id: String, + pub sender: Option>>, } + +// type aliases! +type Clients = Arc>>; +type Result = std::result::Result; + +#[tokio::main] +async fn main() { + + //initialise a hashmap to store currently connected clients. We may want some more logic here if we want currently connected clients to be stored somewhere + let clients: Clients = Arc::new(Mutex::new(HashMap::new())); + + println!("configuring websocket route"); //debug + let ws_route = warp::path("ws") + .and(warp::ws()) + .and(with_clients(clients.clone())) + .and_then(handlers::ws_handler); + + let routes = ws_route.with(warp::cors().allow_any_origin()); + println!("starting server"); //debug + warp::serve(routes).run(([127, 0, 0, 1], 8000)).await; +} + +fn with_clients(clients: Clients) -> impl Filter + Clone { + warp::any().map(move || clients.clone()) +} + diff --git a/backend/src/ws.rs b/backend/src/ws.rs new file mode 100644 index 0000000..ae1c19a --- /dev/null +++ b/backend/src/ws.rs @@ -0,0 +1,81 @@ +use crate::{Client, Clients}; +use futures::{FutureExt, StreamExt}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; +use uuid::Uuid; +use warp::ws::{Message, WebSocket}; + +pub async fn client_connection(ws: WebSocket, clients: Clients) { + println!("establishing client connection... {:?}", ws); //debug + + // splitting the WebSocket stream object into separate 'Sink' and 'Stream' objects. + // This lets us split up the logic of sending and recieving tasks + // 'Stream' lets us recieve messages from the client + // 'Sink' letes us establish a connection from the unbounded channel + let (client_ws_sender, mut client_ws_rcv) = ws.split(); + // creates an unbounded channel. It is configured to send messages to the client. + let (client_sender, client_rcv) = mpsc::unbounded_channel(); + + let client_rcv = UnboundedReceiverStream::new(client_rcv); + + // 'spawns' a new task, that stays alive until the client has disconnected. + tokio::task::spawn(client_rcv.forward(client_ws_sender).map(|result| { + if let Err(e) = result { + println!("error sending websocket msg: {}", e); + } + })); + + // creating a new uuid to use as the key in the 'clients' hashmap, and a new instance of a 'client' + let uuid = Uuid::new_v4().simple().to_string(); + + let new_client = Client { + client_id: uuid.clone(), + //the client_sender object is stored within this new client instance so that we can send messages to this connected client in other parts of the code + sender: Some(client_sender), + }; + + //obtains a lock on the client list and inserts the new client into the hashmap using the uuid as the key. + clients.lock().await.insert(uuid.clone(), new_client); + + // creates a loop that handles incoming messages from the client + while let Some(result) = client_ws_rcv.next().await { + let msg = match result { + Ok(msg) => msg, + Err(e) => { + println!("error receiving message for id {}): {}", uuid.clone(), e); + break; + } + }; + client_msg(&uuid, msg, &clients).await; + } + + // as the above will keep running as long as the client is active, when we exit the loop we can safely remove this client instance from the hashmap. + clients.lock().await.remove(&uuid); + println!("{} disconnected", uuid); //debug +} + +// example function to respond to a clients message, this just responds to 'ping!' with 'pong!', but later we will replace this with; +// ->recieve client game info <- send back client game state +// wwwwwwwwwwwwwwwwwwwww i am so tired +async fn client_msg(client_id: &str, msg: Message, clients: &Clients) { + println!("received message from {}: {:?}", client_id, msg); //debug + + let message = match msg.to_str() { + Ok(v) => v, + Err(_) => return, + }; + + if message == "ping" || message == "ping\n" { + let locked = clients.lock().await; + match locked.get(client_id) { + Some(v) => { + if let Some(sender) = &v.sender { + println!("sending pong"); + let _ = sender.send(Ok(Message::text("pong"))); + } + } + None => return, + } + return; + }; +} \ No newline at end of file