Skip to content

Commit

Permalink
feat: Add UC support for all cloud providers
Browse files Browse the repository at this point in the history
Signed-off-by: Stephen Carman <[email protected]>
  • Loading branch information
hntd187 committed Jan 17, 2025
1 parent e95b915 commit b90d552
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 14 deletions.
9 changes: 6 additions & 3 deletions crates/catalog-unity/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@ tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
futures.workspace = true
chrono.workspace = true
tracing.workspace = true
deltalake-core = { version = "0.24.0", path = "../core", features = [
"datafusion",
] }
deltalake-aws = { version = "0.7.0", path = "../aws", optional = true }
deltalake-azure = { version = "0.7.0", path = "../azure", optional = true }
deltalake-gcp = { version = "0.8.0", path = "../gcp", optional = true }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json", "http2"] }
reqwest-retry = "0.7"
reqwest-middleware = { version = "0.4.0", features = ["json"] }
rand = "0.8"
futures = { workspace = true }
chrono = { workspace = true }
dashmap = "6"
tracing = { workspace = true }
datafusion = { workspace = true, optional = true }
datafusion-common = { workspace = true, optional = true }
moka = { version = "0.12", optional = true, features = ["future"] }
Expand All @@ -44,6 +45,8 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
default = ["datafusion", "aws"]
aws = ["deltalake-aws"]
azure = ["deltalake-azure"]
gcp = ["deltalake-gcp"]
r2 = ["deltalake-aws"]
datafusion = ["dep:datafusion", "datafusion-common", "deltalake-core/datafusion", "moka"]

[[example]]
Expand Down
9 changes: 3 additions & 6 deletions crates/catalog-unity/src/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,9 @@ impl SchemaProvider for UnitySchemaProvider {
.await
.map_err(|err| DataFusionError::External(err.into()))?;

let new_storage_opts = temp_creds
.aws_temp_credentials
.ok_or_else(|| {
DataFusionError::External(UnityCatalogError::MissingCredential.into())
})?
.into();
let new_storage_opts = temp_creds.get_credentials().ok_or_else(|| {
DataFusionError::External(UnityCatalogError::MissingCredential.into())
})?;
let table = DeltaTableBuilder::from_uri(table.storage_location)
.with_storage_options(new_storage_opts)
.load()
Expand Down
91 changes: 91 additions & 0 deletions crates/catalog-unity/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::sync::Once;

/// Error response from unity API
#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -402,6 +403,66 @@ pub struct TemporaryTableCredentials {
pub url: String,
}

#[cfg(feature = "aws")]
static INIT_AWS: Once = Once::new();
#[cfg(feature = "azure")]
static INIT_AZURE: Once = Once::new();
#[cfg(feature = "gcp")]
static INIT_GCP: Once = Once::new();

impl TemporaryTableCredentials {
#[cfg(feature = "aws")]
pub fn get_aws_credentials(&self) -> Option<HashMap<String, String>> {
INIT_AWS.call_once(|| deltalake_aws::register_handlers(None));
self.aws_temp_credentials.clone().map(Into::into)
}

#[cfg(not(feature = "aws"))]
pub fn get_aws_credentials(&self) -> Option<HashMap<String, String>> {
None
}

#[cfg(feature = "azure")]
pub fn get_azure_credentials(&self) -> Option<HashMap<String, String>> {
INIT_AZURE.call_once(|| deltalake_azure::register_handlers(None));
self.azure_user_delegation_sas.clone().map(Into::into)
}

#[cfg(not(feature = "azure"))]
pub fn get_azure_credentials(&self) -> Option<HashMap<String, String>> {
None
}

#[cfg(feature = "gcp")]
pub fn get_gcp_credentials(&self) -> Option<HashMap<String, String>> {
INIT_GCP.call_once(|| deltalake_gcp::register_handlers(None));
self.gcp_oauth_token.clone().map(Into::into)
}

#[cfg(not(feature = "gcp"))]
pub fn get_gcp_credentials(&self) -> Option<HashMap<String, String>> {
None
}

#[cfg(feature = "r2")]
pub fn get_r2_credentials(&self) -> Option<HashMap<String, String>> {
INIT_AWS.call_once(|| deltalake_aws::register_handlers(None));
self.r2_temp_credentials.clone().map(Into::into)
}

#[cfg(not(feature = "r2"))]
pub fn get_r2_credentials(&self) -> Option<HashMap<String, String>> {
None
}

pub fn get_credentials(self) -> Option<HashMap<String, String>> {
self.get_aws_credentials()
.or(self.get_azure_credentials())
.or(self.get_gcp_credentials())
.or(self.get_r2_credentials())
}
}

#[derive(Deserialize, Debug, Clone)]
pub struct AwsTempCredentials {
pub access_key_id: String,
Expand Down Expand Up @@ -440,6 +501,36 @@ impl From<AzureUserDelegationSas> for HashMap<String, String> {
}
}

#[cfg(feature = "gcp")]
impl From<GcpOauthToken> for HashMap<String, String> {
fn from(value: GcpOauthToken) -> Self {
HashMap::from_iter([(
"google_application_credentials".to_string(),
value.oauth_token,
)])
}
}

#[cfg(feature = "r2")]
impl From<R2TempCredentials> for HashMap<String, String> {
fn from(value: R2TempCredentials) -> Self {
HashMap::from_iter([
(
deltalake_aws::constants::AWS_ACCESS_KEY_ID.to_string(),
value.access_key_id,
),
(
deltalake_aws::constants::AWS_SECRET_ACCESS_KEY.to_string(),
value.secret_access_key,
),
(
deltalake_aws::constants::AWS_SESSION_TOKEN.to_string(),
value.session_token,
),
])
}
}

#[derive(Deserialize, Debug, Clone)]
pub struct AzureUserDelegationSas {
pub sas_token: String,
Expand Down
10 changes: 5 additions & 5 deletions crates/gcp/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Auxiliary module for generating a valig Google cloud configuration.
//! Auxiliary module for generating a valid Google cloud configuration.
//!
//! Google offers few ways to authenticate against storage accounts and
//! provide credentials for a service principal. Some of this configutaion may
//! provide credentials for a service principal. Some of this configuration may
//! partially be specified in the environment. This module establishes a structured
//! way how we discover valid credentials and some heuristics on how they are prioritized.
use std::collections::{hash_map::Entry, HashMap};
Expand Down Expand Up @@ -42,7 +42,7 @@ impl GcpCredential {

/// Helper struct to create full configuration from passed options and environment
///
/// Main concern is to pick the desired credential for connecting to starage backend
/// Main concern is to pick the desired credential for connecting to storage backend
/// based on a provided configuration and configuration set in the environment.
pub(crate) struct GcpConfigHelper {
config: HashMap<GoogleConfigKey, String>,
Expand Down Expand Up @@ -96,7 +96,7 @@ impl GcpConfigHelper {
.all(|key| self.config.contains_key(key) || self.env_config.contains_key(key))
}

/// Generate a cofiguration augmented with options from the environment
/// Generate a configuration augmented with options from the environment
pub fn build(mut self) -> Result<HashMap<GoogleConfigKey, String>> {
let mut has_credential = false;

Expand All @@ -110,7 +110,7 @@ impl GcpConfigHelper {
}
}

// try partially avaialbe credentials augmented by environment
// try partially available credentials augmented by environment
if !has_credential {
for cred in &self.priority {
if self.has_any_config(cred) && self.has_full_config_with_env(cred) {
Expand Down

0 comments on commit b90d552

Please sign in to comment.