From 4006aeb46c62d718070f01414c62a8d555136ca1 Mon Sep 17 00:00:00 2001 From: Emmanuel Bosquet Date: Fri, 24 Nov 2023 12:49:56 +0100 Subject: [PATCH] pass Vec instead of ConfigState to new worker new function read_requests_from_file Now, creating a new worker means writing ConfigState as requests in a file, and parsing them when starting the new worker. Previously, writing/reading the ConfigStated into a file was done with serde_json reader and writer, which was inefficient. --- bin/src/worker.rs | 14 +++++----- command/src/request.rs | 59 ++++++++++++++++++++++++++++++++++++++++++ e2e/src/sozu/worker.rs | 21 ++++++++++++--- lib/src/server.rs | 20 +++++--------- 4 files changed, 92 insertions(+), 22 deletions(-) diff --git a/bin/src/worker.rs b/bin/src/worker.rs index 55a5e6496..81d361f2d 100644 --- a/bin/src/worker.rs +++ b/bin/src/worker.rs @@ -32,7 +32,7 @@ use sozu_command_lib::{ logging::target_to_backend, proto::command::{request::RequestType, Request, RunState, Status, WorkerInfo}, ready::Ready, - request::WorkerRequest, + request::{read_requests_from_file, WorkerRequest}, response::WorkerResponse, scm_socket::{Listeners, ScmSocket}, state::ConfigState, @@ -216,8 +216,9 @@ pub fn begin_worker_process( error!("Could not block the worker-to-main channel: {}", e); } - let configuration_state_file = unsafe { File::from_raw_fd(configuration_state_fd) }; - let config_state: ConfigState = serde_json::from_reader(configuration_state_file) + let mut configuration_state_file = unsafe { File::from_raw_fd(configuration_state_fd) }; + + let initial_state = read_requests_from_file(&mut configuration_state_file) .with_context(|| "could not parse configuration state data")?; let worker_config = worker_to_main_channel @@ -275,7 +276,7 @@ pub fn begin_worker_process( worker_to_main_channel, worker_to_main_scm_socket, worker_config, - config_state, + initial_state, true, ) .with_context(|| "Could not create server from config")?; @@ -305,8 +306,9 @@ pub fn fork_main_into_worker( tempfile().with_context(|| "could not create temporary file for configuration state")?; util::disable_close_on_exec(state_file.as_raw_fd())?; - serde_json::to_writer(&mut state_file, state) - .with_context(|| "could not write upgrade data to temporary file")?; + state + .write_requests_to_file(&mut state_file) + .with_context(|| "Could not write state to file")?; state_file .rewind() diff --git a/command/src/request.rs b/command/src/request.rs index d278713f1..c8350b23b 100644 --- a/command/src/request.rs +++ b/command/src/request.rs @@ -1,11 +1,17 @@ use std::{ error, fmt::{self, Display}, + fs::File, + io::Read, net::SocketAddr, str::FromStr, }; +use nom::{HexDisplay, Offset}; + use crate::{ + buffer::fixed::Buffer, + parser::parse_several_requests, proto::command::{ request::RequestType, LoadBalancingAlgorithms, PathRuleKind, Request, RequestHttpFrontend, RulePosition, @@ -19,6 +25,10 @@ pub enum RequestError { InvalidSocketAddress { address: String, error: String }, #[error("invalid value {value} for field '{name}'")] InvalidValue { name: String, value: i32 }, + #[error("Could not read requests from file: {0}")] + FileError(std::io::Error), + #[error("Could not parse requests: {0}")] + ParseError(String), } impl Request { @@ -123,6 +133,55 @@ impl fmt::Display for WorkerRequest { } } +pub fn read_requests_from_file(file: &mut File) -> Result, RequestError> { + let mut acc = Vec::new(); + let mut buffer = Buffer::with_capacity(200000); + loop { + let previous = buffer.available_data(); + + let bytes_read = file + .read(buffer.space()) + .map_err(|e| RequestError::FileError(e))?; + + buffer.fill(bytes_read); + + if buffer.available_data() == 0 { + debug!("Empty buffer"); + break; + } + + let mut offset = 0usize; + match parse_several_requests::(buffer.data()) { + Ok((i, requests)) => { + if !i.is_empty() { + debug!("could not parse {} bytes", i.len()); + if previous == buffer.available_data() { + break; + } + } + offset = buffer.data().offset(i); + + acc.push(requests); + } + Err(nom::Err::Incomplete(_)) => { + if buffer.available_data() == buffer.capacity() { + error!( + "message too big, stopping parsing:\n{}", + buffer.data().to_hex(16) + ); + break; + } + } + Err(parse_error) => { + return Err(RequestError::ParseError(parse_error.to_string())); + } + } + buffer.consume(offset); + } + let requests = acc.into_iter().flatten().collect(); + Ok(requests) +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ProxyDestinations { pub to_http_proxy: bool, diff --git a/e2e/src/sozu/worker.rs b/e2e/src/sozu/worker.rs index a25b1948c..017fce7e1 100644 --- a/e2e/src/sozu/worker.rs +++ b/e2e/src/sozu/worker.rs @@ -102,11 +102,19 @@ impl Worker { .send_listeners(&listeners) .expect("could not send listeners"); + let initial_state = state + .generate_requests() + .iter() + .map(|req| WorkerRequest { + id: "initial_state".to_string(), + content: req.clone(), + }) + .collect(); let server = Server::try_new_from_config( cmd_worker_to_main, scm_worker_to_main, config, - state, + initial_state, false, ) .expect("could not create sozu worker"); @@ -139,7 +147,14 @@ impl Worker { .expect("could not send listeners"); let thread_config = config.to_owned(); - let thread_state = state.to_owned(); + let initial_state = state + .generate_requests() + .iter() + .map(|req| WorkerRequest { + id: "initial_state".to_string(), + content: req.clone(), + }) + .collect(); let thread_name = name.to_owned(); let thread_scm_worker_to_main = scm_worker_to_main.to_owned(); @@ -157,7 +172,7 @@ impl Worker { cmd_worker_to_main, thread_scm_worker_to_main, thread_config, - thread_state, + initial_state, false, ) .expect("could not create sozu worker"); diff --git a/lib/src/server.rs b/lib/src/server.rs index 12811decc..0461dbe41 100644 --- a/lib/src/server.rs +++ b/lib/src/server.rs @@ -269,7 +269,7 @@ impl Server { worker_to_main_channel: ProxyChannel, worker_to_main_scm: ScmSocket, config: Config, - config_state: ConfigState, + initial_state: Vec, expects_initial_status: bool, ) -> anyhow::Result { let event_loop = Poll::new().with_context(|| "could not create event loop")?; @@ -331,7 +331,7 @@ impl Server { Some(https), None, server_config, - Some(config_state), + Some(initial_state), expects_initial_status, ) } @@ -348,7 +348,7 @@ impl Server { https: Option, tcp: Option, server_config: ServerConfig, - config_state: Option, + initial_state: Option>, expects_initial_status: bool, ) -> anyhow::Result { FEATURES.with(|_features| { @@ -438,16 +438,10 @@ impl Server { }; // initialize the worker with the state we got from a file - if let Some(state) = config_state { - for (counter, request) in state.generate_requests().iter().enumerate() { - let id = format!("INIT-{counter}"); - let worker_request = WorkerRequest { - id, - content: request.to_owned(), - }; - - trace!("generating initial config request: {:#?}", worker_request); - server.notify_proxys(worker_request); + if let Some(requests) = initial_state { + for request in requests { + trace!("generating initial config request: {:#?}", request); + server.notify_proxys(request); } // do not send back answers to the initialization messages