From d636e02e58803e2512dbee278eb85135c7ace687 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Tue, 14 Jan 2025 20:55:25 +0100 Subject: [PATCH] feat: retryable config enablement through storage_options --- crates/aws/src/storage.rs | 9 +++++-- crates/azure/src/lib.rs | 30 +++++++++++++++++++---- crates/core/Cargo.toml | 1 + crates/core/src/storage/mod.rs | 44 +++++++++++++++++++++++++++++++++- crates/gcp/src/lib.rs | 28 ++++++++++++++++++---- crates/lakefs/src/storage.rs | 25 ++++++++++++++++--- 6 files changed, 121 insertions(+), 16 deletions(-) diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs index f834378831..d16887a636 100644 --- a/crates/aws/src/storage.rs +++ b/crates/aws/src/storage.rs @@ -8,7 +8,8 @@ use deltalake_core::storage::object_store::{ PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as ObjectStoreResult, }; use deltalake_core::storage::{ - limit_store_handler, str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageOptions, + limit_store_handler, str_is_truthy, ObjectStoreFactory, ObjectStoreRef, RetryConfigParse, + StorageOptions, }; use deltalake_core::{DeltaResult, DeltaTableError, ObjectStoreError, Path}; use futures::stream::BoxStream; @@ -34,6 +35,8 @@ pub struct S3ObjectStoreFactory {} impl S3StorageOptionsConversion for S3ObjectStoreFactory {} +impl RetryConfigParse for S3ObjectStoreFactory {} + impl ObjectStoreFactory for S3ObjectStoreFactory { fn parse_url_opts( &self, @@ -65,7 +68,9 @@ impl ObjectStoreFactory for S3ObjectStoreFactory { )); } - let inner = builder.build()?; + let inner = builder + .with_retry(self.parse_retry_config(&options)?) + .build()?; let store = aws_storage_handler(limit_store_handler(inner, &options), &s3_options)?; debug!("Initialized the object store: {store:?}"); diff --git a/crates/azure/src/lib.rs b/crates/azure/src/lib.rs index 7782f69f43..e4d2d1e555 100644 --- a/crates/azure/src/lib.rs +++ b/crates/azure/src/lib.rs @@ -5,11 +5,11 @@ use std::sync::Arc; use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory}; use deltalake_core::storage::{ factories, limit_store_handler, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef, - StorageOptions, + RetryConfigParse, StorageOptions, }; -use deltalake_core::{DeltaResult, Path}; -use object_store::azure::AzureConfigKey; -use object_store::parse_url_opts; +use deltalake_core::{DeltaResult, DeltaTableError, Path}; +use object_store::azure::{AzureConfigKey, MicrosoftAzureBuilder}; +use object_store::ObjectStoreScheme; use url::Url; mod config; @@ -36,6 +36,8 @@ impl AzureOptions for StorageOptions { #[derive(Clone, Default, Debug)] pub struct AzureFactory {} +impl RetryConfigParse for AzureFactory {} + impl ObjectStoreFactory for AzureFactory { fn parse_url_opts( &self, @@ -43,7 +45,25 @@ impl ObjectStoreFactory for AzureFactory { options: &StorageOptions, ) -> DeltaResult<(ObjectStoreRef, Path)> { let config = config::AzureConfigHelper::try_new(options.as_azure_options())?.build()?; - let (inner, prefix) = parse_url_opts(url, config)?; + + let (_scheme, path) = + ObjectStoreScheme::parse(&url).map_err(|e| DeltaTableError::GenericError { + source: Box::new(e), + })?; + let prefix = Path::parse(path)?; + + // let (inner, prefix) = parse_url_opts(url, config)?; + + let mut builder = MicrosoftAzureBuilder::new().with_url(url.to_string()); + + for (key, value) in config.iter() { + builder = builder.with_config(key.clone(), value.clone()); + } + + let inner = builder + .with_retry(self.parse_retry_config(&options)?) + .build()?; + let store = limit_store_handler(url_prefix_handler(inner, prefix.clone()), options); Ok((store, prefix)) } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 3e159aaf9b..d987415513 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -98,6 +98,7 @@ rand = "0.8" z85 = "3.0.5" maplit = "1" sqlparser = { version = "0.53.0" } +humantime = { version = "2.1.0"} [dev-dependencies] criterion = "0.5" diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs index b43c33b97c..ae31fa01ac 100644 --- a/crates/core/src/storage/mod.rs +++ b/crates/core/src/storage/mod.rs @@ -7,12 +7,13 @@ use dashmap::DashMap; use futures::future::BoxFuture; use futures::FutureExt; use futures::TryFutureExt; +use humantime::parse_duration; use lazy_static::lazy_static; use object_store::limit::LimitStore; use object_store::local::LocalFileSystem; use object_store::memory::InMemory; use object_store::prefix::PrefixStore; -use object_store::{GetOptions, PutOptions, PutPayload, PutResult}; +use object_store::{GetOptions, PutOptions, PutPayload, PutResult, RetryConfig}; use serde::{Deserialize, Serialize}; use tokio::runtime::{Builder as RuntimeBuilder, Handle, Runtime}; use url::Url; @@ -345,6 +346,47 @@ pub trait ObjectStoreFactory: Send + Sync { ) -> DeltaResult<(ObjectStoreRef, Path)>; } +pub trait RetryConfigParse { + fn parse_retry_config(&self, options: &StorageOptions) -> DeltaResult { + let mut retry_config = RetryConfig::default(); + if let Some(max_retries) = options.0.get("max_retries") { + retry_config.max_retries = max_retries + .parse::() + .map_err(|e| DeltaTableError::generic(e.to_string()))?; + } + + if let Some(retry_timeout) = options.0.get("retry_timeout") { + retry_config.retry_timeout = parse_duration(&retry_timeout).map_err(|_| { + DeltaTableError::generic(format!("failed to parse \"{retry_timeout}\" as Duration")) + })?; + } + + if let Some(bc_init_backoff) = options.0.get("backoff_config.init_backoff") { + retry_config.backoff.init_backoff = parse_duration(&bc_init_backoff).map_err(|_| { + DeltaTableError::generic(format!( + "failed to parse \"{bc_init_backoff}\" as Duration" + )) + })?; + } + + if let Some(bc_max_backoff) = options.0.get("backoff_config.max_backoff") { + retry_config.backoff.max_backoff = parse_duration(&bc_max_backoff).map_err(|_| { + DeltaTableError::generic(format!( + "failed to parse \"{bc_max_backoff}\" as Duration" + )) + })?; + } + + if let Some(bc_base) = options.0.get("backoff_config.base") { + retry_config.backoff.base = bc_base + .parse::() + .map_err(|e| DeltaTableError::generic(e.to_string()))?; + } + + Ok(retry_config) + } +} + #[derive(Clone, Debug, Default)] pub(crate) struct DefaultObjectStoreFactory {} diff --git a/crates/gcp/src/lib.rs b/crates/gcp/src/lib.rs index e50681ed30..3ae3dac245 100644 --- a/crates/gcp/src/lib.rs +++ b/crates/gcp/src/lib.rs @@ -5,11 +5,11 @@ use std::sync::Arc; use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory}; use deltalake_core::storage::{ factories, limit_store_handler, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef, - StorageOptions, + RetryConfigParse, StorageOptions, }; -use deltalake_core::{DeltaResult, Path}; -use object_store::gcp::GoogleConfigKey; -use object_store::parse_url_opts; +use deltalake_core::{DeltaResult, DeltaTableError, Path}; +use object_store::gcp::{GoogleCloudStorageBuilder, GoogleConfigKey}; +use object_store::ObjectStoreScheme; use url::Url; mod config; @@ -37,6 +37,8 @@ impl GcpOptions for StorageOptions { #[derive(Clone, Default, Debug)] pub struct GcpFactory {} +impl RetryConfigParse for GcpFactory {} + impl ObjectStoreFactory for GcpFactory { fn parse_url_opts( &self, @@ -44,7 +46,23 @@ impl ObjectStoreFactory for GcpFactory { options: &StorageOptions, ) -> DeltaResult<(ObjectStoreRef, Path)> { let config = config::GcpConfigHelper::try_new(options.as_gcp_options())?.build()?; - let (inner, prefix) = parse_url_opts(url, config)?; + + let (_scheme, path) = + ObjectStoreScheme::parse(&url).map_err(|e| DeltaTableError::GenericError { + source: Box::new(e), + })?; + let prefix = Path::parse(path)?; + + let mut builder = GoogleCloudStorageBuilder::new().with_url(url.to_string()); + + for (key, value) in config.iter() { + builder = builder.with_config(key.clone(), value.clone()); + } + + let inner = builder + .with_retry(self.parse_retry_config(&options)?) + .build()?; + let gcs_backend = crate::storage::GcsStorageBackend::try_new(Arc::new(inner))?; let store = limit_store_handler(url_prefix_handler(gcs_backend, prefix.clone()), options); Ok((store, prefix)) diff --git a/crates/lakefs/src/storage.rs b/crates/lakefs/src/storage.rs index 616a30a7ae..ea17be4eb3 100644 --- a/crates/lakefs/src/storage.rs +++ b/crates/lakefs/src/storage.rs @@ -2,10 +2,11 @@ use deltalake_core::storage::object_store::aws::AmazonS3ConfigKey; use deltalake_core::storage::{ - limit_store_handler, ObjectStoreFactory, ObjectStoreRef, StorageOptions, + limit_store_handler, ObjectStoreFactory, ObjectStoreRef, RetryConfigParse, StorageOptions, }; use deltalake_core::{DeltaResult, DeltaTableError, Path}; -use object_store::parse_url_opts; +use object_store::aws::AmazonS3Builder; +use object_store::ObjectStoreScheme; use std::collections::HashMap; use std::fmt::Debug; use std::str::FromStr; @@ -61,6 +62,8 @@ pub(crate) trait S3StorageOptionsConversion { impl S3StorageOptionsConversion for LakeFSObjectStoreFactory {} +impl RetryConfigParse for LakeFSObjectStoreFactory {} + impl ObjectStoreFactory for LakeFSObjectStoreFactory { fn parse_url_opts( &self, @@ -88,7 +91,23 @@ impl ObjectStoreFactory for LakeFSObjectStoreFactory { } }) .collect::>(); - let (inner, prefix) = parse_url_opts(&s3_url, config)?; + + let (_scheme, path) = + ObjectStoreScheme::parse(&s3_url).map_err(|e| DeltaTableError::GenericError { + source: Box::new(e), + })?; + let prefix = Path::parse(path)?; + + let mut builder = AmazonS3Builder::new().with_url(s3_url.to_string()); + + for (key, value) in config.iter() { + builder = builder.with_config(key.clone(), value.clone()); + } + + let inner = builder + .with_retry(self.parse_retry_config(&options)?) + .build()?; + let store = limit_store_handler(inner, &options); debug!("Initialized the object store: {store:?}"); Ok((store, prefix))