Update Datafusion Ray architecture docs #27

Merged: 2 commits, Oct 14, 2024
22 changes: 9 additions & 13 deletions docs/README.md
@@ -17,12 +17,12 @@
under the License.
-->

-# RaySQL Design Documentation
+# DataFusion Ray Design Documentation

-RaySQL is a distributed SQL query engine that is powered by DataFusion.
+DataFusion Ray is a distributed SQL query engine that is powered by DataFusion and Ray.

DataFusion provides a high-performance query engine that is already partition-aware, with partitions being executed
-in parallel in separate threads. RaySQL provides a distributed query planner that translates a DataFusion physical
+in parallel in separate threads. DataFusion Ray provides a distributed query planner that translates a DataFusion physical
plan into a distributed plan.

Let's walk through an example to see how that works. We'll use [SQLBench-H](https://github.com/sql-benchmarks/sqlbench-h)
@@ -83,9 +83,6 @@ DataFusion's physical plan lists all the files to be queried, and they are organ
parallel execution within a single process. In this example, the level of concurrency was configured to be four, so
we see `partitions={4 groups: [[ ... ]]` in the leaf `ParquetExec` nodes, with the filenames listed in four groups.
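
The file grouping described above can be illustrated with a minimal Python sketch. It assumes a simple contiguous split of the file list into at most `target_partitions` groups; the helper name `group_files` is hypothetical, and DataFusion's actual grouping strategy may differ.

```python
def group_files(files, target_partitions):
    """Split `files` into at most `target_partitions` contiguous groups.

    Illustrative only: this is an assumed strategy, not DataFusion's code.
    """
    if target_partitions < 1:
        raise ValueError("target_partitions must be at least 1")
    # There cannot be more non-empty groups than there are files.
    group_count = min(target_partitions, len(files))
    if group_count == 0:
        return []
    base, extra = divmod(len(files), group_count)
    groups = []
    start = 0
    for index in range(group_count):
        # The first `extra` groups each take one additional file.
        size = base + (1 if index < extra else 0)
        groups.append(files[start:start + size])
        start += size
    return groups


files = [f"part-{i}.parquet" for i in range(10)]
groups = group_files(files, target_partitions=4)
```

With fewer files than the configured concurrency, the sketch returns fewer groups, so a single input file yields a single partition.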

-_DataFusion will soon support parallel execution for single Parquet files but for now the parallelism is based on
-splitting the available files into separate groups, so RaySQL will not yet scale well for single-file inputs._
-
Here is the full physical plan for query 3.

```text
@@ -123,7 +120,7 @@ GlobalLimitExec: skip=0, fetch=10
## Partitioning & Distribution

The partitioning scheme changes throughout the plan and this is the most important concept to
-understand in order to understand RaySQL's design. Changes in partitioning are implemented by the `RepartitionExec`
+understand in order to understand DataFusion Ray's design. Changes in partitioning are implemented by the `RepartitionExec`
operator in DataFusion and happen in the following scenarios.

### Joins
@@ -155,7 +152,7 @@ Sort also has multiple approaches.
- The input partitions can be collapsed down to a single partition and then sorted
- Partitions can be sorted in parallel and then merged using a sort-preserving merge

-DataFusion and RaySQL currently the first approach, but there is a DataFusion PR open for implementing the second.
+DataFusion and DataFusion Ray currently choose the first approach, but there is a DataFusion PR open for implementing the second.
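
The two sort approaches can be compared with a small Python sketch. It models partitions as plain lists of integers, and the function names are hypothetical; it illustrates the idea rather than DataFusion's operators.

```python
import heapq
from itertools import chain


def sort_after_coalesce(partitions):
    """Approach 1: collapse all partitions into one, then sort it."""
    return sorted(chain.from_iterable(partitions))


def sort_preserving_merge(partitions):
    """Approach 2: sort each partition independently, then merge.

    The per-partition sorts could run in parallel; only the final
    k-way merge has to see every row.
    """
    sorted_partitions = [sorted(partition) for partition in partitions]
    return list(heapq.merge(*sorted_partitions))


partitions = [[5, 1, 9], [4, 8], [7, 2, 6, 3]]
coalesced = sort_after_coalesce(partitions)
merged = sort_preserving_merge(partitions)
```

Both functions return the same ordering. The difference is where the work happens: the first sorts everything in a single partition, while the second leaves only the merge step single-threaded.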

### Limit

@@ -260,13 +257,12 @@ child plans, building up a DAG of futures.

## Distributed Shuffle

-The output of each query stage needs to be persisted somewhere so that the next query stage can read it. Currently,
-RaySQL is just writing the output to disk in Arrow IPC format, and this means that RaySQL is not truly distributed
-yet because it requires a shared file system. It would be better to use the Ray object store instead, as
-proposed [here](https://github.com/datafusion-contrib/ray-sql/issues/22).
+The output of each query stage needs to be persisted somewhere so that the next query stage can read it.
+
+DataFusion Ray uses the Ray object store as a shared file system, which was proposed [here](https://github.com/datafusion-contrib/ray-sql/issues/22) and implemented [here](https://github.com/datafusion-contrib/ray-sql/pull/33).

DataFusion's `RepartitionExec` uses threads and channels within a single process and is not suitable for a
-distributed query engine, so RaySQL rewrites the physical plan and replaces the `RepartionExec` with a pair of
+distributed query engine, so DataFusion Ray rewrites the physical plan and replaces the `RepartionExec` with a pair of
operators to perform a "shuffle". These are the `ShuffleWriterExec` and `ShuffleReaderExec`.
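
The plan rewrite described above can be sketched on a toy plan tree. The `Plan` class and the `insert_shuffles` and `render` helpers are hypothetical and exist only for illustration; the real planner works on DataFusion physical plans, and this sketch keeps the writer and reader in one tree purely for readability.

```python
from dataclasses import dataclass, field


@dataclass
class Plan:
    """Toy stand-in for a physical plan node (not a DataFusion type)."""
    name: str
    children: list = field(default_factory=list)


def insert_shuffles(plan):
    """Return a copy of `plan` with each RepartitionExec replaced by a
    ShuffleReaderExec that reads the output of a ShuffleWriterExec."""
    children = [insert_shuffles(child) for child in plan.children]
    if plan.name == "RepartitionExec":
        writer = Plan("ShuffleWriterExec", children)
        return Plan("ShuffleReaderExec", [writer])
    return Plan(plan.name, children)


def render(plan, depth=0):
    """Return the plan as indented lines, one operator per line."""
    lines = ["  " * depth + plan.name]
    for child in plan.children:
        lines.extend(render(child, depth + 1))
    return lines


original = Plan("HashJoinExec", [
    Plan("RepartitionExec", [Plan("ParquetExec")]),
    Plan("RepartitionExec", [Plan("ParquetExec")]),
])
rewritten = insert_shuffles(original)
```

Calling `print("\n".join(render(rewritten)))` shows each side of the join reading from a shuffle reader whose input is produced by a shuffle writer.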

### Shuffle Writes