Commit

tpch now works
andygrove committed Oct 6, 2024
1 parent d8dd541 commit 3ecc136
Showing 2 changed files with 24 additions and 28 deletions.
49 changes: 23 additions & 26 deletions README.md
@@ -19,31 +19,40 @@

# DataFusion on Ray

> This was originally a research project donated from [ray-sql](https://github.com/datafusion-contrib/ray-sql) to evaluate performing distributed SQL queries from Python, using
> [Ray](https://www.ray.io/) and [DataFusion](https://github.com/apache/arrow-datafusion).
> This was originally a research project donated from [ray-sql] to evaluate performing distributed SQL queries from
> Python, using [Ray] and [Apache DataFusion]
DataFusion Ray is a distributed SQL query engine powered by the Rust implementation of [Apache Arrow](https://arrow.apache.org/), [Apache DataFusion](https://datafusion.apache.org/) and [Ray](https://www.ray.io/).
[ray-sql]: https://github.com/datafusion-contrib/ray-sql

## Goals
DataFusion Ray is a distributed Python DataFrame and SQL query engine powered by the Rust implementation
of [Apache Arrow], [Apache DataFusion], and [Ray].

- Demonstrate how easily new systems can be built on top of DataFusion. See the [design documentation](./docs/README.md)
to understand how RaySQL works.
- Drive requirements for DataFusion's [Python bindings](https://github.com/apache/arrow-datafusion-python).
- Create content for an interesting blog post or conference talk.
[Ray]: https://www.ray.io/
[Apache Arrow]: https://arrow.apache.org/
[Apache DataFusion]: https://datafusion.apache.org/

## Non Goals
## Comparison to other DataFusion projects

- Re-build the cluster scheduling systems like what [Ballista](https://datafusion.apache.org/ballista/) did.
- Ballista is extremely complex and utilizing Ray feels like it abstracts some of that complexity away.
- Datafusion Ray is delegating cluster management to Ray.
### Comparison to DataFusion Ballista

- Unlike [DataFusion Ballista], DataFusion Ray does not provide its own distributed scheduler and instead relies on
Ray for this functionality. As a result of this design choice, DataFusion Ray is a much smaller and simpler project.
- DataFusion Ray is Python-first, and DataFusion Ballista is Rust-first

[DataFusion Ballista]: https://github.com/apache/datafusion-ballista

### Comparison to DataFusion Python

- [DataFusion Python] provides

[DataFusion Python]: https://github.com/apache/datafusion-python

## Example

Run the following example live in your browser using a Google Colab [notebook](https://colab.research.google.com/drive/1tmSX0Lu6UFh58_-DBUVoyYx6BoXHOszP?usp=sharing).

```python
import os
import pandas as pd
import ray

from datafusion_ray import DatafusionRayContext
@@ -54,7 +63,7 @@ SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
ray.init(resources={"worker": 1})

# Create a context and register a table
ctx = DatafusionRayContext(2, use_ray_shuffle=True)
ctx = DatafusionRayContext(2)
# Register either a CSV or Parquet file
# ctx.register_csv("tips", f"{SCRIPT_DIR}/tips.csv", True)
ctx.register_parquet("tips", f"{SCRIPT_DIR}/tips.parquet")
@@ -75,10 +84,6 @@ for record_batch in result_set:
- Mature SQL support (CTEs, joins, subqueries, etc) thanks to DataFusion
- Support for CSV and Parquet files

## Limitations

- Requires a shared file system currently. Check details [here](./docs/README.md#distributed-shuffle).

## Performance

This chart shows the performance of DataFusion Ray compared to Apache Spark for
@@ -97,14 +102,6 @@ Spark is much faster on some queries, likely due to broadcast exchanges, which D

![SQLBench-H Per Query](./docs/sqlbench-h-per-query.png)

### Performance Plan

Plans on experimenting with the following changes to improve performance:

- Make better use of Ray futures to run more tasks in parallel
- Use Ray object store for shuffle data transfer to reduce disk I/O cost
- Keep upgrading to newer versions of DataFusion to pick up the latest optimizations

## Building

```bash
3 changes: 1 addition & 2 deletions tpch/tpcbench.py
@@ -70,8 +70,7 @@ def main(benchmark: str, data_path: str, query_path: str, concurrency: int):
sql = sql.strip()
if len(sql) > 0:
print(f"Executing: {sql}")
df = ctx.sql(sql)
rows = df.collect()
rows = ctx.sql(sql)

print(f"Query {query} returned {len(rows)} rows")
end_time = time.time()
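
The `tpch/tpcbench.py` hunk above drops the intermediate `df.collect()` call and uses the value returned by `ctx.sql(sql)` directly. A minimal, self-contained sketch of that loop shape follows. It is an illustration only: `StubContext` and `run_statements` are hypothetical names, the stub stands in for `DatafusionRayContext`, and splitting on `;` is an assumption about how statements are separated. The only behavior taken from the diff is that the result of `ctx.sql(sql)` supports `len()`.

```python
import time


class StubContext:
    """Hypothetical stand-in for DatafusionRayContext; not the real class."""

    def sql(self, sql: str) -> list:
        # Assumption: sql() returns the materialized rows directly,
        # so the caller no longer needs a separate collect() step.
        return [("row", i) for i in range(3)]


def run_statements(ctx, query_text: str) -> list:
    """Run each ';'-separated statement; return (sql, row_count, seconds) tuples."""
    timings = []
    for sql in query_text.split(";"):
        sql = sql.strip()
        if len(sql) > 0:  # skip empty fragments, as the benchmark script does
            start_time = time.time()
            rows = ctx.sql(sql)  # previously: df = ctx.sql(sql); rows = df.collect()
            end_time = time.time()
            timings.append((sql, len(rows), end_time - start_time))
    return timings


results = run_statements(StubContext(), "select 1; select 2; ")
for sql, row_count, seconds in results:
    print(f"{sql!r} returned {row_count} rows in {seconds:.6f}s")
```

Because the stub returns a plain list, the sketch says nothing about the real return type; swapping in the actual context would be the way to check how the result behaves beyond `len()`.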