Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fast shutdown #641

Open
John-Nagle opened this issue Jul 12, 2023 · 5 comments
Open

Fast shutdown #641

John-Nagle opened this issue Jul 12, 2023 · 5 comments
Labels
help wanted Extra attention is needed

Comments

@John-Nagle
Copy link

John-Nagle commented Jul 12, 2023

I get 10-15 second delays when the user suddenly closes my program because tasks are waiting for remote servers to answer. I'd like to have some way to force a fast shutdown. The underlying TcpStream understands shutdown, but there is no way to do that from outside ureq.

Since Agent can't be accessed from multiple threads there's no obvious way to shut down an agent. But it's possible to share the Pool, and I'd like to be able to tell the Pool to shut down everything. Thanks.

(I have a multi-threaded program, and I'm trying to stay away from "async", because the combination of Tokio and threading gets really complicated.)

@John-Nagle
Copy link
Author

Below is a code sample of what I want to do. This forks off a thread which tries to read from a URL. This particular URL (http://0.0.0.1) doesn't work; it just hangs. So when the program exits, there's no clean way to unblock ureq.
The program is stuck waiting for ureq to time out. I need some way to get ureq to call shutdown for the underlying https://doc.rust-lang.org/std/net/struct.TcpStream.html

Is something like that possible?

"Agent" is cloneable and the clones can be shared across threads. So I would suggest implementing the method agent.shutdown() and have the connection associated with that agent shut down immediately, even if there's a pending request on another thread. The pending request and any following request on that agent should return an error.

//! # UREQ shutdown test
use anyhow::{Error};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

/// Reads from a URL with slow polling.
pub struct UrlPoller {
    /// The worker thread that polls some URL
    pub listen_thread: Option<std::thread::JoinHandle<()>>,
    /// The Ureq agent.
    pub agent: ureq::Agent,
    /// Quit flag, set when object is dropped.
    pub quit_flag: Arc<AtomicBool>,
}

impl UrlPoller {
    pub fn new(url: String) -> Result<Self, Error> {
        //  Spawn thread to listen on this socket and act on messages.
        let agent = ureq::AgentBuilder::new().build();
        let agent_clone = agent.clone();
        let quit_flag = Arc::new(AtomicBool::new(false));
        let quit_flag_clone = Arc::clone(&quit_flag);
        let listen_thread = std::thread::Builder::new()
            .name(format!("Event poller from {:?}", url))
            .spawn(move || {
                while !quit_flag_clone.load(Ordering::Relaxed) {
                    let http_result = agent_clone
                        .get(&url)
                        .call();
                    match http_result {
                        Ok(result) => {
                            println!("Result: {:?}", result);
                            std::thread::sleep(std::time::Duration::from_secs(2));  // don't overdo the test fetches
                        }
                        Err(e) => {
                            println!("HTTP error: {:?}", e);
                            break;
                        }
                    }                    
                }  
                println!("Thread finishing.");            
            })?;
        Ok(Self {
            listen_thread: Some(listen_thread),
            agent,
            quit_flag,
        })
    }
}

impl Drop for UrlPoller {
    /// Drop poller, shutting down network activity.
    /// Graceful shutdown - waits for thread exit.
    fn drop(&mut self) {
        println!("Dropping UriPoller");
        self.quit_flag.store(true, Ordering::Relaxed);
        //  ==> Can't do this, but want the API to allow it. <==
        ////self.agent.shutdown();
        //  Wait for thread to exit.
        let _ = self.listen_thread.take().expect("Where is the join handle?").join();
    }
}

fn main() {
    const URL: &str = "http://0.0.0.1"; // This just hangs
    println!("Starting.");
    let drop_time  = { let _poller = UrlPoller::new(URL.to_string());
        std::thread::sleep(std::time::Duration::from_secs(10));  // run for this long
        std::time::Instant::now()
    };
    let drop_elapsed = drop_time.elapsed();     // how long did the drop take?
 
    //  Poller has now been dropped. We want any outstanding HTTP requests to be aborted.
    println!("Done. {} seconds in drop", drop_elapsed.as_secs_f32());
}

@jsha
Copy link
Collaborator

jsha commented Jul 13, 2023

An interesting problem! The first couple of solutions that occur to me:

  • During shutdown, if there are threads whose work you don't care about (because you want to cancel their pending requests anyhow), don't join those threads; simply return from main and let the threads die.
  • Set a timeout on the Agent corresponding to the max time you are willing to wait for a response; this would also wind up being the max time you would wait upon shutdown.

However, neither of these is really satisfying or clean.

To your proposed solution of having the agent shut down all extant requests: with the current design that's a little tricky, since the agent (and specifically the agent's pool) does not retain ownership of the TcpStream once it's handed off to the user for reading. Instead, the TcpStream is wrapped in several layers, one of which is an Arc<Mutex<...>> pointing back to the Pool, so ownership can be given back to the pool when done. It's possible we could rearchitect this a bit.

However, there's another problem: a request consists of DNS lookup, TCP connect, and TCP traffic. Calling TcpStream::shutdown only solves your issue if the hangs are occurring during the TCP traffic stage. If they are occurring during DNS lookup or during TCP connect (more likely given your example of 0.0.0.1), the only thing that can really help is setting timeouts.

And unfortunately, even timeouts are not perfect: we use ToSocketAddrs to do DNS resolution. Under the hood that uses system APIs that don't provide a nice timeout mechanism, so we at present timeouts don't strictly apply to DNS lookups. Instead the system resolver's default DNS lookup timeout comes into play. We may be able to solve this by maintaining a thread pool to perform DNS lookups on, but we don't do that at present.

@John-Nagle
Copy link
Author

this would also wind up being the max time you would wait upon shutdown.

I'm doing that now in some other parts of the program, and it produces an annoying 5 second delay at exit. Here, I'm doing a long HTTP poll for a push-type system. (The other end is not mine, so I don't define the interface.) I have to make an HTTP request which will either return data at some point, or, after 20-30 seconds or so of no events, returns an HTTP status, after which there's another poll. A 30 second delay at shutdown is unacceptable. So I have to do something.

To your proposed solution of having the agent shut down all extant requests: with the current design that's a little tricky,

Ah. Thanks. I realize it's a pain. Anything that can be done in that direction would be appreciated.

... DNS lookup ...

Here, DNS delay isn't a problem - the other end is going to be there if we get far enough to start the long poll.

...simply return from main and let the threads die....

That's what I just wrote code to do, using, as shown above, with a crossbeam-channel that gets closed on drop. The polling thread is still stuck in ureq, but the thread for the channel close gets its close and cleans up. So that works out.

(What I'm doing is a metaverse client. It has many open UDP and HTTP connections to multiple servers. If the program aborts without a clean shutdown, you have an avatar frozen in a shared virtual world, or even moving in an uncontrolled vehicle. The server side will eventually time out and force a logout, but it takes over a minute. Until that happens, users cannot log in again, which annoys them. A clean logout produces a nice little teleporting effect seen by others, and users expect that.)

@algesten
Copy link
Owner

Is this still and issue in ureq 3.x?

@algesten
Copy link
Owner

algesten commented Jan 4, 2025

To solve this we could duplicate the socket using TcpStream::try_clone(). This should always succeed if the socket is open and hasn't been used. The cloned stream would be retained by the the TcpConnector (and SocksConnector) and we need some new Transport::shutdown() call in the Transport trait that instructs the connector to shut down any Transport it created straight away. The shutdown call must be propagated for all Transports in the connector chain.

Finally we should have an Agent::shutdown(self) call that instructs the pool/connector chain to shut down.

All in all this doesn't seem terribly difficult in ureq 3.x We might however want to put it behind a feature flag in case TcpStream::try_clone() isn't guaranteed to work. It also needs to be tested how TcpStream returned by SocksProxy behaves since that socket has already been used to establish the proxy.

@algesten algesten added the help wanted Extra attention is needed label Jan 4, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
help wanted Extra attention is needed
Projects
None yet
Development

No branches or pull requests

3 participants