Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LZ4 compression usage to CasObject #17

Merged
merged 2 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 34 additions & 11 deletions cas_client/src/data_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@ use std::time::Duration;

use crate::cas_connection_pool::CasConnectionConfig;
use anyhow::{anyhow, Result};
use cas::common::CompressionScheme;
use cas::compression::{
multiple_accepted_encoding_header_value, CAS_ACCEPT_ENCODING_HEADER,
CAS_CONTENT_ENCODING_HEADER, CAS_INFLATED_SIZE_HEADER,
};
use cas_object::CompressionScheme;

use error_printer::ErrorPrinter;
use http_body_util::{BodyExt, Full};
use hyper::body::Bytes;
Expand All @@ -33,16 +30,26 @@ use xet_error::Error;

use merklehash::MerkleHash;

const CAS_CONTENT_ENCODING_HEADER: &str = "xet-cas-content-encoding";
const CAS_ACCEPT_ENCODING_HEADER: &str = "xet-cas-content-encoding";
const CAS_INFLATED_SIZE_HEADER: &str = "xet-cas-inflated-size";

const HTTP2_POOL_IDLE_TIMEOUT_SECS: u64 = 30;
const HTTP2_KEEPALIVE_MILLIS: u64 = 500;
const HTTP2_WINDOW_SIZE: u32 = 2147418112;
const NUM_RETRIES: usize = 5;
const BASE_RETRY_DELAY_MS: u64 = 3000;

// in the header value, we will consider
fn multiple_accepted_encoding_header_value(list: Vec<CompressionScheme>) -> String {
let as_strs: Vec<&str> = list.iter().map(Into::into).collect();
as_strs.join(";").to_string()
}

