You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I've spent the last few days implementing fastwebsockets and I have a few (hopefully easy to answer) questions on how to handle some patterns I struggled with. Hope it's not too much bother! 🙇 Let me know if you prefer I break this up into more issues, or ask somewhere else!
What's the best way to handle blocking calls?
I'm using fastwebsockets this with axum, and while clients are connected I need to perform an async blocking subscribe-like call to an upstream resource, waiting for items to come in, which I pass to the connected client. In the first implementation, I would just .await the future for it, but quickly found out that this prevents me from reacting to incoming frames. Not handling frames in means I don't notice disconnects, pings, etc.. - and I'm expecting to block for minutes to hours.
To be able to await items, while also handling incoming frames, I ended up using tokio::select!:
pubasyncfnhandle_client(...) -> Result<(),WebSocketError>{letmut ws = FragmentCollector::new(fut.await?);'outer:loop{// Create a subscription future we can poll while handling frameslet subscribe_future = wait_for_items(...);
pin_utils::pin_mut!(get_ticket_future);loop{// Poll both the read_frame and subscription future and handle whichever resolves first
tokio::select! {
frame_result = ws.read_frame() => {let frame = frame_result.unwrap();match frame.opcode {OpCode::Text => ...
OpCode::Close => returnOk(()),OpCode::Ping => ws.write_frame(Frame::pong(frame.payload)).await?,OpCode::Binary => ...,OpCode::Continuation | OpCode::Pong => unreachable!(),}}
item_result = &mut subscribe_future => {let item = item_result.unwrap();let outgoing_message = create_message(&item)?;
ws.write_frame(Frame::text(Payload::Borrowed(outgoing_message))).await?;// Continue outer loop to recreate the futurecontinue'outer;}}}}}
Is there a simpler way to handle frames while polling a future like this?
How should I handle server-initiated graceful closes?
I've come to learn that to gracefully close a connection, the server should send a close frame and wait for the client to confirm it. To be able to do this reliably, I've added state machinery to transition into a "waiting for close"-state.
Since one of the reasons I might close a connection is to shut down, I've also added a timeout limit:
use std::time::Duration;enumSubscribeResult{ItemFound(Item),ShuttingDown}enumStates{Subscribe,CloseRequested{initiated_at:DateTime<Utc>,}}pubasyncfnhandle_client(...) -> Result<(),WebSocketError>{letmut ws = FragmentCollector::new(fut.await?);letmut state = States::Subscribe;'outer:loop{match state {States::Subscribe => {// Create a subscription future we can poll while handling frameslet subscribe_future = wait_for_items(...);
pin_utils::pin_mut!(get_ticket_future);loop{// Poll both the read_frame and subscription future and handle whichever resolves first
tokio::select! {
frame_result = ws.read_frame() => {let frame = frame_result.unwrap();match frame.opcode {OpCode::Text => {// We do not expect a text frame here, so close the connection
ws.write_frame(Frame::close(CloseCode::Policy.into(),b"Text frame not expected")).await?;// Set new state and loop
state = States::CloseRequested{ initiated_at:Utc::now()};break'outer;}OpCode::Close => returnOk(()),OpCode::Ping => ws.write_frame(Frame::pong(frame.payload)).await?,OpCode::Binary => {// We do not support binary frames, so close the connection
ws.write_frame(Frame::close(CloseCode::Policy.into(),b"Binary frames are not supported")).await?;// Set new state and loop
state = States::CloseRequested{ initiated_at:Utc::now()};break'outer;},OpCode::Continuation | OpCode::Pong => unreachable!(),}}
item_result = &mut subscribe_future => {// Send message to the client and start againmatch item_result {SubscribeResult::ItemFound(item) => {let outgoing_message = create_message(&item)?;
ws.write_frame(Frame::text(Payload::Borrowed(outgoing_message))).await?;continue'outer;}SubscribeResult::ShuttingDown => {// We are shutting down, so close the connection
ws.write_frame(Frame::close(CloseCode::Away.into(),b"Shutting down")).await?;// Set new state and loop
state = States::CloseRequested{ initiated_at:Utc::now()};break'outer;}}}}}}States::CloseRequested{ initiated_at } => {loop{// Set a timeout of 5 seconds for reading a framelet frame = match tokio::time::timeout(Duration::from_secs(5), ws.read_frame()).await{Ok(Ok(frame)) => frame,// Close the connection if we fail to read a frameOk(Err(_)) => returnOk(()),// Close the connection if nothing comes in, within the timeoutErr(_timeout_elapsed) => returnOk(()),};match frame.opcode{OpCode::Ping => {ifUtc::now() > close_start + Duration::from_secs(5){info!("Closing connection abruptly, since the client hasn't responded in time");returnOk(());};
ws.write_frame(Frame::pong(frame.payload)).await?;},// Close the connection
_ => returnOk(()),}}}}}}
Does seem like the right way to handle this to you, or does the crate provide helpers here?
What's the best way to structure states?
The structure in the second example has an 'outer loop, and one inner loop per state. In my actual application I have 5 states, so I end up with 5 inner loops 😅
To not have one massive thousand line function, I've split the code like this:
use std::time::Duration;enumSubscribeResult{ItemFound(Item),ShuttingDown}enumStates{Subscribe,CloseRequested{initiated_at:DateTime<Utc>,}}enumWsHandlerResult{Continue,CloseConnection}asyncfn handle_subscribe(state:&mutStates, ...) -> Result<(),WebSocketError>{let subscribe_future = wait_for_items(...);
pin_utils::pin_mut!(get_ticket_future);loop{
tokio::select! {
frame_result = ws.read_frame() => {let frame = frame_result.unwrap();match frame.opcode {OpCode::Text => {
ws.write_frame(Frame::close(CloseCode::Policy.into(),b"Text frame not expected")).await?;*state = States::CloseRequested{ initiated_at:Utc::now()};// Tell the outer function to continue 'outerreturnOk(WsHandlerResult::Continue);}OpCode::Close => returnOk(()),OpCode::Ping => ws.write_frame(Frame::pong(frame.payload)).await?,OpCode::Binary => {
ws.write_frame(Frame::close(CloseCode::Policy.into(),b"Binary frames are not supported")).await?;*state = States::CloseRequested{ initiated_at:Utc::now()};// Tell the outer function to continue 'outerreturnOk(WsHandlerResult::Continue);},OpCode::Continuation | OpCode::Pong => unreachable!(),}}
item_result = &mut subscribe_future => {// Send message to the client and start againmatch item_result {SubscribeResult::ItemFound(item) => {let outgoing_message = create_message(&item)?;
ws.write_frame(Frame::text(Payload::Borrowed(outgoing_message))).await?;// Tell the outer function to continue 'outerreturnOk(WsHandlerResult::Continue);}SubscribeResult::ShuttingDown => {// We are shutting down, so close the connection
ws.write_frame(Frame::close(CloseCode::Away.into(),b"Shutting down")).await?;// Set new state and loop
state = States::CloseRequested{ initiated_at:Utc::now()};// Tell the outer function to continue 'outerreturnOk(WsHandlerResult::Continue);}}}}}asyncfnhandle_close(state:&mutStates,initiated_at:DateTime<Utc>, ...) -> Result<WsHandlerResult,WebSocketError>{loop{// Set a timeout of 5 seconds for reading a framelet frame = match tokio::time::timeout(Duration::from_secs(5), ws.read_frame()).await{Ok(Ok(frame)) => frame,// Close the connection if we fail to read a frameOk(Err(_)) => returnOk(WsHandlerResult::CloseConnection),// Close the connection if nothing comes in, within the timeoutErr(_timeout_elapsed) => returnOk(WsHandlerResult::CloseConnection),};match frame.opcode{OpCode::Ping => {ifUtc::now() > initiated_at + Duration::from_secs(5){info!("Closing connection abruptly, since the client hasn't responded in time");returnreturnOk(WsHandlerResult::CloseConnection);};
ws.write_frame(Frame::pong(frame.payload)).await?;},
_ => unreachable!(),}}}pubasyncfnhandle_client(...) -> Result<(),WebSocketError>{letmut ws = FragmentCollector::new(fut.await?);letmut state = States::Subscribe;'outer:loop{let handler_result = match state {States::Subscribe => handle_subscribe(&mut state,&mut ws).await?,States::CloseRequested{ initiated_at } => handle_close(&mut state, initiate_at,&mut ws).await?,};// Move to the next state or close the connectionmatch state_output {StateTransition::Continue => continue,StateTransition::Close => returnOk(()),};}}
While I think this is slightly better than one huge function, it's still pretty messy. Do you know of a better way to achieve this?
The text was updated successfully, but these errors were encountered:
sondrelg
changed the title
How should I structure blocking code
Questions on basic usage
Jan 30, 2025
First of all, thanks for the great library!
I've spent the last few days implementing
fastwebsockets
and I have a few (hopefully easy to answer) questions on how to handle some patterns I struggled with. Hope it's not too much bother! 🙇 Let me know if you prefer I break this up into more issues, or ask somewhere else!What's the best way to handle blocking calls?
I'm using
fastwebsockets
this withaxum
, and while clients are connected I need to perform an async blocking subscribe-like call to an upstream resource, waiting for items to come in, which I pass to the connected client. In the first implementation, I would just.await
the future for it, but quickly found out that this prevents me from reacting to incoming frames. Not handling frames in means I don't notice disconnects, pings, etc.. - and I'm expecting to block for minutes to hours.To be able to await items, while also handling incoming frames, I ended up using
tokio::select!
:Is there a simpler way to handle frames while polling a future like this?
How should I handle server-initiated graceful closes?
I've come to learn that to gracefully close a connection, the server should send a close frame and wait for the client to confirm it. To be able to do this reliably, I've added state machinery to transition into a "waiting for close"-state.
Since one of the reasons I might close a connection is to shut down, I've also added a timeout limit:
Does seem like the right way to handle this to you, or does the crate provide helpers here?
What's the best way to structure states?
The structure in the second example has an
'outer
loop, and one inner loop per state. In my actual application I have 5 states, so I end up with 5 inner loops 😅To not have one massive thousand line function, I've split the code like this:
While I think this is slightly better than one huge function, it's still pretty messy. Do you know of a better way to achieve this?
The text was updated successfully, but these errors were encountered: