Skip to content

Commit

Permalink
Merge pull request #1037 from sozu-proxy/devel/edemolis/fix/would_blo…
Browse files Browse the repository at this point in the history
…ck_break

Fix: WouldBlock in SocketHandler::socket_write breaks properly
  • Loading branch information
FlorentinDUBOIS authored Nov 23, 2023
2 parents ab9fb52 + 4a444b1 commit 51bf938
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 63 deletions.
2 changes: 2 additions & 0 deletions command/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ pub const DEFAULT_MAX_COMMAND_BUFFER_SIZE: usize = 2_000_000;
/// wether to avoid register cluster metrics in the local drain
pub const DEFAULT_DISABLE_CLUSTER_METRICS: bool = false;

pub const MAX_LOOP_ITERATIONS: usize = 100000;

/// Number of TLS 1.3 tickets to send to a client when establishing a connection.
/// The tickets allow the client to resume a session. This protects the client
/// agains session tracking. Increases the number of getrandom syscalls,
Expand Down
1 change: 0 additions & 1 deletion command/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,6 @@ macro_rules! error_access {
#[macro_export]
macro_rules! warn {
($format:expr, $($arg:tt)*) => {
use time;
log!($crate::logging::LogLevel::Warn, $format, "WARN", $($arg)*);
};
($format:expr) => {
Expand Down
21 changes: 10 additions & 11 deletions lib/src/protocol/kawa_h1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use mio::{net::TcpStream, Interest, Token};
use rusty_ulid::Ulid;
use time::{Duration, Instant};

use sozu_command::proto::command::{Event, EventKind, ListenerType};
use sozu_command::{proto::command::{Event, EventKind, ListenerType}, config::MAX_LOOP_ITERATIONS};

use crate::{
backends::{Backend, BackendError},
Expand Down Expand Up @@ -350,7 +350,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L

if let kawa::ParsingPhase::Error { marker, kind } = self.request_stream.parsing_phase {
incr!("http.frontend_parse_errors");
debug!(
warn!(
"{} Parsing request error in {:?}: {}",
self.log_context(),
marker,
Expand Down Expand Up @@ -679,13 +679,13 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L

if let kawa::ParsingPhase::Error { marker, kind } = self.response_stream.parsing_phase {
incr!("http.backend_parse_errors");
debug!(
"{} Parsing request error in {:?}: {}",
warn!(
"{} Parsing response error in {:?}: {}",
self.log_context(),
marker,
match kind {
kawa::ParsingErrorKind::Consuming { index } => {
let kawa = &self.request_stream;
let kawa = &self.response_stream;
parser::view(
kawa.storage.used(),
16,
Expand Down Expand Up @@ -1481,7 +1481,6 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
metrics: &mut SessionMetrics,
) -> SessionResult {
let mut counter = 0;
let max_loop_iterations = 100000;

if self.backend_connection_status.is_connecting()
&& !self.backend_readiness.event.is_empty()
Expand Down Expand Up @@ -1510,7 +1509,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
return SessionResult::Continue;
}
Err(connection_error) => {
error!("Error connecting to backend: {}", connection_error)
warn!("Error connecting to backend: {}", connection_error)
}
}
} else {
Expand All @@ -1527,7 +1526,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
return SessionResult::Close;
}

while counter < max_loop_iterations {
while counter < MAX_LOOP_ITERATIONS {
let frontend_interest = self.frontend_readiness.filter_interest();
let backend_interest = self.backend_readiness.filter_interest();

Expand Down Expand Up @@ -1565,7 +1564,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
return SessionResult::Continue;
}
Err(connection_error) => {
error!("Error connecting to backend: {}", connection_error)
warn!("Error connecting to backend: {}", connection_error)
}
}
}
Expand Down Expand Up @@ -1626,10 +1625,10 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
counter += 1;
}

if counter == max_loop_iterations {
if counter >= MAX_LOOP_ITERATIONS {
error!(
"PROXY\thandling session {:?} went through {} iterations, there's a probable infinite loop bug, closing the connection",
self.frontend_token, max_loop_iterations
self.frontend_token, MAX_LOOP_ITERATIONS
);
incr!("http.infinite_loop.error");

Expand Down
70 changes: 38 additions & 32 deletions lib/src/protocol/kawa_h1/parser.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt::{self, Write}, str::from_utf8_unchecked};
use std::{cmp::min, fmt, str::from_utf8_unchecked};

use nom::{
bytes::{self, complete::take_while},
Expand Down Expand Up @@ -115,35 +115,41 @@ pub fn hostname_and_port(i: &[u8]) -> IResult<&[u8], (&[u8], Option<&[u8]>)> {
}

pub fn view(buf: &[u8], size: usize, points: &[usize]) -> String {
let mut view = String::new();
let mut end = 0;
for (i, point) in points.iter().enumerate() {
let start = if end + size < *point {
view.push_str("... ");
point - size
} else {
end
};
let stop = if i + 1 < points.len() {
points[i + 1]
} else {
buf.len()
};
end = if point + size > stop {
stop
} else {
point + size
};
for element in &buf[start..*point] {
let _ = view.write_fmt(format_args!("{element:02X} "));
}
view.push_str("| ");
for element in &buf[*point..end] {
let _ = view.write_fmt(format_args!("{element:02X} "));
}
}
if end < buf.len() {
view.push_str("...")
}
view
let len = buf.len();
let (start, end) = match (points.first(), points.last()) {
(Some(start), Some(end)) => (min(*start, len), min(*end + size, len)),
_ => return "NO POINTS".to_string(),
};
return format!("{:02X?}", &buf[start..end]);
// let mut view = String::new();
// let mut end = 0;
// for (i, point) in points.iter().enumerate() {
// let start = if end + size < *point {
// view.push_str("... ");
// point - size
// } else {
// end
// };
// let stop = if i + 1 < points.len() {
// points[i + 1]
// } else {
// buf.len()
// };
// end = if point + size > stop {
// stop
// } else {
// point + size
// };
// for element in &buf[start..*point] {
// let _ = view.write_fmt(format_args!("{element:02X} "));
// }
// view.push_str("| ");
// for element in &buf[*point..end] {
// let _ = view.write_fmt(format_args!("{element:02X} "));
// }
// }
// if end < buf.len() {
// view.push_str("...")
// }
// view
}
8 changes: 4 additions & 4 deletions lib/src/protocol/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{cell::RefCell, net::SocketAddr, rc::Rc};

use mio::{net::TcpStream, Token};
use rusty_ulid::Ulid;
use sozu_command::config::MAX_LOOP_ITERATIONS;

use crate::{
backends::Backend,
Expand Down Expand Up @@ -624,14 +625,13 @@ impl<Front: SocketHandler, L: ListenerHandler> SessionState for Pipe<Front, L> {
metrics: &mut SessionMetrics,
) -> SessionResult {
let mut counter = 0;
let max_loop_iterations = 100000;

if self.frontend_readiness.event.is_hup() {
return SessionResult::Close;
}

let token = self.frontend_token;
while counter < max_loop_iterations {
while counter < MAX_LOOP_ITERATIONS {
let frontend_interest = self.frontend_readiness.filter_interest();
let backend_interest = self.backend_readiness.filter_interest();

Expand Down Expand Up @@ -709,10 +709,10 @@ impl<Front: SocketHandler, L: ListenerHandler> SessionState for Pipe<Front, L> {
counter += 1;
}

if counter == max_loop_iterations {
if counter >= MAX_LOOP_ITERATIONS {
error!(
"PROXY\thandling session {:?} went through {} iterations, there's a probable infinite loop bug, closing the connection",
self.frontend_token, max_loop_iterations
self.frontend_token, MAX_LOOP_ITERATIONS
);
incr!("http.infinite_loop.error");

Expand Down
8 changes: 4 additions & 4 deletions lib/src/protocol/proxy_protocol/expect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{cell::RefCell, rc::Rc};
use mio::{net::TcpStream, *};
use nom::{Err, HexDisplay};
use rusty_ulid::Ulid;
use sozu_command::config::MAX_LOOP_ITERATIONS;

use crate::{
logs::LogContext,
Expand Down Expand Up @@ -211,13 +212,12 @@ impl<Front: SocketHandler> SessionState for ExpectProxyProtocol<Front> {
metrics: &mut SessionMetrics,
) -> SessionResult {
let mut counter = 0;
let max_loop_iterations = 100000;

if self.frontend_readiness.event.is_hup() {
return SessionResult::Close;
}

while counter < max_loop_iterations {
while counter < MAX_LOOP_ITERATIONS {
let frontend_interest = self.frontend_readiness.filter_interest();

trace!(
Expand Down Expand Up @@ -251,10 +251,10 @@ impl<Front: SocketHandler> SessionState for ExpectProxyProtocol<Front> {
counter += 1;
}

if counter == max_loop_iterations {
if counter >= MAX_LOOP_ITERATIONS {
error!(
"PROXY\thandling session {:?} went through {} iterations, there's a probable infinite loop bug, closing the connection",
self.frontend_token, max_loop_iterations
self.frontend_token, MAX_LOOP_ITERATIONS
);
incr!("http.infinite_loop.error");

Expand Down
8 changes: 4 additions & 4 deletions lib/src/protocol/rustls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{cell::RefCell, io::ErrorKind, rc::Rc};
use mio::{net::TcpStream, Token};
use rustls::ServerConnection;
use rusty_ulid::Ulid;
use sozu_command::config::MAX_LOOP_ITERATIONS;

use crate::{
logs::LogContext, protocol::SessionState, timer::TimeoutContainer, Readiness, Ready,
Expand Down Expand Up @@ -186,13 +187,12 @@ impl SessionState for TlsHandshake {
_metrics: &mut SessionMetrics,
) -> SessionResult {
let mut counter = 0;
let max_loop_iterations = 100000;

if self.frontend_readiness.event.is_hup() {
return SessionResult::Close;
}

while counter < max_loop_iterations {
while counter < MAX_LOOP_ITERATIONS {
let frontend_interest = self.frontend_readiness.filter_interest();

trace!(
Expand Down Expand Up @@ -233,10 +233,10 @@ impl SessionState for TlsHandshake {
counter += 1;
}

if counter == max_loop_iterations {
if counter >= MAX_LOOP_ITERATIONS {
error!(
"PROXY\thandling session {:?} went through {} iterations, there's a probable infinite loop bug, closing the connection",
self.frontend_token, max_loop_iterations
self.frontend_token, MAX_LOOP_ITERATIONS
);
incr!("http.infinite_loop.error");

Expand Down
42 changes: 40 additions & 2 deletions lib/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
use mio::net::{TcpListener, TcpStream};
use rustls::{ProtocolVersion, ServerConnection};
use socket2::{Domain, Protocol, Socket, Type};
use sozu_command::config::MAX_LOOP_ITERATIONS;

#[derive(thiserror::Error, Debug)]
pub enum ServerBindError {
Expand Down Expand Up @@ -60,7 +61,13 @@ pub trait SocketHandler {
impl SocketHandler for TcpStream {
fn socket_read(&mut self, buf: &mut [u8]) -> (usize, SocketResult) {
let mut size = 0usize;
let mut counter = 0;
loop {
counter += 1;
if counter > MAX_LOOP_ITERATIONS {
error!("MAX_LOOP_ITERATION reached in TcpStream::socket_read");
incr!("socket.read.infinite_loop.error");
}
if size == buf.len() {
return (size, SocketResult::Continue);
}
Expand All @@ -83,7 +90,13 @@ impl SocketHandler for TcpStream {

fn socket_write(&mut self, buf: &[u8]) -> (usize, SocketResult) {
let mut size = 0usize;
let mut counter = 0;
loop {
counter += 1;
if counter > MAX_LOOP_ITERATIONS {
error!("MAX_LOOP_ITERATION reached in TcpStream::socket_write");
incr!("socket.write.infinite_loop.error");
}
if size == buf.len() {
return (size, SocketResult::Continue);
}
Expand Down Expand Up @@ -165,7 +178,14 @@ impl SocketHandler for FrontRustls {
let mut is_error = false;
let mut is_closed = false;

let mut counter = 0;
loop {
counter += 1;
if counter > MAX_LOOP_ITERATIONS {
error!("MAX_LOOP_ITERATION reached in FrontRustls::socket_read");
incr!("socket.read.infinite_loop.error");
}

if size == buf.len() {
break;
}
Expand Down Expand Up @@ -251,7 +271,13 @@ impl SocketHandler for FrontRustls {
let mut is_error = false;
let mut is_closed = false;

let mut counter = 0;
loop {
counter += 1;
if counter > MAX_LOOP_ITERATIONS {
error!("MAX_LOOP_ITERATION reached in FrontRustls::socket_write");
incr!("socket.write.infinite_loop.error");
}
if buffered_size == buf.len() {
break;
}
Expand Down Expand Up @@ -297,7 +323,10 @@ impl SocketHandler for FrontRustls {
}
Ok(_sz) => {}
Err(e) => match e.kind() {
ErrorKind::WouldBlock => can_write = false,
ErrorKind::WouldBlock => {
can_write = false;
break;
}
ErrorKind::ConnectionReset
| ErrorKind::ConnectionAborted
| ErrorKind::BrokenPipe => {
Expand Down Expand Up @@ -358,15 +387,24 @@ impl SocketHandler for FrontRustls {
},
}

let mut counter = 0;
loop {
counter += 1;
if counter > MAX_LOOP_ITERATIONS {
error!("MAX_LOOP_ITERATION reached in FrontRustls::socket_write_vectored");
incr!("socket.write.infinite_loop.error");
}
match self.session.write_tls(&mut self.stream) {
Ok(0) => {
//can_write = false;
break;
}
Ok(_sz) => {}
Err(e) => match e.kind() {
ErrorKind::WouldBlock => can_write = false,
ErrorKind::WouldBlock => {
can_write = false;
break;
}
ErrorKind::ConnectionReset
| ErrorKind::ConnectionAborted
| ErrorKind::BrokenPipe => {
Expand Down
Loading

0 comments on commit 51bf938

Please sign in to comment.