From 65de36bf97ea18a7da3adfc17bb975bf05c390e7 Mon Sep 17 00:00:00 2001 From: "Herman J. Radtke III" Date: Thu, 24 Dec 2020 07:31:18 -0800 Subject: [PATCH] Upgrade to hyper v0.14 --- Cargo.toml | 19 +++++---- README.md | 5 ++- examples/client.rs | 12 +++--- src/lib.rs | 26 ++++++------ src/stream.rs | 100 ++++++++++++++++++++++----------------------- 5 files changed, 82 insertions(+), 80 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6c521bc..d0977a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,23 +1,26 @@ [package] name = "hyper-timeout" -version = "0.3.1" +version = "0.4.0" authors = ["Herman J. Radtke III "] +edition = "2018" description = "A connect, read and write timeout aware connector to be used with hyper Client." license = "MIT/Apache-2.0" documentation = "https://github.com/hjr3/hyper-timeout" homepage = "https://github.com/hjr3/hyper-timeout" repository = "https://github.com/hjr3/hyper-timeout" -edition = "2018" +readme = "README.md" [badges] travis-ci = { repository = "https://github.com/hjr3/hyper-timeout", branch = "master" } [dependencies] -bytes = "0.5" -hyper = { version = "0.13", default-features = false, features = ["tcp"] } -tokio = "0.2" -tokio-io-timeout = "0.4" +bytes = "1.0.0" +hyper = { version = "0.14", default-features = false, features = ["client", "http1", "tcp"] } +pin-project-lite = "0.2" +tokio = "1.0.0" +tokio-io-timeout = "1.0.1" [dev-dependencies] -hyper-tls = "0.4" -tokio = { version = "0.2", features = ["io-std", "macros"] } +#FIXME enable when https://github.com/hyperium/hyper-tls/pull/79 lands +#hyper-tls = "0.4" +tokio = { version = "1.0.0", features = ["io-std", "io-util", "macros"] } diff --git a/README.md b/README.md index 088d8e3..3b6b479 100644 --- a/README.md +++ b/README.md @@ -20,15 +20,16 @@ There is a `TimeoutConnector` that implements the `hyper::Connect` trait. This c Hyper version compatibility: * The `master` branch will track on going development for hyper. +* The `0.4` release supports hyper 0.14. * The `0.3` release supports hyper 0.13. * The `0.2` release supports hyper 0.12. * The `0.1` release supports hyper 0.11. -First, (assuming you are using hyper 0.13) add this to your `Cargo.toml`: +Assuming you are using hyper 0.14, add this to your `Cargo.toml`: ```toml [dependencies] -hyper-timeout = "0.3" +hyper-timeout = "0.4" ``` See the [client example](./examples/client.rs) for a working example. diff --git a/examples/client.rs b/examples/client.rs index 4a1b071..fb6182f 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -4,12 +4,12 @@ use std::time::Duration; use hyper::{Client, body::HttpBody as _}; use tokio::io::{self, AsyncWriteExt as _}; -//use hyper::client::HttpConnector; -use hyper_tls::HttpsConnector; +use hyper::client::HttpConnector; +//use hyper_tls::HttpsConnector; use hyper_timeout::TimeoutConnector; -#[tokio::main] +#[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Box> { let url = match env::args().nth(1) { Some(url) => url, @@ -23,9 +23,9 @@ async fn main() -> Result<(), Box> { let url = url.parse::().unwrap(); // This example uses `HttpsConnector`, but you can also use hyper `HttpConnector` - //let http = HttpConnector::new(); - let https = HttpsConnector::new(); - let mut connector = TimeoutConnector::new(https); + let h = HttpConnector::new(); + //let h = HttpsConnector::new(); + let mut connector = TimeoutConnector::new(h); connector.set_connect_timeout(Some(Duration::from_secs(5))); connector.set_read_timeout(Some(Duration::from_secs(5))); connector.set_write_timeout(Some(Duration::from_secs(5))); diff --git a/src/lib.rs b/src/lib.rs index f20d793..c93117b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,10 +2,9 @@ use std::future::Future; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::Duration; use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::time::timeout; +use tokio::time::{timeout, Duration}; use tokio_io_timeout::TimeoutStream; use hyper::{service::Service, Uri}; @@ -44,21 +43,17 @@ impl TimeoutConnector { impl Service for TimeoutConnector where - T: Service, - T::Response: AsyncRead + AsyncWrite + Send + Unpin, + T: Service + Send, + T::Response: AsyncRead + AsyncWrite + Connection + Send + Unpin, T::Future: Send + 'static, T::Error: Into, { - type Response = TimeoutConnectorStream; + type Response = Pin>>; type Error = BoxError; type Future = Pin> + Send>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - match self.connector.poll_ready(cx) { - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())), - Poll::Pending => Poll::Pending, - } + self.connector.poll_ready(cx).map_err(Into::into) } fn call(&mut self, dst: Uri) -> Self::Future { @@ -73,7 +68,7 @@ where let mut tm = TimeoutConnectorStream::new(TimeoutStream::new(io)); tm.set_read_timeout(read_timeout); tm.set_write_timeout(write_timeout); - Ok(tm) + Ok(Box::pin(tm)) }; return Box::pin(fut); @@ -89,7 +84,7 @@ where let mut tm = TimeoutConnectorStream::new(TimeoutStream::new(io)); tm.set_read_timeout(read_timeout); tm.set_write_timeout(write_timeout); - Ok(tm) + Ok(Box::pin(tm)) }; Box::pin(fut) @@ -122,9 +117,12 @@ impl TimeoutConnector { } } -impl Connection for TimeoutConnector { +impl Connection for TimeoutConnector +where + T: AsyncRead + AsyncWrite + Connection + Unpin, +{ fn connected(&self) -> Connected { - Connected::new() + self.connector.connected() } } diff --git a/src/stream.rs b/src/stream.rs index ef60c94..ac2287e 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,17 +1,22 @@ use std::io; -use std::mem::MaybeUninit; +use std::io::IoSlice; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration}; -use bytes::{Buf, BufMut}; use hyper::client::connect::{Connected, Connection}; -use tokio::io::{AsyncRead, AsyncWrite}; +use pin_project_lite::pin_project; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio_io_timeout::TimeoutStream; -/// A timeout stream that implements required traits to be a Connector -#[derive(Debug)] -pub struct TimeoutConnectorStream(TimeoutStream); +pin_project! { + /// A timeout stream that implements required traits to be a Connector + #[derive(Debug)] + pub struct TimeoutConnectorStream { + #[pin] + stream: TimeoutStream + } +} impl TimeoutConnectorStream where @@ -21,46 +26,46 @@ where /// /// There is initially no read or write timeout. pub fn new(stream: TimeoutStream) -> TimeoutConnectorStream { - TimeoutConnectorStream(stream) + TimeoutConnectorStream { stream } } /// Returns the current read timeout. pub fn read_timeout(&self) -> Option { - self.0.read_timeout() + self.stream.read_timeout() } /// Sets the read timeout. /// /// This will reset any pending read timeout. pub fn set_read_timeout(&mut self, timeout: Option) { - self.0.set_read_timeout(timeout) + self.stream.set_read_timeout(timeout) } /// Returns the current write timeout. pub fn write_timeout(&self) -> Option { - self.0.write_timeout() + self.stream.write_timeout() } /// Sets the write timeout. /// /// This will reset any pending write timeout. pub fn set_write_timeout(&mut self, timeout: Option) { - self.0.set_write_timeout(timeout) + self.stream.set_write_timeout(timeout) } /// Returns a shared reference to the inner stream. pub fn get_ref(&self) -> &S { - self.0.get_ref() + self.stream.get_ref() } /// Returns a mutable reference to the inner stream. pub fn get_mut(&mut self) -> &mut S { - self.0.get_mut() + self.stream.get_mut() } /// Consumes the stream, returning the inner stream. pub fn into_inner(self) -> S { - self.0.into_inner() + self.stream.into_inner() } } @@ -68,27 +73,12 @@ impl AsyncRead for TimeoutConnectorStream where S: AsyncRead + AsyncWrite + Unpin, { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit]) -> bool { - self.0.prepare_uninitialized_buffer(buf) - } - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context, - buf: &mut [u8], - ) -> Poll> { - Pin::new(&mut self.0).poll_read(cx, buf) - } - - fn poll_read_buf( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context, - buf: &mut B, - ) -> Poll> - where - B: BufMut, - { - Pin::new(&mut self.0).poll_read_buf(cx, buf) + buf: &mut ReadBuf, + ) -> Poll> { + self.project().stream.poll_read(cx, buf) } } @@ -97,30 +87,31 @@ where S: AsyncRead + AsyncWrite + Unpin, { fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context, + self: Pin<&mut Self>, + cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - Pin::new(&mut self.0).poll_write(cx, buf) + self.project().stream.poll_write(cx, buf) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + self.project().stream.poll_write_vectored(cx, bufs) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Pin::new(&mut self.0).poll_flush(cx) + fn is_write_vectored(&self) -> bool { + self.stream.is_write_vectored() } - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Pin::new(&mut self.0).poll_shutdown(cx) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().stream.poll_flush(cx) } - fn poll_write_buf( - mut self: Pin<&mut Self>, - cx: &mut Context, - buf: &mut B, - ) -> Poll> - where - B: Buf, - { - Pin::new(&mut self.0).poll_write_buf(cx, buf) + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().stream.poll_shutdown(cx) } } @@ -129,6 +120,15 @@ where S: AsyncRead + AsyncWrite + Connection + Unpin, { fn connected(&self) -> Connected { - self.0.get_ref().connected() + self.stream.get_ref().connected() + } +} + +impl Connection for Pin>> +where + S: AsyncRead + AsyncWrite + Connection + Unpin, +{ + fn connected(&self) -> Connected { + self.stream.get_ref().connected() } }