Skip to content

Commit

Permalink
pass initial state to workers in protobuf
Browse files Browse the repository at this point in the history
  • Loading branch information
Keksoj committed Nov 29, 2023
1 parent 33c6d28 commit 59596d0
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 92 deletions.
14 changes: 8 additions & 6 deletions bin/src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,7 @@ async fn client_loop(
if buffer.is_empty() {
break;
}
println!(
trace!(
"client loop: I just created a buffer from my read buf reader: {:?}",
buffer
);
Expand All @@ -988,9 +988,10 @@ async fn client_loop(
};
let message_len = usize::from_le_bytes(delimiter);

println!(
trace!(
"client loop: reading message of encoded length {}, here is the buffer data: {:?}",
message_len, buffer
message_len,
buffer
);

if buffer.len() >= message_len {
Expand Down Expand Up @@ -1062,7 +1063,7 @@ async fn worker_loop(
// this does essentially what Channel::try_read_delimited_message() does
let mut buf_reader = BufReader::new(read_stream);
loop {
println!(
trace!(
"The buf_reader in the client loop has this length: {}",
buf_reader.buffer().len()
);
Expand All @@ -1087,9 +1088,10 @@ async fn worker_loop(
};
let message_len = usize::from_le_bytes(delimiter);

println!(
trace!(
"worker loop: reading message of encoded length {}, here is the buffer data: {:?}",
message_len, buffer
message_len,
buffer
);

if buffer.len() >= message_len {
Expand Down
4 changes: 3 additions & 1 deletion bin/src/ctl/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ pub fn print_status(worker_infos: WorkerInfos) {
let row = row!(
worker_info.id,
worker_info.pid,
RunState::try_from(worker_info.run_state).unwrap().as_str_name()
RunState::try_from(worker_info.run_state)
.unwrap()
.as_str_name()
);
table.add_row(row);
}
Expand Down
6 changes: 3 additions & 3 deletions bin/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use sozu_command_lib::{
WorkerResponse,
},
ready::Ready,
request::read_requests_from_file,
request::read_initial_state_from_file,
scm_socket::{Listeners, ScmSocket},
state::ConfigState,
};
Expand Down Expand Up @@ -226,7 +226,7 @@ pub fn begin_worker_process(

let mut configuration_state_file = unsafe { File::from_raw_fd(configuration_state_fd) };

let initial_state = read_requests_from_file(&mut configuration_state_file)
let initial_state = read_initial_state_from_file(&mut configuration_state_file)
.with_context(|| "could not parse configuration state data")?;

let worker_config = worker_to_main_channel
Expand Down Expand Up @@ -319,7 +319,7 @@ pub fn fork_main_into_worker(
util::disable_close_on_exec(state_file.as_raw_fd())?;

state
.write_requests_to_file(&mut state_file)
.write_initial_state_to_file(&mut state_file)
.with_context(|| "Could not write state to file")?;

state_file
Expand Down
9 changes: 2 additions & 7 deletions command/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,6 @@ impl<Tx: Debug + ProstMessage + Default, Rx: Debug + ProstMessage + Default> Cha
.map_err(|_| ChannelError::MismatchBufferSize)?;
let message_len = usize::from_le_bytes(delimiter);

println!(
"channel: reading message of encoded length {}, here is the buffer data: {:?}",
message_len, buffer
);

if buffer.len() >= message_len {
let message = Rx::decode(&buffer[delimiter_size()..message_len])
.map_err(|decode_error| ChannelError::InvalidProtobufMessage(decode_error))?;
Expand All @@ -345,7 +340,7 @@ impl<Tx: Debug + ProstMessage + Default, Rx: Debug + ProstMessage + Default> Cha
///
/// If the channel is nonblocking, you have to flush using `channel.run()` afterwards
pub fn write_message(&mut self, message: &Tx) -> Result<(), ChannelError> {
println!("Writing message {:?}", message);
trace!("Writing message {:?}", message);
if self.blocking {
self.write_message_blocking(message)
} else {
Expand Down Expand Up @@ -393,7 +388,7 @@ impl<Tx: Debug + ProstMessage + Default, Rx: Debug + ProstMessage + Default> Cha

let delimiter = payload_len.to_le_bytes();

println!(
trace!(
"writing message {:?} of encoded length {}: {:?}",
message,
payload.len(),
Expand Down
5 changes: 5 additions & 0 deletions command/src/command.proto
Original file line number Diff line number Diff line change
Expand Up @@ -654,4 +654,9 @@ message ListenersCount {
repeated string tls = 2;
// socket addresses of TCP listeners
repeated string tcp = 3;
}

// state passed to a new worker, able to
message InitialState {
repeated WorkerRequest requests = 1;
}
65 changes: 14 additions & 51 deletions command/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,17 @@ use std::{
error,
fmt::{self, Display},
fs::File,
io::Read,
io::{BufReader, Read},
net::SocketAddr,
str::FromStr,
};

use nom::{HexDisplay, Offset};
use prost::Message;

use crate::{
buffer::fixed::Buffer,
parser::parse_several_requests,
proto::command::{
request::RequestType, LoadBalancingAlgorithms, PathRuleKind, Request, RequestHttpFrontend,
RulePosition, WorkerRequest,
request::RequestType, InitialState, LoadBalancingAlgorithms, PathRuleKind, Request,
RequestHttpFrontend, RulePosition, WorkerRequest,
},
response::HttpFrontend,
};
Expand Down Expand Up @@ -126,54 +124,19 @@ impl fmt::Display for WorkerRequest {
}
}

pub fn read_requests_from_file(file: &mut File) -> Result<Vec<WorkerRequest>, RequestError> {
let mut acc = Vec::new();
let mut buffer = Buffer::with_capacity(200000);
loop {
let previous = buffer.available_data();
pub fn read_initial_state_from_file(file: &mut File) -> Result<InitialState, RequestError> {
let mut buf_reader = BufReader::new(file);
let mut buffer = Vec::new();
buf_reader
.read_to_end(&mut buffer)
.map_err(|e| RequestError::FileError(e))?;

let bytes_read = file
.read(buffer.space())
.map_err(|e| RequestError::FileError(e))?;

buffer.fill(bytes_read);

if buffer.available_data() == 0 {
debug!("Empty buffer");
break;
}
let initial_state =
InitialState::decode(&buffer[..]).map_err(|e| RequestError::ParseError(e.to_string()))?;
Ok(initial_state)
}

let mut offset = 0usize;
match parse_several_requests::<WorkerRequest>(buffer.data()) {
Ok((i, requests)) => {
if !i.is_empty() {
debug!("could not parse {} bytes", i.len());
if previous == buffer.available_data() {
break;
}
}
offset = buffer.data().offset(i);

acc.push(requests);
}
Err(nom::Err::Incomplete(_)) => {
if buffer.available_data() == buffer.capacity() {
error!(
"message too big, stopping parsing:\n{}",
buffer.data().to_hex(16)
);
break;
}
}
Err(parse_error) => {
return Err(RequestError::ParseError(parse_error.to_string()));
}
}
buffer.consume(offset);
}
let requests = acc.into_iter().flatten().collect();
Ok(requests)
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ProxyDestinations {
Expand Down
34 changes: 31 additions & 3 deletions command/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ use std::{
net::SocketAddr,
};

use prost::DecodeError;
use prost::{DecodeError, Message};

use crate::{
certificate::{self, calculate_fingerprint, Fingerprint},
proto::{
command::{
request::RequestType, ActivateListener, AddBackend, AddCertificate, CertificateAndKey,
Cluster, ClusterInformation, DeactivateListener, FrontendFilters, HttpListenerConfig,
HttpsListenerConfig, ListedFrontends, ListenerType, ListenersList, PathRule,
QueryCertificatesFilters, RemoveBackend, RemoveCertificate, RemoveListener,
HttpsListenerConfig, InitialState, ListedFrontends, ListenerType, ListenersList,
PathRule, QueryCertificatesFilters, RemoveBackend, RemoveCertificate, RemoveListener,
ReplaceCertificate, Request, RequestCounts, RequestHttpFrontend, RequestTcpFrontend,
TcpListenerConfig, WorkerRequest,
},
Expand Down Expand Up @@ -1372,6 +1372,34 @@ impl ConfigState {
}
}

pub fn produce_initial_state(&self) -> InitialState {
let mut counter = 0usize;
let mut worker_requests = Vec::new();
for request in self.generate_requests() {
worker_requests.push(WorkerRequest::new(format!("SAVE-{counter}"), request));
counter += 1;
}
InitialState {
requests: worker_requests,
}
}

/// generate requests necessary to recreate the state,
/// in protobuf, to a temp file
pub fn write_initial_state_to_file(&self, file: &mut File) -> Result<usize, StateError> {
let initial_state = self.produce_initial_state();
let count = initial_state.requests.len();

let bytes_to_write = initial_state.encode_to_vec();
println!("writing {} in the temp file", bytes_to_write.len());
file.write_all(&bytes_to_write)
.map_err(StateError::FileError)?;

file.sync_all().map_err(StateError::FileError)?;

Ok(count)
}

/// generate requests necessary to recreate the state,
/// write them in a JSON form in a file, separated by \n\0,
/// returns the number of written requests
Expand Down
18 changes: 2 additions & 16 deletions e2e/src/sozu/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,7 @@ impl Worker {
.send_listeners(&listeners)
.expect("could not send listeners");

let initial_state = state
.generate_requests()
.into_iter()
.map(|request| WorkerRequest {
id: "initial_state".to_string(),
content: request,
})
.collect();
let initial_state = state.produce_initial_state();
let server = Server::try_new_from_config(
cmd_worker_to_main,
scm_worker_to_main,
Expand Down Expand Up @@ -147,14 +140,7 @@ impl Worker {
.expect("could not send listeners");

let thread_config = config.to_owned();
let initial_state = state
.generate_requests()
.into_iter()
.map(|request| WorkerRequest {
id: "initial_state".to_string(),
content: request,
})
.collect();
let initial_state = state.produce_initial_state();
let thread_name = name.to_owned();
let thread_scm_worker_to_main = scm_worker_to_main.to_owned();

Expand Down
10 changes: 5 additions & 5 deletions lib/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use sozu_command::{
CertificatesWithFingerprints, Cluster, ClusterHashes, ClusterInformations,
DeactivateListener, Event, HttpListenerConfig, HttpsListenerConfig, ListenerType,
LoadBalancingAlgorithms, LoadMetric, MetricsConfiguration, RemoveBackend, ResponseStatus,
ServerConfig, TcpListenerConfig as CommandTcpListener, WorkerRequest, WorkerResponse,
ServerConfig, TcpListenerConfig as CommandTcpListener, WorkerRequest, WorkerResponse, InitialState,
},
ready::Ready,
scm_socket::{Listeners, ScmSocket},
Expand Down Expand Up @@ -227,7 +227,7 @@ impl Server {
worker_to_main_channel: ProxyChannel,
worker_to_main_scm: ScmSocket,
config: ServerConfig,
initial_state: Vec<WorkerRequest>,
initial_state: InitialState,
expects_initial_status: bool,
) -> anyhow::Result<Self> {
// let server_config = ServerConfig::from_config(&config);
Expand Down Expand Up @@ -306,7 +306,7 @@ impl Server {
https: Option<https::HttpsProxy>,
tcp: Option<tcp::TcpProxy>,
server_config: ServerConfig,
initial_state: Option<Vec<WorkerRequest>>,
initial_state: Option<InitialState>,
expects_initial_status: bool,
) -> anyhow::Result<Self> {
FEATURES.with(|_features| {
Expand Down Expand Up @@ -396,8 +396,8 @@ impl Server {
};

// initialize the worker with the state we got from a file
if let Some(requests) = initial_state {
for request in requests {
if let Some(state) = initial_state {
for request in state.requests {
trace!("generating initial config request: {:#?}", request);
server.notify_proxys(request);
}
Expand Down

0 comments on commit 59596d0

Please sign in to comment.