chore: restructure crate, make it a feature
ion-elgreco committed Jan 8, 2025
1 parent ceed6f7 commit a65f72c
Showing 10 changed files with 146 additions and 98 deletions.
3 changes: 3 additions & 0 deletions crates/deltalake/Cargo.toml
@@ -21,9 +21,11 @@ deltalake-aws = { version = "0.6.0", path = "../aws", default-features = false,
deltalake-azure = { version = "0.6.0", path = "../azure", optional = true }
deltalake-gcp = { version = "0.7.0", path = "../gcp", optional = true }
deltalake-hdfs = { version = "0.7.0", path = "../hdfs", optional = true }
deltalake-lakefs = { version = "0.6.0", path = "../lakefs", optional = true }
deltalake-catalog-glue = { version = "0.7.0", path = "../catalog-glue", optional = true }
deltalake-catalog-unity = { version = "0.7.0", path = "../catalog-unity", optional = true }


[features]
# All of these features are just reflected into the core crate until that
# functionality is broken apart
@@ -39,6 +41,7 @@ python = ["deltalake-core/python"]
s3-native-tls = ["deltalake-aws/native-tls"]
s3 = ["deltalake-aws/rustls"]
unity-experimental = ["deltalake-catalog-unity"]
lakefs = ["deltalake-lakefs"]

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
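The new `lakefs` flag follows the same pattern as the existing backend features: it simply forwards to the optional `deltalake-lakefs` dependency. A minimal sketch of how a downstream crate would opt in (the version number is illustrative, matching the `deltalake-core` 0.23 line elsewhere in this commit):

```toml
[dependencies]
# Enable the LakeFS backend alongside the default features.
deltalake = { version = "0.23", features = ["lakefs"] }
```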
2 changes: 2 additions & 0 deletions crates/deltalake/src/lib.rs
@@ -11,3 +11,5 @@ pub use deltalake_azure as azure;
pub use deltalake_gcp as gcp;
#[cfg(feature = "hdfs")]
pub use deltalake_hdfs as hdfs;
#[cfg(feature = "lakefs")]
pub use deltalake_lakefs as lakefs;
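Because the re-export is feature-gated, downstream code that touches the module is typically gated the same way. A minimal sketch, assuming a consumer crate that forwards a `lakefs` feature of its own:

```rust
// Compiled only when the `lakefs` feature is enabled; otherwise the
// `deltalake::lakefs` module does not exist.
#[cfg(feature = "lakefs")]
pub fn init_lakefs_backend() {
    // Same registration call the Python bindings make later in this diff.
    deltalake::lakefs::register_handlers(None);
}
```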
4 changes: 0 additions & 4 deletions crates/lakefs/Cargo.toml
@@ -13,8 +13,6 @@ rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.23.0", path = "../core" }
lazy_static = "1"

# workspace dependencies
async-trait = { workspace = true }
bytes = { workspace = true }
@@ -27,8 +25,6 @@ tokio = { workspace = true }
regex = { workspace = true }
uuid = { workspace = true, features = ["serde", "v4"] }
url = { workspace = true }
backon = { version = "1", default-features = false, features = [ "tokio-sleep" ] }
hyper-tls = { version = "0.5", optional = true }
dashmap = "6"
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
87 changes: 87 additions & 0 deletions crates/lakefs/src/execute.rs
@@ -0,0 +1,87 @@
use async_trait::async_trait;
use deltalake_core::{
logstore::LogStoreRef, operations::CustomExecuteHandler, DeltaResult, DeltaTableError,
};
use tracing::debug;
use uuid::Uuid;

use crate::logstore::LakeFSLogStore;

pub struct LakeFSCustomExecuteHandler {}

#[async_trait]
impl CustomExecuteHandler for LakeFSCustomExecuteHandler {
// LakeFS log store pre-execution of a delta operation (creates the transaction branch, registers the object store and transaction)
async fn pre_execute(&self, log_store: &LogStoreRef, operation_id: Uuid) -> DeltaResult<()> {
debug!("Running LakeFS pre execution inside delta operation");
if let Some(lakefs_store) = log_store.clone().as_any().downcast_ref::<LakeFSLogStore>() {
lakefs_store.pre_execute(operation_id).await
} else {
Err(DeltaTableError::generic(
"LakeFSCustomExecuteHandler is used, but no LakeFSLogStore has been found",
))
}
}
// LakeFS log store post-execution of a delta operation (deletes the transaction branch and clears the transaction)
async fn post_execute(&self, log_store: &LogStoreRef, operation_id: Uuid) -> DeltaResult<()> {
debug!("Running LakeFS post execution inside delta operation");
if let Some(lakefs_store) = log_store.clone().as_any().downcast_ref::<LakeFSLogStore>() {
let (repo, _, _) = lakefs_store
.client
.decompose_url(lakefs_store.config.location.to_string());
let result = lakefs_store
.client
.delete_branch(repo, lakefs_store.client.get_transaction(operation_id)?)
.await
.map_err(|e| DeltaTableError::Transaction { source: e });
lakefs_store.client.clear_transaction(operation_id);
result
} else {
Err(DeltaTableError::generic(
"LakeFSCustomExecuteHandler is used, but no LakeFSLogStore has been found",
))
}
}

// Execute arbitrary code at the start of the post commit hook
async fn before_post_commit_hook(
&self,
log_store: &LogStoreRef,
file_operations: bool,
operation_id: Uuid,
) -> DeltaResult<()> {
if file_operations {
debug!("Running LakeFS pre execution inside post_commit_hook");
if let Some(lakefs_store) = log_store.clone().as_any().downcast_ref::<LakeFSLogStore>()
{
lakefs_store.pre_execute(operation_id).await
} else {
Err(DeltaTableError::generic(
"LakeFSCustomExecuteHandler is used, but no LakeFSLogStore has been found",
))
}?;
}
Ok(())
}

// Execute arbitrary code at the end of the post commit hook
async fn after_post_commit_hook(
&self,
log_store: &LogStoreRef,
file_operations: bool,
operation_id: Uuid,
) -> DeltaResult<()> {
if file_operations {
debug!("Running LakeFS post execution inside post_commit_hook");
if let Some(lakefs_store) = log_store.clone().as_any().downcast_ref::<LakeFSLogStore>()
{
lakefs_store.commit_merge(operation_id).await
} else {
Err(DeltaTableError::generic(
"LakeFSCustomExecuteHandler is used, but no LakeFSLogStore has been found",
))
}?;
}
Ok(())
}
}
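For orientation, this is roughly how the handler would be attached to an operation so that `pre_execute` opens the transaction branch and `after_post_commit_hook` merges it back. A sketch only — it assumes the operation builders expose a `with_custom_execute_handler` method accepting an `Arc<dyn CustomExecuteHandler>`, which is not part of this diff:

```rust
use std::sync::Arc;

use deltalake::lakefs::LakeFSCustomExecuteHandler;
use deltalake::{DeltaOps, DeltaResult};

async fn optimize_on_lakefs(ops: DeltaOps) -> DeltaResult<()> {
    // Assumed hook: the handler's pre/post methods run around the commit.
    let (_table, _metrics) = ops
        .optimize()
        .with_custom_execute_handler(Arc::new(LakeFSCustomExecuteHandler {}))
        .await?;
    Ok(())
}
```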
2 changes: 2 additions & 0 deletions crates/lakefs/src/lib.rs
@@ -5,11 +5,13 @@
pub mod client;
pub mod errors;
pub mod execute;
pub mod logstore;
pub mod storage;
use deltalake_core::logstore::{logstores, LogStore, LogStoreFactory};
use deltalake_core::storage::{factories, url_prefix_handler, ObjectStoreRef, StorageOptions};
use deltalake_core::{DeltaResult, Path};
pub use execute::LakeFSCustomExecuteHandler;
use logstore::lakefs_logstore;
use std::sync::Arc;
use storage::LakeFSObjectStoreFactory;
88 changes: 3 additions & 85 deletions crates/lakefs/src/logstore.rs
@@ -6,9 +6,7 @@ use crate::client::LakeFSConfig;
use crate::errors::LakeFSConfigError;

use super::client::LakeFSClient;
use async_trait::async_trait;
use bytes::Bytes;
use deltalake_core::operations::CustomExecuteHandler;
use deltalake_core::storage::{
commit_uri_from_version, DefaultObjectStoreRegistry, ObjectStoreRegistry,
};
@@ -20,7 +18,6 @@ use deltalake_core::{
DeltaResult,
};
use object_store::{Attributes, Error as ObjectStoreError, ObjectStore, PutOptions, TagSet};
use tracing::debug;
use url::Url;
use uuid::Uuid;

@@ -59,10 +56,10 @@ pub fn lakefs_logstore(

/// Default [`LogStore`] implementation
#[derive(Debug, Clone)]
pub struct LakeFSLogStore {
pub(crate) struct LakeFSLogStore {
pub(crate) storage: DefaultObjectStoreRegistry,
config: LogStoreConfig,
client: LakeFSClient,
pub(crate) config: LogStoreConfig,
pub(crate) client: LakeFSClient,
}

impl LakeFSLogStore {
@@ -328,82 +325,3 @@ fn put_options() -> &'static PutOptions {
attributes: Attributes::default(),
})
}

pub struct LakeFSCustomExecuteHandler {}

#[async_trait]
impl CustomExecuteHandler for LakeFSCustomExecuteHandler {
// LakeFS log store pre-execution of a delta operation (creates the transaction branch, registers the object store and transaction)
async fn pre_execute(&self, log_store: &LogStoreRef, operation_id: Uuid) -> DeltaResult<()> {
debug!("Running LakeFS pre execution inside delta operation");
if let Some(lakefs_store) = log_store.clone().as_any().downcast_ref::<LakeFSLogStore>() {
lakefs_store.pre_execute(operation_id).await
} else {
Err(DeltaTableError::generic(
"LakeFSCustomExecuteHandler is used, but no LakeFSLogStore has been found",
))
}
}
// LakeFS log store post-execution of a delta operation (deletes the transaction branch and clears the transaction)
async fn post_execute(&self, log_store: &LogStoreRef, operation_id: Uuid) -> DeltaResult<()> {
debug!("Running LakeFS post execution inside delta operation");
if let Some(lakefs_store) = log_store.clone().as_any().downcast_ref::<LakeFSLogStore>() {
let (repo, _, _) = lakefs_store
.client
.decompose_url(lakefs_store.config.location.to_string());
let result = lakefs_store
.client
.delete_branch(repo, lakefs_store.client.get_transaction(operation_id)?)
.await
.map_err(|e| DeltaTableError::Transaction { source: e });
lakefs_store.client.clear_transaction(operation_id);
result
} else {
Err(DeltaTableError::generic(
"LakeFSCustomExecuteHandler is used, but no LakeFSLogStore has been found",
))
}
}

// Execute arbitrary code at the start of the post commit hook
async fn before_post_commit_hook(
&self,
log_store: &LogStoreRef,
file_operations: bool,
operation_id: Uuid,
) -> DeltaResult<()> {
if file_operations {
debug!("Running LakeFS pre execution inside post_commit_hook");
if let Some(lakefs_store) = log_store.clone().as_any().downcast_ref::<LakeFSLogStore>()
{
lakefs_store.pre_execute(operation_id).await
} else {
Err(DeltaTableError::generic(
"LakeFSCustomExecuteHandler is used, but no LakeFSLogStore has been found",
))
}?;
}
Ok(())
}

// Execute arbitrary code at the end of the post commit hook
async fn after_post_commit_hook(
&self,
log_store: &LogStoreRef,
file_operations: bool,
operation_id: Uuid,
) -> DeltaResult<()> {
if file_operations {
debug!("Running LakeFS post execution inside post_commit_hook");
if let Some(lakefs_store) = log_store.clone().as_any().downcast_ref::<LakeFSLogStore>()
{
lakefs_store.commit_merge(operation_id).await
} else {
Err(DeltaTableError::generic(
"LakeFSCustomExecuteHandler is used, but no LakeFSLogStore has been found",
))
}?;
}
Ok(())
}
}
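With `lazy_static` dropped from the crate's Cargo.toml earlier in this commit, the collapsed middle of `put_options` presumably initializes its static through the standard library instead. A sketch of that pattern under that assumption; `PutMode::Create` is a guess consistent with log writes that must not overwrite an existing commit file:

```rust
use std::sync::OnceLock;

use object_store::{Attributes, PutMode, PutOptions, TagSet};

// std-only replacement for a lazy_static!-initialized constant: built once
// on first use, then shared by reference for the process lifetime.
fn put_options() -> &'static PutOptions {
    static PUT_OPTIONS: OnceLock<PutOptions> = OnceLock::new();
    PUT_OPTIONS.get_or_init(|| PutOptions {
        mode: PutMode::Create, // fail if the object already exists
        tags: TagSet::default(),
        attributes: Attributes::default(),
    })
}
```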
6 changes: 1 addition & 5 deletions crates/lakefs/src/storage.rs
@@ -1,4 +1,4 @@
//! LakFS storage backend (internally S3).
//! LakeFS storage backend (internally S3).
use deltalake_core::storage::object_store::aws::AmazonS3ConfigKey;
use deltalake_core::storage::{
@@ -94,7 +94,3 @@ impl ObjectStoreFactory for LakeFSObjectStoreFactory {
Ok((store, prefix))
}
}

// TODO: Add a LakeFSObjectStore which can also work with unsafe_put for backwards
// compatibility. This would be just a bare put where we don't check if the object
// exists. In theory this is safe, since the deltalake writer will never reuse the
// same transaction branch.
45 changes: 45 additions & 0 deletions crates/lakefs/tests/context.rs
@@ -0,0 +1,45 @@
// #![cfg(feature = "integration_test")]
use deltalake_lakefs::register_handlers;
use deltalake_test::utils::*;
use std::{
collections::HashSet,
process::{Command, ExitStatus},
};

use which::which;

pub struct LakeFSIntegration {}

impl Default for LakeFSIntegration {
fn default() -> Self {
register_handlers(None);
Self {}
}
}

impl StorageIntegration for LakeFSIntegration {
fn prepare_env(&self) {
println!("Preparing env");

set_env_if_not_set("endpoint", "http://127.0.0.1:8000");
set_env_if_not_set("access_key_id", "LAKEFSID");
set_env_if_not_set("secret_access_key", "LAKEFSKEY");
set_env_if_not_set("allow_http", "true");
}

fn create_bucket(&self) -> std::io::Result<ExitStatus> {
// Nothing to create: the LakeFS repository is provisioned outside the test
// run (assumption), so report success via a no-op command's exit status.
Command::new("true").status()
}

fn bucket_name(&self) -> String {
"bronze".to_string()
}

fn root_uri(&self) -> String {
format!("lakefs://{}/main", self.bucket_name())
}

fn copy_directory(&self, source: &str, destination: &str) -> std::io::Result<ExitStatus> {
// Sketch, assuming the `lakectl` CLI (implied by the `which` import above):
// recursively upload the local fixture directory onto the test branch.
let lakectl = which("lakectl").expect("Failed to find lakectl executable");
Command::new(lakectl)
.args(["fs", "upload", "-r", "--source", &format!("{source}/"), &format!("{}/{destination}/", self.root_uri())])
.status()
}
}
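A sketch of how this context might be driven from a test body, assuming `deltalake_test` exposes `IntegrationContext` and `TestResult` the way the other backends' integration suites use them:

```rust
use deltalake_test::{IntegrationContext, TestResult};

#[tokio::test]
async fn test_lakefs_roundtrip() -> TestResult {
    // prepare_env() sets the endpoint/credential variables above, and
    // copy_directory() pushes fixture tables onto the LakeFS branch.
    let context = IntegrationContext::new(Box::new(LakeFSIntegration::default()))?;
    // ... exercise reads and writes against context.root_uri() here ...
    Ok(())
}
```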
3 changes: 1 addition & 2 deletions python/Cargo.toml
@@ -46,7 +46,6 @@ tokio = { workspace = true, features = ["rt-multi-thread"] }
reqwest = { version = "*", features = ["native-tls-vendored"] }

deltalake-mount = { path = "../crates/mount" }
deltalake-lakefs = { path = "../crates/lakefs" }

[dependencies.pyo3]
version = "0.22.6"
@@ -55,7 +54,7 @@ features = ["extension-module", "abi3", "abi3-py39", "gil-refs"]
[dependencies.deltalake]
path = "../crates/deltalake"
version = "0"
features = ["azure", "gcs", "python", "datafusion", "unity-experimental", "hdfs"]
features = ["azure", "gcs", "python", "datafusion", "unity-experimental", "hdfs", "lakefs"]

[features]
default = ["rustls"]
4 changes: 2 additions & 2 deletions python/src/lib.rs
@@ -31,6 +31,7 @@ use deltalake::errors::DeltaTableError;
use deltalake::kernel::{
scalars::ScalarExt, Action, Add, Invariant, LogicalFile, Remove, StructType, Transaction,
};
use deltalake::lakefs::LakeFSCustomExecuteHandler;
use deltalake::logstore::LogStoreRef;
use deltalake::operations::add_column::AddColumnBuilder;
use deltalake::operations::add_feature::AddTableFeatureBuilder;
@@ -58,7 +59,6 @@ use deltalake::storage::{IORuntime, ObjectStoreRef};
use deltalake::table::state::DeltaTableState;
use deltalake::DeltaTableBuilder;
use deltalake::{DeltaOps, DeltaResult};
use deltalake_lakefs::logstore::LakeFSCustomExecuteHandler;
use error::DeltaError;
use futures::future::join_all;
use tracing::log::*;
@@ -2342,7 +2342,7 @@ fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> {
deltalake::gcp::register_handlers(None);
deltalake::hdfs::register_handlers(None);
deltalake_mount::register_handlers(None);
deltalake_lakefs::register_handlers(None);
deltalake::lakefs::register_handlers(None);

let py = m.py();
m.add("DeltaError", py.get_type_bound::<DeltaError>())?;
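End to end, the Rust-side usage the bindings rely on looks roughly like the sketch below. The storage-option keys mirror the environment names from the integration test above, and the URI shape (`lakefs://<repository>/<branch>/<path>`) follows `root_uri()`; both are assumptions, not API taken from this diff:

```rust
use std::collections::HashMap;

use deltalake::{DeltaResult, DeltaTable, DeltaTableBuilder};

async fn open_lakefs_table() -> DeltaResult<DeltaTable> {
    // Register the LakeFS factories before resolving lakefs:// URIs.
    deltalake::lakefs::register_handlers(None);

    let storage_options = HashMap::from([
        ("endpoint".to_string(), "http://127.0.0.1:8000".to_string()),
        ("access_key_id".to_string(), "LAKEFSID".to_string()),
        ("secret_access_key".to_string(), "LAKEFSKEY".to_string()),
        ("allow_http".to_string(), "true".to_string()),
    ]);

    DeltaTableBuilder::from_uri("lakefs://bronze/main/my_table")
        .with_storage_options(storage_options)
        .load()
        .await
}
```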
