Skip to content

Commit

Permalink
replicate: implement server side data model (#121)
Browse files Browse the repository at this point in the history
Handles serverbound messages and stores state in the server's data
model.
Does not implement the clientside sync task, or the serverside sync
task.
Does not attempt to use datagrams yet.
  • Loading branch information
TheButlah authored Jun 24, 2024
1 parent 49e55be commit 79d54b0
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 46 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions crates/replicate/client/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@

use eyre::{bail, Result, WrapErr};
use futures::{SinkExt, StreamExt};
use replicate_common::data_model::{
entity::EntityId, DataModel, LocalChanges, RemoteChanges, State,
use replicate_common::{
data_model::{entity::EntityId, DataModel, LocalChanges, RemoteChanges, State},
ClientId,
};
use url::Url;

Expand Down Expand Up @@ -130,7 +131,7 @@ impl Instance {

// Do handshake before anything else
let local_namespace = {
rpc.send(Sb::HandshakeRequest)
rpc.send(Sb::HandshakeRequest(ClientId::random()))
.await
.wrap_err("failed to send handshake request")?;
let Some(msg) = rpc.next().await else {
Expand Down
2 changes: 1 addition & 1 deletion crates/replicate/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ description = "Shared code between replicate-client and replicate-server"
publish = false

[dependencies]
bytes.workspace = true
bytes = { workspace = true, features = ["serde"] }
futures.workspace = true
pin-project.workspace = true
rand.workspace = true
Expand Down
6 changes: 0 additions & 6 deletions crates/replicate/common/src/data_model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@ use self::entity::{EntityId, Index, Namespace};

pub type EntityMap<T> = HashMap<EntityId, T>;

#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum SpawnedBy {
Local,
Remote,
}

/// The state of an entity.
pub type State = bytes::Bytes;

Expand Down
38 changes: 34 additions & 4 deletions crates/replicate/common/src/messages/instance.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,45 @@
use std::collections::{HashMap, HashSet};

use serde::{Deserialize, Serialize};

use crate::data_model::entity::{EntityId, Index, Namespace};
use crate::{
data_model::{
entity::{Index, Namespace},
State,
},
ClientId,
};

#[derive(Serialize, Deserialize, Eq, PartialEq)]
pub enum Serverbound {
HandshakeRequest,
SpawnEntities { idxs: Vec<Index> },
HandshakeRequest(ClientId),
SpawnEntities {
// Namespace is implied, clients always spawn on their own namespace
states: HashMap<Index, State>,
},
UpdateEntities {
namespace: Namespace,
states: HashMap<Index, State>,
},
DespawnEntities {
namespace: Namespace,
entities: HashSet<Index>,
},
}

#[derive(Serialize, Deserialize, Eq, PartialEq)]
pub enum Clientbound {
HandshakeResponse(Namespace),
SpawnEntities { ids: Vec<EntityId> },
SpawnEntities {
namespace: Namespace,
states: HashMap<Index, State>,
},
UpdateEntities {
namespace: Namespace,
states: HashMap<Index, State>,
},
DespawnEntities {
namespace: Namespace,
states: HashSet<Index>,
},
}
3 changes: 2 additions & 1 deletion crates/replicate/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ publish = false

[dependencies]
base64.workspace = true
bytes.workspace = true
bytes = { workspace = true, features = ["serde"] }
clap.workspace = true
color-eyre.workspace = true
dashmap = "5.5.3"
Expand All @@ -20,6 +20,7 @@ futures.workspace = true
rand.workspace = true
replicate-common.path = "../common"
serde.workspace = true
thiserror.workspace = true
tokio-serde = { workspace = true, features = ["json"] }
tokio-util = { workspace = true, features = ["codec"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
Expand Down
6 changes: 3 additions & 3 deletions crates/replicate/server/src/instance/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ use dashmap::DashMap;
use replicate_common::InstanceId;
use tracing::debug;

use super::Instance;
use super::DataModel;

#[derive(Default, Debug)]
pub struct InstanceManager {
instances: DashMap<InstanceId, Instance>,
instances: DashMap<InstanceId, DataModel>,
}

impl InstanceManager {
pub fn instance_create(&self) -> InstanceId {
let instance = Instance::default();
let instance = DataModel::default();
// TODO: seed random numbers for determinism?
let id = InstanceId::random();
self.instances.insert(id, instance);
Expand Down
192 changes: 164 additions & 28 deletions crates/replicate/server/src/instance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,93 @@ pub mod manager;

use std::fmt::Debug;

use eyre::{bail, ensure, Result, WrapErr as _};
use dashmap::DashMap;
use eyre::{bail, Result, WrapErr as _};
use futures::{SinkExt as _, StreamExt as _};
use replicate_common::{
data_model::entity::Namespace,
data_model::{
entity::{EntityId, Namespace},
State,
},
messages::instance::{Clientbound as Cb, Serverbound as Sb},
ClientId,
};
use tracing::{info, instrument};
use tracing::{info, instrument, trace};
use wtransport::endpoint::SessionRequest;

pub const URL_PREFIX: &str = "instances";

type Framed = replicate_common::Framed<wtransport::stream::BiStream, Sb, Cb>;

#[derive(thiserror::Error, Debug, Eq, PartialEq)]
#[error("rejected client's attempt to claim authority")]
pub struct DeniedAuthorityErr;

/// Data stored for each entity
#[derive(Debug, Eq, PartialEq)]
struct EntityData {
current_state: State,
authority: ClientId,
}

#[derive(Debug, Default)]
pub struct Instance {}
pub struct DataModel {
data: DashMap<EntityId, EntityData>,
}

impl DataModel {
fn spawn(&self, entity: EntityId, state: State, authority: ClientId) {
let insert_result = self.data.insert(
entity,
EntityData {
current_state: state,
authority,
},
);
if let Some(already_exists) = insert_result {
if already_exists.authority != authority {
panic!("should have been impossible, we should only spawn on validated namespaces");
}
// The spawn event is out of date, so we ignore it
trace!("stale spawn message");
}
}

fn update(
&self,
entity: EntityId,
state: State,
authority: ClientId,
) -> Result<(), DeniedAuthorityErr> {
let insert_result = self.data.insert(
entity,
EntityData {
current_state: state,
authority,
},
);
if insert_result.is_none() {
trace!("missed a spawn message, spawning entity and updating state anyway.")
}

Ok(())
}

fn despawn(
&self,
entity: EntityId,
_authority: ClientId,
) -> Result<(), DeniedAuthorityErr> {
let remove_result = self.data.remove(&entity);
if remove_result.is_none() {
trace!("tried to despawn a nonexistent entity, ignoring");
}
Ok(())
}
}

#[derive(derive_more::Deref, derive_more::DerefMut)]
struct Rng(pub Box<dyn rand::RngCore + Send>);
struct Rng(pub Box<dyn rand::RngCore + Send + Sync>);

impl Debug for Rng {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -37,24 +106,27 @@ impl Debug for Rng {
#[derive(Debug)]
pub struct InstanceCtx {
rng: Rng,
dm: DataModel,
}

/// Information specific to a particular client, looked up via
pub struct ClientCtx {
id: ClientId,
namespace: Namespace,
}

impl InstanceCtx {
pub fn new(rng: impl rand::Rng + Sized + Send + 'static) -> Self {
pub fn new(rng: impl rand::Rng + Sized + Send + Sync + 'static) -> Self {
Self {
rng: Rng(Box::new(rng)),
dm: DataModel::default(),
}
}
}

fn _assert_send() {
fn helper(_: impl Send) {}
helper(InstanceCtx::new(rand::rngs::OsRng));
}

#[instrument(skip_all, name = "instance")]
pub async fn handle_connection(
mut ctx: InstanceCtx,
mut instance_ctx: InstanceCtx,
session_request: SessionRequest,
) -> Result<()> {
debug_assert!(session_request
Expand All @@ -72,31 +144,95 @@ pub async fn handle_connection(
let mut framed = Framed::new(bi);

// Do handshake before anything else
{
let client_ctx = {
let Some(msg) = framed.next().await else {
bail!("Client disconnected before completing handshake");
};
let msg = msg.wrap_err("error while receiving handshake message")?;
ensure!(
msg == Sb::HandshakeRequest,
"invalid message during handshake"
);
let Sb::HandshakeRequest(client_id) =
msg.wrap_err("error while receiving handshake message")?
else {
bail!("invalid message during handshake")
};
let namespace = Namespace(instance_ctx.rng.next_u64());
framed
.send(Cb::HandshakeResponse(Namespace(ctx.rng.next_u64())))
.send(Cb::HandshakeResponse(namespace))
.await
.wrap_err("failed to send handshake response")?;
}

while let Some(request) = framed.next().await {
let request: Sb = request.wrap_err("error while receiving message")?;
match request {
Sb::HandshakeRequest => {
bail!("already did handshake, another handshake is unexpected")
ClientCtx {
id: client_id,
namespace,
}
};

let reliable_fut = async {
while let Some(request) = framed.next().await {
let request: Sb = request.wrap_err("error while receiving message")?;
handle_state_update(&instance_ctx, &client_ctx, request)?;
}
info!("Client disconnected");
Ok::<(), eyre::Report>(())
};

let unreliable_fut = async { Ok::<(), eyre::Report>(()) };

let _: ((), ()) = tokio::try_join!(reliable_fut, unreliable_fut)?;

Ok(())
}

fn handle_state_update(
instance_ctx: &InstanceCtx,
client_ctx: &ClientCtx,
message: Sb,
) -> Result<()> {
match message {
Sb::HandshakeRequest(_) => {
bail!("already did handshake, another handshake is unexpected");
}
Sb::SpawnEntities { states } => {
for (idx, state) in states {
instance_ctx.dm.spawn(
EntityId {
namespace: client_ctx.namespace,
idx,
},
state,
client_ctx.id,
)
}
}
Sb::UpdateEntities { namespace, states } => {
for (idx, state) in states {
// TODO: Decide how to handle authority problems.
let _ = instance_ctx.dm.update(
EntityId { namespace, idx },
state,
client_ctx.id,
);
}
}
Sb::DespawnEntities {
namespace,
entities,
} => {
for idx in entities {
// TODO: Decide how to handle authority problems.
let _ = instance_ctx
.dm
.despawn(EntityId { namespace, idx }, client_ctx.id);
}
_ => todo!(),
}
}

info!("Client disconnected");
Ok(())
}

#[cfg(test)]
mod test {
use super::*;

fn _assert_instance_ctx_send() {
fn helper(_: impl Send) {}
helper(InstanceCtx::new(rand::rngs::OsRng));
}
}

0 comments on commit 79d54b0

Please sign in to comment.