Skip to content

Commit

Permalink
chore: use log store name to check
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Jan 6, 2025
1 parent a613cce commit 7834241
Showing 1 changed file with 29 additions and 15 deletions.
44 changes: 29 additions & 15 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ impl RawDeltaTable {
cmd = cmd.with_commit_properties(commit_properties);
}

if self._config.root_url.starts_with("lakefs://") {
if self.log_store()?.name() == "LakeFSLogStore" {
cmd = cmd.with_pre_execute_handler(Arc::new(LakeFSPreExecuteHandler {}))
}

Expand Down Expand Up @@ -547,7 +547,7 @@ impl RawDeltaTable {
cmd = cmd.with_commit_properties(commit_properties);
}

if self._config.root_url.starts_with("lakefs://") {
if self.log_store()?.name() == "LakeFSLogStore" {
cmd = cmd.with_pre_execute_handler(Arc::new(LakeFSPreExecuteHandler {}))
}

Expand Down Expand Up @@ -604,7 +604,7 @@ impl RawDeltaTable {
cmd = cmd.with_commit_properties(commit_properties);
}

if self._config.root_url.starts_with("lakefs://") {
if self.log_store()?.name() == "LakeFSLogStore" {
cmd = cmd.with_pre_execute_handler(Arc::new(LakeFSPreExecuteHandler {}))
}

Expand Down Expand Up @@ -669,7 +669,7 @@ impl RawDeltaTable {
cmd = cmd.with_commit_properties(commit_properties);
}

if self._config.root_url.starts_with("lakefs://") {
if self.log_store()?.name() == "LakeFSLogStore" {
cmd = cmd.with_pre_execute_handler(Arc::new(LakeFSPreExecuteHandler {}))
}

Expand Down Expand Up @@ -710,7 +710,7 @@ impl RawDeltaTable {
cmd = cmd.with_commit_properties(commit_properties);
}

if self._config.root_url.starts_with("lakefs://") {
if self.log_store()?.name() == "LakeFSLogStore" {
cmd = cmd.with_pre_execute_handler(Arc::new(LakeFSPreExecuteHandler {}))
}

Expand Down Expand Up @@ -742,7 +742,7 @@ impl RawDeltaTable {
cmd = cmd.with_commit_properties(commit_properties);
}

if self._config.root_url.starts_with("lakefs://") {
if self.log_store()?.name() == "LakeFSLogStore" {
cmd = cmd.with_pre_execute_handler(Arc::new(LakeFSPreExecuteHandler {}))
}

Expand Down Expand Up @@ -775,7 +775,7 @@ impl RawDeltaTable {
cmd = cmd.with_commit_properties(commit_properties);
}

if self._config.root_url.starts_with("lakefs://") {
if self.log_store()?.name() == "LakeFSLogStore" {
cmd = cmd.with_pre_execute_handler(Arc::new(LakeFSPreExecuteHandler {}))
}

Expand Down Expand Up @@ -807,7 +807,7 @@ impl RawDeltaTable {
cmd = cmd.with_commit_properties(commit_properties);
}

if self._config.root_url.starts_with("lakefs://") {
if self.log_store()?.name() == "LakeFSLogStore" {
cmd = cmd.with_pre_execute_handler(Arc::new(LakeFSPreExecuteHandler {}))
}

Expand Down Expand Up @@ -914,7 +914,7 @@ impl RawDeltaTable {
) -> PyResult<PyMergeBuilder> {
py.allow_threads(|| {
let handler: Option<Arc<dyn PreExecuteHandler>> =
if self._config.root_url.starts_with("lakefs://") {
if self.log_store()?.name() == "LakeFSLogStore" {
Some(Arc::new(LakeFSPreExecuteHandler {}))
} else {
None
Expand Down Expand Up @@ -982,7 +982,7 @@ impl RawDeltaTable {
cmd = cmd.with_commit_properties(commit_properties);
}

if self._config.root_url.starts_with("lakefs://") {
if self.log_store()?.name() == "LakeFSLogStore" {
cmd = cmd.with_pre_execute_handler(Arc::new(LakeFSPreExecuteHandler {}))
}

Expand Down Expand Up @@ -1370,7 +1370,7 @@ impl RawDeltaTable {
cmd = cmd.with_commit_properties(commit_properties);
}

if self._config.root_url.starts_with("lakefs://") {
if self.log_store()?.name() == "LakeFSLogStore" {
cmd = cmd.with_pre_execute_handler(Arc::new(LakeFSPreExecuteHandler {}))
}

Expand All @@ -1397,7 +1397,7 @@ impl RawDeltaTable {
cmd = cmd.with_commit_properties(commit_properties);
}

if self._config.root_url.starts_with("lakefs://") {
if self.log_store()?.name() == "LakeFSLogStore" {
cmd = cmd.with_pre_execute_handler(Arc::new(LakeFSPreExecuteHandler {}))
}

Expand Down Expand Up @@ -1426,7 +1426,7 @@ impl RawDeltaTable {
cmd = cmd.with_commit_properties(commit_properties);
}

if self._config.root_url.starts_with("lakefs://") {
if self.log_store()?.name() == "LakeFSLogStore" {
cmd = cmd.with_pre_execute_handler(Arc::new(LakeFSPreExecuteHandler {}))
}

Expand Down Expand Up @@ -2065,6 +2065,12 @@ fn create_deltalake(
let mode = mode.parse().map_err(PythonError::from)?;
let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;

let use_lakefs_handler = if table.log_store().name() == "LakeFSLogStore" {
true
} else {
false
};

let mut builder = DeltaOps(table)
.create()
.with_columns(schema.fields().cloned())
Expand All @@ -2090,7 +2096,7 @@ fn create_deltalake(
builder = builder.with_metadata(json_metadata);
};

if table_uri.starts_with("lakefs://") {
if use_lakefs_handler {
builder = builder.with_pre_execute_handler(Arc::new(LakeFSPreExecuteHandler {}))
}

Expand Down Expand Up @@ -2125,6 +2131,14 @@ fn write_new_deltalake(

let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;


let use_lakefs_handler = if table.log_store().name() == "LakeFSLogStore" {
true
} else {
false
};


let mut builder = DeltaOps(table)
.create()
.with_columns(schema.fields().cloned())
Expand All @@ -2149,7 +2163,7 @@ fn write_new_deltalake(
builder = builder.with_metadata(json_metadata);
};

if table_uri.starts_with("lakefs://") {
if use_lakefs_handler {
builder = builder.with_pre_execute_handler(Arc::new(LakeFSPreExecuteHandler {}))
}

Expand Down

0 comments on commit 7834241

Please sign in to comment.