Skip to content

Commit

Permalink
pass Vec<WorkerRequests> instead of ConfigState to new worker
Browse files Browse the repository at this point in the history
new function read_requests_from_file
Now, creating a new worker means writing ConfigState as requests
in a file, and parsing them when starting the new worker.
Previously, writing/reading the ConfigStated into a file
was done with serde_json reader and writer, which was inefficient.
  • Loading branch information
Keksoj committed Nov 24, 2023
1 parent a770f8f commit 4006aeb
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 22 deletions.
14 changes: 8 additions & 6 deletions bin/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use sozu_command_lib::{
logging::target_to_backend,
proto::command::{request::RequestType, Request, RunState, Status, WorkerInfo},
ready::Ready,
request::WorkerRequest,
request::{read_requests_from_file, WorkerRequest},
response::WorkerResponse,
scm_socket::{Listeners, ScmSocket},
state::ConfigState,
Expand Down Expand Up @@ -216,8 +216,9 @@ pub fn begin_worker_process(
error!("Could not block the worker-to-main channel: {}", e);
}

let configuration_state_file = unsafe { File::from_raw_fd(configuration_state_fd) };
let config_state: ConfigState = serde_json::from_reader(configuration_state_file)
let mut configuration_state_file = unsafe { File::from_raw_fd(configuration_state_fd) };

let initial_state = read_requests_from_file(&mut configuration_state_file)
.with_context(|| "could not parse configuration state data")?;

let worker_config = worker_to_main_channel
Expand Down Expand Up @@ -275,7 +276,7 @@ pub fn begin_worker_process(
worker_to_main_channel,
worker_to_main_scm_socket,
worker_config,
config_state,
initial_state,
true,
)
.with_context(|| "Could not create server from config")?;
Expand Down Expand Up @@ -305,8 +306,9 @@ pub fn fork_main_into_worker(
tempfile().with_context(|| "could not create temporary file for configuration state")?;
util::disable_close_on_exec(state_file.as_raw_fd())?;

serde_json::to_writer(&mut state_file, state)
.with_context(|| "could not write upgrade data to temporary file")?;
state
.write_requests_to_file(&mut state_file)
.with_context(|| "Could not write state to file")?;

state_file
.rewind()
Expand Down
59 changes: 59 additions & 0 deletions command/src/request.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
use std::{
error,
fmt::{self, Display},
fs::File,
io::Read,
net::SocketAddr,
str::FromStr,
};

use nom::{HexDisplay, Offset};

use crate::{
buffer::fixed::Buffer,
parser::parse_several_requests,
proto::command::{
request::RequestType, LoadBalancingAlgorithms, PathRuleKind, Request, RequestHttpFrontend,
RulePosition,
Expand All @@ -19,6 +25,10 @@ pub enum RequestError {
InvalidSocketAddress { address: String, error: String },
#[error("invalid value {value} for field '{name}'")]
InvalidValue { name: String, value: i32 },
#[error("Could not read requests from file: {0}")]
FileError(std::io::Error),
#[error("Could not parse requests: {0}")]
ParseError(String),
}

impl Request {
Expand Down Expand Up @@ -123,6 +133,55 @@ impl fmt::Display for WorkerRequest {
}
}

pub fn read_requests_from_file(file: &mut File) -> Result<Vec<WorkerRequest>, RequestError> {
let mut acc = Vec::new();
let mut buffer = Buffer::with_capacity(200000);
loop {
let previous = buffer.available_data();

let bytes_read = file
.read(buffer.space())
.map_err(|e| RequestError::FileError(e))?;

buffer.fill(bytes_read);

if buffer.available_data() == 0 {
debug!("Empty buffer");
break;
}

let mut offset = 0usize;
match parse_several_requests::<WorkerRequest>(buffer.data()) {
Ok((i, requests)) => {
if !i.is_empty() {
debug!("could not parse {} bytes", i.len());
if previous == buffer.available_data() {
break;
}
}
offset = buffer.data().offset(i);

acc.push(requests);
}
Err(nom::Err::Incomplete(_)) => {
if buffer.available_data() == buffer.capacity() {
error!(
"message too big, stopping parsing:\n{}",
buffer.data().to_hex(16)
);
break;
}
}
Err(parse_error) => {
return Err(RequestError::ParseError(parse_error.to_string()));
}
}
buffer.consume(offset);
}
let requests = acc.into_iter().flatten().collect();
Ok(requests)
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ProxyDestinations {
pub to_http_proxy: bool,
Expand Down
21 changes: 18 additions & 3 deletions e2e/src/sozu/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,19 @@ impl Worker {
.send_listeners(&listeners)
.expect("could not send listeners");

let initial_state = state
.generate_requests()
.iter()
.map(|req| WorkerRequest {
id: "initial_state".to_string(),
content: req.clone(),
})
.collect();
let server = Server::try_new_from_config(
cmd_worker_to_main,
scm_worker_to_main,
config,
state,
initial_state,
false,
)
.expect("could not create sozu worker");
Expand Down Expand Up @@ -139,7 +147,14 @@ impl Worker {
.expect("could not send listeners");

let thread_config = config.to_owned();
let thread_state = state.to_owned();
let initial_state = state
.generate_requests()
.iter()
.map(|req| WorkerRequest {
id: "initial_state".to_string(),
content: req.clone(),
})
.collect();
let thread_name = name.to_owned();
let thread_scm_worker_to_main = scm_worker_to_main.to_owned();

Expand All @@ -157,7 +172,7 @@ impl Worker {
cmd_worker_to_main,
thread_scm_worker_to_main,
thread_config,
thread_state,
initial_state,
false,
)
.expect("could not create sozu worker");
Expand Down
20 changes: 7 additions & 13 deletions lib/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl Server {
worker_to_main_channel: ProxyChannel,
worker_to_main_scm: ScmSocket,
config: Config,
config_state: ConfigState,
initial_state: Vec<WorkerRequest>,
expects_initial_status: bool,
) -> anyhow::Result<Self> {
let event_loop = Poll::new().with_context(|| "could not create event loop")?;
Expand Down Expand Up @@ -331,7 +331,7 @@ impl Server {
Some(https),
None,
server_config,
Some(config_state),
Some(initial_state),
expects_initial_status,
)
}
Expand All @@ -348,7 +348,7 @@ impl Server {
https: Option<https::HttpsProxy>,
tcp: Option<tcp::TcpProxy>,
server_config: ServerConfig,
config_state: Option<ConfigState>,
initial_state: Option<Vec<WorkerRequest>>,
expects_initial_status: bool,
) -> anyhow::Result<Self> {
FEATURES.with(|_features| {
Expand Down Expand Up @@ -438,16 +438,10 @@ impl Server {
};

// initialize the worker with the state we got from a file
if let Some(state) = config_state {
for (counter, request) in state.generate_requests().iter().enumerate() {
let id = format!("INIT-{counter}");
let worker_request = WorkerRequest {
id,
content: request.to_owned(),
};

trace!("generating initial config request: {:#?}", worker_request);
server.notify_proxys(worker_request);
if let Some(requests) = initial_state {
for request in requests {
trace!("generating initial config request: {:#?}", request);
server.notify_proxys(request);
}

// do not send back answers to the initialization messages
Expand Down

0 comments on commit 4006aeb

Please sign in to comment.