From 3bbd14056a6de1c2a096762452c46540215c7410 Mon Sep 17 00:00:00 2001 From: "Kyle Holohan (from Dev Box)" <122563280+kyleholohan@users.noreply.github.com> Date: Fri, 31 Jan 2025 10:35:59 -0800 Subject: [PATCH] [catpowder] Enhancement: add buffer pooling and multiple ring entries to XDP backend --- Cargo.toml | 4 +- scripts/config/azure.yaml | 10 + scripts/config/default.yaml | 10 + src/rust/catpowder/win/api.rs | 3 + src/rust/catpowder/win/ring/buffer.rs | 89 ------- src/rust/catpowder/win/ring/generic.rs | 39 +++- src/rust/catpowder/win/ring/mod.rs | 2 - src/rust/catpowder/win/ring/rx_ring.rs | 168 +++++++++---- src/rust/catpowder/win/ring/tx_ring.rs | 191 +++++++++++---- src/rust/catpowder/win/ring/umemreg.rs | 198 ++++++++++++++-- src/rust/catpowder/win/runtime.rs | 311 +++++++++++++++++-------- src/rust/catpowder/win/socket.rs | 14 ++ src/rust/demikernel/config.rs | 92 +++++--- src/rust/runtime/fail.rs | 6 + src/rust/runtime/memory/buffer_pool.rs | 11 +- src/rust/runtime/memory/demibuffer.rs | 86 ++++++- 16 files changed, 886 insertions(+), 348 deletions(-) delete mode 100644 src/rust/catpowder/win/ring/buffer.rs diff --git a/Cargo.toml b/Cargo.toml index 12c38878b..d6809d544 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ x86 = "0.52.0" yaml-rust = "0.4.5" # Demikernel crates (published on crates.io). -demikernel-dpdk-bindings = { version = "1.1.6", optional = true } +demikernel-dpdk-bindings = { version = "1.1.7", optional = true } demikernel-network-simulator = { version = "0.1.0" } # Windows-specific dependencies. @@ -53,7 +53,7 @@ windows = { version = "0.57.0", features = [ "Win32_System_SystemInformation", "Win32_System_Threading", ] } -demikernel-xdp-bindings = { version = "1.0.0", optional = true } +demikernel-xdp-bindings = { version = "1.0.1", optional = true } # for interacting with socket2. windows-sys = { version = "0.52.0", features = ["Win32_Networking_WinSock"] } diff --git a/scripts/config/azure.yaml b/scripts/config/azure.yaml index 2a9c0d34a..ba4c5b1d3 100644 --- a/scripts/config/azure.yaml +++ b/scripts/config/azure.yaml @@ -8,11 +8,21 @@ raw_socket: linux_interface_name: "abcde" xdp_interface_index: 0 xdp_cohost_mode: false + xdp_always_poke_tx: false # Enable the following for XDP cohosting mode, or override in environment: # xdp_tcp_ports: [80, 443] # xdp_udp_ports: [53] # Enable the following line if you have a VF interface # xdp_vf_interface_index: 0 + # The number of buffers to allocate for sending packets. Must be larger than the tx_ring_size. + tx_buffer_count: 4096 + # The number of buffers to allocate for receiving packets for each RSS queue. Must be larger than + # the rx_ring_size. + rx_buffer_count: 4096 + # The number of entries in the TX producer/consumer rings; must be a power of 2. + tx_ring_size: 128 + # The number of entries in each RX producer/consumer ring for each RSS queue; must be a power of 2. + rx_ring_size: 128 dpdk: eal_init: ["-c", "0xff", "-n", "4", "-a", "WW:WW.W", "--proc-type=auto", "--vdev=net_vdev_netvsc0,iface=abcde"] tcp_socket_options: diff --git a/scripts/config/default.yaml b/scripts/config/default.yaml index 3ba7c2627..fffc33968 100644 --- a/scripts/config/default.yaml +++ b/scripts/config/default.yaml @@ -8,11 +8,21 @@ raw_socket: linux_interface_name: "abcde" xdp_interface_index: 0 xdp_cohost_mode: false + xdp_always_poke_tx: false # Enable the following for XDP cohosting mode, or override in environment: # xdp_tcp_ports: [80, 443] # xdp_udp_ports: [53] # Enable the following line if you have a VF interface # xdp_vf_interface_index: 0 + # The number of buffers to allocate for sending packets. Must be larger than the tx_ring_size. + tx_buffer_count: 4096 + # The number of buffers to allocate for receiving packets for each RSS queue. Must be larger than + # the rx_ring_size. + rx_buffer_count: 4096 + # The number of entries in the TX producer/consumer rings; must be a power of 2. + tx_ring_size: 128 + # The number of entries in each RX producer/consumer ring for each RSS queue; must be a power of 2. + rx_ring_size: 128 dpdk: eal_init: ["", "-c", "0xff", "-n", "4", "-a", "WW:WW.W","--proc-type=auto"] tcp_socket_options: diff --git a/src/rust/catpowder/win/api.rs b/src/rust/catpowder/win/api.rs index 44d271ed3..700ab07a8 100644 --- a/src/rust/catpowder/win/api.rs +++ b/src/rust/catpowder/win/api.rs @@ -66,3 +66,6 @@ impl Drop for XdpApi { } } } + +unsafe impl Send for XdpApi {} +unsafe impl Sync for XdpApi {} diff --git a/src/rust/catpowder/win/ring/buffer.rs b/src/rust/catpowder/win/ring/buffer.rs deleted file mode 100644 index ad3f89f08..000000000 --- a/src/rust/catpowder/win/ring/buffer.rs +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -//====================================================================================================================== -// Imports -//====================================================================================================================== - -use crate::{catpowder::win::ring::umemreg::UmemReg, runtime::libxdp}; -use ::std::{ - cell::RefCell, - ops::{Deref, DerefMut}, - rc::Rc, - vec::Vec, -}; - -//====================================================================================================================== -// Structures -//====================================================================================================================== - -pub struct XdpBuffer { - b: *mut libxdp::XSK_BUFFER_DESCRIPTOR, - /// UMEM region that contains the buffer. - umemreg: Rc>, -} - -//====================================================================================================================== -// Implementations -//====================================================================================================================== - -impl XdpBuffer { - pub(super) fn new(b: *mut libxdp::XSK_BUFFER_DESCRIPTOR, umemreg: Rc>) -> Self { - Self { b, umemreg } - } - - pub(super) fn set_len(&mut self, len: usize) { - unsafe { - (*self.b).Length = len as u32; - } - } - - fn len(&self) -> usize { - unsafe { (*self.b).Length as usize } - } - - unsafe fn relative_base_address(&self) -> u64 { - (*self.b).Address.__bindgen_anon_1.BaseAddress() - } - - unsafe fn offset(&self) -> u64 { - (*self.b).Address.__bindgen_anon_1.Offset() - } - - unsafe fn compute_address(&self) -> *mut core::ffi::c_void { - let mut ptr: *mut u8 = self.umemreg.borrow_mut().address() as *mut u8; - ptr = ptr.add(self.relative_base_address() as usize); - ptr = ptr.add(self.offset() as usize); - ptr as *mut core::ffi::c_void - } - - fn to_vector(&self) -> Vec { - let mut out: Vec = Vec::with_capacity(self.len()); - self[..].clone_into(&mut out); - out - } -} - -//====================================================================================================================== -// Trait Implementations -//====================================================================================================================== - -impl From for Vec { - fn from(buffer: XdpBuffer) -> Vec { - buffer.to_vector() - } -} - -impl Deref for XdpBuffer { - type Target = [u8]; - - fn deref(&self) -> &[u8] { - unsafe { std::slice::from_raw_parts(self.compute_address() as *const u8, self.len()) } - } -} - -impl DerefMut for XdpBuffer { - fn deref_mut(&mut self) -> &mut [u8] { - unsafe { std::slice::from_raw_parts_mut(self.compute_address() as *mut u8, self.len()) } - } -} diff --git a/src/rust/catpowder/win/ring/generic.rs b/src/rust/catpowder/win/ring/generic.rs index fdf1a0e55..4d1ece255 100644 --- a/src/rust/catpowder/win/ring/generic.rs +++ b/src/rust/catpowder/win/ring/generic.rs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. +use std::{marker::PhantomData, mem::MaybeUninit}; + use crate::runtime::libxdp; //====================================================================================================================== @@ -9,25 +11,31 @@ use crate::runtime::libxdp; /// A wrapper structure for a XDP ring. #[repr(C)] -pub struct XdpRing(libxdp::XSK_RING); +pub struct XdpRing(libxdp::XSK_RING, PhantomData); //====================================================================================================================== // Implementations //====================================================================================================================== -impl XdpRing { +impl XdpRing { /// Initializes a XDP ring. pub(super) fn new(info: &libxdp::XSK_RING_INFO) -> Self { - Self(unsafe { + let ring: libxdp::XSK_RING = unsafe { let mut ring: libxdp::XSK_RING = std::mem::zeroed(); libxdp::_XskRingInitialize(&mut ring, info); ring - }) + }; + + if !ring.SharedElements.cast::().is_aligned() { + panic!("XdpRing::new(): ring memory is not aligned for type T"); + } + + Self(ring, PhantomData) } /// Reserves a consumer slot in the target ring. - pub(super) fn consumer_reserve(&mut self, count: u32, idx: *mut u32) -> u32 { - unsafe { libxdp::_XskRingConsumerReserve(&mut self.0, count, idx) } + pub(super) fn consumer_reserve(&mut self, count: u32, idx: &mut u32) -> u32 { + unsafe { libxdp::_XskRingConsumerReserve(&mut self.0, count, idx as *mut u32) } } /// Releases a consumer slot in the target ring. @@ -36,8 +44,8 @@ impl XdpRing { } /// Reserves a producer slot in the target ring. - pub(super) fn producer_reserve(&mut self, count: u32, idx: *mut u32) -> u32 { - unsafe { libxdp::_XskRingProducerReserve(&mut self.0, count, idx) } + pub(super) fn producer_reserve(&mut self, count: u32, idx: &mut u32) -> u32 { + unsafe { libxdp::_XskRingProducerReserve(&mut self.0, count, idx as *mut u32) } } /// Submits a producer slot in the target ring. @@ -46,7 +54,18 @@ impl XdpRing { } /// Gets the element at the target index. - pub(super) fn get_element(&self, idx: u32) -> *mut std::ffi::c_void { - unsafe { libxdp::_XskRingGetElement(&self.0, idx) } + pub(super) fn get_element(&self, idx: u32) -> &mut MaybeUninit { + // Safety: the alignment of ring elements is validated by the constructor. We rely on the XDP runtime to + // provide valid memory for the ring. + unsafe { &mut *libxdp::_XskRingGetElement(&self.0, idx).cast() } + } + + #[allow(dead_code)] + pub(super) fn needs_poke(&self) -> bool { + unsafe { libxdp::_XskRingProducerNeedPoke(&self.0) != 0 } + } + + pub(super) fn has_error(&self) -> bool { + unsafe { libxdp::_XskRingError(&self.0) != 0 } } } diff --git a/src/rust/catpowder/win/ring/mod.rs b/src/rust/catpowder/win/ring/mod.rs index 6a8c3a9b3..15d81552f 100644 --- a/src/rust/catpowder/win/ring/mod.rs +++ b/src/rust/catpowder/win/ring/mod.rs @@ -5,7 +5,6 @@ // Modules //====================================================================================================================== -mod buffer; mod generic; mod rule; mod rx_ring; @@ -16,6 +15,5 @@ mod umemreg; // Exports //====================================================================================================================== -pub use buffer::XdpBuffer; pub use rx_ring::RxRing; pub use tx_ring::TxRing; diff --git a/src/rust/catpowder/win/ring/rx_ring.rs b/src/rust/catpowder/win/ring/rx_ring.rs index 7f76720e8..164f0a950 100644 --- a/src/rust/catpowder/win/ring/rx_ring.rs +++ b/src/rust/catpowder/win/ring/rx_ring.rs @@ -9,7 +9,6 @@ use crate::{ catpowder::win::{ api::XdpApi, ring::{ - buffer::XdpBuffer, generic::XdpRing, rule::{XdpProgram, XdpRule}, umemreg::UmemReg, @@ -17,9 +16,14 @@ use crate::{ socket::XdpSocket, }, inetstack::protocols::Protocol, - runtime::{fail::Fail, libxdp, limits}, + runtime::{fail::Fail, libxdp, limits, memory::DemiBuffer}, +}; +use std::{ + cell::RefCell, + mem::MaybeUninit, + num::{NonZeroU16, NonZeroU32}, + rc::Rc, }; -use ::std::{cell::RefCell, rc::Rc}; //====================================================================================================================== // Structures @@ -34,13 +38,14 @@ pub struct RxRing { /// A user memory region where receive buffers are stored. mem: Rc>, /// A ring for receiving packets. - rx_ring: XdpRing, + rx_ring: XdpRing, /// A ring for returning receive buffers to the kernel. - rx_fill_ring: XdpRing, + rx_fill_ring: XdpRing, /// Underlying XDP socket. socket: XdpSocket, // NOTE: we keep this here to prevent the socket from being dropped. /// Underlying XDP program. - _program: Option, // NOTE: we keep this here to prevent the program from being dropped. + /// NOTE: we keep this here to prevent the program from being dropped. + _program: Option, } //====================================================================================================================== @@ -49,23 +54,17 @@ pub struct RxRing { impl RxRing { /// Creates a new ring for receiving packets. - fn new(api: &mut XdpApi, length: u32, ifindex: u32, queueid: u32) -> Result { + pub fn new(api: &mut XdpApi, length: u32, buf_count: u32, ifindex: u32, queueid: u32) -> Result { // Create an XDP socket. trace!("creating xdp socket"); let mut socket: XdpSocket = XdpSocket::create(api)?; // Create a UMEM region. trace!("creating umem region"); - let mem: Rc> = Rc::new(RefCell::new(UmemReg::new(length, limits::RECVBUF_SIZE_MAX as u32))); - - // Register the UMEM region. - trace!("registering umem region"); - socket.setsockopt( - api, - libxdp::XSK_SOCKOPT_UMEM_REG, - mem.borrow().as_ref() as *const libxdp::XSK_UMEM_REG as *const core::ffi::c_void, - std::mem::size_of::() as u32, - )?; + let buf_count: NonZeroU32 = NonZeroU32::try_from(buf_count).map_err(Fail::from)?; + let chunk_size: NonZeroU16 = + NonZeroU16::try_from(u16::try_from(limits::RECVBUF_SIZE_MAX).map_err(Fail::from)?).map_err(Fail::from)?; + let mem: Rc> = Rc::new(RefCell::new(UmemReg::new(api, &mut socket, buf_count, chunk_size)?)); // Set rx ring size. trace!("setting rx ring size: {}", length); @@ -105,16 +104,8 @@ impl RxRing { )?; // Initialize rx and rx fill rings. - let mut rx_fill_ring: XdpRing = XdpRing::new(&ring_info.Fill); - let rx_ring: XdpRing = XdpRing::new(&ring_info.Rx); - - // Submit rx buffer to the kernel. - trace!("submitting rx ring buffer"); - let mut ring_index: u32 = 0; - rx_fill_ring.producer_reserve(length, &mut ring_index); - let b: *mut u64 = rx_fill_ring.get_element(ring_index) as *mut u64; - unsafe { *b = 0 }; - rx_fill_ring.producer_submit(length); + let rx_fill_ring: XdpRing = XdpRing::new(&ring_info.Fill); + let rx_ring: XdpRing = XdpRing::new(&ring_info.Rx); Ok(Self { ifindex, @@ -128,8 +119,14 @@ impl RxRing { } /// Create a new RxRing which redirects all traffic on the (if, queue) pair. - pub fn new_redirect_all(api: &mut XdpApi, length: u32, ifindex: u32, queueid: u32) -> Result { - let mut ring: Self = Self::new(api, length, ifindex, queueid)?; + pub fn new_redirect_all( + api: &mut XdpApi, + length: u32, + buf_count: u32, + ifindex: u32, + queueid: u32, + ) -> Result { + let mut ring: Self = Self::new(api, length, buf_count, ifindex, queueid)?; let rules: [XdpRule; 1] = [XdpRule::new(&ring.socket)]; ring.reprogram(api, &rules)?; Ok(ring) @@ -139,12 +136,13 @@ impl RxRing { pub fn new_cohost( api: &mut XdpApi, length: u32, + buf_count: u32, ifindex: u32, queueid: u32, tcp_ports: &[u16], udp_ports: &[u16], ) -> Result { - let mut ring: Self = Self::new(api, length, ifindex, queueid)?; + let mut ring: Self = Self::new(api, length, buf_count, ifindex, queueid)?; let rules: Vec = tcp_ports .iter() @@ -179,31 +177,103 @@ impl RxRing { Ok(()) } - /// Reserves a consumer slot in the rx ring. - pub fn reserve_rx(&mut self, count: u32, idx: &mut u32) -> u32 { - self.rx_ring.consumer_reserve(count, idx) + pub fn socket(&self) -> &XdpSocket { + &self.socket } - /// Releases a consumer slot in the rx ring. - pub fn release_rx(&mut self, count: u32) { - self.rx_ring.consumer_release(count); - } + fn check_error(&self, api: &mut XdpApi) -> Result<(), Fail> { + if self.rx_ring.has_error() { + let mut error: libxdp::XSK_ERROR = 0; + let mut len: u32 = std::mem::size_of::() as u32; + self.socket.getsockopt( + api, + libxdp::XSK_SOCKOPT_RX_ERROR, + &mut error as *mut i32 as *mut core::ffi::c_void, + &mut len, + )?; - /// Reserves a producer slot in the rx fill ring. - pub fn reserve_rx_fill(&mut self, count: u32, idx: &mut u32) -> u32 { - self.rx_fill_ring.producer_reserve(count, idx) + let errno: i32 = match error { + libxdp::_XSK_ERROR_XSK_ERROR_INTERFACE_DETACH => libc::ENODEV, + libxdp::_XSK_ERROR_XSK_ERROR_INVALID_RING => libc::EINVAL, + libxdp::_XSK_ERROR_XSK_NO_ERROR => return Ok(()), + _ => libc::EIO, + }; + return Err(Fail::new(errno, "rx ring has error")); + } + Ok(()) } - /// Submits a producer slot in the rx fill ring. - pub fn submit_rx_fill(&mut self, count: u32) { - self.rx_fill_ring.producer_submit(count); + pub fn provide_buffers(&mut self) { + let mut idx: u32 = 0; + let available: u32 = self.rx_fill_ring.producer_reserve(u32::MAX, &mut idx); + let mut published: u32 = 0; + let mem: std::cell::Ref<'_, UmemReg> = self.mem.borrow(); + for i in 0..available { + if let Some(buf_offset) = mem.get_dehydrated_buffer() { + // Safety: Buffer is allocated from the memory pool, which must be in the contiguous memory range + // starting at the UMEM base region address. + let b: &mut MaybeUninit = self.rx_fill_ring.get_element(idx + i); + b.write(buf_offset as u64); + trace!("provided buffer at offset {}", buf_offset); + published += 1; + } else { + break; + } + } + + if published > 0 { + trace!( + "provided {} buffers to RxRing interface {} queue {}", + published, + self.ifindex, + self.queueid + ); + self.rx_fill_ring.producer_submit(published); + } } - /// Gets the buffer at the target index. - pub fn get_buffer(&self, idx: u32) -> XdpBuffer { - XdpBuffer::new( - self.rx_ring.get_element(idx) as *mut libxdp::XSK_BUFFER_DESCRIPTOR, - self.mem.clone(), - ) + pub fn process_rx(&mut self, api: &mut XdpApi, count: u32, mut callback: Fn) -> Result<(), Fail> + where + Fn: FnMut(DemiBuffer) -> Result<(), Fail>, + { + let mut idx: u32 = 0; + let available: u32 = self.rx_ring.consumer_reserve(count, &mut idx); + let mut consumed: u32 = 0; + let mut err: Option = None; + + if available > 0 { + trace!( + "processing {} buffers from RxRing interface {} queue {}", + available, + self.ifindex, + self.queueid + ); + } + + for i in 0..available { + // Safety: Ring entries are intialized by the XDP runtime. + let desc: &libxdp::XSK_BUFFER_DESCRIPTOR = unsafe { self.rx_ring.get_element(idx + i).assume_init_ref() }; + trace!( + "processing buffer at address {} offset {}", + unsafe { desc.Address.__bindgen_anon_1.BaseAddress() }, + unsafe { desc.Address.__bindgen_anon_1.Offset() } + ); + let db: DemiBuffer = self.mem.borrow().rehydrate_buffer_desc(desc)?; + + // Trim buffer to actual length. Descriptor length should not be greater than buffer length, but guard + // against it anyway. + consumed += 1; + if let Err(e) = callback(db) { + err = Some(e); + break; + } + } + + if consumed > 0 { + self.rx_ring.consumer_release(consumed); + } + + self.check_error(api)?; + err.map_or(Ok(()), |e| Err(e)) } } diff --git a/src/rust/catpowder/win/ring/tx_ring.rs b/src/rust/catpowder/win/ring/tx_ring.rs index 6312dc0f3..03fcebcf5 100644 --- a/src/rust/catpowder/win/ring/tx_ring.rs +++ b/src/rust/catpowder/win/ring/tx_ring.rs @@ -8,12 +8,17 @@ use crate::{ catpowder::win::{ api::XdpApi, - ring::{buffer::XdpBuffer, generic::XdpRing, umemreg::UmemReg}, + ring::{generic::XdpRing, umemreg::UmemReg}, socket::XdpSocket, }, - runtime::{fail::Fail, libxdp, limits}, + runtime::{fail::Fail, libxdp, limits, memory::DemiBuffer}, +}; +use std::{ + cell::RefCell, + mem::MaybeUninit, + num::{NonZeroU16, NonZeroU32}, + rc::Rc, }; -use ::std::{cell::RefCell, rc::Rc}; //====================================================================================================================== // Structures @@ -24,35 +29,42 @@ pub struct TxRing { /// A user memory region where transmit buffers are stored. mem: Rc>, /// A ring for transmitting packets. - tx_ring: XdpRing, + tx_ring: XdpRing, /// A ring for returning transmit buffers to the kernel. - tx_completion_ring: XdpRing, + tx_completion_ring: XdpRing, /// Underlying XDP socket. socket: XdpSocket, + /// Whether to always poke the socket, or only when the ring flag indicates to do so. + always_poke: bool, } impl TxRing { /// Creates a new ring for transmitting packets. - pub fn new(api: &mut XdpApi, length: u32, ifindex: u32, queueid: u32) -> Result { + pub fn new( + api: &mut XdpApi, + length: u32, + buf_count: u32, + ifindex: u32, + queueid: u32, + always_poke: bool, + ) -> Result { // Create an XDP socket. trace!("creating xdp socket"); let mut socket: XdpSocket = XdpSocket::create(api)?; // Create a UMEM region. - trace!("creating umem region"); - let mem: Rc> = Rc::new(RefCell::new(UmemReg::new(1, limits::RECVBUF_SIZE_MAX as u32))); - - // Register the UMEM region. - trace!("registering umem region"); - socket.setsockopt( - api, - libxdp::XSK_SOCKOPT_UMEM_REG, - mem.borrow().as_ref() as *const libxdp::XSK_UMEM_REG as *const core::ffi::c_void, - std::mem::size_of::() as u32, - )?; + let buf_count: NonZeroU32 = NonZeroU32::try_from(buf_count).map_err(Fail::from)?; + let chunk_size: NonZeroU16 = + NonZeroU16::try_from(u16::try_from(limits::RECVBUF_SIZE_MAX).map_err(Fail::from)?).map_err(Fail::from)?; + trace!( + "creating umem region with {} buffers of size {}", + buf_count.get(), + chunk_size.get() + ); + let mem: Rc> = Rc::new(RefCell::new(UmemReg::new(api, &mut socket, buf_count, chunk_size)?)); // Set tx ring size. - trace!("setting tx ring size"); + trace!("setting tx ring size to {}", length); socket.setsockopt( api, libxdp::XSK_SOCKOPT_TX_RING_SIZE, @@ -61,7 +73,7 @@ impl TxRing { )?; // Set tx completion ring size. - trace!("setting tx completion ring size"); + trace!("setting tx completion ring size to {}", length); socket.setsockopt( api, libxdp::XSK_SOCKOPT_TX_COMPLETION_RING_SIZE, @@ -70,7 +82,7 @@ impl TxRing { )?; // Bind tx queue. - trace!("binding tx queue"); + trace!("binding tx queue to interface {} and queue {}", ifindex, queueid); socket.bind(api, ifindex, queueid, libxdp::_XSK_BIND_FLAGS_XSK_BIND_FLAG_TX)?; // Activate socket to enable packet transmission. @@ -89,55 +101,132 @@ impl TxRing { )?; // Initialize tx and tx completion rings. - let tx_ring: XdpRing = XdpRing::new(&ring_info.Tx); - let tx_completion_ring: XdpRing = XdpRing::new(&ring_info.Completion); + let tx_ring: XdpRing = XdpRing::new(&ring_info.Tx); + let tx_completion_ring: XdpRing = XdpRing::new(&ring_info.Completion); Ok(Self { mem, tx_ring, tx_completion_ring, socket, + always_poke, }) } - /// Notifies the socket that there are packets to be transmitted. - pub fn notify_socket( - &self, - api: &mut XdpApi, - flags: i32, - count: u32, - outflags: &mut libxdp::XSK_NOTIFY_RESULT_FLAGS, - ) -> Result<(), Fail> { - self.socket.notify(api, flags, count, outflags) + pub fn socket(&self) -> &XdpSocket { + &self.socket } - /// Reserves a producer slot in the tx ring. - pub fn reserve_tx(&mut self, count: u32, idx: &mut u32) -> u32 { - self.tx_ring.producer_reserve(count, idx) - } + /// Notifies the socket that there are packets to be transmitted. + pub fn poke(&self, api: &mut XdpApi) -> Result<(), Fail> { + let mut outflags: i32 = libxdp::XSK_NOTIFY_RESULT_FLAGS::default(); + let flags: i32 = libxdp::_XSK_NOTIFY_FLAGS_XSK_NOTIFY_FLAG_POKE_TX; + + if self.always_poke || self.tx_ring.needs_poke() { + self.socket.notify(api, flags, u32::MAX, &mut outflags)?; + } - /// Submits a producer slot in the tx ring. - pub fn submit_tx(&mut self, count: u32) { - self.tx_ring.producer_submit(count); + Ok(()) } - /// Reserves a consumer slot in the tx completion ring. - pub fn reserve_tx_completion(&mut self, count: u32, idx: &mut u32) -> u32 { - self.tx_completion_ring.consumer_reserve(count, idx) + fn check_error(&self, api: &mut XdpApi) -> Result<(), Fail> { + if self.tx_ring.has_error() { + let mut error: libxdp::XSK_ERROR = 0; + let mut len: u32 = std::mem::size_of::() as u32; + self.socket.getsockopt( + api, + libxdp::XSK_SOCKOPT_TX_ERROR, + &mut error as *mut i32 as *mut core::ffi::c_void, + &mut len, + )?; + + let errno: i32 = match error { + libxdp::_XSK_ERROR_XSK_ERROR_INTERFACE_DETACH => libc::ENODEV, + libxdp::_XSK_ERROR_XSK_ERROR_INVALID_RING => libc::EINVAL, + libxdp::_XSK_ERROR_XSK_NO_ERROR => return Ok(()), + _ => libc::EIO, + }; + return Err(Fail::new(errno, "tx ring has error")); + } + Ok(()) } - /// Releases a consumer slot in the tx completion ring. - pub fn release_tx_completion(&mut self, count: u32) { - self.tx_completion_ring.consumer_release(count); + pub fn get_buffer(&self) -> Option { + self.mem.borrow().get_buffer() } - /// Gets the buffer at the target index and set its length. - pub fn get_buffer(&self, idx: u32, len: usize) -> XdpBuffer { - let mut buf = XdpBuffer::new( - self.tx_ring.get_element(idx) as *mut libxdp::XSK_BUFFER_DESCRIPTOR, - self.mem.clone(), + pub fn transmit_buffer(&mut self, api: &mut XdpApi, buf: DemiBuffer) -> Result<(), Fail> { + let buf: DemiBuffer = if !self.mem.borrow().is_data_in_pool(&buf) { + trace!("copying buffer to umem region"); + let mut copy: DemiBuffer = self + .mem + .borrow() + .get_buffer() + .ok_or_else(|| Fail::new(libc::ENOMEM, "out of memory"))?; + + if copy.len() < buf.len() { + return Err(Fail::new(libc::EINVAL, "buffer too large")); + } else if copy.len() > buf.len() { + copy.trim(copy.len() - buf.len())?; + } + + unsafe { std::ptr::copy_nonoverlapping(buf.as_ptr(), copy.as_mut_ptr(), buf.len()) }; + copy + } else { + buf + }; + + let buf_desc: libxdp::XSK_BUFFER_DESCRIPTOR = self.mem.borrow().dehydrate_buffer(buf); + trace!( + "transmitting buffer at offset {}, offset {} with length {}", + unsafe { buf_desc.Address.__bindgen_anon_1.BaseAddress() }, + unsafe { buf_desc.Address.__bindgen_anon_1.Offset() }, + buf_desc.Length ); - buf.set_len(len); - buf + + let mut idx: u32 = 0; + if self.tx_ring.producer_reserve(1, &mut idx) != 1 { + return Err(Fail::new(libc::EAGAIN, "tx ring is full")); + } + + let b: &mut MaybeUninit = self.tx_ring.get_element(idx); + b.write(buf_desc); + + self.tx_ring.producer_submit(1); + + // Notify socket. + if let Err(e) = self.poke(api) { + let cause = format!("failed to notify socket: {:?}", e); + warn!("{}", cause); + return Err(Fail::new(libc::EAGAIN, &cause)); + } + + // Check for error + self.check_error(api) + } + + pub fn return_buffers(&mut self) { + let mut idx: u32 = 0; + let available: u32 = self.tx_completion_ring.consumer_reserve(u32::MAX, &mut idx); + let mut returned: u32 = 0; + for i in 0..available { + let b: &MaybeUninit = self.tx_completion_ring.get_element(idx + i); + + // Safety: the integers in tx_completion_ring are initialized by the XDP runtime. + let buf_offset: u64 = unsafe { b.assume_init_read() }; + + // NB dropping the buffer returns it to the pool. + trace!("returning buffer at offset {}", buf_offset); + if let Err(e) = self.mem.borrow().rehydrate_buffer_offset(buf_offset) { + error!("failed to return buffer: {:?}", e); + } + + returned += 1; + } + + if returned > 0 { + trace!("returning {} buffers", returned); + self.tx_completion_ring.consumer_release(returned); + } } } diff --git a/src/rust/catpowder/win/ring/umemreg.rs b/src/rust/catpowder/win/ring/umemreg.rs index 7f796f77b..ad832022b 100644 --- a/src/rust/catpowder/win/ring/umemreg.rs +++ b/src/rust/catpowder/win/ring/umemreg.rs @@ -1,7 +1,23 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -use crate::runtime::libxdp; +use std::{ + mem::MaybeUninit, + num::{NonZeroU16, NonZeroU32, NonZeroUsize}, + ops::Range, + ptr::NonNull, +}; + +use windows::Win32::System::SystemInformation::{GetSystemInfo, SYSTEM_INFO}; + +use crate::{ + catpowder::win::{api::XdpApi, socket::XdpSocket}, + runtime::{ + fail::Fail, + libxdp, + memory::{BufferPool, DemiBuffer}, + }, +}; //====================================================================================================================== // Structures @@ -9,8 +25,10 @@ use crate::runtime::libxdp; /// A wrapper structure for a XDP user memory region. pub struct UmemReg { - _buffer: Vec, + _buffer: Vec>, + pool: BufferPool, umem: libxdp::XSK_UMEM_REG, + buf_offset_from_chunk: isize, } //====================================================================================================================== @@ -19,27 +37,179 @@ pub struct UmemReg { impl UmemReg { /// Creates a new XDP user memory region with `count` blocks of `chunk_size` bytes. - pub fn new(count: u32, chunk_size: u32) -> Self { - let total_size: u64 = count as u64 * chunk_size as u64; - let mut buffer: Vec = Vec::::with_capacity(total_size as usize); + pub fn new( + api: &mut XdpApi, + socket: &mut XdpSocket, + count: NonZeroU32, + chunk_size: NonZeroU16, + ) -> Result { + let pool: BufferPool = + BufferPool::new(chunk_size.get()).map_err(|_| Fail::new(libc::EINVAL, "bad buffer size"))?; + assert!(pool.pool().layout().size() >= chunk_size.get() as usize); + + let real_chunk_size: usize = pool.pool().layout().size(); + let headroom: usize = real_chunk_size - chunk_size.get() as usize; + let buf_offset_from_chunk: isize = + isize::try_from(headroom - BufferPool::overhead_bytes()).map_err(Fail::from)?; + + let page_size: NonZeroUsize = get_page_size(); + let align: usize = std::cmp::max(page_size.get(), pool.pool().layout().align()); + let total_size: u64 = count.get() as u64 * real_chunk_size as u64 + align as u64; + + trace!( + "creating umem region with {} blocks of {} bytes aligned to {} with headroom {} and DemiBuffer offset of {}", + count.get(), + real_chunk_size, + align, + headroom, + buf_offset_from_chunk + ); + let mut buffer: Vec> = Vec::new(); + buffer.resize(total_size as usize, MaybeUninit::uninit()); + let offset: usize = buffer.as_mut_ptr().align_offset(align); + let total_size: u64 = total_size - offset as u64; + + // Round down to the nearest multiple of the real chunk size. + let total_size: u64 = total_size - (total_size % real_chunk_size as u64); + + let buffer_ptr: NonNull<[MaybeUninit]> = NonNull::from(&mut buffer[offset..(offset + total_size as usize)]); + unsafe { pool.pool().populate(buffer_ptr, page_size)? }; + + if pool.pool().is_empty() { + return Err(Fail::new(libc::ENOMEM, "out of memory")); + } + + let headroom: u32 = u32::try_from(headroom).map_err(Fail::from)?; let umem: libxdp::XSK_UMEM_REG = libxdp::XSK_UMEM_REG { TotalSize: total_size, - ChunkSize: chunk_size, - Headroom: 0, - Address: buffer.as_mut_ptr() as *mut core::ffi::c_void, + ChunkSize: real_chunk_size as u32, + Headroom: headroom, + Address: buffer_ptr.as_ptr() as *mut core::ffi::c_void, }; - Self { _buffer: buffer, umem } + // Register the UMEM region. + trace!("registering umem region"); + socket.setsockopt( + api, + libxdp::XSK_SOCKOPT_UMEM_REG, + &umem as *const libxdp::XSK_UMEM_REG as *const core::ffi::c_void, + std::mem::size_of::() as u32, + )?; + + Ok(Self { + _buffer: buffer, + pool, + umem, + buf_offset_from_chunk, + }) } - /// Gets a reference to the underlying XDP user memory region. - pub fn as_ref(&self) -> &libxdp::XSK_UMEM_REG { - &self.umem + /// Get a buffer from the umem pool. + pub fn get_buffer(&self) -> Option { + DemiBuffer::new_in_pool(&self.pool) } /// Returns a raw pointer to the the start address of the user memory region. - pub fn address(&self) -> *mut core::ffi::c_void { - self.umem.Address + pub fn address(&self) -> NonNull { + // NB: non-nullness is validated by the constructor. + NonNull::new(self.umem.Address.cast::()).unwrap() + } + + /// Returns the region of memory that the umem region occupies. + pub fn region(&self) -> Range> { + let start: NonNull = self.address(); + let end: NonNull = unsafe { start.add(self.umem.TotalSize as usize) }; + + start..end + } + + /// Get the number of overhead bytes from a DemiBuffer returned by this instance. + pub fn overhead_bytes(&self) -> usize { + BufferPool::overhead_bytes() + } + + /// Determine if the data pointed to by a DemiBuffer is inside the umem region. + pub fn is_data_in_pool(&self, buf: &DemiBuffer) -> bool { + let data: NonNull = unsafe { NonNull::new_unchecked(buf.as_ptr() as *mut u8) }; + self.region().contains(&data) + } + + /// Same as `self.dehydrate_buffer(self.get_buffer()?)`, but returns only the base address of + /// the buffer. Useful for publishing to receive rings. + pub fn get_dehydrated_buffer(&self) -> Option { + let buf: DemiBuffer = self.get_buffer()?; + let desc: libxdp::XSK_BUFFER_DESCRIPTOR = self.dehydrate_buffer(buf); + assert!( + unsafe { desc.Address.__bindgen_anon_1.Offset() } + == (self.overhead_bytes() as u64 + self.buf_offset_from_chunk as u64) + ); + Some(unsafe { desc.Address.__bindgen_anon_1.BaseAddress() }) + } + + /// Dehydrates a DemiBuffer into a usize that can be rehydrated later. This operation consumes the DemiBuffer. + pub fn dehydrate_buffer(&self, buf: DemiBuffer) -> libxdp::XSK_BUFFER_DESCRIPTOR { + let data_len: usize = buf.len(); + let data: *const u8 = buf.as_ptr(); + let basis: NonNull = if buf.is_direct() { + buf.into_raw() + } else { + let direct: DemiBuffer = buf.into_direct(); + direct.into_raw() + }; + + let basis: NonNull = unsafe { basis.offset(-self.buf_offset_from_chunk) }; + + // Safety: MemoryPool guarantees that the DemiBuffer data is allocated from the allocated object pointed to + // by `self.address()`. + let base_address: usize = unsafe { basis.offset_from(self.address().cast::()) as usize }; + let offset: usize = unsafe { data.offset_from(basis.as_ptr()) as usize }; + + if (self.umem.TotalSize - self.umem.ChunkSize as u64) < base_address as u64 { + panic!("buffer {} not in region", base_address); + } + + let addr: libxdp::XSK_BUFFER_ADDRESS = unsafe { + let mut addr: libxdp::XSK_BUFFER_ADDRESS = std::mem::zeroed(); + addr.__bindgen_anon_1.set_BaseAddress(base_address as u64); + addr.__bindgen_anon_1.set_Offset(offset as u64); + addr + }; + + libxdp::XSK_BUFFER_DESCRIPTOR { + Address: addr, + Length: data_len as u32, + Reserved: 0, + } + } + + /// Rehydrates a buffer from an XSK_BUFFER_DESCRIPTOR that was previously dehydrated by `dehydrate_buffer`. + pub fn rehydrate_buffer_desc(&self, desc: &libxdp::XSK_BUFFER_DESCRIPTOR) -> Result { + self.rehydrate_buffer_offset(unsafe { desc.Address.__bindgen_anon_1.BaseAddress() }) + .and_then(|mut buf: DemiBuffer| -> Result { + buf.trim(buf.len().saturating_sub(desc.Length as usize))?; + Ok(buf) + }) } + + /// Rehydrates a buffer from a usize that was previously dehydrated by `dehydrate_buffer`. + pub fn rehydrate_buffer_offset(&self, offset: u64) -> Result { + let token: NonNull = unsafe { self.address().offset(isize::try_from(offset).map_err(Fail::from)?) }; + let demi_token: NonNull = unsafe { token.offset(self.buf_offset_from_chunk) }; + + Ok(unsafe { DemiBuffer::from_raw(demi_token) }) + } +} + +//====================================================================================================================== +// Functions +//====================================================================================================================== + +fn get_page_size() -> NonZeroUsize { + let mut si: SYSTEM_INFO = SYSTEM_INFO::default(); + + // Safety: `si` is allocated and aligned correctly for Windows API access. + unsafe { GetSystemInfo(&mut si as *mut SYSTEM_INFO) }; + + NonZeroUsize::new(si.dwPageSize as usize).expect("invariant violation from Windows API: zero page size") } diff --git a/src/rust/catpowder/win/runtime.rs b/src/rust/catpowder/win/runtime.rs index 5617c3793..622420453 100644 --- a/src/rust/catpowder/win/runtime.rs +++ b/src/rust/catpowder/win/runtime.rs @@ -8,7 +8,8 @@ use crate::{ catpowder::win::{ api::XdpApi, - ring::{RxRing, TxRing, XdpBuffer}, + ring::{RxRing, TxRing}, + socket::XdpSocket, }, demi_sgarray_t, demi_sgaseg_t, demikernel::config::Config, @@ -18,14 +19,20 @@ use crate::{ }, runtime::{ fail::Fail, - libxdp, + libxdp::{XSK_SOCKOPT_STATISTICS, XSK_STATISTICS}, memory::{DemiBuffer, MemoryRuntime}, Runtime, SharedObject, }, }; use arrayvec::ArrayVec; use libc::c_void; -use std::{borrow::BorrowMut, mem}; +use std::{ + borrow::BorrowMut, + mem, + sync::{Arc, Condvar, Mutex, MutexGuard}, + thread::JoinHandle, + time::Duration, +}; use windows::Win32::{ Foundation::ERROR_INSUFFICIENT_BUFFER, System::SystemInformation::{ @@ -47,19 +54,15 @@ struct CatpowderRuntimeInner { tx: TxRing, rx_rings: Vec, vf_rx_rings: Vec, + + exit_mtx: Arc>, + cnd_var: Arc, + thrd: Option>, } //====================================================================================================================== // Implementations //====================================================================================================================== impl SharedCatpowderRuntime { - /// - /// Number of buffers in the rings. - /// - /// **NOTE:** To introduce batch support (i.e., having more than one buffer in the ring), we need to revisit current - /// implementation. It is not all about just changing this constant. - /// - const RING_LENGTH: u32 = 1; - /// Instantiates a new XDP runtime. pub fn new(config: &Config) -> Result { let ifindex: u32 = config.local_interface_index()?; @@ -67,8 +70,34 @@ impl SharedCatpowderRuntime { trace!("Creating XDP runtime."); let mut api: XdpApi = XdpApi::new()?; + let (tx_buffer_count, tx_ring_size) = config.tx_buffer_config()?; + if !tx_ring_size.is_power_of_two() { + let cause: String = format!("rx_ring_size must be a power of two: {:?}", tx_ring_size); + return Err(Fail::new(libc::EINVAL, &cause)); + } + + if tx_buffer_count < tx_ring_size { + let cause: String = format!("tx_buffer_count must be greater than or equal to tx_ring_size"); + return Err(Fail::new(libc::EINVAL, &cause)); + } + + let (rx_buffer_count, rx_ring_size) = config.rx_buffer_config()?; + if !rx_ring_size.is_power_of_two() { + let cause: String = format!("rx_ring_size must be a power of two: {:?}", rx_ring_size); + return Err(Fail::new(libc::EINVAL, &cause)); + } + + if rx_buffer_count < rx_ring_size { + let cause: String = format!("rx_buffer_count must be greater than or equal to rx_ring_size"); + return Err(Fail::new(libc::EINVAL, &cause)); + } + + let mut sockets: Vec<(String, XdpSocket)> = Vec::new(); + // Open TX and RX rings - let tx: TxRing = TxRing::new(&mut api, Self::RING_LENGTH, ifindex, 0)?; + let always_poke: bool = config.xdp_always_poke_tx()?; + let tx: TxRing = TxRing::new(&mut api, tx_ring_size, tx_buffer_count, ifindex, 0, always_poke)?; + sockets.push((String::from("tx socket"), tx.socket().clone())); let cohost_mode = config.xdp_cohost_mode()?; let (tcp_ports, udp_ports) = if cohost_mode { @@ -79,25 +108,31 @@ impl SharedCatpowderRuntime { (vec![], vec![]) }; - let make_ring = |api: &mut XdpApi, length: u32, ifindex: u32, queueid: u32| -> Result { - if cohost_mode { - RxRing::new_cohost( - api, - length, - ifindex, - queueid, - tcp_ports.as_slice(), - udp_ports.as_slice(), - ) - } else { - RxRing::new_redirect_all(api, length, ifindex, queueid) - } - }; + let make_ring = + |api: &mut XdpApi, length: u32, buf_count: u32, ifindex: u32, queueid: u32| -> Result { + if cohost_mode { + RxRing::new_cohost( + api, + length, + buf_count, + ifindex, + queueid, + tcp_ports.as_slice(), + udp_ports.as_slice(), + ) + } else { + RxRing::new_redirect_all(api, length, buf_count, ifindex, queueid) + } + }; let queue_count: u32 = deduce_rss_settings(&mut api, ifindex)?; let mut rx_rings: Vec = Vec::with_capacity(queue_count as usize); + for queueid in 0..queue_count { - rx_rings.push(make_ring(&mut api, Self::RING_LENGTH, ifindex, queueid as u32)?); + let mut ring: RxRing = make_ring(&mut api, rx_ring_size, rx_buffer_count, ifindex, queueid as u32)?; + ring.provide_buffers(); + sockets.push((format!("RX on if {} queue {}", ifindex, queueid), ring.socket().clone())); + rx_rings.push(ring); } trace!("Created {} RX rings on interface {}", rx_rings.len(), ifindex); @@ -106,7 +141,13 @@ impl SharedCatpowderRuntime { let vf_queue_count: u32 = deduce_rss_settings(&mut api, vf_if_index)?; let mut vf_rx_rings: Vec = Vec::with_capacity(vf_queue_count as usize); for queueid in 0..vf_queue_count { - vf_rx_rings.push(make_ring(&mut api, Self::RING_LENGTH, vf_if_index, queueid as u32)?); + let mut ring: RxRing = make_ring(&mut api, rx_ring_size, rx_buffer_count, vf_if_index, queueid as u32)?; + ring.provide_buffers(); + sockets.push(( + format!("RX on if {} queue {}", vf_if_index, queueid), + ring.socket().clone(), + )); + vf_rx_rings.push(ring); } trace!( "Created {} RX rings on VF interface {}.", @@ -119,11 +160,23 @@ impl SharedCatpowderRuntime { vec![] }; + let exit_mtx: Arc> = Arc::new(Mutex::new(false)); + let exit_mtx_clone: Arc> = exit_mtx.clone(); + let cnd_var: Arc = Arc::new(Condvar::new()); + let cnd_var_clone: Arc = cnd_var.clone(); + let api_2: XdpApi = XdpApi::new()?; + let thrd: JoinHandle<()> = std::thread::spawn(move || { + run_stats_thread(api_2, sockets, exit_mtx_clone, cnd_var_clone); + }); + Ok(Self(SharedObject::new(CatpowderRuntimeInner { api, tx, rx_rings, vf_rx_rings, + exit_mtx, + cnd_var, + thrd: Some(thrd), }))) } } @@ -139,104 +192,135 @@ impl PhysicalLayer for SharedCatpowderRuntime { return Err(Fail::new(libc::ENOTSUP, &cause)); } - let mut idx: u32 = 0; + let me: &mut CatpowderRuntimeInner = &mut self.0.borrow_mut(); + me.tx.return_buffers(); - if self.0.borrow_mut().tx.reserve_tx(Self::RING_LENGTH, &mut idx) != Self::RING_LENGTH { - let cause = format!("failed to reserve producer space for packet"); - warn!("{}", cause); - return Err(Fail::new(libc::EAGAIN, &cause)); - } + me.tx.transmit_buffer(&mut me.api, pkt)?; - let mut buf: XdpBuffer = self.0.borrow_mut().tx.get_buffer(idx, pkt_size); - buf.copy_from_slice(&pkt); - - self.0.borrow_mut().tx.submit_tx(Self::RING_LENGTH); - - // Notify socket. - let mut outflags: i32 = libxdp::XSK_NOTIFY_RESULT_FLAGS::default(); - let flags: i32 = - libxdp::_XSK_NOTIFY_FLAGS_XSK_NOTIFY_FLAG_POKE_TX | libxdp::_XSK_NOTIFY_FLAGS_XSK_NOTIFY_FLAG_WAIT_TX; - - if let Err(e) = self.0.borrow_mut().notify_socket(flags, u32::MAX, &mut outflags) { - let cause = format!("failed to notify socket: {:?}", e); - warn!("{}", cause); - return Err(Fail::new(libc::EAGAIN, &cause)); - } - - if self - .0 - .borrow_mut() - .tx - .reserve_tx_completion(Self::RING_LENGTH, &mut idx) - != Self::RING_LENGTH - { - let cause = format!("failed to send packet"); - warn!("{}", cause); - return Err(Fail::new(libc::EAGAIN, &cause)); - } - - self.0.borrow_mut().tx.release_tx_completion(Self::RING_LENGTH); Ok(()) } /// Polls for received packets. fn receive(&mut self) -> Result, Fail> { let mut ret: ArrayVec = ArrayVec::new(); - let mut idx: u32 = 0; - for rx in self.0.borrow_mut().rx_rings.iter_mut() { - if rx.reserve_rx(Self::RING_LENGTH, &mut idx) == Self::RING_LENGTH { - let xdp_buffer: XdpBuffer = rx.get_buffer(idx); - let dbuf: DemiBuffer = DemiBuffer::from_slice(&*xdp_buffer)?; - rx.release_rx(Self::RING_LENGTH); + let me: &mut CatpowderRuntimeInner = &mut self.0.borrow_mut(); + me.tx.return_buffers(); - ret.push(dbuf); + for rx in me.rx_rings.iter_mut() { + rx.provide_buffers(); + } - rx.reserve_rx_fill(Self::RING_LENGTH, &mut idx); - // NB for now there is only ever one element in the fill ring, so we don't have to - // change the ring contents. - rx.submit_rx_fill(Self::RING_LENGTH); + for rx in me.vf_rx_rings.iter_mut() { + rx.provide_buffers(); + } - if ret.is_full() { - break; - } + let mut queue: usize = 0; + for rx in me.rx_rings.iter_mut() { + let remaining: u32 = ret.remaining_capacity() as u32; + rx.process_rx(&mut me.api, remaining, |dbuf: DemiBuffer| { + trace!("receive(): non-VF, queue={}, pkt_size={:?}", queue, dbuf.len()); + ret.push(dbuf); + Ok(()) + })?; + + if ret.is_full() { + return Ok(ret); } + queue += 1; } - for rx in self.0.borrow_mut().vf_rx_rings.iter_mut() { - if rx.reserve_rx(Self::RING_LENGTH, &mut idx) == Self::RING_LENGTH { - let xdp_buffer: XdpBuffer = rx.get_buffer(idx); - let dbuf: DemiBuffer = DemiBuffer::from_slice(&*xdp_buffer)?; - rx.release_rx(Self::RING_LENGTH); - + queue = 0; + for rx in me.vf_rx_rings.iter_mut() { + let remaining: u32 = ret.remaining_capacity() as u32; + rx.process_rx(&mut me.api, remaining, |dbuf: DemiBuffer| { + trace!("receive(): VF, queue={}, pkt_size={:?}", queue, dbuf.len()); ret.push(dbuf); + Ok(()) + })?; - rx.reserve_rx_fill(Self::RING_LENGTH, &mut idx); - // NB for now there is only ever one element in the fill ring, so we don't have to - // change the ring contents. - rx.submit_rx_fill(Self::RING_LENGTH); - - if ret.is_full() { - break; - } + if ret.is_full() { + return Ok(ret); } + queue += 1; } Ok(ret) } } -impl CatpowderRuntimeInner { - /// Notifies the socket that there are packets to be transmitted. - fn notify_socket(&mut self, flags: i32, timeout: u32, outflags: &mut i32) -> Result<(), Fail> { - self.tx.notify_socket(&mut self.api, flags, timeout, outflags) - } -} - //====================================================================================================================== // Functions //====================================================================================================================== +fn run_stats_thread( + mut api: XdpApi, + mut sockets: Vec<(String, XdpSocket)>, + exit_mtx: Arc>, + cnd_var: Arc, +) { + const DEFAULT_STATS: XSK_STATISTICS = XSK_STATISTICS { + RxDropped: 0, + RxInvalidDescriptors: 0, + RxTruncated: 0, + TxInvalidDescriptors: 0, + }; + let mut stats: Vec = vec![DEFAULT_STATS; sockets.len()]; + + let mut exit_guard: MutexGuard<'_, bool> = exit_mtx.lock().unwrap(); + while !*exit_guard { + for (i, (name, socket)) in sockets.iter_mut().enumerate() { + if let Err(e) = update_stats(&mut api, name.as_str(), socket, &mut stats[i]) { + warn!("{}: Failed to update stats: {:?}", name, e); + } + } + + exit_guard = cnd_var.wait_timeout(exit_guard, Duration::from_secs(1)).unwrap().0; + } +} + +fn update_stats(api: &mut XdpApi, name: &str, socket: &mut XdpSocket, stats: &mut XSK_STATISTICS) -> Result<(), Fail> { + let mut new_stats: XSK_STATISTICS = unsafe { std::mem::zeroed() }; + let mut len: u32 = std::mem::size_of::() as u32; + socket.getsockopt( + api, + XSK_SOCKOPT_STATISTICS, + &mut new_stats as *mut _ as *mut c_void, + &mut len, + )?; + + if stats.RxDropped < new_stats.RxDropped { + warn!("{}: XDP RX dropped: {}", name, new_stats.RxDropped - stats.RxDropped); + } + + if stats.RxInvalidDescriptors < new_stats.RxInvalidDescriptors { + warn!( + "{}: XDP RX invalid descriptors: {}", + name, + new_stats.RxInvalidDescriptors - stats.RxInvalidDescriptors + ); + } + + if stats.RxTruncated < new_stats.RxTruncated { + warn!( + "{}: XDP RX truncated packets: {}", + name, + new_stats.RxTruncated - stats.RxTruncated + ); + } + + if stats.TxInvalidDescriptors < new_stats.TxInvalidDescriptors { + warn!( + "{}: XDP TX invalid descriptors: {}", + name, + new_stats.TxInvalidDescriptors - stats.TxInvalidDescriptors + ); + } + + *stats = new_stats; + Ok(()) +} + fn count_processor_cores() -> Result { let mut proc_info: SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX = SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX::default(); let mut buffer_len: u32 = 0; @@ -289,12 +373,13 @@ fn count_processor_cores() -> Result { /// Deduces the RSS settings for the given interface. Returns the number of valid RSS queues for the interface. fn deduce_rss_settings(api: &mut XdpApi, ifindex: u32) -> Result { const DUMMY_QUEUE_LENGTH: u32 = 1; + const DUMMY_BUFFER_COUNT: u32 = 1; let sys_proc_count: u32 = count_processor_cores()? as u32; // NB there will always be at least one queue available, hence starting the loop at 1. There should not be more // queues than the number of processors on the system. for queueid in 1..sys_proc_count { - match TxRing::new(api, DUMMY_QUEUE_LENGTH, ifindex, queueid) { + match TxRing::new(api, DUMMY_QUEUE_LENGTH, DUMMY_BUFFER_COUNT, ifindex, queueid, false) { Ok(_) => (), Err(e) => { warn!( @@ -332,9 +417,18 @@ impl MemoryRuntime for SharedCatpowderRuntime { return Err(Fail::new(libc::EINVAL, "size too large for a single demi_sgaseg_t")); } - // First allocate the underlying DemiBuffer. - // Always allocate with header space for now even if we do not need it. - let buf: DemiBuffer = DemiBuffer::new_with_headroom(size as u16, MAX_HEADER_SIZE as u16); + // Allocate buffer from sender pool. + let mut buf: DemiBuffer = match self.0.tx.get_buffer() { + None => return Err(Fail::new(libc::ENOBUFS, "out of buffers")), + Some(buf) => buf, + }; + + if size > buf.len() - MAX_HEADER_SIZE { + return Err(Fail::new(libc::EINVAL, "size too large for buffer")); + } + + // Reserve space for headers. + buf.adjust(MAX_HEADER_SIZE).expect("buffer size invariant violation"); // Create a scatter-gather segment to expose the DemiBuffer to the user. let data: *const u8 = buf.as_ptr(); @@ -355,3 +449,16 @@ impl MemoryRuntime for SharedCatpowderRuntime { /// Runtime trait implementation for XDP Runtime. impl Runtime for SharedCatpowderRuntime {} + +impl Drop for CatpowderRuntimeInner { + fn drop(&mut self) { + if let Some(thrd) = self.thrd.take() { + if let Ok(mut guard) = self.exit_mtx.lock() { + *guard = true; + std::mem::drop(guard); + self.cnd_var.notify_all(); + let _ = thrd.join(); + } + } + } +} diff --git a/src/rust/catpowder/win/socket.rs b/src/rust/catpowder/win/socket.rs index 108dde819..176302190 100644 --- a/src/rust/catpowder/win/socket.rs +++ b/src/rust/catpowder/win/socket.rs @@ -13,6 +13,10 @@ use ::windows::{ core::{Error, HRESULT}, Win32::{Foundation, Foundation::HANDLE}, }; +use windows::Win32::{ + Foundation::{DUPLICATE_SAME_ACCESS, FALSE}, + System::Threading::GetCurrentProcess, +}; //====================================================================================================================== // Structures @@ -174,3 +178,13 @@ impl Drop for XdpSocket { } } } + +impl Clone for XdpSocket { + fn clone(&self) -> Self { + let mut dup: HANDLE = self.0; + let proc: HANDLE = unsafe { GetCurrentProcess() }; + unsafe { Foundation::DuplicateHandle(proc, self.0, proc, &mut dup, 0, FALSE, DUPLICATE_SAME_ACCESS) } + .expect("clone(): failed to duplicate handle"); + Self(dup) + } +} diff --git a/src/rust/demikernel/config.rs b/src/rust/demikernel/config.rs index afe160ab8..21d79e7f3 100644 --- a/src/rust/demikernel/config.rs +++ b/src/rust/demikernel/config.rs @@ -5,6 +5,7 @@ // Imports //====================================================================================================================== +#[cfg(all(feature = "catpowder-libos", target_os = "windows"))] use crate::{pal::KeepAlive, runtime::fail::Fail, MacAddress}; #[cfg(any(feature = "catnip-libos"))] use ::std::ffi::CString; @@ -65,6 +66,11 @@ mod raw_socket_config { #[cfg(target_os = "windows")] pub const LOCAL_INTERFACE_INDEX: &str = "xdp_interface_index"; + // Whether XDP should always poke the TX ring, versus only poking when the ring flags indicate + // to do so. + #[cfg(target_os = "windows")] + pub const XDP_ALWAYS_POKE_TX: &str = "xdp_always_poke_tx"; + // N.B. hyper-V VMs can have both NetVSC and VF interfaces working in tandem, in which case // we need to listen to the corresponding VF interface as well. #[cfg(target_os = "windows")] @@ -82,6 +88,18 @@ mod raw_socket_config { // UDP ports for XDP to redirect when cohosting. #[cfg(target_os = "windows")] pub const XDP_UDP_PORTS: &str = "xdp_udp_ports"; + + // Buffer counts for XDP backend. + #[cfg(target_os = "windows")] + pub const RX_BUFFER_COUNT: &str = "rx_buffer_count"; + #[cfg(target_os = "windows")] + pub const TX_BUFFER_COUNT: &str = "tx_buffer_count"; + + // Ring sizes for XDP backend. + #[cfg(target_os = "windows")] + pub const RX_RING_SIZE: &str = "rx_ring_size"; + #[cfg(target_os = "windows")] + pub const TX_RING_SIZE: &str = "tx_ring_size"; } //====================================================================================================================== @@ -258,30 +276,17 @@ impl Config { } pub fn arp_cache_ttl(&self) -> Result { - let ttl: u64 = if let Some(ttl) = Self::get_typed_env_option(inetstack_config::ARP_CACHE_TTL)? { - ttl - } else { - Self::get_int_option(self.get_inetstack_config()?, inetstack_config::ARP_CACHE_TTL)? - }; - Ok(Duration::from_secs(ttl)) + self.get_int_env_or_option(inetstack_config::ARP_CACHE_TTL, Self::get_inetstack_config) + .map(|ttl: u64| Duration::from_secs(ttl)) } pub fn arp_request_timeout(&self) -> Result { - let timeout: u64 = if let Some(timeout) = Self::get_typed_env_option(inetstack_config::ARP_REQUEST_TIMEOUT)? { - timeout - } else { - Self::get_int_option(self.get_inetstack_config()?, inetstack_config::ARP_REQUEST_TIMEOUT)? - }; - Ok(Duration::from_secs(timeout)) + self.get_int_env_or_option(inetstack_config::ARP_REQUEST_TIMEOUT, Self::get_inetstack_config) + .map(|timeout: u64| Duration::from_secs(timeout)) } pub fn arp_request_retries(&self) -> Result { - let retries: usize = if let Some(retries) = Self::get_typed_env_option(inetstack_config::ARP_REQUEST_RETRIES)? { - retries - } else { - Self::get_int_option(self.get_inetstack_config()?, inetstack_config::ARP_REQUEST_RETRIES)? - }; - Ok(retries) + self.get_int_env_or_option(inetstack_config::ARP_REQUEST_RETRIES, Self::get_inetstack_config) } #[cfg(all(feature = "catpowder-libos", target_os = "linux"))] @@ -304,11 +309,37 @@ impl Config { /// Global config: Reads the "local interface index" parameter from the environment variable and then the underlying /// configuration file. pub fn local_interface_index(&self) -> Result { - // Parse local MAC address. - if let Some(addr) = Self::get_typed_env_option(raw_socket_config::LOCAL_INTERFACE_INDEX)? { - Ok(addr) + self.get_int_env_or_option(raw_socket_config::LOCAL_INTERFACE_INDEX, Self::get_raw_socket_config) + } + + #[cfg(all(feature = "catpowder-libos", target_os = "windows"))] + /// Global config: Reads the "rx_buffer_count" and "rx_ring_size" parameters from the environment variable and + /// then the underlying configuration file. Returns the tuple (buffer count, ring size). + pub fn rx_buffer_config(&self) -> Result<(u32, u32), Fail> { + let rx_buffer_count: u32 = + self.get_int_env_or_option(raw_socket_config::RX_BUFFER_COUNT, Self::get_raw_socket_config)?; + let rx_ring_size: u32 = + self.get_int_env_or_option(raw_socket_config::RX_RING_SIZE, Self::get_raw_socket_config)?; + Ok((rx_buffer_count, rx_ring_size)) + } + + #[cfg(all(feature = "catpowder-libos", target_os = "windows"))] + /// Global config: Reads the "rx_buffer_count" and "rx_ring_size" parameters from the environment variable and + /// then the underlying configuration file. Returns the tuple (buffer count, ring size). + pub fn tx_buffer_config(&self) -> Result<(u32, u32), Fail> { + let tx_buffer_count: u32 = + self.get_int_env_or_option(raw_socket_config::TX_BUFFER_COUNT, Self::get_raw_socket_config)?; + let tx_ring_size: u32 = + self.get_int_env_or_option(raw_socket_config::TX_RING_SIZE, Self::get_raw_socket_config)?; + Ok((tx_buffer_count, tx_ring_size)) + } + + #[cfg(all(feature = "catpowder-libos", target_os = "windows"))] + pub fn xdp_always_poke_tx(&self) -> Result { + if let Some(always_poke) = Self::get_typed_env_option(raw_socket_config::XDP_ALWAYS_POKE_TX)? { + Ok(always_poke) } else { - Self::get_int_option(self.get_raw_socket_config()?, raw_socket_config::LOCAL_INTERFACE_INDEX) + Self::get_bool_option(self.get_raw_socket_config()?, raw_socket_config::XDP_ALWAYS_POKE_TX) } } @@ -391,11 +422,7 @@ impl Config { } pub fn mtu(&self) -> Result { - if let Some(addr) = Self::get_typed_env_option(inetstack_config::MTU)? { - Ok(addr) - } else { - Self::get_int_option(self.get_inetstack_config()?, inetstack_config::MTU) - } + self.get_int_env_or_option(inetstack_config::MTU, Self::get_inetstack_config) } pub fn mss(&self) -> Result { @@ -501,6 +528,17 @@ impl Config { } } + fn get_int_env_or_option(&self, index: &str, resolve_yaml: Fn) -> Result + where + T: TryFrom + FromStr, + for<'a> Fn: FnOnce(&'a Self) -> Result<&'a Yaml, Fail>, + { + match Self::get_typed_env_option(index)? { + Some(val) => Ok(val), + None => Self::get_int_option(resolve_yaml(self)?, index), + } + } + /// Same as `Self::require_typed_option` using `Yaml::as_bool` as the receiver. fn get_bool_option(yaml: &Yaml, index: &str) -> Result { Self::get_typed_option(yaml, index, &Yaml::as_bool) diff --git a/src/rust/runtime/fail.rs b/src/rust/runtime/fail.rs index 5fcecc096..748b45c7c 100644 --- a/src/rust/runtime/fail.rs +++ b/src/rust/runtime/fail.rs @@ -66,3 +66,9 @@ impl From for Fail { } } } + +impl From for Fail { + fn from(_: std::num::TryFromIntError) -> Self { + Fail::new(libc::ERANGE, "integer conversion error") + } +} diff --git a/src/rust/runtime/memory/buffer_pool.rs b/src/rust/runtime/memory/buffer_pool.rs index 664daa0a7..ce57f1e57 100644 --- a/src/rust/runtime/memory/buffer_pool.rs +++ b/src/rust/runtime/memory/buffer_pool.rs @@ -9,7 +9,10 @@ use std::{alloc::LayoutError, num::NonZeroUsize, rc::Rc}; use crate::{ pal::CPU_DATA_CACHE_LINE_SIZE_IN_BYTES, - runtime::memory::{demibuffer::MetaData, memory_pool::MemoryPool}, + runtime::memory::{ + demibuffer::{DemiBuffer, MetaData, Tag}, + memory_pool::MemoryPool, + }, }; //====================================================================================================================== @@ -37,6 +40,12 @@ impl BufferPool { pub fn pool(&self) -> &Rc { &self.0 } + + /// Get the number of bytes of overhead between the user data and the pointer returned by DemiBuffer::into_raw. + /// Useful for passing buffer pointers to and from other APIs. + pub fn overhead_bytes() -> usize { + DemiBuffer::metadata_size(Tag::Heap) + } } // Unit tests for `BufferPool` type. diff --git a/src/rust/runtime/memory/demibuffer.rs b/src/rust/runtime/memory/demibuffer.rs index d2eac9619..3b89c5f5b 100644 --- a/src/rust/runtime/memory/demibuffer.rs +++ b/src/rust/runtime/memory/demibuffer.rs @@ -248,7 +248,7 @@ impl MetaData { // Since our MetaData structure is 64-byte aligned, the lower 6 bits of a pointer to it are guaranteed to be zero. // We currently only use the lower 2 of those bits to hold the type tag. #[derive(PartialEq)] -enum Tag { +pub(super) enum Tag { Heap = 1, #[cfg(feature = "libdpdk")] Dpdk = 2, @@ -491,6 +491,21 @@ impl DemiBuffer { // Public Functions // ---------------- + // Determine if a DemiBuffer was allocted from a specific BufferPool. + pub fn is_from_pool(&self, pool: &BufferPool) -> bool { + match self.get_tag() { + Tag::Heap => { + if let Some(check) = self.as_metadata().pool.as_ref() { + Rc::ptr_eq(&check, pool.pool()) + } else { + false + } + }, + #[cfg(feature = "libdpdk")] + Tag::Dpdk => false, + } + } + /// Returns `true` if this `DemiBuffer` was allocated off of the heap, and `false` otherwise. pub fn is_heap_allocated(&self) -> bool { self.get_tag() == Tag::Heap @@ -508,6 +523,69 @@ impl DemiBuffer { self.as_metadata().data_len as usize } + /// Returns the amount of headroom available in the packet. + pub fn headroom(&self) -> u16 { + self.as_metadata().data_off + } + + /// Returns a flag indicating whether this DemiBuffer is direct, i.e., that this instance is + /// not a clone of some other DemiBuffer. + pub fn is_direct(&self) -> bool { + self.as_metadata().ol_flags & METADATA_F_INDIRECT == 0 + } + + /// Get the direct DemiBuffer from an indirect DemiBuffer. + pub fn into_direct(self) -> DemiBuffer { + if self.is_direct() { + return self; + } + + match self.get_tag() { + Tag::Heap => { + let metadata: &mut MetaData = self.as_metadata(); + + // Step 1: get the direct buffer. + let offset: isize = -(size_of::() as isize); + let direct: &mut MetaData = unsafe { + // Safety: The offset call is safe as `offset` is known to be "in bounds" for buf_addr. + // Safety: The as_mut call is safe as the pointer is aligned, dereferenceable, and + // points to an initialized MetaData instance. + // The returned address is known to be non-Null, so the unwrap call will never panic. + metadata.buf_addr.offset(offset).cast::().as_mut().unwrap() + }; + + // Step 2: increment the reference count of the direct buffer. + direct.inc_refcnt(); + + // Step 3: detach the indirect buffer. + metadata.buf_addr = null_mut(); + metadata.buf_len = 0; + metadata.ol_flags = metadata.ol_flags & !METADATA_F_INDIRECT; + + // Step 4: reconstitute the direct DemiBuffer. + let direct: NonNull = NonNull::from(direct); + let tagged: NonNull = direct.with_addr(direct.addr() | Tag::Heap); + + unsafe { DemiBuffer::from_raw(tagged.cast()) } + }, + + #[cfg(feature = "libdpdk")] + Tag::Dpdk => { + // Step 1: get the direct buffer. + let direct: *mut rte_mbuf = rte_mbuf_from_indirect(self.as_mbuf()); + + // Step 2: incrememnt the reference count of the direct buffer. + rte_mbuf_refcnt_update(direct, 1); + + // Step 3: detach the indirect buffer. + rte_pktmbuf_detach(self.as_mbuf()); + + // Step 4: reconstitute the direct DemiBuffer. + return DemiBuffer::from_mbuf(direct); + }, + } + } + /// Removes `nbytes` bytes from the beginning of the `DemiBuffer` chain. // Note: If `nbytes` is greater than the length of the first segment in the chain, then this function will fail and // return an error, rather than remove the remaining bytes from subsequent segments in the chain. This is to match @@ -753,6 +831,12 @@ impl DemiBuffer { // Internal Functions // ------------------ + /// Returns the number of bytes subtracted from the beginning of the allocated buffer returned when calling + /// `into_raw`. + pub(super) fn metadata_size(tag: Tag) -> usize { + size_of::() - tag as usize + } + // Gets the tag containing the type of DemiBuffer. #[inline] fn get_tag(&self) -> Tag {