Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

s3 range propagation #23

Merged
merged 4 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 32 additions & 13 deletions cas_client/src/remote_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@ use anyhow::anyhow;
use bytes::Buf;
use cas::key::Key;
use cas_types::{QueryChunkResponse, QueryReconstructionResponse, UploadXorbResponse};
use reqwest::{header::{HeaderMap, HeaderValue}, StatusCode, Url};
use reqwest::{
header::{HeaderMap, HeaderValue},
StatusCode, Url,
};
use serde::{de::DeserializeOwned, Serialize};

use bytes::Bytes;
use cas_object::CasObject;
use cas_types::CASReconstructionTerm;
use tracing::warn;
use tracing::{debug, warn};

use crate::{error::Result, CasClientError};

use merklehash::MerkleHash;
use tracing::debug;

use crate::Client;
pub const CAS_ENDPOINT: &str = "http://localhost:8080";
Expand Down Expand Up @@ -83,8 +85,8 @@ impl Client for RemoteClient {

impl RemoteClient {
pub async fn from_config(endpoint: String, token: Option<String>) -> Self {
Self {
client: CASAPIClient::new(&endpoint, token)
Self {
client: CASAPIClient::new(&endpoint, token),
}
}
}
Expand All @@ -108,7 +110,7 @@ impl CASAPIClient {
Self {
client,
endpoint: endpoint.to_string(),
token
token,
}
}

Expand Down Expand Up @@ -170,7 +172,7 @@ impl CASAPIClient {
&key.hash,
contents,
&chunk_boundaries.into_iter().map(|x| x as u32).collect(),
cas_object::CompressionScheme::LZ4
cas_object::CompressionScheme::LZ4,
)?;

debug!("Upload: POST to {url:?} for {key:?}");
Expand Down Expand Up @@ -224,14 +226,17 @@ impl CASAPIClient {
/// Reconstruct the file
async fn reconstruct_file(&self, file_id: &MerkleHash) -> Result<QueryReconstructionResponse> {
let url = Url::parse(&format!(
"{}/reconstruction/{}",
self.endpoint,
"{}/reconstruction/{}",
self.endpoint,
file_id.hex()
))?;

let mut headers = HeaderMap::new();
if let Some(tok) = &self.token {
headers.insert("Authorization", HeaderValue::from_str(&format!("Bearer {}", tok)).unwrap());
headers.insert(
"Authorization",
HeaderValue::from_str(&format!("Bearer {}", tok)).unwrap(),
);
}

let response = self.client.get(url).headers(headers).send().await?;
Expand Down Expand Up @@ -264,9 +269,21 @@ impl CASAPIClient {
}

async fn get_one(term: &CASReconstructionTerm) -> Result<Bytes> {
debug!("term: {term:?}");

if term.range.end < term.range.start || term.url_range.end < term.url_range.start {
return Err(CasClientError::InternalError(anyhow!(
"invalid range in reconstruction"
)));
}

let url = Url::parse(term.url.as_str())?;
let response = reqwest::Client::new()
.request(hyper::Method::GET, url)
.header(
reqwest::header::RANGE,
format!("bytes={}-{}", term.url_range.start, term.url_range.end),
)
.send()
.await?
.error_for_status()?;
Expand All @@ -275,11 +292,13 @@ async fn get_one(term: &CASReconstructionTerm) -> Result<Bytes> {
.await
.map_err(CasClientError::ReqwestError)?;
let mut readseek = Cursor::new(xorb_bytes.to_vec());
let data = cas_object::deserialize_chunks(&mut readseek)?;
let len = (term.range.end - term.range.start) as usize;
assafvayner marked this conversation as resolved.
Show resolved Hide resolved
let offset = term.range_start_offset as usize;

let cas_object = CasObject::deserialize(&mut readseek)?;
let data = cas_object.get_range(&mut readseek, term.range.start, term.range.end)?;
let sliced = data[offset..offset + len].to_vec();

Ok(Bytes::from(data))
Ok(Bytes::from(sliced))
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions cas_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct CASReconstructionTerm {
pub range: Range,
pub range_start_offset: u32,
pub url: String,
pub url_range: Range,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
Expand Down
1 change: 1 addition & 0 deletions hf_xet/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.