Skip to content

Commit

Permalink
feat(python): Enable collection with gpu engine
Browse files Browse the repository at this point in the history
Introduce option to collect a query using the cudf_polars gpu engine.
By default this falls back transparently to the default cpu engine if
the query cannot be executed. Configuration of specifics of the GPU
engine can be controlled by passing a GPUEngine object to the new
`engine` argument to `collect`.

The import of cudf_polars is currently quite expensive (this will improve
in the coming months). Because we lazy-load the gpu engine, the first
query executed on gpu pays this (one-time) cost.
  • Loading branch information
wence- committed Jul 22, 2024
1 parent 2eb1ac2 commit 7d9ee13
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 5 deletions.
4 changes: 3 additions & 1 deletion py-polars/polars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@
scan_parquet,
scan_pyarrow_dataset,
)
from polars.lazyframe import LazyFrame
from polars.lazyframe import GPUEngine, LazyFrame
from polars.meta import (
build_info,
get_index_type,
Expand Down Expand Up @@ -206,6 +206,8 @@
"Expr",
"LazyFrame",
"Series",
# Engine configuration
"GPUEngine",
# schema
"Schema",
# datatypes
Expand Down
4 changes: 4 additions & 0 deletions py-polars/polars/_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from polars.dependencies import numpy as np
from polars.dependencies import pandas as pd
from polars.dependencies import pyarrow as pa
from polars.lazyframe.engine_config import GPUEngine
from polars.selectors import _selector_proxy_

if sys.version_info >= (3, 10):
Expand Down Expand Up @@ -276,3 +277,6 @@ def fetchmany(self, *args: Any, **kwargs: Any) -> Any:
]
SingleColSelector: TypeAlias = Union[SingleIndexSelector, SingleNameSelector]
MultiColSelector: TypeAlias = Union[MultiIndexSelector, MultiNameSelector, BooleanMask]

# LazyFrame engine selection
EngineType: TypeAlias = Union[Literal["cpu", "gpu"], "GPUEngine"]
4 changes: 4 additions & 0 deletions py-polars/polars/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from types import ModuleType
from typing import TYPE_CHECKING, Any, ClassVar, Hashable, cast

_CUDF_POLARS_AVAILABLE = True
_DELTALAKE_AVAILABLE = True
_FSSPEC_AVAILABLE = True
_GEVENT_AVAILABLE = True
Expand Down Expand Up @@ -150,6 +151,7 @@ def _lazy_import(module_name: str) -> tuple[ModuleType, bool]:
import pickle
import subprocess

import cudf_polars
import deltalake
import fsspec
import gevent
Expand All @@ -175,6 +177,7 @@ def _lazy_import(module_name: str) -> tuple[ModuleType, bool]:
subprocess, _ = _lazy_import("subprocess")

