Skip to content

Commit

Permalink
Error handling and connection retry:
Browse files Browse the repository at this point in the history
- Added StreamState to Stream to merge active and token field as well as
  adding a "Link" variant to represent a Stream asking for connection
- Comment and implement start_stream and end_stream. In particular
  end_stream on a Server handles the error and reconnection logic
- Add set_default_answer which produces a default Kawa answer
- Add forcefully_terminate_answer which properly terminates H1 streams
  and sends RstStream on H2 streams
- Add connection loop with max retries and error handling in ready

Signed-off-by: Eloi DEMOLIS <[email protected]>
  • Loading branch information
Wonshtrum committed Sep 9, 2023
1 parent e15cffd commit fe61422
Show file tree
Hide file tree
Showing 6 changed files with 397 additions and 197 deletions.
40 changes: 23 additions & 17 deletions lib/src/protocol/mux/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ use kawa::{AsBuffer, Block, BlockConverter, Chunk, Flags, Kawa, Pair, StatusLine
use crate::protocol::http::parser::compare_no_case;

use super::{
parser::{FrameHeader, FrameType},
serializer::gen_frame_header,
StreamId,
parser::{FrameHeader, FrameType, H2Error},
serializer::{gen_frame_header, gen_rst_stream},
StreamId, StreamState,
};

pub struct H2BlockConverter<'a> {
pub stream_id: StreamId,
pub state: StreamState,
pub encoder: &'a mut hpack::Encoder<'static>,
pub out: Vec<u8>,
}
Expand Down Expand Up @@ -140,20 +141,25 @@ impl<'a, T: AsBuffer> BlockConverter<T> for H2BlockConverter<'a> {
.unwrap();
kawa.push_out(Store::new_vec(&header));
kawa.push_out(Store::Alloc(payload.into_boxed_slice(), 0));
}
if end_stream {
let mut header = [0; 9];
gen_frame_header(
&mut header,
&FrameHeader {
payload_len: 0,
frame_type: FrameType::Data,
flags: 1,
stream_id: self.stream_id,
},
)
.unwrap();
kawa.push_out(Store::new_vec(&header));
} else if end_stream {
if kawa.is_error() {
let mut frame = [0; 13];
gen_rst_stream(&mut frame, self.stream_id, H2Error::InternalError).unwrap();
kawa.push_out(Store::new_vec(&frame));
} else {
let mut header = [0; 9];
gen_frame_header(
&mut header,
&FrameHeader {
payload_len: 0,
frame_type: FrameType::Data,
flags: 1,
stream_id: self.stream_id,
},
)
.unwrap();
kawa.push_out(Store::new_vec(&header));
}
}
if end_header || end_stream {
kawa.push_delimiter()
Expand Down
85 changes: 62 additions & 23 deletions lib/src/protocol/mux/h1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use sozu_command::ready::Ready;
use crate::{
println_,
protocol::mux::{
debug_kawa, update_readiness_after_read, update_readiness_after_write, BackendStatus,
Context, Endpoint, GlobalStreamId, MuxResult, Position,
debug_kawa, set_default_answer, update_readiness_after_read, update_readiness_after_write,
BackendStatus, Context, Endpoint, GlobalStreamId, MuxResult, Position, StreamState, forcefully_terminate_answer,
},
socket::SocketHandler,
Readiness,
Expand Down Expand Up @@ -49,24 +49,43 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
kawa::h1::parse(kawa, parts.context);
debug_kawa(kawa);
if kawa.is_error() {
return MuxResult::Close(self.stream);
match self.position {
Position::Client(_) => {
let StreamState::Linked(token) = stream.state else { unreachable!() };
let global_stream_id = self.stream;
self.readiness.interest.remove(Ready::ALL);
self.end_stream(global_stream_id, context);
endpoint.end_stream(token, global_stream_id, context);
}
Position::Server => {
set_default_answer(&mut stream.back, &mut self.readiness, 400);
stream.state = StreamState::Unlinked;
}
}
return MuxResult::Continue;
}
if kawa.is_terminated() {
self.readiness.interest.remove(Ready::READABLE);
}
if was_initial && kawa.is_main_phase() {
self.requests += 1;
println_!("REQUESTS: {}", self.requests);
match self.position {
Position::Client(_) => endpoint
.readiness_mut(stream.token.unwrap())
.interest
.insert(Ready::WRITABLE),
Position::Server => return MuxResult::Connect(self.stream),
Position::Client(_) => {
let StreamState::Linked(token) = stream.state else { unreachable!() };
endpoint
.readiness_mut(token)
.interest
.insert(Ready::WRITABLE)
}
Position::Server => {
self.requests += 1;
println_!("REQUESTS: {}", self.requests);
stream.state = StreamState::Link
}
};
}
MuxResult::Continue
}

pub fn writable<E>(&mut self, context: &mut Context, mut endpoint: E) -> MuxResult
where
E: Endpoint,
Expand Down Expand Up @@ -95,9 +114,10 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
stream.back.clear();
stream.back.storage.clear();
stream.front.clear();
// do not clear stream.front because of H1 pipelining
let token = stream.token.take().unwrap();
endpoint.end_stream(token, self.stream, context);
// do not clear stream.front.storage because of H1 pipelining
if let StreamState::Linked(token) = stream.state {
endpoint.end_stream(token, self.stream, context);
}
}
}
}
Expand All @@ -114,21 +134,20 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
println_!("close detached client ConnectionH1");
return;
}
Position::Client(BackendStatus::Connecting(_)) => todo!("reconnect"),
Position::Client(BackendStatus::Connected(_)) => {}
Position::Client(BackendStatus::Connecting(_))
| Position::Client(BackendStatus::Connected(_)) => {}
Position::Server => unreachable!(),
}
endpoint.end_stream(
context.streams[self.stream].token.unwrap(),
self.stream,
context,
)
// reconnection is handled by the server
let StreamState::Linked(token) = context.streams[self.stream].state else {unreachable!()};
endpoint.end_stream(token, self.stream, context)
}

