diff --git a/game/editor/src/event.rs b/game/editor/src/event.rs index e72e2f7..a50fc44 100644 --- a/game/editor/src/event.rs +++ b/game/editor/src/event.rs @@ -81,7 +81,7 @@ impl NetworkEventToGameEventGenerator for EditorEventGenerator { ) { let msg = bincode::serde::decode_from_slice::<EditorEvent, _>( bytes, - bincode::config::standard().with_limit::<{ 1024 * 1024 * 4 }>(), + bincode::config::standard().with_limit::<{ 1024 * 1024 * 1024 }>(), ); if let Ok((msg, _)) = msg { self.events diff --git a/game/editor/src/network.rs b/game/editor/src/network.rs index 3e57b97..03d3f06 100644 --- a/game/editor/src/network.rs +++ b/game/editor/src/network.rs @@ -4,12 +4,13 @@ use base::system::System; use network::network::{ connection::NetworkConnectionId, event::NetworkEvent, + packet_compressor::{types::DecompressionByteLimit, DefaultNetworkPacketCompressor}, + plugins::NetworkPlugins, quinn_network::QuinnNetwork, - types::NetworkInOrderChannel, types::{ NetworkClientCertCheckMode, NetworkClientCertMode, NetworkClientInitOptions, - NetworkServerCertAndKey, NetworkServerCertMode, NetworkServerCertModeResult, - NetworkServerInitOptions, + NetworkInOrderChannel, NetworkServerCertAndKey, NetworkServerCertMode, + NetworkServerCertModeResult, NetworkServerInitOptions, }, utils::create_certifified_keys, }; @@ -46,8 +47,16 @@ impl EditorNetwork { NetworkServerInitOptions::new() .with_max_thread_count(6) .with_timeout(Duration::from_secs(120)) - .with_stream_receive_window(1024 * 1024 * 1024), - Default::default(), + .with_stream_receive_window(1024 * 1024 * 1024) + .with_receive_window(1024 * 1024 * 1024) + .with_send_window(1024 * 1024 * 1024), + NetworkPlugins { + packet_plugins: Arc::new(vec![Arc::new( + DefaultNetworkPacketCompressor::new() + .with_limit(DecompressionByteLimit::OneGigaByte), + )]), + ..Default::default() + }, ) .unwrap(); let port = addr.port(); @@ -80,8 +89,17 @@ impl EditorNetwork { private_key: client_private_key, }, ) - .with_timeout(Duration::from_secs(120)), - Default::default(), + .with_timeout(Duration::from_secs(120)) + .with_stream_receive_window(1024 * 1024 * 1024) + .with_receive_window(1024 * 1024 * 1024) + .with_send_window(1024 * 1024 * 1024), + NetworkPlugins { + packet_plugins: Arc::new(vec![Arc::new( + DefaultNetworkPacketCompressor::new() + .with_limit(DecompressionByteLimit::OneGigaByte), + )]), + ..Default::default() + }, server_addr, ) .unwrap() diff --git a/lib/network/src/network/connections.rs b/lib/network/src/network/connections.rs index 2b95d7c..0e67a5d 100644 --- a/lib/network/src/network/connections.rs +++ b/lib/network/src/network/connections.rs @@ -149,6 +149,7 @@ impl<C: NetworkConnectionInterface + Send + Sync + Clone + 'static, const TY: u3 mut recv_stream: Vec<u8>, debug_printing: bool, packet_plugins: &Arc<Vec<Arc<dyn NetworkPluginPacket>>>, + stream_receive_window: Option<u32>, ) { let timestamp = sys.time_get(); @@ -161,10 +162,24 @@ impl<C: NetworkConnectionInterface + Send + Sync + Clone + 'static, const TY: u3 } } - let res_packet = bincode::serde::decode_from_slice::<NetworkPacket, _>( - recv_stream.as_slice(), - bincode::config::standard().with_limit::<{ 1024 * 1024 * 4 }>(), - ); + let stream_window = stream_receive_window.unwrap_or(1024 * 1024 * 4) as usize; + let res_packet = if stream_window > 1024 * 1024 * 128 { + bincode::serde::decode_from_slice::<NetworkPacket, _>( + recv_stream.as_slice(), + bincode::config::standard().with_limit::<{ 1024 * 1024 * 1024 }>(), + ) + } else if stream_window > 1024 * 1024 * 4 { + bincode::serde::decode_from_slice::<NetworkPacket, _>( + recv_stream.as_slice(), + bincode::config::standard().with_limit::<{ 1024 * 1024 * 128 }>(), + ) + } else { + bincode::serde::decode_from_slice::<NetworkPacket, _>( + recv_stream.as_slice(), + bincode::config::standard().with_limit::<{ 1024 * 1024 * 4 }>(), + ) + }; + if let Ok((res_packet, handled_size)) = res_packet { let remaining_size = recv_stream.len() - handled_size; if remaining_size > 0 && debug_printing { @@ -187,6 +202,7 @@ impl<C: NetworkConnectionInterface + Send + Sync + Clone + 'static, const TY: u3 sys: Arc<SystemTime>, debug_printing: bool, packet_plugins: Arc<Vec<Arc<dyn NetworkPluginPacket>>>, + stream_receive_window: Option<u32>, ) -> anyhow::Result<()> { 'conn_loop: loop { let connection = &connection_async.conn; @@ -200,6 +216,7 @@ impl<C: NetworkConnectionInterface + Send + Sync + Clone + 'static, const TY: u3 recv_stream, debug_printing, &packet_plugins, + stream_receive_window, ) .await; } @@ -221,6 +238,7 @@ impl<C: NetworkConnectionInterface + Send + Sync + Clone + 'static, const TY: u3 sys: Arc<SystemTime>, debug_printing: bool, packet_plugins: Arc<Vec<Arc<dyn NetworkPluginPacket>>>, + stream_receive_window: Option<u32>, ) -> anyhow::Result<()> { 'conn_loop: loop { let game_ev_gen_clone = game_event_generator.clone(); @@ -239,6 +257,7 @@ impl<C: NetworkConnectionInterface + Send + Sync + Clone + 'static, const TY: u3 data, debug_printing, &packet_plugins, + stream_receive_window, ) .await; } @@ -273,6 +292,7 @@ impl<C: NetworkConnectionInterface + Send + Sync + Clone + 'static, const TY: u3 sys: Arc<SystemTime>, debug_printing: bool, packet_plugins: Arc<Vec<Arc<dyn NetworkPluginPacket>>>, + stream_receive_window: Option<u32>, ) -> anyhow::Result<()> { 'conn_loop: loop { let game_ev_gen_clone = game_event_generator.clone(); @@ -294,6 +314,7 @@ impl<C: NetworkConnectionInterface + Send + Sync + Clone + 'static, const TY: u3 data, debug_printing, &packet_plugins, + stream_receive_window, ) .await; } @@ -358,6 +379,7 @@ impl<C: NetworkConnectionInterface + Send + Sync + Clone + 'static, const TY: u3 debug_printing: bool, packet_plugins: &Arc<Vec<Arc<dyn NetworkPluginPacket>>>, connection_plugins: &Arc<Vec<Arc<dyn NetworkPluginConnection>>>, + stream_receive_window: Option<u32>, ) -> tokio::task::JoinHandle<()> { let remote_addr = conn.remote_addr(); log::debug!("handling connecting request for {:?}", remote_addr); @@ -513,12 +535,12 @@ impl<C: NetworkConnectionInterface + Send + Sync + Clone + 'static, const TY: u3 let res = tokio::select! { res = Self::handle_connection_recv_unordered_reliable( connection.clone(), game_event_generator_clone.clone(), connection_identifier, sys.clone(), - debug_printing, packet_plugins.clone()) => {res} + debug_printing, packet_plugins.clone(), stream_receive_window) => {res} res = Self::handle_connection_recv_ordered_reliable( connection.clone(), game_event_generator_clone.clone(), connection_identifier, sys.clone(), - debug_printing, packet_plugins.clone()) => {res} + debug_printing, packet_plugins.clone(), stream_receive_window) => {res} res = Self::handle_connection_recv_unordered_unreliable(connection.clone(), game_event_generator_clone.clone(), - connection_identifier, sys.clone(), debug_printing, packet_plugins.clone()) => {res} + connection_identifier, sys.clone(), debug_printing, packet_plugins.clone(), stream_receive_window) => {res} res = Self::ping(sys.clone(), game_event_generator_clone.clone(), connection.clone(), &connection_identifier, &mut ping_interval) => {res} }; diff --git a/lib/network/src/network/network.rs b/lib/network/src/network/network.rs index f9e68ed..f591ed0 100644 --- a/lib/network/src/network/network.rs +++ b/lib/network/src/network/network.rs @@ -250,6 +250,9 @@ where sys: sys.time.clone(), is_debug: debug_printing, packet_pool: pool.clone(), + + stream_receive_window: options.base.stream_receive_window, + plugins, _z: PhantomData, @@ -295,6 +298,9 @@ where let is_debug = network_thread.is_debug; let packet_plugins = network_thread.plugins.packet_plugins.clone(); let connection_plugins = network_thread.plugins.connection_plugins.clone(); + + let stream_receive_window = network_thread.stream_receive_window; + if is_server { tokio::spawn(async move { log::debug!("server: starting to accept connections"); @@ -321,6 +327,7 @@ where is_debug, &packet_plugins, &connection_plugins, + stream_receive_window, ) .await; } diff --git a/lib/network/src/network/network_async.rs b/lib/network/src/network/network_async.rs index 2ae4dee..9546f7d 100644 --- a/lib/network/src/network/network_async.rs +++ b/lib/network/src/network/network_async.rs @@ -37,6 +37,8 @@ pub struct NetworkAsync<E, C: Send + Sync, Z, I, const TY: u32> { pub(crate) is_debug: bool, pub(crate) packet_pool: Pool<Vec<u8>>, + pub(crate) stream_receive_window: Option<u32>, + // plugins pub(crate) plugins: NetworkPlugins, @@ -109,6 +111,9 @@ where sys: sys.time.clone(), is_debug: debug_printing, packet_pool: pool.clone(), + + stream_receive_window: options.base.stream_receive_window, + plugins, _z: PhantomData, @@ -134,6 +139,7 @@ where let is_debug = self.is_debug; let packet_plugins = self.plugins.packet_plugins.clone(); let connection_plugins = self.plugins.connection_plugins.clone(); + let stream_receive_window = self.stream_receive_window; // handle the connect sync (since it's client side only) if let Err(err) = tokio::runtime::Handle::current().block_on(tokio::spawn(async move { @@ -148,6 +154,7 @@ where is_debug, &packet_plugins, &connection_plugins, + stream_receive_window ) .await => Err(anyhow!("{:?}", res)), res = async move { diff --git a/lib/network/src/network/packet_compressor.rs b/lib/network/src/network/packet_compressor.rs index 9e5c5cb..69900a5 100644 --- a/lib/network/src/network/packet_compressor.rs +++ b/lib/network/src/network/packet_compressor.rs @@ -4,11 +4,13 @@ use anyhow::anyhow; use async_trait::async_trait; use header::CompressHeader; use pool::mt_pool::Pool; +use types::DecompressionByteLimit; #[cfg(feature = "brotli")] pub mod brotli; pub mod header; +pub mod types; use super::{connection::NetworkConnectionId, plugins::NetworkPluginPacket}; @@ -20,6 +22,8 @@ pub struct ZstdNetworkPacketCompressor { send_dict: Option<Vec<u8>>, recv_dict: Option<Vec<u8>>, + + limit: DecompressionByteLimit, } impl Default for ZstdNetworkPacketCompressor { @@ -37,6 +41,8 @@ impl ZstdNetworkPacketCompressor { .build(), send_dict: None, recv_dict: None, + + limit: Default::default(), } } @@ -48,8 +54,15 @@ impl ZstdNetworkPacketCompressor { .build(), send_dict: Some(send_dict), recv_dict: Some(recv_dict), + + limit: Default::default(), } } + + pub fn with_limit(mut self, limit: DecompressionByteLimit) -> Self { + self.limit = limit; + self + } } #[async_trait] @@ -93,10 +106,22 @@ impl NetworkPluginPacket for ZstdNetworkPacketCompressor { _id: &NetworkConnectionId, buffer: &mut Vec<u8>, ) -> anyhow::Result<()> { - let (header, read_size) = bincode::serde::decode_from_slice::<CompressHeader, _>( - buffer, - bincode::config::standard().with_limit::<{ 1024 * 1024 * 4 }>(), - )?; + let (header, read_size) = match self.limit { + DecompressionByteLimit::FourMegaBytes => { + bincode::serde::decode_from_slice::<CompressHeader, _>( + buffer, + // use a high limit, since the packet size is already limited by the stream window + bincode::config::standard().with_limit::<{ 1024 * 1024 * 4 }>(), + )? + } + DecompressionByteLimit::OneGigaByte => { + bincode::serde::decode_from_slice::<CompressHeader, _>( + buffer, + // use a high limit, since the packet size is already limited by the stream window + bincode::config::standard().with_limit::<{ 1024 * 1024 * 1024 }>(), + )? + } + }; if header.is_compressed { let mut helper = self.helper_pool.new(); diff --git a/lib/network/src/network/packet_compressor/brotli.rs b/lib/network/src/network/packet_compressor/brotli.rs index 17d1466..13119e1 100644 --- a/lib/network/src/network/packet_compressor/brotli.rs +++ b/lib/network/src/network/packet_compressor/brotli.rs @@ -11,6 +11,8 @@ use super::header::CompressHeader; #[derive(Debug)] pub struct BrotliNetworkPacketCompressor { helper_pool: Pool<Vec<u8>>, + + limit: DecompressionByteLimit, } impl Default for BrotliNetworkPacketCompressor { @@ -28,6 +30,11 @@ impl BrotliNetworkPacketCompressor { .build(), } } + + pub fn with_limit(mut self, limit: DecompressionByteLimit) -> Self { + self.limit = limit; + self + } } #[async_trait] @@ -64,10 +71,22 @@ impl NetworkPluginPacket for BrotliNetworkPacketCompressor { _id: &NetworkConnectionId, buffer: &mut Vec<u8>, ) -> anyhow::Result<()> { - let (header, read_size) = bincode::serde::decode_from_slice::<CompressHeader, _>( - buffer, - bincode::config::standard().with_limit::<{ 1024 * 1024 * 4 }>(), - )?; + let (header, read_size) = match self.limit { + DecompressionByteLimit::FourMegaBytes => { + bincode::serde::decode_from_slice::<CompressHeader, _>( + buffer, + // use a high limit, since the packet size is already limited by the stream window + bincode::config::standard().with_limit::<{ 1024 * 1024 * 4 }>(), + )? + } + DecompressionByteLimit::OneGigaByte => { + bincode::serde::decode_from_slice::<CompressHeader, _>( + buffer, + // use a high limit, since the packet size is already limited by the stream window + bincode::config::standard().with_limit::<{ 1024 * 1024 * 1024 }>(), + )? + } + }; if header.is_compressed { let mut helper = self.helper_pool.new(); diff --git a/lib/network/src/network/packet_compressor/types.rs b/lib/network/src/network/packet_compressor/types.rs new file mode 100644 index 0000000..3229151 --- /dev/null +++ b/lib/network/src/network/packet_compressor/types.rs @@ -0,0 +1,6 @@ +#[derive(Debug, Default, Clone, Copy)] +pub enum DecompressionByteLimit { + #[default] + FourMegaBytes, + OneGigaByte, +} diff --git a/lib/network/src/network/quinnminimal.rs b/lib/network/src/network/quinnminimal.rs index ee0780e..668594b 100644 --- a/lib/network/src/network/quinnminimal.rs +++ b/lib/network/src/network/quinnminimal.rs @@ -198,6 +198,12 @@ fn configure_client(options: &NetworkClientInitOptions) -> anyhow::Result<Client if let Some(stream_receive_window) = options.base.stream_receive_window { transport_config.stream_receive_window(stream_receive_window.into()); } + if let Some(receive_window) = options.base.receive_window { + transport_config.receive_window(receive_window.into()); + } + if let Some(send_window) = options.base.send_window { + transport_config.send_window(send_window.into()); + } if options .base .timeout @@ -381,6 +387,12 @@ fn configure_server( } else { transport.stream_receive_window((1024u32 * 64u32).into()); } + if let Some(receive_window) = options.base.receive_window { + transport.receive_window(receive_window.into()); + } + if let Some(send_window) = options.base.send_window { + transport.send_window(send_window.into()); + } if let Some(max_reorder) = options.base.packet_reorder_threshold { transport.packet_threshold(max_reorder.clamp(3, u32::MAX)); } diff --git a/lib/network/src/network/types.rs b/lib/network/src/network/types.rs index afed2c0..04f902b 100644 --- a/lib/network/src/network/types.rs +++ b/lib/network/src/network/types.rs @@ -108,6 +108,11 @@ pub struct NetworkSharedInitOptions { /// The size in bytes of the receive window /// per stream. pub stream_receive_window: Option<u32>, + /// The size in bytes of the receive window + /// of all streams. + pub receive_window: Option<u32>, + /// Maximum number of bytes to transmit to a peer without acknowledgment. + pub send_window: Option<u32>, /// Max reordering of packets before it's considered lost. /// Should not be less than 3, per RFC5681. /// Note: ignored if not supported. @@ -197,6 +202,16 @@ impl NetworkServerInitOptions { self } + pub fn with_receive_window(mut self, receive_window: u32) -> Self { + self.base.receive_window = Some(receive_window); + self + } + + pub fn with_send_window(mut self, send_window: u32) -> Self { + self.base.send_window = Some(send_window); + self + } + pub fn with_debug_priting(mut self, debug_printing: bool) -> Self { self.base = self.base.with_debug_priting(debug_printing); self @@ -257,6 +272,21 @@ impl<'a> NetworkClientInitOptions<'a> { self } + pub fn with_stream_receive_window(mut self, stream_receive_window: u32) -> Self { + self.base.stream_receive_window = Some(stream_receive_window); + self + } + + pub fn with_receive_window(mut self, receive_window: u32) -> Self { + self.base.receive_window = Some(receive_window); + self + } + + pub fn with_send_window(mut self, send_window: u32) -> Self { + self.base.send_window = Some(send_window); + self + } + pub fn with_debug_priting(mut self, debug_printing: bool) -> Self { self.base = self.base.with_debug_priting(debug_printing); self