From 83e54144b7f3b6d56e0b4635167dc8bf8af8614e Mon Sep 17 00:00:00 2001 From: Darek Date: Tue, 20 Feb 2024 09:11:15 -0500 Subject: [PATCH] Adding sinkCSV to ldf (#168) --- __tests__/lazyframe.test.ts | 55 ++++++++++++++++++++++++++++++ polars/lazy/dataframe.ts | 67 ++++++++++++++++++++++++++++++++++++- polars/types.ts | 20 ++++++++++- src/conversion.rs | 17 ++++++++++ src/lazy/dataframe.rs | 46 +++++++++++++++++++++++++ 5 files changed, 203 insertions(+), 2 deletions(-) diff --git a/__tests__/lazyframe.test.ts b/__tests__/lazyframe.test.ts index b00b6e513..4516c3005 100644 --- a/__tests__/lazyframe.test.ts +++ b/__tests__/lazyframe.test.ts @@ -1,4 +1,5 @@ import pl from "@polars"; +import fs from "fs"; describe("lazyframe", () => { test("columns", () => { @@ -1231,4 +1232,58 @@ describe("lazyframe", () => { .collectSync(); expect(actual).toFrameEqual(expected); }); + test("sinkCSV:path", async () => { + const ldf = pl + .DataFrame([ + pl.Series("foo", [1, 2, 3], pl.Int64), + pl.Series("bar", ["a", "b", "c"]), + ]) + .lazy(); + ldf.sinkCSV("./test.csv"); + const newDF: pl.DataFrame = pl.readCSV("./test.csv"); + const actualDf: pl.DataFrame = await ldf.collect(); + expect(newDF.sort("foo").toString()).toEqual(actualDf.toString()); + fs.rmSync("./test.csv"); + }); + test("sinkCSV:noHeader", async () => { + const ldf = pl + .DataFrame([ + pl.Series("column_1", [1, 2, 3], pl.Int64), + pl.Series("column_2", ["a", "b", "c"]), + ]) + .lazy(); + ldf.sinkCSV("./test.csv", { includeHeader: false }); + const newDF: pl.DataFrame = pl.readCSV("./test.csv", { hasHeader: false }); + const actualDf: pl.DataFrame = await ldf.collect(); + expect(newDF.sort("column_1").toString()).toEqual(actualDf.toString()); + fs.rmSync("./test.csv"); + }); + test("sinkCSV:separator", async () => { + const ldf = pl + .DataFrame([ + pl.Series("foo", [1, 2, 3], pl.Int64), + pl.Series("bar", ["a", "b", "c"]), + ]) + .lazy(); + ldf.sinkCSV("./test.csv", { separator: "|" }); + const newDF: pl.DataFrame = pl.readCSV("./test.csv", { sep: "|" }); + const actualDf: pl.DataFrame = await ldf.collect(); + expect(newDF.sort("foo").toString()).toEqual(actualDf.toString()); + fs.rmSync("./test.csv"); + }); + test("sinkCSV:nullValue", async () => { + const ldf = pl + .DataFrame([ + pl.Series("foo", [1, 2, 3], pl.Int64), + pl.Series("bar", ["a", "b", null]), + ]) + .lazy(); + ldf.sinkCSV("./test.csv", { nullValue: "BOOM" }); + const newDF: pl.DataFrame = pl.readCSV("./test.csv", { sep: "," }); + const actualDf: pl.DataFrame = await (await ldf.collect()).withColumn( + pl.col("bar").fillNull("BOOM"), + ); + expect(newDF.sort("foo").toString()).toEqual(actualDf.toString()); + fs.rmSync("./test.csv"); + }); }); diff --git a/polars/lazy/dataframe.ts b/polars/lazy/dataframe.ts index d6bfd8a0c..39ee4e6c9 100644 --- a/polars/lazy/dataframe.ts +++ b/polars/lazy/dataframe.ts @@ -11,7 +11,7 @@ import { } from "../utils"; import { _LazyGroupBy, LazyGroupBy } from "./groupby"; import { Deserialize, GroupByOps, Serialize } from "../shared_traits"; -import { LazyOptions, LazyJoinOptions } from "../types"; +import { LazyOptions, LazyJoinOptions, SinkCsvOptions } from "../types"; import { Series } from "../series"; const inspect = Symbol.for("nodejs.util.inspect.custom"); @@ -457,6 +457,63 @@ export interface LazyDataFrame extends Serialize, GroupByOps { * @see {@link DataFrame.withRowCount} */ withRowCount(); + /*** + * + * Evaluate the query in streaming mode and write to a CSV file. + + .. warning:: + Streaming mode is considered **unstable**. It may be changed + at any point without it being considered a breaking change. + + This allows streaming results that are larger than RAM to be written to disk. + + Parameters + ---------- + @param path - File path to which the file should be written. + @param includeBom - Whether to include UTF-8 BOM in the CSV output. + @param includeHeader - Whether to include header in the CSV output. + @param separator - Separate CSV fields with this symbol. + @param lineTerminator - String used to end each row. + @param quoteChar - Byte to use as quoting character. + @param batchSize - Number of rows that will be processed per thread. Default - 1024 + @param datetimeFormat - A format string, with the specifiers defined by the + `chrono `_ + Rust crate. If no format specified, the default fractional-second + precision is inferred from the maximum timeunit found in the frame's + Datetime cols (if any). + @param dateFormat - A format string, with the specifiers defined by the + `chrono `_ + Rust crate. + @param timeFormat A format string, with the specifiers defined by the + `chrono `_ + Rust crate. + @param floatPrecision - Number of decimal places to write, applied to both `Float32` and `Float64` datatypes. + @param nullValue - A string representing null values (defaulting to the empty string). + @param quoteStyle - Determines the quoting strategy used. : {'necessary', 'always', 'non_numeric', 'never'} + - necessary (default): This puts quotes around fields only when necessary. + They are necessary when fields contain a quote, + delimiter or record terminator. + Quotes are also necessary when writing an empty record + (which is indistinguishable from a record with one empty field). + This is the default. + - always: This puts quotes around every field. Always. + - never: This never puts quotes around fields, even if that results in + invalid CSV data (e.g.: by not quoting strings containing the + separator). + - non_numeric: This puts quotes around all fields that are non-numeric. + Namely, when writing a field that does not parse as a valid float + or integer, then quotes will be used even if they aren`t strictly + necessary. + @param maintainOrder - Maintain the order in which data is processed. + Setting this to `False` will be slightly faster. + + Examples + -------- + >>> const lf = pl.scanCsv("/path/to/my_larger_than_ram_file.csv") + >>> lf.sinkCsv("out.csv") + */ + + sinkCSV(dest: string, options?: SinkCsvOptions): void; } const prepareGroupbyInputs = (by) => { @@ -899,6 +956,14 @@ export const _LazyDataFrame = (_ldf: any): LazyDataFrame => { withRowCount(name = "row_nr") { return _LazyDataFrame(_ldf.withRowCount(name)); }, + sinkCSV(dest?, options = {}) { + options.maintainOrder = options.maintainOrder ?? false; + if (typeof dest === "string") { + _ldf.sinkCsv(dest, options); + } else { + throw new TypeError("Expected a string destination"); + } + }, }; }; diff --git a/polars/types.ts b/polars/types.ts index a4711700c..6fd460077 100644 --- a/polars/types.ts +++ b/polars/types.ts @@ -46,7 +46,25 @@ export interface WriteCsvOptions { includeHeader?: boolean; sep?: string; } - +/** + * Options for {@link LazyDataFrame.sinkCSV} + * @category Options + */ +export interface SinkCsvOptions { + includeHeader?: boolean; + quote?: string; + includeBom?: boolean; + separator?: string; + lineTerminator?: string; + quoteChar?: string; + batchSize?: number; + datetimeFormat?: string; + dateFormat?: string; + timeFormat?: string; + floatPrecision?: number; + nullValue?: string; + maintainOrder?: boolean; +} /** * Options for {@link DataFrame.writeJSON} * @category Options diff --git a/src/conversion.rs b/src/conversion.rs index 51a36d8de..e620b8baf 100644 --- a/src/conversion.rs +++ b/src/conversion.rs @@ -569,6 +569,23 @@ pub struct WriteCsvOptions { pub quote: Option, } +#[napi(object)] +pub struct SinkCsvOptions { + pub include_header: Option, + pub quote: Option, + pub include_bom: Option, + pub separator: Option, + pub line_terminator: Option, + pub quote_char: Option, + pub batch_size: Option, + pub datetime_format: Option, + pub date_format: Option, + pub time_format: Option, + pub float_precision: Option, + pub null_value: Option, + pub maintain_order: bool, +} + #[napi(object)] pub struct Shape { pub height: i64, diff --git a/src/lazy/dataframe.rs b/src/lazy/dataframe.rs index af22cee17..abf87c0b7 100644 --- a/src/lazy/dataframe.rs +++ b/src/lazy/dataframe.rs @@ -8,6 +8,7 @@ use polars::prelude::{col, lit, ClosedWindow, CsvEncoding, DataFrame, Field, Joi use polars_io::cloud::CloudOptions; use polars_io::parquet::ParallelStrategy; use std::collections::HashMap; +use std::path::PathBuf; #[napi] #[repr(transparent)] @@ -544,6 +545,51 @@ impl JsLazyFrame { pub fn unnest(&self, colss: Vec) -> JsLazyFrame { self.ldf.clone().unnest(colss).into() } + + #[napi(catch_unwind)] + pub fn sink_csv(&self, path: String, options: SinkCsvOptions) -> napi::Result<()> { + let quote_style = QuoteStyle::default(); + let null_value = options + .null_value + .unwrap_or(SerializeOptions::default().null); + let float_precision: Option = options.float_precision.map(|fp| fp as usize); + let separator = options.separator.unwrap_or(",".to_owned()).as_bytes()[0]; + let line_terminator = options.line_terminator.unwrap_or("\n".to_string()); + let quote_char = options.quote_char.unwrap_or("\"".to_owned()).as_bytes()[0]; + let date_format = options.date_format; + let time_format = options.time_format; + let datetime_format = options.datetime_format; + + let serialize_options = SerializeOptions { + date_format, + time_format, + datetime_format, + float_precision, + separator, + quote_char, + null: null_value, + line_terminator, + quote_style, + }; + + let batch_size = options.batch_size.map(|bs| bs).unwrap_or(1024) as usize; + let include_bom = options.include_bom.unwrap_or(false); + let include_header = options.include_header.unwrap_or(true); + let maintain_order = options.maintain_order; + + let options = CsvWriterOptions { + include_bom, + include_header, + maintain_order, + batch_size, + serialize_options, + }; + + let path_buf: PathBuf = PathBuf::from(path); + let ldf = self.ldf.clone().with_comm_subplan_elim(false); + let _ = ldf.sink_csv(path_buf, options).map_err(JsPolarsErr::from); + Ok(()) + } } #[napi(object)]