pub fn end_stream(&mut self, stream: GlobalStreamId, context: &mut Context) {
assert_eq!(stream, self.stream);
let stream_context = &mut context.streams[stream].context;
println_!("end H1 stream {stream}: {stream_context:#?}");
let stream = &mut context.streams[stream];
let stream_context = &mut stream.context;
println_!("end H1 stream {}: {stream_context:#?}", self.stream);
self.stream = usize::MAX;
let mut owned_position = Position::Server;
std::mem::swap(&mut owned_position, &mut self.position);
Expand All @@ -143,7 +162,27 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
}
Position::Client(BackendStatus::KeepAlive(_))
| Position::Client(BackendStatus::Disconnecting) => unreachable!(),
Position::Server => todo!(),
Position::Server => match (stream.front.consumed, stream.back.is_main_phase()) {

Check failure on line 165 in lib/src/protocol/mux/h1.rs

View workflow job for this annotation

GitHub Actions / Test (nightly, true)

no field `consumed` on type `Kawa<pool::Checkout>`

Check failure on line 165 in lib/src/protocol/mux/h1.rs

View workflow job for this annotation

GitHub Actions / Build documentation

no field `consumed` on type `Kawa<pool::Checkout>`

Check failure on line 165 in lib/src/protocol/mux/h1.rs

View workflow job for this annotation

GitHub Actions / Test (false, beta)

no field `consumed` on type `Kawa<pool::Checkout>`

Check failure on line 165 in lib/src/protocol/mux/h1.rs

View workflow job for this annotation

GitHub Actions / Test (false, stable)

no field `consumed` on type `Kawa<pool::Checkout>`

Check failure on line 165 in lib/src/protocol/mux/h1.rs

View workflow job for this annotation

GitHub Actions / Coverage

no field `consumed` on type `Kawa<pool::Checkout>`
(true, true) => {
// we have a "forwardable" answer from the back
// if the answer is not terminated we send an RstStream to properly clean the stream
// if it is terminated, we finish the transfer, the backend is not necessary anymore
if !stream.back.is_terminated() {
forcefully_terminate_answer(&mut stream.back, &mut self.readiness);
}
stream.state = StreamState::Unlinked;
}
(true, false) => {
set_default_answer(&mut stream.back, &mut self.readiness, 502);
stream.state = StreamState::Unlinked;
}
(false, false) => {
// we do not have an answer, but the request is untouched so we can retry
println!("H1 RECONNECT");
stream.state = StreamState::Link;
}
(false, true) => unreachable!(),
},
}
}

