From 3335a03a2629698bb18947a999180b44a5abb778 Mon Sep 17 00:00:00 2001
From: Peter Ke
Date: Sat, 2 Nov 2024 10:42:38 -0700
Subject: [PATCH 1/2] Implement query builder

Signed-off-by: Peter Ke
---
 crates/core/src/delta_datafusion/mod.rs |  7 ++-
 python/deltalake/__init__.py            |  1 +
 python/deltalake/_internal.pyi          |  5 +++
 python/deltalake/query.py               | 31 +++++++++++++
 python/deltalake/warnings.py            |  2 +
 python/src/error.rs                     |  8 ++++
 python/src/lib.rs                       | 13 +++++-
 python/src/query.rs                     | 58 +++++++++++++++++++++++++
 python/tests/test_table_read.py         | 54 +++++++++++++++++++++++
 9 files changed, 175 insertions(+), 4 deletions(-)
 create mode 100644 python/deltalake/query.py
 create mode 100644 python/deltalake/warnings.py
 create mode 100644 python/src/query.rs

diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index 4425b0ff6f..745e9c9df1 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -826,9 +826,12 @@ impl TableProvider for DeltaTableProvider {
 
     fn supports_filters_pushdown(
         &self,
-        _filter: &[&Expr],
+        filter: &[&Expr],
     ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
-        Ok(vec![TableProviderFilterPushDown::Inexact])
+        Ok(filter
+            .iter()
+            .map(|_| TableProviderFilterPushDown::Inexact)
+            .collect())
     }
 
     fn statistics(&self) -> Option<Statistics> {
diff --git a/python/deltalake/__init__.py b/python/deltalake/__init__.py
index 43997076b2..2e82d20a40 100644
--- a/python/deltalake/__init__.py
+++ b/python/deltalake/__init__.py
@@ -2,6 +2,7 @@
 from ._internal import __version__ as __version__
 from ._internal import rust_core_version as rust_core_version
 from .data_catalog import DataCatalog as DataCatalog
+from .query import QueryBuilder as QueryBuilder
 from .schema import DataType as DataType
 from .schema import Field as Field
 from .schema import Schema as Schema
diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi
index 026d84d08d..052cf1ebb6 100644
--- a/python/deltalake/_internal.pyi
+++ b/python/deltalake/_internal.pyi
@@ -874,6 +874,11 @@ class DeltaFileSystemHandler:
     ) -> ObjectOutputStream:
         """Open an output stream for sequential writing."""
 
+class PyQueryBuilder:
+    def __init__(self) -> None: ...
+    def register(self, table_name: str, delta_table: RawDeltaTable) -> None: ...
+    def execute(self, sql: str) -> List[pyarrow.RecordBatch]: ...
+
 class DeltaDataChecker:
     def __init__(self, invariants: List[Tuple[str, str]]) -> None: ...
     def check_batch(self, batch: pyarrow.RecordBatch) -> None: ...
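The `supports_filters_pushdown` change above matters as soon as a query carries more than one predicate: DataFusion requires the returned `Vec` to contain one pushdown decision per filter expression, so the old single-element `Vec` broke the contract for multi-filter scans. A hedged Python sketch of the behavior this enables; the table path, column names, and printed result are invented for illustration:

```python
# Illustrative sketch only; the /tmp path and columns are made up.
import pyarrow as pa
from deltalake import DeltaTable, QueryBuilder, write_deltalake

write_deltalake("/tmp/pushdown_demo", pa.table({"x": [1, 2, 3], "y": ["a", "b", "c"]}), mode="overwrite")
dt = DeltaTable("/tmp/pushdown_demo")

# Two predicates -> two Inexact entries from supports_filters_pushdown.
# "Inexact" means DataFusion re-applies the filters after the scan, so the
# results stay correct even though the scan's pruning is best-effort.
batches = (
    QueryBuilder()
    .register("t", dt)
    .execute("SELECT x, y FROM t WHERE x > 1 AND y <> 'c'")
)
print(pa.Table.from_batches(batches).to_pydict())  # expect {'x': [2], 'y': ['b']}
```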
diff --git a/python/deltalake/query.py b/python/deltalake/query.py
new file mode 100644
index 0000000000..06e5144d24
--- /dev/null
+++ b/python/deltalake/query.py
@@ -0,0 +1,31 @@
+from __future__ import annotations
+
+import warnings
+from typing import List
+
+import pyarrow
+
+from deltalake._internal import PyQueryBuilder
+from deltalake.table import DeltaTable
+from deltalake.warnings import ExperimentalWarning
+
+
+class QueryBuilder:
+    def __init__(self) -> None:
+        warnings.warn(
+            "QueryBuilder is experimental and subject to change",
+            category=ExperimentalWarning,
+        )
+        self._query_builder = PyQueryBuilder()
+
+    def register(self, table_name: str, delta_table: DeltaTable) -> QueryBuilder:
+        """Add a table to the query builder."""
+        self._query_builder.register(
+            table_name=table_name,
+            delta_table=delta_table._table,
+        )
+        return self
+
+    def execute(self, sql: str) -> List[pyarrow.RecordBatch]:
+        """Execute the query and return a list of record batches."""
+        return self._query_builder.execute(sql)
diff --git a/python/deltalake/warnings.py b/python/deltalake/warnings.py
new file mode 100644
index 0000000000..83c5d34bcd
--- /dev/null
+++ b/python/deltalake/warnings.py
@@ -0,0 +1,2 @@
+class ExperimentalWarning(Warning):
+    pass
diff --git a/python/src/error.rs b/python/src/error.rs
index a54b1e60b4..b1d22fc7ca 100644
--- a/python/src/error.rs
+++ b/python/src/error.rs
@@ -1,4 +1,5 @@
 use arrow_schema::ArrowError;
+use deltalake::datafusion::error::DataFusionError;
 use deltalake::protocol::ProtocolError;
 use deltalake::{errors::DeltaTableError, ObjectStoreError};
 use pyo3::exceptions::{
@@ -79,6 +80,10 @@ fn checkpoint_to_py(err: ProtocolError) -> PyErr {
     }
 }
 
+fn datafusion_to_py(err: DataFusionError) -> PyErr {
+    DeltaError::new_err(err.to_string())
+}
+
 #[derive(thiserror::Error, Debug)]
 pub enum PythonError {
     #[error("Error in delta table")]
@@ -89,6 +94,8 @@ pub enum PythonError {
     Arrow(#[from] ArrowError),
     #[error("Error in checkpoint")]
     Protocol(#[from] ProtocolError),
+    #[error("Error in data fusion")]
+    DataFusion(#[from] DataFusionError),
 }
 
 impl From<PythonError> for pyo3::PyErr {
@@ -98,6 +105,7 @@ impl From<PythonError> for pyo3::PyErr {
             PythonError::DeltaTable(err) => inner_to_py_err(err),
             PythonError::ObjectStore(err) => object_store_to_py(err),
             PythonError::Arrow(err) => arrow_to_py(err),
             PythonError::Protocol(err) => checkpoint_to_py(err),
+            PythonError::DataFusion(err) => datafusion_to_py(err),
         }
     }
 }
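With the new `DataFusion` variant wired into `PythonError`, planning and execution failures surface in Python as a `DeltaError` instead of an opaque panic. A hedged sketch of what that looks like from the Python side; the exact message wording comes from DataFusion and is not guaranteed:

```python
from deltalake import QueryBuilder
from deltalake.exceptions import DeltaError

qb = QueryBuilder()
try:
    # No table named "missing" was registered, so DataFusion's planner errors out.
    qb.execute("SELECT * FROM missing")
except DeltaError as err:
    print(err)  # e.g. a "table 'missing' not found" planning error
```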
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 8da2544b3f..c4a4d80b78 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -2,6 +2,7 @@
 mod error;
 mod features;
 mod filesystem;
 mod merge;
+mod query;
 mod schema;
 mod utils;
 
@@ -20,12 +21,18 @@ use delta_kernel::expressions::Scalar;
 use delta_kernel::schema::StructField;
 use deltalake::arrow::compute::concat_batches;
 use deltalake::arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
+use deltalake::arrow::pyarrow::ToPyArrow;
 use deltalake::arrow::record_batch::{RecordBatch, RecordBatchIterator};
 use deltalake::arrow::{self, datatypes::Schema as ArrowSchema};
 use deltalake::checkpoints::{cleanup_metadata, create_checkpoint};
+use deltalake::datafusion::datasource::provider_as_source;
+use deltalake::datafusion::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
 use deltalake::datafusion::physical_plan::ExecutionPlan;
-use deltalake::datafusion::prelude::SessionContext;
-use deltalake::delta_datafusion::DeltaDataChecker;
+use deltalake::datafusion::prelude::{DataFrame, SessionContext};
+use deltalake::delta_datafusion::{
+    DataFusionMixins, DeltaDataChecker, DeltaScanConfigBuilder, DeltaSessionConfig,
+    DeltaTableProvider,
+};
 use deltalake::errors::DeltaTableError;
 use deltalake::kernel::{
     scalars::ScalarExt, Action, Add, Invariant, LogicalFile, Remove, StructType, Transaction,
@@ -69,6 +76,7 @@ use crate::error::PythonError;
 use crate::features::TableFeatures;
 use crate::filesystem::FsConfig;
 use crate::merge::PyMergeBuilder;
+use crate::query::PyQueryBuilder;
 use crate::schema::{schema_to_pyobject, Field};
 use crate::utils::rt;
@@ -2095,6 +2103,7 @@ fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> {
     )?)?;
     m.add_class::()?;
     m.add_class::()?;
+    m.add_class::<PyQueryBuilder>()?;
     m.add_class::()?;
     m.add_class::()?;
     m.add_class::()?;
diff --git a/python/src/query.rs b/python/src/query.rs
new file mode 100644
index 0000000000..af3da38eee
--- /dev/null
+++ b/python/src/query.rs
@@ -0,0 +1,58 @@
+use std::sync::Arc;
+
+use deltalake::{
+    arrow::pyarrow::ToPyArrow,
+    datafusion::prelude::SessionContext,
+    delta_datafusion::{DeltaScanConfigBuilder, DeltaSessionConfig, DeltaTableProvider},
+};
+use pyo3::prelude::*;
+
+use crate::{error::PythonError, utils::rt, RawDeltaTable};
+
+#[pyclass(module = "deltalake._internal")]
+pub(crate) struct PyQueryBuilder {
+    _ctx: SessionContext,
+}
+
+#[pymethods]
+impl PyQueryBuilder {
+    #[new]
+    pub fn new() -> Self {
+        let config = DeltaSessionConfig::default().into();
+        let _ctx = SessionContext::new_with_config(config);
+
+        PyQueryBuilder { _ctx }
+    }
+
+    pub fn register(&self, table_name: &str, delta_table: &RawDeltaTable) -> PyResult<()> {
+        let snapshot = delta_table._table.snapshot().map_err(PythonError::from)?;
+        let log_store = delta_table._table.log_store();
+
+        let scan_config = DeltaScanConfigBuilder::default()
+            .build(snapshot)
+            .map_err(PythonError::from)?;
+
+        let provider = Arc::new(
+            DeltaTableProvider::try_new(snapshot.clone(), log_store, scan_config)
+                .map_err(PythonError::from)?,
+        );
+
+        self._ctx
+            .register_table(table_name, provider)
+            .map_err(PythonError::from)?;
+
+        Ok(())
+    }
+
+    pub fn execute(&self, py: Python, sql: &str) -> PyResult<PyObject> {
+        let batches = py.allow_threads(|| {
+            rt().block_on(async {
+                let df = self._ctx.sql(sql).await?;
+                df.collect().await
+            })
+            .map_err(PythonError::from)
+        })?;
+
+        batches.to_pyarrow(py)
+    }
+}
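`register` above resolves the table's snapshot and log store into a `DeltaTableProvider` and hands it to the builder's shared `SessionContext`; that name-to-provider mapping is what makes multi-table SQL possible. A hedged sketch of the model using a fixture path from the tests below; the same `DeltaTable` can even be registered under several names:

```python
import pyarrow as pa
from deltalake import DeltaTable, QueryBuilder

dt = DeltaTable("../crates/test/tests/data/delta-0.8.0-partitioned")

# One SessionContext per QueryBuilder; each register() adds a
# name -> DeltaTableProvider mapping, so "a" and "b" below are two
# views of the same underlying table.
qb = QueryBuilder().register("a", dt).register("b", dt)
batches = qb.execute("SELECT count(*) AS n FROM a INNER JOIN b ON a.value = b.value")
print(pa.Table.from_batches(batches).to_pydict())  # one joined row per distinct value
```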
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index 5ff07ed9e8..30d7f21d7f 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -9,6 +9,7 @@
 from deltalake._util import encode_partition_value
 from deltalake.exceptions import DeltaProtocolError
+from deltalake.query import QueryBuilder
 from deltalake.table import ProtocolVersions
 from deltalake.writer import write_deltalake
 
@@ -946,3 +947,56 @@ def test_is_deltatable_with_storage_opts():
         "DELTA_DYNAMO_TABLE_NAME": "custom_table_name",
     }
     assert DeltaTable.is_deltatable(table_path, storage_options=storage_options)
+
+
+def test_read_query_builder():
+    table_path = "../crates/test/tests/data/delta-0.8.0-partitioned"
+    dt = DeltaTable(table_path)
+    expected = {
+        "value": ["4", "5", "6", "7"],
+        "year": ["2021", "2021", "2021", "2021"],
+        "month": ["4", "12", "12", "12"],
+        "day": ["5", "4", "20", "20"],
+    }
+    actual = pa.Table.from_batches(
+        QueryBuilder()
+        .register("tbl", dt)
+        .execute("SELECT * FROM tbl WHERE year >= 2021 ORDER BY value")
+    ).to_pydict()
+    assert expected == actual
+
+
+def test_read_query_builder_join_multiple_tables(tmp_path):
+    table_path = "../crates/test/tests/data/delta-0.8.0-date"
+    dt1 = DeltaTable(table_path)
+
+    write_deltalake(
+        tmp_path,
+        pa.table(
+            {
+                "date": ["2021-01-01", "2021-01-02", "2021-01-03", "2021-12-31"],
+                "value": ["a", "b", "c", "d"],
+            }
+        ),
+    )
+    dt2 = DeltaTable(tmp_path)
+
+    expected = {
+        "date": ["2021-01-01", "2021-01-02", "2021-01-03"],
+        "dayOfYear": [1, 2, 3],
+        "value": ["a", "b", "c"],
+    }
+    actual = pa.Table.from_batches(
+        QueryBuilder()
+        .register("tbl1", dt1)
+        .register("tbl2", dt2)
+        .execute(
+            """
+            SELECT tbl2.date, tbl1.dayOfYear, tbl2.value
+            FROM tbl1
+            INNER JOIN tbl2 ON tbl1.date = tbl2.date
+            ORDER BY tbl1.date
+            """
+        )
+    ).to_pydict()
+    assert expected == actual
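Taken together, patch 1 gives Python users the following end-to-end flow: write or open a table, register it by name, run SQL, and convert the materialized batches back to Arrow. A hedged sketch against a temporary table; the table name and data are invented:

```python
import tempfile

import pyarrow as pa
from deltalake import DeltaTable, QueryBuilder, write_deltalake

with tempfile.TemporaryDirectory() as tmp:
    write_deltalake(tmp, pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]}))
    dt = DeltaTable(tmp)

    batches = (
        QueryBuilder()
        .register("demo", dt)  # chainable: register() returns self
        .execute("SELECT name FROM demo WHERE id >= 2 ORDER BY id")
    )
    print(pa.Table.from_batches(batches).to_pydict())  # {'name': ['b', 'c']}
```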
From 4af90524d4c8389b5f8adb60a317b311b5bbd152 Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Fri, 29 Nov 2024 17:49:25 +0000
Subject: [PATCH 2/2] chore: add some more documentation to the new
 QueryBuilder interface

Signed-off-by: R. Tyler Croy
---
 python/deltalake/query.py | 37 +++++++++++++++++++++++++++++++++++--
 python/src/filesystem.rs  |  2 +-
 python/src/query.rs       | 25 ++++++++++++++++++++-----
 3 files changed, 56 insertions(+), 8 deletions(-)

diff --git a/python/deltalake/query.py b/python/deltalake/query.py
index 06e5144d24..b3e4100d8c 100644
--- a/python/deltalake/query.py
+++ b/python/deltalake/query.py
@@ -11,6 +11,14 @@
 
 
 class QueryBuilder:
+    """
+    QueryBuilder is an experimental API which exposes Apache DataFusion SQL to Python users of the deltalake library.
+
+    This API is subject to change.
+
+    >>> qb = QueryBuilder()
+    """
+
     def __init__(self) -> None:
         warnings.warn(
             "QueryBuilder is experimental and subject to change",
@@ -19,7 +27,20 @@ def __init__(self) -> None:
         self._query_builder = PyQueryBuilder()
 
     def register(self, table_name: str, delta_table: DeltaTable) -> QueryBuilder:
-        """Add a table to the query builder."""
+        """
+        Add a table to the query builder instance by name. The `table_name`
+        is how the provided `DeltaTable` can be referenced in SQL
+        queries.
+
+        For example:
+
+        >>> tmp = getfixture('tmp_path')
+        >>> import pyarrow as pa
+        >>> from deltalake import DeltaTable, QueryBuilder
+        >>> dt = DeltaTable.create(table_uri=tmp, schema=pa.schema([pa.field('name', pa.string())]))
+        >>> qb = QueryBuilder().register('test', dt)
+        >>> assert qb is not None
+        """
         self._query_builder.register(
             table_name=table_name,
             delta_table=delta_table._table,
@@ -27,5 +48,17 @@ def register(self, table_name: str, delta_table: DeltaTable) -> QueryBuilder:
         return self
 
     def execute(self, sql: str) -> List[pyarrow.RecordBatch]:
-        """Execute the query and return a list of record batches."""
+        """
+        Execute the query and return a list of record batches.
+
+        For example:
+
+        >>> tmp = getfixture('tmp_path')
+        >>> import pyarrow as pa
+        >>> from deltalake import DeltaTable, QueryBuilder
+        >>> dt = DeltaTable.create(table_uri=tmp, schema=pa.schema([pa.field('name', pa.string())]))
+        >>> qb = QueryBuilder().register('test', dt)
+        >>> results = qb.execute('SELECT * FROM test')
+        >>> assert results is not None
+        """
         return self._query_builder.execute(sql)
diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs
index 116b1b0cf1..ee5261ab09 100644
--- a/python/src/filesystem.rs
+++ b/python/src/filesystem.rs
@@ -13,7 +13,7 @@ use std::sync::Arc;
 
 const DEFAULT_MAX_BUFFER_SIZE: usize = 5 * 1024 * 1024;
 
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
 pub(crate) struct FsConfig {
     pub(crate) root_url: String,
     pub(crate) options: HashMap<String, String>,
 }
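The docstring examples above call pytest's `getfixture('tmp_path')`, which only resolves when the doctests run under pytest's doctest machinery. Outside of pytest, a plain temporary directory gives the same result; a minimal sketch mirroring the doctest:

```python
import tempfile

import pyarrow as pa
from deltalake import DeltaTable, QueryBuilder

with tempfile.TemporaryDirectory() as tmp:
    dt = DeltaTable.create(table_uri=tmp, schema=pa.schema([pa.field("name", pa.string())]))
    results = QueryBuilder().register("test", dt).execute("SELECT * FROM test")
    # The freshly created table is empty; results is a (possibly empty)
    # list of pyarrow.RecordBatch instances.
    print(results)
```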
diff --git a/python/src/query.rs b/python/src/query.rs
index af3da38eee..55889c567f 100644
--- a/python/src/query.rs
+++ b/python/src/query.rs
@@ -9,9 +9,14 @@
 use pyo3::prelude::*;
 
 use crate::{error::PythonError, utils::rt, RawDeltaTable};
 
+/// PyQueryBuilder supports the _experimental_ `QueryBuilder` Python interface which allows users
+/// to take advantage of the [Apache DataFusion](https://datafusion.apache.org) engine already
+/// present in the Python package.
 #[pyclass(module = "deltalake._internal")]
+#[derive(Default)]
 pub(crate) struct PyQueryBuilder {
-    _ctx: SessionContext,
+    /// DataFusion [SessionContext] to hold mappings of registered tables
+    ctx: SessionContext,
 }
 
 #[pymethods]
@@ -19,11 +24,16 @@ impl PyQueryBuilder {
     #[new]
     pub fn new() -> Self {
         let config = DeltaSessionConfig::default().into();
-        let _ctx = SessionContext::new_with_config(config);
+        let ctx = SessionContext::new_with_config(config);
 
-        PyQueryBuilder { _ctx }
+        PyQueryBuilder { ctx }
     }
 
+    /// Register the given [RawDeltaTable] into the [SessionContext] using the provided
+    /// `table_name`
+    ///
+    /// Once called, the provided `delta_table` will be referenceable in SQL queries so long as
+    /// another table of the same name is not registered over it.
     pub fn register(&self, table_name: &str, delta_table: &RawDeltaTable) -> PyResult<()> {
         let snapshot = delta_table._table.snapshot().map_err(PythonError::from)?;
         let log_store = delta_table._table.log_store();
@@ -37,17 +47,22 @@ impl PyQueryBuilder {
             .map_err(PythonError::from)?,
         );
 
-        self._ctx
+        self.ctx
             .register_table(table_name, provider)
             .map_err(PythonError::from)?;
 
         Ok(())
     }
 
+    /// Execute the given SQL command within the [SessionContext] of this instance
+    ///
+    /// **NOTE:** Since this function returns a materialized Python list of `RecordBatch`
+    /// instances, it may result in unexpected memory consumption for queries which return
+    /// large data sets.
     pub fn execute(&self, py: Python, sql: &str) -> PyResult<PyObject> {
         let batches = py.allow_threads(|| {
             rt().block_on(async {
-                let df = self._ctx.sql(sql).await?;
+                let df = self.ctx.sql(sql).await?;
                 df.collect().await
             })
             .map_err(PythonError::from)
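Since `execute` materializes every `RecordBatch` before handing the list to Python (per the note added above), large results are best trimmed in SQL before they reach the Python process. A hedged sketch using one of the test fixture tables:

```python
from deltalake import DeltaTable, QueryBuilder

dt = DeltaTable("../crates/test/tests/data/delta-0.8.0-partitioned")

# LIMIT (or an aggregate) keeps the materialized result small instead of
# pulling the whole table into memory on the Python side.
batches = (
    QueryBuilder()
    .register("tbl", dt)
    .execute("SELECT value FROM tbl ORDER BY value LIMIT 2")
)
assert sum(b.num_rows for b in batches) == 2
```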