diff --git a/lib/explorer/polars_backend/lazy_frame.ex b/lib/explorer/polars_backend/lazy_frame.ex
index a812a033c..9365ddc5b 100644
--- a/lib/explorer/polars_backend/lazy_frame.ex
+++ b/lib/explorer/polars_backend/lazy_frame.ex
@@ -301,8 +301,15 @@ defmodule Explorer.PolarsBackend.LazyFrame do
   end
 
   @impl true
-  def to_parquet(_df, %S3.Entry{}, _compression, _streaming = true) do
-    {:error, ArgumentError.exception("streaming is not supported for writes to AWS S3")}
+  def to_parquet(%DF{} = ldf, %S3.Entry{} = entry, {compression, level}, _streaming = true) do
+    case Native.lf_to_parquet_cloud(
+           ldf.data,
+           entry,
+           Shared.parquet_compression(compression, level)
+         ) do
+      {:ok, _} -> :ok
+      {:error, error} -> {:error, RuntimeError.exception(error)}
+    end
   end
 
   @impl true
diff --git a/lib/explorer/polars_backend/native.ex b/lib/explorer/polars_backend/native.ex
index e51b6143d..5c4ed9340 100644
--- a/lib/explorer/polars_backend/native.ex
+++ b/lib/explorer/polars_backend/native.ex
@@ -241,6 +241,7 @@ defmodule Explorer.PolarsBackend.Native do
   def lf_concat_rows(_dfs), do: err()
   def lf_concat_columns(_df, _others), do: err()
   def lf_to_parquet(_df, _filename, _compression, _streaming), do: err()
+  def lf_to_parquet_cloud(_df, _filename, _compression), do: err()
  def lf_to_ipc(_df, _filename, _compression, _streaming), do: err()
 
   # Series
diff --git a/native/explorer/Cargo.toml b/native/explorer/Cargo.toml
index 429fd9dae..426beb45c 100644
--- a/native/explorer/Cargo.toml
+++ b/native/explorer/Cargo.toml
@@ -90,7 +90,7 @@ features = ["abs", "ewma", "cum_agg", "cov"]
 
 [features]
 default = ["ndjson", "cloud", "nif_version_2_15"]
-cloud = ["object_store", "tokio", "tokio-util", "aws"]
+cloud = ["object_store", "tokio", "tokio-util", "aws", "polars/cloud", "polars/cloud_write"]
 ndjson = ["polars/json"]
 aws = ["object_store/aws", "polars/async", "polars/aws"]
 
diff --git a/native/explorer/src/lazyframe/io.rs b/native/explorer/src/lazyframe/io.rs
index 223adc1c1..8a58b71ef 100644
--- a/native/explorer/src/lazyframe/io.rs
+++ b/native/explorer/src/lazyframe/io.rs
@@ -104,6 +104,45 @@ pub fn lf_to_parquet(
     }
 }
 
+#[cfg(feature = "aws")]
+#[rustler::nif(schedule = "DirtyIo")]
+pub fn lf_to_parquet_cloud(
+    data: ExLazyFrame,
+    ex_entry: ExS3Entry,
+    ex_compression: ExParquetCompression,
+) -> Result<(), ExplorerError> {
+    let lf = data.clone_inner();
+    let cloud_options = Some(ex_entry.config.to_cloud_options());
+    let compression = ParquetCompression::try_from(ex_compression)?;
+
+    let options = ParquetWriteOptions {
+        compression,
+        statistics: false,
+        row_group_size: None,
+        data_pagesize_limit: None,
+        maintain_order: false,
+    };
+
+    lf.with_comm_subplan_elim(false).sink_parquet_cloud(
+        ex_entry.to_string(),
+        cloud_options,
+        options,
+    )?;
+    Ok(())
+}
+
+#[cfg(not(feature = "aws"))]
+#[rustler::nif(schedule = "DirtyIo")]
+pub fn lf_to_parquet_cloud(
+    _data: ExLazyFrame,
+    _ex_entry: ExS3Entry,
+    _ex_compression: ExParquetCompression,
+) -> Result<(), ExplorerError> {
+    Err(ExplorerError::Other("Explorer was compiled without the \"aws\" feature enabled. \
+        This is mostly due to this feature being incompatible with your computer's architecture. \
+        Please read the section about precompilation in our README.md: https://github.com/elixir-explorer/explorer#precompilation".to_string()))
+}
+
 #[rustler::nif(schedule = "DirtyIo")]
 pub fn lf_from_ipc(filename: &str) -> Result<ExLazyFrame, ExplorerError> {
     let lf = LazyFrame::scan_ipc(filename, Default::default())?;
diff --git a/native/explorer/src/lib.rs b/native/explorer/src/lib.rs
index 9d788f25f..dba61d9ef 100644
--- a/native/explorer/src/lib.rs
+++ b/native/explorer/src/lib.rs
@@ -299,6 +299,7 @@ rustler::init!(
         lf_concat_rows,
         lf_concat_columns,
         lf_to_parquet,
+        lf_to_parquet_cloud,
         lf_to_ipc,
         // series
         s_as_str,
diff --git a/test/explorer/data_frame/lazy_test.exs b/test/explorer/data_frame/lazy_test.exs
index 939283a25..7efbd86f6 100644
--- a/test/explorer/data_frame/lazy_test.exs
+++ b/test/explorer/data_frame/lazy_test.exs
@@ -328,6 +328,7 @@ defmodule Explorer.DataFrame.LazyTest do
       assert DF.to_rows(df1) |> Enum.sort() == DF.to_rows(df) |> Enum.sort()
     end
 
+    @tag :cloud_integration
     test "to_parquet/2 - cloud with streaming enabled", %{ldf: ldf} do
       config = %FSS.S3.Config{
         access_key_id: "test",
@@ -339,9 +340,12 @@
       path = "s3://test-bucket/test-lazy-writes/wine-#{System.monotonic_time()}.parquet"
 
       ldf = DF.head(ldf, 15)
-      assert {:error, error} = DF.to_parquet(ldf, path, streaming: true, config: config)
+      assert :ok = DF.to_parquet(ldf, path, streaming: true, config: config)
 
-      assert error == ArgumentError.exception("streaming is not supported for writes to AWS S3")
+      df = DF.collect(ldf)
+      df1 = DF.from_parquet!(path, config: config)
+
+      assert DF.to_rows(df) |> Enum.sort() == DF.to_rows(df1) |> Enum.sort()
     end
 
     @tag :cloud_integration
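Usage sketch (not part of the patch): with this change, calling `Explorer.DataFrame.to_parquet/3` on a lazy frame with `streaming: true` and an `s3://` destination sinks the query result to S3 through the new `lf_to_parquet_cloud` NIF instead of returning an `ArgumentError`. A minimal example mirroring the test above; the endpoint and credentials are placeholders for a local S3-compatible store such as LocalStack:

    alias Explorer.DataFrame, as: DF

    # Placeholder credentials/endpoint; point these at your S3-compatible service.
    config = %FSS.S3.Config{
      access_key_id: "test",
      secret_access_key: "test",
      endpoint: "http://localhost:4566",
      region: "us-east-1"
    }

    # Build a small lazy frame and stream it straight into a Parquet sink on S3.
    ldf = Explorer.Datasets.wine() |> DF.lazy() |> DF.head(15)
    :ok = DF.to_parquet(ldf, "s3://test-bucket/example.parquet", streaming: true, config: config)

On the Rust side, note that the plan disables common-subplan elimination (`with_comm_subplan_elim(false)`) before calling `sink_parquet_cloud`, presumably because that optimization did not yet play well with Polars' streaming sinks at the time of this change.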