Expand Down
140 changes: 83 additions & 57 deletions lib/src/protocol/mux/h2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
parser::{self, error_code_to_str, Frame, FrameHeader, FrameType, H2Error},
pkawa, serializer, update_readiness_after_read, update_readiness_after_write,
BackendStatus, Context, Endpoint, GenericHttpStream, GlobalStreamId, MuxResult, Position,
StreamId,
StreamId, StreamState, set_default_answer, forcefully_terminate_answer,
},
socket::SocketHandler,
Readiness,
Expand Down Expand Up @@ -359,18 +359,17 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
}
}
self.expect_write = None;
if kawa.is_terminated() && kawa.is_completed() {
if (kawa.is_terminated() || kawa.is_error()) && kawa.is_completed() {
match self.position {
Position::Client(_) => {}
Position::Server => {
// mark stream as reusable
stream.active = false;
println_!("Recycle stream: {global_stream_id}");
endpoint.end_stream(
stream.token.unwrap(),
global_stream_id,
context,
);
let mut state = StreamState::Recycle;
std::mem::swap(&mut stream.state, &mut state);
if let StreamState::Linked(token) = state {
endpoint.end_stream(token, global_stream_id, context);
}
dead_streams.push(stream_id);
}
}
Expand All @@ -379,6 +378,7 @@ impl<Front: SocketHandler> ConnectionH2<Front> {

let mut converter = converter::H2BlockConverter {
stream_id: 0,
state: StreamState::Idle,
encoder: &mut self.encoder,
out: Vec::new(),
};
Expand All @@ -389,35 +389,35 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
'outer: for stream_id in priorities {
let global_stream_id = *self.streams.get(stream_id).unwrap();
let stream = &mut context.streams[global_stream_id];
converter.state = stream.state;
let kawa = stream.wbuffer(&self.position);
if kawa.is_main_phase() {
if kawa.is_main_phase() || kawa.is_error() {
converter.stream_id = *stream_id;
kawa.prepare(&mut converter);
debug_kawa(kawa);
while !kawa.out.is_empty() {
let bufs = kawa.as_io_slice();
let (size, status) = self.socket.socket_write_vectored(&bufs);
kawa.consume(size);
if update_readiness_after_write(size, status, &mut self.readiness) {
self.expect_write =
Some(H2StreamId::Other(*stream_id, global_stream_id));
break 'outer;
}
}
while !kawa.out.is_empty() {
let bufs = kawa.as_io_slice();
let (size, status) = self.socket.socket_write_vectored(&bufs);
kawa.consume(size);
if update_readiness_after_write(size, status, &mut self.readiness) {
self.expect_write =
Some(H2StreamId::Other(*stream_id, global_stream_id));
break 'outer;
}
if kawa.is_terminated() && kawa.is_completed() {
match self.position {
Position::Client(_) => {}
Position::Server => {
// mark stream as reusable
stream.active = false;
println_!("Recycle stream: {global_stream_id}");
endpoint.end_stream(
stream.token.unwrap(),
global_stream_id,
context,
);
dead_streams.push(*stream_id);
}
if (kawa.is_terminated() || kawa.is_error()) && kawa.is_completed() {
match self.position {
Position::Client(_) => {}
Position::Server => {
// mark stream as reusable
println_!("Recycle stream: {global_stream_id}");
let mut state = StreamState::Recycle;
std::mem::swap(&mut stream.state, &mut state);
if let StreamState::Linked(token) = state {
endpoint.end_stream(token, global_stream_id, context);
}
dead_streams.push(*stream_id);
}
}
}
Expand Down Expand Up @@ -511,11 +511,14 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
);
debug_kawa(parts.rbuffer);
match self.position {
Position::Client(_) => endpoint
.readiness_mut(stream.token.unwrap())
.interest
.insert(Ready::WRITABLE),
Position::Server => return MuxResult::Connect(global_stream_id),
Position::Client(_) => {
let StreamState::Linked(token) = stream.state else { unreachable!() };
endpoint
.readiness_mut(token)
.interest
.insert(Ready::WRITABLE)
}
Position::Server => stream.state = StreamState::Link,
};
}
Frame::PushPromise(push_promise) => match self.position {
Expand Down Expand Up @@ -609,38 +612,61 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
E: Endpoint,
{
match self.position {
Position::Client(BackendStatus::Connecting(_)) => todo!("reconnect"),
Position::Client(_) => {}
Position::Client(BackendStatus::Connected(_))
| Position::Client(BackendStatus::Connecting(_)) => {}
Position::Client(BackendStatus::Disconnecting)
| Position::Client(BackendStatus::KeepAlive(_)) => unreachable!(),
Position::Server => unreachable!(),
}
// reconnection is handled by the server for each stream separately
for global_stream_id in self.streams.values() {
println_!("end stream: {global_stream_id}");
endpoint.end_stream(
context.streams[*global_stream_id].token.unwrap(),
*global_stream_id,
context,
)
let StreamState::Linked(token) = context.streams[*global_stream_id].state else { unreachable!() };
endpoint.end_stream(token, *global_stream_id, context)
}
}

pub fn end_stream(&mut self, stream: GlobalStreamId, context: &mut Context) {
let stream_context = &mut context.streams[stream].context;
println_!("end H2 stream {stream}: {stream_context:#?}");
let mut found = false;
for (stream_id, global_stream_id) in &self.streams {
if *global_stream_id == stream {
let id = *stream_id;
self.streams.remove(&id);
found = true;
break;
}
}
if !found {
panic!();
}
match self.position {
Position::Client(_) => {}
Position::Server => todo!(),
Position::Client(_) => {
for (stream_id, global_stream_id) in &self.streams {
if *global_stream_id == stream {
let id = *stream_id;
self.streams.remove(&id);
return;
}
}
unreachable!()
}
Position::Server => {
let stream = &mut context.streams[stream];
match (stream.front.consumed, stream.back.is_main_phase()) {

Check failure on line 645 in lib/src/protocol/mux/h2.rs

View workflow job for this annotation

GitHub Actions / Test (nightly, true)

no field `consumed` on type `Kawa<pool::Checkout>`

Check failure on line 645 in lib/src/protocol/mux/h2.rs

View workflow job for this annotation

GitHub Actions / Build documentation

no field `consumed` on type `Kawa<pool::Checkout>`

Check failure on line 645 in lib/src/protocol/mux/h2.rs

View workflow job for this annotation

GitHub Actions / Test (false, beta)

no field `consumed` on type `Kawa<pool::Checkout>`

Check failure on line 645 in lib/src/protocol/mux/h2.rs

View workflow job for this annotation

GitHub Actions / Test (false, stable)

no field `consumed` on type `Kawa<pool::Checkout>`
(_, true) => {
// front might not have been consumed (in case of PushPromise)
// we have a "forwardable" answer from the back
// if the answer is not terminated we send an RstStream to properly clean the stream
// if it is terminated, we finish the transfer, the backend is not necessary anymore
if !stream.back.is_terminated() {
forcefully_terminate_answer(&mut stream.back, &mut self.readiness);
}
stream.state = StreamState::Unlinked
}
(true, false) => {
// we do not have an answer, but the request has already been partially consumed
// so we can't retry, send a 502 bad gateway instead
// note: it might be possible to send a RstStream with an adequate error code
set_default_answer(&mut stream.back, &mut self.readiness, 502);
stream.state = StreamState::Unlinked;
}
(false, false) => {
// we do not have an answer, but the request is untouched so we can retry
println!("H2 RECONNECT");
stream.state = StreamState::Link
}
}
}
}
}

Expand Down
Loading

0 comments on commit fe61422

Please sign in to comment.