Skip to content

Commit

Permalink
Add tests for table prefix and uri
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Nov 27, 2023
1 parent 5fb7156 commit c28c067
Showing 1 changed file with 96 additions and 6 deletions.
102 changes: 96 additions & 6 deletions src/object_store/wrapped.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,32 @@ pub struct InternalObjectStore {

impl InternalObjectStore {
pub fn new(inner: Arc<dyn ObjectStore>, config: schema::ObjectStore) -> Self {
let root_uri = match config.clone() {
let mut root_uri = match config.clone() {
schema::ObjectStore::Local(Local { data_dir }) => {
let canonical_path = StdPath::new(&data_dir).canonicalize().unwrap();
Url::from_directory_path(canonical_path).unwrap()
}
schema::ObjectStore::InMemory(_) => Url::from_str("memory://").unwrap(),
schema::ObjectStore::S3(S3 { bucket, .. }) => {
Url::from_str(&format!("s3://{bucket}")).unwrap()
schema::ObjectStore::S3(S3 {
bucket, endpoint, ..
}) => {
if let Some(endpoint) = endpoint {
// We're assuming here that the bucket isn't contained in the endpoint itself
Url::from_str(&format!("{endpoint}/{bucket}")).unwrap()
} else {
Url::from_str(&format!("s3://{bucket}")).unwrap()
}
}
schema::ObjectStore::GCS(GCS { bucket, .. }) => {
Url::from_str(&format!("gs://{bucket}")).unwrap()
}
};

// If a configured bucket contains a path without a trailing slash add one.
if !root_uri.path().ends_with('/') {
root_uri.set_path(&format!("{}/", root_uri.path()));
}

Self {
inner,
root_uri,
Expand All @@ -58,9 +70,27 @@ impl InternalObjectStore {
// This is either just a UUID, or potentially UUID prepended by some path.
pub fn table_prefix(&self, table_uuid: Uuid) -> Path {
match self.config.clone() {
schema::ObjectStore::S3(_) | schema::ObjectStore::GCS(_) => {
// In case the config bucket contains a path as well,
// take it and prepend it to the table UUID.
schema::ObjectStore::S3(S3 {
endpoint: Some(_), ..
}) => {
// In case when there's an endpoint specified, the bucket is not the host but the
// first element in the path, so we need to discard it when creating a prefix.
let mut segments =
self.root_uri.path_segments().unwrap().collect::<Vec<_>>();

// Add the last path segment which is the table UUID
let uuid_str = table_uuid.to_string();
segments.push(&uuid_str);

// Remove the first path segment which is actually the bucket
segments.remove(0);
Path::from(segments.join("/"))
}
schema::ObjectStore::S3(S3 { endpoint: None, .. })
| schema::ObjectStore::GCS(_) =>
// In case the config bucket contains a path as well,
// take it and prepend it to the table UUID.
{
Path::from(format!("{}/{table_uuid}", self.root_uri.path()))
}
_ => Path::from(table_uuid.to_string()),
Expand Down Expand Up @@ -257,3 +287,63 @@ impl ObjectStore for InternalObjectStore {
self.inner.rename_if_not_exists(from, to).await
}
}

#[cfg(test)]
mod tests {
use crate::config::context::build_object_store;
use crate::config::schema::{ObjectStore, S3};
use crate::frontend::http::tests::deterministic_uuid;
use crate::object_store::wrapped::InternalObjectStore;
use datafusion::common::Result;
use deltalake::logstore::LogStore;
use rstest::rstest;

#[rstest]
#[case::bucket_root("test-bucket", "01020304-0506-4708-890a-0b0c0d0e0f10")]
#[case::path_no_delimiter(
"test-bucket/some/path/no/delimiter",
"some/path/no/delimiter/01020304-0506-4708-890a-0b0c0d0e0f10"
)]
#[case::path_with_delimiter(
"test-bucket/some/path/with/delimiter/",
"some/path/with/delimiter/01020304-0506-4708-890a-0b0c0d0e0f10"
)]
#[test]
fn test_table_location_s3(
#[case] bucket: &str,
#[case] table_prefix: &str,
#[values(None, Some("http://127.0.0.1:9000".to_string()))] endpoint: Option<
String,
>,
) -> Result<()> {
let config = ObjectStore::S3(S3 {
region: None,
access_key_id: "access_key_id".to_string(),
secret_access_key: "secret_access_key".to_string(),
bucket: bucket.to_string(),
endpoint: endpoint.clone(),
cache_properties: None,
});
// In principle for this test we could use any object store since we only exercise the
// prefix/log store uri logic
let inner_store = build_object_store(&config)?;

let store = InternalObjectStore::new(inner_store, config);

let uuid = deterministic_uuid();
let prefix = store.table_prefix(uuid);
let uri = store.get_log_store(uuid).root_uri();

assert_eq!(prefix, table_prefix.into());

let expected_uri = if let Some(endpoint) = endpoint {
format!("{endpoint}/test-bucket/{table_prefix}")
} else {
format!("s3://test-bucket/{table_prefix}")
};

assert_eq!(uri, expected_uri);

Ok(())
}
}

0 comments on commit c28c067

Please sign in to comment.