Skip to content

Commit

Permalink
Config::consume_body_on_drop to auto-consume unread body
Browse files Browse the repository at this point in the history
  • Loading branch information
algesten committed Jan 4, 2025
1 parent 881b1b3 commit 8feef7a
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 5 deletions.
1 change: 1 addition & 0 deletions src/body/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl BodyBuilder {
mime_type: None,
charset: None,
body_mode: BodyMode::NoBody,
consume_on_drop: None,
},
limit: None,
}
Expand Down
21 changes: 17 additions & 4 deletions src/body/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub use build::BodyBuilder;
use ureq_proto::http::header;
use ureq_proto::BodyMode;

use crate::config::Config;
use crate::http;
use crate::run::BodyHandler;
use crate::Error;
Expand Down Expand Up @@ -65,6 +66,7 @@ pub(crate) struct ResponseInfo {
mime_type: Option<String>,
charset: Option<String>,
body_mode: BodyMode,
consume_on_drop: Option<u64>,
}

impl Body {
Expand Down Expand Up @@ -383,7 +385,7 @@ impl<'a> BodyWithConfig<'a> {
fn do_build(self) -> BodyReader<'a> {
BodyReader::new(
LimitReader::new(self.handler, self.limit),
&self.info,
self.info.clone(),
self.info.body_mode,
self.lossy_utf8,
)
Expand Down Expand Up @@ -458,7 +460,7 @@ enum ContentEncoding {
}

impl ResponseInfo {
pub fn new(headers: &http::HeaderMap, body_mode: BodyMode) -> Self {
pub fn new(headers: &http::HeaderMap, body_mode: BodyMode, config: &Config) -> Self {
let content_encoding = headers
.get(header::CONTENT_ENCODING)
.and_then(|v| v.to_str().ok())
Expand All @@ -476,6 +478,7 @@ impl ResponseInfo {
mime_type,
charset,
body_mode,
consume_on_drop: config.consume_body_on_drop(),
}
}

Expand Down Expand Up @@ -549,6 +552,7 @@ fn split_content_type(content_type: &str) -> (Option<String>, Option<String>) {
/// ```
pub struct BodyReader<'a> {
reader: MaybeLossyDecoder<CharsetDecoder<ContentDecoder<LimitReader<BodySourceRef<'a>>>>>,
info: Arc<ResponseInfo>,
// If this reader is used as SendBody for another request, this
// body mode can indiciate the content-length. Gzip, charset etc
// would mean input is not same as output.
Expand All @@ -558,7 +562,7 @@ pub struct BodyReader<'a> {
impl<'a> BodyReader<'a> {
fn new(
reader: LimitReader<BodySourceRef<'a>>,
info: &ResponseInfo,
info: Arc<ResponseInfo>,
incoming_body_mode: BodyMode,
lossy_utf8: bool,
) -> BodyReader<'a> {
Expand Down Expand Up @@ -604,8 +608,9 @@ impl<'a> BodyReader<'a> {
};

BodyReader {
outgoing_body_mode,
reader,
info,
outgoing_body_mode,
}
}

Expand Down Expand Up @@ -678,6 +683,14 @@ impl<'a> io::Read for BodyReader<'a> {
}
}

impl<'a> Drop for BodyReader<'a> {
fn drop(&mut self) {
if let Some(max) = self.info.consume_on_drop {
let _ = self.discard(max);
}
}
}

enum CharsetDecoder<R> {
#[cfg(feature = "charset")]
Decoder(charset::CharCodec<R>),
Expand Down
49 changes: 49 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ pub struct Config {
max_idle_connections: usize,
max_idle_connections_per_host: usize,
max_idle_age: Duration,
consume_body_on_drop: Option<u64>,

// Chain built for middleware.
pub(crate) middleware: MiddlewareChain,
Expand Down Expand Up @@ -380,6 +381,29 @@ impl Config {
pub fn max_idle_age(&self) -> Duration {
self.max_idle_age
}

/// Whether to auto-consume unread body data.
///
/// The max number of bytes to automatically consume if the [`Body`] is dropped

Check failure on line 387 in src/config.rs

View workflow job for this annotation

GitHub Actions / Docs

unresolved link to `Body`

Check failure on line 387 in src/config.rs

View workflow job for this annotation

GitHub Actions / Docs

unresolved link to `Body`
/// without being fully read. This is useful for returning connections to the
/// pool. There is a trade-off between how much data that is reasonable to consume
/// versus dropping the connection and opening a new one.
///
/// The amount set here is in addition to what has already been read. I.e. it is possible
/// to peek at the body and then drop it without consuming the rest.
///
/// This setting has no effect if the body has already been fully read.
///
/// **Note**: ureq is sync and dropping the [`Body`][crate::Body]/[`BodyReader`][crate::BodyReader]
/// with this option set will block until the amount is consumed. This can lead to potentially
/// confusing situations where your code is randomly freezing. For example: if this is set to
/// something big, like 1GB, the point where the body is dropped could block until 1GB
/// is read.
///
/// Defaults to `None`.
pub fn consume_body_on_drop(&self) -> Option<u64> {
self.consume_body_on_drop
}
}

/// Builder of [`Config`]
Expand Down Expand Up @@ -590,6 +614,30 @@ impl<Scope: private::ConfigScope> ConfigBuilder<Scope> {
self
}

/// Whether to auto-consume unread body data.
///
/// The max number of bytes to automatically consume if the [`Body`] is dropped

Check failure on line 619 in src/config.rs

View workflow job for this annotation

GitHub Actions / Docs

unresolved link to `Body`

Check failure on line 619 in src/config.rs

View workflow job for this annotation

GitHub Actions / Docs

unresolved link to `Body`
/// without being fully read. This is useful for returning connections to the
/// pool. There is a trade-off between how much data that is reasonable to consume
/// versus dropping the connection and opening a new one.
///
/// The amount set here is in addition to what has already been read. I.e. it is possible
/// to peek at the body and then drop it without consuming the rest.
///
/// This setting has no effect if the body has already been fully read.
///
/// **Note**: ureq is sync and dropping the [`Body`][crate::Body]/[`BodyReader`][crate::BodyReader]
/// with this option set will block until the amount is consumed. This can lead to potentially
/// confusing situations where your code is randomly freezing. For example: if this is set to
/// something big, like 1GB, the point where the body is dropped could block until 1GB
/// is read.
///
/// Defaults to `None`.
pub fn consume_body_on_drop(mut self, max: Option<u64>) -> Self {
self.config().consume_body_on_drop = max;
self
}

/// Add middleware to use for each request in this agent.
///
/// Defaults to no middleware.
Expand Down Expand Up @@ -819,6 +867,7 @@ impl Default for Config {
max_idle_connections: 10,
max_idle_connections_per_host: 3,
max_idle_age: Duration::from_secs(15),
consume_body_on_drop: None,
middleware: MiddlewareChain::default(),
force_send_body: false,
}
Expand Down
2 changes: 1 addition & 1 deletion src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub(crate) fn run(
.map(|f| f.body_mode())
.unwrap_or(BodyMode::NoBody);

let info = ResponseInfo::new(&parts.headers, recv_body_mode);
let info = ResponseInfo::new(&parts.headers, recv_body_mode, &config);

let body = Body::new(handler, info);

Expand Down

0 comments on commit 8feef7a

Please sign in to comment.