Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to Hyper 1.0 #15

Merged
merged 8 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ categories = ["http", "web", "network-programming", "web-programming::http-serve

[features]
default = []
http2 = ["hyper/http2"]
http2 = ["hyper/http2", "hyper-util/http2"]

[dependencies]
log = "0.4.17"
num_cpus = "1.15.0"
tokio = "1.14.1"
futures-core = "0.3.25"
hyper = { version = "0.14.23", features = ["http1", "server", "stream"] }
mio = { version = "0.8.5", features = ["os-poll", "net"] }
log = "0.4"
hyper = { version = "1", features = ["http1", "server"] }
hyper-util = { version = "0.1", features = ["http1", "server"] }
mio = { version = "1", features = ["os-poll", "net"] }
bytes = "1"
http-body-util = "0.1"

[dev-dependencies]
matchit = "0.7.0"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn main() {

## Features

Astra supports both HTTP/1 and HTTP/2 with most the configuration options that Hyper exposes. Features that depend on Tokio however, such as [`http2_keep_alive_while_idle`](https://docs.rs/hyper/latest/hyper/client/struct.Builder.html#method.http2_keep_alive_while_idle), are not supported and blocked on [better hyper support](https://github.com/hyperium/hyper/issues/2846).
Astra supports both HTTP/1 and HTTP/2 with most the configuration options that Hyper exposes. Features that depend on timers however, such as [`http2_keep_alive_while_idle`](https://docs.rs/hyper/latest/hyper/client/conn/http2/struct.Builder.html#method.keep_alive_while_idle), are currently unsupported.

Astra is currently an HTTP *server* library only. The client API is unimplemented.

Expand Down
8 changes: 7 additions & 1 deletion src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::VecDeque;
use std::future::Future;
use std::num::NonZero;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
Expand Down Expand Up @@ -87,7 +88,12 @@ impl Executor {
}),
condvar: Condvar::new(),
keep_alive: keep_alive.unwrap_or_else(|| Duration::from_secs(6)),
max_workers: max_workers.unwrap_or_else(|| num_cpus::get() * 15),
max_workers: max_workers.unwrap_or_else(|| {
std::thread::available_parallelism()
.map(NonZero::get)
.unwrap_or(1)
* 15
}),
}),
}
}
Expand Down
157 changes: 91 additions & 66 deletions src/http.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use crate::executor;

use core::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{cmp, debug_assert, io};
use std::{cmp, io, mem};

use futures_core::Stream;
use hyper::body::HttpBody;
use bytes::BytesMut;
use http_body_util::{BodyExt, Full};
use hyper::body::Frame;

pub use hyper::body::Bytes;

Expand Down Expand Up @@ -69,7 +71,9 @@ pub type ResponseBuilder = hyper::http::response::Builder;
/// Response::new(Body::new("Hello World!"))
/// }
/// ```
pub struct Body(pub(crate) hyper::Body);
pub struct Body(pub(crate) BoxBody);

type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, io::Error>;

impl Body {
/// Create a body from a string or bytes.
Expand All @@ -80,12 +84,14 @@ impl Body {
/// let bytes = Body::new(vec![0, 1, 0, 1, 0]);
/// ```
pub fn new(data: impl Into<Bytes>) -> Body {
Body(hyper::Body::from(data.into()))
Body(BoxBody::new(
Full::new(data.into()).map_err(|err| match err {}),
))
}

/// Create an empty body.
pub fn empty() -> Body {
Body(hyper::Body::empty())
Body(BoxBody::default())
}

/// Create a body from an implementor of [`io::Read`].
Expand All @@ -107,7 +113,7 @@ impl Body {
where
R: io::Read + Send + 'static,
{
Body(hyper::Body::wrap_stream(ReaderStream::new(reader)))
Body(BoxBody::new(ReaderBody::new(reader)))
}

/// Create a [`BodyReader`] that implements [`std::io::Read`].
Expand All @@ -132,23 +138,69 @@ impl Iterator for Body {
type Item = io::Result<Bytes>;

fn next(&mut self) -> Option<Self::Item> {
executor::Parker::new()
.block_on(self.0.data())
.map(|res| res.map_err(|err| io::Error::new(io::ErrorKind::Other, err)))
struct FrameFuture<'body>(Pin<&'body mut BoxBody>);

impl Future for FrameFuture<'_> {
type Output = Option<Result<Frame<Bytes>, io::Error>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
hyper::body::Body::poll_frame(self.0.as_mut(), cx)
}
}

loop {
let result = executor::Parker::new().block_on(FrameFuture(Pin::new(&mut self.0)))?;

return match result {
Ok(frame) => match frame.into_data() {
Ok(bytes) => Some(Ok(bytes)),
Err(_) => continue,
},
Err(err) => Some(Err(err)),
};
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
Stream::size_hint(&self.0)
let size_hint = hyper::body::Body::size_hint(&self.0);
(
size_hint.lower() as _,
size_hint.upper().map(|size| size as _),
)
}
}

/// Wraps [`Body`] and implements [`std::io::Read`]
pub struct BodyReader<'b> {
body: &'b mut Body,
impl fmt::Debug for Body {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}

impl Default for Body {
fn default() -> Self {
Self::empty()
}
}

impl hyper::body::Body for Body {
type Data = Bytes;
type Error = io::Error;

fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
Pin::new(&mut self.0).poll_frame(cx)
}
}

