Skip to content

Commit

Permalink
s3 range propagation (#23)
Browse files Browse the repository at this point in the history
* fix

* fixes

* please our overload, the linter

* fix pr comment
  • Loading branch information
assafvayner authored Sep 24, 2024
1 parent b83054b commit 5df5ca2
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 13 deletions.
45 changes: 32 additions & 13 deletions cas_client/src/remote_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@ use anyhow::anyhow;
use bytes::Buf;
use cas::key::Key;
use cas_types::{QueryChunkResponse, QueryReconstructionResponse, UploadXorbResponse};
use reqwest::{header::{HeaderMap, HeaderValue}, StatusCode, Url};
use reqwest::{
header::{HeaderMap, HeaderValue},
StatusCode, Url,
};
use serde::{de::DeserializeOwned, Serialize};

use bytes::Bytes;
use cas_object::CasObject;
use cas_types::CASReconstructionTerm;
use tracing::warn;
use tracing::{debug, warn};

use crate::{error::Result, CasClientError};

use merklehash::MerkleHash;
use tracing::debug;

use crate::Client;
pub const CAS_ENDPOINT: &str = "http://localhost:8080";
Expand Down Expand Up @@ -83,8 +85,8 @@ impl Client for RemoteClient {

impl RemoteClient {
pub async fn from_config(endpoint: String, token: Option<String>) -> Self {
Self {
client: CASAPIClient::new(&endpoint, token)
Self {
client: CASAPIClient::new(&endpoint, token),
}
}
}
Expand All @@ -108,7 +110,7 @@ impl CASAPIClient {
Self {
client,
endpoint: endpoint.to_string(),
token
token,
}
}

Expand Down Expand Up @@ -170,7 +172,7 @@ impl CASAPIClient {
&key.hash,
contents,
&chunk_boundaries.into_iter().map(|x| x as u32).collect(),
cas_object::CompressionScheme::LZ4
cas_object::CompressionScheme::LZ4,
)?;

debug!("Upload: POST to {url:?} for {key:?}");
Expand Down Expand Up @@ -224,14 +226,17 @@ impl CASAPIClient {
/// Reconstruct the file
async fn reconstruct_file(&self, file_id: &MerkleHash) -> Result<QueryReconstructionResponse> {
let url = Url::parse(&format!(
"{}/reconstruction/{}",
self.endpoint,
"{}/reconstruction/{}",
self.endpoint,
file_id.hex()
))?;

let mut headers = HeaderMap::new();
if let Some(tok) = &self.token {
headers.insert("Authorization", HeaderValue::from_str(&format!("Bearer {}", tok)).unwrap());
headers.insert(
"Authorization",
HeaderValue::from_str(&format!("Bearer {}", tok)).unwrap(),
);
}

let response = self.client.get(url).headers(headers).send().await?;
Expand Down Expand Up @@ -264,9 +269,21 @@ impl CASAPIClient {
}

async fn get_one(term: &CASReconstructionTerm) -> Result<Bytes> {
debug!("term: {term:?}");

if term.range.end < term.range.start || term.url_range.end < term.url_range.start {
return Err(CasClientError::InternalError(anyhow!(
"invalid range in reconstruction"
)));
}

let url = Url::parse(term.url.as_str())?;
let response = reqwest::Client::new()
.request(hyper::Method::GET, url)
.header(
reqwest::header::RANGE,
format!("bytes={}-{}", term.url_range.start, term.url_range.end),
)
.send()
.await?
.error_for_status()?;
Expand All @@ -275,11 +292,13 @@ async fn get_one(term: &CASReconstructionTerm) -> Result<Bytes> {
.await
.map_err(CasClientError::ReqwestError)?;
let mut readseek = Cursor::new(xorb_bytes.to_vec());
let data = cas_object::deserialize_chunks(&mut readseek)?;
let len = (term.range.end - term.range.start) as usize;
let offset = term.range_start_offset as usize;

let cas_object = CasObject::deserialize(&mut readseek)?;
let data = cas_object.get_range(&mut readseek, term.range.start, term.range.end)?;
let sliced = data[offset..offset + len].to_vec();

Ok(Bytes::from(data))
Ok(Bytes::from(sliced))
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions cas_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct CASReconstructionTerm {
pub range: Range,
pub range_start_offset: u32,
pub url: String,
pub url_range: Range,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
Expand Down
1 change: 1 addition & 0 deletions hf_xet/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 5df5ca2

Please sign in to comment.