
Support streaming parquet from lazy DF to S3 #748

Merged · 1 commit · Nov 29, 2023
lib/explorer/polars_backend/lazy_frame.ex (9 additions, 2 deletions)
@@ -301,8 +301,15 @@ defmodule Explorer.PolarsBackend.LazyFrame do
   end
 
   @impl true
-  def to_parquet(_df, %S3.Entry{}, _compression, _streaming = true) do
-    {:error, ArgumentError.exception("streaming is not supported for writes to AWS S3")}
+  def to_parquet(%DF{} = ldf, %S3.Entry{} = entry, {compression, level}, _streaming = true) do
+    case Native.lf_to_parquet_cloud(
+           ldf.data,
+           entry,
+           Shared.parquet_compression(compression, level)
+         ) do
+      {:ok, _} -> :ok
+      {:error, error} -> {:error, RuntimeError.exception(error)}
+    end
   end
 
   @impl true
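With this clause, a streaming parquet write from a lazy frame is handed to the native cloud sink instead of returning an ArgumentError. A minimal usage sketch, assuming an S3-compatible store; the endpoint, bucket, and file names are illustrative and mirror the integration test further down:

    alias Explorer.DataFrame, as: DF

    # Illustrative credentials for a local S3-compatible store (e.g. LocalStack).
    config = %FSS.S3.Config{
      access_key_id: "test",
      secret_access_key: "test",
      endpoint: "http://localhost:4566",
      region: "us-east-1"
    }

    # from_parquet!/2 with lazy: true yields a lazy frame.
    ldf = DF.from_parquet!("wine.parquet", lazy: true)

    # Streams the query result to S3 without materializing it in memory first.
    :ok = DF.to_parquet(ldf, "s3://test-bucket/wine.parquet", streaming: true, config: config)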
lib/explorer/polars_backend/native.ex (1 addition)
@@ -241,6 +241,7 @@ defmodule Explorer.PolarsBackend.Native do
   def lf_concat_rows(_dfs), do: err()
   def lf_concat_columns(_df, _others), do: err()
   def lf_to_parquet(_df, _filename, _compression, _streaming), do: err()
+  def lf_to_parquet_cloud(_df, _filename, _compression), do: err()
   def lf_to_ipc(_df, _filename, _compression, _streaming), do: err()
 
   # Series
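The new lf_to_parquet_cloud/3 stub follows the file's existing convention: each body defers to a helper that raises until the Rust NIF replaces it at load time. The helper itself is not part of this diff; the usual Rustler idiom looks like:

    # Placeholder used by all stubs above; the native implementation is
    # swapped in when the NIF library loads.
    defp err, do: :erlang.nif_error(:nif_not_loaded)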
native/explorer/Cargo.toml (1 addition, 1 deletion)
@@ -90,7 +90,7 @@ features = ["abs", "ewma", "cum_agg", "cov"]
 [features]
 default = ["ndjson", "cloud", "nif_version_2_15"]
 
-cloud = ["object_store", "tokio", "tokio-util", "aws"]
+cloud = ["object_store", "tokio", "tokio-util", "aws", "polars/cloud", "polars/cloud_write"]
 ndjson = ["polars/json"]
 aws = ["object_store/aws", "polars/async", "polars/aws"]
 
native/explorer/src/lazyframe/io.rs (39 additions)
@@ -104,6 +104,45 @@ pub fn lf_to_parquet(
     }
 }
 
+#[cfg(feature = "aws")]
+#[rustler::nif(schedule = "DirtyIo")]
+pub fn lf_to_parquet_cloud(
+    data: ExLazyFrame,
+    ex_entry: ExS3Entry,
+    ex_compression: ExParquetCompression,
+) -> Result<(), ExplorerError> {
+    let lf = data.clone_inner();
+    let cloud_options = Some(ex_entry.config.to_cloud_options());
+    let compression = ParquetCompression::try_from(ex_compression)?;
+
+    let options = ParquetWriteOptions {
+        compression,
+        statistics: false,
+        row_group_size: None,
+        data_pagesize_limit: None,
+        maintain_order: false,
+    };
+
+    lf.with_comm_subplan_elim(false).sink_parquet_cloud(
+        ex_entry.to_string(),
+        cloud_options,
+        options,
+    )?;
+    Ok(())
+}
+
+#[cfg(not(feature = "aws"))]
+#[rustler::nif(schedule = "DirtyIo")]
+pub fn lf_to_parquet_cloud(
+    _data: ExLazyFrame,
+    _ex_entry: ExS3Entry,
+    _ex_compression: ExParquetCompression,
+) -> Result<(), ExplorerError> {
+    Err(ExplorerError::Other("Explorer was compiled without the \"aws\" feature enabled. \
+        This is mostly due to this feature being incompatible with your computer's architecture. \
+        Please read the section about precompilation in our README.md: https://github.com/elixir-explorer/explorer#precompilation".to_string()))
+}
+
 #[rustler::nif(schedule = "DirtyIo")]
 pub fn lf_from_ipc(filename: &str) -> Result<ExLazyFrame, ExplorerError> {
     let lf = LazyFrame::scan_ipc(filename, Default::default())?;
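The #[cfg(not(feature = "aws"))] twin keeps the NIF exported on builds without cloud support, so a call fails with a descriptive error rather than an UndefinedFunctionError. Note also that the sink disables common-subplan elimination first, presumably because CSE and streaming sinks did not combine cleanly in Polars at the time. Combined with the RuntimeError wrapping in lazy_frame.ex above, the failure on a non-aws build would surface roughly like this (a sketch, not taken from the diff):

    # On a build compiled without the "aws" feature:
    {:error, %RuntimeError{message: message}} =
      DF.to_parquet(ldf, "s3://test-bucket/wine.parquet", streaming: true, config: config)

    String.contains?(message, "compiled without the \"aws\" feature")
    #=> true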
native/explorer/src/lib.rs (1 addition)
@@ -299,6 +299,7 @@ rustler::init!(
         lf_concat_rows,
         lf_concat_columns,
         lf_to_parquet,
+        lf_to_parquet_cloud,
         lf_to_ipc,
         // series
         s_as_str,
test/explorer/data_frame/lazy_test.exs (6 additions, 2 deletions)
@@ -328,6 +328,7 @@ defmodule Explorer.DataFrame.LazyTest do
     assert DF.to_rows(df1) |> Enum.sort() == DF.to_rows(df) |> Enum.sort()
   end
 
+  @tag :cloud_integration
   test "to_parquet/2 - cloud with streaming enabled", %{ldf: ldf} do
     config = %FSS.S3.Config{
       access_key_id: "test",
@@ -339,9 +339,12 @@
     path = "s3://test-bucket/test-lazy-writes/wine-#{System.monotonic_time()}.parquet"
 
     ldf = DF.head(ldf, 15)
-    assert {:error, error} = DF.to_parquet(ldf, path, streaming: true, config: config)
+    assert :ok = DF.to_parquet(ldf, path, streaming: true, config: config)
 
-    assert error == ArgumentError.exception("streaming is not supported for writes to AWS S3")
+    df = DF.collect(ldf)
+    df1 = DF.from_parquet!(path, config: config)
+
+    assert DF.to_rows(df) |> Enum.sort() == DF.to_rows(df1) |> Enum.sort()
   end
 
   @tag :cloud_integration
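The new @tag :cloud_integration keeps the rewritten test out of the default run. Assuming the suite's test_helper.exs excludes that tag (as the neighboring cloud tests suggest), it can be exercised against a local S3 stand-in with:

    mix test --only cloud_integration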