Skip to content

Commit

Permalink
Merge pull request #17 from hjr3/hyperv14
Browse files Browse the repository at this point in the history
Upgrade to hyper v0.14
  • Loading branch information
hjr3 authored Dec 24, 2020
2 parents 441e2b5 + 65de36b commit 1766cd6
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 80 deletions.
19 changes: 11 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
[package]
name = "hyper-timeout"
version = "0.3.1"
version = "0.4.0"
authors = ["Herman J. Radtke III <[email protected]>"]
edition = "2018"
description = "A connect, read and write timeout aware connector to be used with hyper Client."
license = "MIT/Apache-2.0"
documentation = "https://github.com/hjr3/hyper-timeout"
homepage = "https://github.com/hjr3/hyper-timeout"
repository = "https://github.com/hjr3/hyper-timeout"
edition = "2018"
readme = "README.md"

[badges]
travis-ci = { repository = "https://github.com/hjr3/hyper-timeout", branch = "master" }

[dependencies]
bytes = "0.5"
hyper = { version = "0.13", default-features = false, features = ["tcp"] }
tokio = "0.2"
tokio-io-timeout = "0.4"
bytes = "1.0.0"
hyper = { version = "0.14", default-features = false, features = ["client", "http1", "tcp"] }
pin-project-lite = "0.2"
tokio = "1.0.0"
tokio-io-timeout = "1.0.1"

[dev-dependencies]
hyper-tls = "0.4"
tokio = { version = "0.2", features = ["io-std", "macros"] }
#FIXME enable when https://github.com/hyperium/hyper-tls/pull/79 lands
#hyper-tls = "0.4"
tokio = { version = "1.0.0", features = ["io-std", "io-util", "macros"] }
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@ There is a `TimeoutConnector` that implements the `hyper::Connect` trait. This c
Hyper version compatibility:

* The `master` branch will track on going development for hyper.
* The `0.4` release supports hyper 0.14.
* The `0.3` release supports hyper 0.13.
* The `0.2` release supports hyper 0.12.
* The `0.1` release supports hyper 0.11.

First, (assuming you are using hyper 0.13) add this to your `Cargo.toml`:
Assuming you are using hyper 0.14, add this to your `Cargo.toml`:

```toml
[dependencies]
hyper-timeout = "0.3"
hyper-timeout = "0.4"
```

See the [client example](./examples/client.rs) for a working example.
Expand Down
12 changes: 6 additions & 6 deletions examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use std::time::Duration;
use hyper::{Client, body::HttpBody as _};
use tokio::io::{self, AsyncWriteExt as _};

//use hyper::client::HttpConnector;
use hyper_tls::HttpsConnector;
use hyper::client::HttpConnector;
//use hyper_tls::HttpsConnector;

use hyper_timeout::TimeoutConnector;

#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let url = match env::args().nth(1) {
Some(url) => url,
Expand All @@ -23,9 +23,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let url = url.parse::<hyper::Uri>().unwrap();

// This example uses `HttpsConnector`, but you can also use hyper `HttpConnector`
//let http = HttpConnector::new();
let https = HttpsConnector::new();
let mut connector = TimeoutConnector::new(https);
let h = HttpConnector::new();
//let h = HttpsConnector::new();
let mut connector = TimeoutConnector::new(h);
connector.set_connect_timeout(Some(Duration::from_secs(5)));
connector.set_read_timeout(Some(Duration::from_secs(5)));
connector.set_write_timeout(Some(Duration::from_secs(5)));
Expand Down
26 changes: 12 additions & 14 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use tokio::io::{AsyncRead, AsyncWrite};
use tokio::time::timeout;
use tokio::time::{timeout, Duration};
use tokio_io_timeout::TimeoutStream;

use hyper::{service::Service, Uri};
Expand Down Expand Up @@ -44,21 +43,17 @@ impl<T: Connect> TimeoutConnector<T> {

impl<T> Service<Uri> for TimeoutConnector<T>
where
T: Service<Uri>,
T::Response: AsyncRead + AsyncWrite + Send + Unpin,
T: Service<Uri> + Send,
T::Response: AsyncRead + AsyncWrite + Connection + Send + Unpin,
T::Future: Send + 'static,
T::Error: Into<BoxError>,
{
type Response = TimeoutConnectorStream<T::Response>;
type Response = Pin<Box<TimeoutConnectorStream<T::Response>>>;
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.connector.poll_ready(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
Poll::Pending => Poll::Pending,
}
self.connector.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, dst: Uri) -> Self::Future {
Expand All @@ -73,7 +68,7 @@ where
let mut tm = TimeoutConnectorStream::new(TimeoutStream::new(io));
tm.set_read_timeout(read_timeout);
tm.set_write_timeout(write_timeout);
Ok(tm)
Ok(Box::pin(tm))
};

return Box::pin(fut);
Expand All @@ -89,7 +84,7 @@ where
let mut tm = TimeoutConnectorStream::new(TimeoutStream::new(io));
tm.set_read_timeout(read_timeout);
tm.set_write_timeout(write_timeout);
Ok(tm)
Ok(Box::pin(tm))
};

