From 3ecc1365dca7bd6f069c42b868ddc32467815ad3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 6 Oct 2024 09:53:37 -0600 Subject: [PATCH] tpch now works --- README.md | 49 +++++++++++++++++++++++------------------------- tpch/tpcbench.py | 3 +-- 2 files changed, 24 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 33d8ab8..8f5f047 100644 --- a/README.md +++ b/README.md @@ -19,23 +19,33 @@ # DataFusion on Ray -> This was originally a research project donated from [ray-sql](https://github.com/datafusion-contrib/ray-sql) to evaluate performing distributed SQL queries from Python, using -> [Ray](https://www.ray.io/) and [DataFusion](https://github.com/apache/arrow-datafusion). +> This was originally a research project donated from [ray-sql] to evaluate performing distributed SQL queries from +> Python, using [Ray] and [Apache DataFusion] -DataFusion Ray is a distributed SQL query engine powered by the Rust implementation of [Apache Arrow](https://arrow.apache.org/), [Apache DataFusion](https://datafusion.apache.org/) and [Ray](https://www.ray.io/). +[ray-sql]: https://github.com/datafusion-contrib/ray-sql -## Goals +DataFusion Ray is a distributed Python DataFrame and SQL query engine powered by the Rust implementation +of [Apache Arrow], [Apache DataFusion], and [Ray]. -- Demonstrate how easily new systems can be built on top of DataFusion. See the [design documentation](./docs/README.md) - to understand how RaySQL works. -- Drive requirements for DataFusion's [Python bindings](https://github.com/apache/arrow-datafusion-python). -- Create content for an interesting blog post or conference talk. +[Ray]: https://www.ray.io/ +[Apache Arrow]: https://arrow.apache.org/ +[Apache DataFusion]: https://datafusion.apache.org/ -## Non Goals +## Comparison to other DataFusion projects -- Re-build the cluster scheduling systems like what [Ballista](https://datafusion.apache.org/ballista/) did. - - Ballista is extremely complex and utilizing Ray feels like it abstracts some of that complexity away. - - Datafusion Ray is delegating cluster management to Ray. +### Comparison to DataFusion Ballista + +- Unlike [DataFusion Ballista], DataFusion Ray does not provide its own distributed scheduler and instead relies on + Ray for this functionality. As a result of this design choice, DataFusion Ray is a much smaller and simpler project. +- DataFusion Ray is Python-first, and DataFusion Ballista is Rust-first + +[DataFusion Ballista]: https://github.com/apache/datafusion-ballista + +### Comparison to DataFusion Python + +- [DataFusion Python] provides + +[DataFusion Python]: https://github.com/apache/datafusion-python ## Example @@ -43,7 +53,6 @@ Run the following example live in your browser using a Google Colab [notebook](h ```python import os -import pandas as pd import ray from datafusion_ray import DatafusionRayContext @@ -54,7 +63,7 @@ SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) ray.init(resources={"worker": 1}) # Create a context and register a table -ctx = DatafusionRayContext(2, use_ray_shuffle=True) +ctx = DatafusionRayContext(2) # Register either a CSV or Parquet file # ctx.register_csv("tips", f"{SCRIPT_DIR}/tips.csv", True) ctx.register_parquet("tips", f"{SCRIPT_DIR}/tips.parquet") @@ -75,10 +84,6 @@ for record_batch in result_set: - Mature SQL support (CTEs, joins, subqueries, etc) thanks to DataFusion - Support for CSV and Parquet files -## Limitations - -- Requires a shared file system currently. Check details [here](./docs/README.md#distributed-shuffle). - ## Performance This chart shows the performance of DataFusion Ray compared to Apache Spark for @@ -97,14 +102,6 @@ Spark is much faster on some queries, likely due to broadcast exchanges, which D ![SQLBench-H Per Query](./docs/sqlbench-h-per-query.png) -### Performance Plan - -Plans on experimenting with the following changes to improve performance: - -- Make better use of Ray futures to run more tasks in parallel -- Use Ray object store for shuffle data transfer to reduce disk I/O cost -- Keep upgrading to newer versions of DataFusion to pick up the latest optimizations - ## Building ```bash diff --git a/tpch/tpcbench.py b/tpch/tpcbench.py index eb8576b..70adc17 100644 --- a/tpch/tpcbench.py +++ b/tpch/tpcbench.py @@ -70,8 +70,7 @@ def main(benchmark: str, data_path: str, query_path: str, concurrency: int): sql = sql.strip() if len(sql) > 0: print(f"Executing: {sql}") - df = ctx.sql(sql) - rows = df.collect() + rows = ctx.sql(sql) print(f"Query {query} returned {len(rows)} rows") end_time = time.time()