Skip to content

Commit

Permalink
Merge pull request #7 from pilksoc/sf-be-websocketmanager
Browse files Browse the repository at this point in the history
BE/ws: Set up basic websocket server
  • Loading branch information
stephanie-flower authored Mar 2, 2024
2 parents 7247116 + 7a16630 commit eefb154
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 2 deletions.
9 changes: 9 additions & 0 deletions backend/src/handlers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use crate::{ws, Clients, Result};
use warp::Reply;

pub async fn ws_handler(ws: warp::ws::Ws, clients: Clients) -> Result<impl Reply>
{
println!("ws_handler"); //debug

Ok(ws.on_upgrade(move |socket| ws::client_connection(socket, clients)))
}
40 changes: 38 additions & 2 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,39 @@
fn main() {
println!("Hello, world!");
use std::{collections::HashMap, convert::Infallible, sync::Arc};
use tokio::sync::{mpsc, Mutex};
use warp::{filters::ws::Message, Filter, Rejection};

mod handlers;
mod ws;

// type that represents a connecting client
#[derive(Debug, Clone)]
pub struct Client {
pub client_id: String,
pub sender: Option<mpsc::UnboundedSender<std::result::Result<Message, warp::Error>>>,
}

// type aliases!
type Clients = Arc<Mutex<HashMap<String, Client>>>;
type Result<T> = std::result::Result<T, Rejection>;

#[tokio::main]
async fn main() {

//initialise a hashmap to store currently connected clients. We may want some more logic here if we want currently connected clients to be stored somewhere
let clients: Clients = Arc::new(Mutex::new(HashMap::new()));

println!("configuring websocket route"); //debug
let ws_route = warp::path("ws")
.and(warp::ws())
.and(with_clients(clients.clone()))
.and_then(handlers::ws_handler);

let routes = ws_route.with(warp::cors().allow_any_origin());
println!("starting server"); //debug
warp::serve(routes).run(([127, 0, 0, 1], 8000)).await;
}

fn with_clients(clients: Clients) -> impl Filter<Extract = (Clients,), Error = Infallible> + Clone {
warp::any().map(move || clients.clone())
}

81 changes: 81 additions & 0 deletions backend/src/ws.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use crate::{Client, Clients};
use futures::{FutureExt, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use uuid::Uuid;
use warp::ws::{Message, WebSocket};

pub async fn client_connection(ws: WebSocket, clients: Clients) {
println!("establishing client connection... {:?}", ws); //debug

// splitting the WebSocket stream object into separate 'Sink' and 'Stream' objects.
// This lets us split up the logic of sending and recieving tasks
// 'Stream' lets us recieve messages from the client
// 'Sink' letes us establish a connection from the unbounded channel
let (client_ws_sender, mut client_ws_rcv) = ws.split();
// creates an unbounded channel. It is configured to send messages to the client.
let (client_sender, client_rcv) = mpsc::unbounded_channel();

let client_rcv = UnboundedReceiverStream::new(client_rcv);

// 'spawns' a new task, that stays alive until the client has disconnected.
tokio::task::spawn(client_rcv.forward(client_ws_sender).map(|result| {
if let Err(e) = result {
println!("error sending websocket msg: {}", e);
}
}));

// creating a new uuid to use as the key in the 'clients' hashmap, and a new instance of a 'client'
let uuid = Uuid::new_v4().simple().to_string();

let new_client = Client {
client_id: uuid.clone(),
//the client_sender object is stored within this new client instance so that we can send messages to this connected client in other parts of the code
sender: Some(client_sender),
};

//obtains a lock on the client list and inserts the new client into the hashmap using the uuid as the key.
clients.lock().await.insert(uuid.clone(), new_client);

// creates a loop that handles incoming messages from the client
while let Some(result) = client_ws_rcv.next().await {
let msg = match result {
Ok(msg) => msg,
Err(e) => {
println!("error receiving message for id {}): {}", uuid.clone(), e);
break;
}
};
client_msg(&uuid, msg, &clients).await;
}

// as the above will keep running as long as the client is active, when we exit the loop we can safely remove this client instance from the hashmap.
clients.lock().await.remove(&uuid);
println!("{} disconnected", uuid); //debug
}

// example function to respond to a clients message, this just responds to 'ping!' with 'pong!', but later we will replace this with;
// ->recieve client game info <- send back client game state
// wwwwwwwwwwwwwwwwwwwww i am so tired
async fn client_msg(client_id: &str, msg: Message, clients: &Clients) {
println!("received message from {}: {:?}", client_id, msg); //debug

let message = match msg.to_str() {
Ok(v) => v,
Err(_) => return,
};

if message == "ping" || message == "ping\n" {
let locked = clients.lock().await;
match locked.get(client_id) {
Some(v) => {
if let Some(sender) = &v.sender {
println!("sending pong");
let _ = sender.send(Ok(Message::text("pong")));
}
}
None => return,
}
return;
};
}

0 comments on commit eefb154

Please sign in to comment.