diff --git a/Cargo.lock b/Cargo.lock index e5f496ee7..e01be46b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -178,9 +178,9 @@ dependencies = [ [[package]] name = "anstyle-query" -version = "1.0.3" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a64c907d4e79225ac72e2a354c9ce84d50ebb4586dee56c82b3ee73004f537f5" +checksum = "ad186efb764318d35165f1758e7dcef3b10628e26d41a44bc5550652e6804391" dependencies = [ "windows-sys 0.52.0", ] @@ -1346,6 +1346,7 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", "windows-targets 0.52.5", ] @@ -1563,7 +1564,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7" dependencies = [ "strum 0.26.2", - "strum_macros 0.26.3", + "strum_macros 0.26.4", "unicode-width", ] @@ -2607,9 +2608,9 @@ dependencies = [ [[package]] name = "cxx" -version = "1.0.122" +version = "1.0.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb497fad022245b29c2a0351df572e2d67c1046bcef2260ebc022aec81efea82" +checksum = "8194f089b6da4751d6c1da1ef37c17255df51f9346cdb160f8b096562ae4a85c" dependencies = [ "cc", "cxxbridge-flags", @@ -2619,9 +2620,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.122" +version = "1.0.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9327c7f9fbd6329a200a5d4aa6f674c60ab256525ff0084b52a889d4e4c60cee" +checksum = "1e8df9a089caae66634d754672d5f909395f30f38af6ff19366980d8a8b57501" dependencies = [ "cc", "codespan-reporting", @@ -2634,15 +2635,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.122" +version = "1.0.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "688c799a4a846f1c0acb9f36bb9c6272d9b3d9457f3633c7753c6057270df13c" +checksum = "25290be4751803672a70b98c68b51c1e7d0a640ab5a4377f240f9d2e70054cd1" [[package]] name = "cxxbridge-macro" -version = "1.0.122" +version = "1.0.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "928bc249a7e3cd554fd2e8e08a426e9670c50bbfc9a621653cfa9accc9641783" +checksum = "b8cb317cb13604b4752416783bb25070381c36e844743e4146b7f8e55de7d140" dependencies = [ "proc-macro2", "quote", @@ -2895,7 +2896,7 @@ dependencies = [ "regex", "syn 2.0.66", "termcolor", - "toml 0.8.13", + "toml 0.8.14", "walkdir", ] @@ -4268,9 +4269,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.28" +version = "0.14.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" +checksum = "f361cde2f109281a220d4307746cdfd5ee3f410da58a70377762396775634b33" dependencies = [ "bytes", "futures-channel", @@ -6355,9 +6356,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-src" -version = "300.3.0+3.3.0" +version = "300.3.1+3.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eba8804a1c5765b18c4b3f907e6897ebabeedebc9830e1a0046c4a4cf44663e1" +checksum = "7259953d42a81bf137fbbd73bd30a8e1914d6dce43c2b90ed575783a22608b91" dependencies = [ "cc", ] @@ -7949,12 +7950,23 @@ dependencies = [ name = "polka-index" version = "0.1.0" dependencies = [ + "async-channel 2.3.1", + "base64 0.22.1", + "chrono", "ciborium", "cid 0.11.1", + "clap", + "integer-encoding 4.0.0", + "jsonrpsee", 
"rocksdb", "serde", + "sha2 0.10.8", "tempfile", "thiserror", + "tokio", + "tracing", + "tracing-subscriber", + "uuid", ] [[package]] @@ -12545,7 +12557,7 @@ dependencies = [ [[package]] name = "sp-crypto-ec-utils" version = "0.10.0" -source = "git+https://github.com/paritytech/polkadot-sdk#f66e693a6befef0956a3129254fbe568247c9c57" +source = "git+https://github.com/paritytech/polkadot-sdk#dd4e6fd0968b2ccc9de8c5290d1c580b23491db9" dependencies = [ "ark-bls12-377", "ark-bls12-377-ext", @@ -12607,7 +12619,7 @@ dependencies = [ [[package]] name = "sp-debug-derive" version = "14.0.0" -source = "git+https://github.com/paritytech/polkadot-sdk#f66e693a6befef0956a3129254fbe568247c9c57" +source = "git+https://github.com/paritytech/polkadot-sdk#dd4e6fd0968b2ccc9de8c5290d1c580b23491db9" dependencies = [ "proc-macro2", "quote", @@ -12627,7 +12639,7 @@ dependencies = [ [[package]] name = "sp-externalities" version = "0.25.0" -source = "git+https://github.com/paritytech/polkadot-sdk#f66e693a6befef0956a3129254fbe568247c9c57" +source = "git+https://github.com/paritytech/polkadot-sdk#dd4e6fd0968b2ccc9de8c5290d1c580b23491db9" dependencies = [ "environmental", "parity-scale-codec", @@ -12842,7 +12854,7 @@ dependencies = [ [[package]] name = "sp-runtime-interface" version = "24.0.0" -source = "git+https://github.com/paritytech/polkadot-sdk#f66e693a6befef0956a3129254fbe568247c9c57" +source = "git+https://github.com/paritytech/polkadot-sdk#dd4e6fd0968b2ccc9de8c5290d1c580b23491db9" dependencies = [ "bytes", "impl-trait-for-tuples", @@ -12874,7 +12886,7 @@ dependencies = [ [[package]] name = "sp-runtime-interface-proc-macro" version = "17.0.0" -source = "git+https://github.com/paritytech/polkadot-sdk#f66e693a6befef0956a3129254fbe568247c9c57" +source = "git+https://github.com/paritytech/polkadot-sdk#dd4e6fd0968b2ccc9de8c5290d1c580b23491db9" dependencies = [ "Inflector", "expander 2.1.0", @@ -12963,7 +12975,7 @@ source = "git+https://github.com/paritytech/polkadot-sdk?tag=polkadot-v1.11.0#0b [[package]] name = "sp-std" version = "14.0.0" -source = "git+https://github.com/paritytech/polkadot-sdk#f66e693a6befef0956a3129254fbe568247c9c57" +source = "git+https://github.com/paritytech/polkadot-sdk#dd4e6fd0968b2ccc9de8c5290d1c580b23491db9" [[package]] name = "sp-storage" @@ -12980,7 +12992,7 @@ dependencies = [ [[package]] name = "sp-storage" version = "19.0.0" -source = "git+https://github.com/paritytech/polkadot-sdk#f66e693a6befef0956a3129254fbe568247c9c57" +source = "git+https://github.com/paritytech/polkadot-sdk#dd4e6fd0968b2ccc9de8c5290d1c580b23491db9" dependencies = [ "impl-serde", "parity-scale-codec", @@ -13015,7 +13027,7 @@ dependencies = [ [[package]] name = "sp-tracing" version = "16.0.0" -source = "git+https://github.com/paritytech/polkadot-sdk#f66e693a6befef0956a3129254fbe568247c9c57" +source = "git+https://github.com/paritytech/polkadot-sdk#dd4e6fd0968b2ccc9de8c5290d1c580b23491db9" dependencies = [ "parity-scale-codec", "tracing", @@ -13112,7 +13124,7 @@ dependencies = [ [[package]] name = "sp-wasm-interface" version = "20.0.0" -source = "git+https://github.com/paritytech/polkadot-sdk#f66e693a6befef0956a3129254fbe568247c9c57" +source = "git+https://github.com/paritytech/polkadot-sdk#dd4e6fd0968b2ccc9de8c5290d1c580b23491db9" dependencies = [ "impl-trait-for-tuples", "log", @@ -13345,7 +13357,7 @@ version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d8cec3501a5194c432b2b7976db6b7d10ec95c253208b45f83f7136aa985e29" dependencies = [ - "strum_macros 0.26.3", + 
"strum_macros 0.26.4", ] [[package]] @@ -13363,9 +13375,9 @@ dependencies = [ [[package]] name = "strum_macros" -version = "0.26.3" +version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7993a8e3a9e88a00351486baae9522c91b123a088f76469e5bd5cc17198ea87" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -13453,7 +13465,7 @@ dependencies = [ "sp-maybe-compressed-blob", "strum 0.26.2", "tempfile", - "toml 0.8.13", + "toml 0.8.14", "walkdir", "wasm-opt", ] @@ -13847,14 +13859,14 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.13" +version = "0.8.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4e43f8cc456c9704c851ae29c67e17ef65d2c30017c17a9765b89c382dc8bba" +checksum = "6f49eb2ab21d2f26bd6db7bf383edc527a7ebaee412d17af4d40fdccd442f335" dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit 0.22.13", + "toml_edit 0.22.14", ] [[package]] @@ -13890,15 +13902,15 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.22.13" +version = "0.22.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c127785850e8c20836d49732ae6abfa47616e60bf9d9f57c43c250361a9db96c" +checksum = "f21c7aaf97f1bd9ca9d4f9e73b0a6c74bd5afef56f2bc931943a6e1c37e04e38" dependencies = [ "indexmap 2.2.6", "serde", "serde_spanned", "toml_datetime", - "winnow 0.6.9", + "winnow 0.6.12", ] [[package]] @@ -14257,9 +14269,9 @@ dependencies = [ [[package]] name = "unicode-width" -version = "0.1.12" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68f5e5f3158ecfd4b8ff6fe086db7c8467a2dfdac97fe420f2b7c4aa97af66d6" +checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d" [[package]] name = "unicode-xid" @@ -14331,6 +14343,16 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "uuid" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" +dependencies = [ + "getrandom 0.2.15", + "serde", +] + [[package]] name = "valuable" version = "0.1.0" @@ -14965,9 +14987,9 @@ dependencies = [ [[package]] name = "wide" -version = "0.7.21" +version = "0.7.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd8dc749a1b03f3c255a3064a4f5c0ee5ed09b7c6bc6d4525d31f779cd74d7fc" +checksum = "1134eff459f1063780b94cc78b704e2212cac12abd554e4268f5b8f9dfcc1883" dependencies = [ "bytemuck", "safe_arch", @@ -15269,9 +15291,9 @@ dependencies = [ [[package]] name = "winnow" -version = "0.6.9" +version = "0.6.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86c949fede1d13936a99f14fafd3e76fd642b556dd2ce96287fbe2e0151bfac6" +checksum = "41ff33f391015ecab21cd092389215eb265ef9496a9a07b6bee7d3529831deda" dependencies = [ "memchr", ] diff --git a/Cargo.toml b/Cargo.toml index 55ac97cda..26caa0a1a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,10 +28,13 @@ panic = 'abort' # Use abort on panic to reduce binary size substrate-build-script-utils = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-v1.11.0" } substrate-wasm-builder = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-v1.11.0" } +async-channel = "2.3.1" async-stream = "0.3.5" +base64 = "0.22.1" 
bitflags = "2.5.0" byteorder = "1.5.0" bytes = "1.6.0" +chrono = "0.4.38" ciborium = "0.2.2" cid = { version = "0.11.1" } clap = { version = "4.5.3" } @@ -70,7 +73,9 @@ thiserror = { version = "1.0.48" } tokio = "1.37.0" tokio-stream = "0.1.15" tokio-util = "0.7.11" -tracing-subscriber = { version = "0.3.18" } +tracing = "0.1.40" +tracing-subscriber = "0.3.18" +uuid = "1.8.0" # Local polka-storage-runtime = { path = "runtime" } diff --git a/storage/polka-index/Cargo.toml b/storage/polka-index/Cargo.toml index 335e9e4b4..1f38f0202 100644 --- a/storage/polka-index/Cargo.toml +++ b/storage/polka-index/Cargo.toml @@ -8,13 +8,24 @@ repository.workspace = true version = "0.1.0" [dependencies] +async-channel = { workspace = true } +base64 = { workspace = true } +chrono = { workspace = true, features = ["serde"] } ciborium = { workspace = true } cid = { workspace = true, features = ["serde"] } +clap = { workspace = true, features = ["derive", "string"] } +integer-encoding = { workspace = true } +jsonrpsee = { workspace = true, features = ["server"] } rocksdb = { workspace = true } serde = { workspace = true } thiserror = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } +tracing-subscriber = { workspace = true, features = ["env-filter"] } +uuid = { workspace = true, features = ["serde", "v4"] } [dev-dependencies] +sha2 = { workspace = true } tempfile = { workspace = true } [lints] diff --git a/storage/polka-index/README.md b/storage/polka-index/README.md new file mode 100644 index 000000000..2a1f34b2f --- /dev/null +++ b/storage/polka-index/README.md @@ -0,0 +1,2 @@ +# Polka Index + diff --git a/storage/polka-index/src/lib.rs b/storage/polka-index/src/lib.rs index 26d67933c..a3596e595 100644 --- a/storage/polka-index/src/lib.rs +++ b/storage/polka-index/src/lib.rs @@ -1 +1 @@ -pub mod piecestore; +pub mod local_index_directory; diff --git a/storage/polka-index/src/local_index_directory/mod.rs b/storage/polka-index/src/local_index_directory/mod.rs new file mode 100644 index 000000000..a77a08a43 --- /dev/null +++ b/storage/polka-index/src/local_index_directory/mod.rs @@ -0,0 +1,452 @@ +use std::{ops::Deref, string}; + +use base64::Engine; +use cid::{ + multihash::{self, Multihash}, + Cid, +}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +pub mod rdb; +pub mod rdb_ext; + +/// Convert a [`Multihash`] into a key (converts [`Multihash::digest`] to base-64). +/// +/// Go encodes []byte as base-64: +/// > Array and slice values encode as JSON arrays, +/// > except that []byte encodes as a base64-encoded string, +/// > and a nil slice encodes as the null JSON value. +/// > — https://pkg.go.dev/encoding/json#Marshal +pub(crate) fn multihash_base64(multihash: &Multihash) -> String { + base64::engine::general_purpose::STANDARD.encode(multihash.to_bytes()) +} + +/// Error that can occur when interacting with the [`Service`]. 
+
+/// Error that can occur when interacting with the [`Service`].
+#[derive(Debug, thiserror::Error)]
+pub enum LidError {
+    #[error("Deal already exists: {0}")]
+    DuplicateDealError(Uuid),
+
+    #[error("Piece {0} was not found")]
+    PieceNotFound(Cid),
+
+    #[error("Multihash {:?} was not found", multihash_base64(.0))]
+    MultihashNotFound(Multihash<64>),
+
+    #[error("A free cursor was not found")]
+    CursorNotFound,
+
+    #[error("Invalid flagged piece key format")]
+    InvalidFlaggedPieceKeyError(String),
+
+    #[error("Serialization error: {0}")]
+    Serialization(String),
+
+    #[error("Deserialization error: {0}")]
+    Deserialization(String),
+
+    #[error(transparent)]
+    RocksDBError(#[from] rocksdb::Error),
+
+    #[error(transparent)]
+    MultihashError(#[from] cid::multihash::Error),
+
+    #[error(transparent)]
+    FromUtf8Error(#[from] string::FromUtf8Error),
+
+    #[error(transparent)]
+    CidError(#[from] cid::Error),
+
+    #[error(transparent)]
+    IoError(#[from] std::io::Error),
+
+    #[error(transparent)]
+    Base64DecodeError(#[from] base64::DecodeError),
+}
+
+/// A [`FlaggedPiece`] is a piece that has been flagged for the user's attention
+/// (e.g. the index is missing).
+///
+/// Source:
+#[derive(Debug, Serialize, Deserialize)]
+pub struct FlaggedPiece {
+    pub piece_cid: Cid,
+    pub storage_provider_address: StorageProviderAddress,
+    pub created_at: chrono::DateTime<chrono::Utc>,
+    pub updated_at: chrono::DateTime<chrono::Utc>,
+    pub has_unsealed_copy: bool,
+}
+
+impl FlaggedPiece {
+    /// Construct a new [`FlaggedPiece`].
+    ///
+    /// * `created_at` and `updated_at` will be set to `now`.
+    /// * `has_unsealed_copy` will be set to `false`.
+    pub fn new(piece_cid: Cid, storage_provider_address: StorageProviderAddress) -> Self {
+        let now = chrono::Utc::now();
+        Self {
+            piece_cid,
+            storage_provider_address,
+            created_at: now,
+            updated_at: now,
+            has_unsealed_copy: false,
+        }
+    }
+}
+
+pub struct FlaggedPiecesListFilter {
+    pub storage_provider_address: StorageProviderAddress,
+    pub has_unsealed_copy: bool,
+}
+
+// https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/model/model.go#L50-L62
+
+// NOTE(@jmg-duarte,12/06/2024): `OffsetSize` is currently (de)serialized using CBOR,
+// however, we could save space by using the same encoding as the original
+// implementation, which is just two varints packed together.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct OffsetSize {
+    /// Offset is the offset into the CAR file of the section, where a section
+    /// is `<varint length><cid><block data>`.
+    #[serde(rename = "o")]
+    pub offset: u64,
+
+    /// Size is the size of the block data (not the whole section).
+    #[serde(rename = "s")]
+    pub size: u64,
+}
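+
+// A minimal sketch (test-only) of the two-varint packing mentioned in the
+// NOTE above, using the `integer-encoding` crate already in the dependency
+// tree; the layout (offset first, then size) is an assumption based on the
+// original implementation.
+#[cfg(test)]
+mod offset_size_varint_example {
+    use integer_encoding::VarInt;
+
+    use super::OffsetSize;
+
+    fn pack(value: &OffsetSize) -> Vec<u8> {
+        let mut bytes = value.offset.encode_var_vec();
+        bytes.extend(value.size.encode_var_vec());
+        bytes
+    }
+
+    fn unpack(bytes: &[u8]) -> Option<OffsetSize> {
+        let (offset, read) = u64::decode_var(bytes)?;
+        let (size, _) = u64::decode_var(&bytes[read..])?;
+        Some(OffsetSize { offset, size })
+    }
+
+    #[test]
+    fn round_trip() {
+        let value = OffsetSize { offset: 1 << 20, size: 512 };
+        assert_eq!(unpack(&pack(&value)), Some(value));
+    }
+}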
+
+// Record is the information stored in the index for each block in a piece.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct IndexRecord {
+    /// The [`Cid`] of the indexed block.
+    #[serde(rename = "s")]
+    pub cid: Cid,
+
+    /// The [`OffsetSize`] for the data — i.e. offset and size.
+    pub offset_size: OffsetSize,
+}
+
+/// Metadata over a [piece][1], pertaining to the storage of the piece in a given storage provider.
+///
+/// A piece is the unit of negotiation for data storage.
+/// Piece sizes are limited by the sector size, hence,
+/// if a user wants to store data larger than the sector size,
+/// the data will be split into multiple pieces.
+///
+/// [1]: https://spec.filecoin.io/systems/filecoin_files/piece/
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct PieceInfo {
+    /// The piece metadata version, it *will* be used for data migrations.
+    ///
+    /// Source:
+    pub version: String,
+
+    /// If present, when the piece was last indexed.
+    pub indexed_at: Option<chrono::DateTime<chrono::Utc>>,
+
+    /// If the index has all information or is missing block size information.
+    ///
+    /// Source:
+    pub complete_index: bool,
+
+    /// Deals that this piece is related to.
+    ///
+    /// Each deal can only pertain to a single piece, however,
+    /// a piece can contain multiple deals — e.g. for redundancy.
+    ///
+    /// See [`DealInfo`] for more information.
+    pub deals: Vec<DealInfo>,
+
+    /// Piece cursor for other information, such as offset, etc.
+    /// https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/service.go#L40-L41
+    pub cursor: u64,
+}
+
+impl Default for PieceInfo {
+    fn default() -> Self {
+        Self {
+            // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/service.go#L45-L46
+            version: "1".to_string(),
+            // In Go, time.Time's default is "0001-01-01 00:00:00 +0000 UTC",
+            // but Go structures cannot be `nil`, which is probably why they use that sentinel value
+            indexed_at: None,
+            complete_index: false,
+            deals: Vec::new(),
+            cursor: 0,
+        }
+    }
+}
+
+/// Identifier for a retrieval deal (unique to a client).
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
+pub struct DealId(u64);
+
+impl From<u64> for DealId {
+    fn from(value: u64) -> Self {
+        Self(value)
+    }
+}
+
+// TODO(@jmg-duarte,14/06/2024): validate miner address
+
+/// The storage provider address.
+///
+/// It is a special type from `filecoin-project/go-address`,
+/// however, it's simply a wrapper to `string`:
+/// https://github.com/filecoin-project/go-address/blob/365a7c8d0e85c731c192e65ece5f5b764026e85d/address.go#L39-L40
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
+pub struct StorageProviderAddress(String);
+
+// The Deref implementation eases usages like checking whether the address is empty.
+impl Deref for StorageProviderAddress {
+    type Target = String;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl From<String> for StorageProviderAddress {
+    fn from(value: String) -> Self {
+        Self(value)
+    }
+}
+
+/// Numeric identifier for a sector. It is usually relative to a storage provider.
+///
+/// For more information on sectors, see:
+///
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
+pub struct SectorNumber(u64);
+
+impl From<u64> for SectorNumber {
+    fn from(value: u64) -> Self {
+        Self(value)
+    }
+}
+
+/// Information about a single *storage* deal for a given piece.
+///
+/// Source:
+///
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct DealInfo {
+    // By default, the Eq implementation will use all fields, which doesn't
+    // sound like the best idea: as soon as you change a single detail that
+    // isn't the deal UUID, what should be a conflicting DealInfo no longer is.
+    // However, the original implementation does it like that:
+    // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/service.go#L119-L125
+    // Note that in Go, there is no operator overloading and == is implicitly defined for all types
+    /// The deal [`Uuid`].
+    #[serde(rename = "u")]
+    pub deal_uuid: Uuid,
+
+    // NOTE(@jmg-duarte,17/06/2024): this will probably not be needed
+    /// Whether this deal was performed using `go-fil-markets`.
+    ///
+    /// See the following links for more information:
+    /// *
+    /// *
+    #[serde(rename = "y")]
+    pub is_legacy: bool,
+
+    /// Identifier for a deal on the chain.
+    ///
+    /// See [`DealId`] for more information.
+    #[serde(rename = "i")]
+    pub chain_deal_id: DealId,
+
+    /// The storage provider's address.
+    ///
+    /// See [`StorageProviderAddress`] for more information.
+    #[serde(rename = "m")]
+    pub storage_provider_address: StorageProviderAddress,
+
+    /// The sector number the piece is stored in.
+    ///
+    /// See [`SectorNumber`] for more information.
+    #[serde(rename = "s")]
+    pub sector_number: SectorNumber,
+
+    /// The offset of this deal's piece in the [sector][`SectorNumber`].
+    #[serde(rename = "o")]
+    pub piece_offset: u64,
+
+    /// The length of this deal's piece.
+    ///
+    /// A full piece will contain a proving tree and a CAR file.
+    ///
+    /// See:
+    /// *
+    /// *
+    #[serde(rename = "l")]
+    pub piece_length: u64,
+
+    /// The length of the piece's CAR file.
+    #[serde(rename = "c")]
+    pub car_length: u64,
+
+    /// Whether this deal is a [direct deal][1].
+    ///
+    /// A direct deal is usually made for data larger than 4 GiB as it will contain a single piece;
+    /// a non-direct deal is an [aggregated deal][2], which is aggregated from small scale data (< 4 GiB).
+    ///
+    /// [1]: https://docs.filecoin.io/smart-contracts/programmatic-storage/direct-deal-making
+    /// [2]: https://docs.filecoin.io/smart-contracts/programmatic-storage/aggregated-deal-making
+    #[serde(rename = "d")]
+    pub is_direct_deal: bool,
+}
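+
+// A minimal sketch (test-only) of what the single-letter `serde(rename)`s
+// above buy us: `DealInfo` round-trips through CBOR with compact map keys.
+// The field values are arbitrary.
+#[cfg(test)]
+mod deal_info_cbor_example {
+    use super::DealInfo;
+
+    #[test]
+    fn cbor_round_trip() {
+        let deal = DealInfo {
+            deal_uuid: uuid::Uuid::new_v4(),
+            is_legacy: false,
+            chain_deal_id: 1.into(),
+            storage_provider_address: "address".to_string().into(),
+            sector_number: 42.into(),
+            piece_offset: 0,
+            piece_length: 2048,
+            car_length: 1024,
+            is_direct_deal: false,
+        };
+
+        let mut bytes = Vec::new();
+        ciborium::into_writer(&deal, &mut bytes).unwrap();
+        let decoded: DealInfo = ciborium::from_reader(bytes.as_slice()).unwrap();
+        assert_eq!(deal, decoded);
+    }
+}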
+
+pub trait Service {
+    /// Add [`DealInfo`] pertaining to the piece with the provided [`Cid`].
+    ///
+    /// * If the piece does not exist in the index, it will be created before adding the [`DealInfo`].
+    /// * If the deal is already present in the piece, returns [`LidError::DuplicateDealError`].
+    fn add_deal_for_piece(&self, piece_cid: Cid, deal_info: DealInfo) -> Result<(), LidError>;
+
+    /// Remove a deal with the given [`Uuid`] for the piece with the provided [`Cid`].
+    ///
+    /// * If the piece does not exist, this operation is a no-op.
+    fn remove_deal_for_piece(&self, piece_cid: Cid, deal_uuid: Uuid) -> Result<(), LidError>;
+
+    /// Check if the piece with the provided [`Cid`] is indexed.
+    ///
+    /// * If the piece does not exist, returns `false`.
+    fn is_indexed(&self, piece_cid: Cid) -> Result<bool, LidError>;
+
+    /// Get when the piece with the provided [`Cid`] was indexed.
+    ///
+    /// * If the piece does not exist, returns [`LidError::PieceNotFound`].
+    fn indexed_at(&self, piece_cid: Cid)
+        -> Result<Option<chrono::DateTime<chrono::Utc>>, LidError>;
+
+    /// Check if the index of the piece with the provided [`Cid`] has block size information.
+    ///
+    /// See [`PieceInfo::complete_index`] for details.
+    ///
+    /// * If the piece does not exist, returns [`LidError::PieceNotFound`].
+    fn is_complete_index(&self, piece_cid: Cid) -> Result<bool, LidError>;
+
+    /// Get the [`PieceInfo`] pertaining to the piece with the provided [`Cid`].
+    ///
+    /// * If the piece does not exist, returns [`LidError::PieceNotFound`].
+    fn get_piece_metadata(&self, piece_cid: Cid) -> Result<PieceInfo, LidError>;
+
+    /// Remove the [`PieceInfo`] pertaining to the piece with the provided [`Cid`].
+    /// It will also remove the piece's indexes.
+    ///
+    /// * If the piece does not exist, returns [`LidError::PieceNotFound`].
+    /// * If the piece's indexes are out of sync and its [`Multihash`] entries are not found,
+    ///   returns [`LidError::MultihashNotFound`].
+    fn remove_piece_metadata(&self, piece_cid: Cid) -> Result<(), LidError>;
+
+    /// Get the list of [`DealInfo`] pertaining to the piece with the provided [`Cid`].
+    ///
+    /// * If the piece does not exist, returns [`LidError::PieceNotFound`].
+    fn get_piece_deals(&self, piece_cid: Cid) -> Result<Vec<DealInfo>, LidError>;
+
+    /// List the existing pieces.
+    ///
+    /// * If no pieces exist, an empty [`Vec`] is returned.
+    fn list_pieces(&self) -> Result<Vec<Cid>, LidError>;
+
+    /// Add index records to the piece with the provided [`Cid`].
+    ///
+    /// * If the piece does not exist, a new [`PieceInfo`] will be created.
+    ///
+    /// Differences to the original:
+    /// * The original implementation streams the operation progress.
+    /// * The original implementation does not support this operation through HTTP.
+    fn add_index(
+        &self,
+        piece_cid: Cid,
+        records: Vec<IndexRecord>,
+        is_complete_index: bool,
+    ) -> Result<(), LidError>;
+
+    /// Get the index records for the piece with the provided [`Cid`].
+    ///
+    /// * If the piece does not exist, returns [`LidError::PieceNotFound`].
+    ///
+    /// Differences to the original:
+    /// * The original implementation streams the [`OffsetSize`].
+    /// * The original implementation does not support this operation through HTTP.
+    fn get_index(&self, piece_cid: Cid) -> Result<Vec<IndexRecord>, LidError>;
+
+    /// Get the [`OffsetSize`] of the given [`Multihash`](multihash::Multihash) for the piece with the provided [`Cid`].
+    ///
+    /// * If the piece does not exist, returns [`LidError::PieceNotFound`].
+    /// * If the index entry (i.e. multihash) does not exist, returns [`LidError::MultihashNotFound`].
+    fn get_offset_size(
+        &self,
+        piece_cid: Cid,
+        multihash: multihash::Multihash<64>,
+    ) -> Result<OffsetSize, LidError>;
+
+    /// Get all the pieces containing the given [`Multihash`](multihash::Multihash).
+    ///
+    /// * If no pieces are found, returns [`LidError::MultihashNotFound`].
+    fn pieces_containing_multihash(
+        &self,
+        multihash: multihash::Multihash<64>,
+    ) -> Result<Vec<Cid>, LidError>;
+
+    /// Remove indexes for the piece with the provided [`Cid`].
+    ///
+    /// * If the piece does not exist, returns [`LidError::PieceNotFound`].
+    /// * If the piece contains index entries — i.e. [`Multihash`] —
+    ///   that cannot be found, returns [`LidError::MultihashNotFound`].
+    fn remove_indexes(&self, piece_cid: Cid) -> Result<(), LidError>;
+
+    /// Flag the piece with the given [`Cid`].
+    ///
+    /// * If the piece & storage provider address pair is not found, a new entry will be stored.
+    fn flag_piece(
+        &self,
+        piece_cid: Cid,
+        has_unsealed_copy: bool,
+        storage_provider_address: StorageProviderAddress,
+    ) -> Result<(), LidError>;
+
+    /// Unflag the piece with the given [`Cid`].
+    ///
+    /// * If the piece & storage provider address pair is not found, this is a no-op.
+    fn unflag_piece(
+        &self,
+        piece_cid: Cid,
+        storage_provider_address: StorageProviderAddress,
+    ) -> Result<(), LidError>;
+
+    /// List the flagged pieces matching the filter.
+    ///
+    /// * If the filter is `None`, then all flagged pieces will be matched.
+    /// * If no pieces are found, returns an empty [`Vec`].
+    /// * Pieces flagged before `cursor` will be filtered out.
+    /// * Pieces are sorted according to when they were first flagged — see [`FlaggedPiece::created_at`].
+    /// * Offset and limit are applied _after_ sorting the pieces.
+    fn flagged_pieces_list(
+        &self,
+        filter: Option<FlaggedPiecesListFilter>,
+        cursor: chrono::DateTime<chrono::Utc>, // this name doesn't make much sense but it's the original one
+        offset: usize,
+        limit: usize,
+    ) -> Result<Vec<FlaggedPiece>, LidError>;
+
+    /// Count all pieces that match the given filter.
+    ///
+    /// * If the filter is `None`, then all flagged pieces will be counted.
+    /// * If no pieces are found, returns `0`.
+    fn flagged_pieces_count(
+        &self,
+        filter: Option<FlaggedPiecesListFilter>,
+    ) -> Result<u64, LidError>;
+
+    /// Returns the [`Cid`]s of the next pieces to be checked for a given storage provider.
+    fn next_pieces_to_check(
+        &mut self,
+        storage_provider_address: StorageProviderAddress,
+    ) -> Result<Vec<Cid>, LidError>;
+}
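+
+// A minimal sketch (test-only) of the intended `Service` call flow, using the
+// `RocksDBLid` implementation from `rdb`; the deal values are arbitrary. Note
+// that `add_deal_for_piece` creates the piece metadata on first use, while
+// indexing is a separate step.
+#[cfg(test)]
+mod service_flow_example {
+    use std::str::FromStr;
+
+    use cid::Cid;
+
+    use super::{
+        rdb::{RocksDBLid, RocksDBStateStoreConfig},
+        DealInfo, Service,
+    };
+
+    #[test]
+    fn deal_flow() {
+        let tmp_dir = tempfile::tempdir().unwrap();
+        let lid = RocksDBLid::new(RocksDBStateStoreConfig {
+            path: tmp_dir.path().join("rocksdb"),
+        })
+        .unwrap();
+
+        let piece_cid = Cid::from_str("QmawceGscqN4o8Y8Fv26UUmB454kn2bnkXV5tEQYc4jBd6").unwrap();
+        let deal = DealInfo {
+            deal_uuid: uuid::Uuid::new_v4(),
+            is_legacy: false,
+            chain_deal_id: 1.into(),
+            storage_provider_address: "address".to_string().into(),
+            sector_number: 1.into(),
+            piece_offset: 0,
+            piece_length: 2048,
+            car_length: 1024,
+            is_direct_deal: false,
+        };
+
+        // Registering a deal creates the piece metadata...
+        lid.add_deal_for_piece(piece_cid, deal.clone()).unwrap();
+        assert_eq!(lid.get_piece_deals(piece_cid).unwrap(), vec![deal]);
+        // ...but the piece is not indexed until `add_index` is called.
+        assert!(!lid.is_indexed(piece_cid).unwrap());
+    }
+}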
diff --git a/storage/polka-index/src/local_index_directory/rdb.rs b/storage/polka-index/src/local_index_directory/rdb.rs
new file mode 100644
index 000000000..f1fa9a411
--- /dev/null
+++ b/storage/polka-index/src/local_index_directory/rdb.rs
@@ -0,0 +1,1745 @@
+// The name of this file is `rdb.rs` to avoid clashing with the `rocksdb` import.
+use std::{collections::HashMap, path::PathBuf, str::FromStr, time::Duration};
+
+use base64::Engine;
+use cid::{multihash::Multihash, Cid};
+use integer_encoding::{VarInt, VarIntReader};
+use rocksdb::{
+    ColumnFamily, ColumnFamilyDescriptor, IteratorMode, Options, WriteBatchWithTransaction,
+    DB as RocksDB,
+};
+use serde::{de::DeserializeOwned, Serialize};
+use uuid::Uuid;
+
+use super::{
+    multihash_base64, rdb_ext::WriteBatchWithTransactionExt, DealInfo, FlaggedPiece,
+    FlaggedPiecesListFilter, IndexRecord, LidError, OffsetSize, PieceInfo, Service,
+    StorageProviderAddress,
+};
+
+const RAW_CODEC: u64 = 0x55;
+
+/// Key for the next free cursor.
+///
+/// This is not a column family as in the original source code it is not a prefix.
+///
+/// Sources:
+/// *
+/// *
+const NEXT_CURSOR_KEY: &str = "next_cursor";
+
+// # Notes on LevelDB vs RocksDB:
+// ## Prefixes & Column Families
+// In LevelDB there's no concept of logical partitioning, let alone column families;
+// instead, partitioning is achieved by prefixing an identifier to create a namespace.
+// However, RocksDB has support for logical partitioning and as such, we take advantage
+// of it by mapping the LevelDB prefixes from the original code into proper column families.
+//
+// ## Transactions
+// LevelDB does not support transactions, as such, when using `WriteBatchWithTransaction`
+// the TRANSACTION const generic is set to `false`. We may wish to turn it on, but at the
+// time of writing (7/6/24) the main focus is on porting, and as such keeping things as
+// close as possible to the original implementation.
+// Discussion on LevelDB transactions: https://groups.google.com/g/leveldb/c/O_iNRkAoObg
+
+/// Column family name to store the mapping between a [`Cid`] and its cursor.
+///
+/// Sources:
+/// *
+/// *
+const PIECE_CID_TO_CURSOR_CF: &str = "piece_cid_to_cursor";
+
+/// Column family name to store the mapping between [`Multihash`]es and piece [`Cid`]s.
+///
+/// Sources:
+/// *
+/// *
+const MULTIHASH_TO_PIECE_CID_CF: &str = "multihash_to_piece_cids";
+
+/// Column family name to store the flagged piece [`Cid`]s.
+///
+/// Sources:
+/// *
+/// *
+const PIECE_CID_TO_FLAGGED_CF: &str = "piece_cid_to_flagged";
+
+/// The minimum time between piece checks.
+///
+/// Source:
+const MIN_PIECE_CHECK_PERIOD: Duration = Duration::from_secs(5 * 60);
+
+/// The number of pieces to be checked per batch.
+///
+/// Source:
+const PIECES_TRACKER_BATCH_SIZE: usize = 1024;
+
+/// Returns a prefix like `/<cursor>/`.
+fn key_cursor_prefix(cursor: u64) -> String {
+    format!("/{}/", cursor)
+}
+
+/// Returns a key for flagging a piece, like `/<piece_cid>/<address>`.
+fn key_flag_piece(cid: &Cid, address: &StorageProviderAddress) -> String {
+    format!("/{}/{}", cid, address.0)
+}
+
+pub struct RocksDBStateStoreConfig {
+    pub path: PathBuf,
+}
+
+/// A [`super::Service`] implementation backed by RocksDB.
+pub struct RocksDBLid {
+    database: RocksDB,
+    /// Used in [`Self::next_pieces_to_check`], keeps track of the checked pieces.
+    ///
+    /// Sources:
+    /// *
+    /// *
+    offset: usize,
+    /// Tracks the last time we processed a given piece.
+    checked: HashMap<String, chrono::DateTime<chrono::Utc>>,
+}
+
+impl RocksDBLid {
+    /// Construct a new [`Self`] from the provided [`RocksDBStateStoreConfig`].
+    ///
+    /// * If the database does not exist in the path, it will be created.
+    /// * If the column families ([`PIECE_CID_TO_CURSOR_CF`],
+    ///   [`MULTIHASH_TO_PIECE_CID_CF`], [`PIECE_CID_TO_FLAGGED_CF`]) do not exist,
+    ///   they will be created.
+    pub fn new(config: RocksDBStateStoreConfig) -> Result<Self, LidError> {
+        let column_families = [
+            PIECE_CID_TO_FLAGGED_CF,
+            MULTIHASH_TO_PIECE_CID_CF,
+            PIECE_CID_TO_CURSOR_CF,
+        ]
+        .into_iter()
+        .map(|cf| ColumnFamilyDescriptor::new(cf, Options::default()));
+
+        let mut opts = Options::default();
+        opts.create_if_missing(true);
+        opts.create_missing_column_families(true);
+
+        Ok(Self {
+            database: RocksDB::open_cf_descriptors(&opts, config.path, column_families)?,
+            offset: 0,
+            checked: HashMap::new(),
+        })
+    }
+
+    /// Get the column family handle for the given column family name.
+    ///
+    /// **Invariant**: The column family name MUST exist. *Otherwise this function will panic.*
+    #[track_caller]
+    fn cf_handle(&self, cf_name: &str) -> &ColumnFamily {
+        self.database
+            .cf_handle(cf_name)
+            .expect("column family should have been initialized")
+    }
+
+    /// Remove the value at the specified key in the specified column family.
+    fn remove_value_at_key<Key>(&self, key: Key, cf_name: &str) -> Result<(), LidError>
+    where
+        Key: AsRef<[u8]>,
+    {
+        Ok(self.database.delete_cf(self.cf_handle(cf_name), key)?)
+    }
+
+    /// Get and deserialize (using CBOR) the value at the specified key in the specified column family.
+    fn get_value_at_key<Key, Value>(
+        &self,
+        key: Key,
+        cf_name: &str,
+    ) -> Result<Option<Value>, LidError>
+    where
+        Key: AsRef<[u8]>,
+        Value: DeserializeOwned,
+    {
+        let Some(slice) = self.database.get_pinned_cf(self.cf_handle(cf_name), key)? else {
+            return Ok(None);
+        };
+
+        match ciborium::from_reader(slice.as_ref()) {
+            Ok(value) => Ok(Some(value)),
+            // ciborium's error is bubbled up as a string because it is generic
+            // and we didn't want to add a generic type to the LidError
+            Err(err) => Err(LidError::Deserialization(err.to_string())),
+        }
+    }
+
+    /// Serialize (using CBOR) and put the value at the specified key in the specified column family.
+    fn put_value_at_key<Key, Value>(
+        &self,
+        key: Key,
+        value: &Value,
+        cf_name: &str,
+    ) -> Result<(), LidError>
+    where
+        Key: AsRef<[u8]>,
+        Value: Serialize,
+    {
+        let mut serialized = Vec::new();
+        if let Err(err) = ciborium::into_writer(value, &mut serialized) {
+            // ciborium's error is bubbled up as a string because it is generic
+            // and we didn't want to add a generic type to the LidError
+            return Err(LidError::Serialization(err.to_string()));
+        }
+
+        Ok(self
+            .database
+            .put_cf(self.cf_handle(cf_name), key, serialized)?)
+    }
+
+    /// Get the [`PieceInfo`] for the provided piece [`Cid`].
+    ///
+    /// The information is stored in the [`PIECE_CID_TO_CURSOR_CF`] column family.
+    ///
+    /// It is equivalent to boost's `DB.GetPieceCidToMetadata`.
+    ///
+    /// Source:
+    fn get_piece_cid_to_metadata(&self, piece_cid: Cid) -> Result<Option<PieceInfo>, LidError> {
+        self.get_value_at_key(piece_cid.to_bytes(), PIECE_CID_TO_CURSOR_CF)
+    }
+
+    /// Set the [`PieceInfo`] for the provided piece [`Cid`].
+    ///
+    /// The information is stored in the [`PIECE_CID_TO_CURSOR_CF`] column family.
+    ///
+    /// It is equivalent to boost's `DB.SetPieceCidToMetadata`.
+    ///
+    /// Source:
+    fn set_piece_cid_to_metadata(
+        &self,
+        piece_cid: Cid,
+        metadata: &PieceInfo,
+    ) -> Result<(), LidError> {
+        self.put_value_at_key(piece_cid.to_bytes(), metadata, PIECE_CID_TO_CURSOR_CF)
+    }
+
+    /// Add mappings from several [`Multihash`]es to a single piece [`Cid`].
+    ///
+    /// * [`Cid`]s are stored as a [`Vec`] — i.e. a single [`Multihash`] can map to multiple [`Cid`]s.
+    /// * If the [`Multihash`] already exists in the database, it will append the [`Cid`] to the existing list.
+    /// * The [`Cid`] order inside the mapping is *not stable*!
+    fn set_multihashes_to_piece_cid(
+        &self,
+        record_multihashes: &Vec<Multihash<64>>,
+        piece_cid: Cid,
+    ) -> Result<(), LidError> {
+        // https://github.com/ipfs/go-datastore/blob/1de47089f5c72b61d91b5cd9043e49fe95771ac0/datastore.go#L97-L106
+        let mut batch = WriteBatchWithTransaction::<false>::default();
+
+        for multihash in record_multihashes {
+            // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/db.go#L166-L167
+            let mut cids = self
+                .get_multihash_to_piece_cids(multihash)
+                .unwrap_or_default();
+            if cids.contains(&piece_cid) {
+                continue;
+            }
+            cids.push(piece_cid);
+            batch.put_cf_cbor(
+                self.cf_handle(MULTIHASH_TO_PIECE_CID_CF),
+                multihash_base64(multihash),
+                &cids,
+            )?;
+        }
+        // "commit" the batch, should be equivalent to
+        // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/db.go#L216-L218
+        Ok(self.database.write(batch)?)
+    }
+
+    /// Retrieve the list of [`Cid`]s corresponding to a single [`Multihash`].
+    ///
+    /// * If the [`Multihash`] is not found in the [`MULTIHASH_TO_PIECE_CID_CF`] column family,
+    ///   returns [`LidError::MultihashNotFound`].
+    fn get_multihash_to_piece_cids(&self, multihash: &Multihash<64>) -> Result<Vec<Cid>, LidError> {
+        let mh_key = multihash_base64(multihash);
+        let Some(pieces) = self.get_value_at_key(mh_key, MULTIHASH_TO_PIECE_CID_CF)? else {
+            return Err(LidError::MultihashNotFound(*multihash));
+        };
+        Ok(pieces)
+    }
+
+    /// Get the next available cursor.
+    ///
+    /// Returns [`LidError::CursorNotFound`] if no cursor has been set.
+    /// Use [`Self::set_next_cursor`] to set the next cursor.
+    ///
+    /// The information is stored in the [`rocksdb::DEFAULT_COLUMN_FAMILY_NAME`] column family.
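+    ///
+    /// A sketch of the varint encoding used for the cursor value (matching the
+    /// original implementation, via the `integer-encoding` crate):
+    ///
+    /// ```ignore
+    /// use integer_encoding::VarInt;
+    /// let bytes = 42u64.encode_var_vec(); // what `set_next_cursor` writes
+    /// let (cursor, _read) = u64::decode_var(&bytes).unwrap();
+    /// assert_eq!(cursor, 42);
+    /// ```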
+    ///
+    /// Source:
+    /// *
+    fn get_next_cursor(&self) -> Result<(u64, String), LidError> {
+        let pinned_slice = self.database.get_pinned(NEXT_CURSOR_KEY)?;
+        let Some(pinned_slice) = pinned_slice else {
+            // In most places the original source code has some special handling for the missing key,
+            // however, that does not apply for a missing cursor
+            // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/service.go#L391-L396
+            // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/db.go#L111-L114
+            return Err(LidError::CursorNotFound);
+        };
+
+        // We use varint instead of ciborium here to match the original implementation
+        // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/db.go#L116
+        let cursor = pinned_slice.as_ref().read_varint::<u64>()?;
+        Ok((cursor, key_cursor_prefix(cursor)))
+    }
+
+    /// Set the next available cursor.
+    ///
+    /// The information is stored in the [`rocksdb::DEFAULT_COLUMN_FAMILY_NAME`] column family.
+    ///
+    /// Source:
+    /// *
+    fn set_next_cursor(&self, cursor: u64) -> Result<(), LidError> {
+        let encoded_cursor = cursor.encode_var_vec();
+        Ok(self.database.put(NEXT_CURSOR_KEY, encoded_cursor)?)
+    }
+
+    /// Add an [`IndexRecord`] to the database under a given cursor prefix.
+    ///
+    /// Even though the interface is different, this function is the dual to [`Service::get_offset_size`].
+    fn add_index_record(&self, cursor_prefix: &str, record: IndexRecord) -> Result<(), LidError> {
+        let key = format!("{}{}", cursor_prefix, multihash_base64(record.cid.hash()));
+        self.put_value_at_key(
+            key,
+            &record.offset_size,
+            rocksdb::DEFAULT_COLUMN_FAMILY_NAME,
+        )
+    }
+
+    /// Remove the indexes for a given piece [`Cid`], under the given cursor.
+    fn remove_indexes_with_cursor(&self, piece_cid: Cid, cursor: u64) -> Result<(), LidError> {
+        // At first glance, the original code doesn't add the first "/" in the prefix,
+        // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/db.go#L635-L640
+        // but it actually does: if we dig deeper, down to the go-ds-leveldb Datastore implementation,
+        // the first thing ds.Query does is prepend the "/" in case it is missing
+        // https://github.com/ipfs/go-ds-leveldb/blob/efa3b97d25995dfcd042c476f3e2afe0105d0784/datastore.go#L131-L138
+
+        let cursor_prefix = key_cursor_prefix(cursor);
+        let iterator = self.database.prefix_iterator(&cursor_prefix);
+        let mut batch = WriteBatchWithTransaction::<false>::default();
+
+        // NOTE(@jmg-duarte,08/06/2024): the `continue`s in the original are misleading because
+        // batch.Delete will always run, as long as the body before it doesn't fail
+        for it in iterator {
+            let (key, _) = it?;
+            let (_, mh_key) = key.split_at(cursor_prefix.len());
+
+            // Without the closure, the only alternative is to use goto's to skip from the `return Ok(())` to the deletion of the key
+            // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/db.go#L655-L702
+            (|| {
+                let Some(mut cids) =
+                    self.get_value_at_key::<_, Vec<Cid>>(mh_key, MULTIHASH_TO_PIECE_CID_CF)?
+                else {
+                    // The key is the base64 encoding of the multihash, so decode it back first
+                    let mh = Multihash::from_bytes(
+                        &base64::engine::general_purpose::STANDARD.decode(mh_key)?,
+                    )?;
+                    return Err(LidError::MultihashNotFound(mh));
+                };
+
+                let Some(idx) = cids.iter().position(|cid| cid == &piece_cid) else {
+                    return Ok(());
+                };
+
+                // If it is empty or it would become empty, delete the whole entry
+                if cids.len() <= 1 {
+                    batch.delete_cf(self.cf_handle(MULTIHASH_TO_PIECE_CID_CF), mh_key);
+                    return Ok(());
+                }
+
+                // Otherwise, just delete from the list and put it back in the DB
+                // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/db.go#L684-L690
+                cids.swap_remove(idx);
+                // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/db.go#L692-L698
+                batch.put_cf_cbor(self.cf_handle(MULTIHASH_TO_PIECE_CID_CF), mh_key, cids)?;
+                Ok(())
+            })()?;
+
+            // Cursors are stored in the "default" CF, thus we don't specify a CF
+            batch.delete(key);
+        }
+
+        Ok(self.database.write(batch)?)
+    }
+}
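+
+// A minimal sketch (test-only) of the `/<cursor>/<base64(multihash)>` key
+// layout used for index records, exercised through the private helpers above.
+// The cursor value and CID are arbitrary.
+#[cfg(test)]
+mod index_record_key_example {
+    use std::str::FromStr;
+
+    use cid::Cid;
+
+    use super::{key_cursor_prefix, multihash_base64, RocksDBLid, RocksDBStateStoreConfig};
+    use crate::local_index_directory::{IndexRecord, OffsetSize};
+
+    #[test]
+    fn record_lands_under_cursor_prefix() {
+        let tmp_dir = tempfile::tempdir().unwrap();
+        let db = RocksDBLid::new(RocksDBStateStoreConfig {
+            path: tmp_dir.path().join("rocksdb"),
+        })
+        .unwrap();
+
+        let cid = Cid::from_str("QmawceGscqN4o8Y8Fv26UUmB454kn2bnkXV5tEQYc4jBd6").unwrap();
+        let offset_size = OffsetSize { offset: 0, size: 1 };
+        let record = IndexRecord { cid, offset_size: offset_size.clone() };
+
+        let cursor_prefix = key_cursor_prefix(42);
+        db.add_index_record(&cursor_prefix, record).unwrap();
+
+        // The record is readable back under `<cursor prefix> + <base64 multihash>`.
+        let key = format!("{}{}", cursor_prefix, multihash_base64(cid.hash()));
+        let stored: Option<OffsetSize> = db
+            .get_value_at_key(key, rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
+            .unwrap();
+        assert_eq!(stored, Some(offset_size));
+    }
+}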
+
+impl Service for RocksDBLid {
+    /// For a detailed description, see [`Service::add_deal_for_piece`].
+    ///
+    /// Sources:
+    /// *
+    /// *
+    fn add_deal_for_piece(&self, piece_cid: Cid, deal_info: DealInfo) -> Result<(), LidError> {
+        // Check if the piece exists
+        let mut piece_info = self
+            .get_piece_cid_to_metadata(piece_cid)?
+            .unwrap_or_else(PieceInfo::default);
+
+        // Check for the duplicate deal
+        if let Some(deal) = piece_info.deals.iter().find(|d| **d == deal_info) {
+            return Err(LidError::DuplicateDealError(deal.deal_uuid));
+        }
+
+        // Save the new deal
+        piece_info.deals.push(deal_info);
+        self.set_piece_cid_to_metadata(piece_cid, &piece_info)
+    }
+
+    /// For a detailed description, see [`Service::remove_deal_for_piece`].
+    ///
+    /// Source:
+    fn remove_deal_for_piece(&self, piece_cid: Cid, deal_uuid: Uuid) -> Result<(), LidError> {
+        let mut piece_info = self.get_piece_metadata(piece_cid)?;
+
+        if let Some(idx) = piece_info
+            .deals
+            .iter()
+            .position(|deal| deal.deal_uuid == deal_uuid)
+        {
+            // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/service.go#L733-L739
+            piece_info.deals.swap_remove(idx);
+        }
+
+        // If the removed deal was the last one, remove the metadata as well
+        // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/service.go#L741-L748
+        if piece_info.deals.is_empty() {
+            return match self.remove_piece_metadata(piece_cid) {
+                Ok(()) => Ok(()),
+                // First of all, it's kinda weird that the metadata might not be there,
+                // but in any case, it was going to be deleted, so in this case,
+                // not finding it is not an error, it just means we don't need to do anything
+                Err(LidError::PieceNotFound(_)) | Err(LidError::MultihashNotFound(_)) => Ok(()),
+                Err(err) => Err(err),
+            };
+        }
+
+        self.put_value_at_key(piece_cid.to_bytes(), &piece_info, PIECE_CID_TO_CURSOR_CF)
+    }
+
+    /// For a detailed description, see [`Service::is_indexed`].
+    ///
+    /// * If the piece does not exist, `false` will be returned instead of [`LidError::PieceNotFound`].
+    ///   This is the same behavior the original implementation exhibits[*][1].
+    ///
+    /// Sources:
+    /// *
+    /// *
+    ///
+    /// [1]: https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/service.go#L461-L468
+    fn is_indexed(&self, piece_cid: Cid) -> Result<bool, LidError> {
+        Ok(self
+            .get_piece_cid_to_metadata(piece_cid)?
+            // If the piece does not exist, it's clearly not indexed
+            .map_or(false, |piece_info: PieceInfo| {
+                piece_info.indexed_at.is_some()
+            }))
+    }
+
+    /// For a detailed description, see [`Service::indexed_at`].
+    ///
+    /// The information is stored in the [`PIECE_CID_TO_CURSOR_CF`] column family.
+    ///
+    /// Source:
+    fn indexed_at(
+        &self,
+        piece_cid: Cid,
+    ) -> Result<Option<chrono::DateTime<chrono::Utc>>, LidError> {
+        // The Go implementation seems to return the Unix epoch but returning the error makes more sense
+        // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/service.go#L461-L468
+        self.get_piece_cid_to_metadata(piece_cid)?
+            .map(|piece_info: PieceInfo| piece_info.indexed_at)
+            .ok_or(LidError::PieceNotFound(piece_cid))
+    }
+
+    /// For a detailed description, see [`Service::is_complete_index`].
+    ///
+    /// The information is stored in the [`PIECE_CID_TO_CURSOR_CF`] column family.
+    ///
+    /// Source:
+    fn is_complete_index(&self, piece_cid: Cid) -> Result<bool, LidError> {
+        self.get_piece_cid_to_metadata(piece_cid)?
+            .map(|piece_info: PieceInfo| piece_info.complete_index)
+            .ok_or(LidError::PieceNotFound(piece_cid))
+    }
+
+    /// For a detailed description, see [`Service::get_piece_metadata`].
+    ///
+    /// The information is stored in the [`PIECE_CID_TO_CURSOR_CF`] column family.
+    ///
+    /// Source:
+    fn get_piece_metadata(&self, piece_cid: Cid) -> Result<PieceInfo, LidError> {
+        self.get_piece_cid_to_metadata(piece_cid)?
+            .ok_or(LidError::PieceNotFound(piece_cid))
+    }
+
+    /// For a detailed description, see [`Service::remove_piece_metadata`].
+    ///
+    /// The information is removed from the [`PIECE_CID_TO_CURSOR_CF`] column family.
+    ///
+    /// Sources:
+    /// *
+    /// *
+    fn remove_piece_metadata(&self, piece_cid: Cid) -> Result<(), LidError> {
+        let piece = self.get_piece_metadata(piece_cid)?;
+        // Remove all the multihashes first, as without metadata they're useless.
+        // This operation is made first for consistency — i.e. if it fails,
+        // the piece metadata is still in place and the removal can be retried.
+        // For more details, see the original implementation:
+        // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/db.go#L610-L615
+        self.remove_indexes_with_cursor(piece_cid, piece.cursor)?;
+        self.remove_value_at_key(piece_cid.to_bytes(), PIECE_CID_TO_CURSOR_CF)
+    }
+
+    /// For a detailed description, see [`Service::get_piece_deals`].
+    ///
+    /// The information is stored in the [`PIECE_CID_TO_CURSOR_CF`] column family.
+    ///
+    /// Source:
+    fn get_piece_deals(&self, piece_cid: Cid) -> Result<Vec<DealInfo>, LidError> {
+        self.get_piece_cid_to_metadata(piece_cid)?
+            .map(|piece_info: PieceInfo| piece_info.deals)
+            .ok_or(LidError::PieceNotFound(piece_cid))
+    }
+
+    /// For a detailed description, see [`Service::list_pieces`].
+    ///
+    /// The information is stored in the [`PIECE_CID_TO_CURSOR_CF`] column family.
+    ///
+    /// Sources:
+    /// *
+    /// *
+    fn list_pieces(&self) -> Result<Vec<Cid>, LidError> {
+        let iterator = self
+            .database
+            .iterator_cf(self.cf_handle(PIECE_CID_TO_CURSOR_CF), IteratorMode::Start);
+
+        iterator
+            .map(|line| {
+                let (key, _) = line?;
+
+                let parsed_cid = Cid::try_from(key.as_ref()).map_err(|err| {
+                    // We know that all stored CIDs are valid, so this
+                    // should only happen if the database is corrupted.
+                    LidError::Deserialization(format!("failed to deserialize CID: {}", err))
+                })?;
+
+                Ok(parsed_cid)
+            })
+            .collect()
+    }
+
+    /// For a detailed description, see [`Service::add_index`].
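+    ///
+    /// A sketch of driving this from async code (the binding names are
+    /// illustrative), since the whole implementation uses RocksDB's sync API:
+    ///
+    /// ```ignore
+    /// let lid = std::sync::Arc::new(lid);
+    /// tokio::task::spawn_blocking(move || lid.add_index(piece_cid, records, true))
+    ///     .await??;
+    /// ```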
+    ///
+    /// The index information is stored in the [`rocksdb::DEFAULT_COLUMN_FAMILY_NAME`] and [`MULTIHASH_TO_PIECE_CID_CF`] column families.
+    ///
+    /// Note:
+    /// * In Boost, this operation is performed by running a goroutine that feeds the returned channel.
+    ///   In Rust, a mix of things makes that approach especially difficult for us here; since the whole
+    ///   [`Service`] relies on the sync API of RocksDB, you should use [`tokio::task::spawn_blocking`].
+    ///
+    /// Sources:
+    /// *
+    /// *
+    /// *
+    fn add_index(
+        &self,
+        piece_cid: Cid,
+        records: Vec<IndexRecord>,
+        is_complete_index: bool,
+    ) -> Result<(), LidError> {
+        let record_cids = records.iter().map(|r| r.cid.hash().to_owned()).collect();
+        // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/service.go#L369-L374
+        self.set_multihashes_to_piece_cid(&record_cids, piece_cid)?;
+
+        // This looks a bit strange at first, but in Go mutability is much more of a thing than in Rust, hence
+        // you get a bunch of parts where a variable is declared (and initialized) to be overwritten in a deeper scope
+        // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/service.go#L376-L410
+        let (mut metadata, cursor_prefix) =
+            if let Some(metadata) = self.get_piece_cid_to_metadata(piece_cid)? {
+                let cursor_prefix = key_cursor_prefix(metadata.cursor);
+                (metadata, cursor_prefix)
+            } else {
+                let mut metadata = PieceInfo::default();
+                let (next_cursor, next_cursor_prefix) = self.get_next_cursor()?;
+                self.set_next_cursor(next_cursor + 1)?;
+
+                metadata.cursor = next_cursor;
+                metadata.complete_index = is_complete_index;
+                (metadata, next_cursor_prefix)
+            };
+
+        // NOTE(@jmg-duarte,11/06/2024): this could be batched
+        records
+            .into_iter()
+            .map(|record| self.add_index_record(&cursor_prefix, record))
+            .collect::<Result<(), LidError>>()?;
+
+        metadata.indexed_at = chrono::Utc::now().into();
+        self.set_piece_cid_to_metadata(piece_cid, &metadata)
+    }
+
+    /// For a detailed description, see [`Service::get_index`].
+    ///
+    /// The information is stored in the [`rocksdb::DEFAULT_COLUMN_FAMILY_NAME`] column family.
+    ///
+    /// Sources:
+    /// *
+    /// *
+    fn get_index(&self, piece_cid: Cid) -> Result<Vec<IndexRecord>, LidError> {
+        let Some(metadata) = self.get_piece_cid_to_metadata(piece_cid)?
+        else {
+            return Err(LidError::PieceNotFound(piece_cid));
+        };
+
+        // This is equivalent to `db.AllRecords`
+        // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/db.go#L304-L349
+        let cursor_prefix = key_cursor_prefix(metadata.cursor);
+        let iterator = self.database.prefix_iterator(&cursor_prefix);
+
+        let mut records = vec![];
+        for it in iterator {
+            let (key, value) = it?;
+            // With some trickery, we can probably get rid of this allocation
+            let key = key
+                .to_vec()
+                // The original implementation does `k := r.Key[len(q.Prefix)+1:]`,
+                // but that is because the underlying query "secretly" prepends a `/`,
+                // hence the `+1` in the original implementation, and the lack of one here
+                .split_off(cursor_prefix.len());
+            let mh_bytes = base64::engine::general_purpose::STANDARD.decode(&key)?;
+            // We lost the original Cid version and so on, so we just create a new one
+            // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/db.go#L334-L335
+            let cid = Cid::new_v1(RAW_CODEC, Multihash::from_bytes(&mh_bytes)?);
+            let offset_size = ciborium::from_reader(&*value)
+                .map_err(|err| LidError::Deserialization(err.to_string()))?;
+            records.push(IndexRecord { cid, offset_size });
+        }
+
+        // The main difference here is that we don't need to return IndexRecord since we're not sending
+        // the records over a channel, we should be able to just error out as soon as we hit one
+        // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/service.go#L285-L289
+
+        Ok(records)
+    }
+
+    /// For a detailed description, see [`Service::get_offset_size`].
+    ///
+    /// This information is stored in the [`rocksdb::DEFAULT_COLUMN_FAMILY_NAME`] column family.
+    ///
+    /// Sources:
+    /// *
+    /// *
+    fn get_offset_size(
+        &self,
+        piece_cid: Cid,
+        multihash: Multihash<64>,
+    ) -> Result<OffsetSize, LidError> {
+        let cursor = self
+            .get_piece_cid_to_metadata(piece_cid)?
+            .map(|piece_info: PieceInfo| piece_info.cursor)
+            .ok_or(LidError::PieceNotFound(piece_cid))?;
+
+        let key = format!(
+            "{}{}",
+            key_cursor_prefix(cursor),
+            multihash_base64(&multihash)
+        );
+        // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/service.go#L164-L165
+        // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/db.go#L370-L371
+        self.get_value_at_key(
+            key,
+            // In the original source, the key is prefixed by the cursor, which is used in other places as well
+            rocksdb::DEFAULT_COLUMN_FAMILY_NAME,
+        )?
+        .ok_or(LidError::MultihashNotFound(multihash))
+    }
+
+    /// For a detailed description, see [`Service::pieces_containing_multihash`].
+    ///
+    /// This information is stored in the [`MULTIHASH_TO_PIECE_CID_CF`] column family.
+    ///
+    /// Sources:
+    /// *
+    /// *
+    fn pieces_containing_multihash(&self, multihash: Multihash<64>) -> Result<Vec<Cid>, LidError> {
+        self.get_multihash_to_piece_cids(&multihash)
+    }
+
+    /// For a detailed description, see [`Service::remove_indexes`].
+    ///
+    /// This information is stored in the [`rocksdb::DEFAULT_COLUMN_FAMILY_NAME`] and [`MULTIHASH_TO_PIECE_CID_CF`] column families.
+    ///
+    /// Sources:
+    /// *
+    /// *
+    fn remove_indexes(&self, piece_cid: Cid) -> Result<(), LidError> {
+        let Some(metadata) = self.get_piece_cid_to_metadata(piece_cid)?
+        else {
+            return Err(LidError::PieceNotFound(piece_cid));
+        };
+        self.remove_indexes_with_cursor(piece_cid, metadata.cursor)
+    }
+
+    /// For a detailed description, see [`Service::flag_piece`].
+    ///
+    /// This information is stored in the [`PIECE_CID_TO_FLAGGED_CF`] column family.
+    ///
+    /// Sources:
+    /// *
+    /// *
+    fn flag_piece(
+        &self,
+        piece_cid: Cid,
+        has_unsealed_copy: bool,
+        storage_provider_address: StorageProviderAddress,
+    ) -> Result<(), LidError> {
+        let key = key_flag_piece(&piece_cid, &storage_provider_address);
+        let mut metadata = self
+            .get_value_at_key(&key, PIECE_CID_TO_FLAGGED_CF)?
+            .unwrap_or_else(|| FlaggedPiece::new(piece_cid, storage_provider_address));
+
+        metadata.updated_at = chrono::Utc::now();
+        metadata.has_unsealed_copy = has_unsealed_copy;
+
+        self.put_value_at_key(key, &metadata, PIECE_CID_TO_FLAGGED_CF)
+    }
+
+    /// For a detailed description, see [`Service::unflag_piece`].
+    ///
+    /// This information is stored in the [`PIECE_CID_TO_FLAGGED_CF`] column family.
+    ///
+    /// Sources:
+    /// *
+    /// *
+    fn unflag_piece(
+        &self,
+        piece_cid: Cid,
+        storage_provider_address: StorageProviderAddress,
+    ) -> Result<(), LidError> {
+        let key = key_flag_piece(&piece_cid, &storage_provider_address);
+        self.remove_value_at_key(key, PIECE_CID_TO_FLAGGED_CF)
+    }
+
+    /// For a detailed description, see [`Service::flagged_pieces_list`].
+    ///
+    /// This information is stored in the [`PIECE_CID_TO_FLAGGED_CF`] column family.
+    ///
+    /// Sources:
+    /// *
+    /// *
+    fn flagged_pieces_list(
+        &self,
+        filter: Option<FlaggedPiecesListFilter>,
+        cursor: chrono::DateTime<chrono::Utc>,
+        offset: usize,
+        limit: usize,
+    ) -> Result<Vec<FlaggedPiece>, LidError> {
+        let iterator = self
+            .database
+            .iterator_cf(self.cf_handle(PIECE_CID_TO_FLAGGED_CF), IteratorMode::Start);
+
+        let mut flagged_pieces = vec![];
+        for line in iterator {
+            let (key, value) = line?;
+
+            // This one should never happen but who knows?
+            let key = String::from_utf8(key.to_vec())?;
+            // The key starts with a "/", skip it
+            let mut split = key.split('/').skip(1);
+
+            // Using let/else instead of .ok_or/.ok_or_else avoids using .clone
+            let Some(piece_cid) = split.next() else {
+                return Err(LidError::InvalidFlaggedPieceKeyError(key));
+            };
+            // They don't actually check that the full key is well-formed, they just check that it isn't ill-formed
+            // by checking that the length after splitting is != 0 and that the CID is valid
+            // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/db.go#L740-L748
+
+            let piece_cid = Cid::from_str(piece_cid)?;
+            let flagged_metadata = match ciborium::from_reader::<FlaggedPiece, _>(value.as_ref()) {
+                Ok(value) => Ok(value),
+                Err(err) => Err(LidError::Deserialization(err.to_string())),
+            }?;
+
+            if let Some(filter) = &filter {
+                // NOTE(@jmg-duarte,05/06/2024): The check order is not arbitrary,
+                // it's the same as the order in boostd-data, maybe it has a reason,
+                // maybe it doesn't, keeping it the same for now...
+                // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/db.go#L756-L766
+                if filter.has_unsealed_copy != flagged_metadata.has_unsealed_copy {
+                    continue;
+                }
+
+                // NOTE(@jmg-duarte,05/06/2024): We could check the address against the key and
+                // possibly avoid deserializing, but the original code only checks after deserializing
+                // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/db.go#L750-L762
+                if !filter.storage_provider_address.is_empty()
+                    && filter.storage_provider_address != flagged_metadata.storage_provider_address
+                {
+                    continue;
+                }
+            }
+
+            if flagged_metadata.created_at < cursor {
+                continue;
+            }
+
+            flagged_pieces.push(FlaggedPiece {
+                piece_cid,
+                storage_provider_address: flagged_metadata.storage_provider_address,
+                created_at: flagged_metadata.created_at,
+                updated_at: flagged_metadata.updated_at,
+                has_unsealed_copy: flagged_metadata.has_unsealed_copy,
+            });
+        }
+
+        // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/db.go#L776-L778
+        flagged_pieces.sort_by(|l, r| l.created_at.cmp(&r.created_at));
+
+        if offset > 0 {
+            if offset >= flagged_pieces.len() {
+                return Ok(vec![]);
+            } else {
+                flagged_pieces = flagged_pieces.split_off(offset);
+            }
+        }
+
+        if flagged_pieces.len() > limit {
+            flagged_pieces.truncate(limit);
+        }
+
+        Ok(flagged_pieces)
+    }
+
+    /// For a detailed description, see [`Service::flagged_pieces_count`].
+    ///
+    /// This information is stored in the [`PIECE_CID_TO_FLAGGED_CF`] column family.
+    ///
+    /// Sources:
+    /// *
+    /// *
+    fn flagged_pieces_count(
+        &self,
+        filter: Option<FlaggedPiecesListFilter>,
+    ) -> Result<u64, LidError> {
+        let iterator = self
+            .database
+            .iterator_cf(self.cf_handle(PIECE_CID_TO_FLAGGED_CF), IteratorMode::Start);
+
+        if let Some(filter) = filter {
+            let mut count: u64 = 0;
+            for line in iterator {
+                let (_, value) = line?;
+
+                let flagged_metadata =
+                    match ciborium::from_reader::<FlaggedPiece, _>(value.as_ref()) {
+                        Ok(value) => Ok(value),
+                        Err(err) => Err(LidError::Deserialization(err.to_string())),
+                    }?;
+
+                // NOTE(@jmg-duarte,05/06/2024): The check order is not arbitrary,
+                // it's the same as the order in boostd-data, maybe it has a reason,
+                // maybe it doesn't, keeping it the same for now...
+                // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/db.go#L823-L829
+                if filter.has_unsealed_copy != flagged_metadata.has_unsealed_copy {
+                    continue;
+                }
+
+                if !filter.storage_provider_address.is_empty()
+                    && filter.storage_provider_address != flagged_metadata.storage_provider_address
+                {
+                    continue;
+                }
+
+                count += 1;
+            }
+            Ok(count)
+        } else {
+            Ok(iterator.count() as u64)
+        }
+    }
+
+    /// For a detailed description, see [`Service::next_pieces_to_check`].
+    ///
+    /// This information is stored in the [`PIECE_CID_TO_CURSOR_CF`] column family.
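+    ///
+    /// A sketch of the polling loop this method is meant to back (the binding
+    /// names are illustrative):
+    ///
+    /// ```ignore
+    /// loop {
+    ///     let pieces = lid.next_pieces_to_check(address.clone())?;
+    ///     for piece_cid in pieces {
+    ///         // verify the piece's sealed/unsealed copies and (un)flag it
+    ///     }
+    /// }
+    /// ```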
+    ///
+    /// Sources:
+    /// *
+    /// *
+    fn next_pieces_to_check(
+        &mut self,
+        storage_provider_address: StorageProviderAddress,
+    ) -> Result<Vec<Cid>, LidError> {
+        let mut cids = vec![];
+
+        // Leveraging the `DBRawIteratorWithThreadMode` should bring more performance
+        // but requires deeper knowledge of RocksDB, this is good enough for now
+        let iter = self
+            .database
+            .iterator_cf(self.cf_handle(PIECE_CID_TO_CURSOR_CF), IteratorMode::Start)
+            // Looks silly but it's faithful to the original implementation
+            // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/db.go#L389-L390
+            .skip(self.offset)
+            .take(PIECES_TRACKER_BATCH_SIZE);
+
+        let mut seen_pieces = 0;
+        for it in iter {
+            let (key, value) = it?;
+            seen_pieces += 1;
+
+            let cid = Cid::read_bytes(key.as_ref())?;
+            // TODO(@jmg-duarte,14/06/2024): missing an encoding step here
+            // https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/db.go#L421-L422
+            let checked_key = {
+                let mut key = storage_provider_address.0.clone();
+                key.push_str(&cid.to_string());
+                key
+            };
+
+            if let Some(last_checked) = self.checked.get(&checked_key) {
+                if *last_checked > (chrono::Utc::now() - MIN_PIECE_CHECK_PERIOD) {
+                    continue;
+                }
+            }
+
+            let metadata: PieceInfo = ciborium::from_reader(value.as_ref())
+                .map_err(|err| LidError::Deserialization(err.to_string()))?;
+            for deal in metadata.deals {
+                if deal.storage_provider_address == storage_provider_address {
+                    self.checked.insert(checked_key.clone(), chrono::Utc::now());
+                    cids.push(cid);
+                    break;
+                }
+            }
+        }
+        self.offset += seen_pieces;
+
+        // Reached the end of the column family, restart from the top on the next call
+        if seen_pieces < PIECES_TRACKER_BATCH_SIZE {
+            self.offset = 0;
+        }
+
+        Ok(cids)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::str::FromStr;
+
+    use cid::{multihash::Multihash, Cid};
+    use rocksdb::DEFAULT_COLUMN_FAMILY_NAME;
+    use sha2::{Digest, Sha256};
+    use tempfile::tempdir;
+
+    use super::{key_flag_piece, RocksDBLid, RocksDBStateStoreConfig};
+    use crate::local_index_directory::{
+        rdb::{
+            key_cursor_prefix, MULTIHASH_TO_PIECE_CID_CF, PIECE_CID_TO_CURSOR_CF,
+            PIECE_CID_TO_FLAGGED_CF, RAW_CODEC,
+        },
+        DealId, DealInfo, FlaggedPiece, FlaggedPiecesListFilter, IndexRecord, LidError, OffsetSize,
+        PieceInfo, Service, StorageProviderAddress,
+    };
+
+    fn init_database() -> RocksDBLid {
+        let tmp_dir = tempdir().unwrap();
+        let config = RocksDBStateStoreConfig {
+            path: tmp_dir.path().join("rocksdb"),
+        };
+        RocksDBLid::new(config).unwrap()
+    }
+
+    fn cids_vec() -> Vec<Cid> {
+        vec![
+            Cid::from_str("QmawceGscqN4o8Y8Fv26UUmB454kn2bnkXV5tEQYc4jBd6").unwrap(),
+            Cid::from_str("QmbvrHYWXAU1BuxMPNRtfeF4DS2oPmo5hat7ocqAkNPr74").unwrap(),
+            Cid::from_str("QmfRL5b6fPZ851F6E2ZUWX1kC4opXzq9QDhamvU4tJGuyR").unwrap(),
+        ]
+    }
+
+    fn dummy_deal_info() -> DealInfo {
+        DealInfo {
+            deal_uuid: uuid::Uuid::new_v4(),
+            is_legacy: false,
+            chain_deal_id: 1337.into(),
+            storage_provider_address: "address".to_string().into(),
+            sector_number: 42.into(),
+            piece_offset: 10,
+            piece_length: 10,
+            car_length: 97,
+            is_direct_deal: false,
+        }
+    }
+
+    /// Ensure that the expected column families are initialized.
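+    /// Reading a missing key through a valid handle yields `Ok(None)`, while a missing
+    /// column family would panic inside `cf_handle`, so `Ok(None)` here is proof enough
+    /// that the handles exist.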
+    #[test]
+    fn column_families() {
+        let db = init_database();
+        assert!(matches!(
+            db.get_value_at_key::<_, Vec<u8>>("non_existing_key", DEFAULT_COLUMN_FAMILY_NAME),
+            Ok(None)
+        ));
+        assert!(matches!(
+            db.get_value_at_key::<_, Vec<u8>>("non_existing_key", PIECE_CID_TO_FLAGGED_CF),
+            Ok(None)
+        ));
+        assert!(matches!(
+            db.get_value_at_key::<_, Vec<u8>>("non_existing_key", PIECE_CID_TO_CURSOR_CF),
+            Ok(None)
+        ));
+    }
+
+    /// Ensure there's nothing funky going on in the simpler wrappers.
+    #[test]
+    fn value_at_key() {
+        let db = init_database();
+        let key = "cids";
+        let cids = cids_vec();
+
+        assert!(matches!(
+            db.get_value_at_key::<_, Vec<Cid>>(key, DEFAULT_COLUMN_FAMILY_NAME),
+            Ok(None)
+        ));
+
+        assert!(db
+            .put_value_at_key(key, &cids, DEFAULT_COLUMN_FAMILY_NAME)
+            .is_ok());
+
+        assert!(matches!(
+            db.get_value_at_key::<_, Vec<Cid>>(key, DEFAULT_COLUMN_FAMILY_NAME),
+            Ok(Some(_))
+        ));
+
+        assert!(db
+            .remove_value_at_key(key, DEFAULT_COLUMN_FAMILY_NAME)
+            .is_ok());
+
+        assert!(matches!(
+            db.get_value_at_key::<_, Vec<Cid>>(key, DEFAULT_COLUMN_FAMILY_NAME),
+            Ok(None)
+        ));
+    }
+
+    #[test]
+    fn piece_cid_to_metadata() {
+        let db = init_database();
+        let cid = Cid::from_str("QmawceGscqN4o8Y8Fv26UUmB454kn2bnkXV5tEQYc4jBd6").unwrap();
+        let piece_info = PieceInfo::default();
+
+        assert!(matches!(db.get_piece_cid_to_metadata(cid), Ok(None)));
+        assert!(db.set_piece_cid_to_metadata(cid, &piece_info).is_ok());
+        let received = db.get_piece_cid_to_metadata(cid);
+        assert!(matches!(received, Ok(Some(_))));
+        assert_eq!(piece_info, received.unwrap().unwrap());
+
+        assert!(db
+            .remove_value_at_key(cid.to_bytes(), PIECE_CID_TO_CURSOR_CF)
+            .is_ok());
+        assert!(matches!(db.get_piece_cid_to_metadata(cid), Ok(None)));
+    }
+
+    /// Ensure the cursor logic works.
+    #[test]
+    fn cursor() {
+        let db = init_database();
+        assert!(db.get_next_cursor().is_err());
+        assert!(db.set_next_cursor(1010).is_ok());
+        let cursor = db.get_next_cursor();
+        assert_eq!(cursor.unwrap(), (1010, key_cursor_prefix(1010)));
+    }
+
+    /// Ensure `add_deal_for_piece` creates a new [`PieceInfo`] and adds the respective deal,
+    /// as well as appends a second [`DealInfo`].
+    #[test]
+    fn add_deal_for_piece() {
+        let db = init_database();
+        let cid = cids_vec()[0];
+        let deal_info = dummy_deal_info();
+        let deal_info_2 = DealInfo {
+            deal_uuid: uuid::Uuid::new_v4(),
+            ..deal_info.clone()
+        };
+
+        assert!(matches!(db.get_piece_cid_to_metadata(cid), Ok(None)));
+        assert!(db.add_deal_for_piece(cid, deal_info.clone()).is_ok());
+        assert!(db.add_deal_for_piece(cid, deal_info_2.clone()).is_ok()); // add a second one
+
+        let piece_info = db.get_piece_cid_to_metadata(cid);
+        assert!(matches!(piece_info, Ok(Some(_))));
+        assert_eq!(piece_info.unwrap().unwrap().deals[0], deal_info.clone());
+
+        let piece_info = db.get_piece_cid_to_metadata(cid);
+        assert!(matches!(piece_info, Ok(Some(_))));
+        assert_eq!(piece_info.unwrap().unwrap().deals[1], deal_info_2.clone());
+    }
+
+    /// Ensure `add_deal_for_piece` detects duplicates.
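+    /// A deal counts as a duplicate when an identical [`DealInfo`] is already
+    /// registered for the piece, hence adding the same `deal_info` twice must fail.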
+    #[test]
+    fn duplicate_add_deal_for_piece() {
+        let db = init_database();
+        let cid = cids_vec()[0];
+        // Not real values
+        let deal_info = dummy_deal_info();
+
+        assert!(matches!(db.get_piece_cid_to_metadata(cid), Ok(None)));
+        assert!(db.add_deal_for_piece(cid, deal_info.clone()).is_ok());
+        assert!(db.add_deal_for_piece(cid, deal_info.clone()).is_err());
+    }
+
+    #[test]
+    fn remove_deal_for_piece() {
+        let db = init_database();
+        let cid = cids_vec()[0];
+        let deal_info = dummy_deal_info();
+        let deal_info_2 = DealInfo {
+            deal_uuid: uuid::Uuid::new_v4(),
+            ..deal_info.clone()
+        };
+
+        assert!(matches!(db.get_piece_cid_to_metadata(cid), Ok(None)));
+        assert!(db.add_deal_for_piece(cid, deal_info.clone()).is_ok());
+        assert!(db.add_deal_for_piece(cid, deal_info_2.clone()).is_ok()); // add a second one
+
+        let piece_info = db.get_piece_cid_to_metadata(cid);
+        assert!(matches!(piece_info, Ok(Some(_))));
+        assert_eq!(piece_info.unwrap().unwrap().deals[0], deal_info.clone());
+
+        let piece_info = db.get_piece_cid_to_metadata(cid);
+        assert!(matches!(piece_info, Ok(Some(_))));
+        assert_eq!(piece_info.unwrap().unwrap().deals[1], deal_info_2.clone());
+
+        assert!(db.remove_deal_for_piece(cid, deal_info_2.deal_uuid).is_ok());
+        assert_eq!(db.get_piece_deals(cid).unwrap(), vec![deal_info.clone()]);
+
+        assert!(db.remove_deal_for_piece(cid, deal_info.deal_uuid).is_ok());
+        assert!(matches!(
+            db.get_piece_deals(cid),
+            Err(LidError::PieceNotFound(_))
+        ));
+    }
+
+    #[test]
+    fn is_indexed() {
+        let db = init_database();
+        let cid = cids_vec()[0];
+        let mut piece_info = PieceInfo::default();
+
+        // PieceInfo hasn't been inserted
+        assert!(!db.is_indexed(cid).unwrap());
+        // Inserted but false
+        assert!(db.set_piece_cid_to_metadata(cid, &piece_info).is_ok());
+        assert!(!db.is_indexed(cid).unwrap());
+        // Modify and insert
+        piece_info.indexed_at = chrono::Utc::now().into();
+        assert!(db.set_piece_cid_to_metadata(cid, &piece_info).is_ok());
+        assert!(db.is_indexed(cid).unwrap());
+    }
+
+    #[test]
+    fn indexed_at() {
+        let db = init_database();
+        let cid = cids_vec()[0];
+        let mut piece_info = PieceInfo::default();
+        piece_info.indexed_at = chrono::Utc::now().into();
+
+        // Insert the metadata with a fresh `indexed_at`
+        db.set_piece_cid_to_metadata(cid, &piece_info).unwrap();
+        assert!(db.is_indexed(cid).unwrap());
+        assert_eq!(db.indexed_at(cid).unwrap(), piece_info.indexed_at);
+    }
+
+    #[test]
+    fn is_complete_index() {
+        let db = init_database();
+        let cid = cids_vec()[0];
+        let mut piece_info = PieceInfo::default();
+
+        // PieceInfo hasn't been inserted
+        assert!(matches!(
+            db.is_complete_index(cid),
+            Err(LidError::PieceNotFound(_))
+        ));
+        // Inserted but false
+        db.set_piece_cid_to_metadata(cid, &piece_info).unwrap();
+        assert!(!db.is_complete_index(cid).unwrap());
+        // Modify and insert
+        piece_info.complete_index = true;
+        db.set_piece_cid_to_metadata(cid, &piece_info).unwrap();
+        assert!(db.is_complete_index(cid).unwrap());
+    }
+
+    #[test]
+    fn list_pieces() {
+        let db = init_database();
+        let cids = cids_vec();
+
+        assert_eq!(db.list_pieces().unwrap(), vec![]);
+        // empty payload since `list_pieces` reads the Cid off of the key
+        cids.iter().for_each(|cid| {
+            db.put_value_at_key::<_, Vec<u8>>(cid.to_bytes(), &vec![], PIECE_CID_TO_CURSOR_CF)
+                .unwrap()
+        });
+        assert_eq!(db.list_pieces().unwrap(), cids);
+    }
+
+    #[test]
+    fn get_piece_metadata() {
+        let db = init_database();
+        let cid = Cid::from_str("QmawceGscqN4o8Y8Fv26UUmB454kn2bnkXV5tEQYc4jBd6").unwrap();
+        let piece_info = PieceInfo::default();
+
+        assert!(matches!(
+            db.get_piece_metadata(cid),
+            Err(LidError::PieceNotFound(_))
+        ));
+        assert!(db.set_piece_cid_to_metadata(cid, &piece_info).is_ok());
+        let received = db.get_piece_metadata(cid);
+        assert!(matches!(received, Ok(_)));
+        assert_eq!(piece_info, received.unwrap());
+    }
+
+    #[test]
+    fn remove_piece_metadata() {
+        let db = init_database();
+        let cids = cids_vec();
+        let piece_info = PieceInfo::default();
+        let records = vec![
+            IndexRecord {
+                cid: cids[1],
+                offset_size: OffsetSize { offset: 0, size: 0 },
+            },
+            IndexRecord {
+                cid: cids[2],
+                offset_size: OffsetSize {
+                    offset: 0,
+                    size: 100,
+                },
+            },
+        ];
+
+        assert!(matches!(
+            db.get_piece_metadata(cids[0]),
+            Err(LidError::PieceNotFound(_))
+        ));
+        assert!(db.set_piece_cid_to_metadata(cids[0], &piece_info).is_ok());
+        let received = db.get_piece_metadata(cids[0]);
+        assert!(matches!(received, Ok(_)));
+        assert_eq!(piece_info, received.unwrap());
+        assert!(db.add_index(cids[0], records.clone(), false).is_ok());
+
+        assert!(db.remove_piece_metadata(cids[0]).is_ok());
+        assert!(matches!(
+            db.get_piece_metadata(cids[0]),
+            Err(LidError::PieceNotFound(_))
+        ));
+
+        // Ensure mh -> offset also gets removed when indexes are removed
+        assert!(db
+            .database
+            .prefix_iterator("/0/")
+            .collect::<Vec<_>>()
+            .is_empty());
+    }
+
+    #[test]
+    fn get_piece_deals() {
+        let db = init_database();
+        let cid = cids_vec()[0];
+        let deal_info = dummy_deal_info();
+        let deal_info_2 = DealInfo {
+            deal_uuid: uuid::Uuid::new_v4(),
+            ..deal_info.clone()
+        };
+
+        // Ensure there are no tricks up our sleeves
+        assert!(matches!(
+            db.get_piece_metadata(cid),
+            Err(LidError::PieceNotFound(_))
+        ));
+        assert!(matches!(
+            db.get_piece_deals(cid),
+            Err(LidError::PieceNotFound(_))
+        ));
+
+        assert!(db.add_deal_for_piece(cid, deal_info.clone()).is_ok());
+        assert!(db.add_deal_for_piece(cid, deal_info_2.clone()).is_ok()); // add a second one
+
+        assert!(matches!(db.get_piece_deals(cid), Ok(_)));
+        assert_eq!(
+            db.get_piece_deals(cid).unwrap(),
+            vec![deal_info, deal_info_2]
+        );
+    }
+
+    /// Tests the insertion and retrieval of pieces indexes.
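+    ///
+    /// Note that the index round-trip is not byte-exact: CIDs come back normalized
+    /// to v1 with the RAW codec, so the assertions below compare multihashes rather
+    /// than whole CIDs.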
+    #[test]
+    fn get_index() {
+        let db = init_database();
+        let cids = cids_vec();
+        let cid = cids[0];
+        let deal_info = dummy_deal_info();
+        let records = vec![
+            IndexRecord {
+                cid: cids[1],
+                offset_size: OffsetSize { offset: 0, size: 0 },
+            },
+            IndexRecord {
+                cid: cids[2],
+                offset_size: OffsetSize {
+                    offset: 0,
+                    size: 100,
+                },
+            },
+        ];
+
+        // Insert the deal
+        assert!(db.add_deal_for_piece(cid, deal_info.clone()).is_ok());
+        assert_eq!(db.get_index(cid).unwrap(), vec![]);
+        // Add the index records
+        assert!(db.add_index(cid, records.clone(), false).is_ok());
+
+        // Get the index back
+        let mut received = db.get_index(cid).unwrap();
+        // This sort is just to ensure the order matches our records vec; it's not representative of a pattern
+        received.sort_by(|l, h| l.offset_size.size.cmp(&h.offset_size.size));
+
+        for (new, old) in received.iter().zip(records.iter()) {
+            // We need to consider that the CIDs get converted to v1 on the way back
+            assert_eq!(new.cid.hash(), old.cid.hash());
+            assert_eq!(new.cid.version(), cid::Version::V1);
+            // Honestly, I don't know if the records are always sent in as RAW,
+            // but they are forcefully made RAW on the way out, so we need to
+            // check that the CIDs we inserted with the DAG-PB codec
+            // were converted to the RAW codec
+            assert_eq!(new.cid.codec(), RAW_CODEC);
+            assert_eq!(new.offset_size, old.offset_size);
+        }
+
+        // Check the multihash -> cid mapping, i.e. the add_index side effect
+        for record in &records {
+            let value = db.get_multihash_to_piece_cids(record.cid.hash());
+            assert_eq!(value.unwrap(), vec![cid]);
+        }
+
+        // Ensure the multihash -> offset entries were also added
+        assert_eq!(
+            db.database.prefix_iterator("/0/").collect::<Vec<_>>().len(),
+            2
+        );
+    }
+
+    #[test]
+    fn remove_indexes() {
+        let db = init_database();
+        let cids = cids_vec();
+        let cid = cids[0];
+        let deal_info = dummy_deal_info();
+        let records = vec![
+            IndexRecord {
+                cid: cids[1],
+                offset_size: OffsetSize { offset: 0, size: 0 },
+            },
+            IndexRecord {
+                cid: cids[2],
+                offset_size: OffsetSize {
+                    offset: 0,
+                    size: 100,
+                },
+            },
+        ];
+        assert!(matches!(
+            db.remove_indexes(cid),
+            Err(LidError::PieceNotFound(_))
+        ));
+        assert!(db.add_deal_for_piece(cid, deal_info.clone()).is_ok());
+        assert!(db.add_index(cid, records.clone(), false).is_ok());
+        // Ensure it's not empty
+        let indexes: Vec<_> = db
+            .database
+            .iterator_cf(
+                db.cf_handle(MULTIHASH_TO_PIECE_CID_CF),
+                rocksdb::IteratorMode::Start,
+            )
+            .flatten()
+            .collect();
+        assert_eq!(indexes.len(), 2);
+
+        assert_eq!(
+            db.database.prefix_iterator("/0/").collect::<Vec<_>>().len(),
+            2
+        );
+        // Ensure it's empty after removal
+        assert!(db.remove_indexes(cid).is_ok());
+        let indexes: Vec<_> = db
+            .database
+            .iterator_cf(
+                db.cf_handle(MULTIHASH_TO_PIECE_CID_CF),
+                rocksdb::IteratorMode::Start,
+            )
+            .flatten()
+            .collect();
+        assert!(indexes.is_empty());
+
+        // Ensure mh -> offset also gets removed when indexes are removed
+        assert!(db
+            .database
+            .prefix_iterator("/0/")
+            .collect::<Vec<_>>()
+            .is_empty());
+    }
+
+    #[test]
+    fn get_offset_size() {
+        let db = init_database();
+        let cids = cids_vec();
+        let cid = cids[0];
+        let deal_info = dummy_deal_info();
+        let records = vec![
+            IndexRecord {
+                cid: cids[1],
+                offset_size: OffsetSize { offset: 0, size: 0 },
+            },
+            IndexRecord {
+                cid: cids[2],
+                offset_size: OffsetSize {
+                    offset: 0,
+                    size: 100,
+                },
+            },
+        ];
+
+        // No piece
+        assert!(matches!(
+            db.get_offset_size(cid, *cids[1].hash()),
+            Err(LidError::PieceNotFound(_))
+        ));
+
+        // Insert the deal
+        assert!(db.add_deal_for_piece(cid, deal_info.clone()).is_ok());
+        assert_eq!(db.get_index(cid).unwrap(), vec![]);
+        // Piece exists but no index
+        assert!(matches!(
+            db.get_offset_size(cid, *cids[1].hash()),
+            Err(LidError::MultihashNotFound(_))
+        ));
+
+        // Add the index records
+        assert!(db.add_index(cid, records.clone(), false).is_ok());
+
+        let offset_size = db.get_offset_size(cid, *cids[1].hash()).unwrap();
+        assert_eq!(records[0].offset_size, offset_size);
+
+        let offset_size = db.get_offset_size(cid, *cids[2].hash()).unwrap();
+        assert_eq!(records[1].offset_size, offset_size);
+    }
+
+    #[test]
+    fn pieces_containing_multihash() {
+        let db = init_database();
+        let cids = cids_vec();
+        let deal_info = dummy_deal_info();
+        let records = vec![IndexRecord {
+            cid: cids[2],
+            offset_size: OffsetSize { offset: 0, size: 0 },
+        }];
+
+        let pieces = db.pieces_containing_multihash(cids[2].hash().to_owned());
+        assert!(matches!(pieces, Err(LidError::MultihashNotFound(_))));
+
+        assert!(db.add_deal_for_piece(cids[0], deal_info.clone()).is_ok());
+        assert!(db.add_deal_for_piece(cids[1], deal_info.clone()).is_ok());
+        assert!(db.add_index(cids[0], records.clone(), false).is_ok());
+        assert!(db.add_index(cids[1], records, false).is_ok());
+
+        let pieces = db
+            .pieces_containing_multihash(cids[2].hash().to_owned())
+            .unwrap();
+        assert_eq!(pieces, vec![cids[0], cids[1]]);
+    }
+
+    #[test]
+    fn flag_piece() {
+        let db = init_database();
+        let cid = cids_vec()[0];
+        // The address of the top storage provider at the time of writing (12/6/24)
+        let storage_provider_address =
+            StorageProviderAddress("f24yeyklfsjvav6onmm4k2lbkfi6chnke5ivt5wbq".to_string());
+        let key = key_flag_piece(&cid, &storage_provider_address);
+
+        assert!(db
+            .get_value_at_key::<_, Option<FlaggedPiece>>(&key, PIECE_CID_TO_FLAGGED_CF)
+            .unwrap()
+            .is_none());
+
+        assert!(db
+            .flag_piece(cid, true, storage_provider_address.clone())
+            .is_ok());
+
+        let flagged_piece: FlaggedPiece = db
+            .get_value_at_key(key, PIECE_CID_TO_FLAGGED_CF)
+            .unwrap()
+            .unwrap();
+
+        assert_eq!(flagged_piece.piece_cid, cid);
+        assert_eq!(
+            flagged_piece.storage_provider_address,
+            storage_provider_address
+        );
+        assert!(flagged_piece.has_unsealed_copy);
+    }
+
+    #[test]
+    fn unflag_piece() {
+        let db = init_database();
+        let cid = cids_vec()[0];
+        // The address of the top storage provider at the time of writing (12/6/24)
+        let storage_provider_address =
+            StorageProviderAddress("f24yeyklfsjvav6onmm4k2lbkfi6chnke5ivt5wbq".to_string());
+        let key = key_flag_piece(&cid, &storage_provider_address);
+
+        assert!(matches!(
+            db.unflag_piece(cid, storage_provider_address.clone()),
+            Ok(())
+        ));
+
+        assert!(db
+            .get_value_at_key::<_, Option<FlaggedPiece>>(&key, PIECE_CID_TO_FLAGGED_CF)
+            .unwrap()
+            .is_none());
+
+        assert!(db
+            .flag_piece(cid, true, storage_provider_address.clone())
+            .is_ok());
+
+        let flagged_piece: FlaggedPiece = db
+            .get_value_at_key(&key, PIECE_CID_TO_FLAGGED_CF)
+            .unwrap()
+            .unwrap();
+
+        assert_eq!(flagged_piece.piece_cid, cid);
+        assert_eq!(
+            flagged_piece.storage_provider_address,
+            storage_provider_address
+        );
+        assert!(flagged_piece.has_unsealed_copy);
+
+        assert!(db
+            .unflag_piece(cid, storage_provider_address.clone())
+            .is_ok());
+
+        assert!(db
+            .get_value_at_key::<_, Option<FlaggedPiece>>(&key, PIECE_CID_TO_FLAGGED_CF)
+            .unwrap()
+            .is_none());
+    }
+
+    #[test]
+    fn flagged_pieces_count() {
+        let db = init_database();
+        let cid = cids_vec()[0];
+        // The address of the top storage provider at the time of writing (12/6/24)
+        let storage_provider_address =
+            StorageProviderAddress("f24yeyklfsjvav6onmm4k2lbkfi6chnke5ivt5wbq".to_string());
+
+        assert!(db
+            .flag_piece(cid, true, storage_provider_address.clone())
+            .is_ok());
+
+        // All pieces
+        assert_eq!(db.flagged_pieces_count(None).unwrap(), 1);
+        // Should ignore the empty address
+        assert_eq!(
+            db.flagged_pieces_count(Some(FlaggedPiecesListFilter {
+                storage_provider_address: StorageProviderAddress("".to_string()),
+                has_unsealed_copy: true
+            }))
+            .unwrap(),
+            1
+        );
+        // Wrong address
+        assert_eq!(
+            db.flagged_pieces_count(Some(FlaggedPiecesListFilter {
+                storage_provider_address: StorageProviderAddress("a".to_string()),
+                has_unsealed_copy: true
+            }))
+            .unwrap(),
+            0
+        );
+        // Right address but the flagged piece has `has_unsealed_copy: true`
+        assert_eq!(
+            db.flagged_pieces_count(Some(FlaggedPiecesListFilter {
+                storage_provider_address: storage_provider_address.clone(),
+                has_unsealed_copy: false
+            }))
+            .unwrap(),
+            0
+        );
+        // All filters match
+        assert_eq!(
+            db.flagged_pieces_count(Some(FlaggedPiecesListFilter {
+                storage_provider_address,
+                has_unsealed_copy: true
+            }))
+            .unwrap(),
+            1
+        )
+    }
+
+    #[test]
+    fn flagged_pieces_list() {
+        let db = init_database();
+        let cids = cids_vec();
+        // The address of the top storage provider at the time of writing (12/6/24)
+        let storage_provider_address =
+            StorageProviderAddress("f24yeyklfsjvav6onmm4k2lbkfi6chnke5ivt5wbq".to_string());
+
+        assert!(db
+            .flag_piece(cids[0], true, storage_provider_address.clone())
+            .is_ok());
+
+        // To test the cursor functionality
+        let after_first = chrono::Utc::now();
+
+        assert!(db
+            .flag_piece(cids[1], false, storage_provider_address.clone())
+            .is_ok());
+        assert!(db
+            .flag_piece(cids[2], true, storage_provider_address.clone())
+            .is_ok());
+
+        assert_eq!(
+            db.flagged_pieces_list(None, chrono::DateTime::UNIX_EPOCH, 0, 1000)
+                .unwrap()
+                .into_iter()
+                .map(|fp| fp.piece_cid)
+                .collect::<Vec<_>>(),
+            cids
+        );
+
+        assert_eq!(
+            db.flagged_pieces_list(None, chrono::DateTime::UNIX_EPOCH, 0, 1)
+                .unwrap()
+                .into_iter()
+                .map(|fp| fp.piece_cid)
+                .collect::<Vec<_>>(),
+            cids[..1]
+        );
+
+        assert_eq!(
+            db.flagged_pieces_list(None, chrono::DateTime::UNIX_EPOCH, 1, 1)
+                .unwrap()
+                .into_iter()
+                .map(|fp| fp.piece_cid)
+                .collect::<Vec<_>>(),
+            cids[1..2]
+        );
+
+        assert_eq!(
+            db.flagged_pieces_list(None, after_first, 0, 1000)
+                .unwrap()
+                .into_iter()
+                .map(|fp| fp.piece_cid)
+                .collect::<Vec<_>>(),
+            cids[1..]
+        );
+
+        assert_eq!(
+            db.flagged_pieces_list(
+                Some(FlaggedPiecesListFilter {
+                    storage_provider_address: StorageProviderAddress("".to_string()),
+                    has_unsealed_copy: false
+                }),
+                chrono::DateTime::UNIX_EPOCH,
+                1,
+                1000
+            )
+            .unwrap()
+            .into_iter()
+            .map(|fp| fp.piece_cid)
+            .collect::<Vec<_>>(),
+            vec![]
+        );
+        assert_eq!(
+            db.flagged_pieces_list(
+                Some(FlaggedPiecesListFilter {
+                    storage_provider_address: StorageProviderAddress("a".to_string()),
+                    has_unsealed_copy: false
+                }),
+                chrono::DateTime::UNIX_EPOCH,
+                0,
+                1000
+            )
+            .unwrap()
+            .into_iter()
+            .map(|fp| fp.piece_cid)
+            .collect::<Vec<_>>(),
+            vec![]
+        );
+
+        assert_eq!(
+            db.flagged_pieces_list(
+                Some(FlaggedPiecesListFilter {
+                    storage_provider_address,
+                    has_unsealed_copy: false
+                }),
+                chrono::DateTime::UNIX_EPOCH,
+                0,
+                1000
+            )
+            .unwrap()
+            .into_iter()
+            .map(|fp| fp.piece_cid)
+            .collect::<Vec<_>>(),
+            vec![cids[1]]
+        );
+    }
+
+    #[test]
+    fn next_pieces_to_check() {
+        let mut db = init_database();
+        let mut cids = vec![];
+        let storage_provider_address =
+            StorageProviderAddress("f24yeyklfsjvav6onmm4k2lbkfi6chnke5ivt5wbq".to_string());
+
+        // 1024 + 512 (a batch and a half)
+        for i in 0..1536u64 {
+            let digest = Sha256::digest(i.to_le_bytes());
+            let mh = Multihash::wrap(0x12, digest.as_ref()).unwrap();
+            let cid = Cid::new_v1(RAW_CODEC, mh);
+            cids.push(cid);
+            let mut piece_info = PieceInfo::default();
+            piece_info.deals.push(DealInfo {
+                deal_uuid: uuid::Uuid::new_v4(),
+                is_legacy: false,
+                chain_deal_id: DealId(i),
+                storage_provider_address: storage_provider_address.clone(),
+                sector_number: 0.into(),
+                piece_offset: 0,
+                piece_length: 0,
+                car_length: 0,
+                is_direct_deal: false,
+            });
+            db.set_piece_cid_to_metadata(cid, &piece_info).unwrap();
+        }
+        // The DB does not ensure order, so we "create" one.
+        cids.sort();
+
+        let first_batch = {
+            let mut v = db
+                .next_pieces_to_check(storage_provider_address.clone())
+                .unwrap();
+            v.sort();
+            v
+        };
+        assert_eq!(first_batch, cids[0..1024]);
+
+        let second_batch = {
+            let mut v = db
+                .next_pieces_to_check(storage_provider_address.clone())
+                .unwrap();
+            v.sort();
+            v
+        };
+        assert_eq!(second_batch, cids[1024..]);
+    }
+}
diff --git a/storage/polka-index/src/local_index_directory/rdb_ext.rs b/storage/polka-index/src/local_index_directory/rdb_ext.rs
new file mode 100644
index 000000000..eef43d0cf
--- /dev/null
+++ b/storage/polka-index/src/local_index_directory/rdb_ext.rs
@@ -0,0 +1,38 @@
+use rocksdb::{AsColumnFamilyRef, WriteBatchWithTransaction};
+use serde::Serialize;
+
+use super::LidError;
+
+pub(crate) trait WriteBatchWithTransactionExt {
+    /// Insert a CBOR-serialized value with the provided key.
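+    ///
+    /// A minimal usage sketch (illustrative only; `db`, `cf` and `value` are
+    /// assumed to exist and `value` must implement [`serde::Serialize`]):
+    ///
+    /// ```ignore
+    /// let mut batch = WriteBatchWithTransaction::<false>::default();
+    /// batch.put_cf_cbor(&cf, b"key", &value)?;
+    /// db.write(batch)?;
+    /// ```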
+    fn put_cf_cbor<K, V>(
+        &mut self,
+        cf: &impl AsColumnFamilyRef,
+        key: K,
+        value: V,
+    ) -> Result<(), LidError>
+    where
+        K: AsRef<[u8]>,
+        V: Serialize;
+}
+
+impl<const TRANSACTION: bool> WriteBatchWithTransactionExt
+    for WriteBatchWithTransaction<TRANSACTION>
+{
+    fn put_cf_cbor<K, V>(
+        &mut self,
+        cf: &impl AsColumnFamilyRef,
+        key: K,
+        value: V,
+    ) -> Result<(), LidError>
+    where
+        K: AsRef<[u8]>,
+        V: Serialize,
+    {
+        let mut serialized = vec![];
+        if let Err(err) = ciborium::into_writer(&value, &mut serialized) {
+            return Err(LidError::Serialization(err.to_string()));
+        }
+        Ok(self.put_cf(cf, key, serialized))
+    }
+}
diff --git a/storage/polka-index/src/main.rs b/storage/polka-index/src/main.rs
deleted file mode 100644
index e7a11a969..000000000
--- a/storage/polka-index/src/main.rs
+++ /dev/null
@@ -1,3 +0,0 @@
-fn main() {
-    println!("Hello, world!");
-}
diff --git a/storage/polka-index/src/piecestore/README.md b/storage/polka-index/src/piecestore/README.md
deleted file mode 100644
index 64a7cb55d..000000000
--- a/storage/polka-index/src/piecestore/README.md
+++ /dev/null
@@ -1,4 +0,0 @@
-# piecestore
-
-The piecestore module is a simple encapsulation of two data stores, one for `PieceInfo` and
-another for `CidInfo`.
diff --git a/storage/polka-index/src/piecestore/mod.rs b/storage/polka-index/src/piecestore/mod.rs
deleted file mode 100644
index b62c66292..000000000
--- a/storage/polka-index/src/piecestore/mod.rs
+++ /dev/null
@@ -1,69 +0,0 @@
-use std::collections::HashMap;
-
-use cid::Cid;
-use rocksdb::RocksDBError;
-use thiserror::Error;
-
-use self::types::{BlockLocation, CidInfo, DealInfo, PieceInfo};
-
-pub mod rocksdb;
-pub mod types;
-
-pub trait PieceStore {
-    /// Implementation-specific configuration.
-    type Config;
-
-    /// Initialize a new store.
-    fn new(config: Self::Config) -> Result<Self, PieceStoreError>
-    where
-        Self: Sized;
-
-    /// Store [`DealInfo`] in the PieceStore with key piece [`Cid`].
-    fn add_deal_for_piece(
-        &self,
-        piece_cid: &Cid,
-        deal_info: DealInfo,
-    ) -> Result<(), PieceStoreError>;
-
-    /// Store the map of [`BlockLocation`] in the [`PieceStore`]'s [`CidInfo`] store, with
-    /// key piece [`Cid`].
-    ///
-    /// Note: If a piece block location is already present in the [`CidInfo`], it
-    /// will be ignored.
-    fn add_piece_block_locations(
-        &self,
-        piece_cid: &Cid,
-        block_locations: &HashMap<Cid, BlockLocation>,
-    ) -> Result<(), PieceStoreError>;
-
-    /// List all piece [`Cid`]s stored in the [`PieceStore`].
-    fn list_piece_info_keys(&self) -> Result<Vec<Cid>, PieceStoreError>;
-
-    /// List all [`CidInfo`] keys stored in the [`PieceStore`].
-    fn list_cid_info_keys(&self) -> Result<Vec<Cid>, PieceStoreError>;
-
-    /// Retrieve the [`PieceInfo`] for a given piece [`Cid`].
-    fn get_piece_info(&self, cid: &Cid) -> Result<Option<PieceInfo>, PieceStoreError>;
-
-    /// Retrieve the [`CidInfo`] associated with piece [`Cid`].
-    fn get_cid_info(&self, cid: &Cid) -> Result<Option<CidInfo>, PieceStoreError>;
-}
-
-/// Error that can occur when interacting with the [`PieceStore`].
-#[derive(Debug, Error)]
-pub enum PieceStoreError {
-    #[error("Initialization error: {0}")]
-    Initialization(String),
-
-    #[error("Deal already exists")]
-    DealExists,
-
-    #[error("Serialization error: {0}")]
-    Serialization(String),
-
-    #[error("Deserialization error: {0}")]
-    Deserialization(String),
-
-    #[error(transparent)]
-    StoreError(#[from] RocksDBError),
-}
diff --git a/storage/polka-index/src/piecestore/rocksdb.rs b/storage/polka-index/src/piecestore/rocksdb.rs
deleted file mode 100644
index 3371663e3..000000000
--- a/storage/polka-index/src/piecestore/rocksdb.rs
+++ /dev/null
@@ -1,410 +0,0 @@
-use std::{collections::HashMap, path::PathBuf};
-
-use cid::Cid;
-use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, IteratorMode, Options, DB as RocksDB};
-use serde::{de::DeserializeOwned, Serialize};
-
-use super::{
-    types::{BlockLocation, CidInfo, DealInfo},
-    PieceStore, PieceStoreError,
-};
-use crate::piecestore::types::{PieceBlockLocation, PieceInfo};
-
-/// Error type for RocksDB operations.
-pub(crate) type RocksDBError = rocksdb::Error;
-
-/// Column family name used to store pieces.
-const PIECES_CF: &str = "pieces";
-
-/// Column family name used to store CID infos.
-const CID_INFOS_CF: &str = "cid_infos";
-
-pub struct RocksDBStateStoreConfig {
-    pub path: PathBuf,
-}
-
-/// A [`super::PieceStore`] implementation backed by RocksDB.
-pub struct RocksDBPieceStore {
-    database: RocksDB,
-}
-
-impl RocksDBPieceStore {
-    /// Get the column family handle for the given column family name. Panics if
-    /// the column family is not present. The column families needed and used
-    /// are created at initialization. They will always be present.
-    #[track_caller]
-    fn cf_handle(&self, cf_name: &str) -> &ColumnFamily {
-        self.database
-            .cf_handle(cf_name)
-            .expect("column family should be present")
-    }
-
-    fn list_cids_in_cf(&self, cf_name: &str) -> Result<Vec<Cid>, PieceStoreError> {
-        let iterator = self
-            .database
-            .iterator_cf(self.cf_handle(cf_name), IteratorMode::Start);
-
-        iterator
-            .map(|line| {
-                let (key, _) = line?;
-
-                let parsed_cid = Cid::try_from(key.as_ref()).map_err(|err| {
-                    // We know that all stored CIDs are valid, so this
-                    // should only happen if the database is corrupted.
-                    PieceStoreError::Deserialization(format!("invalid CID: {}", err))
-                })?;
-
-                Ok(parsed_cid)
-            })
-            .collect()
-    }
-
-    /// Get the value at the specified key in the specified column family.
-    fn get_value_at_key<Key, Value>(
-        &self,
-        key: Key,
-        cf_name: &str,
-    ) -> Result<Option<Value>, PieceStoreError>
-    where
-        Key: AsRef<[u8]>,
-        Value: DeserializeOwned,
-    {
-        let Some(slice) = self.database.get_pinned_cf(self.cf_handle(cf_name), key)? else {
-            return Ok(None);
-        };
-
-        match ciborium::from_reader(slice.as_ref()) {
-            Ok(value) => Ok(Some(value)),
-            // the ciborium error is bubbled up as a string because it is generic
-            // and we didn't want to add a generic type to the PieceStoreError
-            Err(err) => Err(PieceStoreError::Deserialization(err.to_string())),
-        }
-    }
-
-    /// Put a value at the specified key in the specified column family.
-    fn put_value_at_key<Key, Value>(
-        &self,
-        key: Key,
-        value: &Value,
-        cf_name: &str,
-    ) -> Result<(), PieceStoreError>
-    where
-        Key: AsRef<[u8]>,
-        Value: Serialize,
-    {
-        let mut serialized = Vec::new();
-        if let Err(err) = ciborium::into_writer(value, &mut serialized) {
-            // the ciborium error is bubbled up as a string because it is generic
-            // and we didn't want to add a generic type to the PieceStoreError
-            return Err(PieceStoreError::Serialization(err.to_string()));
-        }
-
-        self.database
-            .put_cf(self.cf_handle(cf_name), key, serialized)?;
-
-        Ok(())
-    }
-}
-
-impl PieceStore for RocksDBPieceStore {
-    type Config = RocksDBStateStoreConfig;
-
-    fn new(config: Self::Config) -> Result<Self, PieceStoreError>
-    where
-        Self: Sized,
-    {
-        let pieces_column = ColumnFamilyDescriptor::new(PIECES_CF, Options::default());
-        let cid_infos_column = ColumnFamilyDescriptor::new(CID_INFOS_CF, Options::default());
-
-        let mut opts = Options::default();
-        // Creates a new database if it doesn't exist
-        opts.create_if_missing(true);
-        // Create missing column families
-        opts.create_missing_column_families(true);
-
-        let database = RocksDB::open_cf_descriptors(
-            &opts,
-            config.path,
-            vec![pieces_column, cid_infos_column],
-        )?;
-
-        Ok(Self { database })
-    }
-
-    fn add_deal_for_piece(
-        &self,
-        piece_cid: &Cid,
-        deal_info: DealInfo,
-    ) -> Result<(), PieceStoreError> {
-        // Check if the piece exists
-        let mut piece_info = self
-            .get_value_at_key(piece_cid.to_bytes(), PIECES_CF)?
-            .unwrap_or_else(|| PieceInfo {
-                piece_cid: *piece_cid,
-                deals: Vec::new(),
-            });
-
-        // Check if the deal was already added for this piece
-        if piece_info.deals.iter().any(|d| *d == deal_info) {
-            return Err(PieceStoreError::DealExists);
-        }
-
-        // Save the new deal
-        piece_info.deals.push(deal_info);
-        self.put_value_at_key(piece_cid.to_bytes(), &piece_info, PIECES_CF)
-    }
-
-    fn add_piece_block_locations(
-        &self,
-        piece_cid: &Cid,
-        block_locations: &HashMap<Cid, BlockLocation>,
-    ) -> Result<(), PieceStoreError> {
-        for (cid, block_location) in block_locations {
-            let mut info = self
-                .get_value_at_key(cid.to_bytes(), CID_INFOS_CF)?
-                .unwrap_or_else(|| CidInfo {
-                    cid: *cid,
-                    piece_block_location: Vec::new(),
-                });
-
-            if info
-                .piece_block_location
-                .iter()
-                .any(|pbl| pbl.piece_cid == *piece_cid && pbl.location == *block_location)
-            {
-                continue;
-            }
-
-            // Append the new block location
-            info.piece_block_location.push(PieceBlockLocation {
-                piece_cid: *piece_cid,
-                location: *block_location,
-            });
-
-            // Save the updated CidInfo
-            self.put_value_at_key(cid.to_bytes(), &info, CID_INFOS_CF)?;
-        }
-
-        Ok(())
-    }
-
-    fn list_piece_info_keys(&self) -> Result<Vec<Cid>, PieceStoreError> {
-        self.list_cids_in_cf(PIECES_CF)
-    }
-
-    fn list_cid_info_keys(&self) -> Result<Vec<Cid>, PieceStoreError> {
-        self.list_cids_in_cf(CID_INFOS_CF)
-    }
-
-    fn get_piece_info(&self, cid: &Cid) -> Result<Option<PieceInfo>, PieceStoreError> {
-        self.get_value_at_key(cid.to_bytes(), PIECES_CF)
-    }
-
-    fn get_cid_info(&self, cid: &Cid) -> Result<Option<CidInfo>, PieceStoreError> {
-        self.get_value_at_key(cid.to_bytes(), CID_INFOS_CF)
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use std::{collections::HashMap, str::FromStr};
-
-    use cid::Cid;
-    use tempfile::tempdir;
-
-    use super::{RocksDBPieceStore, RocksDBStateStoreConfig};
-    use crate::piecestore::{
-        types::{BlockLocation, DealInfo, PieceBlockLocation},
-        PieceStore, PieceStoreError,
-    };
-
-    fn init_database() -> RocksDBPieceStore {
-        let tmp_dir = tempdir().unwrap();
-        let config = RocksDBStateStoreConfig {
-            path: tmp_dir.path().join("rocksdb"),
-        };
-
-        RocksDBPieceStore::new(config).unwrap()
-    }
-
-    fn cids() -> (Cid, Cid, Cid) {
-        (
-            Cid::from_str("QmawceGscqN4o8Y8Fv26UUmB454kn2bnkXV5tEQYc4jBd6").unwrap(),
-            Cid::from_str("QmbvrHYWXAU1BuxMPNRtfeF4DS2oPmo5hat7ocqAkNPr74").unwrap(),
-            Cid::from_str("QmfRL5b6fPZ851F6E2ZUWX1kC4opXzq9QDhamvU4tJGuyR").unwrap(),
-        )
-    }
-
-    fn rand_deal() -> DealInfo {
-        DealInfo {
-            deal_id: 1,
-            sector_id: 1,
-            offset: 0,
-            length: 100,
-        }
-    }
-
-    fn block_location() -> BlockLocation {
-        BlockLocation {
-            rel_offset: 0,
-            block_size: 100,
-        }
-    }
-
-    #[test]
-    fn test_piece_info_can_add_deals() {
-        let store = init_database();
-        let (piece_cid, piece_cid2, _) = cids();
-        let deal_info = rand_deal();
-
-        // add deal for piece
-        store.add_deal_for_piece(&piece_cid, deal_info).unwrap();
-
-        // get piece info
-        let info = store.get_piece_info(&piece_cid).unwrap().unwrap();
-        assert_eq!(info.deals, vec![deal_info]);
-
-        // verify that getting a piece with a non-existent CID returns None
-        let info = store.get_piece_info(&piece_cid2).unwrap();
-        assert!(info.is_none(), "expected None, got {:?}", info);
-    }
-
-    #[test]
-    fn test_piece_adding_same_deal_twice_returns_error() {
-        let store = init_database();
-        let (piece_cid, _, _) = cids();
-        let deal_info = rand_deal();
-
-        // add deal for piece
-        store.add_deal_for_piece(&piece_cid, deal_info).unwrap();
-
-        // adding the same deal again returns an error
-        let result = store.add_deal_for_piece(&piece_cid, deal_info);
-        assert!(
-            matches!(result, Err(PieceStoreError::DealExists)),
-            "expected error, got {:?}",
-            result
-        );
-    }
-
-    #[test]
-    fn test_cid_info_can_add_piece_block_locations() {
-        let store = init_database();
-        let (piece_cid, _, _) = cids();
-        let block_locations = [block_location(); 4];
-        let test_cids = [
-            Cid::from_str("QmW9pMY7fvbxVA2CaihgxJRzmSv15Re2TABte4HoZdfypo").unwrap(),
-            Cid::from_str("QmZbaU7GGuu9F7saPgVmPSK55of8QkzEwjPrj7xxWogxiY").unwrap(),
-            Cid::from_str("QmQSQYNn2K6xTDLhfNcoTjBExz5Q5gpHHBTqZZKdxsPRB9").unwrap(),
-        ];
-
-        let block_locations = test_cids
-            .iter()
-            .zip(block_locations.iter())
-            .map(|(cid, block_location)| (*cid, *block_location))
-            .collect::<HashMap<_, _>>();
-
-        // add piece block locations
-        store
-            .add_piece_block_locations(&piece_cid, &block_locations)
-            .unwrap();
-
-        // get cid info
-        let info = store.get_cid_info(&test_cids[0]).unwrap().unwrap();
-        assert!(
-            info.piece_block_location.contains(&PieceBlockLocation {
-                piece_cid,
-                location: block_locations[&test_cids[0]]
-            }),
-            "block location not found in cid info"
-        );
-
-        let info = store.get_cid_info(&test_cids[1]).unwrap().unwrap();
-        assert!(
-            info.piece_block_location.contains(&PieceBlockLocation {
-                piece_cid,
-                location: block_locations[&test_cids[1]]
-            }),
-            "block location not found in cid info"
-        );
-
-        let info = store.get_cid_info(&test_cids[2]).unwrap().unwrap();
-        assert!(
-            info.piece_block_location.contains(&PieceBlockLocation {
-                piece_cid,
-                location: block_locations[&test_cids[2]]
-            }),
-            "block location not found in cid info"
-        );
-
-        // verify that getting a piece with a non-existent CID returns None
-        let info = store
-            .get_cid_info(&Cid::from_str("QmW9pMY7fvbxVA2CaihgxJRzmSv15Re2TABte4HoZdfypa").unwrap())
-            .unwrap();
-        assert!(info.is_none(), "expected None, got {:?}", info);
-    }
-
-    #[test]
-    fn test_cid_info_overlapping_adds() {
-        let store = init_database();
-        let (piece_cid, _, _) = cids();
-        let block_locations = [block_location(); 4];
-        let test_cids = [
-            Cid::from_str("QmW9pMY7fvbxVA2CaihgxJRzmSv15Re2TABte4HoZdfypo").unwrap(),
-            Cid::from_str("QmZbaU7GGuu9F7saPgVmPSK55of8QkzEwjPrj7xxWogxiY").unwrap(),
-            Cid::from_str("QmQSQYNn2K6xTDLhfNcoTjBExz5Q5gpHHBTqZZKdxsPRB9").unwrap(),
-        ];
-
-        // add piece block locations
-        let locations = [
-            (test_cids[0], block_locations[0]),
-            (test_cids[1], block_locations[2]),
-        ]
-        .into_iter()
-        .collect::<HashMap<_, _>>();
-
-        store
-            .add_piece_block_locations(&piece_cid, &locations)
-            .unwrap();
-
-        // add overlapping piece block locations
-        let locations = [
-            (test_cids[1], block_locations[1]),
-            (test_cids[2], block_locations[2]),
-        ]
-        .into_iter()
-        .collect::<HashMap<_, _>>();
-
-        store
-            .add_piece_block_locations(&piece_cid, &locations)
-            .unwrap();
-
-        // get cid info
-        let info = store.get_cid_info(&test_cids[0]).unwrap().unwrap();
-        assert_eq!(
-            info.piece_block_location,
-            vec![PieceBlockLocation {
-                piece_cid,
-                location: block_locations[0]
-            }]
-        );
-
-        let info = store.get_cid_info(&test_cids[1]).unwrap().unwrap();
-        assert_eq!(
-            info.piece_block_location,
-            vec![PieceBlockLocation {
-                piece_cid,
-                location: block_locations[1]
-            }]
-        );
-
-        let info = store.get_cid_info(&test_cids[2]).unwrap().unwrap();
-        assert_eq!(
-            info.piece_block_location,
-            vec![PieceBlockLocation {
-                piece_cid,
-                location: block_locations[2]
-            }]
-        );
-    }
-}
diff --git a/storage/polka-index/src/piecestore/types.rs b/storage/polka-index/src/piecestore/types.rs
deleted file mode 100644
index b89d7c307..000000000
--- a/storage/polka-index/src/piecestore/types.rs
+++ /dev/null
@@ -1,48 +0,0 @@
-use cid::Cid;
-use serde::{Deserialize, Serialize};
-
-/// Metadata about a piece that a provider may be storing, keyed by its [`Cid`], so
-/// that, given a [`Cid`] during retrieval, the miner can determine how to
-/// unseal it if needed.
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct PieceInfo {
-    pub piece_cid: Cid,
-    pub deals: Vec<DealInfo>,
-}
-
-/// Identifier for a retrieval deal (unique to a client)
-type DealId = u64;
-
-/// Numeric identifier for a sector. It is usually relative to a miner.
-type SectorNumber = u64;
-
-/// Information about a single deal for a given piece
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
-pub struct DealInfo {
-    pub deal_id: DealId,
-    pub sector_id: SectorNumber,
-    pub offset: u64,
-    pub length: u64,
-}
-
-/// Information about where a given block is relative to the overall piece
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
-pub struct BlockLocation {
-    pub rel_offset: u64,
-    pub block_size: u64,
-}
-
-/// Contains block information along with the [`Cid`] of the piece the block is
-/// inside of
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
-pub struct PieceBlockLocation {
-    pub piece_cid: Cid,
-    pub location: BlockLocation,
-}
-
-/// Information about where a given [`Cid`] will live inside a piece
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct CidInfo {
-    pub cid: Cid,
-    pub piece_block_location: Vec<PieceBlockLocation>,
-}