Skip to content

Commit

Permalink
Merge pull request #194 from bearcove/cli
Browse files Browse the repository at this point in the history
Introduce httpwg-cli, test against real servers
  • Loading branch information
fasterthanlime authored Aug 10, 2024
2 parents b7bf746 + bf75c24 commit 73b5116
Show file tree
Hide file tree
Showing 17 changed files with 1,037 additions and 331 deletions.
32 changes: 28 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ cov:
build-testbed:
cargo build --release -p fluke-hyper-testbed

t *args:
just test {{args}}

# Run all tests with cargo nextest
test *args:
#!/bin/bash
Expand Down
74 changes: 59 additions & 15 deletions crates/fluke-buffet/src/roll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ impl Debug for StorageMut {
}

impl StorageMut {
#[inline(always)]
fn off(&self) -> u32 {
match self {
StorageMut::Buf(b) => b.off as _,
StorageMut::Box(b) => b.off,
}
}

#[inline(always)]
fn cap(&self) -> usize {
match self {
Expand Down Expand Up @@ -132,6 +140,9 @@ impl RollMut {
pub fn grow(&mut self) {
let old_cap = self.storage.cap();
let new_cap = old_cap * 2;

tracing::trace!("growing buffer from {} to {}", old_cap, new_cap);

// TODO: optimize via `MaybeUninit`?
let b = vec![0; new_cap].into_boxed_slice();
let mut bs = BoxStorage {
Expand All @@ -148,16 +159,22 @@ impl RollMut {
/// Reallocates the backing storage for this buffer, copying the filled
/// portion into it. Panics if `len() == storage_size()`, in which case
/// reallocating won't do much good
pub fn realloc(&mut self) -> Result<()> {
///
/// Also panics if we're using buf storage and the offset is zero
/// (which means reallocating would not free up any space)
pub fn compact(&mut self) -> Result<()> {
assert!(self.len() != self.storage_size());
tracing::trace!("compacting");

let next_storage = match &self.storage {
StorageMut::Buf(_) => {
StorageMut::Buf(bm) => {
assert!(bm.off != 0);
let mut next_b = BufMut::alloc()?;
next_b[..self.len()].copy_from_slice(&self[..]);
StorageMut::Buf(next_b)
}
StorageMut::Box(b) => {
tracing::trace!("reallocating, storage is box");
if self.len() > BUF_SIZE as usize {
// TODO: optimize via `MaybeUninit`?
let mut next_b = vec![0; b.cap()].into_boxed_slice();
Expand Down Expand Up @@ -186,13 +203,14 @@ impl RollMut {
/// to `realloc`.
pub fn reserve(&mut self) -> Result<()> {
if self.len() < self.cap() {
tracing::trace!("reserve: len < cap, no need to reserve anything");
return Ok(());
}

if self.len() < self.storage_size() {
// we don't need to go up a buffer size
if self.storage.off() > 0 {
// let's try to compact first
trace!(len = %self.len(), cap = %self.cap(), storage_size = %self.storage_size(), "in reserve: reallocating");
self.realloc()?
self.compact()?
} else {
trace!(len = %self.len(), cap = %self.cap(), storage_size = %self.storage_size(), "in reserve: growing");
self.grow()
Expand All @@ -203,15 +221,28 @@ impl RollMut {

/// Make sure we can hold "request_len"
pub fn reserve_at_least(&mut self, requested_len: usize) -> Result<()> {
while self.cap() < requested_len {
if self.cap() < self.storage_size() {
// we don't need to go up a buffer size
self.realloc()?
} else {
self.grow()
}
if requested_len <= self.cap() {
tracing::trace!(%requested_len, cap = %self.cap(), "reserve_at_least: requested_len <= cap, no need to compact");
return Ok(());
}

if self.storage.off() > 0 && requested_len <= (BUF_SIZE as usize - self.len()) {
// we can compact the filled portion!
self.compact()?;
} else {
// we need to allocate box storage of the right size
let new_storage_size =
std::cmp::max(self.storage_size() * 2, requested_len + self.len());
let mut new_b = vec![0u8; new_storage_size].into_boxed_slice();
// copy the filled portion
new_b[..self.len()].copy_from_slice(&self[..]);
self.storage = StorageMut::Box(BoxStorage {
buf: Rc::new(UnsafeCell::new(new_b)),
off: 0,
});
}

assert!(self.cap() >= requested_len);
Ok(())
}

Expand Down Expand Up @@ -271,7 +302,8 @@ impl RollMut {
(res, read_into.buf)
}

/// Put a slice into this buffer, fails if the slice doesn't fit in the buffer's capacity
/// Put a slice into this buffer, fails if the slice doesn't fit in the
/// buffer's capacity
pub fn put(&mut self, s: impl AsRef<[u8]>) -> Result<()> {
let s = s.as_ref();

Expand Down Expand Up @@ -981,7 +1013,7 @@ mod tests {
rm.take_all();
assert_eq!(rm.cap(), init_cap - 5);

rm.realloc().unwrap();
rm.compact().unwrap();
assert_eq!(rm.cap(), BUF_SIZE as usize);
}

Expand All @@ -1000,7 +1032,7 @@ mod tests {

let put = "x".repeat(rm.cap() * 2 / 3);
rm.put(&put).unwrap();
rm.realloc().unwrap();
rm.compact().unwrap();

assert_eq!(rm.storage_size(), BUF_SIZE as usize * 2);
assert_eq!(rm.len(), put.len());
Expand Down Expand Up @@ -1336,4 +1368,16 @@ mod tests {
let roll = rm.take_all();
assert_eq!(std::str::from_utf8(&roll).unwrap(), "hello");
}

#[test]
fn test_reallocate_big() {
let mut rm = RollMut::alloc().unwrap();
rm.put(b"baba yaga").unwrap();
let filled = rm.filled();
let (_frame, rest) = filled.split_at(4);
rm.keep(rest);

rm.reserve_at_least(5263945).unwrap();
assert!(rm.cap() >= 5263945);
}
}
8 changes: 4 additions & 4 deletions crates/fluke-h2-parse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl fmt::Debug for Frame {
FrameType::WindowUpdate => "WindowUpdate",
FrameType::Continuation(_) => "Continuation",
FrameType::Unknown(EncodedFrameType { ty, flags }) => {
return write!(f, "UnknownFrame({:#x}, {:#x})", ty, flags)
return write!(f, "UnknownFrame({:#x}, {:#x}, len={})", ty, flags, self.len)
}
};
let mut s = f.debug_struct(name);
Expand Down Expand Up @@ -376,7 +376,7 @@ impl Frame {
}

/// Returns true if this frame is an ack
pub fn is_ack(self) -> bool {
pub fn is_ack(&self) -> bool {
match self.frame_type {
FrameType::Settings(flags) => flags.contains(SettingsFlags::Ack),
FrameType::Ping(flags) => flags.contains(PingFlags::Ack),
Expand All @@ -385,7 +385,7 @@ impl Frame {
}

/// Returns true if this frame has `EndHeaders` set
pub fn is_end_headers(self) -> bool {
pub fn is_end_headers(&self) -> bool {
match self.frame_type {
FrameType::Headers(flags) => flags.contains(HeadersFlags::EndHeaders),
FrameType::Continuation(flags) => flags.contains(ContinuationFlags::EndHeaders),
Expand All @@ -394,7 +394,7 @@ impl Frame {
}

/// Returns true if this frame has `EndStream` set
pub fn is_end_stream(self) -> bool {
pub fn is_end_stream(&self) -> bool {
match self.frame_type {
FrameType::Data(flags) => flags.contains(DataFlags::EndStream),
FrameType::Headers(flags) => flags.contains(HeadersFlags::EndStream),
Expand Down
11 changes: 11 additions & 0 deletions crates/fluke-httpwg-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "fluke-httpwg-server"
version = "0.1.0"
edition = "2021"

[dependencies]
color-eyre = "0.6.3"
fluke = { version = "0.1.1", path = "../fluke" }
fluke-buffet = { version = "0.2.0", path = "../fluke-buffet" }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
128 changes: 128 additions & 0 deletions crates/fluke-httpwg-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use std::rc::Rc;

use color_eyre::eyre;
use fluke::{
http::{self, StatusCode},
Body, BodyChunk, Encoder, ExpectResponseHeaders, Responder, Response, ResponseDone,
};
use fluke_buffet::{IntoHalves, RollMut};
use tracing::Level;
use tracing_subscriber::{filter::Targets, layer::SubscriberExt, util::SubscriberInitExt};

fn main() {
setup_tracing_and_error_reporting();

fluke_buffet::start(async move {
let ln = fluke_buffet::net::TcpListener::bind("127.0.0.1:8000".parse().unwrap())
.await
.unwrap();

println!(
"Listening on {:?} for 'http2 prior knowledge' connections (no TLS)",
ln.local_addr().unwrap()
);

loop {
let (stream, addr) = ln.accept().await.unwrap();
tracing::info!(?addr, "Accepted connection");

fluke_buffet::spawn(async move {
let server_conf = Rc::new(fluke::h2::ServerConf {
..Default::default()
});

let client_buf = RollMut::alloc().unwrap();
let driver = Rc::new(TestDriver);
let io = stream.into_halves();
fluke::h2::serve(io, server_conf, client_buf, driver)
.await
.unwrap();
tracing::debug!("http/2 server done");
});
}
});
}

fn setup_tracing_and_error_reporting() {
color_eyre::install().unwrap();

let targets = if let Ok(rust_log) = std::env::var("RUST_LOG") {
rust_log.parse::<Targets>().unwrap()
} else {
Targets::new()
.with_default(Level::INFO)
.with_target("fluke", Level::DEBUG)
.with_target("httpwg", Level::DEBUG)
.with_target("want", Level::INFO)
};

let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(true)
.with_file(false)
.with_line_number(false)
.without_time();

tracing_subscriber::registry()
.with(targets)
.with(fmt_layer)
.init();
}

struct TestDriver;

impl fluke::ServerDriver for TestDriver {
async fn handle<E: Encoder>(
&self,
_req: fluke::Request,
req_body: &mut impl Body,
mut res: Responder<E, ExpectResponseHeaders>,
) -> eyre::Result<Responder<E, ResponseDone>> {
// if the client sent `expect: 100-continue`, we must send a 100 status code
if let Some(h) = _req.headers.get(http::header::EXPECT) {
if &h[..] == b"100-continue" {
res.write_interim_response(Response {
status: StatusCode::CONTINUE,
..Default::default()
})
.await?;
}
}

// then read the full request body
let mut req_body_len = 0;
loop {
let chunk = req_body.next_chunk().await?;
match chunk {
BodyChunk::Done { trailers } => {
// yey
if let Some(trailers) = trailers {
tracing::debug!(trailers_len = %trailers.len(), "received trailers");
}
break;
}
BodyChunk::Chunk(chunk) => {
req_body_len += chunk.len();
}
}
}
tracing::debug!(%req_body_len, "read request body");

tracing::trace!("writing final response");
let mut res = res
.write_final_response(Response {
status: StatusCode::OK,
..Default::default()
})
.await?;

tracing::trace!("writing body chunk");
res.write_chunk("it's less dire to lose, than to lose oneself".into())
.await?;

tracing::trace!("finishing body (with no trailers)");
let res = res.finish_body(None).await?;

tracing::trace!("we're done");
Ok(res)
}
}
Loading

0 comments on commit 73b5116

Please sign in to comment.