diff --git a/neqo-bin/src/server/http09.rs b/neqo-bin/src/server/http09.rs index 1887e3ac6f..e183a1b045 100644 --- a/neqo-bin/src/server/http09.rs +++ b/neqo-bin/src/server/http09.rs @@ -185,7 +185,7 @@ impl HttpServer { } impl super::HttpServer for HttpServer { - fn process(&mut self, dgram: Option, now: Instant) -> Output { + fn process(&mut self, dgram: Option>, now: Instant) -> Output { self.server.process(dgram, now) } diff --git a/neqo-bin/src/server/http3.rs b/neqo-bin/src/server/http3.rs index dfef3f1be4..1d1d7a1cf8 100644 --- a/neqo-bin/src/server/http3.rs +++ b/neqo-bin/src/server/http3.rs @@ -80,7 +80,7 @@ impl Display for HttpServer { } impl super::HttpServer for HttpServer { - fn process(&mut self, dgram: Option, now: Instant) -> neqo_http3::Output { + fn process(&mut self, dgram: Option>, now: Instant) -> neqo_http3::Output { self.server.process(dgram, now) } diff --git a/neqo-bin/src/server/mod.rs b/neqo-bin/src/server/mod.rs index 8927890e8e..fc04fa5ad4 100644 --- a/neqo-bin/src/server/mod.rs +++ b/neqo-bin/src/server/mod.rs @@ -191,7 +191,7 @@ fn qns_read_response(filename: &str) -> Result, io::Error> { #[allow(clippy::module_name_repetitions)] pub trait HttpServer: Display { - fn process(&mut self, dgram: Option, now: Instant) -> Output; + fn process(&mut self, dgram: Option>, now: Instant) -> Output; fn process_events(&mut self, now: Instant); fn has_events(&self) -> bool; } @@ -222,8 +222,11 @@ impl ServerRunner { } /// Tries to find a socket, but then just falls back to sending from the first. - fn find_socket(&mut self, addr: SocketAddr) -> &mut crate::udp::Socket { - let ((_host, first_socket), rest) = self.sockets.split_first_mut().unwrap(); + fn find_socket( + sockets: &mut [(SocketAddr, crate::udp::Socket)], + addr: SocketAddr, + ) -> &mut crate::udp::Socket { + let ((_host, first_socket), rest) = sockets.split_first_mut().unwrap(); rest.iter_mut() .map(|(_host, socket)| socket) .find(|socket| { @@ -235,27 +238,67 @@ impl ServerRunner { .unwrap_or(first_socket) } - async fn process(&mut self, mut dgram: Option) -> Result<(), io::Error> { + // Free function (i.e. not taking `&mut self: ServerRunner`) to be callable by + // `ServerRunner::read_and_process` while holding a reference to + // `ServerRunner::recv_buf`. + async fn process_inner( + server: &mut Box, + timeout: &mut Option>>, + sockets: &mut [(SocketAddr, crate::udp::Socket)], + now: &dyn Fn() -> Instant, + mut input_dgram: Option>, + ) -> Result<(), io::Error> { loop { - match self.server.process(dgram.take(), (self.now)()) { + match server.process(input_dgram.take(), now()) { Output::Datagram(dgram) => { - let socket = self.find_socket(dgram.source()); + let socket = Self::find_socket(sockets, dgram.source()); socket.writable().await?; socket.send(&dgram)?; } Output::Callback(new_timeout) => { qdebug!("Setting timeout of {:?}", new_timeout); - self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout))); - break; - } - Output::None => { + *timeout = Some(Box::pin(tokio::time::sleep(new_timeout))); break; } + Output::None => break, } } Ok(()) } + async fn read_and_process(&mut self, sockets_index: usize) -> Result<(), io::Error> { + loop { + let (host, socket) = self.sockets.get_mut(sockets_index).unwrap(); + let Some(input_dgrams) = socket.recv(*host, &mut self.recv_buf)? else { + break; + }; + + for input_dgram in input_dgrams { + Self::process_inner( + &mut self.server, + &mut self.timeout, + &mut self.sockets, + &self.now, + Some(input_dgram), + ) + .await?; + } + } + + Ok(()) + } + + async fn process(&mut self) -> Result<(), io::Error> { + Self::process_inner( + &mut self.server, + &mut self.timeout, + &mut self.sockets, + &self.now, + None, + ) + .await + } + // Wait for any of the sockets to be readable or the timeout to fire. async fn ready(&mut self) -> Result { let sockets_ready = select_all( @@ -278,30 +321,19 @@ impl ServerRunner { pub async fn run(mut self) -> Res<()> { loop { self.server.process_events((self.now)()); - - self.process(None).await?; + self.process().await?; if self.server.has_events() { continue; } match self.ready().await? { - Ready::Socket(inx) => loop { - let (host, socket) = self.sockets.get_mut(inx).unwrap(); - let Some(dgrams) = socket.recv(*host, &mut self.recv_buf)? else { - break; - }; - if dgrams.len() == 0 { - break; - } - let dgrams: Vec = dgrams.map(|d| d.to_owned()).collect(); - for dgram in dgrams { - self.process(Some(dgram)).await?; - } - }, + Ready::Socket(sockets_index) => { + self.read_and_process(sockets_index).await?; + } Ready::Timeout => { self.timeout = None; - self.process(None).await?; + self.process().await?; } } }