Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a buffer pool on the network receiver side #435

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benches/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ criterion_group!(
criterion_main!(replication_benches);

// const NUM_ENTITIES: &[usize] = &[0, 10, 100, 1000, 10000];
const NUM_ENTITIES: &[usize] = &[1000];
const NUM_ENTITIES: &[usize] = &[10000];

/// Replicating N entity spawn from server to channel, with a local io
fn send_float_insert_one_client(criterion: &mut Criterion) {
Expand Down
6 changes: 3 additions & 3 deletions lightyear/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::client::sync::SyncConfig;
use crate::inputs::native::input_buffer::InputBuffer;
use crate::packet::message_manager::MessageManager;
use crate::packet::packet::Packet;
use crate::packet::packet_builder::{Payload, PACKET_BUFFER_CAPACITY};
use crate::packet::packet_builder::{Payload, RecvPayload, PACKET_BUFFER_CAPACITY};
use crate::prelude::{Channel, ChannelKind, ClientId, Message, ReplicationGroup, TargetEntity};
use crate::protocol::channel::ChannelRegistry;
use crate::protocol::component::{ComponentNetId, ComponentRegistry};
Expand Down Expand Up @@ -460,12 +460,12 @@ impl ConnectionManager {

pub(crate) fn recv_packet(
&mut self,
packet: Payload,
packet: RecvPayload,
tick_manager: &TickManager,
component_registry: &ComponentRegistry,
) -> Result<(), ClientError> {
// receive the packets, buffer them, update any sender that were waiting for their sent messages to be acked
let tick = self.message_manager.recv_packet(packet)?;
let tick = self.message_manager.recv_packet(&packet)?;
debug!("Received server packet with tick: {:?}", tick);
if self
.sync_manager
Expand Down
6 changes: 3 additions & 3 deletions lightyear/src/connection/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::connection::netcode::ConnectToken;
#[cfg(all(feature = "steam", not(target_family = "wasm")))]
use crate::connection::steam::{client::SteamConfig, steamworks_client::SteamworksClient};
use crate::packet::packet::Packet;
use crate::packet::packet_builder::Payload;
use crate::packet::packet_builder::{Payload, RecvPayload};

use crate::prelude::client::ClientTransport;
use crate::prelude::{generate_key, Key, LinkConditionerConfig};
Expand Down Expand Up @@ -46,7 +46,7 @@ pub trait NetClient: Send + Sync {
fn try_update(&mut self, delta_ms: f64) -> Result<(), ConnectionError>;

/// Receive a packet from the server
fn recv(&mut self) -> Option<Payload>;
fn recv(&mut self) -> Option<RecvPayload>;

/// Send a packet to the server
fn send(&mut self, buf: &[u8]) -> Result<(), ConnectionError>;
Expand Down Expand Up @@ -197,7 +197,7 @@ impl NetClient for ClientConnection {
self.client.try_update(delta_ms)
}

fn recv(&mut self) -> Option<Payload> {
fn recv(&mut self) -> Option<RecvPayload> {
self.client.recv()
}

Expand Down
4 changes: 2 additions & 2 deletions lightyear/src/connection/local/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::client::io::Io;
use crate::client::networking::NetworkingState;
use crate::connection::client::{ConnectionError, ConnectionState, NetClient};
use crate::packet::packet_builder::Payload;
use crate::packet::packet_builder::{Payload, RecvPayload};
use crate::prelude::ClientId;
use crate::transport::LOCAL_SOCKET;
use std::net::SocketAddr;
Expand Down Expand Up @@ -44,7 +44,7 @@ impl NetClient for Client {
Ok(())
}

fn recv(&mut self) -> Option<Payload> {
fn recv(&mut self) -> Option<RecvPayload> {
None
}

Expand Down
36 changes: 11 additions & 25 deletions lightyear/src/connection/netcode/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use bevy::utils::Duration;
use std::net::{IpAddr, Ipv4Addr};
use std::sync::Arc;
use std::{collections::VecDeque, net::SocketAddr};

use bevy::prelude::Resource;
Expand All @@ -10,7 +11,7 @@ use crate::connection::client::{
ConnectionError, ConnectionState, DisconnectReason, IoConfig, NetClient,
};
use crate::connection::id;
use crate::packet::packet_builder::Payload;
use crate::packet::packet_builder::{Payload, RecvPayload};
use crate::prelude::client::NetworkingState;
use crate::serialize::bitcode::reader::BufferPool;
use crate::serialize::reader::ReadBuffer;
Expand Down Expand Up @@ -197,8 +198,8 @@ pub struct NetcodeClient<Ctx = ()> {
replay_protection: ReplayProtection,
should_disconnect: bool,
should_disconnect_state: ClientState,
packet_queue: VecDeque<Payload>,
buffer_pool: Pool<Vec<u8>>,
packet_queue: VecDeque<RecvPayload>,
buffer_pool: Arc<Pool<Vec<u8>>>,
cfg: ClientConfig<Ctx>,
}

Expand Down Expand Up @@ -233,7 +234,7 @@ impl<Ctx> NetcodeClient<Ctx> {
should_disconnect: false,
should_disconnect_state: ClientState::Disconnected,
packet_queue: VecDeque::new(),
buffer_pool: Pool::new(10, || vec![0u8; MAX_PKT_BUF_SIZE]),
buffer_pool: Arc::new(Pool::new(10, || vec![0u8; MAX_PKT_BUF_SIZE])),
cfg,
})
}
Expand Down Expand Up @@ -407,26 +408,11 @@ impl<Ctx> NetcodeClient<Ctx> {
}
(Packet::Payload(pkt), ClientState::Connected) => {
trace!("client received payload packet from server");
// // TODO: we decode the data immediately so we don't need to keep the buffer around!
// // we could just
// // instead of allocating a new buffer, fetch one from the pool
// trace!("read from netcode client pre");
// let mut reader = self.buffer_pool.start_read(pkt.buf);
// let packet = crate::packet::packet::Packet::decode(&mut reader)
// .map_err(|_| super::packet::Error::InvalidPayload)?;
// trace!(
// "read from netcode client post; pool len: {}",
// self.buffer_pool.0.len()
// );
// // return the buffer to the pool
// self.buffer_pool.attach(reader);

// let buf = self.buffer_pool.pull(|| vec![0u8; pkt.buf.len()]);
// TODO: COPY THE PAYLOAD INTO A BUFFER FROM THE POOL! and we allocate buffers to the pool
// outside of the hotpath? we could have a static pool of buffers?
let mut buf = vec![0u8; pkt.buf.len()];
// TODO: we decode the data immediately so we don't need to keep the buffer around!
let mut buf = self.buffer_pool.pull_owned(|| vec![0u8; pkt.buf.len()]);
buf.clear();
buf.resize(pkt.buf.len(), 0);
buf.copy_from_slice(pkt.buf);

// TODO: control the size/memory of the packet queue?
self.packet_queue.push_back(buf);
}
Expand Down Expand Up @@ -599,7 +585,7 @@ impl<Ctx> NetcodeClient<Ctx> {
/// thread::sleep(tick_rate);
/// }
/// ```
pub fn recv(&mut self) -> Option<Payload> {
pub fn recv(&mut self) -> Option<RecvPayload> {
self.packet_queue.pop_front()
}

Expand Down Expand Up @@ -711,7 +697,7 @@ pub(crate) mod connection {
Ok(())
}

fn recv(&mut self) -> Option<Payload> {
fn recv(&mut self) -> Option<RecvPayload> {
self.client.recv()
}

Expand Down
31 changes: 14 additions & 17 deletions lightyear/src/connection/netcode/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ use crate::connection::netcode::token::TOKEN_EXPIRE_SEC;
use crate::connection::server::{
ConnectionRequestHandler, DefaultConnectionRequestHandler, DeniedReason, IoConfig, NetServer,
};
use crate::packet::packet_builder::Payload;
use crate::packet::packet_builder::{Payload, RecvPayload};
use crate::serialize::bitcode::reader::BufferPool;
use crate::serialize::reader::ReadBuffer;
use crate::server::config::NetcodeConfig;
use crate::server::io::{Io, ServerIoEvent, ServerNetworkEventSender};
use crate::transport::io::BaseIo;
use crate::transport::{PacketReceiver, PacketSender, Transport};
use crate::utils::pool::Pool;

use super::{
bytes::Bytes,
Expand Down Expand Up @@ -129,10 +130,7 @@ struct ConnectionCache {
replay_protection: HashMap<ClientId, ReplayProtection>,

// packet queue for all clients
packet_queue: VecDeque<(Payload, ClientId)>,

// pool of buffers to re-use for decoding packets
buffer_pool: BufferPool,
packet_queue: VecDeque<(RecvPayload, ClientId)>,

// corresponds to the server time
time: f64,
Expand All @@ -145,7 +143,7 @@ impl ConnectionCache {
client_id_map: HashMap::with_capacity(MAX_CLIENTS),
replay_protection: HashMap::with_capacity(MAX_CLIENTS),
packet_queue: VecDeque::with_capacity(MAX_CLIENTS * 2),
buffer_pool: BufferPool::default(),

time: server_time,
}
}
Expand Down Expand Up @@ -391,6 +389,8 @@ pub struct NetcodeServer<Ctx = ()> {
protocol_id: u64,
conn_cache: ConnectionCache,
token_entries: TokenEntries,
// pool of buffers to re-use for decoding packets
buffer_pool: Arc<Pool<Vec<u8>>>,
cfg: ServerConfig<Ctx>,
}

Expand All @@ -409,6 +409,7 @@ impl NetcodeServer {
challenge_key: crypto::generate_key(),
conn_cache: ConnectionCache::new(0.0),
token_entries: TokenEntries::new(),
buffer_pool: Arc::new(Pool::new(10, || vec![0; MAX_PKT_BUF_SIZE])),
cfg: ServerConfig::default(),
};
// info!("server started on {}", server.io.local_addr());
Expand Down Expand Up @@ -443,6 +444,7 @@ impl<Ctx> NetcodeServer<Ctx> {
challenge_key: crypto::generate_key(),
conn_cache: ConnectionCache::new(0.0),
token_entries: TokenEntries::new(),
buffer_pool: Arc::new(Pool::new(10, || vec![0; MAX_PKT_BUF_SIZE])),
cfg,
};
// info!("server started on {}", server.addr());
Expand Down Expand Up @@ -501,15 +503,10 @@ impl<Ctx> NetcodeServer<Ctx> {
Packet::Payload(packet) => {
self.touch_client(client_id)?;
if let Some(idx) = client_id {
// // use a buffer from the pool to avoid re-allocating
// let mut reader = self.conn_cache.buffer_pool.start_read(packet.buf);
// let packet = crate::packet::packet::Packet::decode(&mut reader)
// .map_err(|_| super::packet::Error::InvalidPayload)?;
// return the buffer to the pool
// self.conn_cache.buffer_pool.attach(reader);

// TODO: use a pool of buffers to avoid re-allocation
let mut buf = vec![0u8; packet.buf.len()];
// fetch a buffer from the pool
let mut buf = self.buffer_pool.pull_owned(|| vec![0u8; packet.buf.len()]);
buf.clear();
buf.resize(packet.buf.len(), 0);
buf.copy_from_slice(packet.buf);
self.conn_cache.packet_queue.push_back((buf, idx));
}
Expand Down Expand Up @@ -873,7 +870,7 @@ impl<Ctx> NetcodeServer<Ctx> {
/// }
/// # break;
/// }
pub fn recv(&mut self) -> Option<(Payload, ClientId)> {
pub fn recv(&mut self) -> Option<(RecvPayload, ClientId)> {
self.conn_cache.packet_queue.pop_front()
}
/// Sends a packet to a client.
Expand Down Expand Up @@ -1110,7 +1107,7 @@ pub(crate) mod connection {
Ok(())
}

fn recv(&mut self) -> Option<(Payload, id::ClientId)> {
fn recv(&mut self) -> Option<(RecvPayload, id::ClientId)> {
self.server
.recv()
.map(|(packet, id)| (packet, id::ClientId::Netcode(id)))
Expand Down
4 changes: 2 additions & 2 deletions lightyear/src/connection/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::connection::id::ClientId;
#[cfg(all(feature = "steam", not(target_family = "wasm")))]
use crate::connection::steam::{server::SteamConfig, steamworks_client::SteamworksClient};
use crate::packet::packet::Packet;
use crate::packet::packet_builder::Payload;
use crate::packet::packet_builder::{Payload, RecvPayload};
use crate::prelude::client::ClientTransport;
use crate::prelude::server::ServerTransport;
use crate::prelude::LinkConditionerConfig;
Expand Down Expand Up @@ -72,7 +72,7 @@ pub trait NetServer: Send + Sync {
fn try_update(&mut self, delta_ms: f64) -> Result<(), ConnectionError>;

/// Receive a packet from one of the connected clients
fn recv(&mut self) -> Option<(Payload, ClientId)>;
fn recv(&mut self) -> Option<(RecvPayload, ClientId)>;

/// Send a packet to one of the connected clients
fn send(&mut self, buf: &[u8], client_id: ClientId) -> Result<(), ConnectionError>;
Expand Down
19 changes: 10 additions & 9 deletions lightyear/src/packet/message_manager.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::{HashMap, VecDeque};
use std::io::Cursor;
use std::ops::Deref;

use bevy::ptr::UnsafeCellDeref;
use bevy::reflect::Reflect;
Expand All @@ -23,7 +24,7 @@ use crate::packet::message::{
FragmentData, MessageAck, MessageId, ReceiveMessage, SendMessage, SingleData,
};
use crate::packet::packet::{Packet, PacketId, MTU_PAYLOAD_BYTES};
use crate::packet::packet_builder::{PacketBuilder, Payload, PACKET_BUFFER_CAPACITY};
use crate::packet::packet_builder::{PacketBuilder, Payload, RecvPayload, PACKET_BUFFER_CAPACITY};
use crate::packet::packet_type::PacketType;
use crate::packet::priority_manager::{PriorityConfig, PriorityManager};
use crate::prelude::Channel;
Expand Down Expand Up @@ -277,8 +278,8 @@ impl MessageManager {
/// Update the acks, and put the messages from the packets in internal buffers
/// Returns the tick of the packet
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub fn recv_packet(&mut self, packet: Payload) -> Result<Tick, PacketError> {
let mut cursor = Cursor::new(&packet);
pub fn recv_packet(&mut self, packet: &[u8]) -> Result<Tick, PacketError> {
let mut cursor = Cursor::new(packet);

// Step 1. Parse the packet
let header = PacketHeader::from_bytes(&mut cursor)?;
Expand Down Expand Up @@ -469,7 +470,7 @@ mod tests {

// server: receive bytes from the sent messages, then process them into messages
for payload in payloads {
server_message_manager.recv_packet(payload)?;
server_message_manager.recv_packet(&payload)?;
}
let mut data = server_message_manager.read_messages();
assert_eq!(
Expand Down Expand Up @@ -505,7 +506,7 @@ mod tests {

// On client side: keep looping to receive bytes on the network, then process them into messages
for payload in payloads {
client_message_manager.recv_packet(payload)?;
client_message_manager.recv_packet(&payload)?;
}

// Check that reliability works correctly
Expand Down Expand Up @@ -559,7 +560,7 @@ mod tests {

// server: receive bytes from the sent messages, then process them into messages
for payload in payloads {
server_message_manager.recv_packet(payload)?;
server_message_manager.recv_packet(&payload)?;
}
let mut data = server_message_manager.read_messages();
assert_eq!(
Expand Down Expand Up @@ -597,7 +598,7 @@ mod tests {

// On client side: keep looping to receive bytes on the network, then process them into messages
for payload in payloads {
client_message_manager.recv_packet(payload)?;
client_message_manager.recv_packet(&payload)?;
}

// Check that reliability works correctly
Expand Down Expand Up @@ -640,7 +641,7 @@ mod tests {

// server: receive bytes from the sent messages, then process them into messages
for payload in payloads {
server_message_manager.recv_packet(payload)?;
server_message_manager.recv_packet(&payload)?;
}

// Server sends back a message (to ack the message)
Expand All @@ -649,7 +650,7 @@ mod tests {

// On client side: keep looping to receive bytes on the network, then process them into messages
for payload in payloads {
client_message_manager.recv_packet(payload)?;
client_message_manager.recv_packet(&payload)?;
}

assert_eq!(update_acks_tracker.try_recv().unwrap(), message_id);
Expand Down
3 changes: 2 additions & 1 deletion lightyear/src/packet/packet_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ use crate::serialize::reader::ReadBuffer;
use crate::serialize::varint::varint_len;
use crate::serialize::writer::WriteBuffer;
use crate::serialize::{SerializationError, ToBytes};
use crate::utils::pool::Reusable;
use crate::utils::pool::{Reusable, ReusableOwned};

// enough to hold a biggest fragment + writing channel/message_id/etc.
// pub(crate) const PACKET_BUFFER_CAPACITY: usize = MTU_PAYLOAD_BYTES * (u8::BITS as usize) + 50;
pub(crate) const PACKET_BUFFER_CAPACITY: usize = MTU_PAYLOAD_BYTES * (u8::BITS as usize);

pub type Payload = Vec<u8>;
pub type RecvPayload = ReusableOwned<Vec<u8>>;

/// `PacketBuilder` handles the process of creating a packet (writing the header and packing the
/// messages into packets)
Expand Down
Loading
Loading