diff --git a/Cargo.lock b/Cargo.lock index 0f851dcd..4dab6801 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -855,6 +855,13 @@ dependencies = [ "tracing", ] +[[package]] +name = "cuprate-p2p-bucket" +version = "0.1.0" +dependencies = [ + "arrayvec", +] + [[package]] name = "cuprate-p2p-core" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index d5aca71e..34d08c32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ members = [ "net/wire", "p2p/p2p", "p2p/p2p-core", + "p2p/bucket", "p2p/dandelion-tower", "p2p/async-buffer", "p2p/address-book", @@ -51,35 +52,37 @@ opt-level = 3 [workspace.dependencies] # Cuprate members -cuprate-fast-sync = { path = "consensus/fast-sync" ,default-features = false} -cuprate-consensus-rules = { path = "consensus/rules" ,default-features = false} -cuprate-constants = { path = "constants" ,default-features = false} -cuprate-consensus = { path = "consensus" ,default-features = false} -cuprate-consensus-context = { path = "consensus/context" ,default-features = false} -cuprate-cryptonight = { path = "cryptonight" ,default-features = false} -cuprate-helper = { path = "helper" ,default-features = false} -cuprate-epee-encoding = { path = "net/epee-encoding" ,default-features = false} -cuprate-fixed-bytes = { path = "net/fixed-bytes" ,default-features = false} -cuprate-levin = { path = "net/levin" ,default-features = false} -cuprate-wire = { path = "net/wire" ,default-features = false} -cuprate-p2p = { path = "p2p/p2p" ,default-features = false} -cuprate-p2p-core = { path = "p2p/p2p-core" ,default-features = false} -cuprate-dandelion-tower = { path = "p2p/dandelion-tower" ,default-features = false} -cuprate-async-buffer = { path = "p2p/async-buffer" ,default-features = false} -cuprate-address-book = { path = "p2p/address-book" ,default-features = false} -cuprate-blockchain = { path = "storage/blockchain" ,default-features = false} -cuprate-database = { path = "storage/database" ,default-features = false} -cuprate-database-service = { path = "storage/service" ,default-features = false} -cuprate-txpool = { path = "storage/txpool" ,default-features = false} -cuprate-pruning = { path = "pruning" ,default-features = false} -cuprate-test-utils = { path = "test-utils" ,default-features = false} -cuprate-types = { path = "types" ,default-features = false} -cuprate-json-rpc = { path = "rpc/json-rpc" ,default-features = false} -cuprate-rpc-types = { path = "rpc/types" ,default-features = false} -cuprate-rpc-interface = { path = "rpc/interface" ,default-features = false} +cuprate-fast-sync = { path = "consensus/fast-sync" ,default-features = false} +cuprate-consensus-rules = { path = "consensus/rules" ,default-features = false} +cuprate-constants = { path = "constants" ,default-features = false} +cuprate-consensus = { path = "consensus" ,default-features = false} +cuprate-consensus-context = { path = "consensus/context" ,default-features = false} +cuprate-cryptonight = { path = "cryptonight" ,default-features = false} +cuprate-helper = { path = "helper" ,default-features = false} +cuprate-epee-encoding = { path = "net/epee-encoding" ,default-features = false} +cuprate-fixed-bytes = { path = "net/fixed-bytes" ,default-features = false} +cuprate-levin = { path = "net/levin" ,default-features = false} +cuprate-wire = { path = "net/wire" ,default-features = false} +cuprate-p2p = { path = "p2p/p2p" ,default-features = false} +cuprate-p2p-core = { path = "p2p/p2p-core" ,default-features = false} +cuprate-p2p-bucket = { path = "p2p/p2p-bucket" ,default-features = false} +cuprate-dandelion-tower = { path = "p2p/dandelion-tower" ,default-features = false} +cuprate-async-buffer = { path = "p2p/async-buffer" ,default-features = false} +cuprate-address-book = { path = "p2p/address-book" ,default-features = false} +cuprate-blockchain = { path = "storage/blockchain" ,default-features = false} +cuprate-database = { path = "storage/database" ,default-features = false} +cuprate-database-service = { path = "storage/service" ,default-features = false} +cuprate-txpool = { path = "storage/txpool" ,default-features = false} +cuprate-pruning = { path = "pruning" ,default-features = false} +cuprate-test-utils = { path = "test-utils" ,default-features = false} +cuprate-types = { path = "types" ,default-features = false} +cuprate-json-rpc = { path = "rpc/json-rpc" ,default-features = false} +cuprate-rpc-types = { path = "rpc/types" ,default-features = false} +cuprate-rpc-interface = { path = "rpc/interface" ,default-features = false} # External dependencies anyhow = { version = "1.0.89", default-features = false } +arrayvec = { version = "0.7.6", default-features = false} async-trait = { version = "0.1.82", default-features = false } bitflags = { version = "2.6.0", default-features = false } blake3 = { version = "1", default-features = false } diff --git a/p2p/bucket/Cargo.toml b/p2p/bucket/Cargo.toml new file mode 100644 index 00000000..0d81c90f --- /dev/null +++ b/p2p/bucket/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "cuprate-p2p-bucket" +version = "0.1.0" +edition = "2021" + +[dependencies] +arrayvec = { workspace = true } + +[lints] +workspace = true diff --git a/p2p/bucket/src/lib.rs b/p2p/bucket/src/lib.rs new file mode 100644 index 00000000..ecefdf00 --- /dev/null +++ b/p2p/bucket/src/lib.rs @@ -0,0 +1,117 @@ +//! Bucket data structure +//! +//! A collection data structure that discriminates its unique items and place them into "buckets" +//! +//! The item must implement the [`Bucketable`] trait that defines how to create the discriminant +//! from the item type. The data structure will internally contain any item into "buckets" or vectors +//! of sized capacity N that regroup all the stored items with this specific discriminant. +//! +//! A practical example of this data structure is for storing N amount of IP discriminated by their subnets. +//! You can store in each "buckets" corresponding to a /16 subnet up to N IPs of that subnet. +//! + +use arrayvec::ArrayVec; +use std::{collections::BTreeMap, net::Ipv4Addr}; + +/// A discriminant can be compute from the type +pub trait Bucketable: Sized + Eq + Clone { + /// The type of the discriminant being used in the Binary tree. + type Discriminant: Ord + AsRef<[u8]>; + + /// Method that can compute the discriminant from the item. + fn discriminant(&self) -> Self::Discriminant; +} + +/// A collection data structure discriminating its unique items +/// with a specified method. Enable a fair distribution of the +/// items based on their discriminants when popped. +pub struct Bucket { + /// The storage of the bucket + storage: BTreeMap>, + /// Internal round-robin state + round_robin: usize, +} + +impl Bucket { + /// Create a new Bucket + pub const fn new() -> Self { + Self { + storage: BTreeMap::new(), + round_robin: 0, + } + } + + /// Push a new element into the Bucket + /// + /// Will internally create a new vector for each new discriminant being + /// generated from an item. + /// + /// This function WILL NOT push the element if it already exists. + /// + pub fn push(&mut self, item: I) { + let discriminant = item.discriminant(); + + if let Some(vec) = self.storage.get_mut(&discriminant) { + // Check if the element already exists + let already_exist = vec.iter().any(|v| &item == v); + if !already_exist { + // Push the new element + vec.push(item); + } + } else { + // Initialize the vector if not found + let mut vec = ArrayVec::::new(); + vec.push(item); + self.storage.insert(discriminant, vec); + } + } + + /// Will attempt to remove an item from the bucket + pub fn remove(&mut self, item: &I) -> Option { + self.storage.get_mut(&item.discriminant()).and_then(|vec| { + let find = vec + .iter() + .enumerate() + .filter(|(_, v)| &item == v) + .map(|(i, _)| i) + .last(); + find.map(|index| vec.swap_remove(index)) + }) + } + + /// Will remove a new element from any bucket following a round-robin + /// pattern. + /// + /// Repeated use of this function will provide a fair distribution of + /// item based on their discriminants + pub fn pop(&mut self) -> Option { + // Get the total amount of discriminants to explore + let len = self.storage.len(); + + // Loop over every bucket for an element. + for _ in 0..len { + let (_, vec) = self.storage.iter_mut().nth(self.round_robin / len).unwrap(); + self.round_robin = self.round_robin.wrapping_add(1); + if let Some(item) = vec.pop() { + return Some(item); + } + } + + None + } +} + +impl Default for Bucket { + fn default() -> Self { + Self::new() + } +} + +impl Bucketable for Ipv4Addr { + // We are discriminating by /16 subnets + type Discriminant = [u8; 2]; + + fn discriminant(&self) -> Self::Discriminant { + [self.octets()[0], self.octets()[1]] + } +}