Skip to content

Commit

Permalink
Support Snowflake Azure
Browse files Browse the repository at this point in the history
clean up a bit based on Andre's variant

first cut at encryption

some cleanup

more cleanup

minor

retried; hashes now show up in SF

remove 256 GCM option for now

TEMP logs about url config

TEMP Switch back to trust-dns

hijack hickory with a version that has a larger buffer

actually switch back to hickory

Re-activate retries

Use eDNS

clean up log msgs

make get_master_key a free function

simplify

actually delete old get_master_key

more cleanup

Decrypt `AES_CBC_256` files with AES 128

cleanup

Support test endpoint in Azure

proper azurite support

Bump package version

fix version number

Update Cargo.lock
  • Loading branch information
alexrenz authored and andrebsguedes committed Nov 22, 2024
1 parent 810b5ff commit 76c0088
Show file tree
Hide file tree
Showing 8 changed files with 519 additions and 57 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "object_store_ffi"
version = "0.10.1"
version = "0.11.0"
edition = "2021"

[[bench]]
Expand Down Expand Up @@ -44,6 +44,7 @@ reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"
# object_store = { version = "0.10.1", features = ["azure", "aws"] }
# Pinned to a specific commit while waiting for upstream
object_store = { git = "https://github.com/andrebsguedes/arrow-rs.git", tag = "v0.10.2-beta1", features = ["azure", "aws", "experimental-azure-list-offset", "experimental-arbitrary-list-prefix"] }
hickory-resolver = "0.24"
thiserror = "1"
anyhow = { version = "1", features = ["backtrace"] }
once_cell = "1.18"
Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,9 +433,9 @@ impl Client {
let (store, crypto_material_provider, stage_prefix, extension) = build_store_for_snowflake_stage(map, config.retry_config.clone()).await?;

let prefix = match (stage_prefix, config.prefix) {
(s, Some(u)) if s.ends_with("/") => Some(format!("{s}{u}")),
(s, Some(u)) => Some(format!("{s}/{u}")),
(s, None) => Some(s)
(Some(s), Some(u)) if s.ends_with("/") => Some(format!("{s}{u}")),
(Some(s), Some(u)) => Some(format!("{s}/{u}")),
(s, u) => s.or(u)
};

config.prefix = prefix;
Expand Down
60 changes: 54 additions & 6 deletions src/snowflake/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use zeroize::Zeroize;
use moka::future::Cache;
use crate::{duration_on_drop, error::{Error, RetryState}, metrics};
use crate::util::{deserialize_str, deserialize_slice};
// use anyhow::anyhow;
use super::resolver::HickoryResolverWithEdns;


#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -74,12 +74,42 @@ pub(crate) struct SnowflakeQueryData {

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub(crate) struct SnowflakeStageCreds {
pub(crate) struct SnowflakeStageAwsCreds {
pub aws_key_id: String,
pub aws_secret_key: String,
pub aws_token: String,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub(crate) struct SnowflakeStageAzureCreds {
pub azure_sas_token: String,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub(crate) enum SnowflakeStageCreds {
Aws(SnowflakeStageAwsCreds),
Azure(SnowflakeStageAzureCreds),
}

impl SnowflakeStageCreds {
pub(crate) fn as_aws(&self) -> crate::Result<&SnowflakeStageAwsCreds> {
match self {
SnowflakeStageCreds::Aws(creds) => Ok(creds),
SnowflakeStageCreds::Azure(_) => Err(Error::invalid_response("Expected AWS credentials, but got Azure ones")),
}
}

pub(crate) fn as_azure(&self) -> crate::Result<&SnowflakeStageAzureCreds> {
match self {
SnowflakeStageCreds::Azure(creds) => Ok(creds),
SnowflakeStageCreds::Aws(_) => Err(Error::invalid_response("Expected Azure credentials, but got AWS ones")),
}
}
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct SnowflakeStageInfo {
Expand Down Expand Up @@ -118,6 +148,7 @@ pub(crate) enum NormalizedStageInfo {
storage_account: String,
container: String,
prefix: String,
azure_sas_token: String,
#[serde(skip_serializing_if = "Option::is_none")]
end_point: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
Expand All @@ -132,18 +163,34 @@ impl TryFrom<&SnowflakeStageInfo> for NormalizedStageInfo {
if value.location_type == "S3" {
let (bucket, prefix) = value.location.split_once('/')
.ok_or_else(|| Error::invalid_response("Stage information from snowflake is missing the bucket name"))?;
let creds = value.creds.as_aws()?;
return Ok(NormalizedStageInfo::S3 {
bucket: bucket.to_string(),
prefix: prefix.to_string(),
region: value.region.clone(),
aws_key_id: value.creds.aws_key_id.clone(),
aws_secret_key: value.creds.aws_secret_key.clone(),
aws_token: value.creds.aws_token.clone(),
aws_key_id: creds.aws_key_id.clone(),
aws_secret_key: creds.aws_secret_key.clone(),
aws_token: creds.aws_token.clone(),
end_point: value.end_point.clone(),
test_endpoint: value.test_endpoint.clone()
})
} else if value.location_type == "AZURE" {
let (container, prefix) = value.location.split_once('/')
.ok_or_else(|| Error::invalid_response("Stage information from snowflake is missing the container name"))?;
let creds = value.creds.as_azure()?;
let storage_account = value.storage_account
.clone()
.ok_or_else(|| Error::invalid_response("Stage information from snowflake is missing the storage account name"))?;
return Ok(NormalizedStageInfo::BlobStorage {
storage_account: storage_account,
container: container.to_string(),
prefix: prefix.to_string(),
azure_sas_token: creds.azure_sas_token.clone(),
end_point: value.end_point.clone(),
test_endpoint: value.test_endpoint.clone()
})
} else {
return Err(Error::not_implemented("Azure BlobStorage is not implemented"));
return Err(Error::not_implemented(format!("Location type {} is not implemented", value.location_type)));
}
}
}
Expand Down Expand Up @@ -297,6 +344,7 @@ impl SnowflakeClient {
let client = SnowflakeClient {
config,
client: reqwest::Client::builder()
.dns_resolver(Arc::new(HickoryResolverWithEdns::default()))
.timeout(Duration::from_secs(180))
.build().unwrap(),
token: Arc::new(Mutex::new(None)),
Expand Down
Loading

0 comments on commit 76c0088

Please sign in to comment.