diff --git a/Cargo.lock b/Cargo.lock index 0f851dcd..95701a28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -855,6 +855,14 @@ dependencies = [ "tracing", ] +[[package]] +name = "cuprate-p2p-bucket" +version = "0.1.0" +dependencies = [ + "arrayvec", + "rand", +] + [[package]] name = "cuprate-p2p-core" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index d5aca71e..ff87836d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ members = [ "net/wire", "p2p/p2p", "p2p/p2p-core", + "p2p/bucket", "p2p/dandelion-tower", "p2p/async-buffer", "p2p/address-book", @@ -64,6 +65,7 @@ cuprate-levin = { path = "net/levin" ,default-feature cuprate-wire = { path = "net/wire" ,default-features = false} cuprate-p2p = { path = "p2p/p2p" ,default-features = false} cuprate-p2p-core = { path = "p2p/p2p-core" ,default-features = false} +cuprate-p2p-bucket = { path = "p2p/p2p-bucket" ,default-features = false} cuprate-dandelion-tower = { path = "p2p/dandelion-tower" ,default-features = false} cuprate-async-buffer = { path = "p2p/async-buffer" ,default-features = false} cuprate-address-book = { path = "p2p/address-book" ,default-features = false} @@ -80,6 +82,7 @@ cuprate-rpc-interface = { path = "rpc/interface" ,default-feature # External dependencies anyhow = { version = "1.0.89", default-features = false } +arrayvec = { version = "0.7.6", default-features = false } async-trait = { version = "0.1.82", default-features = false } bitflags = { version = "2.6.0", default-features = false } blake3 = { version = "1", default-features = false } diff --git a/p2p/bucket/Cargo.toml b/p2p/bucket/Cargo.toml new file mode 100644 index 00000000..1a53e85a --- /dev/null +++ b/p2p/bucket/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "cuprate-p2p-bucket" +version = "0.1.0" +edition = "2021" +license = "MIT" +authors = ["SyntheticBird"] + +[dependencies] +arrayvec = { workspace = true } +rand = { workspace = true, features = ["std", "std_rng"]} + +[lints] +workspace = true diff --git a/p2p/bucket/src/lib.rs b/p2p/bucket/src/lib.rs new file mode 100644 index 00000000..0920f272 --- /dev/null +++ b/p2p/bucket/src/lib.rs @@ -0,0 +1,163 @@ +//! Bucket data structure +//! +//! A collection data structure that discriminates its unique items and place them into "buckets". +//! +//! The item must implement the [`Bucketable`] trait that defines how to create the discriminant +//! from the item type. The data structure will internally contain any item into "buckets" or vectors +//! of sized capacity `N` that regroup all the stored items with this specific discriminant. +//! +//! A practical example of this data structure is for storing `N` amount of IP discriminated by their subnets. +//! You can store in each "buckets" corresponding to a `/16` subnet up to `N` IPs of that subnet. +//! +//! # Example +//! +//! ``` +//! use cuprate_p2p_bucket::Bucket; +//! use std::net::Ipv4Addr; +//! +//! // Create a new bucket that can store at most 2 IPs in a particular `/16` subnet. +//! let mut bucket = Bucket::<2,Ipv4Addr>::new(); +//! +//! // Fulfill the `96.96.0.0/16` bucket. +//! bucket.push("96.96.0.1".parse().unwrap()); +//! bucket.push("96.96.0.2".parse().unwrap()); +//! assert_eq!(2, bucket.len()); +//! assert_eq!(2, bucket.len_bucket(&[96_u8,96_u8]).unwrap()); +//! +//! // Push a new IP from another subnet +//! bucket.push("127.0.0.1".parse().unwrap()); +//! assert_eq!(3, bucket.len()); +//! assert_eq!(2, bucket.len_bucket(&[96_u8,96_u8]).unwrap()); +//! assert_eq!(1, bucket.len_bucket(&[127_u8,0_u8]).unwrap()); +//! +//! // Attempting to push a new IP within `96.96.0.0/16` bucket will panic. +//! ``` + +use arrayvec::ArrayVec; +use rand::random; + +use std::{collections::BTreeMap, net::Ipv4Addr}; + +/// A discriminant that can be computed from the type. +pub trait Bucketable: Sized + Eq + Clone { + /// The type of the discriminant being used in the Binary tree. + type Discriminant: Ord + AsRef<[u8]>; + + /// Method that can compute the discriminant from the item. + fn discriminant(&self) -> Self::Discriminant; +} + +/// A collection data structure discriminating its unique items +/// with a specified method. Enable a fair distribution of the +/// items based on their discriminants when popped. +pub struct Bucket { + /// The storage of the bucket + storage: BTreeMap>, +} + +impl Bucket { + /// Create a new Bucket + pub const fn new() -> Self { + Self { + storage: BTreeMap::new(), + } + } + + /// Push a new element into the Bucket + /// + /// Will internally create a new vector for each new discriminant being + /// generated from an item. + /// + /// This function WILL NOT push the element if it already exists. + /// + /// # Example + /// + /// ``` + /// use cuprate_p2p_bucket::Bucket; + /// use std::net::Ipv4Addr; + /// + /// let mut bucket = Bucket::<8,Ipv4Addr>::new(); + /// + /// // Push a first IP address. + /// bucket.push("127.0.0.1".parse().unwrap()); + /// assert_eq!(1, bucket.len()); + /// + /// // Push the same IP address a second time. + /// bucket.push("127.0.0.1".parse().unwrap()); + /// assert_eq!(1, bucket.len()); + /// ``` + pub fn push(&mut self, item: I) { + let discriminant = item.discriminant(); + + if let Some(vec) = self.storage.get_mut(&discriminant) { + // Push the item if it doesn't exist. + if !vec.contains(&item) { + vec.push(item); + } + } else { + // Initialize the vector if not found. + let mut vec = ArrayVec::::new(); + vec.push(item); + self.storage.insert(discriminant, vec); + } + } + + /// Will attempt to remove an item from the bucket. + pub fn remove(&mut self, item: &I) -> Option { + self.storage.get_mut(&item.discriminant()).and_then(|vec| { + vec.iter() + .enumerate() + .filter_map(|(i, v)| (item == v).then_some(i)) + .last() + .map(|index| vec.swap_remove(index)) + }) + } + + /// Return the number of item stored within the storage + pub fn len(&self) -> usize { + self.storage.iter().fold(0, |len, bck| len + bck.1.len()) + } + + /// Return the number of item stored with a specific discriminant. + /// + /// This method returns None if the bucket with this discriminant + /// doesn't exist. + pub fn len_bucket(&self, discriminant: &I::Discriminant) -> Option { + self.storage.get(discriminant).map(ArrayVec::len) + } + + /// Return `true` if the storage contains no items + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Return a reference to an item chosen at random. + /// + /// Repeated use of this function will provide a normal distribution of + /// items based on their discriminants. + pub fn get_random(&mut self) -> Option<&I> { + // Get the total amount of discriminants to explore. + let len = self.storage.len(); + + // Get a random bucket. + let (_, vec) = self.storage.iter().nth(random::() / len).unwrap(); + + // Return a reference chose at random. + vec.get(random::() / vec.len()) + } +} + +impl Default for Bucket { + fn default() -> Self { + Self::new() + } +} + +impl Bucketable for Ipv4Addr { + /// We are discriminating by `/16` subnets. + type Discriminant = [u8; 2]; + + fn discriminant(&self) -> Self::Discriminant { + [self.octets()[0], self.octets()[1]] + } +}