# heavy/optional third party libs
cudf_polars, _CUDF_POLARS_AVAILABLE = _lazy_import("cudf_polars")
deltalake, _DELTALAKE_AVAILABLE = _lazy_import("deltalake")
fsspec, _FSSPEC_AVAILABLE = _lazy_import("fsspec")
great_tables, _GREAT_TABLES_AVAILABLE = _lazy_import("great_tables")
Expand Down Expand Up @@ -301,6 +304,7 @@ def import_optional(
"pickle",
"subprocess",
# lazy-load third party libs
"cudf_polars",
"deltalake",
"fsspec",
"gevent",
Expand Down
2 changes: 2 additions & 0 deletions py-polars/polars/lazyframe/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from polars.lazyframe.engine_config import GPUEngine
from polars.lazyframe.frame import LazyFrame

__all__ = [
"GPUEngine",
"LazyFrame",
]
41 changes: 41 additions & 0 deletions py-polars/polars/lazyframe/engine_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from collections.abc import Mapping

from rmm.mr import DeviceMemoryResource


class GPUEngine:
    """
    Configuration options for the GPU execution engine.

    Use this if you want control over details of the execution.

    Supported options

    - `device`: Select the device to run the query on.
    - `memory_resource`: Set an RMM memory resource for
      device-side allocations.

    Any additional keyword arguments are stored unchanged in `config`.
    """

    device: int | None
    """Device on which to run query."""
    memory_resource: DeviceMemoryResource | None
    """Memory resource to use for device allocations."""
    config: Mapping[str, Any]
    """Additional configuration options for the engine."""

    def __init__(
        self,
        *,
        device: int | None = None,
        memory_resource: Any | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Create a GPU engine configuration.

        Parameters
        ----------
        device
            Device on which to run the query, or None (the default).
        memory_resource
            Memory resource to use for device allocations, or None
            (the default).
        **kwargs
            Additional configuration options; kept as-is in `config`.
        """
        # The base class is `object`, whose `__init__` accepts no extra
        # arguments. Forwarding `kwargs` would raise TypeError as soon as any
        # additional option is supplied, so they are only recorded below.
        super().__init__()
        self.device = device
        self.memory_resource = memory_resource
        self.config = kwargs
55 changes: 51 additions & 4 deletions py-polars/polars/lazyframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import contextlib
import os
from datetime import date, datetime, time, timedelta
from functools import lru_cache, reduce
from functools import lru_cache, partial, reduce
from io import BytesIO, StringIO
from operator import and_
from pathlib import Path
Expand Down Expand Up @@ -78,6 +78,7 @@
from polars.datatypes.group import DataTypeGroup
from polars.dependencies import import_optional, subprocess
from polars.exceptions import PerformanceWarning
from polars.lazyframe.engine_config import GPUEngine
from polars.lazyframe.group_by import LazyGroupBy
from polars.lazyframe.in_process import InProcessQuery
from polars.schema import Schema
Expand All @@ -99,6 +100,7 @@
ClosedInterval,
ColumnNameOrSelector,
CsvQuoteStyle,
EngineType,
ExplainFormat,
FillNullStrategy,
FrameInitTypes,
Expand Down Expand Up @@ -1771,6 +1773,7 @@ def collect(
cluster_with_columns: bool = True,
no_optimization: bool = False,
streaming: bool = False,
engine: EngineType = "cpu",
background: Literal[True],
_eager: bool = False,
) -> InProcessQuery: ...
Expand All @@ -1789,6 +1792,7 @@ def collect(
cluster_with_columns: bool = True,
no_optimization: bool = False,
streaming: bool = False,
engine: EngineType = "cpu",
background: Literal[False] = False,
_eager: bool = False,
) -> DataFrame: ...
Expand All @@ -1806,6 +1810,7 @@ def collect(
cluster_with_columns: bool = True,
no_optimization: bool = False,
streaming: bool = False,
engine: EngineType = "cpu",
background: bool = False,
_eager: bool = False,
**_kwargs: Any,
Expand Down Expand Up @@ -1848,6 +1853,24 @@ def collect(
.. note::
Use :func:`explain` to see if Polars can process the query in streaming
mode.
engine
Select the engine used to process the query, optional.
If set to `"cpu"` (default), the query is run using the
polars CPU engine. If set to `"gpu"`, the GPU engine is
used. Fine-grained control over the GPU engine, for
example which device to use on a system with multiple
devices, is possible by providing a `GPUEngine` object
with configuration options.
.. note::
GPU mode is considered **unstable**. Not all queries will run
successfully on the GPU, however, they should fall back transparently
to the default engine if execution is not supported.
.. note::
The GPU engine does not support streaming, or running in the
background. If either is enabled, then GPU execution is switched off.
background
Run the query in the background and get a handle to the query.
This handle can be used to fetch the result or cancel the query.
Expand Down Expand Up @@ -1918,6 +1941,15 @@ def collect(
if streaming:
issue_unstable_warning("Streaming mode is considered unstable.")

is_gpu = (is_config_obj := isinstance(engine, GPUEngine)) or engine == "gpu"
if (streaming or background or new_streaming) and is_gpu:
issue_warning(
"GPU engine does not support streaming or background collection, "
"disabling GPU engine.",
category=UserWarning,
)
engine = "cpu"

ldf = self._ldf.optimization_toggle(
type_coercion,
predicate_pushdown,
Expand All @@ -1936,9 +1968,24 @@ def collect(
issue_unstable_warning("Background mode is considered unstable.")
return InProcessQuery(ldf.collect_concurrently())

# Only for testing purposes atm.
callback = _kwargs.get("post_opt_callback")

if _eager:
# Don't run on GPU in _eager mode (but don't warn)
engine = "cpu"
callback = None
if is_gpu:
cudf_polars = import_optional(
"cudf_polars",
err_prefix="GPU engine requested, but required package",
install_message=(
"Please install using the command `pip install cudf-polars-cu12` "
"(or `pip install cudf-polars-cu11` if your system has a CUDA-11 driver."
),
)
if not is_config_obj:
engine = GPUEngine()
callback = partial(cudf_polars.execute_with_cudf, config=engine)
# Only for testing purposes
callback = _kwargs.get("post_opt_callback", callback)
return wrap_df(ldf.collect(callback))

@overload
Expand Down

0 comments on commit 7d9ee13

Please sign in to comment.