Skip to content

Commit

Permalink
fix(decoder): fix decoding extra empty frame (#234)
Browse files Browse the repository at this point in the history
  • Loading branch information
0x676e67 authored Dec 30, 2024
1 parent 0de340c commit d8118bc
Showing 1 changed file with 100 additions and 28 deletions.
128 changes: 100 additions & 28 deletions src/client/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

#[cfg(any(
feature = "gzip",
feature = "zstd",
feature = "brotli",
feature = "deflate"
))]
use futures_util::stream::Fuse;

#[cfg(feature = "gzip")]
use async_compression::tokio::bufread::GzipDecoder;

Expand Down Expand Up @@ -92,19 +100,19 @@ enum Inner {

/// A `Gzip` decoder will uncompress the gzipped response content before returning it.
#[cfg(feature = "gzip")]
Gzip(Pin<Box<FramedRead<GzipDecoder<PeekableIoStreamReader>, BytesCodec>>>),
Gzip(Pin<Box<Fuse<FramedRead<GzipDecoder<PeekableIoStreamReader>, BytesCodec>>>>),

/// A `Brotli` decoder will uncompress the brotlied response content before returning it.
#[cfg(feature = "brotli")]
Brotli(Pin<Box<FramedRead<BrotliDecoder<PeekableIoStreamReader>, BytesCodec>>>),
Brotli(Pin<Box<Fuse<FramedRead<BrotliDecoder<PeekableIoStreamReader>, BytesCodec>>>>),

/// A `Zstd` decoder will uncompress the zstd compressed response content before returning it.
#[cfg(feature = "zstd")]
Zstd(Pin<Box<FramedRead<ZstdDecoder<PeekableIoStreamReader>, BytesCodec>>>),
Zstd(Pin<Box<Fuse<FramedRead<ZstdDecoder<PeekableIoStreamReader>, BytesCodec>>>>),

/// A `Deflate` decoder will uncompress the deflated response content before returning it.
#[cfg(feature = "deflate")]
Deflate(Pin<Box<FramedRead<ZlibDecoder<PeekableIoStreamReader>, BytesCodec>>>),
Deflate(Pin<Box<Fuse<FramedRead<ZlibDecoder<PeekableIoStreamReader>, BytesCodec>>>>),

/// A decoder that doesn't have a value yet.
#[cfg(any(
Expand Down Expand Up @@ -336,34 +344,58 @@ impl HttpBody for Decoder {
}
#[cfg(feature = "gzip")]
Inner::Gzip(ref mut decoder) => {
match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
match futures_core::ready!(Pin::new(&mut *decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(Frame::data(bytes.freeze())))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
None => Poll::Ready(None),
None => {
// poll inner connection until EOF after gzip stream is finished
poll_inner_should_be_empty(
decoder.get_mut().get_mut().get_mut().get_mut(),
cx,
)
}
}
}
#[cfg(feature = "brotli")]
Inner::Brotli(ref mut decoder) => {
match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
match futures_core::ready!(Pin::new(&mut *decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(Frame::data(bytes.freeze())))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
None => Poll::Ready(None),
None => {
// poll inner connection until EOF after brotli stream is finished
poll_inner_should_be_empty(
decoder.get_mut().get_mut().get_mut().get_mut(),
cx,
)
}
}
}
#[cfg(feature = "zstd")]
Inner::Zstd(ref mut decoder) => {
match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
match futures_core::ready!(Pin::new(&mut *decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(Frame::data(bytes.freeze())))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
None => Poll::Ready(None),
None => {
// poll inner connection until EOF after zstd stream is finished
poll_inner_should_be_empty(
decoder.get_mut().get_mut().get_mut().get_mut(),
cx,
)
}
}
}
#[cfg(feature = "deflate")]
Inner::Deflate(ref mut decoder) => {
match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
match futures_core::ready!(Pin::new(&mut *decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(Frame::data(bytes.freeze())))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
None => Poll::Ready(None),
None => {
// poll inner connection until EOF after deflate stream is finished
poll_inner_should_be_empty(
decoder.get_mut().get_mut().get_mut().get_mut(),
cx,
)
}
}
}
}
Expand All @@ -384,6 +416,34 @@ impl HttpBody for Decoder {
}
}

#[cfg(any(
feature = "gzip",
feature = "zstd",
feature = "brotli",
feature = "deflate"
))]
fn poll_inner_should_be_empty(
inner: &mut PeekableIoStream,
cx: &mut Context,
) -> Poll<Option<Result<Frame<Bytes>, crate::Error>>> {
// poll inner connection until EOF after deflate stream is finished
// loop in case of empty frames
let mut inner = Pin::new(inner);
loop {
match futures_core::ready!(inner.as_mut().poll_next(cx)) {
// ignore any empty frames
Some(Ok(bytes)) if bytes.is_empty() => continue,
Some(Ok(_)) => {
return Poll::Ready(Some(Err(crate::error::decode(
"there are extra bytes after body has been decompressed",
))))
}
Some(Err(err)) => return Poll::Ready(Some(Err(crate::error::decode_io(err)))),
None => return Poll::Ready(None),
}
}
}

#[cfg(any(
feature = "gzip",
feature = "zstd",
Expand Down Expand Up @@ -426,25 +486,37 @@ impl Future for Pending {

match self.1 {
#[cfg(feature = "brotli")]
DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(Box::pin(FramedRead::new(
BrotliDecoder::new(StreamReader::new(_body)),
BytesCodec::new(),
))))),
DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(Box::pin(
FramedRead::new(
BrotliDecoder::new(StreamReader::new(_body)),
BytesCodec::new(),
)
.fuse(),
)))),
#[cfg(feature = "zstd")]
DecoderType::Zstd => Poll::Ready(Ok(Inner::Zstd(Box::pin(FramedRead::new(
ZstdDecoder::new(StreamReader::new(_body)),
BytesCodec::new(),
))))),
DecoderType::Zstd => Poll::Ready(Ok(Inner::Zstd(Box::pin(
FramedRead::new(
ZstdDecoder::new(StreamReader::new(_body)),
BytesCodec::new(),
)
.fuse(),
)))),
#[cfg(feature = "gzip")]
DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(Box::pin(FramedRead::new(
GzipDecoder::new(StreamReader::new(_body)),
BytesCodec::new(),
))))),
DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(Box::pin(
FramedRead::new(
GzipDecoder::new(StreamReader::new(_body)),
BytesCodec::new(),
)
.fuse(),
)))),
#[cfg(feature = "deflate")]
DecoderType::Deflate => Poll::Ready(Ok(Inner::Deflate(Box::pin(FramedRead::new(
ZlibDecoder::new(StreamReader::new(_body)),
BytesCodec::new(),
))))),
DecoderType::Deflate => Poll::Ready(Ok(Inner::Deflate(Box::pin(
FramedRead::new(
ZlibDecoder::new(StreamReader::new(_body)),
BytesCodec::new(),
)
.fuse(),
)))),
}
}
}
Expand Down

0 comments on commit d8118bc

Please sign in to comment.