Skip to content

Commit

Permalink
stream: Provide context-specific timeout messages
Browse files Browse the repository at this point in the history
If we time out while connecting, don't tell the user we timed out
reading a response.
  • Loading branch information
mrkline committed Feb 3, 2024
1 parent 2300a5b commit def31b3
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 22 deletions.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ mod request;
mod resolve;
mod response;
mod stream;
mod timeout;
mod unit;

// rustls is our default tls engine. If the feature is on, it will be
Expand Down
28 changes: 6 additions & 22 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::chunked::Decoder as ChunkDecoder;
use crate::error::ErrorKind;
use crate::pool::{PoolKey, PoolReturner};
use crate::proxy::Proxy;
use crate::timeout::{io_err_timeout, time_until_deadline};
use crate::unit::Unit;
use crate::Response;
use crate::{error::Error, proxy::Proto};
Expand Down Expand Up @@ -83,7 +84,7 @@ impl From<DeadlineStream> for Stream {
impl BufRead for DeadlineStream {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
if let Some(deadline) = self.deadline {
let timeout = time_until_deadline(deadline)?;
let timeout = time_until_deadline(deadline, "timed out reading response")?;
if let Some(socket) = self.stream.socket() {
socket.set_read_timeout(Some(timeout))?;
socket.set_write_timeout(Some(timeout))?;
Expand Down Expand Up @@ -130,20 +131,6 @@ impl Read for DeadlineStream {
}
}

// If the deadline is in the future, return the remaining time until
// then. Otherwise return a TimedOut error.
fn time_until_deadline(deadline: Instant) -> io::Result<Duration> {
let now = Instant::now();
match deadline.checked_duration_since(now) {
None => Err(io_err_timeout("timed out reading response".to_string())),
Some(duration) => Ok(duration),
}
}

pub(crate) fn io_err_timeout(error: String) -> io::Error {
io::Error::new(io::ErrorKind::TimedOut, error)
}

#[derive(Debug)]
pub(crate) struct ReadOnlyStream(Cursor<Vec<u8>>);

Expand Down Expand Up @@ -348,6 +335,7 @@ pub(crate) fn connect_host(
hostname: &str,
port: u16,
) -> Result<(TcpStream, SocketAddr), Error> {
const TIMEOUT_MSG: &str = "timed out connecting";
let connect_deadline: Option<Instant> =
if let Some(timeout_connect) = unit.agent.config.timeout_connect {
Instant::now().checked_add(timeout_connect)
Expand Down Expand Up @@ -382,7 +370,7 @@ pub(crate) fn connect_host(
// ensure connect timeout or overall timeout aren't yet hit.
let timeout = match connect_deadline {
Some(deadline) => {
let mut deadline = time_until_deadline(deadline)?;
let mut deadline = time_until_deadline(deadline, TIMEOUT_MSG)?;
if multiple_addrs {
deadline = deadline.div(2);
}
Expand Down Expand Up @@ -430,14 +418,10 @@ pub(crate) fn connect_host(
stream.set_nodelay(unit.agent.config.no_delay)?;

if let Some(deadline) = unit.deadline {
stream.set_read_timeout(Some(time_until_deadline(deadline)?))?;
stream.set_read_timeout(Some(time_until_deadline(deadline, TIMEOUT_MSG)?))?;
stream.set_write_timeout(Some(time_until_deadline(deadline, TIMEOUT_MSG)?))?;
} else {
stream.set_read_timeout(unit.agent.config.timeout_read)?;
}

if let Some(deadline) = unit.deadline {
stream.set_write_timeout(Some(time_until_deadline(deadline)?))?;
} else {
stream.set_write_timeout(unit.agent.config.timeout_write)?;
}

Expand Down
18 changes: 18 additions & 0 deletions src/timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
//! Timeout utilities, mostly used during connecting.
use std::io;
use std::time::{Duration, Instant};

/// If the deadline is in the future, return the remaining time until
/// then. Otherwise return a TimedOut error.
pub fn time_until_deadline<S: Into<String>>(deadline: Instant, error: S) -> io::Result<Duration> {
let now = Instant::now();
match deadline.checked_duration_since(now) {
None => Err(io_err_timeout(error.into())),
Some(duration) => Ok(duration),
}
}

pub fn io_err_timeout(error: String) -> io::Error {
io::Error::new(io::ErrorKind::TimedOut, error)
}

0 comments on commit def31b3

Please sign in to comment.