Skip to content

Commit

Permalink
Front to Back:
Browse files Browse the repository at this point in the history
- Add routing capabilities to MuxSession
- Implement Data frame handling
- Fix handle_headers
- Don't store pseudo headers key in kawa
- Rename front and back Kawa fields to rbuffer, wbuffer

A MuxSession can now process an incoming H2 request, connect to the
corresponding backend, translate it in H1, send it and receive the H1
response. We still lack the capability to connect to H2 backends and
forward responses.

Signed-off-by: Eloi DEMOLIS <[email protected]>
  • Loading branch information
Wonshtrum committed Aug 23, 2023
1 parent 662a70d commit 8e3f964
Show file tree
Hide file tree
Showing 7 changed files with 344 additions and 92 deletions.
7 changes: 4 additions & 3 deletions lib/src/https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,11 @@ impl HttpsSession {
let mux = Mux {
frontend_token: self.frontend_token,
frontend,
backends: HashMap::new(),
context: mux::Context::new(self.pool.clone(), handshake.request_id, 1 << 16).ok()?,
listener: self.listener.clone(),
router: mux::Router {
listener: self.listener.clone(),
backends: HashMap::new(),
},
public_address: self.public_address,
peer_address: self.peer_address,
sticky_name: self.sticky_name.clone(),
Expand Down Expand Up @@ -518,7 +520,6 @@ impl L7ListenerHandler for HttpsListener {
let (remaining_input, (hostname, _)) = match hostname_and_port(host.as_bytes()) {
Ok(tuple) => tuple,
Err(parse_error) => {
// parse_error contains a slice of given_host, which should NOT escape this scope
return Err(FrontendFromRequestError::HostParse {
host: host.to_owned(),
error: parse_error.to_string(),
Expand Down
14 changes: 13 additions & 1 deletion lib/src/protocol/kawa_h1/editor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use rusty_ulid::Ulid;
use crate::{
pool::Checkout,
protocol::http::{parser::compare_no_case, GenericHttpStream, Method},
Protocol,
Protocol, RetrieveClusterError,
};

/// This is the container used to store and use information about the session from within a Kawa parser callback
Expand Down Expand Up @@ -300,4 +300,16 @@ impl HttpContext {
val: kawa::Store::from_string(self.id.to_string()),
}));
}

// -> host, path, method
pub fn extract_route(&self) -> Result<(&str, &str, &Method), RetrieveClusterError> {
let given_method = self.method.as_ref().ok_or(RetrieveClusterError::NoMethod)?;
let given_authority = self
.authority
.as_deref()
.ok_or(RetrieveClusterError::NoHost)?;
let given_path = self.path.as_deref().ok_or(RetrieveClusterError::NoPath)?;

Ok((given_authority, given_path, given_method))
}
}
23 changes: 1 addition & 22 deletions lib/src/protocol/kawa_h1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1028,32 +1028,11 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
true
}

// -> host, path, method
pub fn extract_route(&self) -> Result<(&str, &str, &Method), RetrieveClusterError> {
let given_method = self
.context
.method
.as_ref()
.ok_or(RetrieveClusterError::NoMethod)?;
let given_authority = self
.context
.authority
.as_deref()
.ok_or(RetrieveClusterError::NoHost)?;
let given_path = self
.context
.path
.as_deref()
.ok_or(RetrieveClusterError::NoPath)?;

Ok((given_authority, given_path, given_method))
}

fn cluster_id_from_request(
&mut self,
proxy: Rc<RefCell<dyn L7Proxy>>,
) -> Result<String, RetrieveClusterError> {
let (host, uri, method) = match self.extract_route() {
let (host, uri, method) = match self.context.extract_route() {
Ok(tuple) => tuple,
Err(cluster_error) => {
self.set_answer(DefaultAnswerStatus::Answer400, None);
Expand Down
17 changes: 9 additions & 8 deletions lib/src/protocol/mux/h1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
pub fn readable(&mut self, context: &mut Context) -> MuxResult {
println!("======= MUX H1 READABLE");
let stream = &mut context.streams.get(self.stream);
let kawa = match self.position {
Position::Client => &mut stream.front,
Position::Server => &mut stream.back,
};
let kawa = stream.rbuffer(self.position);
let (size, status) = self.socket.socket_read(kawa.storage.space());
println!(" size: {size}, status: {status:?}");
if size > 0 {
Expand All @@ -37,6 +34,9 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
}
kawa::h1::parse(kawa, &mut kawa::h1::NoCallbacks);
kawa::debug_kawa(kawa);
if kawa.is_error() {
return MuxResult::Close(self.stream);
}
if kawa.is_terminated() {
self.readiness.interest.remove(Ready::READABLE);
}
Expand All @@ -45,11 +45,9 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
pub fn writable(&mut self, context: &mut Context) -> MuxResult {
println!("======= MUX H1 WRITABLE");
let stream = &mut context.streams.get(self.stream);
let kawa = match self.position {
Position::Client => &mut stream.back,
Position::Server => &mut stream.front,
};
let kawa = stream.wbuffer(self.position);
kawa.prepare(&mut kawa::h1::BlockConverter);
kawa::debug_kawa(kawa);
let bufs = kawa.as_io_slice();
if bufs.is_empty() {
self.readiness.interest.remove(Ready::WRITABLE);
Expand All @@ -63,6 +61,9 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
} else {
self.readiness.event.remove(Ready::WRITABLE);
}
if kawa.is_terminated() && kawa.is_completed() {
self.readiness.interest.insert(Ready::READABLE);
}
MuxResult::Continue
}
}
70 changes: 51 additions & 19 deletions lib/src/protocol/mux/h2.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, str::from_utf8_unchecked};

use kawa::h1::ParserCallbacks;

Check warning on line 3 in lib/src/protocol/mux/h2.rs

View workflow job for this annotation

GitHub Actions / Build documentation

unused import: `kawa::h1::ParserCallbacks`

Check warning on line 3 in lib/src/protocol/mux/h2.rs

View workflow job for this annotation

GitHub Actions / Test (nightly, true)

unused import: `kawa::h1::ParserCallbacks`

Check warning on line 3 in lib/src/protocol/mux/h2.rs

View workflow job for this annotation

GitHub Actions / Test (nightly, true)

unused import: `kawa::h1::ParserCallbacks`

Check warning on line 3 in lib/src/protocol/mux/h2.rs

View workflow job for this annotation

GitHub Actions / Test (false, stable)

unused import: `kawa::h1::ParserCallbacks`

Check warning on line 3 in lib/src/protocol/mux/h2.rs

View workflow job for this annotation

GitHub Actions / Test (false, stable)

unused import: `kawa::h1::ParserCallbacks`

Check warning on line 3 in lib/src/protocol/mux/h2.rs

View workflow job for this annotation

GitHub Actions / Test (false, beta)

unused import: `kawa::h1::ParserCallbacks`

Check warning on line 3 in lib/src/protocol/mux/h2.rs

View workflow job for this annotation

GitHub Actions / Test (false, beta)

unused import: `kawa::h1::ParserCallbacks`
use rusty_ulid::Ulid;
Expand Down Expand Up @@ -60,23 +60,25 @@ pub struct ConnectionH2<Front: SocketHandler> {
impl<Front: SocketHandler> ConnectionH2<Front> {
pub fn readable(&mut self, context: &mut Context) -> MuxResult {
println!("======= MUX H2 READABLE");
let kawa = if let Some((stream_id, amount)) = self.expect {
let kawa = context.streams.get(stream_id).front(self.position);
let (size, status) = self.socket.socket_read(&mut kawa.storage.space()[..amount]);
println!("{:?}({stream_id}, {amount}) {size} {status:?}", self.state);
if size > 0 {
kawa.storage.fill(size);
if size == amount {
self.expect = None;
let (stream_id, kawa) = if let Some((stream_id, amount)) = self.expect {
let kawa = context.streams.get(stream_id).rbuffer(self.position);
if amount > 0 {
let (size, status) = self.socket.socket_read(&mut kawa.storage.space()[..amount]);
println!("{:?}({stream_id}, {amount}) {size} {status:?}", self.state);
if size > 0 {
kawa.storage.fill(size);
if size == amount {
self.expect = None;
} else {
self.expect = Some((stream_id, amount - size));
return MuxResult::Continue;
}
} else {
self.expect = Some((stream_id, amount - size));
self.readiness.event.remove(Ready::READABLE);
return MuxResult::Continue;
}
} else {
self.readiness.event.remove(Ready::READABLE);
return MuxResult::Continue;
}
kawa
(stream_id, kawa)
} else {
self.readiness.event.remove(Ready::READABLE);
return MuxResult::Continue;
Expand Down Expand Up @@ -186,7 +188,9 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
Ok((_, frame)) => frame,
Err(e) => panic!("{e:?}"),
};
kawa.storage.clear();
if stream_id == 0 {
kawa.storage.clear();
}
let state_result = self.handle(frame, context);
self.state = H2State::Header;
self.expect = Some((0, 9));
Expand Down Expand Up @@ -235,19 +239,47 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
fn handle(&mut self, frame: Frame, context: &mut Context) -> MuxResult {
println!("{frame:?}");
match frame {
Frame::Data(_) => todo!(),
Frame::Data(data) => {
let mut slice = data.payload;
let global_stream_id = *self.streams.get(&data.stream_id).unwrap();
let stream = &mut context.streams.others[global_stream_id - 1];
let kawa = stream.rbuffer(self.position);
slice.start += kawa.storage.head as u32;
kawa.storage.head += slice.len();
let buffer = kawa.storage.buffer();
let payload = slice.data(buffer);
println!("{:?}", unsafe { from_utf8_unchecked(payload) });
kawa.push_block(kawa::Block::Chunk(kawa::Chunk {
data: kawa::Store::Slice(slice),
}));
if data.end_stream {
kawa.push_block(kawa::Block::Flags(kawa::Flags {
end_body: true,
end_chunk: false,
end_header: false,
end_stream: true,
}));
kawa.parsing_phase = kawa::ParsingPhase::Terminated;
}
}
Frame::Headers(headers) => {
if !headers.end_headers {
todo!();
// self.state = H2State::Continuation
}
let global_stream_id = *self.streams.get(&headers.stream_id).unwrap();
let kawa = context.streams.zero.front(self.position);
let kawa = context.streams.zero.rbuffer(self.position);
let buffer = headers.header_block_fragment.data(kawa.storage.buffer());
let stream = &mut context.streams.others[global_stream_id - 1];
let kawa = &mut stream.front;
pkawa::handle_header(kawa, buffer, &mut context.decoder);
stream.context.on_headers(kawa);
pkawa::handle_header(
kawa,
buffer,
headers.end_stream,
&mut context.decoder,
&mut stream.context,
);
kawa::debug_kawa(kawa);
return MuxResult::Connect(global_stream_id);
}
Frame::PushPromise(push_promise) => match self.position {

Check warning on line 285 in lib/src/protocol/mux/h2.rs

View workflow job for this annotation

GitHub Actions / Build documentation

unused variable: `push_promise`

Check warning on line 285 in lib/src/protocol/mux/h2.rs

View workflow job for this annotation

GitHub Actions / Test (nightly, true)

unused variable: `push_promise`

Check warning on line 285 in lib/src/protocol/mux/h2.rs

View workflow job for this annotation

GitHub Actions / Test (nightly, true)

unused variable: `push_promise`

Check warning on line 285 in lib/src/protocol/mux/h2.rs

View workflow job for this annotation

GitHub Actions / Test (false, stable)

unused variable: `push_promise`

Check warning on line 285 in lib/src/protocol/mux/h2.rs

View workflow job for this annotation

GitHub Actions / Test (false, stable)

unused variable: `push_promise`

Check warning on line 285 in lib/src/protocol/mux/h2.rs

View workflow job for this annotation

GitHub Actions / Test (false, beta)

unused variable: `push_promise`

Check warning on line 285 in lib/src/protocol/mux/h2.rs

View workflow job for this annotation

GitHub Actions / Test (false, beta)

unused variable: `push_promise`
Expand Down
Loading

0 comments on commit 8e3f964

Please sign in to comment.