/// Implements [`std::io::Read`] for [`Body`].
pub struct BodyReader<'body> {
body: &'body mut Body,
prev_bytes: Bytes,
}

impl<'b> std::io::Read for BodyReader<'b> {
impl std::io::Read for BodyReader<'_> {
fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
let mut written = 0;
loop {
Expand All @@ -166,8 +218,8 @@ impl<'b> std::io::Read for BodyReader<'b> {
}

if written != 0 {
// pulling from an interator can block, and we have something to return
// already, so return it
// Pulling from the iterator can block and we have something to return
// already, so return it.
return Ok(written);
}

Expand All @@ -183,86 +235,59 @@ impl<'b> std::io::Read for BodyReader<'b> {
}
}

impl fmt::Debug for Body {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}

impl Default for Body {
fn default() -> Self {
Self::empty()
}
}

impl HttpBody for Body {
type Data = Bytes;
type Error = hyper::Error;

fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
Pin::new(&mut self.0).poll_data(cx)
}

fn poll_trailers(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<hyper::HeaderMap>, Self::Error>> {
Pin::new(&mut self.0).poll_trailers(cx)
}
}

struct ReaderStream<R> {
/// Implements `hyper::Body` for an implementor of `io::Read`.
struct ReaderBody<R> {
reader: Option<R>,
buf: Vec<u8>,
buf: BytesMut,
}

const CAP: usize = 4096;

impl<R> ReaderStream<R> {
impl<R> ReaderBody<R> {
/// Create a new `ReaderBody` from an `io::Read`.
fn new(reader: R) -> Self {
Self {
reader: Some(reader),
buf: vec![0; CAP],
buf: BytesMut::zeroed(CHUNK),
}
}
}

impl<R> Unpin for ReaderStream<R> {}
/// The size of the read buffer.
const CHUNK: usize = 4096;

impl<R> Unpin for ReaderBody<R> {}

impl<R> Stream for ReaderStream<R>
impl<R> hyper::body::Body for ReaderBody<R>
where
R: io::Read,
{
type Item = io::Result<Bytes>;
type Data = Bytes;
type Error = io::Error;

fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let ReaderStream { reader, buf } = &mut *self;
fn poll_frame(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let ReaderBody { reader, buf } = &mut *self;

let reader = match reader {
Some(reader) => reader,
None => return Poll::Ready(None),
};

if buf.capacity() == 0 {
buf.extend_from_slice(&[0; CAP]);
buf.extend_from_slice(&[0; CHUNK]);
}

match reader.read(buf) {
Err(err) => {
self.reader.take();
Poll::Ready(Some(Err(err)))
}
Err(err) => Poll::Ready(Some(Err(err))),
Ok(0) => {
self.reader.take();
Poll::Ready(None)
}
Ok(n) => {
let remaining = buf.split_off(n);
let chunk = std::mem::replace(buf, remaining);
Poll::Ready(Some(Ok(Bytes::from(chunk))))
let chunk = mem::replace(buf, remaining);
Poll::Ready(Some(Ok(Frame::data(Bytes::from(chunk)))))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ mod http;
mod net;
mod server;

pub use http::{Body, Request, Response, ResponseBuilder};
pub use http::{Body, BodyReader, Request, Response, ResponseBuilder};
pub use server::{ConnectionInfo, Server, Service};
26 changes: 18 additions & 8 deletions src/net.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::collections::HashMap;
use std::io::{self, Read, Write};
use std::mem::MaybeUninit;
use std::net::{self as sys, Shutdown};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

use hyper::rt::ReadBufCursor;
use mio::{Events, Token};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

#[derive(Clone)]
pub struct Reactor {
Expand Down Expand Up @@ -210,22 +211,31 @@ impl TcpStream {
}
}

impl AsyncRead for TcpStream {
impl hyper::rt::Read for TcpStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
mut buf: ReadBufCursor<'_>,
) -> Poll<io::Result<()>> {
let unfilled = buf.initialize_unfilled();
let initialized = unsafe {
let buf = buf.as_mut();

self.poll_io(direction::READ, || (&self.sys).read(unfilled), cx)
.map_ok(|read| {
buf.advance(read);
// Zero the buffer.
std::ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len());

// Safety: The buffer was initialized above.
&mut *(buf as *mut [MaybeUninit<u8>] as *mut [u8])
};

self.poll_io(direction::READ, || (&self.sys).read(initialized), cx)
.map_ok(|n| {
// Safety: The entire buffer was initialized above.
unsafe { buf.advance(n) };
})
}
}

impl AsyncWrite for TcpStream {
impl hyper::rt::Write for TcpStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down
Loading
Loading