Skip to content

Commit

Permalink
Add better network limits for editor.
Browse files Browse the repository at this point in the history
Add compression to editor network.
  • Loading branch information
Jupeyy committed Jan 15, 2025
1 parent 93cd17d commit a077667
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 23 deletions.
2 changes: 1 addition & 1 deletion game/editor/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl NetworkEventToGameEventGenerator for EditorEventGenerator {
) {
let msg = bincode::serde::decode_from_slice::<EditorEvent, _>(
bytes,
bincode::config::standard().with_limit::<{ 1024 * 1024 * 4 }>(),
bincode::config::standard().with_limit::<{ 1024 * 1024 * 1024 }>(),
);
if let Ok((msg, _)) = msg {
self.events
Expand Down
32 changes: 25 additions & 7 deletions game/editor/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ use base::system::System;
use network::network::{
connection::NetworkConnectionId,
event::NetworkEvent,
packet_compressor::{types::DecompressionByteLimit, DefaultNetworkPacketCompressor},
plugins::NetworkPlugins,
quinn_network::QuinnNetwork,
types::NetworkInOrderChannel,
types::{
NetworkClientCertCheckMode, NetworkClientCertMode, NetworkClientInitOptions,
NetworkServerCertAndKey, NetworkServerCertMode, NetworkServerCertModeResult,
NetworkServerInitOptions,
NetworkInOrderChannel, NetworkServerCertAndKey, NetworkServerCertMode,
NetworkServerCertModeResult, NetworkServerInitOptions,
},
utils::create_certifified_keys,
};
Expand Down Expand Up @@ -46,8 +47,16 @@ impl EditorNetwork {
NetworkServerInitOptions::new()
.with_max_thread_count(6)
.with_timeout(Duration::from_secs(120))
.with_stream_receive_window(1024 * 1024 * 1024),
Default::default(),
.with_stream_receive_window(1024 * 1024 * 1024)
.with_receive_window(1024 * 1024 * 1024)
.with_send_window(1024 * 1024 * 1024),
NetworkPlugins {
packet_plugins: Arc::new(vec![Arc::new(
DefaultNetworkPacketCompressor::new()
.with_limit(DecompressionByteLimit::OneGigaByte),
)]),
..Default::default()
},
)
.unwrap();
let port = addr.port();
Expand Down Expand Up @@ -80,8 +89,17 @@ impl EditorNetwork {
private_key: client_private_key,
},
)
.with_timeout(Duration::from_secs(120)),
Default::default(),
.with_timeout(Duration::from_secs(120))
.with_stream_receive_window(1024 * 1024 * 1024)
.with_receive_window(1024 * 1024 * 1024)
.with_send_window(1024 * 1024 * 1024),
NetworkPlugins {
packet_plugins: Arc::new(vec![Arc::new(
DefaultNetworkPacketCompressor::new()
.with_limit(DecompressionByteLimit::OneGigaByte),
)]),
..Default::default()
},
server_addr,
)
.unwrap()
Expand Down
36 changes: 29 additions & 7 deletions lib/network/src/network/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ impl<C: NetworkConnectionInterface + Send + Sync + Clone + 'static, const TY: u3
mut recv_stream: Vec<u8>,
debug_printing: bool,
packet_plugins: &Arc<Vec<Arc<dyn NetworkPluginPacket>>>,
stream_receive_window: Option<u32>,
) {
let timestamp = sys.time_get();

Expand All @@ -161,10 +162,24 @@ impl<C: NetworkConnectionInterface + Send + Sync + Clone + 'static, const TY: u3
}
}

let res_packet = bincode::serde::decode_from_slice::<NetworkPacket, _>(
recv_stream.as_slice(),
bincode::config::standard().with_limit::<{ 1024 * 1024 * 4 }>(),
);
let stream_window = stream_receive_window.unwrap_or(1024 * 1024 * 4) as usize;
let res_packet = if stream_window > 1024 * 1024 * 128 {
bincode::serde::decode_from_slice::<NetworkPacket, _>(
recv_stream.as_slice(),
bincode::config::standard().with_limit::<{ 1024 * 1024 * 1024 }>(),
)
} else if stream_window > 1024 * 1024 * 4 {
bincode::serde::decode_from_slice::<NetworkPacket, _>(
recv_stream.as_slice(),
bincode::config::standard().with_limit::<{ 1024 * 1024 * 128 }>(),
)
} else {
bincode::serde::decode_from_slice::<NetworkPacket, _>(
recv_stream.as_slice(),
bincode::config::standard().with_limit::<{ 1024 * 1024 * 4 }>(),
)
};

