Fix FFI errors, and add script for running TPC-H (#20)
* Add script for running TPC-H

* use latest df-python

* use latest df-python

* fix

* tpch now works

* remove old performance data

* update result filename

* update expected plans

* formatting

* add note

* revert formatting change
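
The first bullet mentions a new script for running TPC-H. The script itself is not shown on this page, so as a rough illustration only, here is a minimal stdlib-only sketch of the kind of driver loop such a script typically contains (the function name, file layout, and `execute` callback are all assumptions, not the actual script):

```python
# Hypothetical sketch of a TPC-H driver loop; the real script's name,
# flags, and output format are not shown in this diff.
import time
from pathlib import Path

def run_tpch(query_dir, execute):
    """Run each numbered TPC-H query file and record wall-clock timings."""
    timings = {}
    for q in range(1, 23):  # TPC-H defines queries 1..22
        path = Path(query_dir) / f"q{q}.sql"
        if not path.exists():
            continue
        sql = path.read_text()
        start = time.perf_counter()
        execute(sql)  # in DataFusion Ray this would be something like ctx.sql(sql)
        timings[q] = time.perf_counter() - start
    return timings
```

A real runner would also persist the timings (the commit's "update result filename" bullet suggests results are written to a file), but that detail is not visible here.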
andygrove authored Oct 7, 2024
1 parent 880544a commit ead1e4e
Showing 30 changed files with 2,301 additions and 1,411 deletions.
621 changes: 304 additions & 317 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 9 additions & 6 deletions Cargo.toml
```diff
@@ -29,17 +29,20 @@ rust-version = "1.62"
 build = "build.rs"
 
 [dependencies]
-datafusion = { version = "41.0.0", features = ["pyarrow", "avro"] }
-datafusion-proto = "41.0.0"
-datafusion-python = "41.0.0"
+datafusion = { version = "42.0.0", features = ["pyarrow", "avro"] }
+datafusion-proto = "42.0.0"
+
+# temporarily point to revision until version 42 is released
+datafusion-python = { git = "https://github.com/apache/datafusion-python" }
 
 futures = "0.3"
 log = "0.4"
-prost = "0.12"
-pyo3 = { version = "0.21", features = ["extension-module", "abi3", "abi3-py38"] }
+prost = "0.13"
+pyo3 = { version = "0.22", features = ["extension-module", "abi3", "abi3-py38"] }
 tokio = { version = "1.40", features = ["macros", "rt", "rt-multi-thread", "sync"] }
 
 [build-dependencies]
-prost-types = "0.12"
+prost-types = "0.13"
 rustc_version = "0.4.0"
 tonic-build = { version = "0.8", default-features = false, features = ["transport", "prost"] }
```
68 changes: 24 additions & 44 deletions README.md
````diff
@@ -19,31 +19,41 @@
 
 # DataFusion on Ray
 
-> This was originally a research project donated from [ray-sql](https://github.com/datafusion-contrib/ray-sql) to evaluate performing distributed SQL queries from Python, using
-> [Ray](https://www.ray.io/) and [DataFusion](https://github.com/apache/arrow-datafusion).
-
-DataFusion Ray is a distributed SQL query engine powered by the Rust implementation of [Apache Arrow](https://arrow.apache.org/), [Apache DataFusion](https://datafusion.apache.org/) and [Ray](https://www.ray.io/).
-
-## Goals
-
-- Demonstrate how easily new systems can be built on top of DataFusion. See the [design documentation](./docs/README.md)
-  to understand how RaySQL works.
-- Drive requirements for DataFusion's [Python bindings](https://github.com/apache/arrow-datafusion-python).
-- Create content for an interesting blog post or conference talk.
-
-## Non Goals
-
-- Re-build the cluster scheduling systems like what [Ballista](https://datafusion.apache.org/ballista/) did.
-  - Ballista is extremely complex and utilizing Ray feels like it abstracts some of that complexity away.
-  - DataFusion Ray is delegating cluster management to Ray.
+> This was originally a research project donated from [ray-sql] to evaluate performing distributed SQL queries from
+> Python, using [Ray] and [Apache DataFusion].
+
+[ray-sql]: https://github.com/datafusion-contrib/ray-sql
+
+DataFusion Ray is a distributed Python DataFrame and SQL query engine powered by the Rust implementation
+of [Apache Arrow], [Apache DataFusion], and [Ray].
+
+[Ray]: https://www.ray.io/
+[Apache Arrow]: https://arrow.apache.org/
+[Apache DataFusion]: https://datafusion.apache.org/
+
+## Comparison to other DataFusion projects
+
+### Comparison to DataFusion Ballista
+
+- Unlike [DataFusion Ballista], DataFusion Ray does not provide its own distributed scheduler and instead relies on
+  Ray for this functionality. As a result of this design choice, DataFusion Ray is a much smaller and simpler project.
+- DataFusion Ray is Python-first, and DataFusion Ballista is Rust-first.
+
+[DataFusion Ballista]: https://github.com/apache/datafusion-ballista
+
+### Comparison to DataFusion Python
+
+- [DataFusion Python] provides a Python DataFrame and SQL API for in-process execution. DataFusion Ray extends
+  DataFusion Python to provide scalability across multiple nodes.
+
+[DataFusion Python]: https://github.com/apache/datafusion-python
 
 ## Example
 
 Run the following example live in your browser using a Google Colab [notebook](https://colab.research.google.com/drive/1tmSX0Lu6UFh58_-DBUVoyYx6BoXHOszP?usp=sharing).
 
 ```python
 import os
 import pandas as pd
 import ray
 
 from datafusion_ray import DatafusionRayContext
@@ -54,7 +64,7 @@ SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
 ray.init(resources={"worker": 1})
 
 # Create a context and register a table
-ctx = DatafusionRayContext(2, use_ray_shuffle=True)
+ctx = DatafusionRayContext(2)
 # Register either a CSV or Parquet file
 # ctx.register_csv("tips", f"{SCRIPT_DIR}/tips.csv", True)
 ctx.register_parquet("tips", f"{SCRIPT_DIR}/tips.parquet")
@@ -75,36 +85,6 @@ for record_batch in result_set:
 - Mature SQL support (CTEs, joins, subqueries, etc) thanks to DataFusion
 - Support for CSV and Parquet files
 
-## Limitations
-
-- Requires a shared file system currently. Check details [here](./docs/README.md#distributed-shuffle).
-
-## Performance
-
-This chart shows the performance of DataFusion Ray compared to Apache Spark for
-[SQLBench-H](https://sqlbenchmarks.io/sqlbench-h/) at a very small data set (10GB), running on a desktop (Threadripper
-with 24 physical cores). Both DataFusion Ray and Spark are configured with 24 executors.
-
-### Overall Time
-
-DataFusion Ray is ~1.9x faster overall for this scale factor and environment with disk-based shuffle.
-
-![SQLBench-H Total](./docs/sqlbench-h-total.png)
-
-### Per Query Time
-
-Spark is much faster on some queries, likely due to broadcast exchanges, which DataFusion Ray hasn't implemented yet.
-
-![SQLBench-H Per Query](./docs/sqlbench-h-per-query.png)
-
-### Performance Plan
-
-Plans on experimenting with the following changes to improve performance:
-
-- Make better use of Ray futures to run more tasks in parallel
-- Use Ray object store for shuffle data transfer to reduce disk I/O cost
-- Keep upgrading to newer versions of DataFusion to pick up the latest optimizations
-
 ## Building
 
 ```bash
````
8 changes: 4 additions & 4 deletions src/context.rs
```diff
@@ -174,7 +174,7 @@ pub fn deserialize_execution_plan(bytes: Vec<u8>) -> PyResult<PyExecutionPlan> {
 /// Iterate down an ExecutionPlan and set the input objects for RayShuffleReaderExec.
 fn _set_inputs_for_ray_shuffle_reader(
     plan: Arc<dyn ExecutionPlan>,
-    input_partitions: &PyList,
+    input_partitions: &Bound<'_, PyList>,
 ) -> Result<()> {
     if let Some(reader_exec) = plan.as_any().downcast_ref::<RayShuffleReaderExec>() {
         let exec_stage_id = reader_exec.stage_id;
@@ -200,8 +200,8 @@ fn _set_inputs_for_ray_shuffle_reader(
             .map_err(|e| DataFusionError::Execution(format!("{}", e)))?
             .extract::<usize>()
             .map_err(|e| DataFusionError::Execution(format!("{}", e)))?;
-        let batch = RecordBatch::from_pyarrow(
-            pytuple
+        let batch = RecordBatch::from_pyarrow_bound(
+            &pytuple
                 .get_item(2)
                 .map_err(|e| DataFusionError::Execution(format!("{}", e)))?,
         )
@@ -235,7 +235,7 @@ fn _execute_partition(
     ));
     Python::with_gil(|py| {
         let input_partitions = inputs
-            .as_ref(py)
+            .bind(py)
             .downcast::<PyList>()
             .map_err(|e| DataFusionError::Execution(format!("{}", e)))?;
         _set_inputs_for_ray_shuffle_reader(plan.plan.clone(), input_partitions)
```
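The `PyList` the Rust FFI layer receives is built from a plain Python list of tuples. Judging from the indices the code reads (`extract::<usize>()` on the leading items, the record batch at `get_item(2)`), each entry appears to be `(stage_id, partition_id, record_batch)`. A pure-Python sketch of that shape, under that assumption — the helper name is hypothetical, and the strings below stand in for pyarrow `RecordBatch` objects:

```python
def batches_for_stage(input_partitions, stage_id):
    # Mirrors the filtering _set_inputs_for_ray_shuffle_reader performs:
    # keep only the entries for the requested stage and return their batches.
    return [entry[2] for entry in input_partitions if entry[0] == stage_id]

# Placeholder input list: strings stand in for pyarrow RecordBatch objects.
inputs = [
    (0, 0, "batch-a"),
    (0, 1, "batch-b"),
    (1, 0, "batch-c"),
]
print(batches_for_stage(inputs, 0))  # ['batch-a', 'batch-b']
```

Keeping this structure as plain tuples is what makes the pyo3 0.22 migration in this commit mostly mechanical: only the borrow style (`Bound<'_, PyList>`, `bind(py)`, `from_pyarrow_bound`) changes, not the data layout crossing the FFI boundary.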
