
Support streaming parquet from lazy DF to S3
Main issue: #705
philss committed Nov 29, 2023
1 parent 01b78a5 commit 95bc202
Showing 6 changed files with 57 additions and 5 deletions.
11 changes: 9 additions & 2 deletions lib/explorer/polars_backend/lazy_frame.ex
@@ -301,8 +301,15 @@ defmodule Explorer.PolarsBackend.LazyFrame do
   end
 
   @impl true
-  def to_parquet(_df, %S3.Entry{}, _compression, _streaming = true) do
-    {:error, ArgumentError.exception("streaming is not supported for writes to AWS S3")}
+  def to_parquet(%DF{} = ldf, %S3.Entry{} = entry, {compression, level}, _streaming = true) do
+    case Native.lf_to_parquet_cloud(
+           ldf.data,
+           entry,
+           Shared.parquet_compression(compression, level)
+         ) do
+      {:ok, _} -> :ok
+      {:error, error} -> {:error, RuntimeError.exception(error)}
+    end
   end
 
   @impl true
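For context, a minimal sketch of how this backend change is exercised from the public API, mirroring the test added further down in this commit. The bucket, endpoint, and credentials are placeholders, and the FSS.S3.Config fields beyond access_key_id are assumed:

alias Explorer.DataFrame, as: DF

# Placeholder credentials/endpoint for a local S3-compatible service;
# the exact FSS.S3.Config fields used here are an assumption.
config = %FSS.S3.Config{
  access_key_id: "test",
  secret_access_key: "test",
  region: "us-east-1",
  endpoint: "http://localhost:4566"
}

ldf = DF.new(id: [1, 2, 3], name: ["a", "b", "c"]) |> DF.lazy()

# With streaming: true, the write now sinks the lazy frame to S3 via
# lf_to_parquet_cloud instead of returning an ArgumentError.
:ok = DF.to_parquet(ldf, "s3://test-bucket/example.parquet", streaming: true, config: config)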
1 change: 1 addition & 0 deletions lib/explorer/polars_backend/native.ex
@@ -241,6 +241,7 @@ defmodule Explorer.PolarsBackend.Native do
   def lf_concat_rows(_dfs), do: err()
   def lf_concat_columns(_df, _others), do: err()
   def lf_to_parquet(_df, _filename, _compression, _streaming), do: err()
+  def lf_to_parquet_cloud(_df, _filename, _compression), do: err()
   def lf_to_ipc(_df, _filename, _compression, _streaming), do: err()
 
   # Series
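A brief aside on the pattern above: each function in this module is a stub delegating to a private err/0 helper, and the Rust NIF library replaces these definitions when it loads. A sketch of the usual shape of that helper (an assumption; the actual implementation may differ):

# Hypothetical sketch of the stub fallback: it raises :nif_not_loaded
# until the precompiled NIF overrides the Elixir definitions above.
defp err, do: :erlang.nif_error(:nif_not_loaded)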
2 changes: 1 addition & 1 deletion native/explorer/Cargo.toml
@@ -90,7 +90,7 @@ features = ["abs", "ewma", "cum_agg", "cov"]
 [features]
 default = ["ndjson", "cloud", "nif_version_2_15"]
 
-cloud = ["object_store", "tokio", "tokio-util", "aws"]
+cloud = ["object_store", "tokio", "tokio-util", "aws", "polars/cloud", "polars/cloud_write"]
 ndjson = ["polars/json"]
 aws = ["object_store/aws", "polars/async", "polars/aws"]
 
39 changes: 39 additions & 0 deletions native/explorer/src/lazyframe/io.rs
@@ -104,6 +104,45 @@ pub fn lf_to_parquet(
     }
 }
 
+#[cfg(feature = "aws")]
+#[rustler::nif(schedule = "DirtyIo")]
+pub fn lf_to_parquet_cloud(
+    data: ExLazyFrame,
+    ex_entry: ExS3Entry,
+    ex_compression: ExParquetCompression,
+) -> Result<(), ExplorerError> {
+    let lf = data.clone_inner();
+    let cloud_options = Some(ex_entry.config.to_cloud_options());
+    let compression = ParquetCompression::try_from(ex_compression)?;
+
+    let options = ParquetWriteOptions {
+        compression,
+        statistics: false,
+        row_group_size: None,
+        data_pagesize_limit: None,
+        maintain_order: false,
+    };
+
+    lf.with_comm_subplan_elim(false).sink_parquet_cloud(
+        ex_entry.to_string(),
+        cloud_options,
+        options,
+    )?;
+    Ok(())
+}
+
+#[cfg(not(feature = "aws"))]
+#[rustler::nif(schedule = "DirtyIo")]
+pub fn lf_to_parquet_cloud(
+    _data: ExLazyFrame,
+    _ex_entry: ExS3Entry,
+    _ex_compression: ExParquetCompression,
+) -> Result<(), ExplorerError> {
+    Err(ExplorerError::Other("Explorer was compiled without the \"aws\" feature enabled. \
+        This is mostly due to this feature being incompatible with your computer's architecture. \
+        Please read the section about precompilation in our README.md: https://github.com/elixir-explorer/explorer#precompilation".to_string()))
+}
+
 #[rustler::nif(schedule = "DirtyIo")]
 pub fn lf_from_ipc(filename: &str) -> Result<ExLazyFrame, ExplorerError> {
     let lf = LazyFrame::scan_ipc(filename, Default::default())?;
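On precompiled targets built without the "aws" feature, the NIF above returns that error string, which the Elixir backend wraps in a RuntimeError (see the lazy_frame.ex change earlier in this diff). A minimal sketch of how a caller might handle that case; the path and config are placeholders:

case Explorer.DataFrame.to_parquet(ldf, "s3://test-bucket/example.parquet",
       streaming: true,
       config: config
     ) do
  :ok ->
    :ok

  # Surfaces both cloud-write failures and the "compiled without aws" message.
  {:error, %RuntimeError{message: message}} ->
    IO.warn("streaming parquet write to S3 failed: " <> message)
end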
1 change: 1 addition & 0 deletions native/explorer/src/lib.rs
@@ -299,6 +299,7 @@ rustler::init!(
         lf_concat_rows,
         lf_concat_columns,
         lf_to_parquet,
+        lf_to_parquet_cloud,
         lf_to_ipc,
         // series
         s_as_str,
8 changes: 6 additions & 2 deletions test/explorer/data_frame/lazy_test.exs
@@ -328,6 +328,7 @@ defmodule Explorer.DataFrame.LazyTest do
       assert DF.to_rows(df1) |> Enum.sort() == DF.to_rows(df) |> Enum.sort()
     end
 
+    @tag :cloud_integration
     test "to_parquet/2 - cloud with streaming enabled", %{ldf: ldf} do
       config = %FSS.S3.Config{
         access_key_id: "test",
@@ -339,9 +340,12 @@
       path = "s3://test-bucket/test-lazy-writes/wine-#{System.monotonic_time()}.parquet"
 
       ldf = DF.head(ldf, 15)
-      assert {:error, error} = DF.to_parquet(ldf, path, streaming: true, config: config)
+      assert :ok = DF.to_parquet(ldf, path, streaming: true, config: config)
 
-      assert error == ArgumentError.exception("streaming is not supported for writes to AWS S3")
+      df = DF.collect(ldf)
+      df1 = DF.from_parquet!(path, config: config)
+
+      assert DF.to_rows(df) |> Enum.sort() == DF.to_rows(df1) |> Enum.sort()
     end
 
     @tag :cloud_integration
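The new @tag :cloud_integration keeps this write-and-read-back assertion out of the default run. A hedged sketch of the test_helper.exs setup such tagging typically assumes, plus how the tagged test would then be selected:

# test/test_helper.exs (assumed shape): cloud integration tests are opt-in,
# so the suite excludes the tag unless it is requested explicitly.
ExUnit.configure(exclude: [:cloud_integration])
ExUnit.start()

# Then, with an S3-compatible service reachable at the configured endpoint:
#
#     mix test --only cloud_integration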
