Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add jitter #1112

Merged
merged 2 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions crates/test/tests/xdp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use quilkin::{
xdp::{self, packet::net_types as nt},
},
},
time::UtcTimestamp,
};
use std::{
collections::BTreeSet,
Expand Down Expand Up @@ -66,6 +67,7 @@ async fn simple_forwarding() {
sessions: Arc::new(Default::default()),
local_ipv4: *PROXY.ip(),
local_ipv6: Ipv6Addr::from_bits(0),
last_receive: UtcTimestamp::now(),
};

let data = [0xf0u8; 11];
Expand Down Expand Up @@ -150,6 +152,7 @@ async fn changes_ip_version() {
sessions: Arc::new(Default::default()),
local_ipv4: *PROXY4.ip(),
local_ipv6: *PROXY6.ip(),
last_receive: UtcTimestamp::now(),
};

let data = [0xf1u8; 11];
Expand Down Expand Up @@ -283,6 +286,7 @@ async fn packet_manipulation() {
sessions: Arc::new(Default::default()),
local_ipv4: *PROXY.ip(),
local_ipv6: Ipv6Addr::from_bits(0),
last_receive: UtcTimestamp::now(),
};

let data = [0xf1u8; 11];
Expand Down Expand Up @@ -346,6 +350,7 @@ async fn packet_manipulation() {
sessions: Arc::new(Default::default()),
local_ipv4: *PROXY.ip(),
local_ipv6: Ipv6Addr::from_bits(0),
last_receive: UtcTimestamp::now(),
};

let data = [0xf1u8; 11];
Expand Down Expand Up @@ -417,6 +422,7 @@ async fn packet_manipulation() {
sessions: Arc::new(Default::default()),
local_ipv4: *PROXY.ip(),
local_ipv6: Ipv6Addr::from_bits(0),
last_receive: UtcTimestamp::now(),
};

let mut client_packet = unsafe { umem.alloc().unwrap() };
Expand Down Expand Up @@ -511,6 +517,7 @@ async fn multiple_servers() {
sessions: Arc::new(Default::default()),
local_ipv4: Ipv4Addr::from_bits(0),
local_ipv6: *PROXY.ip(),
last_receive: UtcTimestamp::now(),
};

let mut umem = xdp::Umem::map(
Expand Down Expand Up @@ -588,6 +595,7 @@ async fn many_sessions() {
sessions: Arc::new(Default::default()),
local_ipv4: *PROXY.ip(),
local_ipv6: Ipv6Addr::from_bits(0),
last_receive: UtcTimestamp::now(),
};

let data = [0xf0u8; 11];
Expand Down Expand Up @@ -717,6 +725,7 @@ async fn frees_dropped_packets() {
sessions: Arc::new(Default::default()),
local_ipv4: *PROXY4.ip(),
local_ipv6: *PROXY6.ip(),
last_receive: UtcTimestamp::now(),
};

let data = [0xf0u8; 11];
Expand Down Expand Up @@ -812,6 +821,7 @@ async fn qcmp() {
sessions: Arc::new(Default::default()),
local_ipv4: *PROXY.ip(),
local_ipv6: Ipv6Addr::from_bits(0),
last_receive: UtcTimestamp::now(),
};

let mut umem = xdp::Umem::map(
Expand Down
3 changes: 3 additions & 0 deletions src/net/xdp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ pub fn spawn(workers: XdpWorkers, config: Arc<crate::Config>) -> Result<XdpLoop,
const BATCH_SIZE: usize = 64;
use xdp::packet::net_types::NetworkU16;

use crate::time::UtcTimestamp;

/// The core I/O loop
///
/// All of the ring operations are done in this loop so that the actual
Expand Down Expand Up @@ -393,6 +395,7 @@ fn io_loop(
sessions,
local_ipv4,
local_ipv6,
last_receive: UtcTimestamp::now(),
};

let mut rx_slab = xdp::HeapSlab::with_capacity(BATCH_SIZE);
Expand Down
17 changes: 16 additions & 1 deletion src/net/xdp/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
maxmind_db::{self, IpNetEntry},
EndpointAddress,
},
time::UtcTimestamp,
};
pub use quilkin_xdp::xdp;
use quilkin_xdp::xdp::{
Expand Down Expand Up @@ -114,6 +115,7 @@ pub struct State {
pub sessions: Arc<SessionState>,
pub local_ipv4: std::net::Ipv4Addr,
pub local_ipv6: std::net::Ipv6Addr,
pub last_receive: UtcTimestamp,
}

impl State {
Expand Down Expand Up @@ -440,6 +442,11 @@ pub fn process_packets(
let filters = state.config.filters.load();
let cm = state.config.clusters.clone_value();

let now = UtcTimestamp::now();
let jitter = (now - state.last_receive).nanos();
state.last_receive = now;
let mut had_read = false;

while let Some(inner) = rx_slab.pop_front() {
let Ok(Some(udp)) = UdpPacket::parse_packet(&inner) else {
unreachable!("we somehow got a non-UDP packet, this should be impossible with the eBPF program we use to route packets");
Expand All @@ -452,6 +459,7 @@ pub fn process_packets(

let is_client = udp.dst_port == state.external_port;
let direction = if is_client {
had_read = true;
metrics::READ
} else {
metrics::WRITE
Expand All @@ -465,7 +473,7 @@ pub fn process_packets(
if is_client {
process_client_packet(packet, umem, &filters, &cm, state, tx_slab)
} else {
process_server_packet(packet, umem, &filters, state, tx_slab)
process_server_packet(packet, umem, &filters, state, tx_slab, jitter)
}
};

Expand All @@ -483,6 +491,10 @@ pub fn process_packets(
}
}
}

if had_read {
metrics::packet_jitter(metrics::READ, &metrics::EMPTY).set(jitter);
}
}

#[inline]
Expand Down Expand Up @@ -630,6 +642,7 @@ fn process_server_packet(
filters: &crate::filters::FilterChain,
state: &mut State,
tx_slab: &mut HeapSlab,
jitter: i64,
) -> Result<Option<Packet>, (PipelineError, Packet)> {
let server_addr = SocketAddr::new(packet.udp.ips.source(), packet.udp.src_port.host());

Expand All @@ -638,6 +651,8 @@ fn process_server_packet(
return Ok(Some(packet.inner));
};

metrics::packet_jitter(metrics::Direction::Write, &asn).set(jitter);

let mut ctx = filters::WriteContext::new(server_addr.into(), client_addr.into(), packet);

let mut packet = match filters.write(&mut ctx) {
Expand Down
Loading