feat: retryable config enablement through storage_options
ion-elgreco committed Jan 14, 2025
1 parent c56d6c0 commit d636e02
Showing 6 changed files with 121 additions and 16 deletions.
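
As a usage sketch (not part of this diff), the new retry-related keys introduced here could be supplied through storage_options when opening a table. The bucket and table path are hypothetical, and the builder calls (DeltaTableBuilder::from_uri, with_storage_options, load) plus deltalake_aws::register_handlers are assumptions based on the existing delta-rs API rather than anything shown in this commit:

use std::collections::HashMap;

use deltalake_core::{DeltaResult, DeltaTableBuilder};

#[tokio::main]
async fn main() -> DeltaResult<()> {
    // Assumed setup step: register the s3:// handlers from the aws crate.
    deltalake_aws::register_handlers(None);

    // Retry/backoff keys parsed by the new RetryConfigParse trait below;
    // durations use humantime syntax such as "100ms", "30s", "3m".
    let storage_options = HashMap::from([
        ("max_retries".to_string(), "10".to_string()),
        ("retry_timeout".to_string(), "3m".to_string()),
        ("backoff_config.init_backoff".to_string(), "100ms".to_string()),
        ("backoff_config.max_backoff".to_string(), "30s".to_string()),
        ("backoff_config.base".to_string(), "2.0".to_string()),
    ]);

    let table = DeltaTableBuilder::from_uri("s3://my-bucket/my-table")
        .with_storage_options(storage_options)
        .load()
        .await?;

    println!("loaded table at version {}", table.version());
    Ok(())
}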
9 changes: 7 additions & 2 deletions crates/aws/src/storage.rs
@@ -8,7 +8,8 @@ use deltalake_core::storage::object_store::{
PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as ObjectStoreResult,
};
use deltalake_core::storage::{
limit_store_handler, str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageOptions,
limit_store_handler, str_is_truthy, ObjectStoreFactory, ObjectStoreRef, RetryConfigParse,
StorageOptions,
};
use deltalake_core::{DeltaResult, DeltaTableError, ObjectStoreError, Path};
use futures::stream::BoxStream;
@@ -34,6 +35,8 @@ pub struct S3ObjectStoreFactory {}

impl S3StorageOptionsConversion for S3ObjectStoreFactory {}

impl RetryConfigParse for S3ObjectStoreFactory {}

impl ObjectStoreFactory for S3ObjectStoreFactory {
fn parse_url_opts(
&self,
@@ -65,7 +68,9 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
));
}

let inner = builder.build()?;
let inner = builder
.with_retry(self.parse_retry_config(&options)?)
.build()?;

let store = aws_storage_handler(limit_store_handler(inner, &options), &s3_options)?;
debug!("Initialized the object store: {store:?}");
30 changes: 25 additions & 5 deletions crates/azure/src/lib.rs
@@ -5,11 +5,11 @@ use std::sync::Arc;
use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
use deltalake_core::storage::{
factories, limit_store_handler, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef,
StorageOptions,
RetryConfigParse, StorageOptions,
};
use deltalake_core::{DeltaResult, Path};
use object_store::azure::AzureConfigKey;
use object_store::parse_url_opts;
use deltalake_core::{DeltaResult, DeltaTableError, Path};
use object_store::azure::{AzureConfigKey, MicrosoftAzureBuilder};
use object_store::ObjectStoreScheme;
use url::Url;

mod config;
@@ -36,14 +36,34 @@ impl AzureOptions for StorageOptions {
#[derive(Clone, Default, Debug)]
pub struct AzureFactory {}

impl RetryConfigParse for AzureFactory {}

impl ObjectStoreFactory for AzureFactory {
fn parse_url_opts(
&self,
url: &Url,
options: &StorageOptions,
) -> DeltaResult<(ObjectStoreRef, Path)> {
let config = config::AzureConfigHelper::try_new(options.as_azure_options())?.build()?;
let (inner, prefix) = parse_url_opts(url, config)?;

let (_scheme, path) =
ObjectStoreScheme::parse(&url).map_err(|e| DeltaTableError::GenericError {
source: Box::new(e),
})?;
let prefix = Path::parse(path)?;

// let (inner, prefix) = parse_url_opts(url, config)?;

let mut builder = MicrosoftAzureBuilder::new().with_url(url.to_string());

for (key, value) in config.iter() {
builder = builder.with_config(key.clone(), value.clone());
}

let inner = builder
.with_retry(self.parse_retry_config(&options)?)
.build()?;

let store = limit_store_handler(url_prefix_handler(inner, prefix.clone()), options);
Ok((store, prefix))
}
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
@@ -98,6 +98,7 @@ rand = "0.8"
z85 = "3.0.5"
maplit = "1"
sqlparser = { version = "0.53.0" }
humantime = { version = "2.1.0"}

[dev-dependencies]
criterion = "0.5"
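
The new humantime dependency is what parses the duration strings for retry_timeout and the backoff settings. A small illustration (not part of the commit) of the string formats it accepts; the values are arbitrary examples:

use std::time::Duration;

use humantime::parse_duration;

fn main() {
    // humantime understands unit-suffixed durations like these.
    assert_eq!(parse_duration("300ms").unwrap(), Duration::from_millis(300));
    assert_eq!(parse_duration("15s").unwrap(), Duration::from_secs(15));
    assert_eq!(parse_duration("2min").unwrap(), Duration::from_secs(120));
}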
44 changes: 43 additions & 1 deletion crates/core/src/storage/mod.rs
@@ -7,12 +7,13 @@ use dashmap::DashMap;
use futures::future::BoxFuture;
use futures::FutureExt;
use futures::TryFutureExt;
use humantime::parse_duration;
use lazy_static::lazy_static;
use object_store::limit::LimitStore;
use object_store::local::LocalFileSystem;
use object_store::memory::InMemory;
use object_store::prefix::PrefixStore;
use object_store::{GetOptions, PutOptions, PutPayload, PutResult};
use object_store::{GetOptions, PutOptions, PutPayload, PutResult, RetryConfig};
use serde::{Deserialize, Serialize};
use tokio::runtime::{Builder as RuntimeBuilder, Handle, Runtime};
use url::Url;
@@ -345,6 +346,47 @@ pub trait ObjectStoreFactory: Send + Sync {
) -> DeltaResult<(ObjectStoreRef, Path)>;
}

pub trait RetryConfigParse {
fn parse_retry_config(&self, options: &StorageOptions) -> DeltaResult<RetryConfig> {
let mut retry_config = RetryConfig::default();
if let Some(max_retries) = options.0.get("max_retries") {
retry_config.max_retries = max_retries
.parse::<usize>()
.map_err(|e| DeltaTableError::generic(e.to_string()))?;
}

if let Some(retry_timeout) = options.0.get("retry_timeout") {
retry_config.retry_timeout = parse_duration(&retry_timeout).map_err(|_| {
DeltaTableError::generic(format!("failed to parse \"{retry_timeout}\" as Duration"))
})?;
}

if let Some(bc_init_backoff) = options.0.get("backoff_config.init_backoff") {
retry_config.backoff.init_backoff = parse_duration(&bc_init_backoff).map_err(|_| {
DeltaTableError::generic(format!(
"failed to parse \"{bc_init_backoff}\" as Duration"
))
})?;
}

if let Some(bc_max_backoff) = options.0.get("backoff_config.max_backoff") {
retry_config.backoff.max_backoff = parse_duration(&bc_max_backoff).map_err(|_| {
DeltaTableError::generic(format!(
"failed to parse \"{bc_max_backoff}\" as Duration"
))
})?;
}

if let Some(bc_base) = options.0.get("backoff_config.base") {
retry_config.backoff.base = bc_base
.parse::<f64>()
.map_err(|e| DeltaTableError::generic(e.to_string()))?;
}

Ok(retry_config)
}
}

#[derive(Clone, Debug, Default)]
pub(crate) struct DefaultObjectStoreFactory {}

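
As a minimal sketch (not part of the commit) of how a storage backend opts in: an empty marker impl inherits the default parse_retry_config above, which maps the string options onto object_store's RetryConfig. MyObjectStoreFactory is a hypothetical type, and constructing StorageOptions directly from a HashMap assumes its tuple field is public:

use std::collections::HashMap;

use deltalake_core::storage::{RetryConfigParse, StorageOptions};
use deltalake_core::DeltaResult;

// Hypothetical factory; the empty impl is enough to reuse the
// default parse_retry_config provided by the trait.
struct MyObjectStoreFactory;

impl RetryConfigParse for MyObjectStoreFactory {}

fn main() -> DeltaResult<()> {
    let options = StorageOptions(HashMap::from([
        ("max_retries".to_string(), "5".to_string()),
        ("backoff_config.init_backoff".to_string(), "250ms".to_string()),
    ]));

    let retry = MyObjectStoreFactory.parse_retry_config(&options)?;
    assert_eq!(retry.max_retries, 5);
    // Keys that are not set keep object_store's RetryConfig defaults.
    Ok(())
}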
28 changes: 23 additions & 5 deletions crates/gcp/src/lib.rs
@@ -5,11 +5,11 @@ use std::sync::Arc;
use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
use deltalake_core::storage::{
factories, limit_store_handler, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef,
StorageOptions,
RetryConfigParse, StorageOptions,
};
use deltalake_core::{DeltaResult, Path};
use object_store::gcp::GoogleConfigKey;
use object_store::parse_url_opts;
use deltalake_core::{DeltaResult, DeltaTableError, Path};
use object_store::gcp::{GoogleCloudStorageBuilder, GoogleConfigKey};
use object_store::ObjectStoreScheme;
use url::Url;

mod config;
@@ -37,14 +37,32 @@ impl GcpOptions for StorageOptions {
#[derive(Clone, Default, Debug)]
pub struct GcpFactory {}

impl RetryConfigParse for GcpFactory {}

impl ObjectStoreFactory for GcpFactory {
fn parse_url_opts(
&self,
url: &Url,
options: &StorageOptions,
) -> DeltaResult<(ObjectStoreRef, Path)> {
let config = config::GcpConfigHelper::try_new(options.as_gcp_options())?.build()?;
let (inner, prefix) = parse_url_opts(url, config)?;

let (_scheme, path) =
ObjectStoreScheme::parse(&url).map_err(|e| DeltaTableError::GenericError {
source: Box::new(e),
})?;
let prefix = Path::parse(path)?;

let mut builder = GoogleCloudStorageBuilder::new().with_url(url.to_string());

for (key, value) in config.iter() {
builder = builder.with_config(key.clone(), value.clone());
}

let inner = builder
.with_retry(self.parse_retry_config(&options)?)
.build()?;

let gcs_backend = crate::storage::GcsStorageBackend::try_new(Arc::new(inner))?;
let store = limit_store_handler(url_prefix_handler(gcs_backend, prefix.clone()), options);
Ok((store, prefix))
25 changes: 22 additions & 3 deletions crates/lakefs/src/storage.rs
@@ -2,10 +2,11 @@
use deltalake_core::storage::object_store::aws::AmazonS3ConfigKey;
use deltalake_core::storage::{
limit_store_handler, ObjectStoreFactory, ObjectStoreRef, StorageOptions,
limit_store_handler, ObjectStoreFactory, ObjectStoreRef, RetryConfigParse, StorageOptions,
};
use deltalake_core::{DeltaResult, DeltaTableError, Path};
use object_store::parse_url_opts;
use object_store::aws::AmazonS3Builder;
use object_store::ObjectStoreScheme;
use std::collections::HashMap;
use std::fmt::Debug;
use std::str::FromStr;
@@ -61,6 +62,8 @@ pub(crate) trait S3StorageOptionsConversion {

impl S3StorageOptionsConversion for LakeFSObjectStoreFactory {}

impl RetryConfigParse for LakeFSObjectStoreFactory {}

impl ObjectStoreFactory for LakeFSObjectStoreFactory {
fn parse_url_opts(
&self,
@@ -88,7 +91,23 @@ impl ObjectStoreFactory for LakeFSObjectStoreFactory {
}
})
.collect::<HashMap<AmazonS3ConfigKey, String>>();
let (inner, prefix) = parse_url_opts(&s3_url, config)?;

let (_scheme, path) =
ObjectStoreScheme::parse(&s3_url).map_err(|e| DeltaTableError::GenericError {
source: Box::new(e),
})?;
let prefix = Path::parse(path)?;

let mut builder = AmazonS3Builder::new().with_url(s3_url.to_string());

for (key, value) in config.iter() {
builder = builder.with_config(key.clone(), value.clone());
}

let inner = builder
.with_retry(self.parse_retry_config(&options)?)
.build()?;

let store = limit_store_handler(inner, &options);
debug!("Initialized the object store: {store:?}");
Ok((store, prefix))
