diff --git a/Cargo.lock b/Cargo.lock index 8a79b7836..944d19c8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -634,21 +634,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "foreign-types-shared" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa9a19cbb55df58761df49b23516a86d432839add4af60fc256da840f66ed35b" - -[[package]] -name = "form_urlencoded" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652" -dependencies = [ - "percent-encoding", -] - [[package]] name = "futures" version = "0.3.28" @@ -1026,12 +1011,6 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" -[[package]] -name = "lazycell" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" - [[package]] name = "libc" version = "0.2.147" @@ -1271,12 +1250,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f0b59668fe80c5afe998f0c0bf93322bf2cd66cafeeb80581f291716f3467f2" -[[package]] -name = "percent-encoding" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" - [[package]] name = "petgraph" version = "0.6.3" @@ -1898,7 +1871,6 @@ name = "sozu-e2e" version = "0.15.3" dependencies = [ "futures", - "futures-lite", "hyper", "hyper-rustls", "libc", @@ -1917,19 +1889,15 @@ version = "0.15.3" dependencies = [ "anyhow", "cookie-factory", - "foreign-types-shared", "hdrhistogram", "hex", "hpack", "idna", "kawa", - "lazycell", "libc", - "log 0.4.19", "memchr", "mio", "nom", - "pool", "poule", "quickcheck", "rand", @@ -1945,9 +1913,6 @@ dependencies = [ "thiserror", "time", "tiny_http", - "ureq", - "url", - "webpki", "x509-parser", ] @@ -2251,33 +2216,6 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" -[[package]] -name = "ureq" -version = "2.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b11c96ac7ee530603dcdf68ed1557050f374ce55a5a07193ebf8cbc9f8927e9" -dependencies = [ - "base64 0.21.2", - "flate2", - "log 0.4.19", - "once_cell", - "rustls", - "rustls-webpki 0.100.1", - "url", - "webpki-roots", -] - -[[package]] -name = "url" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50bff7831e19200a85b17131d085c25d7811bc4e186efdaf54bbd132994a88cb" -dependencies = [ - "form_urlencoded", - "idna", - "percent-encoding", -] - [[package]] name = "utf8parse" version = "0.2.1" @@ -2375,16 +2313,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webpki" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "webpki-roots" version = "0.23.1" diff --git a/bin/src/command/mod.rs b/bin/src/command/mod.rs index 8d52fb350..361da1bfc 100644 --- a/bin/src/command/mod.rs +++ b/bin/src/command/mod.rs @@ -13,10 +13,16 @@ use anyhow::{bail, Context}; use async_dup::Arc; use async_io::Async; use futures::{ - channel::{mpsc::*, oneshot}, + channel::{ + mpsc::{channel, Receiver, Sender}, + oneshot, + }, {SinkExt, StreamExt}, }; -use futures_lite::{future, io::*}; +use futures_lite::{ + future, + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, +}; use nix::{ sys::signal::{kill, Signal}, unistd::Pid, diff --git a/bin/src/upgrade.rs b/bin/src/upgrade.rs index 0285814be..c39131ba0 100644 --- a/bin/src/upgrade.rs +++ b/bin/src/upgrade.rs @@ -10,7 +10,7 @@ use anyhow::{bail, Context}; use futures_lite::future; use libc::{self, pid_t}; use mio::net::UnixStream; -use nix::unistd::*; +use nix::unistd::{fork, ForkResult}; use serde::{Deserialize, Serialize}; use tempfile::tempfile; diff --git a/command/src/proto/display.rs b/command/src/proto/display.rs index ef6539684..fd2e2dad7 100644 --- a/command/src/proto/display.rs +++ b/command/src/proto/display.rs @@ -1,9 +1,8 @@ use std::fmt::{Display, Formatter}; -use crate::proto::command::TlsVersion; - -use super::command::{ +use crate::proto::command::{ request::RequestType, CertificateAndKey, CertificateSummary, QueryCertificatesFilters, + TlsVersion, }; impl Display for CertificateAndKey { diff --git a/e2e/Cargo.toml b/e2e/Cargo.toml index 1ed53a9c7..a289a8976 100644 --- a/e2e/Cargo.toml +++ b/e2e/Cargo.toml @@ -6,7 +6,6 @@ edition = "2021" [dependencies] futures = "^0.3.28" -futures-lite = "^1.13.0" hyper = { version = "^0.14.27", features = ["client", "http1"] } hyper-rustls = { version = "^0.24.1", default-features = false, features = ["webpki-tokio", "http1", "tls12", "logging"] } libc = "^0.2.147" diff --git a/e2e/src/tests/tests.rs b/e2e/src/tests/tests.rs index ab4e0514e..46a9da079 100644 --- a/e2e/src/tests/tests.rs +++ b/e2e/src/tests/tests.rs @@ -814,7 +814,7 @@ fn try_http_behaviors() -> State { let mut backend = SyncBackend::new( "backend", back_address, - "HTTP/1.1 101 Switching Protocols\r\nConnection: Upgrade\r\nUpgrade: WebSocket\r\nTransfer-Encoding: Chunked\r\n\r\nearly", + "HTTP/1.1 101 Switching Protocols\r\nConnection: Upgrade\r\nUpgrade: WebSocket\r\nTransfer-Encoding: Chunked\r\n\r\n", ); info!("expecting upgrade (101 switching protocols)"); @@ -832,6 +832,9 @@ fn try_http_behaviors() -> State { println!("request: {request:?}"); println!("response: {response:?}"); assert!(response.starts_with(&expected_response_start)); + + backend.set_response("early"); + backend.send(0); let expected_response = String::from("early"); let response = client.receive(); assert_eq!(response, Some(expected_response)); diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 917ba62ef..9aa426f5a 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -30,33 +30,26 @@ include = [ [dependencies] anyhow = "^1.0.72" cookie-factory = "^0.3.2" -foreign-types-shared = "^0.3.1" hdrhistogram = "^7.5.2" hex = "^0.4.3" hpack = "^0.3.0" idna = "^0.4.0" kawa = { version = "^0.6.3", default-features = false } -lazycell = "^1.3.0" libc = "^0.2.147" -log = "^0.4.19" memchr = "^2.5.0" mio = { version = "^0.8.8", features = ["os-poll", "os-ext", "net"] } nom = { version = "^7.1.3", default-features = true, features = ["std"] } -pool = "^0.1.4" poule = "^0.3.2" rand = "^0.8.5" regex = "^1.9.3" rustls = "^0.21.6" rustls-pemfile = "^1.0.3" rusty_ulid = "^2.0.0" -serial_test = "^2.0.0" sha2 = "^0.10.7" slab = "^0.4.8" socket2 = { version = "^0.5.3", features = ["all"] } thiserror = "^1.0.44" time = "^0.3.25" -url = "^2.4.0" -webpki = "^0.22.0" x509-parser = "^0.15.1" sozu-command-lib = { path = "../command", version = "^0.15.3" } @@ -64,8 +57,8 @@ sozu-command-lib = { path = "../command", version = "^0.15.3" } [dev-dependencies] quickcheck = "^1.0.3" rand = "^0.8.5" +serial_test = "^2.0.0" tiny_http = "^0.12.0" -ureq = "^2.7.1" [features] default = ["simd"] diff --git a/lib/src/backends.rs b/lib/src/backends.rs index 9040bc5b2..141bbd108 100644 --- a/lib/src/backends.rs +++ b/lib/src/backends.rs @@ -1,21 +1,20 @@ use std::{cell::RefCell, collections::HashMap, net::SocketAddr, rc::Rc}; use mio::net::TcpStream; +use time::Duration; use sozu_command::{ proto::command::{Event, EventKind, LoadBalancingAlgorithms, LoadBalancingParams, LoadMetric}, state::ClusterId, }; -use time::Duration; use crate::{ + load_balancing::{LeastLoaded, LoadBalancingAlgorithm, PowerOfTwo, Random, RoundRobin}, retry::{self, RetryPolicy}, server::{self, push_event}, PeakEWMA, }; -use super::load_balancing::*; - #[derive(thiserror::Error, Debug)] pub enum BackendError { #[error("No backend found for cluster {0}")] diff --git a/lib/src/buffer_queue.rs b/lib/src/buffer_queue.rs deleted file mode 100644 index ab87aefe0..000000000 --- a/lib/src/buffer_queue.rs +++ /dev/null @@ -1,610 +0,0 @@ -use std::{ - cmp::{max, min}, - fmt, - io::{self, Write}, - str, -}; - -use crate::{ - pool::{Checkout, Pool}, - pool_crate::Reset, -}; - -#[derive(Debug, PartialEq, Eq, Clone)] -pub enum InputElement { - /// length in the stream - Slice(usize), - Splice(usize), // x bytes copied in kernel -} - -#[derive(Debug, PartialEq, Eq, Clone)] -pub enum OutputElement { - /// length in the stream - Slice(usize), - Delete(usize), - Insert(Vec), - Splice(usize), // should copy x bytes from kernel to socket -} - -/// The BufferQueue has two roles: holding incoming data, and indicating -/// which data will go out. When new data arrives, it is added at the -/// end of the internal buffer. This new data is then eventually parsed or -/// handled in some way by external code. The external code then adds -/// element to the queue, indicating what to do with the data: -/// - copy a subset of the input data (and advance if needed) -/// - insert external data, like a HTTP header -/// - splice out of the kernel some data that was spliced in -/// -/// position is the index in the stream of data already handled. -/// it corresponds to the beginning of available data in the Buffer -/// a Slice(begin, end) would point to buffer.data()[begin-position..end-position] -/// (in the easiest case) -/// -/// unparsed_position is the index in the stream of data that was -/// not parsed yet -/// -/// The buffer's available data may be smaller than `end - begin`. -/// It can happen if the parser indicated we need to copy more data than is available, -/// like with a content length -/// -/// should the buffer queue indicate how much data it needs? -pub struct BufferQueue { - /// position of buffer start in stream - pub buffer_position: usize, - pub parsed_position: usize, - pub start_parsing_position: usize, - pub buffer: Checkout, - /// Vec<(start, length)> - pub input_queue: Vec, - pub output_queue: Vec, -} - -impl BufferQueue { - pub fn with_buffer(buffer: Checkout) -> BufferQueue { - BufferQueue { - buffer_position: 0, - parsed_position: 0, - start_parsing_position: 0, - input_queue: Vec::with_capacity(8), - output_queue: Vec::with_capacity(8), - buffer, - } - } - - pub fn invariant(&self) { - debug_assert!( - self.buffer_position <= self.parsed_position, - "buffer_position {} should be smaller than parsed_position {}", - self.buffer_position, - self.parsed_position - ); - debug_assert!( - self.parsed_position <= self.start_parsing_position, - "parsed_position {} should be smaller than start_parsing_position {}", - self.parsed_position, - self.start_parsing_position - ); - } - - pub fn available_input_data(&self) -> usize { - self.input_queue.iter().fold(0, |acc, el| { - acc + match el { - &InputElement::Slice(sz) | &InputElement::Splice(sz) => sz, - } - }) - } - - pub fn sliced_input(&mut self, count: usize) { - let needed = self.start_parsing_position - self.parsed_position; - if needed > 0 { - if count > needed { - self.parsed_position = self.start_parsing_position; - self.input_queue.push(InputElement::Slice(count - needed)); - } else if count <= needed { - self.parsed_position += count; - } - } else if count > 0 { - self.input_queue.push(InputElement::Slice(count)); - } - - self.invariant(); - //println!("sliced_input: buffer size: {}, parsed_position: {} start_parsing_position: {}, input_queue: {:?}, output_queue: {:?}", - // self.buffer.available_data(), self.parsed_position, self.start_parsing_position, - // self.input_queue, self.output_queue); - } - - pub fn spliced_input(&mut self, count: usize) { - //FIXME: do the same thing with needed data as in sliced_input - if count > 0 { - self.input_queue.push(InputElement::Splice(count)); - } - } - - pub fn needs_input(&self) -> bool { - self.start_parsing_position > self.parsed_position - } - - pub fn can_restart_parsing(&self) -> bool { - self.start_parsing_position == self.buffer_position - } - - pub fn empty(&self) -> bool { - self.input_queue.is_empty() && self.output_queue.is_empty() && self.buffer.empty() - } - - pub fn merge_input_slices(&self) -> usize { - let mut acc = 0usize; - for el in self.input_queue.iter() { - match *el { - InputElement::Splice(_) => break, - InputElement::Slice(sz) => acc += sz, - } - } - - assert!( - acc <= self.buffer.available_data(), - "the merged input slices can't be larger than current data in buffer" - ); - acc - } - - // same as available_input_data, TODO: delete one of them? - pub fn input_data_size(&self) -> usize { - let mut acc = 0usize; - for el in self.input_queue.iter() { - match *el { - InputElement::Splice(sz) => acc += sz, - InputElement::Slice(sz) => acc += sz, - } - } - acc - } - - pub fn unparsed_data(&self) -> &[u8] { - let largest_size = self.merge_input_slices(); - //println!("buffer: {}, parsed: {}", self.buffer_position, self.parsed_position); - let start = self.parsed_position - self.buffer_position; - if largest_size == 0 || start >= self.buffer.available_data() { - return &self.buffer.data()[0..0]; - } - //println!("available buffer data: {}, buffer position: {}, parsed_position: {}, start: {}, merged slices size: {}", - // self.buffer.available_data(), self.buffer_position, - //self.parsed_position, start, largest_size); - let end = max(self.buffer.available_data(), start + largest_size); - &self.buffer.data()[start..end] - } - - /// should only be called with a count inferior to self.input_data_size() - pub fn consume_parsed_data(&mut self, size: usize) { - //FIXME: to_consume must contain unparsed_position - parsed_position ? - let mut to_consume = size; - while to_consume > 0 { - let new_first_element = match self.input_queue.first() { - None => { - //assert!(to_consume == 0, "no more element in queue, we should not ask to consume {} more bytes", to_consume); - break; - } - Some(&InputElement::Slice(sz)) => { - if to_consume >= sz { - to_consume -= sz; - None - } else { - let new_element = InputElement::Slice(sz - to_consume); - to_consume = 0; - Some(new_element) - } - } - Some(&InputElement::Splice(sz)) => { - if to_consume >= sz { - to_consume -= sz; - None - } else { - panic!("we should not start parsing from inside a splicing buffer. But what if consume_parsed_data was called during a parsing loop? Should only call consume_parsed_data after the parsing loop finished"); - } - } - }; - - match new_first_element { - None => { - self.input_queue.remove(0); - } - Some(el) => { - self.input_queue[0] = el; - } - }; - } - - self.parsed_position += size - to_consume; - self.start_parsing_position += size; - self.invariant(); - } - - pub fn slice_output(&mut self, count: usize) { - self.output_queue.push(OutputElement::Slice(count)); - } - - pub fn delete_output(&mut self, count: usize) { - self.output_queue.push(OutputElement::Delete(count)); - } - - pub fn splice_output(&mut self, count: usize) { - self.output_queue.push(OutputElement::Splice(count)); - } - - pub fn insert_output(&mut self, v: Vec) { - self.output_queue.push(OutputElement::Insert(v)); - } - - pub fn has_output_data(&self) -> bool { - !self.output_queue.is_empty() - } - - pub fn output_data_size(&self) -> usize { - let mut acc = 0usize; - let mut available_buffer_size = self.buffer.available_data(); - - for el in self.output_queue.iter() { - match *el { - OutputElement::Splice(sz) => acc += sz, - OutputElement::Slice(sz) => { - if available_buffer_size >= sz { - acc += sz; - available_buffer_size -= sz; - } else { - let advance = sz - available_buffer_size; - acc += advance; - return acc; - } - } - OutputElement::Insert(ref v) => acc += v.len(), - OutputElement::Delete(sz) => { - if available_buffer_size >= sz { - available_buffer_size -= sz; - } else { - return acc; - } - } - } - } - acc - } - - pub fn merge_output_slices(&self) -> usize { - let mut acc = 0usize; - for el in self.output_queue.iter() { - match *el { - OutputElement::Slice(sz) => acc += sz, - _ => break, - } - } - - assert!( - acc <= self.buffer.available_data(), - "the merged output slices can't be larger than current data in buffer" - ); - acc - } - - pub fn merge_output_deletes(&self) -> usize { - let mut acc = 0usize; - for el in self.output_queue.iter() { - match *el { - OutputElement::Delete(sz) => acc += sz, - _ => break, - } - } - - assert!( - acc <= self.buffer.available_data(), - "the merged output deletes can't be larger than current data in buffer" - ); - acc - } - - pub fn next_output_data(&self) -> &[u8] { - let it = self.output_queue.iter(); - //first, calculate how many bytes we need to jump - let mut start = 0usize; - let mut largest_size = 0usize; - let mut delete_ended = false; - //println!("NEXT OUTPUT DATA:\nqueue:\n{:?}\nbuffer:\n{}", self.output_queue, self.buffer.data().to_hex(16)); - for el in it { - //println!("start={}, length={}, el = {:?}", start, largest_size, el); - if !delete_ended { - match *el { - OutputElement::Delete(sz) => start += sz, - _ => { - delete_ended = true; - match el { - OutputElement::Slice(sz) => largest_size += *sz, - OutputElement::Insert(vec) => return vec, - _ => break, - } - } - } - } else { - match *el { - OutputElement::Slice(sz) => largest_size += sz, - _ => break, - } - } - } - - //println!("buffer data: {:?}", self.buffer.data()); - //println!("calculated start={}, length={}", start, largest_size); - //FIXME: should not be larger than the buffer - let length = self.buffer.available_data(); - if start > length { - &self.buffer.data()[0..0] - } else { - let end = min(start + largest_size, length); - &self.buffer.data()[start..end] - } - } - - pub fn as_ioslice(&self) -> Vec { - let mut res = Vec::new(); - - let it = self.output_queue.iter(); - //first, calculate how many bytes we need to jump - let mut start = 0usize; - let length = self.buffer.available_data(); - //println!("NEXT OUTPUT DATA:\nqueue:\n{:?}\nbuffer:\n{}", self.output_queue, self.buffer.data().to_hex(16)); - for el in it { - match *el { - OutputElement::Delete(sz) => start += sz, - OutputElement::Slice(sz) => { - //println!("Slice({})", sz); - if sz == 0 { - continue; - } - let end = min(start + sz, length); - let i = std::io::IoSlice::new(&self.buffer.data()[start..end]); - //println!("iovec size: {}", i.len()); - res.push(i); - start = end; - if end == length { - break; - } - } - OutputElement::Insert(ref v) => { - if v.is_empty() { - continue; - } - let i = std::io::IoSlice::new(&v[..]); - //println!("got Insert with {} bytes", v.len()); - res.push(i); - } - OutputElement::Splice(_sz) => { - unimplemented!("splice not used in ioslice") - } - } - } - - //println!("returning iovec: {:?}", res); - //println!("returning iovec with {} bytes", complete_size); - res - } - - /// should only be called with a count inferior to self.input_data_size() - pub fn consume_output_data(&mut self, size: usize) { - let mut to_consume = size; - while to_consume > 0 { - let new_first_element = match self.output_queue.first() { - None => { - assert!( - to_consume == 0, - "no more element in queue, we should not ask to consume {to_consume} more bytes" - ); - break; - } - Some(&OutputElement::Slice(sz)) => { - if to_consume >= sz { - to_consume -= sz; - self.buffer_position += sz; - self.buffer.consume(sz); - None - } else { - let new_element = OutputElement::Slice(sz - to_consume); - self.buffer_position += to_consume; - self.buffer.consume(to_consume); - to_consume = 0; - Some(new_element) - } - } - Some(&OutputElement::Delete(sz)) => { - self.buffer_position += sz; - //FIXME: what if we can't delete that much data? - self.buffer.consume(sz); - None - } - Some(&OutputElement::Splice(sz)) => { - if to_consume >= sz { - to_consume -= sz; - None - } else { - let new_element = OutputElement::Splice(sz - to_consume); - to_consume = 0; - Some(new_element) - } - } - Some(OutputElement::Insert(v)) => { - if to_consume >= v.len() { - to_consume -= v.len(); - None - } else { - let new_element = OutputElement::Insert(Vec::from(&v[to_consume..])); - to_consume = 0; - Some(new_element) - } - } - }; - - match new_first_element { - None => { - self.output_queue.remove(0); - } - Some(el) => { - self.output_queue[0] = el; - } - }; - } - self.invariant(); - } - - pub fn print_unparsed(&self) { - println!("{:?}", str::from_utf8(self.unparsed_data())); - } - - pub fn print_and_consume_output(&mut self) { - while self.output_data_size() > 0 { - println!("{:?}", str::from_utf8(self.next_output_data())); - let len = self.next_output_data().len(); - self.consume_output_data(len); - } - } -} - -impl Write for BufferQueue { - fn write(&mut self, buf: &[u8]) -> io::Result { - match self.buffer.write(buf) { - Err(e) => Err(e), - Ok(sz) => { - if sz > 0 { - self.input_queue.push(InputElement::Slice(sz)); - } - Ok(sz) - } - } - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - -impl Reset for BufferQueue { - fn reset(&mut self) { - self.parsed_position = 0; - self.buffer_position = 0; - self.start_parsing_position = 0; - self.buffer.reset(); - self.input_queue.clear(); - self.output_queue.clear(); - } -} - -impl fmt::Debug for BufferQueue { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - //let b: &Buffer = &self.buffer; - write!(f, "BufferQueue {{\nbuffer_position: {},\nparsed_position: {},\nstart_parsing_position: {},\ninput_queue: {:?},\noutput_queue:{:?},\nbuffer: {:?}\n}}", - self.buffer_position, self.parsed_position, self.start_parsing_position, - self.input_queue, self.output_queue, /*b*/ ()) - } -} - -pub fn buf_with_capacity(capacity: usize) -> (Pool, BufferQueue) { - let mut pool = Pool::with_capacity(1, capacity, 16384); - let b = BufferQueue::with_buffer(pool.checkout().unwrap()); - (pool, b) -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::sozu_command::buffer::fixed::Buffer; - use nom::HexDisplay; - use std::io::Write; - - #[test] - #[cfg(target_pointer_width = "64")] - fn size_test() { - assert_size!(BufferQueue, 88); - assert_size!(Buffer, 16); - } - - #[test] - fn consume() { - let (_pool, mut b) = buf_with_capacity(10); - b.buffer.write(&b"ABCDEFGHIJ"[..]).unwrap(); - b.buffer.fill(10); - b.input_queue.push(InputElement::Slice(10)); - /*let mut b = BufferQueue { - parsed_position: 0, - buffer_position: 0, - start_parsing_position: 0, - buffer: Buffer::from_slice(b"ABCDEFGHIJ"), - input_queue: vec!(InputElement::Slice(10)), - output_queue: vec!() - };*/ - - // the pool will align the buffer to 16 bytes so there are trailing zeroes - assert_eq!(b.unparsed_data(), &b"ABCDEFGHIJ\0\0\0\0\0\0\0\0\0\0"[..]); - b.consume_parsed_data(4); - assert_eq!(b.parsed_position, 4); - assert_eq!(b.start_parsing_position, 4); - assert_eq!(b.input_queue, vec!(InputElement::Slice(6))); - println!("TEST[{}]", line!()); - assert_eq!(b.unparsed_data(), &b"EFGHIJ\0\0\0\0\0\0\0\0\0\0"[..]); - println!("TEST[{}]", line!()); - - b.slice_output(4); - assert_eq!(b.output_queue, vec!(OutputElement::Slice(4))); - - b.insert_output(Vec::from(&b"test"[..])); - assert_eq!( - b.output_queue, - vec!( - OutputElement::Slice(4), - OutputElement::Insert(Vec::from(&b"test"[..])) - ) - ); - assert_eq!(b.next_output_data(), &b"ABCD"[..]); - - println!("before consume: {b:?}"); - b.consume_output_data(2); - println!("after consume: {b:?}"); - println!("next output data: {}", b.next_output_data().to_hex(8)); - assert_eq!(b.next_output_data(), &b"CD"[..]); - - println!("TEST[{}]", line!()); - b.consume_parsed_data(8); - assert_eq!(b.parsed_position, 10); - assert_eq!(b.start_parsing_position, 12); - assert_eq!(b.input_queue, vec!()); - - println!("TEST[{}]", line!()); - assert_eq!(b.unparsed_data(), &b""[..]); - println!("TEST[{}]", line!()); - - println!("**test**"); - b.consume_output_data(2); - assert_eq!(b.next_output_data(), &b"test"[..]); - b.consume_output_data(2); - assert_eq!(b.next_output_data(), &b"st"[..]); - - b.delete_output(2); - b.slice_output(4); - assert_eq!( - b.output_queue, - vec!( - OutputElement::Insert(Vec::from(&b"st"[..])), - OutputElement::Delete(2), - OutputElement::Slice(4) - ) - ); - - b.consume_output_data(2); - assert_eq!( - b.output_queue, - vec!(OutputElement::Delete(2), OutputElement::Slice(4)) - ); - assert_eq!(b.next_output_data(), &b"GHIJ"[..]); - - b.consume_output_data(1); - assert_eq!(b.output_queue, vec!(OutputElement::Slice(3))); - assert_eq!(b.next_output_data(), &b"HIJ"[..]); - - b.write(&b"KLMNOP"[..]).unwrap(); - } -} diff --git a/lib/src/http.rs b/lib/src/http.rs index 05ffaa244..19e649a5f 100644 --- a/lib/src/http.rs +++ b/lib/src/http.rs @@ -9,7 +9,11 @@ use std::{ }; use anyhow::Context; -use mio::{net::*, unix::SourceFd, *}; +use mio::{ + net::{TcpListener, TcpStream, UnixStream}, + unix::SourceFd, + Interest, Poll, Registry, Token, +}; use rusty_ulid::Ulid; use slab::Slab; use time::{Duration, Instant}; @@ -28,12 +32,6 @@ use sozu_command::{ }; use crate::{ - protocol::SessionState, router::Router, timer::TimeoutContainer, util::UnwrapLog, CachedTags, - FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError, ListenerHandler, - ProxyError, SessionIsToBeClosed, SessionResult, StateMachineBuilder, -}; - -use super::{ backends::BackendMap, pool::Pool, protocol::{ @@ -42,12 +40,16 @@ use super::{ parser::{hostname_and_port, Method}, }, proxy_protocol::expect::ExpectProxyProtocol, - {Http, Pipe}, + Http, Pipe, SessionState, }, - router::Route, + router::{Route, Router}, server::{ListenSession, ListenToken, ProxyChannel, Server, SessionManager}, socket::server_bind, - AcceptError, Protocol, ProxyConfiguration, ProxySession, SessionMetrics, StateResult, + timer::TimeoutContainer, + util::UnwrapLog, + AcceptError, CachedTags, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError, + ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession, SessionIsToBeClosed, + SessionMetrics, SessionResult, StateMachineBuilder, StateResult, }; #[derive(PartialEq, Eq)] diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 17f3e2382..8aa13e368 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -324,32 +324,11 @@ //! } //! ``` -extern crate hdrhistogram; -extern crate libc; -extern crate log; -extern crate mio; -extern crate nom; -extern crate pool as pool_crate; -extern crate rand; -extern crate rustls; -extern crate rusty_ulid; -extern crate slab; -extern crate socket2; -extern crate time; -extern crate url; #[macro_use] extern crate sozu_command_lib as sozu_command; -extern crate cookie_factory; -extern crate hpack; -extern crate idna; -extern crate lazycell; -extern crate poule; -extern crate regex; -extern crate webpki; #[cfg(test)] #[macro_use] extern crate quickcheck; -extern crate foreign_types_shared; #[macro_use] pub mod util; @@ -358,7 +337,6 @@ pub mod metrics; mod logs; pub mod backends; -pub mod buffer_queue; pub mod features; pub mod http; pub mod load_balancing; @@ -392,18 +370,19 @@ use backends::BackendError; use mio::{net::TcpStream, Interest, Token}; use protocol::http::parser::Method; use router::RouterError; +use time::{Duration, Instant}; +use tls::GenericCertificateResolverError; + use sozu_command::{ - proto::command::{ListenerType, RequestHttpFrontend}, - ObjectKind, -}; -use sozu_command_lib::{ - proto::command::Cluster, ready::Ready, request::WorkerRequest, response::WorkerResponse, + proto::command::{Cluster, ListenerType, RequestHttpFrontend}, + ready::Ready, + request::WorkerRequest, + response::WorkerResponse, state::ClusterId, + ObjectKind, }; -use time::{Duration, Instant}; -use tls::GenericCertificateResolverError; -use self::{backends::BackendMap, router::Route}; +use crate::{backends::BackendMap, router::Route}; /// Anything that can be registered in mio (subscribe to kernel events) #[derive(Debug, Clone, Copy, PartialEq, Eq)] diff --git a/lib/src/metrics/local_drain.rs b/lib/src/metrics/local_drain.rs index 58dc13b18..5e2638f9f 100644 --- a/lib/src/metrics/local_drain.rs +++ b/lib/src/metrics/local_drain.rs @@ -2,13 +2,14 @@ use std::{collections::BTreeMap, str, time::Instant}; use hdrhistogram::Histogram; + use sozu_command::proto::command::{ filtered_metrics, response_content::ContentType, AvailableMetrics, BackendMetrics, ClusterMetrics, FilteredMetrics, MetricsConfiguration, Percentiles, QueryMetricsOptions, ResponseContent, WorkerMetrics, }; -use super::{MetricError, MetricValue, Subscriber}; +use crate::metrics::{MetricError, MetricValue, Subscriber}; /// This is how the metrics are stored in the local drain #[derive(Debug, Clone)] diff --git a/lib/src/metrics/mod.rs b/lib/src/metrics/mod.rs index 0563117f1..e71d143c3 100644 --- a/lib/src/metrics/mod.rs +++ b/lib/src/metrics/mod.rs @@ -12,11 +12,12 @@ use std::{ }; use mio::net::UdpSocket; + use sozu_command::proto::command::{ FilteredMetrics, MetricsConfiguration, QueryMetricsOptions, ResponseContent, }; -use self::{local_drain::LocalDrain, network_drain::NetworkDrain}; +use crate::metrics::{local_drain::LocalDrain, network_drain::NetworkDrain}; thread_local! { pub static METRICS: RefCell = RefCell::new(Aggregator::new(String::from("sozu"))); diff --git a/lib/src/protocol/h2/serializer.rs b/lib/src/protocol/h2/serializer.rs index ca67b91ec..6f2f907f3 100644 --- a/lib/src/protocol/h2/serializer.rs +++ b/lib/src/protocol/h2/serializer.rs @@ -5,7 +5,7 @@ use cookie_factory::{ GenError, }; -use super::parser::{FrameHeader, FrameType}; +use crate::protocol::h2::parser::{FrameHeader, FrameType}; pub fn gen_frame_header<'a, 'b>( x: (&'a mut [u8], usize), diff --git a/lib/src/protocol/h2/state.rs b/lib/src/protocol/h2/state.rs index 70cf02b67..152959418 100644 --- a/lib/src/protocol/h2/state.rs +++ b/lib/src/protocol/h2/state.rs @@ -2,9 +2,10 @@ use std::collections::{HashMap, VecDeque}; use nom::Offset; -use crate::Ready; - -use super::{parser, serializer, stream}; +use crate::{ + protocol::h2::{parser, serializer, stream}, + Ready, +}; #[derive(Clone, Debug, PartialEq)] pub struct OutputFrame { diff --git a/lib/src/protocol/kawa_h1/answers.rs b/lib/src/protocol/kawa_h1/answers.rs index 7d21e0ada..7a48fb991 100644 --- a/lib/src/protocol/kawa_h1/answers.rs +++ b/lib/src/protocol/kawa_h1/answers.rs @@ -1,7 +1,6 @@ -use crate::sozu_command::state::ClusterId; use std::{collections::HashMap, rc::Rc}; -use super::DefaultAnswerStatus; +use crate::{protocol::http::DefaultAnswerStatus, sozu_command::state::ClusterId}; #[allow(non_snake_case)] pub struct DefaultAnswers { diff --git a/lib/src/protocol/kawa_h1/mod.rs b/lib/src/protocol/kawa_h1/mod.rs index 1040bf41e..d8ed31678 100644 --- a/lib/src/protocol/kawa_h1/mod.rs +++ b/lib/src/protocol/kawa_h1/mod.rs @@ -10,11 +10,12 @@ use std::{ }; use kawa; -use mio::{net::TcpStream, *}; +use mio::{net::TcpStream, Interest, Token}; use rusty_ulid::Ulid; -use sozu_command::proto::command::{Event, EventKind, ListenerType}; use time::{Duration, Instant}; +use sozu_command::proto::command::{Event, EventKind, ListenerType}; + use crate::{ backends::{Backend, BackendError}, logs::{Endpoint, LogContext, RequestRecord}, diff --git a/lib/src/protocol/pipe.rs b/lib/src/protocol/pipe.rs index 710bf2e0a..fd5fd8ac4 100644 --- a/lib/src/protocol/pipe.rs +++ b/lib/src/protocol/pipe.rs @@ -1,7 +1,6 @@ use std::{cell::RefCell, net::SocketAddr, rc::Rc}; -use mio::net::*; -use mio::*; +use mio::{net::TcpStream, Token}; use rusty_ulid::Ulid; use crate::{ diff --git a/lib/src/protocol/proxy_protocol/parser.rs b/lib/src/protocol/proxy_protocol/parser.rs index f32d78564..8123763c7 100644 --- a/lib/src/protocol/proxy_protocol/parser.rs +++ b/lib/src/protocol/proxy_protocol/parser.rs @@ -10,7 +10,7 @@ use nom::{ Err, IResult, }; -use crate::protocol::proxy_protocol::header::*; +use crate::protocol::proxy_protocol::header::{Command, HeaderV2, ProxyAddr}; const PROTOCOL_SIGNATURE_V2: [u8; 12] = [ 0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, diff --git a/lib/src/protocol/proxy_protocol/relay.rs b/lib/src/protocol/proxy_protocol/relay.rs index f71a89e32..5ff2a8aee 100644 --- a/lib/src/protocol/proxy_protocol/relay.rs +++ b/lib/src/protocol/proxy_protocol/relay.rs @@ -1,21 +1,18 @@ use std::{cell::RefCell, io::Write, rc::Rc}; -use mio::net::TcpStream; -use mio::*; +use mio::{net::TcpStream, Token}; use nom::{Err, Offset}; use rusty_ulid::Ulid; use crate::{ pool::Checkout, - protocol::{pipe::Pipe, SessionResult}, + protocol::{pipe::Pipe, proxy_protocol::parser::parse_v2_header, SessionResult}, socket::{SocketHandler, SocketResult}, sozu_command::ready::Ready, tcp::TcpListener, Protocol, Readiness, SessionMetrics, StateResult, }; -use super::parser::parse_v2_header; - pub struct RelayProxyProtocol { cursor_header: usize, pub backend_readiness: Readiness, diff --git a/lib/src/protocol/proxy_protocol/send.rs b/lib/src/protocol/proxy_protocol/send.rs index e621d5591..657ca5c24 100644 --- a/lib/src/protocol/proxy_protocol/send.rs +++ b/lib/src/protocol/proxy_protocol/send.rs @@ -4,20 +4,22 @@ use std::{ rc::Rc, }; -use mio::{net::TcpStream, *}; +use mio::{net::TcpStream, Token}; use rusty_ulid::Ulid; use crate::{ pool::Checkout, - protocol::{pipe::Pipe, SessionResult}, + protocol::{ + pipe::Pipe, + proxy_protocol::header::{Command, HeaderV2, ProxyProtocolHeader}, + SessionResult, + }, socket::SocketHandler, sozu_command::ready::Ready, tcp::TcpListener, BackendConnectionStatus, Protocol, Readiness, SessionMetrics, StateResult, }; -use super::header::*; - pub struct SendProxyProtocol { cursor_header: usize, pub backend_readiness: Readiness, diff --git a/lib/src/protocol/rustls.rs b/lib/src/protocol/rustls.rs index 300b34a5f..3e15116ba 100644 --- a/lib/src/protocol/rustls.rs +++ b/lib/src/protocol/rustls.rs @@ -1,6 +1,6 @@ use std::{cell::RefCell, io::ErrorKind, rc::Rc}; -use mio::{net::*, Token}; +use mio::{net::TcpStream, Token}; use rustls::ServerConnection; use rusty_ulid::Ulid; diff --git a/lib/src/router/mod.rs b/lib/src/router/mod.rs index e76fcb9d5..e0c9fc2c7 100644 --- a/lib/src/router/mod.rs +++ b/lib/src/router/mod.rs @@ -1,18 +1,18 @@ pub mod pattern_trie; pub mod trie; -use regex::bytes::Regex; use std::str::from_utf8; + +use regex::bytes::Regex; use time::Instant; -use crate::protocol::http::parser::Method; use sozu_command::{ proto::command::{PathRule as CommandPathRule, PathRuleKind, RulePosition}, response::HttpFrontend, state::ClusterId, }; -use self::pattern_trie::TrieNode; +use crate::{protocol::http::parser::Method, router::pattern_trie::TrieNode}; #[derive(thiserror::Error, Debug)] pub enum RouterError { diff --git a/lib/src/server.rs b/lib/src/server.rs index 27b61e848..ae9721560 100644 --- a/lib/src/server.rs +++ b/lib/src/server.rs @@ -13,6 +13,8 @@ use mio::{ Events, Interest, Poll, Token, }; use slab::Slab; +use time::{Duration, Instant}; + use sozu_command::{ channel::Channel, config::Config, @@ -29,7 +31,6 @@ use sozu_command::{ scm_socket::{Listeners, ScmSocket}, state::ConfigState, }; -use time::{Duration, Instant}; use crate::{ backends::{Backend, BackendMap}, diff --git a/lib/src/tcp.rs b/lib/src/tcp.rs index ba9e507ac..44cbbf581 100644 --- a/lib/src/tcp.rs +++ b/lib/src/tcp.rs @@ -16,9 +16,10 @@ use mio::{ }; use rusty_ulid::Ulid; use slab::Slab; -use sozu_command::{proto::command::request::RequestType, ObjectKind}; use time::{Duration, Instant}; +use sozu_command::{proto::command::request::RequestType, ObjectKind}; + use crate::{ backends::{Backend, BackendMap}, logs::{Endpoint, LogContext, RequestRecord}, diff --git a/lib/src/tls.rs b/lib/src/tls.rs index 710c095c9..44081449b 100644 --- a/lib/src/tls.rs +++ b/lib/src/tls.rs @@ -18,12 +18,13 @@ use rustls::{ use sha2::{Digest, Sha256}; use x509_parser::pem::{parse_x509_pem, Pem}; -use crate::router::trie::*; use sozu_command::{ certificate::{get_cn_and_san_attributes, CertificateError, Fingerprint}, proto::command::{AddCertificate, CertificateAndKey, ReplaceCertificate, TlsVersion}, }; +use crate::router::trie::{Key, KeyValue, TrieNode}; + // ----------------------------------------------------------------------------- // CertificateResolver trait