Skip to content

Commit

Permalink
fix: pass correct options to is_aws, reduce times sdk_config construc…
Browse files Browse the repository at this point in the history
…tion

Signed-off-by: Ion Koutsouris <[email protected]>
  • Loading branch information
ion-elgreco committed Jan 8, 2025
1 parent 053601b commit f33c9af
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 60 deletions.
9 changes: 6 additions & 3 deletions crates/aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use storage::S3StorageOptionsConversion;
use storage::{S3ObjectStoreFactory, S3StorageOptions};
use tracing::debug;
use tracing::warn;
Expand All @@ -46,6 +47,8 @@ use url::Url;
#[derive(Clone, Debug, Default)]
pub struct S3LogStoreFactory {}

impl S3StorageOptionsConversion for S3LogStoreFactory {}

impl LogStoreFactory for S3LogStoreFactory {
fn with_options(
&self,
Expand All @@ -54,7 +57,7 @@ impl LogStoreFactory for S3LogStoreFactory {
options: &StorageOptions,
) -> DeltaResult<Arc<dyn LogStore>> {
let store = url_prefix_handler(store, Path::parse(location.path())?);

let options = self.with_env_s3(options);
if options.0.keys().any(|key| {
let key = key.to_ascii_lowercase();
[
Expand All @@ -65,7 +68,7 @@ impl LogStoreFactory for S3LogStoreFactory {
}) {
debug!("S3LogStoreFactory has been asked to create a LogStore where the underlying store has copy-if-not-exists enabled - no locking provider required");
warn!("Most S3 object store support conditional put, remove copy_if_not_exists parameter to use a more performant conditional put.");
return Ok(logstore::default_s3_logstore(store, location, options));
return Ok(logstore::default_s3_logstore(store, location, &options));
}

let s3_options = S3StorageOptions::from_map(&options.0)?;
Expand All @@ -78,7 +81,7 @@ impl LogStoreFactory for S3LogStoreFactory {
store,
)?));
}
Ok(default_logstore(store, location, options))
Ok(default_logstore(store, location, &options))
}
}

Expand Down
114 changes: 57 additions & 57 deletions crates/aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,52 +32,7 @@ const STORE_NAME: &str = "DeltaS3ObjectStore";
#[derive(Clone, Default, Debug)]
pub struct S3ObjectStoreFactory {}

impl S3ObjectStoreFactory {
fn with_env_s3(&self, options: &StorageOptions) -> StorageOptions {
let mut options = StorageOptions(
options
.0
.clone()
.into_iter()
.map(|(k, v)| {
if let Ok(config_key) = AmazonS3ConfigKey::from_str(&k.to_ascii_lowercase()) {
(config_key.as_ref().to_string(), v)
} else {
(k, v)
}
})
.collect(),
);

for (os_key, os_value) in std::env::vars_os() {
if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
if let Ok(config_key) = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()) {
if !options.0.contains_key(config_key.as_ref()) {
options
.0
.insert(config_key.as_ref().to_string(), value.to_string());
}
}
}
}

// All S3-like Object Stores use conditional put, object-store crate however still requires you to explicitly
// set this behaviour. We will however assume, when a locking provider/copy-if-not-exists keys are not provided
// that PutIfAbsent is supported.
// With conditional put in S3-like API we can use the deltalake default logstore which use PutIfAbsent
if !options.0.keys().any(|key| {
let key = key.to_ascii_lowercase();
[
AmazonS3ConfigKey::ConditionalPut.as_ref(),
"conditional_put",
]
.contains(&key.as_str())
}) {
options.0.insert("conditional_put".into(), "etag".into());
}
options
}
}
impl S3StorageOptionsConversion for S3ObjectStoreFactory {}

impl ObjectStoreFactory for S3ObjectStoreFactory {
fn parse_url_opts(
Expand All @@ -102,19 +57,17 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
})?;
let prefix = Path::parse(path)?;

