Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Questions on basic usage #99

Open
sondrelg opened this issue Jan 30, 2025 · 0 comments
Open

Questions on basic usage #99

sondrelg opened this issue Jan 30, 2025 · 0 comments

Comments

@sondrelg
Copy link

sondrelg commented Jan 30, 2025

First of all, thanks for the great library!

I've spent the last few days implementing fastwebsockets and I have a few (hopefully easy to answer) questions on how to handle some patterns I struggled with. Hope it's not too much bother! 🙇 Let me know if you prefer I break this up into more issues, or ask somewhere else!

What's the best way to handle blocking calls?

I'm using fastwebsockets this with axum, and while clients are connected I need to perform an async blocking subscribe-like call to an upstream resource, waiting for items to come in, which I pass to the connected client. In the first implementation, I would just .await the future for it, but quickly found out that this prevents me from reacting to incoming frames. Not handling frames in means I don't notice disconnects, pings, etc.. - and I'm expecting to block for minutes to hours.

To be able to await items, while also handling incoming frames, I ended up using tokio::select!:

pub async fn handle_client(...) -> Result<(), WebSocketError> {
    let mut ws = FragmentCollector::new(fut.await?);

    'outer: loop {

        // Create a subscription future we can poll while handling frames
        let subscribe_future = wait_for_items(...);
        pin_utils::pin_mut!(get_ticket_future);

        loop {
            // Poll both the read_frame and subscription future and handle whichever resolves first
            tokio::select! {
                frame_result = ws.read_frame() => {
                    let frame = frame_result.unwrap();
                    match frame.opcode {
                        OpCode::Text => ... 
                        OpCode::Close => return Ok(()),
                        OpCode::Ping => ws.write_frame(Frame::pong(frame.payload)).await?,
                        OpCode::Binary => ...,
                        OpCode::Continuation | OpCode::Pong => unreachable!(),
                    }
                }
                item_result = &mut subscribe_future => {
                    let item = item_result.unwrap();
                    let outgoing_message = create_message(&item)?;
                    ws.write_frame(Frame::text(Payload::Borrowed(outgoing_message))).await?;
                    // Continue outer loop to recreate the future
                    continue 'outer;
                }
            }
        }
    }
} 

Is there a simpler way to handle frames while polling a future like this?

How should I handle server-initiated graceful closes?

I've come to learn that to gracefully close a connection, the server should send a close frame and wait for the client to confirm it. To be able to do this reliably, I've added state machinery to transition into a "waiting for close"-state.

Since one of the reasons I might close a connection is to shut down, I've also added a timeout limit:

use std::time::Duration;

enum SubscribeResult {
    ItemFound(Item),
    ShuttingDown
}

enum States {
    Subscribe,
    CloseRequested {
        initiated_at: DateTime<Utc>,
    }
}

pub async fn handle_client(...) -> Result<(), WebSocketError> {
    let mut ws = FragmentCollector::new(fut.await?);

    let mut state = States::Subscribe;

    'outer: loop {

         match state {
             States::Subscribe => {
                 // Create a subscription future we can poll while handling frames
                 let subscribe_future = wait_for_items(...);
                 pin_utils::pin_mut!(get_ticket_future);

                 loop {
                     // Poll both the read_frame and subscription future and handle whichever resolves first
                     tokio::select! {
                        frame_result = ws.read_frame() => {
                            let frame = frame_result.unwrap();
                            match frame.opcode {
                                OpCode::Text => {
                                     // We do not expect a text frame here, so close the connection
                                     ws.write_frame(Frame::close(CloseCode::Policy.into(), b"Text frame not expected")).await?;
                                     // Set new state and loop
                                     state = States::CloseRequested { initiated_at: Utc::now() };
                                     break 'outer;
                                 }
                                OpCode::Close => return Ok(()),
                                OpCode::Ping => ws.write_frame(Frame::pong(frame.payload)).await?,
                                OpCode::Binary => {
                                    // We do not support binary frames, so close the connection
                                     ws.write_frame(Frame::close(CloseCode::Policy.into(), b"Binary frames are not supported")).await?;
                                     // Set new state and loop
                                     state = States::CloseRequested { initiated_at: Utc::now() };
                                     break 'outer;
                                 },
                                OpCode::Continuation | OpCode::Pong => unreachable!(),
                            }
                        }
                        item_result = &mut subscribe_future => {
                            // Send message to the client and start again
                            match item_result {
                                 SubscribeResult::ItemFound(item) => {
                                     let outgoing_message = create_message(&item)?;
                                     ws.write_frame(Frame::text(Payload::Borrowed(outgoing_message))).await?;
                                     continue 'outer;
                                 }
                                 SubscribeResult::ShuttingDown => {
                                     // We are shutting down, so close the connection
                                     ws.write_frame(Frame::close(CloseCode::Away.into(), b"Shutting down")).await?;
                                     // Set new state and loop
                                     state = States::CloseRequested { initiated_at: Utc::now() };
                                     break 'outer;
                             }
                        }
                    }
                 }
                 }
             }
             States::CloseRequested { initiated_at } => {
                 loop {
                     // Set a timeout of 5 seconds for reading a frame
                     let frame = match tokio::time::timeout(Duration::from_secs(5), ws.read_frame()).await {
                         Ok(Ok(frame)) => frame,
                         // Close the connection if we fail to read a frame
                         Ok(Err(_)) => return Ok(()),
                         // Close the connection if nothing comes in, within the timeout
                         Err(_timeout_elapsed) => return Ok(()),
                     };

                     match frame.opcode {
                         OpCode::Ping => {
                             if Utc::now() > close_start + Duration::from_secs(5) {
                                 info!("Closing connection abruptly, since the client hasn't responded in time");
                                 return Ok(());
                             };
                             ws.write_frame(Frame::pong(frame.payload)).await?;
                         },
                         // Close the connection
                          _ => return Ok(()),
                     }
                 }
             }
         }
    }
}