lazy_static! {
static ref ACCEPTED_ENCODINGS_HEADER_VALUE: HeaderValue = HeaderValue::from_str(
multiple_accepted_encoding_header_value(vec![
CompressionScheme::Lz4,
CompressionScheme::LZ4,
CompressionScheme::None
])
.as_str()
Expand Down Expand Up @@ -284,7 +291,7 @@ impl DataTransport {
let bytes = maybe_decode(bytes.as_slice(), encoding, uncompressed_size)?;
debug!(
"GET; encoding: ({}), uncompressed size: ({}), payload ({}) prefix: ({}), hash: ({})",
encoding.as_str_name(),
encoding,
uncompressed_size.unwrap_or_default(),
payload_size,
prefix,
Expand Down Expand Up @@ -344,7 +351,7 @@ impl DataTransport {
.to_vec();
let payload_size = bytes.len();
let bytes = maybe_decode(bytes.as_slice(), encoding, uncompressed_size)?;
debug!("GET RANGE; encoding: ({}), uncompressed size: ({}), payload ({}) prefix: ({}), hash: ({})", encoding.as_str_name(), uncompressed_size.unwrap_or_default(), payload_size, prefix, hash);
debug!("GET RANGE; encoding: ({}), uncompressed size: ({}), payload ({}) prefix: ({}), hash: ({})", encoding, uncompressed_size.unwrap_or_default(), payload_size, prefix, hash);
Ok(bytes.to_vec())
},
is_status_retriable_and_print,
Expand All @@ -367,7 +374,7 @@ impl DataTransport {
let data = maybe_encode(data, encoding)?;
debug!(
"PUT; encoding: ({}), uncompressed size: ({}), payload: ({}), prefix: ({}), hash: ({})",
encoding.as_str_name(),
encoding,
full_size,
data.len(),
prefix,
Expand Down Expand Up @@ -423,7 +430,7 @@ fn maybe_decode<'a, T: Into<&'a [u8]>>(
encoding: CompressionScheme,
uncompressed_size: Option<i32>,
) -> Result<Vec<u8>> {
if let CompressionScheme::Lz4 = encoding {
if let CompressionScheme::LZ4 = encoding {
if uncompressed_size.is_none() {
return Err(anyhow!(
"Missing uncompressed size when attempting to decompress LZ4"
Expand All @@ -447,7 +454,7 @@ fn get_encoding_info<T>(response: &Response<T>) -> Option<(CompressionScheme, Op
}

fn maybe_encode<'a, T: Into<&'a [u8]>>(data: T, encoding: CompressionScheme) -> Result<Vec<u8>> {
if let CompressionScheme::Lz4 = encoding {
if let CompressionScheme::LZ4 = encoding {
lz4::block::compress(data.into(), Some(CompressionMode::DEFAULT), false)
.log_error("LZ4 compression error")
.map_err(|e| anyhow!(e))
Expand Down Expand Up @@ -580,4 +587,20 @@ mod tests {
assert_eq!(get_header_value(GIT_XET_VERSION_HEADER), git_xet_version);
assert_eq!(get_header_value(USER_ID_HEADER), user_id);
}

#[test]
fn test_multiple_accepted_encoding_header_value() {
let multi = vec![CompressionScheme::LZ4, CompressionScheme::None];
assert_eq!(
multiple_accepted_encoding_header_value(multi),
"lz4;none".to_string()
);

let singular = vec![CompressionScheme::LZ4];
assert_eq!(
multiple_accepted_encoding_header_value(singular),
"lz4".to_string()
);
}

}
1 change: 1 addition & 0 deletions cas_client/src/local_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ impl Client for LocalClient {
hash,
&data,
&chunk_boundaries.into_iter().map(|x| x as u32).collect(),
cas_object::CompressionScheme::None
)?;
// flush before persisting
writer.flush()?;
Expand Down
1 change: 1 addition & 0 deletions cas_client/src/remote_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ impl CASAPIClient {
&key.hash,
contents,
&chunk_boundaries.into_iter().map(|x| x as u32).collect(),
cas_object::CompressionScheme::LZ4
)?;

debug!("Upload: POST to {url:?} for {key:?}");
Expand Down
4 changes: 2 additions & 2 deletions cas_object/src/cas_chunk_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{

use crate::error::CasObjectError;
use anyhow::anyhow;
use cas_types::compression_scheme::CompressionScheme;
use crate::CompressionScheme;
use lz4_flex::frame::{FrameDecoder, FrameEncoder};

pub const CAS_CHUNK_HEADER_LENGTH: u8 = 8;
Expand Down Expand Up @@ -191,7 +191,7 @@ mod tests {
use std::io::Cursor;

use super::*;
use cas_types::compression_scheme::CompressionScheme;
use CompressionScheme;
use rand::Rng;

const COMP_LEN: u32 = 0x010203;
Expand Down
145 changes: 131 additions & 14 deletions cas_object/src/cas_object_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use std::{
};

use crate::{
cas_chunk_format::{deserialize_chunk, serialize_chunk},
error::CasObjectError,
cas_chunk_format::{deserialize_chunk, serialize_chunk}, error::CasObjectError, CompressionScheme
};
use anyhow::anyhow;

Expand Down Expand Up @@ -453,6 +452,7 @@ impl CasObject {
hash: &MerkleHash,
data: &[u8],
chunk_boundaries: &Vec<u32>,
compression_scheme: CompressionScheme,
) -> Result<(Self, usize), CasObjectError> {
let mut cas = CasObject::default();
cas.info.cashash.copy_from_slice(hash.as_slice());
Expand All @@ -474,11 +474,8 @@ impl CasObject {

// now serialize chunk directly to writer (since chunks come first!)
// TODO: add compression scheme to this call
let chunk_written_bytes = serialize_chunk(
&chunk_raw_bytes,
writer,
cas_types::compression_scheme::CompressionScheme::None,
)?;
let chunk_written_bytes =
serialize_chunk(&chunk_raw_bytes, writer, compression_scheme)?;
total_written_bytes += chunk_written_bytes;

let chunk_meta = CasChunkInfo {
Expand Down Expand Up @@ -554,7 +551,7 @@ mod tests {
#[test]
fn test_chunk_boundaries_chunk_size_info() {
// Arrange
let (c, _cas_data, _raw_data) = build_cas_object(3, 100, false);
let (c, _cas_data, _raw_data) = build_cas_object(3, 100, false, false);
// Act & Assert
assert_eq!(c.get_chunk_boundaries().len(), 3);
assert_eq!(c.get_chunk_boundaries(), [100, 200, 300]);
Expand All @@ -579,6 +576,7 @@ mod tests {
num_chunks: u32,
uncompressed_chunk_size: u32,
use_random_chunk_size: bool,
use_lz4_compression: bool
) -> (CasObject, Vec<u8>, Vec<u8>) {
let mut c = CasObject::default();

Expand All @@ -594,7 +592,7 @@ mod tests {
for _idx in 0..num_chunks {
let chunk_size: u32 = if use_random_chunk_size {
let mut rng = rand::thread_rng();
rng.gen_range(1024..=uncompressed_chunk_size)
rng.gen_range(512..=uncompressed_chunk_size)
} else {
uncompressed_chunk_size
};
Expand All @@ -606,10 +604,15 @@ mod tests {

// build chunk, create ChunkInfo and keep going

let compression_scheme = match use_lz4_compression {
true => CompressionScheme::LZ4,
false => CompressionScheme::None
};

let bytes_written = serialize_chunk(
&bytes,
&mut writer,
cas_types::compression_scheme::CompressionScheme::None,
compression_scheme,
)
.unwrap();

Expand Down Expand Up @@ -646,14 +649,15 @@ mod tests {
#[test]
fn test_basic_serialization_mem() {
// Arrange
let (c, _cas_data, raw_data) = build_cas_object(3, 100, false);
let (c, _cas_data, raw_data) = build_cas_object(3, 100, false, false);
let mut writer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
// Act & Assert
assert!(CasObject::serialize(
&mut writer,
&c.info.cashash,
&raw_data,
&c.get_chunk_boundaries(),
CompressionScheme::None
)
.is_ok());

Expand All @@ -670,14 +674,15 @@ mod tests {
#[test]
fn test_serialization_deserialization_mem_medium() {
// Arrange
let (c, _cas_data, raw_data) = build_cas_object(32, 16384, false);
let (c, _cas_data, raw_data) = build_cas_object(32, 16384, false, false);
let mut writer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
// Act & Assert
assert!(CasObject::serialize(
&mut writer,
&c.info.cashash,
&raw_data,
&c.get_chunk_boundaries(),
CompressionScheme::None
)
.is_ok());

Expand All @@ -697,14 +702,15 @@ mod tests {
#[test]
fn test_serialization_deserialization_mem_large_random() {
// Arrange
let (c, _cas_data, raw_data) = build_cas_object(32, 65536, true);
let (c, _cas_data, raw_data) = build_cas_object(32, 65536, true, false);
let mut writer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
// Act & Assert
assert!(CasObject::serialize(
&mut writer,
&c.info.cashash,
&raw_data,
&c.get_chunk_boundaries(),
CompressionScheme::None
)
.is_ok());

Expand All @@ -723,14 +729,125 @@ mod tests {
#[test]
fn test_serialization_deserialization_file_large_random() {
// Arrange
let (c, _cas_data, raw_data) = build_cas_object(256, 65536, true);
let (c, _cas_data, raw_data) = build_cas_object(256, 65536, true, false);
let mut writer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
// Act & Assert
assert!(CasObject::serialize(
&mut writer,
&c.info.cashash,
&raw_data,
&c.get_chunk_boundaries(),
CompressionScheme::None
)
.is_ok());

let mut reader = writer.clone();
reader.set_position(0);
let res = CasObject::deserialize(&mut reader);
assert!(res.is_ok());

let c2 = res.unwrap();
assert_eq!(c, c2);

assert_eq!(c.info.num_chunks, c2.info.num_chunks);
assert_eq!(raw_data, c2.get_all_bytes(&mut reader).unwrap());
}

#[test]
fn test_basic_mem_lz4() {
// Arrange
let (c, _cas_data, raw_data) = build_cas_object(1, 8, false, true);
let mut writer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
// Act & Assert
assert!(CasObject::serialize(
&mut writer,
&c.info.cashash,
&raw_data,
&c.get_chunk_boundaries(),
CompressionScheme::LZ4
)
.is_ok());

let mut reader = writer.clone();
reader.set_position(0);
let res = CasObject::deserialize(&mut reader);
assert!(res.is_ok());

let c2 = res.unwrap();
assert_eq!(c, c2);

let bytes_read = c2.get_all_bytes(&mut reader).unwrap();
assert_eq!(c.info.num_chunks, c2.info.num_chunks);
assert_eq!(raw_data, bytes_read);
}

#[test]
fn test_serialization_deserialization_mem_medium_lz4() {
// Arrange
let (c, _cas_data, raw_data) = build_cas_object(32, 16384, false, true);
let mut writer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
// Act & Assert
assert!(CasObject::serialize(
&mut writer,
&c.info.cashash,
&raw_data,
&c.get_chunk_boundaries(),
CompressionScheme::LZ4
)
.is_ok());

let mut reader = writer.clone();
reader.set_position(0);
let res = CasObject::deserialize(&mut reader);
assert!(res.is_ok());

let c2 = res.unwrap();
assert_eq!(c, c2);

let bytes_read = c2.get_all_bytes(&mut reader).unwrap();
assert_eq!(c.info.num_chunks, c2.info.num_chunks);
assert_eq!(raw_data, bytes_read);
}

#[test]
fn test_serialization_deserialization_mem_large_random_lz4() {
// Arrange
let (c, _cas_data, raw_data) = build_cas_object(32, 65536, true, true);
let mut writer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
// Act & Assert
assert!(CasObject::serialize(
&mut writer,
&c.info.cashash,
&raw_data,
&c.get_chunk_boundaries(),
CompressionScheme::LZ4
)
.is_ok());

let mut reader = writer.clone();
reader.set_position(0);
let res = CasObject::deserialize(&mut reader);
assert!(res.is_ok());

let c2 = res.unwrap();
assert_eq!(c, c2);

assert_eq!(c.info.num_chunks, c2.info.num_chunks);
assert_eq!(raw_data, c2.get_all_bytes(&mut reader).unwrap());
}

#[test]
fn test_serialization_deserialization_file_large_random_lz4() {
// Arrange
let (c, _cas_data, raw_data) = build_cas_object(256, 65536, true, true);
let mut writer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
// Act & Assert
assert!(CasObject::serialize(
&mut writer,
&c.info.cashash,
&raw_data,
&c.get_chunk_boundaries(),
CompressionScheme::LZ4
)
.is_ok());

Expand Down
Loading