if is_aws(storage_options) {
debug!("Detected AWS S3, resolving credentials");
let sdk_config = execute_sdk_future(crate::credentials::resolve_credentials(
storage_options.clone(),
))??;
let s3_options: S3StorageOptions = S3StorageOptions::from_map(&options.0)?;

if let Some(ref sdk_config) = s3_options.sdk_config {
builder = builder.with_credentials(Arc::new(
crate::credentials::AWSForObjectStore::new(sdk_config),
crate::credentials::AWSForObjectStore::new(sdk_config.clone()),
));
}

let inner = builder.build()?;

let store = aws_storage_handler(limit_store_handler(inner, &options), &options)?;
let store = aws_storage_handler(limit_store_handler(inner, &options), &s3_options)?;
debug!("Initialized the object store: {store:?}");

Ok((store, prefix))
Expand All @@ -123,9 +76,8 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {

fn aws_storage_handler(
store: ObjectStoreRef,
options: &StorageOptions,
s3_options: &S3StorageOptions,
) -> DeltaResult<ObjectStoreRef> {
let s3_options = S3StorageOptions::from_map(&options.0)?;
// Nearly all S3 Object stores support conditional put, so we change the default to always returning an S3 Object store
// unless explicitly passing a locking provider key or allow_unsafe_rename. Then we will pass it to the old S3StorageBackend.
if s3_options.locking_provider.as_deref() == Some("dynamodb") || s3_options.allow_unsafe_rename
Expand Down Expand Up @@ -154,7 +106,8 @@ fn is_aws(options: &StorageOptions) -> bool {
}
// Options at this stage should only contain 'aws_endpoint' in lowercase
// due to with_env_s3
!(options.0.contains_key("aws_endpoint") || options.0.contains_key(constants::AWS_ENDPOINT_URL))
!(options.0.contains_key("aws_endpoint")
|| !options.0.contains_key(constants::AWS_ENDPOINT_URL))
}

/// Options used to configure the [S3StorageBackend].
Expand Down Expand Up @@ -240,7 +193,7 @@ impl S3StorageOptions {
let sdk_config = match is_aws(&storage_options) {
false => None,
true => {
debug!("Detected AWS S3, resolving credentials");
debug!("Detected AWS S3 Storage options, resolving AWS credentials");
Some(execute_sdk_future(
crate::credentials::resolve_credentials(storage_options.clone()),
)??)
Expand Down Expand Up @@ -477,6 +430,53 @@ pub(crate) fn str_option(map: &HashMap<String, String>, key: &str) -> Option<Str
std::env::var(key).ok()
}

pub(crate) trait S3StorageOptionsConversion {
fn with_env_s3(&self, options: &StorageOptions) -> StorageOptions {
let mut options = StorageOptions(
options
.0
.clone()
.into_iter()
.map(|(k, v)| {
if let Ok(config_key) = AmazonS3ConfigKey::from_str(&k.to_ascii_lowercase()) {
(config_key.as_ref().to_string(), v)
} else {
(k, v)
}
})
.collect(),
);

for (os_key, os_value) in std::env::vars_os() {
if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
if let Ok(config_key) = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()) {
if !options.0.contains_key(config_key.as_ref()) {
options
.0
.insert(config_key.as_ref().to_string(), value.to_string());
}
}
}
}

// All S3-like Object Stores use conditional put, object-store crate however still requires you to explicitly
// set this behaviour. We will however assume, when a locking provider/copy-if-not-exists keys are not provided
// that PutIfAbsent is supported.
// With conditional put in S3-like API we can use the deltalake default logstore which use PutIfAbsent
if !options.0.keys().any(|key| {
let key = key.to_ascii_lowercase();
[
AmazonS3ConfigKey::ConditionalPut.as_ref(),
"conditional_put",
]
.contains(&key.as_str())
}) {
options.0.insert("conditional_put".into(), "etag".into());
}
options
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit f33c9af

Please sign in to comment.