if let Ok((res_packet, handled_size)) = res_packet {
let remaining_size = recv_stream.len() - handled_size;
if remaining_size > 0 && debug_printing {
Expand All @@ -187,6 +202,7 @@ impl<C: NetworkConnectionInterface + Send + Sync + Clone + 'static, const TY: u3
sys: Arc<SystemTime>,
debug_printing: bool,
packet_plugins: Arc<Vec<Arc<dyn NetworkPluginPacket>>>,
stream_receive_window: Option<u32>,
) -> anyhow::Result<()> {
'conn_loop: loop {
let connection = &connection_async.conn;
Expand All @@ -200,6 +216,7 @@ impl<C: NetworkConnectionInterface + Send + Sync + Clone + 'static, const TY: u3
recv_stream,
debug_printing,
&packet_plugins,
stream_receive_window,
)
.await;
}
Expand All @@ -221,6 +238,7 @@ impl<C: NetworkConnectionInterface + Send + Sync + Clone + 'static, const TY: u3
sys: Arc<SystemTime>,
debug_printing: bool,
packet_plugins: Arc<Vec<Arc<dyn NetworkPluginPacket>>>,
stream_receive_window: Option<u32>,
) -> anyhow::Result<()> {
'conn_loop: loop {
let game_ev_gen_clone = game_event_generator.clone();
Expand All @@ -239,6 +257,7 @@ impl<C: NetworkConnectionInterface + Send + Sync + Clone + 'static, const TY: u3
data,
debug_printing,
&packet_plugins,
stream_receive_window,
)
.await;
}
Expand Down Expand Up @@ -273,6 +292,7 @@ impl<C: NetworkConnectionInterface + Send + Sync + Clone + 'static, const TY: u3
sys: Arc<SystemTime>,
debug_printing: bool,
packet_plugins: Arc<Vec<Arc<dyn NetworkPluginPacket>>>,
stream_receive_window: Option<u32>,
) -> anyhow::Result<()> {
'conn_loop: loop {
let game_ev_gen_clone = game_event_generator.clone();
Expand All @@ -294,6 +314,7 @@ impl<C: NetworkConnectionInterface + Send + Sync + Clone + 'static, const TY: u3
data,
debug_printing,
&packet_plugins,
stream_receive_window,
)
.await;
}
Expand Down Expand Up @@ -358,6 +379,7 @@ impl<C: NetworkConnectionInterface + Send + Sync + Clone + 'static, const TY: u3
debug_printing: bool,
packet_plugins: &Arc<Vec<Arc<dyn NetworkPluginPacket>>>,
connection_plugins: &Arc<Vec<Arc<dyn NetworkPluginConnection>>>,
stream_receive_window: Option<u32>,
) -> tokio::task::JoinHandle<()> {
let remote_addr = conn.remote_addr();
log::debug!("handling connecting request for {:?}", remote_addr);
Expand Down Expand Up @@ -513,12 +535,12 @@ impl<C: NetworkConnectionInterface + Send + Sync + Clone + 'static, const TY: u3
let res = tokio::select! {
res = Self::handle_connection_recv_unordered_reliable(
connection.clone(), game_event_generator_clone.clone(), connection_identifier, sys.clone(),
debug_printing, packet_plugins.clone()) => {res}
debug_printing, packet_plugins.clone(), stream_receive_window) => {res}
res = Self::handle_connection_recv_ordered_reliable(
connection.clone(), game_event_generator_clone.clone(), connection_identifier, sys.clone(),
debug_printing, packet_plugins.clone()) => {res}
debug_printing, packet_plugins.clone(), stream_receive_window) => {res}
res = Self::handle_connection_recv_unordered_unreliable(connection.clone(), game_event_generator_clone.clone(),
connection_identifier, sys.clone(), debug_printing, packet_plugins.clone()) => {res}
connection_identifier, sys.clone(), debug_printing, packet_plugins.clone(), stream_receive_window) => {res}
res = Self::ping(sys.clone(), game_event_generator_clone.clone(), connection.clone(), &connection_identifier, &mut ping_interval) => {res}
};