Does seem like the right way to handle this to you, or does the crate provide helpers here?

What's the best way to structure states?

The structure in the second example has an 'outer loop, and one inner loop per state. In my actual application I have 5 states, so I end up with 5 inner loops 😅

To not have one massive thousand line function, I've split the code like this:

use std::time::Duration;

enum SubscribeResult {
    ItemFound(Item),
    ShuttingDown
}

enum States {
    Subscribe,
    CloseRequested {
        initiated_at: DateTime<Utc>,
    }
}

enum WsHandlerResult {
    Continue,
    CloseConnection
}

async fn handle_subscribe(state: &mut States, ...) -> Result<(), WebSocketError> {
    let subscribe_future = wait_for_items(...);
    pin_utils::pin_mut!(get_ticket_future);

    loop {
        tokio::select! {
            frame_result = ws.read_frame() => {
                let frame = frame_result.unwrap();
                match frame.opcode {
                    OpCode::Text => {
                         ws.write_frame(Frame::close(CloseCode::Policy.into(), b"Text frame not expected")).await?;
                         *state = States::CloseRequested { initiated_at: Utc::now() };
                        // Tell the outer function to continue 'outer
                        return Ok(WsHandlerResult::Continue);
                     }
                    OpCode::Close => return Ok(()),
                    OpCode::Ping => ws.write_frame(Frame::pong(frame.payload)).await?,
                    OpCode::Binary => {
                         ws.write_frame(Frame::close(CloseCode::Policy.into(), b"Binary frames are not supported")).await?;
                         *state = States::CloseRequested { initiated_at: Utc::now() };
                        // Tell the outer function to continue 'outer
                        return Ok(WsHandlerResult::Continue);
                     },
                    OpCode::Continuation | OpCode::Pong => unreachable!(),
                }
            }
            item_result = &mut subscribe_future => {
                // Send message to the client and start again
                match item_result {
                     SubscribeResult::ItemFound(item) => {
                         let outgoing_message = create_message(&item)?;
                         ws.write_frame(Frame::text(Payload::Borrowed(outgoing_message))).await?;
                        // Tell the outer function to continue 'outer
                        return Ok(WsHandlerResult::Continue);
                     }
                     SubscribeResult::ShuttingDown => {
                         // We are shutting down, so close the connection
                         ws.write_frame(Frame::close(CloseCode::Away.into(), b"Shutting down")).await?;
                         // Set new state and loop
                         state = States::CloseRequested { initiated_at: Utc::now() };
                        // Tell the outer function to continue 'outer
                        return Ok(WsHandlerResult::Continue);
                 }
            }
        }
    }
}

async fn handle_close(state: &mut States, initiated_at: DateTime<Utc>, ...) -> Result<WsHandlerResult, WebSocketError> {
    loop {
        // Set a timeout of 5 seconds for reading a frame
        let frame = match tokio::time::timeout(Duration::from_secs(5), ws.read_frame()).await {
            Ok(Ok(frame)) => frame,
            // Close the connection if we fail to read a frame
            Ok(Err(_)) => return Ok(WsHandlerResult::CloseConnection),
            // Close the connection if nothing comes in, within the timeout
            Err(_timeout_elapsed) => return Ok(WsHandlerResult::CloseConnection),
        };

        match frame.opcode {
            OpCode::Ping => {
                if Utc::now() > initiated_at + Duration::from_secs(5) {
                    info!("Closing connection abruptly, since the client hasn't responded in time");
                    return return Ok(WsHandlerResult::CloseConnection);
                };
                ws.write_frame(Frame::pong(frame.payload)).await?;
            },
            _ => unreachable!(),
        }
    }
}

pub async fn handle_client(...) -> Result<(), WebSocketError> {
    let mut ws = FragmentCollector::new(fut.await?);

    let mut state = States::Subscribe;

    'outer: loop {

         let handler_result = match state {
             States::Subscribe => handle_subscribe(&mut state, &mut ws).await?,
             States::CloseRequested { initiated_at } => handle_close(&mut state, initiate_at, &mut ws).await?,
         };
        
         // Move to the next state or close the connection
         match state_output {
             StateTransition::Continue => continue,
             StateTransition::Close => return Ok(()),
         };
    }
}

While I think this is slightly better than one huge function, it's still pretty messy. Do you know of a better way to achieve this?

@sondrelg sondrelg changed the title How should I structure blocking code Questions on basic usage Jan 30, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant