Skip to content

Commit

Permalink
CasObject v2 (#16)
Browse files (browse the repository at this point in the history)
CasObject v2 includes:
* Major changes to metadata / info kept with each Xorb
* Header -> Footer: the per-Xorb metadata header is now a footer (called CasObjectInfo)
* Chunks include an efficient 8-byte header and can be compressed
* More unit-tests
* Lots of documentation
---------

Co-authored-by: Hoyt Koepke <[email protected]>
Co-authored-by: Assaf Vayner <[email protected]>
Co-authored-by: Assaf Vayner <[email protected]>
  • Loading branch information
4 people authored Sep 20, 2024
1 parent 2020051 commit 037d267
Show file tree
Hide file tree
Showing 9 changed files with 904 additions and 291 deletions.
26 changes: 24 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

119 changes: 67 additions & 52 deletions cas_client/src/local_client.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use std::fs::{metadata, File};
use std::path::{Path, PathBuf};
use std::io::{BufReader, BufWriter, Write};
use crate::error::{CasClientError, Result};
use crate::interface::Client;
use cas::key::Key;
use cas_object::cas_object_format::CasObject;
use cas_object::CasObject;
use merkledb::prelude::*;
use merkledb::{Chunk, MerkleMemDB};
use tempfile::TempDir;
use merklehash::MerkleHash;
use crate::interface::Client;
use crate::error::{CasClientError, Result};
use std::fs::{metadata, File};
use std::io::{BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
use tempfile::TempDir;

use anyhow::anyhow;
use tracing::{debug, error, info};
use async_trait::async_trait;
use tracing::{debug, error, info};

#[derive(Debug)]
pub struct LocalClient {
Expand Down Expand Up @@ -130,7 +130,9 @@ impl LocalClient {

fn validate_root_hash(data: &[u8], chunk_boundaries: &[u64], hash: &MerkleHash) -> bool {
// at least 1 chunk, and last entry in chunk boundary must match the length
if chunk_boundaries.is_empty() || chunk_boundaries[chunk_boundaries.len() - 1] as usize != data.len() {
if chunk_boundaries.is_empty()
|| chunk_boundaries[chunk_boundaries.len() - 1] as usize != data.len()
{
return false;
}

Expand All @@ -150,7 +152,6 @@ impl LocalClient {
let ret = db.finalize(staging);
*ret.hash() == *hash
}

}

/// LocalClient is responsible for writing/reading Xorbs on local disk.
Expand All @@ -167,7 +168,7 @@ impl Client for LocalClient {
if chunk_boundaries.is_empty() || data.is_empty() {
return Err(CasClientError::InvalidArguments);
}

// last boundary must be end of data
if !chunk_boundaries.is_empty()
&& chunk_boundaries[chunk_boundaries.len() - 1] as usize != data.len()
Expand All @@ -189,7 +190,7 @@ impl Client for LocalClient {

let file_path = self.get_path_for_entry(prefix, hash);
info!("Writing XORB {prefix}/{hash:?} to local path {file_path:?}");

if let Ok(metadata) = metadata(&file_path) {
return if metadata.is_file() {
info!("{file_path:?} already exists; returning.");
Expand Down Expand Up @@ -217,31 +218,31 @@ impl Client for LocalClient {
))
})?;

let total_bytes_written;
{
let mut writer = BufWriter::new(&tempfile);
let (_, bytes_written) = CasObject::serialize(
&mut writer,
hash,
&data,
&chunk_boundaries.into_iter().map(|x| x as u32).collect()
)?;
// flush before persisting
writer.flush()?;
total_bytes_written = bytes_written;
}
tempfile.persist(&file_path)?;
// attempt to set to readonly
// its ok to fail.
if let Ok(metadata) = std::fs::metadata(&file_path) {
let mut permissions = metadata.permissions();
permissions.set_readonly(true);
let _ = std::fs::set_permissions(&file_path, permissions);
}
info!("{file_path:?} successfully written with {total_bytes_written:?} bytes.");
let total_bytes_written;
{
let mut writer = BufWriter::new(&tempfile);
let (_, bytes_written) = CasObject::serialize(
&mut writer,
hash,
&data,
&chunk_boundaries.into_iter().map(|x| x as u32).collect(),
)?;
// flush before persisting
writer.flush()?;
total_bytes_written = bytes_written;
}

tempfile.persist(&file_path)?;

// attempt to set to readonly
// its ok to fail.
if let Ok(metadata) = std::fs::metadata(&file_path) {
let mut permissions = metadata.permissions();
permissions.set_readonly(true);
let _ = std::fs::set_permissions(&file_path, permissions);
}

info!("{file_path:?} successfully written with {total_bytes_written:?} bytes.");

Ok(())
}
Expand Down Expand Up @@ -271,7 +272,6 @@ impl Client for LocalClient {
hash: &MerkleHash,
ranges: Vec<(u64, u64)>,
) -> Result<Vec<Vec<u8>>> {

// Handle the case where we aren't asked for any real data.
if ranges.len() == 1 && ranges[0].0 == ranges[0].1 {
return Ok(vec![Vec::<u8>::new()]);
Expand Down Expand Up @@ -302,7 +302,8 @@ impl Client for LocalClient {
Ok(file) => {
let mut reader = BufReader::new(file);
let cas = CasObject::deserialize(&mut reader)?;
Ok(cas.header.total_uncompressed_length as u64)
let length = cas.get_contents_length()?;
Ok(length as u64)
}
Err(_) => Err(CasClientError::XORBNotFound(*hash)),
}
Expand All @@ -323,11 +324,14 @@ mod tests {
let data = gen_random_bytes(2048);
let hash = compute_data_hash(&data[..]);
let chunk_boundaries = vec![data.len() as u64];

let data_again = data.clone();

// Act & Assert
assert!(client.put("key", &hash, data, chunk_boundaries).await.is_ok());
assert!(client
.put("key", &hash, data, chunk_boundaries)
.await
.is_ok());

let returned_data = client.get("key", &hash).await.unwrap();
assert_eq!(data_again, returned_data);
Expand Down Expand Up @@ -357,12 +361,15 @@ mod tests {
// Act & Assert
assert!(client.put("", &hash, data, chunk_boundaries).await.is_ok());

let ranges: Vec<(u64, u64)> = vec![(0, 100),(100, 1500)];
let ranges: Vec<(u64, u64)> = vec![(0, 100), (100, 1500)];
let ranges_again = ranges.clone();
let returned_ranges = client.get_object_range("", &hash, ranges).await.unwrap();

for idx in 0..returned_ranges.len() {
assert_eq!(data_again[ranges_again[idx].0 as usize .. ranges_again[idx].1 as usize], returned_ranges[idx]);
assert_eq!(
data_again[ranges_again[idx].0 as usize..ranges_again[idx].1 as usize],
returned_ranges[idx]
);
}
}

Expand All @@ -376,7 +383,7 @@ mod tests {
// Act
client.put("", &hash, data, chunk_boundaries).await.unwrap();
let len = client.get_length("", &hash).await.unwrap();

// Assert
assert_eq!(len as usize, gen_length);
}
Expand All @@ -399,7 +406,10 @@ mod tests {

let hello_hash = merklehash::compute_data_hash(&hello[..]);
// write "hello world"
client.put("key", &hello_hash, hello.clone(), vec![hello.len() as u64]).await.unwrap();
client
.put("key", &hello_hash, hello.clone(), vec![hello.len() as u64])
.await
.unwrap();

// put the same value a second time. This should be ok.
client
Expand Down Expand Up @@ -505,7 +515,7 @@ mod tests {
client.get("key", &hello_hash).await.unwrap_err()
);
}

#[tokio::test]
async fn test_hashing() {
let client = LocalClient::default();
Expand All @@ -532,12 +542,15 @@ mod tests {
.unwrap();
}

fn gen_dummy_xorb(num_chunks: u32, uncompressed_chunk_size: u32, randomize_chunk_sizes: bool) -> (DataHash, Vec<u8>, Vec<u64>) {
fn gen_dummy_xorb(
num_chunks: u32,
uncompressed_chunk_size: u32,
randomize_chunk_sizes: bool,
) -> (DataHash, Vec<u8>, Vec<u64>) {
let mut contents = Vec::new();
let mut chunks: Vec<Chunk> = Vec::new();
let mut chunk_boundaries = Vec::with_capacity(num_chunks as usize);
for _idx in 0..num_chunks {

let chunk_size: u32 = if randomize_chunk_sizes {
let mut rng = rand::thread_rng();
rng.gen_range(1024..=uncompressed_chunk_size)
Expand All @@ -547,7 +560,10 @@ mod tests {

let bytes = gen_random_bytes(chunk_size);

chunks.push(Chunk { hash: merklehash::compute_data_hash(&bytes), length: bytes.len() });
chunks.push(Chunk {
hash: merklehash::compute_data_hash(&bytes),
length: bytes.len(),
});

contents.extend(bytes);
chunk_boundaries.push(contents.len() as u64);
Expand All @@ -566,7 +582,6 @@ mod tests {
let mut rng = rand::thread_rng();
let mut data = vec![0u8; uncompressed_chunk_size as usize];
rng.fill(&mut data[..]);
data
data
}

}
}
2 changes: 1 addition & 1 deletion cas_client/src/remote_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use reqwest::{StatusCode, Url};
use serde::{de::DeserializeOwned, Serialize};

use bytes::Bytes;
use cas_object::cas_object_format::CasObject;
use cas_object::CasObject;
use cas_types::CASReconstructionTerm;
use tracing::warn;

Expand Down
3 changes: 3 additions & 0 deletions cas_object/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ merklehash = { path = "../merklehash" }
tempfile = "3.12.0"
tracing = "0.1.40"
xet_error = { path = "../xet_error" }
cas_types = { path = "../cas_types" }
lz4_flex = "0.11.3"
bytes = "1.7.2"

[dev-dependencies]
rand = "0.8.5"
Expand Down
Loading

0 comments on commit 037d267

Please sign in to comment.