Box::pin(fut)
Expand Down Expand Up @@ -122,9 +117,12 @@ impl<T> TimeoutConnector<T> {
}
}

impl<T> Connection for TimeoutConnector<T> {
impl<T: Connect> Connection for TimeoutConnector<T>
where
T: AsyncRead + AsyncWrite + Connection + Unpin,
{
fn connected(&self) -> Connected {
Connected::new()
self.connector.connected()
}
}

Expand Down
100 changes: 50 additions & 50 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
use std::io;
use std::mem::MaybeUninit;
use std::io::IoSlice;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration};

use bytes::{Buf, BufMut};
use hyper::client::connect::{Connected, Connection};
use tokio::io::{AsyncRead, AsyncWrite};
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_io_timeout::TimeoutStream;

/// A timeout stream that implements required traits to be a Connector
#[derive(Debug)]
pub struct TimeoutConnectorStream<S>(TimeoutStream<S>);
pin_project! {
/// A timeout stream that implements required traits to be a Connector
#[derive(Debug)]
pub struct TimeoutConnectorStream<S> {
#[pin]
stream: TimeoutStream<S>
}
}

impl<S> TimeoutConnectorStream<S>
where
Expand All @@ -21,74 +26,59 @@ where
///
/// There is initially no read or write timeout.
pub fn new(stream: TimeoutStream<S>) -> TimeoutConnectorStream<S> {
TimeoutConnectorStream(stream)
TimeoutConnectorStream { stream }
}

/// Returns the current read timeout.
pub fn read_timeout(&self) -> Option<Duration> {
self.0.read_timeout()
self.stream.read_timeout()
}

/// Sets the read timeout.
///
/// This will reset any pending read timeout.
pub fn set_read_timeout(&mut self, timeout: Option<Duration>) {
self.0.set_read_timeout(timeout)
self.stream.set_read_timeout(timeout)
}

/// Returns the current write timeout.
pub fn write_timeout(&self) -> Option<Duration> {
self.0.write_timeout()
self.stream.write_timeout()
}

/// Sets the write timeout.
///
/// This will reset any pending write timeout.
pub fn set_write_timeout(&mut self, timeout: Option<Duration>) {
self.0.set_write_timeout(timeout)
self.stream.set_write_timeout(timeout)
}

/// Returns a shared reference to the inner stream.
pub fn get_ref(&self) -> &S {
self.0.get_ref()
self.stream.get_ref()
}

/// Returns a mutable reference to the inner stream.
pub fn get_mut(&mut self) -> &mut S {
self.0.get_mut()
self.stream.get_mut()
}

/// Consumes the stream, returning the inner stream.
pub fn into_inner(self) -> S {
self.0.into_inner()
self.stream.into_inner()
}
}

impl<S> AsyncRead for TimeoutConnectorStream<S>
where
S: AsyncRead + AsyncWrite + Unpin,
{
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
self.0.prepare_uninitialized_buffer(buf)
}

fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}

fn poll_read_buf<B>(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut B,
) -> Poll<Result<usize, io::Error>>
where
B: BufMut,
{
Pin::new(&mut self.0).poll_read_buf(cx, buf)
buf: &mut ReadBuf,
) -> Poll<Result<(), io::Error>> {
self.project().stream.poll_read(cx, buf)
}
}

Expand All @@ -97,30 +87,31 @@ where
S: AsyncRead + AsyncWrite + Unpin,
{
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.0).poll_write(cx, buf)
self.project().stream.poll_write(cx, buf)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize, io::Error>> {
self.project().stream.poll_write_vectored(cx, bufs)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.0).poll_flush(cx)
fn is_write_vectored(&self) -> bool {
self.stream.is_write_vectored()
}

fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.0).poll_shutdown(cx)
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.project().stream.poll_flush(cx)
}

fn poll_write_buf<B>(
mut self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut B,
) -> Poll<Result<usize, io::Error>>
where
B: Buf,
{
Pin::new(&mut self.0).poll_write_buf(cx, buf)
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.project().stream.poll_shutdown(cx)
}
}

Expand All @@ -129,6 +120,15 @@ where
S: AsyncRead + AsyncWrite + Connection + Unpin,
{
fn connected(&self) -> Connected {
self.0.get_ref().connected()
self.stream.get_ref().connected()
}
}

impl<S> Connection for Pin<Box<TimeoutConnectorStream<S>>>
where
S: AsyncRead + AsyncWrite + Connection + Unpin,
{
fn connected(&self) -> Connected {
self.stream.get_ref().connected()
}
}

0 comments on commit 1766cd6

Please sign in to comment.