Cross-platform distributed SQL query planner for Cloud Data powered by DuckDB
Multiple options have been considered for implementing the distributed query planner, including Apache Calcite and DataFusion (cf. #7). Apache Calcite has a very rich API and an unparalleled track record, but requires a JVM, which would make integration very challenging. DataFusion has been used successfully for projects like dask-sql, but would require migrating from Node.js to Rust, as well as integrating a secondary query engine, which would dramatically increase the footprint of our Engine serverless function.
A much more attractive option is to use DuckDB itself, by exposing its relational algebra and some of its query planning routines through a public API. DuckDB's SQL parser | stringifier will soon be exposed through the SQL API (project funded by STOIC). If implemented properly, this API will provide an attractive cross-platform alternative to some components of Apache Calcite.
- Cross-platform (C, C++, Java, Node.js, Python, R, Rust, WASM)
- Straightforward integration with Substrait
- Aligned with constraints of serverless architecture
- Aligned with target SQL dialect and primary query engine
- Built-in query engine to look up metadata related to remote tables
- Built-in query engine for dynamic cascaded replanning at the edges
- Lowest-possible latency through co-location of query handler, query planner, and query engine
- Zero additional dependencies
- In-browser deployment option
- Access to a rapidly-growing community of contributors
- Commercial support from DuckDB Labs through STOIC's sponsorship
The following techniques are being considered:
- Logical query optimization using DuckDB's optimizer
- Implementation of multi-relational algebra
- Domain Specific Language (DSL) for rule-based query optimization
- Rule scripting powered by TypeScript for dynamic rule injection and client-side + cloud-side execution
- Initial set of optimizer rules bootstrapped by porting Trino's rules from Java to DSL
- Automatic generation of optimizer rules using WeTune
- Dynamic injection of optimizer rules through standard SQL API
- Rule interpreter implemented in Rust
- Memoization for cost-based optimization
- Parallelization of query planning across multiple serverless functions
- Parallelization of metadata lookups through concurrent invocations of the query engine
- Dynamic cascaded replanning at the edges
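
To make the idea of rule-based optimization with TypeScript rule scripting more concrete, here is a minimal sketch. It assumes a toy plan representation and two hand-written rules; the `Plan` type, the `Rule` interface, and the `optimize` function are hypothetical names chosen for illustration and do not reflect DuckDB's plan format or the DSL under consideration.

```typescript
// Hypothetical, minimal relational algebra; not DuckDB's actual plan format.
type Plan =
  | { op: "scan"; table: string }
  | { op: "filter"; predicate: string; input: Plan }
  | { op: "project"; columns: string[]; input: Plan };

// A rule returns a rewritten plan, or null when it does not apply.
interface Rule {
  name: string;
  apply(plan: Plan): Plan | null;
}

// Removes a filter whose predicate is the literal TRUE.
const dropTrueFilter: Rule = {
  name: "drop-true-filter",
  apply(plan) {
    return plan.op === "filter" && plan.predicate === "TRUE" ? plan.input : null;
  },
};

// Merges two stacked filters into a single conjunctive filter.
const mergeFilters: Rule = {
  name: "merge-adjacent-filters",
  apply(plan) {
    if (plan.op === "filter" && plan.input.op === "filter") {
      return {
        op: "filter",
        predicate: `(${plan.predicate}) AND (${plan.input.predicate})`,
        input: plan.input.input,
      };
    }
    return null;
  },
};

// Optimizes the child of a node (scans have no child).
function rewriteChild(plan: Plan, rules: Rule[]): Plan {
  switch (plan.op) {
    case "scan":
      return plan;
    case "filter":
      return { op: "filter", predicate: plan.predicate, input: optimize(plan.input, rules) };
    case "project":
      return { op: "project", columns: plan.columns, input: optimize(plan.input, rules) };
  }
}

// Rewrites bottom-up, then applies rules at this node until none fires.
function optimize(plan: Plan, rules: Rule[]): Plan {
  let current = rewriteChild(plan, rules);
  let changed = true;
  while (changed) {
    changed = false;
    for (const rule of rules) {
      const rewritten = rule.apply(current);
      if (rewritten !== null) {
        current = rewritten;
        changed = true;
      }
    }
  }
  return current;
}
```

Because rules are plain objects, a list such as `[dropTrueFilter, mergeFilters]` could in principle be extended at runtime, which is the property that dynamic rule injection relies on. Rule order matters in this sketch: dropping `TRUE` filters before merging avoids producing predicates like `(TRUE) AND (...)`.
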
Because PuffinDB's distributed query engine will be deployed across multiple tiers and might run across tens of thousands of serverless functions with reactive caching, its query planner will be quite sophisticated. There are three main ways to approach such a challenge:
- Manually curating hundreds of query optimizer rules
- Using deep learning to automate the generation of such rules
- Using first-order logic (FOL) to automate the generation of such rules (à la WeTune)
We fundamentally believe that first-order logic is the best approach, for several reasons:
- It is cost-effective, fast, and sustainable (unlike manual curation)
- It is efficient (unlike deep learning)
- It leverages the fact that SQL is based on solid mathematical foundations (relational algebra)
Therefore, PuffinDB's distributed query planner will do as much as possible with FOL, then add manually-curated rules for specific cases.
PuffinDB's distributed query engine is mostly serverless (with the exception of the Monostore, for good reasons) and will be deployed across tens of thousands or even hundreds of thousands of serverless functions (e.g. AWS Lambda), with reactive caching. This demands a radical rethinking of distributed SQL query planning, for several reasons:
- The number of compute nodes is very large (most query planners are optimized for tens or hundreds of compute nodes at most)
- Compute nodes are heterogeneous (serverless functions, serverless containers, server-based containers, and web browsers)
- Serverless functions mandate a clear delineation between static partitioning (object store) and dynamic sharding (functions)
Therefore, the distributed query planner will need to answer three main questions:
- How is data partitioned on the Object Store and sharded across the reactive caching system?
- How should the query be distributed across computing and caching tiers?
- How should data be cached next to accelerate subsequent queries?
Large tables managed by the lakehouse (e.g. Apache Iceberg) are partitioned across multiple objects on the Object Store (e.g. Amazon S3). While serverless functions scan tables directly from the Object Store, the resulting data ends up being cached on the serverless functions, the Monostore, or clients. Therefore, it becomes critical to properly shard large tables across serverless functions, be they used in a stateful or stateless manner (with or without caching).
For performance reasons, three types of sharded tables must be supported:
- Distributed tables: one serverless function per partition or group of partitions
- Co-located tables: partitions of tables sharded across the same dimensions are co-located within the same serverless functions
- Replicated tables: small tables are replicated across all serverless functions that might need them for joins
Note: implementing the last two sharding techniques can be avoided if multi-table datasets are statically denormalized in the lakehouse.
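
The three sharding types can be sketched as a simple decision function. This is only an illustration under stated assumptions: the `TableInfo` shape, the size threshold, and the hash-based placement are hypothetical and are not part of PuffinDB's design.

```typescript
// Hypothetical types and thresholds; none of these names come from PuffinDB.
type Sharding = "distributed" | "co-located" | "replicated";

interface TableInfo {
  name: string;
  sizeBytes: number;
  shardKeys: string[]; // dimensions the table is sharded across
}

// True when both tables are sharded across the same dimensions, in order.
function sameShardKeys(a: TableInfo, b: TableInfo): boolean {
  return (
    a.shardKeys.length > 0 &&
    a.shardKeys.length === b.shardKeys.length &&
    a.shardKeys.every((key, i) => key === b.shardKeys[i])
  );
}

// Picks a sharding type for `table`, optionally joined with `partner`.
function chooseSharding(
  table: TableInfo,
  partner: TableInfo | null,
  replicationLimitBytes: number,
): Sharding {
  if (table.sizeBytes <= replicationLimitBytes) return "replicated";
  if (partner !== null && sameShardKeys(table, partner)) return "co-located";
  return "distributed";
}

// Maps a shard key value to a function index with a 32-bit FNV-1a hash.
// Using the same mapping for two tables sends matching key values to the
// same function, which is what co-location requires.
function functionIndexFor(shardKeyValue: string, functionCount: number): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < shardKeyValue.length; i++) {
    hash ^= shardKeyValue.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash % functionCount;
}
```

In this sketch a small dimension table is replicated, two large tables sharded on the same key are co-located, and everything else falls back to plain distribution. A real planner would likely base the decision on cost estimates rather than a fixed threshold.
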
The distributed query planner will include a cost-based optimizer responsible for deciding which plan will deliver the best performance for a given query. Among other things, this cost-based optimizer will decide whether an existing sharding is sufficient for a given query, or whether tables involved in the query should be sharded another way. Mature cost-based optimizers are currently under review.
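
As an illustration of the decision described above (keep the existing sharding or shard another way), here is a minimal sketch of a cost comparison. The cost model, its fields, and its weights are assumptions made for the example; they do not describe any of the optimizers under review.

```typescript
// Hypothetical cost model: the fields and weights are illustrative assumptions.
interface CandidatePlan {
  label: string;
  scanBytes: number;    // bytes read from the Object Store or from caches
  shuffleBytes: number; // bytes exchanged between functions at query time
  reshardBytes: number; // bytes moved up front to create a new sharding
}

interface CostWeights {
  scan: number;
  shuffle: number;
  reshard: number;
}

// Weighted sum of the three byte counts.
function estimateCost(plan: CandidatePlan, w: CostWeights): number {
  return (
    plan.scanBytes * w.scan +
    plan.shuffleBytes * w.shuffle +
    plan.reshardBytes * w.reshard
  );
}

// Returns the candidate with the lowest estimated cost (first one wins ties).
function pickCheapest(candidates: CandidatePlan[], w: CostWeights): CandidatePlan {
  if (candidates.length === 0) throw new Error("no candidate plans");
  return candidates.reduce((best, plan) =>
    estimateCost(plan, w) < estimateCost(best, w) ? plan : best,
  );
}
```

With one candidate that keeps the existing sharding (and pays for a shuffle at query time) and another that reshards first (and avoids the shuffle), `pickCheapest` selects whichever has the lower weighted total, so the outcome depends entirely on the chosen weights.
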
1. Query translated from non-SQL dialect (e.g. Malloy, PRQL) to SQL
2. Abstract syntax tree, relational tree, and logical query plan produced by DuckDB
3. Non-distributed logical query plan optimized by DuckDB
4. Non-distributed logical query plan further optimized by WeTune
5. Set of Object Store partitions looked up from lakehouse (using Iceberg Java API packaged as a serverless function)
6. Set of cached partitions looked up from Registry (powered by Redis)
7. Distributed logical query plan generated by Query Planner
8. Distributed physical query plan produced by assigning operations to serverless functions and containers
9. Distributed physical query plan executed by distributed query engine
Note: step 4 can be executed in parallel with steps 5 and 6. Steps 4 and 7 might each be executed in parallel across many serverless functions.
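
The parallelism mentioned in the note above (further optimization running alongside the two partition lookups) could be expressed with `Promise.all`. This is a sketch only: the `PlanningSteps` interface and its method names are hypothetical placeholders for the corresponding steps, and plans are represented as plain strings for brevity.

```typescript
// Hypothetical step signatures; plans are plain strings to keep the sketch short.
interface PlanningSteps {
  // Step 4: further optimization of the non-distributed logical plan
  optimizeFurther(plan: string): Promise<string>;
  // Step 5: Object Store partitions looked up from the lakehouse
  lookupStoredPartitions(tables: string[]): Promise<string[]>;
  // Step 6: cached partitions looked up from the Registry
  lookupCachedPartitions(tables: string[]): Promise<string[]>;
  // Step 7: generation of the distributed logical plan
  distribute(plan: string, stored: string[], cached: string[]): Promise<string>;
}

// Runs steps 4, 5, and 6 concurrently, then feeds their results into step 7.
async function planQuery(
  logicalPlan: string,
  tables: string[],
  steps: PlanningSteps,
): Promise<string> {
  const [optimized, stored, cached] = await Promise.all([
    steps.optimizeFurther(logicalPlan),
    steps.lookupStoredPartitions(tables),
    steps.lookupCachedPartitions(tables),
  ]);
  return steps.distribute(optimized, stored, cached);
}
```

Passing the steps in as an object keeps the sketch independent of how each step is actually invoked (in-process, or as a call to another serverless function). If any of the three concurrent steps fails, `Promise.all` rejects and the distributed plan is not generated.
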
Many thanks to Jacques Nadeau and Andy Grove for their help in giving us a better understanding of Substrait's awesome goodness.