Skip to content

Commit

Permalink
add H1 transport
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Oct 3, 2018
1 parent ae5c4df commit 2710f70
Show file tree
Hide file tree
Showing 11 changed files with 284 additions and 74 deletions.
85 changes: 72 additions & 13 deletions src/server/channel.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::net::{Shutdown, SocketAddr};
use std::net::Shutdown;
use std::{io, mem, time};

use bytes::{Buf, BufMut, BytesMut};
Expand All @@ -16,7 +16,7 @@ const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0";
pub(crate) enum HttpProtocol<T: IoStream, H: HttpHandler + 'static> {
H1(h1::Http1Dispatcher<T, H>),
H2(h2::Http2<T, H>),
Unknown(ServiceConfig<H>, Option<SocketAddr>, T, BytesMut),
Unknown(ServiceConfig<H>, T, BytesMut),
None,
}

Expand All @@ -29,7 +29,7 @@ impl<T: IoStream, H: HttpHandler + 'static> HttpProtocol<T, H> {
let _ = IoStream::shutdown(io, Shutdown::Both);
}
HttpProtocol::H2(ref mut h2) => h2.shutdown(),
HttpProtocol::Unknown(_, _, io, _) => {
HttpProtocol::Unknown(_, io, _) => {
let _ = IoStream::set_linger(io, Some(time::Duration::new(0, 0)));
let _ = IoStream::shutdown(io, Shutdown::Both);
}
Expand Down Expand Up @@ -59,17 +59,14 @@ where
T: IoStream,
H: HttpHandler + 'static,
{
pub(crate) fn new(
settings: ServiceConfig<H>, io: T, peer: Option<SocketAddr>,
) -> HttpChannel<T, H> {
pub(crate) fn new(settings: ServiceConfig<H>, io: T) -> HttpChannel<T, H> {
let ka_timeout = settings.client_timer();

HttpChannel {
ka_timeout,
node_reg: false,
node: Node::new(HttpProtocol::Unknown(
settings,
peer,
io,
BytesMut::with_capacity(8192),
)),
Expand Down Expand Up @@ -102,7 +99,7 @@ where
Ok(Async::Ready(_)) => {
trace!("Slow request timed out, close connection");
let proto = mem::replace(self.node.get_mut(), HttpProtocol::None);
if let HttpProtocol::Unknown(settings, _, io, buf) = proto {
if let HttpProtocol::Unknown(settings, io, buf) = proto {
*self.node.get_mut() =
HttpProtocol::H1(h1::Http1Dispatcher::for_error(
settings,
Expand All @@ -125,7 +122,7 @@ where
let settings = match self.node.get_mut() {
HttpProtocol::H1(ref mut h1) => h1.settings().clone(),
HttpProtocol::H2(ref mut h2) => h2.settings().clone(),
HttpProtocol::Unknown(ref mut settings, _, _, _) => settings.clone(),
HttpProtocol::Unknown(ref mut settings, _, _) => settings.clone(),
HttpProtocol::None => unreachable!(),
};
settings.head().insert(&mut self.node);
Expand All @@ -135,7 +132,7 @@ where
let kind = match self.node.get_mut() {
HttpProtocol::H1(ref mut h1) => return h1.poll(),
HttpProtocol::H2(ref mut h2) => return h2.poll(),
HttpProtocol::Unknown(_, _, ref mut io, ref mut buf) => {
HttpProtocol::Unknown(_, ref mut io, ref mut buf) => {
let mut err = None;
let mut disconnect = false;
match io.read_available(buf) {
Expand Down Expand Up @@ -173,13 +170,12 @@ where

// upgrade to specific http protocol
let proto = mem::replace(self.node.get_mut(), HttpProtocol::None);
if let HttpProtocol::Unknown(settings, addr, io, buf) = proto {
if let HttpProtocol::Unknown(settings, io, buf) = proto {
match kind {
ProtocolKind::Http1 => {
*self.node.get_mut() = HttpProtocol::H1(h1::Http1Dispatcher::new(
settings,
io,
addr,
buf,
is_eof,
self.ka_timeout.take(),
Expand All @@ -190,7 +186,6 @@ where
*self.node.get_mut() = HttpProtocol::H2(h2::Http2::new(
settings,
io,
addr,
buf.freeze(),
self.ka_timeout.take(),
));
Expand All @@ -202,6 +197,70 @@ where
}
}

#[doc(hidden)]
pub struct H1Channel<T, H>
where
T: IoStream,
H: HttpHandler + 'static,
{
node: Node<HttpProtocol<T, H>>,
node_reg: bool,
}

impl<T, H> H1Channel<T, H>
where
T: IoStream,
H: HttpHandler + 'static,
{
pub(crate) fn new(settings: ServiceConfig<H>, io: T) -> H1Channel<T, H> {
H1Channel {
node_reg: false,
node: Node::new(HttpProtocol::H1(h1::Http1Dispatcher::new(
settings,
io,
BytesMut::with_capacity(8192),
false,
None,
))),
}
}
}

impl<T, H> Drop for H1Channel<T, H>
where
T: IoStream,
H: HttpHandler + 'static,
{
fn drop(&mut self) {
self.node.remove();
}
}

impl<T, H> Future for H1Channel<T, H>
where
T: IoStream,
H: HttpHandler + 'static,
{
type Item = ();
type Error = HttpDispatchError;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if !self.node_reg {
self.node_reg = true;
let settings = match self.node.get_mut() {
HttpProtocol::H1(ref mut h1) => h1.settings().clone(),
_ => unreachable!(),
};
settings.head().insert(&mut self.node);
}

match self.node.get_mut() {
HttpProtocol::H1(ref mut h1) => h1.poll(),
_ => unreachable!(),
}
}
}

pub(crate) struct Node<T> {
next: Option<*mut Node<T>>,
prev: Option<*mut Node<T>>,
Expand Down
13 changes: 8 additions & 5 deletions src/server/h1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,10 @@ where
H: HttpHandler + 'static,
{
pub fn new(
settings: ServiceConfig<H>, stream: T, addr: Option<SocketAddr>, buf: BytesMut,
is_eof: bool, keepalive_timer: Option<Delay>,
settings: ServiceConfig<H>, stream: T, buf: BytesMut, is_eof: bool,
keepalive_timer: Option<Delay>,
) -> Self {
let addr = stream.peer_addr();
let (ka_expire, ka_timer) = if let Some(delay) = keepalive_timer {
(delay.deadline(), Some(delay))
} else if let Some(delay) = settings.keep_alive_timer() {
Expand All @@ -107,12 +108,12 @@ where
};

Http1Dispatcher {
flags,
stream: H1Writer::new(stream, settings.clone()),
decoder: H1Decoder::new(),
payload: None,
tasks: VecDeque::new(),
error: None,
flags,
addr,
buf,
settings,
Expand Down Expand Up @@ -337,9 +338,11 @@ where
/// read data from the stream
pub(self) fn poll_io(&mut self) -> Result<bool, HttpDispatchError> {
if !self.flags.contains(Flags::POLLED) {
let updated = self.parse()?;
self.flags.insert(Flags::POLLED);
return Ok(updated);
if !self.buf.is_empty() {
let updated = self.parse()?;
return Ok(updated);
}
}

// read io from socket
Expand Down
4 changes: 2 additions & 2 deletions src/server/h2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ where
H: HttpHandler + 'static,
{
pub fn new(
settings: ServiceConfig<H>, io: T, addr: Option<SocketAddr>, buf: Bytes,
keepalive_timer: Option<Delay>,
settings: ServiceConfig<H>, io: T, buf: Bytes, keepalive_timer: Option<Delay>,
) -> Self {
let addr = io.peer_addr();
let extensions = io.extensions();
Http2 {
flags: Flags::empty(),
Expand Down
4 changes: 1 addition & 3 deletions src/server/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ where
type Result = ();

fn handle(&mut self, msg: WrapperStream<T>, _: &mut Context<Self>) -> Self::Result {
Arbiter::spawn(
HttpChannel::new(self.settings.clone(), msg, None).map_err(|_| ()),
);
Arbiter::spawn(HttpChannel::new(self.settings.clone(), msg).map_err(|_| ()));
}
}
20 changes: 17 additions & 3 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
//! let _ = sys.run();
//!}
//! ```
use std::net::Shutdown;
use std::net::{Shutdown, SocketAddr};
use std::rc::Rc;
use std::{io, time};

Expand Down Expand Up @@ -143,10 +143,13 @@ pub use self::message::Request;
pub use self::ssl::*;

pub use self::error::{AcceptorError, HttpDispatchError};
pub use self::settings::{ServerSettings, ServiceConfig, ServiceConfigBuilder};
pub use self::settings::ServerSettings;

#[doc(hidden)]
pub use self::service::{HttpService, StreamConfiguration};
pub use self::settings::{ServiceConfig, ServiceConfigBuilder};

#[doc(hidden)]
pub use self::service::{H1Service, HttpService, StreamConfiguration};

#[doc(hidden)]
pub use self::helpers::write_content_length;
Expand Down Expand Up @@ -266,6 +269,12 @@ pub trait Writer {
pub trait IoStream: AsyncRead + AsyncWrite + 'static {
fn shutdown(&mut self, how: Shutdown) -> io::Result<()>;

/// Returns the socket address of the remote peer of this TCP connection.
fn peer_addr(&self) -> Option<SocketAddr> {
None
}

/// Sets the value of the TCP_NODELAY option on this socket.
fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()>;

fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()>;
Expand Down Expand Up @@ -341,6 +350,11 @@ impl IoStream for TcpStream {
TcpStream::shutdown(self, how)
}

#[inline]
fn peer_addr(&self) -> Option<SocketAddr> {
TcpStream::peer_addr(self).ok()
}

#[inline]
fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> {
TcpStream::set_nodelay(self, nodelay)
Expand Down
87 changes: 85 additions & 2 deletions src/server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use actix_net::service::{NewService, Service};
use futures::future::{ok, FutureResult};
use futures::{Async, Poll};

use super::channel::HttpChannel;
use super::channel::{H1Channel, HttpChannel};
use super::error::HttpDispatchError;
use super::handler::HttpHandler;
use super::settings::ServiceConfig;
Expand Down Expand Up @@ -89,7 +89,90 @@ where
}

fn call(&mut self, req: Self::Request) -> Self::Future {
HttpChannel::new(self.settings.clone(), req, None)
HttpChannel::new(self.settings.clone(), req)
}
}

/// `NewService` implementation for HTTP1 transport
pub struct H1Service<H, Io>
where
H: HttpHandler,
Io: IoStream,
{
settings: ServiceConfig<H>,
_t: PhantomData<Io>,
}

impl<H, Io> H1Service<H, Io>
where
H: HttpHandler,
Io: IoStream,
{
/// Create new `HttpService` instance.
pub fn new(settings: ServiceConfig<H>) -> Self {
H1Service {
settings,
_t: PhantomData,
}
}
}

impl<H, Io> NewService for H1Service<H, Io>
where
H: HttpHandler,
Io: IoStream,
{
type Request = Io;
type Response = ();
type Error = HttpDispatchError;
type InitError = ();
type Service = H1ServiceHandler<H, Io>;
type Future = FutureResult<Self::Service, Self::InitError>;

fn new_service(&self) -> Self::Future {
ok(H1ServiceHandler::new(self.settings.clone()))
}
}

/// `Service` implementation for HTTP1 transport
pub struct H1ServiceHandler<H, Io>
where
H: HttpHandler,
Io: IoStream,
{
settings: ServiceConfig<H>,
_t: PhantomData<Io>,
}

impl<H, Io> H1ServiceHandler<H, Io>
where
H: HttpHandler,
Io: IoStream,
{
fn new(settings: ServiceConfig<H>) -> H1ServiceHandler<H, Io> {
H1ServiceHandler {
settings,
_t: PhantomData,
}
}
}

impl<H, Io> Service for H1ServiceHandler<H, Io>
where
H: HttpHandler,
Io: IoStream,
{
type Request = Io;
type Response = ();
type Error = HttpDispatchError;
type Future = H1Channel<Io, H>;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}

fn call(&mut self, req: Self::Request) -> Self::Future {
H1Channel::new(self.settings.clone(), req)
}
}

Expand Down
7 changes: 6 additions & 1 deletion src/server/ssl/nativetls.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::net::Shutdown;
use std::net::{Shutdown, SocketAddr};
use std::{io, time};

use actix_net::ssl::TlsStream;
Expand All @@ -12,6 +12,11 @@ impl<Io: IoStream> IoStream for TlsStream<Io> {
Ok(())
}

#[inline]
fn peer_addr(&self) -> Option<SocketAddr> {
self.get_ref().get_ref().peer_addr()
}

#[inline]
fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> {
self.get_mut().get_mut().set_nodelay(nodelay)
Expand Down
Loading

0 comments on commit 2710f70

Please sign in to comment.