Skip to content

Commit

Permalink
feat(python): Add capability to read Unity Catalog (uc://) uris
Browse files Browse the repository at this point in the history
This adds the capability to read directly from `uc://` URIs using the
local catalog-unity crate. It also exposes the UC temporary
credentials in the `storage_options` of the `DeltaTable` instance so
that polars or similar readers can use them.

Signed-off-by: Omkar P <[email protected]>
  • Loading branch information
omkar-foss committed Jan 10, 2025
1 parent 0b90a11 commit 8fcd7f4
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 13 deletions.
67 changes: 65 additions & 2 deletions crates/catalog-unity/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ pub enum UnityCatalogConfigKey {
/// - `azure_use_azure_cli`
/// - `use_azure_cli`
UseAzureCli,

/// Allow plain `http://` catalog URLs (e.g. http://localhost:8080/api/2.1/...)
/// Supported keys:
/// - `unity_allow_http_url`
/// - `allow_http_url`
AllowHttpUrl,
}

impl FromStr for UnityCatalogConfigKey {
Expand Down Expand Up @@ -224,6 +229,7 @@ impl FromStr for UnityCatalogConfigKey {
"workspace_url" | "unity_workspace_url" | "databricks_workspace_url" => {
Ok(UnityCatalogConfigKey::WorkspaceUrl)
}
"allow_http_url" | "unity_allow_http_url" => Ok(UnityCatalogConfigKey::AllowHttpUrl),
_ => Err(DataCatalogError::UnknownConfigKey {
catalog: "unity",
key: s.to_string(),
Expand All @@ -237,6 +243,7 @@ impl AsRef<str> for UnityCatalogConfigKey {
fn as_ref(&self) -> &str {
match self {
UnityCatalogConfigKey::AccessToken => "unity_access_token",
UnityCatalogConfigKey::AllowHttpUrl => "unity_allow_http_url",
UnityCatalogConfigKey::AuthorityHost => "unity_authority_host",
UnityCatalogConfigKey::AuthorityId => "unity_authority_id",
UnityCatalogConfigKey::ClientId => "unity_client_id",
Expand Down Expand Up @@ -289,6 +296,9 @@ pub struct UnityCatalogBuilder {
/// When set to true, azure cli has to be used for acquiring access token
use_azure_cli: bool,

/// When set to true, http will be allowed in the catalog url
allow_http_url: bool,

/// Retry config
retry_config: RetryConfig,

Expand All @@ -311,6 +321,9 @@ impl UnityCatalogBuilder {
) -> DataCatalogResult<Self> {
match UnityCatalogConfigKey::from_str(key.as_ref())? {
UnityCatalogConfigKey::AccessToken => self.bearer_token = Some(value.into()),
UnityCatalogConfigKey::AllowHttpUrl => {
self.allow_http_url = str_is_truthy(&value.into())
}
UnityCatalogConfigKey::ClientId => self.client_id = Some(value.into()),
UnityCatalogConfigKey::ClientSecret => self.client_secret = Some(value.into()),
UnityCatalogConfigKey::AuthorityId => self.authority_id = Some(value.into()),
Expand Down Expand Up @@ -407,6 +420,44 @@ impl UnityCatalogBuilder {
self
}

/// Reports whether `table_uri` uses the Unity Catalog scheme.
///
/// Only the literal, case-sensitive `uc://` prefix is checked; whatever
/// follows the scheme is not validated here.
pub fn is_unity_catalog_uri(table_uri: &str) -> bool {
    const UC_SCHEME: &str = "uc://";
    table_uri.strip_prefix(UC_SCHEME).is_some()
}

/// Resolves a `uc://<catalog>.<schema>.<table>` URI into the table's storage
/// location and the credential obtained from Unity Catalog.
///
/// The catalog client is built from the environment via
/// `UnityCatalogBuilder::from_env()`. On success the result is
/// `(storage_location, credential)`, where `credential` is the string form
/// of the value returned by `get_credential()`.
///
/// # Errors
///
/// Propagates the error from `get_credential()` when no credential can be
/// obtained.
///
/// # Panics
///
/// Panics when:
/// - `table_uri` does not start with `uc://`, or the remainder is not
///   exactly three non-empty, dot-separated names;
/// - the catalog client cannot be built from the environment;
/// - the table's storage location cannot be looked up;
/// - the credential cannot be represented as a string.
///
/// NOTE(review): these are recoverable conditions in a public API; they
/// should eventually be reported through `UnityCatalogError` instead of
/// panicking.
pub async fn get_uc_location_and_token(
    table_uri: &str,
) -> Result<(String, String), UnityCatalogError> {
    // Validate the scheme instead of blindly skipping five bytes: slicing
    // with `[5..]` accepted any URI (e.g. `abfss://a.b.c`) and could panic
    // with an opaque index error on short or multi-byte input.
    let uri_parts: Vec<&str> = table_uri
        .strip_prefix("uc://")
        .map(|qualified_name| qualified_name.split('.').collect())
        .unwrap_or_default();

    let (catalog_id, database_name, table_name) = match uri_parts.as_slice() {
        [catalog, database, table] if uri_parts.iter().all(|part| !part.is_empty()) => {
            (*catalog, *database, *table)
        }
        _ => panic!("Invalid Unity Catalog URI: {}", table_uri),
    };

    let unity_catalog = match UnityCatalogBuilder::from_env().build() {
        Ok(uc) => uc,
        Err(_e) => panic!("Unable to build Unity Catalog."),
    };

    let storage_location = match unity_catalog
        .get_table_storage_location(Some(catalog_id.to_string()), database_name, table_name)
        .await
    {
        Ok(location) => location,
        Err(_e) => panic!("Unable to find the table's storage location."),
    };

    let token = unity_catalog.get_credential().await?;
    let credential = match token.to_str() {
        Ok(header_str) => header_str.to_string(),
        Err(_e) => panic!("Unable to get string value from Unity Catalog token."),
    };

    Ok((storage_location, credential))
}

fn get_credential_provider(&self) -> Option<CredentialProvider> {
if let Some(token) = self.bearer_token.as_ref() {
return Some(CredentialProvider::BearerToken(token.clone()));
Expand Down Expand Up @@ -451,7 +502,12 @@ impl UnityCatalogBuilder {
.trim_end_matches('/')
.to_string();

let client = self.client_options.client()?;
let client_options = if self.allow_http_url {
self.client_options.with_allow_http(true)
} else {
self.client_options
};
let client = client_options.client()?;

Ok(UnityCatalog {
client,
Expand Down Expand Up @@ -612,7 +668,7 @@ impl UnityCatalog {
self.catalog_url(),
catalog_id.as_ref(),
database_name.as_ref(),
table_name.as_ref()
table_name.as_ref(),
))
.header(AUTHORIZATION, token)
.send()
Expand Down Expand Up @@ -661,6 +717,7 @@ mod tests {
use crate::models::tests::{GET_SCHEMA_RESPONSE, GET_TABLE_RESPONSE, LIST_SCHEMAS_RESPONSE};
use crate::models::*;
use crate::UnityCatalogBuilder;
use deltalake_core::DataCatalog;
use httpmock::prelude::*;

#[tokio::test]
Expand Down Expand Up @@ -716,5 +773,11 @@ mod tests {
.await
.unwrap();
assert!(matches!(get_table_response, GetTableResponse::Success(_)));

let storage_location = client.get_table_storage_location(
Some("catalog_name".to_string()), "schema_name", "table_name",
).await
.unwrap();
assert!(storage_location.eq_ignore_ascii_case("string"));
}
}
6 changes: 4 additions & 2 deletions crates/catalog-unity/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,14 @@ pub struct TableSummary {
pub struct Table {
/// Username of table creator.
#[serde(default)]
pub created_by: String,
pub created_by: Option<String>,

/// Name of table, relative to parent schema.
pub name: String,

/// Username of user who last modified the table.
#[serde(default)]
pub updated_by: String,
pub updated_by: Option<String>,

/// List of schemes whose objects can be referenced without qualification.
#[serde(default)]
Expand All @@ -283,6 +283,7 @@ pub struct Table {
pub data_source_format: DataSourceFormat,

/// Full name of table, in form of catalog_name.schema_name.table_name
#[serde(default)]
pub full_name: String,

/// Name of parent schema relative to its parent catalog.
Expand All @@ -292,6 +293,7 @@ pub struct Table {
pub storage_location: String,

/// Unique identifier of parent metastore.
#[serde(default)]
pub metastore_id: String,
}

Expand Down
3 changes: 3 additions & 0 deletions python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ doc = false
[dependencies]
delta_kernel.workspace = true

# deltalake_catalog_unity - local crate
deltalake-catalog-unity = { path = "../crates/catalog-unity" }

# arrow
arrow-schema = { workspace = true, features = ["serde"] }

Expand Down
44 changes: 35 additions & 9 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ use crate::merge::PyMergeBuilder;
use crate::query::PyQueryBuilder;
use crate::schema::{schema_to_pyobject, Field};
use crate::utils::rt;
use deltalake_catalog_unity::UnityCatalogBuilder;

#[derive(FromPyObject)]
enum PartitionFilterValue {
Expand Down Expand Up @@ -169,12 +170,24 @@ impl RawDeltaTable {
log_buffer_size: Option<usize>,
) -> PyResult<Self> {
py.allow_threads(|| {
let mut builder = deltalake::DeltaTableBuilder::from_uri(table_uri)
.with_io_runtime(IORuntime::default());
let options = storage_options.clone().unwrap_or_default();
if let Some(storage_options) = storage_options {
builder = builder.with_storage_options(storage_options)
let (table_path, uc_token) = if UnityCatalogBuilder::is_unity_catalog_uri(table_uri) {
match rt().block_on(UnityCatalogBuilder::get_uc_location_and_token(table_uri)) {
Ok(tup) => tup,
Err(err) => return Err(PyRuntimeError::new_err(err.to_string())),
}
} else {
(table_uri.to_string(), "".to_string())
};

let mut options = storage_options.clone().unwrap_or_default();
if !uc_token.is_empty() {
options.insert("UNITY_CATALOG_TEMPORARY_TOKEN".to_string(), uc_token);
}

let mut builder = deltalake::DeltaTableBuilder::from_uri(&table_path)
.with_io_runtime(IORuntime::default());
builder = builder.with_storage_options(options.clone());

if let Some(version) = version {
builder = builder.with_version(version)
}
Expand All @@ -191,7 +204,7 @@ impl RawDeltaTable {
Ok(RawDeltaTable {
_table: Arc::new(Mutex::new(table)),
_config: FsConfig {
root_url: table_uri.into(),
root_url: table_path,
options,
},
})
Expand All @@ -204,10 +217,23 @@ impl RawDeltaTable {
table_uri: &str,
storage_options: Option<HashMap<String, String>>,
) -> PyResult<bool> {
let mut builder = deltalake::DeltaTableBuilder::from_uri(table_uri);
if let Some(storage_options) = storage_options {
builder = builder.with_storage_options(storage_options)
let (table_path, uc_token) = if UnityCatalogBuilder::is_unity_catalog_uri(table_uri) {
match rt().block_on(UnityCatalogBuilder::get_uc_location_and_token(table_uri)) {
Ok(tup) => tup,
Err(err) => return Err(PyRuntimeError::new_err(err.to_string())),
}
} else {
(table_uri.to_string(), "".to_string())
};

let mut options = storage_options.clone().unwrap_or_default();
if !uc_token.is_empty() {
options.insert("UNITY_CATALOG_TEMPORARY_TOKEN".to_string(), uc_token);
}

let mut builder = deltalake::DeltaTableBuilder::from_uri(&table_path)
.with_io_runtime(IORuntime::default());
builder = builder.with_storage_options(options.clone());
Ok(rt()
.block_on(async {
match builder.build() {
Expand Down

0 comments on commit 8fcd7f4

Please sign in to comment.