Skip to content

Commit

Permalink
fix client timer and add slow request tests
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Oct 2, 2018
1 parent f007860 commit f3ce657
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 54 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

* Added slow request timeout setting

* Respond with 408 response on slow request timeout #523


### Fixed

Expand Down
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ language-tags = "0.2"
lazy_static = "1.0"
lazycell = "1.0.0"
parking_lot = "0.6"
serde_urlencoded = "^0.5.3"
url = { version="1.7", features=["query_encoding"] }
cookie = { version="0.11", features=["percent-encode"] }
brotli2 = { version="^0.3.2", optional = true }
Expand Down Expand Up @@ -125,7 +126,7 @@ webpki-roots = { version = "0.15", optional = true }
# unix sockets
tokio-uds = { version="0.2", optional = true }

serde_urlencoded = "^0.5.3"
backtrace="*"

[dev-dependencies]
env_logger = "0.5"
Expand Down
55 changes: 24 additions & 31 deletions src/server/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::KeepAlive;
pub(crate) trait ServiceProvider {
fn register(
&self, server: Server, lst: net::TcpListener, host: String,
addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: u64,
addr: net::SocketAddr, keep_alive: KeepAlive, secure: bool, client_timeout: u64,
client_shutdown: u64,
) -> Server;
}
Expand All @@ -28,7 +28,6 @@ where
{
factory: F,
acceptor: A,
no_client_timer: bool,
}

