diff --git a/Cargo.lock b/Cargo.lock index 16e546b75e..2ed60546e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -955,6 +955,7 @@ dependencies = [ name = "neqo-udp" version = "0.11.0" dependencies = [ + "cfg_aliases", "log", "neqo-common", "quinn-udp", diff --git a/Cargo.toml b/Cargo.toml index f37dd8141d..09e04fe766 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,7 @@ rust-version = "1.76.0" enum-map = { version = "2.7", default-features = false } log = { version = "0.4", default-features = false } qlog = { version = "0.13", default-features = false } -quinn-udp = { version = "0.5.6", default-features = false, features = ["direct-log"] } +quinn-udp = { version = "0.5.6", default-features = false, features = ["direct-log", "fast-apple-datapath"] } regex = { version = "1.9", default-features = false, features = ["unicode-perl"] } static_assertions = { version = "1.1", default-features = false } url = { version = "2.5.3", default-features = false, features = ["std"] } diff --git a/neqo-bin/src/client/mod.rs b/neqo-bin/src/client/mod.rs index 24102cd283..cb1d829fd4 100644 --- a/neqo-bin/src/client/mod.rs +++ b/neqo-bin/src/client/mod.rs @@ -30,6 +30,7 @@ use neqo_crypto::{ }; use neqo_http3::Output; use neqo_transport::{AppError, CloseReason, ConnectionId, Version}; +use neqo_udp::RecvBuf; use tokio::time::Sleep; use url::{Host, Origin, Url}; @@ -394,7 +395,7 @@ struct Runner<'a, H: Handler> { handler: H, timeout: Option>>, args: &'a Args, - recv_buf: Vec, + recv_buf: RecvBuf, } impl<'a, H: Handler> Runner<'a, H> { @@ -412,7 +413,7 @@ impl<'a, H: Handler> Runner<'a, H> { handler, args, timeout: None, - recv_buf: vec![0; neqo_udp::RECV_BUF_SIZE], + recv_buf: RecvBuf::new(), } } @@ -481,9 +482,6 @@ impl<'a, H: Handler> Runner<'a, H> { let Some(dgrams) = self.socket.recv(self.local_addr, &mut self.recv_buf)? 
else { break; }; - if dgrams.len() == 0 { - break; - } self.client.process_multiple_input(dgrams, Instant::now()); self.process_output().await?; } diff --git a/neqo-bin/src/server/mod.rs b/neqo-bin/src/server/mod.rs index 2dd05a3daf..bcce011ec6 100644 --- a/neqo-bin/src/server/mod.rs +++ b/neqo-bin/src/server/mod.rs @@ -29,6 +29,7 @@ use neqo_crypto::{ init_db, AntiReplay, Cipher, }; use neqo_transport::{Output, RandomConnectionIdGenerator, Version}; +use neqo_udp::RecvBuf; use tokio::time::Sleep; use crate::SharedArgs; @@ -202,7 +203,7 @@ pub struct ServerRunner { server: Box, timeout: Option>>, sockets: Vec<(SocketAddr, crate::udp::Socket)>, - recv_buf: Vec, + recv_buf: RecvBuf, } impl ServerRunner { @@ -217,7 +218,7 @@ impl ServerRunner { server, timeout: None, sockets, - recv_buf: vec![0; neqo_udp::RECV_BUF_SIZE], + recv_buf: RecvBuf::new(), } } diff --git a/neqo-bin/src/udp.rs b/neqo-bin/src/udp.rs index 8bc78c2665..bd8d0eef85 100644 --- a/neqo-bin/src/udp.rs +++ b/neqo-bin/src/udp.rs @@ -7,7 +7,7 @@ use std::{io, net::SocketAddr}; use neqo_common::Datagram; -use neqo_udp::DatagramIter; +use neqo_udp::{DatagramIter, RecvBuf}; /// Ideally this would live in [`neqo-udp`]. [`neqo-udp`] is used in Firefox. 
/// @@ -59,7 +59,7 @@ impl Socket { pub fn recv<'a>( &self, local_address: SocketAddr, - recv_buf: &'a mut [u8], + recv_buf: &'a mut RecvBuf, ) -> Result>, io::Error> { self.inner .try_io(tokio::io::Interest::READABLE, || { diff --git a/neqo-udp/Cargo.toml b/neqo-udp/Cargo.toml index 73efce539b..150a68da7e 100644 --- a/neqo-udp/Cargo.toml +++ b/neqo-udp/Cargo.toml @@ -21,6 +21,9 @@ log = { workspace = true } neqo-common = { path = "./../neqo-common" } quinn-udp = { workspace = true } +[build-dependencies] +cfg_aliases = "0.2" + [package.metadata.cargo-machete] ignored = ["log"] diff --git a/neqo-udp/build.rs b/neqo-udp/build.rs new file mode 100644 index 0000000000..c0ad7df7b3 --- /dev/null +++ b/neqo-udp/build.rs @@ -0,0 +1,16 @@ +use cfg_aliases::cfg_aliases; + +fn main() { + // Setup cfg aliases + cfg_aliases! { + // Platforms + apple: { + any( + target_os = "macos", + target_os = "ios", + target_os = "tvos", + target_os = "visionos" + ) + }, + } +} diff --git a/neqo-udp/src/lib.rs b/neqo-udp/src/lib.rs index 61f7fa1ca2..ab272b5559 100644 --- a/neqo-udp/src/lib.rs +++ b/neqo-udp/src/lib.rs @@ -7,20 +7,56 @@ #![allow(clippy::missing_errors_doc)] // Functions simply delegate to tokio and quinn-udp. use std::{ + array, io::{self, IoSliceMut}, + iter, net::SocketAddr, slice::{self, Chunks}, }; +use log::{log_enabled, Level}; use neqo_common::{qdebug, qtrace, Datagram, IpTos}; use quinn_udp::{EcnCodepoint, RecvMeta, Transmit, UdpSocketState}; -/// Socket receive buffer size. +/// Receive buffer size /// -/// Allows reading multiple datagrams in a single [`Socket::recv`] call. -// -// TODO: Experiment with different values across platforms. -pub const RECV_BUF_SIZE: usize = u16::MAX as usize; +/// Fits a maximum size UDP datagram, or, on platforms with segmentation +/// offloading, multiple smaller datagrams. +const RECV_BUF_SIZE: usize = u16::MAX as usize; + +/// The number of buffers to pass to the OS on [`Socket::recv`]. 
+/// +/// Platforms without segmentation offloading, i.e. platforms not able to read +/// multiple datagrams into a single buffer, can benefit from using multiple +/// buffers instead. +/// +/// Platforms with segmentation offloading have not shown performance +/// improvements when additionally using multiple buffers. +/// +/// - Linux/Android: use segmentation offloading via GRO +/// - Windows: use segmentation offloading via URO (caveat see ) +/// - Apple: no segmentation offloading available, use multiple buffers +#[cfg(not(apple))] +const NUM_BUFS: usize = 1; +#[cfg(apple)] +// Value approximated based on neqo-bin "Download" benchmark only. +const NUM_BUFS: usize = 16; + +/// A UDP receive buffer. +pub struct RecvBuf(Vec>); + +impl RecvBuf { + #[must_use] + pub fn new() -> Self { + Self(vec![vec![0; RECV_BUF_SIZE]; NUM_BUFS]) + } +} + +impl Default for RecvBuf { + fn default() -> Self { + Self::new() + } +} pub fn send_inner( state: &UdpSocketState, @@ -52,88 +88,98 @@ use std::os::fd::AsFd as SocketRef; #[cfg(windows)] use std::os::windows::io::AsSocket as SocketRef; +#[allow(clippy::missing_panics_doc)] pub fn recv_inner<'a>( local_address: SocketAddr, state: &UdpSocketState, socket: impl SocketRef, - recv_buf: &'a mut [u8], + recv_buf: &'a mut RecvBuf, ) -> Result, io::Error> { - let mut meta; - - let data = loop { - meta = RecvMeta::default(); + let mut metas = [RecvMeta::default(); NUM_BUFS]; + let mut iovs: [IoSliceMut; NUM_BUFS] = { + let mut bufs = recv_buf.0.iter_mut().map(|b| IoSliceMut::new(b)); + array::from_fn(|_| bufs.next().expect("NUM_BUFS elements")) + }; - state.recv( - (&socket).into(), - &mut [IoSliceMut::new(recv_buf)], - slice::from_mut(&mut meta), - )?; + let n = state.recv((&socket).into(), &mut iovs, &mut metas)?; - if meta.len == 0 || meta.stride == 0 { - qdebug!( - "ignoring datagram from {} to {} len {} stride {}", + if log_enabled!(Level::Trace) { + for meta in metas.iter().take(n) { + qtrace!( + "received {} bytes from {} to {} in 
{} segments", + meta.len, meta.addr, local_address, - meta.len, - meta.stride + if meta.stride == 0 { + 0 + } else { + meta.len.div_ceil(meta.stride) + } ); - continue; } - - break &recv_buf[..meta.len]; - }; - - qtrace!( - "received {} bytes from {} to {} in {} segments", - data.len(), - meta.addr, - local_address, - data.len().div_ceil(meta.stride), - ); + } Ok(DatagramIter { - meta, - datagrams: data.chunks(meta.stride), + current_buffer: None, + remaining_buffers: metas.into_iter().zip(recv_buf.0.iter()).take(n), local_address, }) } pub struct DatagramIter<'a> { - meta: RecvMeta, - datagrams: Chunks<'a, u8>, + /// The current buffer, containing zero or more datagrams, each sharing the + /// same [`RecvMeta`]. + current_buffer: Option<(RecvMeta, Chunks<'a, u8>)>, + /// Remaining buffers, each containing zero or more datagrams, one + /// [`RecvMeta`] per buffer. + remaining_buffers: + iter::Take, slice::Iter<'a, Vec>>>, + /// The local address of the UDP socket used to receive the datagrams. local_address: SocketAddr, } -impl std::fmt::Debug for DatagramIter<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("DatagramIter") - .field("meta", &self.meta) - .field("local_address", &self.local_address) - .finish() - } -} - impl<'a> Iterator for DatagramIter<'a> { type Item = Datagram<&'a [u8]>; fn next(&mut self) -> Option { - self.datagrams.next().map(|d| { - Datagram::from_slice( - self.meta.addr, - self.local_address, - self.meta - .ecn - .map(|n| IpTos::from(n as u8)) - .unwrap_or_default(), - d, - ) - }) - } -} - -impl ExactSizeIterator for DatagramIter<'_> { - fn len(&self) -> usize { - self.datagrams.len() + loop { + // Return the next datagram in the current buffer, if any. 
+ if let Some((meta, d)) = self + .current_buffer + .as_mut() + .and_then(|(meta, ds)| ds.next().map(|d| (meta, d))) + { + return Some(Datagram::from_slice( + meta.addr, + self.local_address, + meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(), + d, + )); + } + + // There are no more datagrams in the current buffer. Try promoting + // one of the remaining buffers, if any, to be the current buffer. + let Some((meta, buf)) = self.remaining_buffers.next() else { + // Handled all buffers. No more datagrams. Iterator is empty. + return None; + }; + + // Ignore empty datagrams. + if meta.len == 0 || meta.stride == 0 { + qdebug!( + "ignoring empty datagram from {} to {} len {} stride {}", + meta.addr, + self.local_address, + meta.len, + meta.stride + ); + continue; + } + + // Got another buffer. Let's chunk it into datagrams and return the + // first datagram in the next loop iteration. + self.current_buffer = Some((meta, buf[0..meta.len].chunks(meta.stride))); + } } } @@ -162,7 +208,7 @@ impl Socket { pub fn recv<'a>( &self, local_address: SocketAddr, - recv_buf: &'a mut [u8], + recv_buf: &'a mut RecvBuf, ) -> Result, io::Error> { recv_inner(local_address, &self.state, &self.inner, recv_buf) } @@ -182,22 +228,19 @@ mod tests { } #[test] - fn ignore_empty_datagram() -> Result<(), io::Error> { - let sender = socket()?; + fn handle_empty_datagram() -> Result<(), io::Error> { + // quinn-udp doesn't support sending empty datagrams across all + // platforms. Use `std` socket instead. See also + // . 
+ let sender = std::net::UdpSocket::bind("127.0.0.1:0")?; let receiver = Socket::new(std::net::UdpSocket::bind("127.0.0.1:0")?)?; let receiver_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); - let datagram = Datagram::new( - sender.inner.local_addr()?, - receiver.inner.local_addr()?, - IpTos::default(), - vec![], - ); + sender.send_to(&[], receiver.inner.local_addr()?)?; + let mut recv_buf = RecvBuf::new(); + let mut datagrams = receiver.recv(receiver_addr, &mut recv_buf)?; - sender.send(&datagram)?; - let mut recv_buf = vec![0; RECV_BUF_SIZE]; - let res = receiver.recv(receiver_addr, &mut recv_buf); - assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::WouldBlock); + assert_eq!(datagrams.next(), None); Ok(()) } @@ -217,7 +260,7 @@ mod tests { sender.send(&datagram)?; - let mut recv_buf = vec![0; RECV_BUF_SIZE]; + let mut recv_buf = RecvBuf::new(); let mut received_datagrams = receiver .recv(receiver_addr, &mut recv_buf) .expect("receive to succeed"); @@ -260,7 +303,7 @@ mod tests { // Allow for one GSO sendmmsg to result in multiple GRO recvmmsg. let mut num_received = 0; - let mut recv_buf = vec![0; RECV_BUF_SIZE]; + let mut recv_buf = RecvBuf::new(); while num_received < max_gso_segments { receiver .recv(receiver_addr, &mut recv_buf) @@ -277,20 +320,4 @@ mod tests { Ok(()) } - - #[test] - fn fmt_datagram_iter() { - let dgrams = []; - - let i = DatagramIter { - meta: RecvMeta::default(), - datagrams: dgrams.chunks(1), - local_address: "[::]:0".parse().unwrap(), - }; - - assert_eq!( - &format!("{i:?}"), - "DatagramIter { meta: RecvMeta { addr: [::]:0, len: 0, stride: 0, ecn: None, dst_ip: None }, local_address: [::]:0 }" - ); - } }