diff --git a/lib/src/backends.rs b/lib/src/backends.rs index 84f6ec4b5..384ea18ed 100644 --- a/lib/src/backends.rs +++ b/lib/src/backends.rs @@ -1,5 +1,6 @@ use std::{cell::RefCell, collections::HashMap, net::SocketAddr, rc::Rc}; +use anyhow::{bail, Context}; use mio::net::TcpStream; use crate::{ @@ -10,7 +11,7 @@ use crate::{ }, }; -use super::{load_balancing::*, Backend, ConnectionError}; +use super::{load_balancing::*, Backend}; #[derive(Debug)] pub struct BackendMap { @@ -82,25 +83,21 @@ impl BackendMap { .unwrap_or(false) } - // TODO: return anyhow::Result with context, log the error downstream pub fn backend_from_cluster_id( &mut self, cluster_id: &str, - ) -> Result<(Rc>, TcpStream), ConnectionError> { - let cluster_backends = match self.backends.get_mut(cluster_id) { - Some(backends) => backends, - None => { - return Err(ConnectionError::NoBackendAvailable(Some( - cluster_id.to_owned(), - ))) - } - }; + ) -> anyhow::Result<(Rc>, TcpStream)> { + let cluster_backends = self + .backends + .get_mut(cluster_id) + .with_context(|| format!("No backend found for cluster {}", cluster_id))?; if cluster_backends.backends.is_empty() { self.available = false; - return Err(ConnectionError::NoBackendAvailable(Some( - cluster_id.to_owned(), - ))); + bail!(format!( + "Found an empty backend list for cluster {}", + cluster_id + )); } let next_backend = match cluster_backends.next_available_backend() { @@ -113,9 +110,7 @@ impl BackendMap { cluster_id.to_string(), )); } - return Err(ConnectionError::NoBackendAvailable(Some( - cluster_id.to_owned(), - ))); + bail!("No more backend available for cluster {}", cluster_id); } }; @@ -131,52 +126,49 @@ impl BackendMap { ) ); - match borrowed_backend.try_connect() { - Ok(tcp_stream) => { - self.available = true; - Ok((next_backend.clone(), tcp_stream)) - } - Err(connection_error) => { - error!( - "could not connect {} to {:?} ({} failures)", - cluster_id, borrowed_backend.address, borrowed_backend.failures - ); - Err(connection_error) - } - } + let tcp_stream = borrowed_backend.try_connect().with_context(|| { + format!( + "could not connect {} to {:?} ({} failures)", + cluster_id, borrowed_backend.address, borrowed_backend.failures + ) + })?; + self.available = true; + + Ok((next_backend.clone(), tcp_stream)) } - // TODO: return anyhow::Result with context, log the error downstream pub fn backend_from_sticky_session( &mut self, cluster_id: &str, sticky_session: &str, - ) -> Result<(Rc>, TcpStream), ConnectionError> { - let sticky_conn: Option>, TcpStream), ConnectionError>> = self + ) -> anyhow::Result<(Rc>, TcpStream)> { + let sticky_conn = self .backends .get_mut(cluster_id) .and_then(|cluster_backends| cluster_backends.find_sticky(sticky_session)) - .map(|b| { - let mut backend = b.borrow_mut(); - let conn = backend.try_connect(); - - conn.map(|c| (b.clone(), c)).map_err(|e| { - error!( - "could not connect {} to {:?} using session {} ({} failures)", - cluster_id, backend.address, sticky_session, backend.failures - ); - e - }) + .map(|backend| { + let mut borrowed = backend.borrow_mut(); + let conn = borrowed.try_connect(); + + conn.map(|tcp_stream| (backend.clone(), tcp_stream)) + .map_err(|e| { + error!( + "could not connect {} to {:?} using session {} ({} failures)", + cluster_id, borrowed.address, sticky_session, borrowed.failures + ); + e + }) }); - if let Some(res) = sticky_conn { - res - } else { - debug!( - "Couldn't find a backend corresponding to sticky_session {} for cluster {}", - sticky_session, cluster_id - ); - self.backend_from_cluster_id(cluster_id) + match sticky_conn { + Some(backend_and_stream) => backend_and_stream, + None => { + debug!( + "Couldn't find a backend corresponding to sticky_session {} for cluster {}", + sticky_session, cluster_id + ); + self.backend_from_cluster_id(cluster_id) + } } } diff --git a/lib/src/http.rs b/lib/src/http.rs index c345b27e5..4f1fa6162 100644 --- a/lib/src/http.rs +++ b/lib/src/http.rs @@ -8,7 +8,7 @@ use std::{ str::from_utf8_unchecked, }; -use anyhow::Context; +use anyhow::{bail, Context}; use mio::{net::*, unix::SourceFd, *}; use rusty_ulid::Ulid; use slab::Slab; @@ -48,7 +48,7 @@ use super::{ }, socket::server_bind, sozu_command::state::ClusterId, - AcceptError, Backend, BackendConnectAction, BackendConnectionStatus, ConnectionError, Protocol, + AcceptError, Backend, BackendConnectAction, BackendConnectionStatus, Protocol, ProxyConfiguration, ProxySession, Readiness, SessionMetrics, SessionResult, }; @@ -628,7 +628,9 @@ impl Session { // we must wait for an event return SessionResult::Continue; } - Err(connection_error) => error!("{}", connection_error), + Err(connection_error) => { + error!("Error connecting to backend: {:#}", connection_error) + } } } else { self.metrics().backend_connected(); @@ -696,7 +698,9 @@ impl Session { // we must wait for an event return SessionResult::Continue; } - Err(connection_error) => error!("{}", connection_error), + Err(connection_error) => { + error!("Error connecting to backend: {:#}", connection_error) + } } } SessionResult::Continue => {} @@ -849,13 +853,11 @@ impl Session { } } - fn check_circuit_breaker(&mut self) -> Result<(), ConnectionError> { + fn check_circuit_breaker(&mut self) -> anyhow::Result<()> { if self.connection_attempt >= CONN_RETRIES { error!("{} max connection attempt reached", self.log_context()); self.set_answer(DefaultAnswerStatus::Answer503, None); - return Err(ConnectionError::NoBackendAvailable( - self.cluster_id.to_owned(), - )); + bail!("Maximum connection attempt reached"); } Ok(()) } @@ -883,60 +885,61 @@ impl Session { true } + pub fn get_host(&self) -> Option<&str> { + self.protocol.as_ref().and_then(|protocol| match protocol { + State::Http(ref http) => http.get_host(), + _ => None, + }) + } + // -> host, path, method - pub fn extract_route(&self) -> Result<(&str, &str, &Method), ConnectionError> { - let host = self - .http() - .and_then(|http| http.request_state.as_ref()) - .and_then(|request_state| request_state.get_host()) - .ok_or(ConnectionError::NoHostGiven)?; + pub fn extract_route(&self) -> anyhow::Result<(&str, &str, &Method)> { + let given_host = self.get_host().with_context(|| "No host given")?; // redundant // we only keep host, but we need the request's hostname in frontend_from_request (calling twice hostname_and_port) - let host: &str = match hostname_and_port(host.as_bytes()) { - Ok((input, (hostname, port))) => { - if input != &b""[..] { - return Err(ConnectionError::InvalidHost { - hostname: host.to_owned(), - message: "connect_to_backend: invalid remaining chars after hostname" - .to_owned(), - }); - } - - //FIXME: we should check that the port is right too + let (remaining_input, (hostname, port)) = match hostname_and_port(given_host.as_bytes()) { + Ok(tuple) => tuple, - if port == Some(&b"80"[..]) { - // it is alright to call from_utf8_unchecked, - // we already verified that there are only ascii - // chars in there - unsafe { from_utf8_unchecked(hostname) } - } else { - host - } - } Err(parse_error) => { - return Err(ConnectionError::InvalidHost { - hostname: host.to_owned(), - message: format!("Hostname parsing failed: {}", parse_error), - }); + // parse_error contains a slice of given_host, which should NOT escape this scope + bail!( + "Hostname parsing failed for host {}: {}", + given_host.clone(), + parse_error, + ); } }; + if remaining_input != &b""[..] { + bail!("invalid remaining chars after hostname {}", given_host); + } + + //FIXME: we should check that the port is right too + + let host = if port == Some(&b"80"[..]) { + // it is alright to call from_utf8_unchecked, + // we already verified that there are only ascii + // chars in there + unsafe { from_utf8_unchecked(hostname) } + } else { + given_host + }; let request_line = self .http() .and_then(|http| http.request_state.as_ref()) .and_then(|request_state| request_state.get_request_line()) - .ok_or(ConnectionError::NoRequestLineGiven)?; + .with_context(|| "No request line given in the request state")?; Ok((host, &request_line.uri, &request_line.method)) } - fn cluster_id_from_request(&mut self) -> Result { + fn cluster_id_from_request(&mut self) -> anyhow::Result { let (host, uri, method) = match self.extract_route() { - Ok((h, u, m)) => (h, u, m), + Ok(tuple) => tuple, Err(e) => { self.set_answer(DefaultAnswerStatus::Answer400, None); - return Err(e); + return Err(e).with_context(|| "Could not extract route from request"); } }; @@ -952,12 +955,12 @@ impl Session { Some(Route::ClusterId(cluster_id)) => cluster_id, Some(Route::Deny) => { self.set_answer(DefaultAnswerStatus::Answer401, None); - return Err(ConnectionError::Unauthorized); + bail!("Unauthorized route"); } None => { - let no_host_error = ConnectionError::HostNotFound(host.to_owned()); + let no_host_error = format!("Host not found: {}", host); self.set_answer(DefaultAnswerStatus::Answer404, None); - return Err(no_host_error); + bail!(no_host_error); } }; @@ -975,7 +978,7 @@ impl Session { DefaultAnswerStatus::Answer301, Some(Rc::new(answer.into_bytes())), ); - return Err(ConnectionError::HttpsRedirect(cluster_id)); + bail!("Route is unauthorized"); } Ok(cluster_id) @@ -985,39 +988,23 @@ impl Session { &mut self, cluster_id: &str, front_should_stick: bool, - ) -> Result { + ) -> anyhow::Result { let sticky_session = self .http() .and_then(|http| http.request_state.as_ref()) .and_then(|request_state| request_state.get_sticky_session()); - let result = match (front_should_stick, sticky_session) { - (true, Some(sticky_session)) => self - .proxy - .borrow() - .backends - .borrow_mut() - .backend_from_sticky_session(cluster_id, sticky_session) - .map_err(|e| { - debug!( - "Couldn't find a backend corresponding to sticky_session {} for cluster {}", - sticky_session, cluster_id - ); - e - }), - _ => self - .proxy - .borrow() - .backends - .borrow_mut() - .backend_from_cluster_id(cluster_id), - }; - - let (backend, conn) = match result { + let (backend, conn) = match self.get_backend_for_sticky_session( + front_should_stick, + sticky_session, + cluster_id, + ) { Ok((b, c)) => (b, c), Err(e) => { self.set_answer(DefaultAnswerStatus::Answer503, None); - return Err(e); + return Err(e).with_context(|| { + format!("Could not find a backend for cluster {}", cluster_id) + }); } }; @@ -1054,16 +1041,47 @@ impl Session { Ok(conn) } + fn get_backend_for_sticky_session( + &self, + front_should_stick: bool, + sticky_session: Option<&str>, + cluster_id: &str, + ) -> anyhow::Result<(Rc>, TcpStream)> { + match (front_should_stick, sticky_session) { + (true, Some(sticky_session)) => self + .proxy + .borrow() + .backends + .borrow_mut() + .backend_from_sticky_session(cluster_id, sticky_session) + .with_context(|| { + format!( + "Couldn't find a backend corresponding to sticky_session {} for cluster {}", + sticky_session, cluster_id + ) + }), + _ => self + .proxy + .borrow() + .backends + .borrow_mut() + .backend_from_cluster_id(cluster_id), + } + } + fn connect_to_backend( &mut self, session_rc: Rc>, - ) -> Result { + ) -> anyhow::Result { let old_cluster_id = self.http().and_then(|http| http.cluster_id.clone()); let old_back_token = self.back_token(); - self.check_circuit_breaker()?; + self.check_circuit_breaker() + .with_context(|| "Circuit broke")?; - let cluster_id = self.cluster_id_from_request()?; + let cluster_id = self + .cluster_id_from_request() + .with_context(|| "Could not get cluster id from request")?; // check if we can reuse the backend connection if (self.http().and_then(|h| h.cluster_id.as_ref()) == Some(&cluster_id)) @@ -1170,9 +1188,7 @@ impl Session { if not_enough_memory { error!("not enough memory, cannot connect to backend"); self.set_answer(DefaultAnswerStatus::Answer503, None); - return Err(ConnectionError::TooManyConnections( - self.cluster_id.to_owned(), - )); + bail!(format!("Too many connections on cluster {}", cluster_id)); } let back_token = { diff --git a/lib/src/https.rs b/lib/src/https.rs index 6527956a8..4a4f0635b 100644 --- a/lib/src/https.rs +++ b/lib/src/https.rs @@ -9,7 +9,7 @@ use std::{ sync::Arc, }; -use anyhow::Context; +use anyhow::{bail, Context}; use mio::{ net::{TcpListener as MioTcpListener, TcpStream as MioTcpStream}, unix::SourceFd, @@ -68,9 +68,8 @@ use crate::{ ParsedCertificateAndKey, }, util::UnwrapLog, - AcceptError, Backend, BackendConnectAction, BackendConnectionStatus, ConnectionError, - ListenerHandler, Protocol, ProxyConfiguration, ProxySession, Readiness, SessionMetrics, - SessionResult, + AcceptError, Backend, BackendConnectAction, BackendConnectionStatus, ListenerHandler, Protocol, + ProxyConfiguration, ProxySession, Readiness, SessionMetrics, SessionResult, }; // const SERVER_PROTOS: &[&str] = &["http/1.1", "h2"]; @@ -823,7 +822,9 @@ impl Session { // stop here, we must wait for an event return SessionResult::Continue; } - Err(connection_error) => error!("{}", connection_error), + Err(connection_error) => { + error!("Error connecting to backend: {:#}", connection_error) + } } } else { self.metrics().backend_connected(); @@ -891,7 +892,9 @@ impl Session { // we must wait for an event return SessionResult::Continue; } - Err(connection_error) => error!("{}", connection_error), + Err(connection_error) => { + error!("Error connecting to backend: {:#}", connection_error) + } } } SessionResult::Continue => {} @@ -1040,13 +1043,11 @@ impl Session { } } - fn check_circuit_breaker(&mut self) -> Result<(), ConnectionError> { + fn check_circuit_breaker(&mut self) -> anyhow::Result<()> { if self.connection_attempt >= CONN_RETRIES { error!("{} max connection attempt reached", self.log_context()); self.set_answer(DefaultAnswerStatus::Answer503, None); - return Err(ConnectionError::NoBackendAvailable( - self.cluster_id.to_owned(), - )); + bail!("Maximum connection attempt reached"); } Ok(()) } @@ -1074,71 +1075,63 @@ impl Session { false } - pub fn extract_route(&self) -> Result<(&str, &str, &Method), ConnectionError> { + pub fn extract_route(&self) -> anyhow::Result<(&str, &str, &Method)> { let request_state = match self.state { State::Invalid => unreachable!(), State::Http(ref http) => http.request_state.as_ref(), _ => None, }; - let host = request_state + let given_host = request_state .and_then(|request_state| request_state.get_host()) - .ok_or(ConnectionError::NoHostGiven)?; - - let host: &str = match hostname_and_port(host.as_bytes()) { - Ok((remaining_input, (hostname, port))) => { - if remaining_input != &b""[..] { - return Err(ConnectionError::InvalidHost { - hostname: host.to_owned(), - message: "connect_to_backend: invalid remaining chars after hostname" - .to_owned(), - }); - } + .with_context(|| "No host given")?; - // it is alright to call from_utf8_unchecked, - // we already verified that there are only ascii - // chars in there - let hostname = unsafe { from_utf8_unchecked(hostname) }; + let (remaining_input, (hostname, port)) = match hostname_and_port(given_host.as_bytes()) { + Ok(tuple) => tuple, + Err(parse_error) => { + bail!( + "Hostname parsing failed for host {}: {}", + given_host.clone(), + parse_error, + ); + } + }; - //FIXME: what if we don't use SNI? - let servername = match self.state { - State::Invalid => unreachable!(), - State::Http(ref http) => http.frontend.session.sni_hostname(), - _ => None, - }; - let servername = servername.map(|name| name.to_string()); + if remaining_input != &b""[..] { + bail!("invalid remaining chars after hostname {}", given_host); + } - if servername.as_deref() != Some(hostname) { - error!( - "TLS SNI hostname '{:?}' and Host header '{}' don't match", - servername, hostname - ); - /*FIXME: deactivate this check for a temporary test - unwrap_msg!(session.http()).set_answer(DefaultAnswerStatus::Answer404, None); - */ - return Err(ConnectionError::HostNotFound(hostname.to_owned())); - } + let hostname = unsafe { from_utf8_unchecked(hostname) }; - //FIXME: we should check that the port is right too + //FIXME: what if we don't use SNI? + let servername = match self.state { + State::Invalid => unreachable!(), + State::Http(ref http) => http.frontend.session.sni_hostname(), + _ => None, + }; + let servername = servername.map(|name| name.to_string()); - if port == Some(&b"443"[..]) { - hostname - } else { - host - } - } - Err(parse_error) => { - return Err(ConnectionError::InvalidHost { - hostname: host.to_owned(), - message: format!("Hostname parsing failed: {}", parse_error), - }); - } + if servername.as_deref() != Some(hostname) { + error!( + "TLS SNI hostname '{:?}' and Host header '{}' don't match", + servername, hostname + ); + /*FIXME: deactivate this check for a temporary test + unwrap_msg!(session.http()).set_answer(DefaultAnswerStatus::Answer404, None); + */ + bail!("Host not found: {}", hostname); + } + + let host = if port == Some(&b"443"[..]) { + hostname + } else { + given_host }; let request_line = request_state .as_ref() .and_then(|r| r.get_request_line()) - .ok_or(ConnectionError::NoRequestLineGiven)?; + .with_context(|| "No request line given in the request state")?; Ok((host, &request_line.uri, &request_line.method)) } @@ -1147,7 +1140,7 @@ impl Session { &mut self, cluster_id: &str, front_should_stick: bool, - ) -> Result { + ) -> anyhow::Result { let request_state = match self.state { State::Invalid => unreachable!(), State::Http(ref mut http) => http.request_state.as_ref(), @@ -1179,58 +1172,58 @@ impl Session { .backend_from_cluster_id(cluster_id), }; - match result { + let (backend, conn) = match result { Err(e) => { self.set_answer(DefaultAnswerStatus::Answer503, None); - Err(e) + return Err(e).with_context(|| { + format!("Could not find a backend for cluster {}", cluster_id) + }); } - Ok((backend, conn)) => { - if front_should_stick { - let sticky_name = self.proxy.borrow().listeners[&self.listener_token] - .borrow() - .config - .sticky_name - .clone(); - let sticky_session = Some(StickySession::new( - backend - .borrow() - .sticky_id - .clone() - .unwrap_or(backend.borrow().backend_id.clone()), - )); + Ok((backend, conn)) => (backend, conn), + }; - match self.state { - State::Invalid => unreachable!(), - State::Http(ref mut http) => { - http.sticky_session = sticky_session; - http.sticky_name = sticky_name; - } - _ => {} - }; - } - self.metrics.backend_id = Some(backend.borrow().backend_id.clone()); - self.metrics.backend_start(); + if front_should_stick { + let sticky_name = self.proxy.borrow().listeners[&self.listener_token] + .borrow() + .config + .sticky_name + .clone(); + let sticky_session = Some(StickySession::new( + backend + .borrow() + .sticky_id + .clone() + .unwrap_or(backend.borrow().backend_id.clone()), + )); - match self.state { - State::Invalid => unreachable!(), - State::Http(ref mut http) => { - http.set_backend_id(backend.borrow().backend_id.clone()) - } - _ => {} + match self.state { + State::Invalid => unreachable!(), + State::Http(ref mut http) => { + http.sticky_session = sticky_session; + http.sticky_name = sticky_name; } - self.backend = Some(backend); + _ => {} + }; + } + self.metrics.backend_id = Some(backend.borrow().backend_id.clone()); + self.metrics.backend_start(); - Ok(conn) - } + match self.state { + State::Invalid => unreachable!(), + State::Http(ref mut http) => http.set_backend_id(backend.borrow().backend_id.clone()), + _ => {} } + self.backend = Some(backend); + + Ok(conn) } - fn cluster_id_from_request(&mut self) -> Result { + fn cluster_id_from_request(&mut self) -> anyhow::Result { let (host, uri, method) = match self.extract_route() { Ok(tuple) => tuple, Err(e) => { self.set_answer(DefaultAnswerStatus::Answer400, None); - return Err(e); + return Err(e).with_context(|| "Could not extract route from request"); } }; @@ -1246,12 +1239,12 @@ impl Session { Some(Route::ClusterId(cluster_id)) => Ok(cluster_id), Some(Route::Deny) => { self.set_answer(DefaultAnswerStatus::Answer401, None); - Err(ConnectionError::Unauthorized) + bail!("Route is unauthorized"); } None => { - let no_host_error = ConnectionError::HostNotFound(host.to_owned()); + let no_host_error = format!("Host not found: {}", host); self.set_answer(DefaultAnswerStatus::Answer404, None); - Err(no_host_error) + bail!(no_host_error); } } } @@ -1259,7 +1252,7 @@ impl Session { fn connect_to_backend( &mut self, session_rc: Rc>, - ) -> Result { + ) -> anyhow::Result { let old_cluster_id = match &self.state { State::Invalid => unreachable!(), State::Http(http) => http.cluster_id.clone(), @@ -1267,9 +1260,12 @@ impl Session { }; let old_back_token = self.back_token(); - self.check_circuit_breaker()?; + self.check_circuit_breaker() + .with_context(|| "Circuit break.")?; - let requested_cluster_id = self.cluster_id_from_request()?; + let requested_cluster_id = self + .cluster_id_from_request() + .with_context(|| "Could not get cluster id from request")?; let backend_is_connected = self.back_connected == BackendConnectionStatus::Connected; @@ -1323,7 +1319,9 @@ impl Session { .get(&requested_cluster_id) .map(|cluster| cluster.sticky_session) .unwrap_or(false); - let mut socket = self.backend_from_request(&requested_cluster_id, front_should_stick)?; + let mut socket = self + .backend_from_request(&requested_cluster_id, front_should_stick) + .with_context(|| "Could not get TCP stream from backend")?; // we still want to use the new socket if let Err(e) = socket.set_nodelay(true) { @@ -1348,7 +1346,7 @@ impl Session { self.back_connected = BackendConnectionStatus::Connecting(Instant::now()); - let action_result = match old_back_token { + let connect_action = match old_back_token { Some(back_token) => { self.set_back_token(back_token); if let Err(e) = self.proxy.borrow().registry.register( @@ -1359,7 +1357,7 @@ impl Session { error!("error registering back socket({:?}): {:?}", socket, e); } - Ok(BackendConnectAction::Replace) + BackendConnectAction::Replace } None => { if self.proxy.borrow().sessions.borrow().slab.len() @@ -1367,8 +1365,9 @@ impl Session { { error!("not enough memory, cannot connect to backend"); self.set_answer(DefaultAnswerStatus::Answer503, None); - return Err(ConnectionError::TooManyConnections( - self.cluster_id.to_owned(), + bail!(format!( + "Too many connections for cluster {:?}", + self.cluster_id )); } @@ -1390,7 +1389,7 @@ impl Session { } self.set_back_token(back_token); - Ok(BackendConnectAction::New) + BackendConnectAction::New } }; @@ -1400,7 +1399,7 @@ impl Session { State::Http(http) => http.set_back_timeout(connect_timeout), _ => {} } - action_result + Ok(connect_action) } } diff --git a/lib/src/lib.rs b/lib/src/lib.rs index ed9e076ee..30533f137 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -203,6 +203,7 @@ pub mod https; use std::{cell::RefCell, collections::BTreeMap, fmt, net::SocketAddr, rc::Rc, str}; +use anyhow::{bail, Context}; use mio::{net::TcpStream, Token}; use time::{Duration, Instant}; @@ -396,32 +397,6 @@ pub enum SessionResult { ConnectBackend, } -#[derive(thiserror::Error, Debug, PartialEq, Eq)] -pub enum ConnectionError { - #[error("No host given for the backend")] - NoHostGiven, - #[error("Connect to backend failed. No request line to extract a route.")] - NoRequestLineGiven, - #[error("Connect to backend failed. {message:?}. host: {hostname:?}")] - InvalidHost { message: String, hostname: String }, - #[error("Connect to backend failed. Host not found: {0}")] - HostNotFound(String), - #[error("Connect to backend failed. No backend available for cluster {0:?}")] - NoBackendAvailable(Option), - #[error("unimplemented error")] - ToBeDefined, - #[error("Connect to backend failed. Cluster {0} should redirect HTTPS")] - HttpsRedirect(String), - #[error("Connect to backend failed. Route is unauthorized")] - Unauthorized, - #[error("Connect to backend failed. Too many connections. Host {0:?}")] - TooManyConnections(Option), - #[error("Connect to backend failed. Mio connect error: {0}")] - MioConnectError(String), - // #[error("Connect to backend failed. EINPROGRESS, the socket is probably nonblocing")] - // SocketIsNonblocking, -} - #[derive(Debug, PartialEq, Eq)] pub enum SocketType { Listener, @@ -530,9 +505,9 @@ impl Backend { self.connection_time.get(self.active_connections) } - pub fn try_connect(&mut self) -> Result { + pub fn try_connect(&mut self) -> anyhow::Result { if self.status != BackendStatus::Normal { - return Err(ConnectionError::NoBackendAvailable(None)); + bail!("This backend as not a normal status"); } match mio::net::TcpStream::connect(self.address) { @@ -548,7 +523,7 @@ impl Backend { // https://docs.rs/mio/latest/mio/net/struct.TcpStream.html#method.connect // with an example code here: // https://github.com/Thomasdezeeuw/heph/blob/0c4f1ab3eaf08bea1d65776528bfd6114c9f8374/src/net/tcp/stream.rs#L560-L622 - Err(ConnectionError::MioConnectError(mio_error.to_string())) + Err(mio_error).with_context(|| "Failed to connect to socket with MIO") } } } diff --git a/lib/src/server.rs b/lib/src/server.rs index 613e3f2da..3d4940221 100644 --- a/lib/src/server.rs +++ b/lib/src/server.rs @@ -730,76 +730,77 @@ impl Server { fn zombie_check(&mut self) { let now = Instant::now(); - if now - self.last_zombie_check > self.zombie_check_interval { - info!("zombie check"); - self.last_zombie_check = now; - - let mut tokens = HashSet::new(); - let mut frontend_tokens = HashSet::new(); + if now - self.last_zombie_check < self.zombie_check_interval { + return; + } + info!("zombie check"); + self.last_zombie_check = now; - let mut count = 0; - let duration = self.zombie_check_interval; - for (_index, session) in self - .sessions - .borrow_mut() - .slab - .iter_mut() - .filter(|(_, c)| now - c.borrow().last_event() > duration) - { - let t = session.borrow().tokens(); - if !frontend_tokens.contains(&t[0]) { - session.borrow().print_state(); + let mut tokens = HashSet::new(); + let mut frontend_tokens = HashSet::new(); - frontend_tokens.insert(t[0]); - for tk in t.into_iter() { - tokens.insert(tk); - } + let mut count = 0; + let duration = self.zombie_check_interval; + for (_index, session) in self + .sessions + .borrow_mut() + .slab + .iter_mut() + .filter(|(_, c)| now - c.borrow().last_event() > duration) + { + let session_tokens = session.borrow().tokens(); + if !frontend_tokens.contains(&session_tokens[0]) { + session.borrow().print_state(); - count += 1; + frontend_tokens.insert(session_tokens[0]); + for tk in session_tokens.into_iter() { + tokens.insert(tk); } - } - for tk in frontend_tokens.iter() { - let cl = self.to_session(*tk); - if self.sessions.borrow().slab.contains(cl.0) { - let session = { self.sessions.borrow_mut().slab.remove(cl.0) }; - session.borrow_mut().close(); + count += 1; + } + } - let mut sessions = self.sessions.borrow_mut(); - assert!(sessions.nb_connections != 0); - sessions.nb_connections -= 1; - gauge!("client.connections", sessions.nb_connections); - // do not be ready to accept right away, wait until we get back to 10% capacity - if !sessions.can_accept - && sessions.nb_connections < sessions.max_connections * 90 / 100 - { - debug!( - "nb_connections = {}, max_connections = {}, starting to accept again", - sessions.nb_connections, sessions.max_connections - ); - gauge!("accept_queue.backpressure", 0); - sessions.can_accept = true; - } + for tk in frontend_tokens.iter() { + let cl = self.to_session(*tk); + if self.sessions.borrow().slab.contains(cl.0) { + let session = { self.sessions.borrow_mut().slab.remove(cl.0) }; + session.borrow_mut().close(); + + let mut sessions = self.sessions.borrow_mut(); + assert!(sessions.nb_connections != 0); + sessions.nb_connections -= 1; + gauge!("client.connections", sessions.nb_connections); + // do not be ready to accept right away, wait until we get back to 10% capacity + if !sessions.can_accept + && sessions.nb_connections < sessions.max_connections * 90 / 100 + { + debug!( + "nb_connections = {}, max_connections = {}, starting to accept again", + sessions.nb_connections, sessions.max_connections + ); + gauge!("accept_queue.backpressure", 0); + sessions.can_accept = true; } } + } - if count > 0 { - count!("zombies", count); + if count > 0 { + count!("zombies", count); - let mut remaining = 0; - for tk in tokens.into_iter() { - let cl = self.to_session(tk); - let mut sessions = self.sessions.borrow_mut(); - if sessions.slab.contains(cl.0) { - sessions.slab.remove(cl.0); - remaining += 1; - } + let mut remaining = 0; + for tk in tokens.into_iter() { + let cl = self.to_session(tk); + let mut sessions = self.sessions.borrow_mut(); + if sessions.slab.contains(cl.0) { + sessions.slab.remove(cl.0); + remaining += 1; } - info!( - "removing {} zombies ({} remaining tokens after close)", - count, remaining - ); } + info!( + "removing {} zombies ({} remaining tokens after close)", + count, remaining + ); } } diff --git a/lib/src/tcp.rs b/lib/src/tcp.rs index fd445630a..43c0e6f7d 100644 --- a/lib/src/tcp.rs +++ b/lib/src/tcp.rs @@ -7,7 +7,7 @@ use std::{ rc::Rc, }; -use anyhow::Context; +use anyhow::{bail, Context}; use mio::{net::*, unix::SourceFd, *}; use rusty_ulid::Ulid; use slab::Slab; @@ -41,9 +41,8 @@ use crate::{ }, timer::TimeoutContainer, util::UnwrapLog, - AcceptError, Backend, BackendConnectAction, BackendConnectionStatus, ConnectionError, - ListenerHandler, Protocol, ProxyConfiguration, ProxySession, Readiness, SessionMetrics, - SessionResult, + AcceptError, Backend, BackendConnectAction, BackendConnectionStatus, ListenerHandler, Protocol, + ProxyConfiguration, ProxySession, Readiness, SessionMetrics, SessionResult, }; pub enum UpgradeResult { @@ -607,7 +606,9 @@ impl Session { return SessionResult::Continue; } // TODO: should we return CloseSession here? - Err(connection_error) => error!("{}", connection_error), + Err(connection_error) => { + error!("Error connecting to backend: {:#}", connection_error) + } } } else if self.back_readiness().unwrap().event != Ready::empty() { self.reset_connection_attempt(); @@ -625,7 +626,9 @@ impl Session { // we must wait for an event return SessionResult::Continue; } - Err(connection_error) => error!("{}", connection_error), + Err(connection_error) => { + error!("Error connecting to backend: {:#}", connection_error) + } } } @@ -685,7 +688,9 @@ impl Session { // we must wait for an event return SessionResult::Continue; } - Err(connection_error) => error!("{}", connection_error), + Err(connection_error) => { + error!("Error connecting to backend: {:#}", connection_error) + } } } SessionResult::Continue => {} @@ -834,7 +839,7 @@ impl Session { fn connect_to_backend( &mut self, session_rc: Rc>, - ) -> Result { + ) -> anyhow::Result { let cluster_id = if let Some(cluster_id) = self .proxy .borrow() @@ -845,83 +850,88 @@ impl Session { cluster_id } else { error!("no TCP cluster corresponds to that front address"); - return Err(ConnectionError::HostNotFound( - "no host given (TCP)".to_string(), - )); + bail!("no TCP cluster found.") }; self.cluster_id = Some(cluster_id.clone()); if self.connection_attempt >= CONN_RETRIES { error!("{} max connection attempt reached", self.log_context()); - return Err(ConnectionError::NoBackendAvailable(Some(cluster_id))); + bail!(format!("Too many connections on cluster {}", cluster_id)); } if self.proxy.borrow().sessions.borrow().slab.len() >= self.proxy.borrow().sessions.borrow().slab_capacity() { - error!("not enough memory, cannot connect to backend"); - return Err(ConnectionError::TooManyConnections(None)); + bail!("not enough memory, cannot connect to backend"); } - let conn = self + let (backend, mut stream) = self .proxy .borrow() .backends .borrow_mut() - .backend_from_cluster_id(&cluster_id); - match conn { - Ok((backend, mut stream)) => { - if let Err(e) = stream.set_nodelay(true) { - error!( - "error setting nodelay on back socket({:?}): {:?}", - stream, e - ); - } - self.back_connected = BackendConnectionStatus::Connecting(Instant::now()); + .backend_from_cluster_id(&cluster_id) + .with_context(|| { + format!( + "Could not get backend and TCP stream from cluster id {}", + cluster_id + ) + })?; + /* + this was the old error matching for backend_from_cluster_id. + panic! is called in case of mio::net::TcpStream::connect() error + Do we really want to panic ? + Err(ConnectionError::NoBackendAvailable(c_id)) => { + Err(ConnectionError::NoBackendAvailable(c_id)) + } + Err(e) => { + panic!("tcp connect_to_backend: unexpected error: {:?}", e); + } + */ - let back_token = { - let proxy = self.proxy.borrow(); - let mut s = proxy.sessions.borrow_mut(); - let entry = s.slab.vacant_entry(); - let back_token = Token(entry.key()); - let _entry = entry.insert(session_rc.clone()); - back_token - }; + if let Err(e) = stream.set_nodelay(true) { + error!( + "error setting nodelay on back socket({:?}): {:?}", + stream, e + ); + } + self.back_connected = BackendConnectionStatus::Connecting(Instant::now()); - if let Err(e) = self.proxy.borrow().registry.register( - &mut stream, - back_token, - Interest::READABLE | Interest::WRITABLE, - ) { - error!("error registering back socket({:?}): {:?}", stream, e); - } + let back_token = { + let proxy = self.proxy.borrow(); + let mut s = proxy.sessions.borrow_mut(); + let entry = s.slab.vacant_entry(); + let back_token = Token(entry.key()); + let _entry = entry.insert(session_rc.clone()); + back_token + }; - let connect_timeout_duration = Duration::seconds( - self.proxy.borrow().listeners[&self.accept_token] - .borrow() - .config - .connect_timeout as i64, - ); - self.back_timeout.set_duration(connect_timeout_duration); - self.back_timeout.set(back_token); + if let Err(e) = self.proxy.borrow().registry.register( + &mut stream, + back_token, + Interest::READABLE | Interest::WRITABLE, + ) { + error!("error registering back socket({:?}): {:?}", stream, e); + } - self.set_back_token(back_token); - self.set_back_socket(stream); + let connect_timeout_duration = Duration::seconds( + self.proxy.borrow().listeners[&self.accept_token] + .borrow() + .config + .connect_timeout as i64, + ); + self.back_timeout.set_duration(connect_timeout_duration); + self.back_timeout.set(back_token); - self.metrics.backend_id = Some(backend.borrow().backend_id.clone()); - self.metrics.backend_start(); - self.set_backend_id(backend.borrow().backend_id.clone()); + self.set_back_token(back_token); + self.set_back_socket(stream); - Ok(BackendConnectAction::New) - } - Err(ConnectionError::NoBackendAvailable(c_id)) => { - Err(ConnectionError::NoBackendAvailable(c_id)) - } - Err(e) => { - panic!("tcp connect_to_backend: unexpected error: {:?}", e); - } - } + self.metrics.backend_id = Some(backend.borrow().backend_id.clone()); + self.metrics.backend_start(); + self.set_backend_id(backend.borrow().backend_id.clone()); + + Ok(BackendConnectAction::New) } }