From ea4ddc599ad4b77cba85e72f149e5c9b909e94e1 Mon Sep 17 00:00:00 2001 From: Darek Date: Tue, 27 Feb 2024 10:51:43 -0500 Subject: [PATCH] Adding ldf sinkParquet (#171) --- __tests__/lazyframe.test.ts | 34 ++++++++++++++++--- polars/lazy/dataframe.ts | 66 ++++++++++++++++++++++++++++++++----- polars/types.ts | 20 ++++++++++- src/conversion.rs | 59 +++++++++++++++++++++++++++++++++ src/lazy/dataframe.rs | 23 +++++++++++++ 5 files changed, 189 insertions(+), 13 deletions(-) diff --git a/__tests__/lazyframe.test.ts b/__tests__/lazyframe.test.ts index 4516c3005..2f53b3b0e 100644 --- a/__tests__/lazyframe.test.ts +++ b/__tests__/lazyframe.test.ts @@ -1242,7 +1242,7 @@ describe("lazyframe", () => { ldf.sinkCSV("./test.csv"); const newDF: pl.DataFrame = pl.readCSV("./test.csv"); const actualDf: pl.DataFrame = await ldf.collect(); - expect(newDF.sort("foo").toString()).toEqual(actualDf.toString()); + expect(newDF.sort("foo")).toFrameEqual(actualDf); fs.rmSync("./test.csv"); }); test("sinkCSV:noHeader", async () => { @@ -1255,7 +1255,7 @@ describe("lazyframe", () => { ldf.sinkCSV("./test.csv", { includeHeader: false }); const newDF: pl.DataFrame = pl.readCSV("./test.csv", { hasHeader: false }); const actualDf: pl.DataFrame = await ldf.collect(); - expect(newDF.sort("column_1").toString()).toEqual(actualDf.toString()); + expect(newDF.sort("column_1")).toFrameEqual(actualDf); fs.rmSync("./test.csv"); }); test("sinkCSV:separator", async () => { @@ -1268,7 +1268,7 @@ describe("lazyframe", () => { ldf.sinkCSV("./test.csv", { separator: "|" }); const newDF: pl.DataFrame = pl.readCSV("./test.csv", { sep: "|" }); const actualDf: pl.DataFrame = await ldf.collect(); - expect(newDF.sort("foo").toString()).toEqual(actualDf.toString()); + expect(newDF.sort("foo")).toFrameEqual(actualDf); fs.rmSync("./test.csv"); }); test("sinkCSV:nullValue", async () => { @@ -1283,7 +1283,33 @@ describe("lazyframe", () => { const actualDf: pl.DataFrame = await (await ldf.collect()).withColumn( pl.col("bar").fillNull("BOOM"), ); - expect(newDF.sort("foo").toString()).toEqual(actualDf.toString()); + expect(newDF.sort("foo")).toFrameEqual(actualDf); fs.rmSync("./test.csv"); }); + test("sinkParquet:path", async () => { + const ldf = pl + .DataFrame([ + pl.Series("foo", [1, 2, 3], pl.Int64), + pl.Series("bar", ["a", "b", "c"]), + ]) + .lazy(); + ldf.sinkParquet("./test.parquet"); + const newDF: pl.DataFrame = pl.readParquet("./test.parquet"); + const actualDf: pl.DataFrame = await ldf.collect(); + expect(newDF.sort("foo")).toFrameEqual(actualDf); + fs.rmSync("./test.parquet"); + }); + test("sinkParquet:compression:gzip", async () => { + const ldf = pl + .DataFrame([ + pl.Series("foo", [1, 2, 3], pl.Int64), + pl.Series("bar", ["a", "b", "c"]), + ]) + .lazy(); + ldf.sinkParquet("./test.parquet", { compression: "gzip" }); + const newDF: pl.DataFrame = pl.readParquet("./test.parquet"); + const actualDf: pl.DataFrame = await ldf.collect(); + expect(newDF.sort("foo")).toFrameEqual(actualDf); + fs.rmSync("./test.parquet"); + }); }); diff --git a/polars/lazy/dataframe.ts b/polars/lazy/dataframe.ts index 39ee4e6c9..bf4b692a9 100644 --- a/polars/lazy/dataframe.ts +++ b/polars/lazy/dataframe.ts @@ -11,7 +11,12 @@ import { } from "../utils"; import { _LazyGroupBy, LazyGroupBy } from "./groupby"; import { Deserialize, GroupByOps, Serialize } from "../shared_traits"; -import { LazyOptions, LazyJoinOptions, SinkCsvOptions } from "../types"; +import { + LazyOptions, + LazyJoinOptions, + SinkCsvOptions, + SinkParquetOptions, +} from "../types"; import { Series } from "../series"; const inspect = Symbol.for("nodejs.util.inspect.custom"); @@ -513,7 +518,52 @@ export interface LazyDataFrame extends Serialize, GroupByOps { >>> lf.sinkCsv("out.csv") */ - sinkCSV(dest: string, options?: SinkCsvOptions): void; + sinkCSV(path: string, options?: SinkCsvOptions): void; + + /*** + * + * Evaluate the query in streaming mode and write to a Parquet file. + + .. warning:: + Streaming mode is considered **unstable**. It may be changed + at any point without it being considered a breaking change. + + This allows streaming results that are larger than RAM to be written to disk. + + Parameters + ---------- + @param path - File path to which the file should be written. + @param compression : {'lz4', 'uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'zstd'} + Choose "zstd" for good compression performance. (default) + Choose "lz4" for fast compression/decompression. + Choose "snappy" for more backwards compatibility guarantees + when you deal with older parquet readers. + @param compressionLevel - The level of compression to use. Higher compression means smaller files on disk. + - "gzip" : min-level: 0, max-level: 10. + - "brotli" : min-level: 0, max-level: 11. + - "zstd" : min-level: 1, max-level: 22. + @param statistics - Write statistics to the parquet headers. This requires extra compute. Default - false + @param rowGroupSize - Size of the row groups in number of rows. + If None (default), the chunks of the `DataFrame` are + used. Writing in smaller chunks may reduce memory pressure and improve + writing speeds. + @param dataPagesizeLimit - Size limit of individual data pages. + If not set defaults to 1024 * 1024 bytes + @param maintainOrder - Maintain the order in which data is processed. Default -> true + Setting this to `False` will be slightly faster. + @param typeCoercion - Do type coercion optimization. Default -> true + @param predicatePushdown - Do predicate pushdown optimization. Default -> true + @param projectionPushdown - Do projection pushdown optimization. Default -> true + @param simplifyExpression - Run simplify expressions optimization. Default -> true + @param slicePushdown - Slice pushdown optimization. Default -> true + @param noOptimization - Turn off (certain) optimizations. Default -> false + + Examples + -------- + >>> const lf = pl.scanCsv("/path/to/my_larger_than_ram_file.csv") # doctest: +SKIP + >>> lf.sinkParquet("out.parquet") # doctest: +SKIP + */ + sinkParquet(path: string, options?: SinkParquetOptions): void; } const prepareGroupbyInputs = (by) => { @@ -956,13 +1006,13 @@ export const _LazyDataFrame = (_ldf: any): LazyDataFrame => { withRowCount(name = "row_nr") { return _LazyDataFrame(_ldf.withRowCount(name)); }, - sinkCSV(dest?, options = {}) { + sinkCSV(path, options: SinkCsvOptions = {}) { options.maintainOrder = options.maintainOrder ?? false; - if (typeof dest === "string") { - _ldf.sinkCsv(dest, options); - } else { - throw new TypeError("Expected a string destination"); - } + _ldf.sinkCsv(path, options); + }, + sinkParquet(path: string, options: SinkParquetOptions = {}) { + options.compression = options.compression ?? "zstd"; + _ldf.sinkParquet(path, options); }, }; }; diff --git a/polars/types.ts b/polars/types.ts index 6fd460077..89a25ca18 100644 --- a/polars/types.ts +++ b/polars/types.ts @@ -47,7 +47,7 @@ export interface WriteCsvOptions { sep?: string; } /** - * Options for {@link LazyDataFrame.sinkCSV} + * Options for @see {@link LazyDataFrame.sinkCSV} * @category Options */ export interface SinkCsvOptions { @@ -65,6 +65,24 @@ export interface SinkCsvOptions { nullValue?: string; maintainOrder?: boolean; } +/** + * Options for @see {@link LazyDataFrame.sinkParquet} + * @category Options + */ +export interface SinkParquetOptions { + compression?: string; + compressionLevel?: number; + statistics?: boolean; + rowGroupSize?: number; + dataPagesizeLimit?: number; + maintainOrder?: boolean; + typeCoercion?: boolean; + predicatePushdown?: boolean; + projectionPushdown?: boolean; + simplifyExpression?: boolean; + slicePushdown?: boolean; + noOptimization?: boolean; +} /** * Options for {@link DataFrame.writeJSON} * @category Options diff --git a/src/conversion.rs b/src/conversion.rs index e620b8baf..398a5500b 100644 --- a/src/conversion.rs +++ b/src/conversion.rs @@ -586,6 +586,22 @@ pub struct SinkCsvOptions { pub maintain_order: bool, } +#[napi(object)] +pub struct SinkParquetOptions { + pub compression: Option, + pub compression_level: Option, + pub statistics: Option, + pub row_group_size: Option, + pub data_pagesize_limit: Option, + pub maintain_order: Option, + pub type_coercion: Option, + pub predicate_pushdown: Option, + pub projection_pushdown: Option, + pub simplify_expression: Option, + pub slice_pushdown: Option, + pub no_optimization: Option, +} + #[napi(object)] pub struct Shape { pub height: i64, @@ -1197,3 +1213,46 @@ where { container.into_iter().map(|s| s.as_ref().into()).collect() } + +pub(crate) fn parse_parquet_compression( + compression: String, + compression_level: Option, +) -> JsResult { + let parsed = match compression.as_ref() { + "uncompressed" => ParquetCompression::Uncompressed, + "snappy" => ParquetCompression::Snappy, + "gzip" => ParquetCompression::Gzip( + compression_level + .map(|lvl| { + GzipLevel::try_new(lvl as u8) + // .map_err(|e| JsValueErr::new_err(format!("{e:?}"))) + .map_err(|e| napi::Error::from_reason(format!("{:?}", e))) + }) + .transpose()?, + ), + "lzo" => ParquetCompression::Lzo, + "brotli" => ParquetCompression::Brotli( + compression_level + .map(|lvl| { + BrotliLevel::try_new(lvl as u32) + .map_err(|e| napi::Error::from_reason(format!("{e:?}"))) + }) + .transpose()?, + ), + "lz4" => ParquetCompression::Lz4Raw, + "zstd" => ParquetCompression::Zstd( + compression_level + .map(|lvl| { + ZstdLevel::try_new(lvl) + .map_err(|e| napi::Error::from_reason(format!("{e:?}"))) + }) + .transpose()?, + ), + e => { + return Err(napi::Error::from_reason(format!( + "parquet `compression` must be one of {{'uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'lz4', 'zstd'}}, got {e}", + ))) + } + }; + Ok(parsed) +} diff --git a/src/lazy/dataframe.rs b/src/lazy/dataframe.rs index abf87c0b7..2eca2f79a 100644 --- a/src/lazy/dataframe.rs +++ b/src/lazy/dataframe.rs @@ -590,6 +590,29 @@ impl JsLazyFrame { let _ = ldf.sink_csv(path_buf, options).map_err(JsPolarsErr::from); Ok(()) } + + #[napi(catch_unwind)] + pub fn sink_parquet(&self, path: String, options: SinkParquetOptions) -> napi::Result<()> { + let compression_str = options.compression.unwrap_or("zstd".to_string()); + let compression = parse_parquet_compression(compression_str, options.compression_level)?; + let statistics = options.statistics.unwrap_or(false); + let row_group_size = options.row_group_size.map(|i| i as usize); + let data_pagesize_limit = options.data_pagesize_limit.map(|i| i as usize); + let maintain_order = options.maintain_order.unwrap_or(true); + + let options = ParquetWriteOptions { + compression, + statistics, + row_group_size, + data_pagesize_limit, + maintain_order, + }; + + let path_buf: PathBuf = PathBuf::from(path); + let ldf = self.ldf.clone().with_comm_subplan_elim(false); + let _ = ldf.sink_parquet(path_buf, options).map_err(JsPolarsErr::from); + Ok(()) + } } #[napi(object)]