impl<F, H, A> HttpServiceBuilder<F, H, A>
Expand All @@ -40,40 +39,26 @@ where
{
/// Create http service builder
pub fn new(factory: F, acceptor: A) -> Self {
Self {
factory,
acceptor,
no_client_timer: false,
}
}

pub(crate) fn no_client_timer(mut self) -> Self {
self.no_client_timer = true;
self
Self { factory, acceptor }
}

fn finish(
&self, host: String, addr: net::SocketAddr, keep_alive: KeepAlive,
&self, host: String, addr: net::SocketAddr, keep_alive: KeepAlive, secure: bool,
client_timeout: u64, client_shutdown: u64,
) -> impl ServiceFactory {
let timeout = if self.no_client_timer {
0
} else {
client_timeout
};
let factory = self.factory.clone();
let acceptor = self.acceptor.clone();
move || {
let app = (factory)().into_handler();
let settings = WorkerSettings::new(
app,
keep_alive,
timeout as u64,
client_timeout,
client_shutdown,
ServerSettings::new(addr, &host, false),
);

if timeout == 0 {
if secure {
Either::A(ServerMessageAcceptor::new(
settings.clone(),
TcpAcceptor::new(acceptor.create().map_err(AcceptorError::Service))
Expand All @@ -88,14 +73,16 @@ where
} else {
Either::B(ServerMessageAcceptor::new(
settings.clone(),
TcpAcceptor::new(AcceptorTimeout::new(timeout, acceptor.create()))
.map_err(|_| ())
.map_init_err(|_| ())
.and_then(
HttpService::new(settings)
.map_init_err(|_| ())
.map_err(|_| ()),
),
TcpAcceptor::new(AcceptorTimeout::new(
client_timeout,
acceptor.create(),
)).map_err(|_| ())
.map_init_err(|_| ())
.and_then(
HttpService::new(settings)
.map_init_err(|_| ())
.map_err(|_| ()),
),
))
}
}
Expand All @@ -112,7 +99,6 @@ where
HttpServiceBuilder {
factory: self.factory.clone(),
acceptor: self.acceptor.clone(),
no_client_timer: self.no_client_timer,
}
}
}
Expand All @@ -126,13 +112,20 @@ where
{
fn register(
&self, server: Server, lst: net::TcpListener, host: String,
addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: u64,
addr: net::SocketAddr, keep_alive: KeepAlive, secure: bool, client_timeout: u64,
client_shutdown: u64,
) -> Server {
server.listen2(
"actix-web",
lst,
self.finish(host, addr, keep_alive, client_timeout, client_shutdown),
self.finish(
host,
addr,
keep_alive,
secure,
client_timeout,
client_shutdown,
),
)
}
}
30 changes: 19 additions & 11 deletions src/server/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tokio_timer::Delay;
use super::error::HttpDispatchError;
use super::settings::WorkerSettings;
use super::{h1, h2, HttpHandler, IoStream};
use http::StatusCode;

const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0";

Expand Down Expand Up @@ -42,11 +43,9 @@ where
pub(crate) fn new(
settings: WorkerSettings<H>, io: T, peer: Option<SocketAddr>,
) -> HttpChannel<T, H> {
let ka_timeout = settings.client_timer();

HttpChannel {
ka_timeout,
node: None,
ka_timeout: settings.client_timer(),
proto: Some(HttpProtocol::Unknown(
settings,
peer,
Expand Down Expand Up @@ -91,10 +90,23 @@ where

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// keep-alive timer
if let Some(ref mut timer) = self.ka_timeout {
match timer.poll() {
if self.ka_timeout.is_some() {
match self.ka_timeout.as_mut().unwrap().poll() {
Ok(Async::Ready(_)) => {
trace!("Slow request timed out, close connection");
if let Some(HttpProtocol::Unknown(settings, _, io, buf)) =
self.proto.take()
{
self.proto =
Some(HttpProtocol::H1(h1::Http1Dispatcher::for_error(
settings,
io,
StatusCode::REQUEST_TIMEOUT,
self.ka_timeout.take(),
buf,
)));
return self.poll();
}
return Ok(Async::Ready(()));
}
Ok(Async::NotReady) => (),
Expand All @@ -121,12 +133,8 @@ where

let mut is_eof = false;
let kind = match self.proto {
Some(HttpProtocol::H1(ref mut h1)) => {
return h1.poll();
}
Some(HttpProtocol::H2(ref mut h2)) => {
return h2.poll();
}
Some(HttpProtocol::H1(ref mut h1)) => return h1.poll(),
Some(HttpProtocol::H2(ref mut h2)) => return h2.poll(),
Some(HttpProtocol::Unknown(_, _, ref mut io, ref mut buf)) => {
let mut err = None;
let mut disconnect = false;
Expand Down
36 changes: 35 additions & 1 deletion src/server/h1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,31 @@ where
}
}

pub(crate) fn for_error(
settings: WorkerSettings<H>, stream: T, status: StatusCode,
mut keepalive_timer: Option<Delay>, buf: BytesMut,
) -> Self {
if let Some(deadline) = settings.client_timer_expire() {
let _ = keepalive_timer.as_mut().map(|delay| delay.reset(deadline));
}

let mut disp = Http1Dispatcher {
flags: Flags::STARTED | Flags::READ_DISCONNECTED,
stream: H1Writer::new(stream, settings.clone()),
decoder: H1Decoder::new(),
payload: None,
tasks: VecDeque::new(),
error: None,
addr: None,
ka_timer: keepalive_timer,
ka_expire: settings.now(),
buf,
settings,
};
disp.push_response_entry(status);
disp
}

#[inline]
pub fn settings(&self) -> &WorkerSettings<H> {
&self.settings
Expand All @@ -133,7 +158,7 @@ where

#[inline]
fn can_read(&self) -> bool {
if self.flags.intersects(Flags::READ_DISCONNECTED) {
if self.flags.contains(Flags::READ_DISCONNECTED) {
return false;
}

Expand Down Expand Up @@ -250,6 +275,15 @@ where
);
let _ = IoStream::shutdown(io, Shutdown::Both);
return Err(HttpDispatchError::ShutdownTimeout);
} else if !self.flags.contains(Flags::STARTED) {
// timeout on first request (slow request) return 408
trace!("Slow request timeout");
self.flags
.insert(Flags::STARTED | Flags::READ_DISCONNECTED);
self.tasks.push_back(Entry::Error(ServerError::err(
Version::HTTP_11,
StatusCode::REQUEST_TIMEOUT,
)));
} else {
trace!("Keep-alive timeout, close connection");
self.flags.insert(Flags::SHUTDOWN);
Expand Down
22 changes: 12 additions & 10 deletions src/server/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,10 @@ where
lst,
addr,
scheme: "http",
handler: Box::new(
HttpServiceBuilder::new(self.factory.clone(), DefaultAcceptor)
.no_client_timer(),
),
handler: Box::new(HttpServiceBuilder::new(
self.factory.clone(),
DefaultAcceptor,
)),
});

self
Expand Down Expand Up @@ -498,17 +498,18 @@ impl<H: IntoHttpHandler, F: Fn() -> H + Send + Clone> HttpServer<H, F> {
.as_ref()
.map(|h| h.to_owned())
.unwrap_or_else(|| format!("{}", socket.addr));
let client_shutdown = if socket.scheme == "https" {
self.client_shutdown
let (secure, client_shutdown) = if socket.scheme == "https" {
(true, self.client_shutdown)
} else {
0
(false, 0)
};
srv = socket.handler.register(
srv,
socket.lst,
host,
socket.addr,
self.keep_alive,
secure,
self.client_timeout,
client_shutdown,
);
Expand Down Expand Up @@ -550,17 +551,18 @@ impl<H: IntoHttpHandler, F: Fn() -> H + Send + Clone> HttpServer<H, F> {
.as_ref()
.map(|h| h.to_owned())
.unwrap_or_else(|| format!("{}", socket.addr));
let client_shutdown = if socket.scheme == "https" {
self.client_shutdown
let (secure, client_shutdown) = if socket.scheme == "https" {
(true, self.client_shutdown)
} else {
0
(false, 0)
};
srv = socket.handler.register(
srv,
socket.lst,
host,
socket.addr,
self.keep_alive,
secure,
self.client_timeout,
client_shutdown,
);
Expand Down
10 changes: 10 additions & 0 deletions src/server/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,16 @@ impl<H: 'static> WorkerSettings<H> {
}
}

/// Client timeout for first request.
pub fn client_timer_expire(&self) -> Option<Instant> {
let delay = self.0.client_timeout;
if delay != 0 {
Some(self.now() + Duration::from_millis(delay))
} else {
None
}
}

/// Client shutdown timer
pub fn client_shutdown_timer(&self) -> Option<Instant> {
let delay = self.0.client_shutdown;
Expand Down
40 changes: 40 additions & 0 deletions tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1054,3 +1054,43 @@ fn test_custom_pipeline() {
assert!(response.status().is_success());
}
}

#[test]
fn test_slow_request() {
use actix::System;
use std::net;
use std::sync::mpsc;
let (tx, rx) = mpsc::channel();

let addr = test::TestServer::unused_addr();

thread::spawn(move || {
System::run(move || {
let srv = server::new(|| {
vec![App::new().resource("/", |r| {
r.method(http::Method::GET).f(|_| HttpResponse::Ok())
})]
});

let srv = srv.bind(addr).unwrap();
srv.client_timeout(200).start();
let _ = tx.send(System::current());
});
});
let sys = rx.recv().unwrap();

thread::sleep(time::Duration::from_millis(200));

let mut stream = net::TcpStream::connect(addr).unwrap();
let mut data = String::new();
let _ = stream.read_to_string(&mut data);
assert!(data.starts_with("HTTP/1.1 408 Request Timeou"));

let mut stream = net::TcpStream::connect(addr).unwrap();
let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n");
let mut data = String::new();
let _ = stream.read_to_string(&mut data);
assert!(data.starts_with("HTTP/1.1 408 Request Timeou"));

sys.stop();
}

0 comments on commit f3ce657

Please sign in to comment.