diff --git a/bin/src/command/mod.rs b/bin/src/command/mod.rs index 6b7dca029..38ea88869 100644 --- a/bin/src/command/mod.rs +++ b/bin/src/command/mod.rs @@ -974,7 +974,7 @@ async fn client_loop( if buffer.is_empty() { break; } - println!( + trace!( "client loop: I just created a buffer from my read buf reader: {:?}", buffer ); @@ -988,9 +988,10 @@ async fn client_loop( }; let message_len = usize::from_le_bytes(delimiter); - println!( + trace!( "client loop: reading message of encoded length {}, here is the buffer data: {:?}", - message_len, buffer + message_len, + buffer ); if buffer.len() >= message_len { @@ -1062,7 +1063,7 @@ async fn worker_loop( // this does essentially what Channel::try_read_delimited_message() does let mut buf_reader = BufReader::new(read_stream); loop { - println!( + trace!( "The buf_reader in the client loop has this length: {}", buf_reader.buffer().len() ); @@ -1087,9 +1088,10 @@ async fn worker_loop( }; let message_len = usize::from_le_bytes(delimiter); - println!( + trace!( "worker loop: reading message of encoded length {}, here is the buffer data: {:?}", - message_len, buffer + message_len, + buffer ); if buffer.len() >= message_len { diff --git a/bin/src/ctl/display.rs b/bin/src/ctl/display.rs index 5ea66c2e3..19f61b8dc 100644 --- a/bin/src/ctl/display.rs +++ b/bin/src/ctl/display.rs @@ -128,7 +128,9 @@ pub fn print_status(worker_infos: WorkerInfos) { let row = row!( worker_info.id, worker_info.pid, - RunState::try_from(worker_info.run_state).unwrap().as_str_name() + RunState::try_from(worker_info.run_state) + .unwrap() + .as_str_name() ); table.add_row(row); } diff --git a/bin/src/worker.rs b/bin/src/worker.rs index 0f584c750..df0000314 100644 --- a/bin/src/worker.rs +++ b/bin/src/worker.rs @@ -36,7 +36,7 @@ use sozu_command_lib::{ WorkerResponse, }, ready::Ready, - request::read_requests_from_file, + request::read_initial_state_from_file, scm_socket::{Listeners, ScmSocket}, state::ConfigState, }; @@ -226,7 +226,7 @@ pub fn begin_worker_process( let mut configuration_state_file = unsafe { File::from_raw_fd(configuration_state_fd) }; - let initial_state = read_requests_from_file(&mut configuration_state_file) + let initial_state = read_initial_state_from_file(&mut configuration_state_file) .with_context(|| "could not parse configuration state data")?; let worker_config = worker_to_main_channel @@ -319,7 +319,7 @@ pub fn fork_main_into_worker( util::disable_close_on_exec(state_file.as_raw_fd())?; state - .write_requests_to_file(&mut state_file) + .write_initial_state_to_file(&mut state_file) .with_context(|| "Could not write state to file")?; state_file diff --git a/command/src/channel.rs b/command/src/channel.rs index 9f2addad4..f28b9cbf4 100644 --- a/command/src/channel.rs +++ b/command/src/channel.rs @@ -315,11 +315,6 @@ impl Cha .map_err(|_| ChannelError::MismatchBufferSize)?; let message_len = usize::from_le_bytes(delimiter); - println!( - "channel: reading message of encoded length {}, here is the buffer data: {:?}", - message_len, buffer - ); - if buffer.len() >= message_len { let message = Rx::decode(&buffer[delimiter_size()..message_len]) .map_err(|decode_error| ChannelError::InvalidProtobufMessage(decode_error))?; @@ -345,7 +340,7 @@ impl Cha /// /// If the channel is nonblocking, you have to flush using `channel.run()` afterwards pub fn write_message(&mut self, message: &Tx) -> Result<(), ChannelError> { - println!("Writing message {:?}", message); + trace!("Writing message {:?}", message); if self.blocking { self.write_message_blocking(message) } else { @@ -393,7 +388,7 @@ impl Cha let delimiter = payload_len.to_le_bytes(); - println!( + trace!( "writing message {:?} of encoded length {}: {:?}", message, payload.len(), diff --git a/command/src/command.proto b/command/src/command.proto index 21a445487..30453de09 100644 --- a/command/src/command.proto +++ b/command/src/command.proto @@ -654,4 +654,9 @@ message ListenersCount { repeated string tls = 2; // socket addresses of TCP listeners repeated string tcp = 3; +} + +// state passed to a new worker, able to +message InitialState { + repeated WorkerRequest requests = 1; } \ No newline at end of file diff --git a/command/src/request.rs b/command/src/request.rs index 1c0b434f5..389fd3813 100644 --- a/command/src/request.rs +++ b/command/src/request.rs @@ -2,19 +2,17 @@ use std::{ error, fmt::{self, Display}, fs::File, - io::Read, + io::{BufReader, Read}, net::SocketAddr, str::FromStr, }; -use nom::{HexDisplay, Offset}; +use prost::Message; use crate::{ - buffer::fixed::Buffer, - parser::parse_several_requests, proto::command::{ - request::RequestType, LoadBalancingAlgorithms, PathRuleKind, Request, RequestHttpFrontend, - RulePosition, WorkerRequest, + request::RequestType, InitialState, LoadBalancingAlgorithms, PathRuleKind, Request, + RequestHttpFrontend, RulePosition, WorkerRequest, }, response::HttpFrontend, }; @@ -126,54 +124,19 @@ impl fmt::Display for WorkerRequest { } } -pub fn read_requests_from_file(file: &mut File) -> Result, RequestError> { - let mut acc = Vec::new(); - let mut buffer = Buffer::with_capacity(200000); - loop { - let previous = buffer.available_data(); +pub fn read_initial_state_from_file(file: &mut File) -> Result { + let mut buf_reader = BufReader::new(file); + let mut buffer = Vec::new(); + buf_reader + .read_to_end(&mut buffer) + .map_err(|e| RequestError::FileError(e))?; - let bytes_read = file - .read(buffer.space()) - .map_err(|e| RequestError::FileError(e))?; - - buffer.fill(bytes_read); - - if buffer.available_data() == 0 { - debug!("Empty buffer"); - break; - } + let initial_state = + InitialState::decode(&buffer[..]).map_err(|e| RequestError::ParseError(e.to_string()))?; + Ok(initial_state) +} - let mut offset = 0usize; - match parse_several_requests::(buffer.data()) { - Ok((i, requests)) => { - if !i.is_empty() { - debug!("could not parse {} bytes", i.len()); - if previous == buffer.available_data() { - break; - } - } - offset = buffer.data().offset(i); - acc.push(requests); - } - Err(nom::Err::Incomplete(_)) => { - if buffer.available_data() == buffer.capacity() { - error!( - "message too big, stopping parsing:\n{}", - buffer.data().to_hex(16) - ); - break; - } - } - Err(parse_error) => { - return Err(RequestError::ParseError(parse_error.to_string())); - } - } - buffer.consume(offset); - } - let requests = acc.into_iter().flatten().collect(); - Ok(requests) -} #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ProxyDestinations { diff --git a/command/src/state.rs b/command/src/state.rs index 2b930f96c..0ec249a70 100644 --- a/command/src/state.rs +++ b/command/src/state.rs @@ -10,7 +10,7 @@ use std::{ net::SocketAddr, }; -use prost::DecodeError; +use prost::{DecodeError, Message}; use crate::{ certificate::{self, calculate_fingerprint, Fingerprint}, @@ -18,8 +18,8 @@ use crate::{ command::{ request::RequestType, ActivateListener, AddBackend, AddCertificate, CertificateAndKey, Cluster, ClusterInformation, DeactivateListener, FrontendFilters, HttpListenerConfig, - HttpsListenerConfig, ListedFrontends, ListenerType, ListenersList, PathRule, - QueryCertificatesFilters, RemoveBackend, RemoveCertificate, RemoveListener, + HttpsListenerConfig, InitialState, ListedFrontends, ListenerType, ListenersList, + PathRule, QueryCertificatesFilters, RemoveBackend, RemoveCertificate, RemoveListener, ReplaceCertificate, Request, RequestCounts, RequestHttpFrontend, RequestTcpFrontend, TcpListenerConfig, WorkerRequest, }, @@ -1372,6 +1372,34 @@ impl ConfigState { } } + pub fn produce_initial_state(&self) -> InitialState { + let mut counter = 0usize; + let mut worker_requests = Vec::new(); + for request in self.generate_requests() { + worker_requests.push(WorkerRequest::new(format!("SAVE-{counter}"), request)); + counter += 1; + } + InitialState { + requests: worker_requests, + } + } + + /// generate requests necessary to recreate the state, + /// in protobuf, to a temp file + pub fn write_initial_state_to_file(&self, file: &mut File) -> Result { + let initial_state = self.produce_initial_state(); + let count = initial_state.requests.len(); + + let bytes_to_write = initial_state.encode_to_vec(); + println!("writing {} in the temp file", bytes_to_write.len()); + file.write_all(&bytes_to_write) + .map_err(StateError::FileError)?; + + file.sync_all().map_err(StateError::FileError)?; + + Ok(count) + } + /// generate requests necessary to recreate the state, /// write them in a JSON form in a file, separated by \n\0, /// returns the number of written requests diff --git a/e2e/src/sozu/worker.rs b/e2e/src/sozu/worker.rs index 8f8731eb1..929bad0c4 100644 --- a/e2e/src/sozu/worker.rs +++ b/e2e/src/sozu/worker.rs @@ -102,14 +102,7 @@ impl Worker { .send_listeners(&listeners) .expect("could not send listeners"); - let initial_state = state - .generate_requests() - .into_iter() - .map(|request| WorkerRequest { - id: "initial_state".to_string(), - content: request, - }) - .collect(); + let initial_state = state.produce_initial_state(); let server = Server::try_new_from_config( cmd_worker_to_main, scm_worker_to_main, @@ -147,14 +140,7 @@ impl Worker { .expect("could not send listeners"); let thread_config = config.to_owned(); - let initial_state = state - .generate_requests() - .into_iter() - .map(|request| WorkerRequest { - id: "initial_state".to_string(), - content: request, - }) - .collect(); + let initial_state = state.produce_initial_state(); let thread_name = name.to_owned(); let thread_scm_worker_to_main = scm_worker_to_main.to_owned(); diff --git a/lib/src/server.rs b/lib/src/server.rs index aac93d49b..a5f939d7d 100644 --- a/lib/src/server.rs +++ b/lib/src/server.rs @@ -22,7 +22,7 @@ use sozu_command::{ CertificatesWithFingerprints, Cluster, ClusterHashes, ClusterInformations, DeactivateListener, Event, HttpListenerConfig, HttpsListenerConfig, ListenerType, LoadBalancingAlgorithms, LoadMetric, MetricsConfiguration, RemoveBackend, ResponseStatus, - ServerConfig, TcpListenerConfig as CommandTcpListener, WorkerRequest, WorkerResponse, + ServerConfig, TcpListenerConfig as CommandTcpListener, WorkerRequest, WorkerResponse, InitialState, }, ready::Ready, scm_socket::{Listeners, ScmSocket}, @@ -227,7 +227,7 @@ impl Server { worker_to_main_channel: ProxyChannel, worker_to_main_scm: ScmSocket, config: ServerConfig, - initial_state: Vec, + initial_state: InitialState, expects_initial_status: bool, ) -> anyhow::Result { // let server_config = ServerConfig::from_config(&config); @@ -306,7 +306,7 @@ impl Server { https: Option, tcp: Option, server_config: ServerConfig, - initial_state: Option>, + initial_state: Option, expects_initial_status: bool, ) -> anyhow::Result { FEATURES.with(|_features| { @@ -396,8 +396,8 @@ impl Server { }; // initialize the worker with the state we got from a file - if let Some(requests) = initial_state { - for request in requests { + if let Some(state) = initial_state { + for request in state.requests { trace!("generating initial config request: {:#?}", request); server.notify_proxys(request); }