Expand Down
7 changes: 7 additions & 0 deletions lib/network/src/network/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ where
sys: sys.time.clone(),
is_debug: debug_printing,
packet_pool: pool.clone(),

stream_receive_window: options.base.stream_receive_window,

plugins,

_z: PhantomData,
Expand Down Expand Up @@ -295,6 +298,9 @@ where
let is_debug = network_thread.is_debug;
let packet_plugins = network_thread.plugins.packet_plugins.clone();
let connection_plugins = network_thread.plugins.connection_plugins.clone();

let stream_receive_window = network_thread.stream_receive_window;

if is_server {
tokio::spawn(async move {
log::debug!("server: starting to accept connections");
Expand All @@ -321,6 +327,7 @@ where
is_debug,
&packet_plugins,
&connection_plugins,
stream_receive_window,
)
.await;
}
Expand Down
7 changes: 7 additions & 0 deletions lib/network/src/network/network_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub struct NetworkAsync<E, C: Send + Sync, Z, I, const TY: u32> {
pub(crate) is_debug: bool,
pub(crate) packet_pool: Pool<Vec<u8>>,

pub(crate) stream_receive_window: Option<u32>,

// plugins
pub(crate) plugins: NetworkPlugins,

Expand Down Expand Up @@ -109,6 +111,9 @@ where
sys: sys.time.clone(),
is_debug: debug_printing,
packet_pool: pool.clone(),

stream_receive_window: options.base.stream_receive_window,

plugins,

_z: PhantomData,
Expand All @@ -134,6 +139,7 @@ where
let is_debug = self.is_debug;
let packet_plugins = self.plugins.packet_plugins.clone();
let connection_plugins = self.plugins.connection_plugins.clone();
let stream_receive_window = self.stream_receive_window;
// handle the connect sync (since it's client side only)
if let Err(err) =
tokio::runtime::Handle::current().block_on(tokio::spawn(async move {
Expand All @@ -148,6 +154,7 @@ where
is_debug,
&packet_plugins,
&connection_plugins,
stream_receive_window
)
.await => Err(anyhow!("{:?}", res)),
res = async move {
Expand Down
33 changes: 29 additions & 4 deletions lib/network/src/network/packet_compressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ use anyhow::anyhow;
use async_trait::async_trait;
use header::CompressHeader;
use pool::mt_pool::Pool;
use types::DecompressionByteLimit;

#[cfg(feature = "brotli")]
pub mod brotli;

pub mod header;
pub mod types;

use super::{connection::NetworkConnectionId, plugins::NetworkPluginPacket};

Expand All @@ -20,6 +22,8 @@ pub struct ZstdNetworkPacketCompressor {

send_dict: Option<Vec<u8>>,
recv_dict: Option<Vec<u8>>,

limit: DecompressionByteLimit,
}

impl Default for ZstdNetworkPacketCompressor {
Expand All @@ -37,6 +41,8 @@ impl ZstdNetworkPacketCompressor {
.build(),
send_dict: None,
recv_dict: None,

limit: Default::default(),
}
}

Expand All @@ -48,8 +54,15 @@ impl ZstdNetworkPacketCompressor {
.build(),
send_dict: Some(send_dict),
recv_dict: Some(recv_dict),

limit: Default::default(),
}
}

pub fn with_limit(mut self, limit: DecompressionByteLimit) -> Self {
self.limit = limit;
self
}
}

#[async_trait]
Expand Down Expand Up @@ -93,10 +106,22 @@ impl NetworkPluginPacket for ZstdNetworkPacketCompressor {
_id: &NetworkConnectionId,
buffer: &mut Vec<u8>,
) -> anyhow::Result<()> {
let (header, read_size) = bincode::serde::decode_from_slice::<CompressHeader, _>(
buffer,
bincode::config::standard().with_limit::<{ 1024 * 1024 * 4 }>(),
)?;
let (header, read_size) = match self.limit {
DecompressionByteLimit::FourMegaBytes => {
bincode::serde::decode_from_slice::<CompressHeader, _>(
buffer,
// use a high limit, since the packet size is already limited by the stream window
bincode::config::standard().with_limit::<{ 1024 * 1024 * 4 }>(),
)?
}
DecompressionByteLimit::OneGigaByte => {
bincode::serde::decode_from_slice::<CompressHeader, _>(
buffer,
// use a high limit, since the packet size is already limited by the stream window
bincode::config::standard().with_limit::<{ 1024 * 1024 * 1024 }>(),
)?
}
};

if header.is_compressed {
let mut helper = self.helper_pool.new();
Expand Down
27 changes: 23 additions & 4 deletions lib/network/src/network/packet_compressor/brotli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use super::header::CompressHeader;
#[derive(Debug)]
pub struct BrotliNetworkPacketCompressor {
helper_pool: Pool<Vec<u8>>,

limit: DecompressionByteLimit,
}

impl Default for BrotliNetworkPacketCompressor {
Expand All @@ -28,6 +30,11 @@ impl BrotliNetworkPacketCompressor {
.build(),
}
}

pub fn with_limit(mut self, limit: DecompressionByteLimit) -> Self {
self.limit = limit;
self
}
}

#[async_trait]
Expand Down Expand Up @@ -64,10 +71,22 @@ impl NetworkPluginPacket for BrotliNetworkPacketCompressor {
_id: &NetworkConnectionId,
buffer: &mut Vec<u8>,
) -> anyhow::Result<()> {
let (header, read_size) = bincode::serde::decode_from_slice::<CompressHeader, _>(
buffer,
bincode::config::standard().with_limit::<{ 1024 * 1024 * 4 }>(),
)?;
let (header, read_size) = match self.limit {
DecompressionByteLimit::FourMegaBytes => {
bincode::serde::decode_from_slice::<CompressHeader, _>(
buffer,
// use a high limit, since the packet size is already limited by the stream window
bincode::config::standard().with_limit::<{ 1024 * 1024 * 4 }>(),
)?
}
DecompressionByteLimit::OneGigaByte => {
bincode::serde::decode_from_slice::<CompressHeader, _>(
buffer,
// use a high limit, since the packet size is already limited by the stream window
bincode::config::standard().with_limit::<{ 1024 * 1024 * 1024 }>(),
)?
}
};

if header.is_compressed {
let mut helper = self.helper_pool.new();
Expand Down
6 changes: 6 additions & 0 deletions lib/network/src/network/packet_compressor/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#[derive(Debug, Default, Clone, Copy)]
pub enum DecompressionByteLimit {
#[default]
FourMegaBytes,
OneGigaByte,
}
12 changes: 12 additions & 0 deletions lib/network/src/network/quinnminimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ fn configure_client(options: &NetworkClientInitOptions) -> anyhow::Result<Client
if let Some(stream_receive_window) = options.base.stream_receive_window {
transport_config.stream_receive_window(stream_receive_window.into());
}
if let Some(receive_window) = options.base.receive_window {
transport_config.receive_window(receive_window.into());
}
if let Some(send_window) = options.base.send_window {
transport_config.send_window(send_window.into());
}
if options
.base
.timeout
Expand Down Expand Up @@ -381,6 +387,12 @@ fn configure_server(
} else {
transport.stream_receive_window((1024u32 * 64u32).into());
}
if let Some(receive_window) = options.base.receive_window {
transport.receive_window(receive_window.into());
}
if let Some(send_window) = options.base.send_window {
transport.send_window(send_window.into());
}
if let Some(max_reorder) = options.base.packet_reorder_threshold {
transport.packet_threshold(max_reorder.clamp(3, u32::MAX));
}
Expand Down
Loading

0 comments on commit a077667

Please sign in to comment.