Skip to content

Commit

Permalink
Merge pull request #139 from hapsoc/goaway-connreset
Browse files Browse the repository at this point in the history
Graceful GOAWAY handling
  • Loading branch information
fasterthanlime authored Jan 25, 2024
2 parents 93fa1bf + 035a345 commit 513ee1d
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 35 deletions.
62 changes: 52 additions & 10 deletions crates/fluke/src/h2/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ pub(crate) struct H2ReadContext<D: ServerDriver + 'static> {
hpack_dec: fluke_hpack::Decoder<'static>,
// TODO: kill, cf. https://github.com/hapsoc/fluke/issues/121
continuation_state: ContinuationState,

/// Whether we've sent a GOAWAY frame.
pub goaway_sent: bool,

/// Whether we've received a GOAWAY frame.
pub goaway_recv: bool,
}

impl<D: ServerDriver + 'static> H2ReadContext<D> {
Expand All @@ -55,6 +61,8 @@ impl<D: ServerDriver + 'static> H2ReadContext<D> {
state,
hpack_dec,
continuation_state: ContinuationState::Idle,
goaway_sent: false,
goaway_recv: false,
}
}

Expand All @@ -65,15 +73,38 @@ impl<D: ServerDriver + 'static> H2ReadContext<D> {
) -> eyre::Result<()> {
loop {
let frame;
(client_buf, frame) =
match read_and_parse(Frame::parse, &mut transport_r, client_buf, 128).await? {
Some((client_buf, frame)) => (client_buf, frame),
None => {
// TODO: this can be fine, if we've sent a GO_AWAY
debug!("h2 client closed connection");
return Err(eyre::Report::from(ConnectionClosed));
let frame_res = read_and_parse(Frame::parse, &mut transport_r, client_buf, 128).await;
let maybe_frame = match frame_res {
Ok(inner) => inner,
Err(e) => {
// if this is a connection reset and we've sent a goaway, ignore it
if self.goaway_sent {
if let Some(io_error) = e.root_cause().downcast_ref::<std::io::Error>() {
if io_error.kind() == std::io::ErrorKind::ConnectionReset {
debug!("ignoring connection reset after goaway");
return Ok(());
}
}
}
};
return Err(e.wrap_err("h2 read"));
}
};
(client_buf, frame) = match maybe_frame {
Some((client_buf, frame)) => (client_buf, frame),
None => {
if self.goaway_sent {
debug!("peer connection reset after we sent a GOAWAY");
return Ok(());
}
if self.goaway_recv {
debug!("peer connection reset after we received a GOAWAY");
return Ok(());
}

debug!("peer connection reset, without a GOAWAY");
return Err(eyre::Report::from(ConnectionClosed));
}
};

debug!(?frame, "Received");

Expand Down Expand Up @@ -399,7 +430,12 @@ impl<D: ServerDriver + 'static> H2ReadContext<D> {
return Err(eyre::eyre!("could not send H2 ping event").into());
}
}
FrameType::GoAway => todo!(),
FrameType::GoAway => {
self.goaway_recv = true;

// TODO: this should probably have other effects than setting
// this flag.
}
FrameType::WindowUpdate => match frame.stream_id.0 {
0 => {
debug!("TODO: ignoring connection-wide window update");
Expand Down Expand Up @@ -485,11 +521,17 @@ impl<D: ServerDriver + 'static> H2ReadContext<D> {
}
}

async fn send_goaway(&self, err: H2ConnectionError) {
async fn send_goaway(&mut self, err: H2ConnectionError) {
// TODO: this should change the global server state: we should ignore
// any streams higher than the last stream id we've seen after
// we've done that.

if self.goaway_sent {
debug!("goaway already sent, ignoring");
return;
}

self.goaway_sent = true;
if self
.ev_tx
.send(H2ConnEvent::GoAway {
Expand Down
98 changes: 75 additions & 23 deletions crates/fluke/src/h2/server.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::rc::Rc;

use futures_util::TryFutureExt;
use tracing::debug;

use crate::{
h2::{
parse::{self, Frame, FrameType, StreamId},
read::H2ReadContext,
types::{ConnState, ConnectionClosed, H2ConnEvent},
types::{ConnState, H2ConnEvent},
},
util::read_and_parse,
ServerDriver,
Expand Down Expand Up @@ -72,32 +71,85 @@ pub async fn serve(
let (ev_tx, ev_rx) = tokio::sync::mpsc::channel::<H2ConnEvent>(32);

let mut h2_read_cx = H2ReadContext::new(driver.clone(), ev_tx.clone(), state);
let read_task = h2_read_cx.read_loop(client_buf, transport_r);

let write_task = super::write::h2_write_loop(ev_rx, transport_w, out_scratch);

let res = tokio::try_join!(
read_task.map_err(LoopError::Read),
write_task.map_err(LoopError::Write),
);
if let Err(e) = &res {
if let LoopError::Read(r) = e {
if r.downcast_ref::<ConnectionClosed>().is_some() {
return Ok(());

enum Outcome {
PeerGone,
Ok,
}

let mut outcome = Outcome::Ok;

{
let mut read_task = std::pin::pin!(h2_read_cx.read_loop(client_buf, transport_r));
let mut write_task =
std::pin::pin!(super::write::h2_write_loop(ev_rx, transport_w, out_scratch));

tokio::select! {
res = &mut read_task => {
match res {
Err(e) => {
return Err(e.wrap_err("h2 read (finished first)"));
}
Ok(()) => {
debug!("read task finished, waiting on write task now");
let res = write_task.await;
match res {
Err(e) => {
if is_peer_gone(&e) {
outcome = Outcome::PeerGone;
} else {
return Err(e.wrap_err("h2 write (finished second)"));
}
}
Ok(()) => {
debug!("write task finished okay");
}
}
}
}
},
res = write_task.as_mut() => {
match res {
Err(e) => {
if is_peer_gone(&e) {
outcome = Outcome::PeerGone;
} else {
return Err(e.wrap_err("h2 write (finished first)"));
}
}
Ok(()) => {
debug!("write task finished, giving up read task");
}
}
},
};
};

match outcome {
Outcome::PeerGone => {
if h2_read_cx.goaway_sent {
debug!("write task failed with broken pipe, but we already sent a goaway, so we're good");
} else {
return Err(eyre::eyre!(
"peer closed connection without sending a goaway"
));
}
}
debug!("caught error from one of the tasks: {e} / {e:#?}");
Outcome::Ok => {
// all goodt hen!
}
}
res?;

Ok(())
}

#[derive(thiserror::Error, Debug)]
enum LoopError {
#[error("read error: {0}")]
Read(eyre::Report),

#[error("write error: {0}")]
Write(eyre::Report),
fn is_peer_gone(e: &eyre::Report) -> bool {
if let Some(io_error) = e.root_cause().downcast_ref::<std::io::Error>() {
matches!(
io_error.kind(),
std::io::ErrorKind::BrokenPipe | std::io::ErrorKind::ConnectionReset
)
} else {
false
}
}
2 changes: 1 addition & 1 deletion crates/fluke/src/h2/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,5 +272,5 @@ impl fmt::Debug for H2EventPayload {
}

#[derive(thiserror::Error, Debug)]
#[error("connection closed")]
#[error("the peer closed the connection unexpectedly")]
pub(crate) struct ConnectionClosed;
2 changes: 1 addition & 1 deletion test-crates/fluke-h2spec/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ pub(crate) async fn run_server(ln: TcpListener) -> color_eyre::Result<()> {

tokio::task::spawn_local(async move {
if let Err(e) = fluke::h2::serve(stream.into_halves(), conf, client_buf, driver).await {
tracing::error!("error serving client {}: {}", addr, e);
tracing::error!("error serving client {addr}: {e}, {e:?}");
}
});
}
Expand Down

0 comments on commit 513ee1d

Please sign in to comment.