Skip to content

Commit

Permalink
refactor: reduce dependency on futures-core / futures-util (#2557)
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini authored Feb 18, 2025
1 parent 1abc4fc commit 4eb9868
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 36 deletions.
12 changes: 6 additions & 6 deletions src/async_impl/body.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};
use std::time::Duration;

use bytes::Bytes;
Expand Down Expand Up @@ -273,7 +273,7 @@ impl HttpBody for Body {
}
}
Inner::Streaming(ref mut body) => Poll::Ready(
futures_core::ready!(Pin::new(body).poll_frame(cx))
ready!(Pin::new(body).poll_frame(cx))
.map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
),
}
Expand Down Expand Up @@ -328,7 +328,7 @@ where
return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
}
Poll::Ready(
futures_core::ready!(this.inner.poll_frame(cx))
ready!(this.inner.poll_frame(cx))
.map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
)
}
Expand Down Expand Up @@ -371,7 +371,7 @@ where
return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
}

let item = futures_core::ready!(this.inner.poll_frame(cx))
let item = ready!(this.inner.poll_frame(cx))
.map(|opt_chunk| opt_chunk.map_err(crate::error::body));
// a ready frame means timeout is reset
this.sleep.set(None);
Expand Down Expand Up @@ -442,7 +442,7 @@ where

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop {
return match futures_core::ready!(Pin::new(&mut self.0).poll_frame(cx)) {
return match ready!(Pin::new(&mut self.0).poll_frame(cx)) {
Some(Ok(frame)) => {
// skip non-data frames
if let Ok(buf) = frame.into_data() {
Expand Down Expand Up @@ -481,7 +481,7 @@ where
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
match futures_core::ready!(self.project().inner.poll_frame(cx)) {
match ready!(self.project().inner.poll_frame(cx)) {
Some(Ok(f)) => Poll::Ready(Some(Ok(f.map_data(Into::into)))),
Some(Err(e)) => Poll::Ready(Some(Err(e))),
None => Poll::Ready(None),
Expand Down
36 changes: 16 additions & 20 deletions src/async_impl/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::fmt;
))]
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

#[cfg(any(
feature = "gzip",
Expand Down Expand Up @@ -364,16 +364,14 @@ impl HttpBody for Decoder {
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(crate::error::decode_io(e)))),
Poll::Pending => Poll::Pending,
},
Inner::PlainText(ref mut body) => {
match futures_core::ready!(Pin::new(body).poll_frame(cx)) {
Some(Ok(frame)) => Poll::Ready(Some(Ok(frame))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode(err)))),
None => Poll::Ready(None),
}
}
Inner::PlainText(ref mut body) => match ready!(Pin::new(body).poll_frame(cx)) {
Some(Ok(frame)) => Poll::Ready(Some(Ok(frame))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode(err)))),
None => Poll::Ready(None),
},
#[cfg(feature = "gzip")]
Inner::Gzip(ref mut decoder) => {
match futures_core::ready!(Pin::new(&mut *decoder).poll_next(cx)) {
match ready!(Pin::new(&mut *decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(Frame::data(bytes.freeze())))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
None => {
Expand All @@ -387,7 +385,7 @@ impl HttpBody for Decoder {
}
#[cfg(feature = "brotli")]
Inner::Brotli(ref mut decoder) => {
match futures_core::ready!(Pin::new(&mut *decoder).poll_next(cx)) {
match ready!(Pin::new(&mut *decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(Frame::data(bytes.freeze())))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
None => {
Expand All @@ -401,7 +399,7 @@ impl HttpBody for Decoder {
}
#[cfg(feature = "zstd")]
Inner::Zstd(ref mut decoder) => {
match futures_core::ready!(Pin::new(&mut *decoder).poll_next(cx)) {
match ready!(Pin::new(&mut *decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(Frame::data(bytes.freeze())))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
None => {
Expand All @@ -415,7 +413,7 @@ impl HttpBody for Decoder {
}
#[cfg(feature = "deflate")]
Inner::Deflate(ref mut decoder) => {
match futures_core::ready!(Pin::new(&mut *decoder).poll_next(cx)) {
match ready!(Pin::new(&mut *decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(Frame::data(bytes.freeze())))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
None => {
Expand Down Expand Up @@ -459,7 +457,7 @@ fn poll_inner_should_be_empty(
// loop in case of empty frames
let mut inner = Pin::new(inner);
loop {
match futures_core::ready!(inner.as_mut().poll_next(cx)) {
match ready!(inner.as_mut().poll_next(cx)) {
// ignore any empty frames
Some(Ok(bytes)) if bytes.is_empty() => continue,
Some(Ok(_)) => {
Expand Down Expand Up @@ -497,17 +495,15 @@ impl Future for Pending {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures_util::StreamExt;

match futures_core::ready!(Pin::new(&mut self.0).poll_peek(cx)) {
match ready!(Pin::new(&mut self.0).poll_peek(cx)) {
Some(Ok(_)) => {
// fallthrough
}
Some(Err(_e)) => {
// error was just a ref, so we need to really poll to move it
return Poll::Ready(Err(futures_core::ready!(
Pin::new(&mut self.0).poll_next(cx)
)
.expect("just peeked Some")
.unwrap_err()));
return Poll::Ready(Err(ready!(Pin::new(&mut self.0).poll_next(cx))
.expect("just peeked Some")
.unwrap_err()));
}
None => return Poll::Ready(Ok(Inner::PlainText(empty()))),
};
Expand Down Expand Up @@ -567,7 +563,7 @@ where

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop {
return match futures_core::ready!(Pin::new(&mut self.0).poll_frame(cx)) {
return match ready!(Pin::new(&mut self.0).poll_frame(cx)) {
Some(Ok(frame)) => {
// skip non-data frames
if let Ok(buf) = frame.into_data() {
Expand Down
2 changes: 1 addition & 1 deletion src/async_impl/h3_client/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ pub(super) async fn resolve<R>(resolver: &mut R, name: Name) -> Result<R::Addrs,
where
R: Resolve,
{
futures_util::future::poll_fn(|cx| resolver.poll_ready(cx)).await?;
std::future::poll_fn(|cx| resolver.poll_ready(cx)).await?;
resolver.resolve(name).await
}
2 changes: 1 addition & 1 deletion src/async_impl/h3_client/pool.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use bytes::Bytes;
use std::collections::HashMap;
use std::future;
use std::sync::mpsc::{Receiver, TryRecvError};
use std::sync::{Arc, Mutex};
use std::time::Duration;
Expand All @@ -10,7 +11,6 @@ use crate::async_impl::body::ResponseBody;
use crate::error::{BoxError, Error, Kind};
use crate::Body;
use bytes::Buf;
use futures_util::future;
use h3::client::SendRequest;
use h3_quinn::{Connection, OpenStreams};
use http::uri::{Authority, Scheme};
Expand Down
5 changes: 3 additions & 2 deletions src/blocking/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::future::Future;
use std::net::IpAddr;
use std::net::SocketAddr;
use std::sync::Arc;
use std::task::ready;
use std::thread;
use std::time::Duration;

Expand Down Expand Up @@ -1306,12 +1307,12 @@ where
futures_util::pin_mut!(fut);

// "select" on the sender being canceled, and the future completing
let res = futures_util::future::poll_fn(|cx| {
let res = std::future::poll_fn(|cx| {
match fut.as_mut().poll(cx) {
Poll::Ready(val) => Poll::Ready(Some(val)),
Poll::Pending => {
// check if the callback is canceled
futures_core::ready!(tx.poll_closed(cx));
ready!(tx.poll_closed(cx));
Poll::Ready(None)
}
}
Expand Down
14 changes: 9 additions & 5 deletions src/blocking/wait.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

Expand All @@ -20,7 +20,7 @@ where
let thread = ThreadWaker(thread::current());
// Arc shouldn't be necessary, since `Thread` is reference counted internally,
// but let's just stay safe for now.
let waker = futures_util::task::waker(Arc::new(thread));
let waker = Waker::from(Arc::new(thread));
let mut cx = Context::from_waker(&waker);

futures_util::pin_mut!(fut);
Expand Down Expand Up @@ -60,9 +60,13 @@ pub(crate) enum Waited<E> {

struct ThreadWaker(Thread);

impl futures_util::task::ArcWake for ThreadWaker {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.0.unpark();
impl Wake for ThreadWaker {
fn wake(self: Arc<Self>) {
self.wake_by_ref();
}

fn wake_by_ref(self: &Arc<Self>) {
self.0.unpark();
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/dns/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl Resolve for DnsResolverWithOverrides {
match self.overrides.get(name.as_str()) {
Some(dest) => {
let addrs: Addrs = Box::new(dest.clone().into_iter());
Box::pin(futures_util::future::ready(Ok(addrs)))
Box::pin(std::future::ready(Ok(addrs)))
}
None => self.dns_resolver.resolve(name),
}
Expand Down

0 comments on commit 4eb9868

Please sign in to comment.