Skip to content

Commit

Permalink
well that's super weird
Browse files Browse the repository at this point in the history
  • Loading branch information
fasterthanlime committed Aug 28, 2024
1 parent 486aacf commit fa68aef
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ httpwg-over-tcp *args='':
export TEST_PROTO=h2
export PORT=8001
export RUST_LOG=${RUST_LOG:-info}
./target/release/httpwg --address localhost:8001 "$@" -- ./target/release/httpwg-loona
./target/release/httpwg --frame-timeout 2000 --connect-timeout 2000 --address localhost:8001 "$@" -- ./target/release/httpwg-loona

instruments:
#!/usr/bin/env -S bash -eux
Expand Down
2 changes: 1 addition & 1 deletion crates/buffet/src/bufpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub fn initialize_allocator() -> Result<()> {

let mem_usage_in_mb: f64 = num_bufs as f64 * (BUF_SIZE as usize) as f64 / 1024.0 / 1024.0;
eprintln!(
"===== Initializing buffer pool with {} buffers, will use {:.2} MiB (override with $BUFFET_NUM_BUFS)",
"==== buffet will use {} buffers, for a constant {:.2} MiB usage (override with $BUFFET_NUM_BUFS)",
num_bufs, mem_usage_in_mb
);
initialize_allocator_with_num_bufs(default_num_bufs as _)
Expand Down
19 changes: 19 additions & 0 deletions crates/buffet/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,15 @@ pub trait WriteOwned {

for buf in list.pieces.iter().cloned() {
let buf_len = buf.len();

let before_write = std::time::Instant::now();
tracing::trace!("doing write with buf of len {}", buf_len);
let (res, _) = self.write_owned(buf).await;
tracing::trace!(
"doing write with buf of len {}... done in {:?}",
buf_len,
before_write.elapsed()
);

match res {
Ok(0) => {
Expand Down Expand Up @@ -72,8 +80,15 @@ pub trait WriteOwned {
/// Write a list of buffers, re-trying the write if the kernel does a
/// partial write.
async fn writev_all_owned(&mut self, mut list: PieceList) -> std::io::Result<()> {
tracing::trace!("writev_all_owned starts...");
let start = std::time::Instant::now();

while !list.is_empty() {
tracing::trace!("doing writev_owned with {} pieces", list.len());
let before_writev = std::time::Instant::now();
let n = self.writev_owned(&list).await?;
tracing::trace!("writev_owned took {:?}", before_writev.elapsed());

if n == 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::WriteZero,
Expand Down Expand Up @@ -101,6 +116,10 @@ pub trait WriteOwned {
}
}
}
tracing::trace!(
"writev_all_owned starts... and succeeds! took {:?}",
start.elapsed()
);

Ok(())
}
Expand Down
6 changes: 6 additions & 0 deletions crates/buffet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,13 @@ pub fn start<F: Future>(task: F) -> F::Output {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.on_thread_park(move || {
let start = std::time::Instant::now();
tracing::trace!("thread park, submitting...");
u.submit().unwrap();
tracing::trace!(
"thread park, submitting... done! (took {:?})",
start.elapsed()
);
})
.build()
.unwrap();
Expand Down
4 changes: 4 additions & 0 deletions crates/buffet/src/net/net_uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,18 +142,22 @@ pub struct TcpWriteHalf(Rc<TcpStream>);

impl WriteOwned for TcpWriteHalf {
async fn write_owned(&mut self, buf: impl Into<Piece>) -> BufResult<usize, Piece> {
tracing::trace!("making new sqe");
let buf = buf.into();
let sqe = Write::new(
io_uring::types::Fd(self.0.fd),
buf.as_ref().as_ptr(),
buf.len().try_into().expect("usize -> u32"),
)
.build();
tracing::trace!("pushing sqe");
let cqe = get_ring().push(sqe).await;
tracing::trace!("pushing sqe.. done!");
let ret = match cqe.error_for_errno() {
Ok(ret) => ret,
Err(e) => return (Err(std::io::Error::from(e)), buf),
};
tracing::trace!("pushing sqe.. done! (it even succeeded)");
(Ok(ret as usize), buf)
}

Expand Down
10 changes: 8 additions & 2 deletions crates/httpwg-hyper/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ where

fn call(&self, req: Request<B>) -> Self::Future {
Box::pin(async move {
let (parts, body) = req.into_parts();
let (parts, mut req_body) = req.into_parts();

let path = parts.uri.path();
match path {
"/echo-body" => {
let body: BoxBody<E> = Box::pin(body);
let body: BoxBody<E> = Box::pin(req_body);
let res = Response::builder().body(body).unwrap();
Ok(res)
}
Expand All @@ -64,6 +64,12 @@ where
}
_ => {
let parts = path.trim_start_matches('/').split('/').collect::<Vec<_>>();

// read everything from req body
while let Some(_frame) = req_body.frame().await {
// got frame, nice
}

let body: BoxBody<E> =
Box::pin(http_body_util::Empty::new().map_err(|_| unreachable!()));

Expand Down
2 changes: 1 addition & 1 deletion crates/httpwg-loona/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ harness = false
color-eyre = "0.6.3"
loona = { version = "0.3.0", path = "../loona" }
buffet = { version = "0.3.0", path = "../buffet" }
tracing = { version = "0.1.40", features = ["release_max_level_debug"] }
tracing = { version = "0.1.40" }
tracing-subscriber = "0.3.18"
tokio = { version = "1.39.2", features = ["macros", "sync", "process"] }
eyre = { version = "0.6.12", default-features = false }
Expand Down
11 changes: 8 additions & 3 deletions crates/httpwg-loona/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,16 +258,21 @@ where
.await
.bx()?
}
_ => res
.write_final_response_with_body(
_ => {
while (req_body.next_chunk().await).is_ok() {
// got a chunk, nice
}

res.write_final_response_with_body(
Response {
status: StatusCode::OK,
..Default::default()
},
&mut SinglePieceBody::from("it's less dire to lose, than to lose oneself"),
)
.await
.bx()?,
.bx()?
}
};
Ok(res)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/httpwg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ impl<IO: IntoHalves> Conn<IO> {
{
Ok(res) => res,
Err(_) => {
debug!("timed out reading frame header");
debug!("timed out reading frame header (re-filling)");
break 'read;
}
};
Expand Down
1 change: 1 addition & 0 deletions crates/loona-h2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ impl fmt::Display for StreamId {
}

/// See <https://httpwg.org/specs/rfc9113.html#FrameHeader>
#[derive(Clone, Copy)]
pub struct Frame {
pub frame_type: FrameType,
pub reserved: u8,
Expand Down
11 changes: 9 additions & 2 deletions crates/loona/src/h2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ use crate::{
},
},
util::{read_and_parse, ReadAndParseError},
Headers, Method, Request, Responder, ResponderOrBodyError, ServeOutcome, ServerDriver, SinglePieceBody,
Headers, Method, Request, Responder, ResponderOrBodyError, ServeOutcome, ServerDriver,
SinglePieceBody,
};

use super::{body::ChunkPosition, types::H2ErrorLevel};
Expand Down Expand Up @@ -1334,7 +1335,13 @@ where

let frame = Frame::new(FrameType::RstStream, stream_id)
.with_len((payload.len()).try_into().unwrap());
self.write_frame(frame, PieceList::single(payload)).await?;

for i in 0..15 {
tracing::trace!("Sending rst {i}");
self.write_frame(frame, PieceList::single(payload.clone()))
.await?;
}
tracing::trace!("All rsts sent");

Ok(())
}
Expand Down
14 changes: 13 additions & 1 deletion crates/luring/src/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,19 @@ impl<C: cqueue::Entry> Future for OpInner<C> {
let lifecycle = &mut guard[self.index];
match lifecycle {
Lifecycle::Submitted => {
tracing::trace!("polling OpInner {}... submitted!", self.index);
*lifecycle = Lifecycle::Waiting(cx.waker().clone());
std::task::Poll::Pending
}
Lifecycle::Waiting(_) => {
tracing::trace!("polling OpInner {}... waiting!", self.index);
*lifecycle = Lifecycle::Waiting(cx.waker().clone());
std::task::Poll::Pending
}
Lifecycle::Completed(cqe) => std::task::Poll::Ready(cqe.clone()),
Lifecycle::Completed(cqe) => {
tracing::trace!("polling OpInner {}... completed!", self.index);
std::task::Poll::Ready(cqe.clone())
}
}
}
}
Expand Down Expand Up @@ -207,7 +212,13 @@ impl<S: squeue::Entry, C: cqueue::Entry> IoUringAsync<S, C> {
pub async fn listen(uring: Rc<IoUringAsync<S, C>>) {
let async_fd = AsyncFd::new(uring).unwrap();
loop {
let start = std::time::Instant::now();
tracing::trace!("waiting for uring fd to be ready...");
let mut guard = async_fd.readable().await.unwrap();
tracing::trace!(
"waiting for uring fd to be ready... it is! (took {:?})",
start.elapsed()
);
guard.get_inner().handle_cqe();
guard.clear_ready();
}
Expand Down Expand Up @@ -239,6 +250,7 @@ impl<S: squeue::Entry, C: cqueue::Entry> IoUringAsync<S, C> {
let mut guard = self.slab.borrow_mut();
while let Some(cqe) = unsafe { self.uring.completion_shared() }.next() {
let index = cqe.user_data();
tracing::trace!("received cqe for index {}", index);
let lifecycle = &mut guard[index.try_into().unwrap()];
match lifecycle {
Lifecycle::Submitted => {
Expand Down

0 comments on commit fa68aef

Please sign in to comment.