Skip to content

Commit

Permalink
feat: Add a Form::into_stream() method on async multipart forms. (#…
Browse files Browse the repository at this point in the history
…2525)

The async equivalent of the change in #2524.

An example use case is compressing multipart form data with zstd.
The entire contents of the payload have to be compressed together,
which requires accessing the contents of the Form.

The existing stream functionality mostly implements what we need,
but just isn't publicly accessible. This PR adds a pub method for it.
  • Loading branch information
obi1kenobi authored Jan 9, 2025
1 parent f7d929f commit 3099f30
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions src/async_impl/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,23 @@ impl Form {
}

/// Consume this instance and transform into an instance of Body for use in a request.
pub(crate) fn stream(mut self) -> Body {
pub(crate) fn stream(self) -> Body {
if self.inner.fields.is_empty() {
return Body::empty();
}

Body::stream(self.into_stream())
}

/// Produce a stream of the bytes in this `Form`, consuming it.
pub fn into_stream(mut self) -> impl Stream<Item = Result<Bytes, crate::Error>> + Send + Sync {
if self.inner.fields.is_empty() {
let empty_stream: Pin<
Box<dyn Stream<Item = Result<Bytes, crate::Error>> + Send + Sync>,
> = Box::pin(futures_util::stream::empty());
return empty_stream;
}

// create initial part to init reduce chain
let (name, part) = self.inner.fields.remove(0);
let start = Box::pin(self.part_stream(name, part))
Expand All @@ -161,7 +173,7 @@ impl Form {
let last = stream::once(future::ready(Ok(
format!("--{}--\r\n", self.boundary()).into()
)));
Body::stream(stream.chain(last))
Box::pin(stream.chain(last))
}

/// Generate a hyper::Body stream for a single Part instance of a Form request.
Expand Down

0 comments on commit 3099f30

Please sign in to comment.