Adding sinkCSV to ldf (#168)
Bidek56 authored Feb 20, 2024
1 parent 98b5987 commit 83e5414
Showing 5 changed files with 203 additions and 2 deletions.
55 changes: 55 additions & 0 deletions __tests__/lazyframe.test.ts
@@ -1,4 +1,5 @@
import pl from "@polars";
import fs from "fs";

describe("lazyframe", () => {
test("columns", () => {
@@ -1231,4 +1232,58 @@ describe("lazyframe", () => {
.collectSync();
expect(actual).toFrameEqual(expected);
});
test("sinkCSV:path", async () => {
const ldf = pl
.DataFrame([
pl.Series("foo", [1, 2, 3], pl.Int64),
pl.Series("bar", ["a", "b", "c"]),
])
.lazy();
ldf.sinkCSV("./test.csv");
const newDF: pl.DataFrame = pl.readCSV("./test.csv");
const actualDf: pl.DataFrame = await ldf.collect();
expect(newDF.sort("foo").toString()).toEqual(actualDf.toString());
fs.rmSync("./test.csv");
});
test("sinkCSV:noHeader", async () => {
const ldf = pl
.DataFrame([
pl.Series("column_1", [1, 2, 3], pl.Int64),
pl.Series("column_2", ["a", "b", "c"]),
])
.lazy();
ldf.sinkCSV("./test.csv", { includeHeader: false });
const newDF: pl.DataFrame = pl.readCSV("./test.csv", { hasHeader: false });
const actualDf: pl.DataFrame = await ldf.collect();
expect(newDF.sort("column_1").toString()).toEqual(actualDf.toString());
fs.rmSync("./test.csv");
});
test("sinkCSV:separator", async () => {
const ldf = pl
.DataFrame([
pl.Series("foo", [1, 2, 3], pl.Int64),
pl.Series("bar", ["a", "b", "c"]),
])
.lazy();
ldf.sinkCSV("./test.csv", { separator: "|" });
const newDF: pl.DataFrame = pl.readCSV("./test.csv", { sep: "|" });
const actualDf: pl.DataFrame = await ldf.collect();
expect(newDF.sort("foo").toString()).toEqual(actualDf.toString());
fs.rmSync("./test.csv");
});
test("sinkCSV:nullValue", async () => {
const ldf = pl
.DataFrame([
pl.Series("foo", [1, 2, 3], pl.Int64),
pl.Series("bar", ["a", "b", null]),
])
.lazy();
ldf.sinkCSV("./test.csv", { nullValue: "BOOM" });
const newDF: pl.DataFrame = pl.readCSV("./test.csv", { sep: "," });
const actualDf: pl.DataFrame = (await ldf.collect()).withColumn(
pl.col("bar").fillNull("BOOM"),
);
expect(newDF.sort("foo").toString()).toEqual(actualDf.toString());
fs.rmSync("./test.csv");
});
});
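
For reference, a minimal end-to-end sketch in the same style as the tests above, combining several of the exercised options in one sink. The file path and option values are illustrative, and the import assumes the published package name nodejs-polars rather than the repo's internal @polars test alias:

import pl from "nodejs-polars";

// Sink a small lazy frame with a custom separator, no header, and an
// explicit null marker, then read it back to check the round trip.
const ldf = pl
  .DataFrame([
    pl.Series("foo", [1, 2, 3], pl.Int64),
    pl.Series("bar", ["a", null, "c"]),
  ])
  .lazy();
ldf.sinkCSV("./combined.csv", {
  separator: "|",
  includeHeader: false,
  nullValue: "NA",
});
const roundTrip = pl.readCSV("./combined.csv", { sep: "|", hasHeader: false });
console.log(roundTrip.toString());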
67 changes: 66 additions & 1 deletion polars/lazy/dataframe.ts
@@ -11,7 +11,7 @@ import {
} from "../utils";
import { _LazyGroupBy, LazyGroupBy } from "./groupby";
import { Deserialize, GroupByOps, Serialize } from "../shared_traits";
import { LazyOptions, LazyJoinOptions } from "../types";
import { LazyOptions, LazyJoinOptions, SinkCsvOptions } from "../types";
import { Series } from "../series";

const inspect = Symbol.for("nodejs.util.inspect.custom");
@@ -457,6 +457,63 @@ export interface LazyDataFrame extends Serialize, GroupByOps<LazyGroupBy> {
* @see {@link DataFrame.withRowCount}
*/
withRowCount();
/**
 * Evaluate the query in streaming mode and write to a CSV file.
 *
 * **Warning**: Streaming mode is considered **unstable**. It may be changed
 * at any point without it being considered a breaking change.
 *
 * This allows streaming results that are larger than RAM to be written to disk.
 *
 * @param dest - File path to which the file should be written.
 * @param options.includeBom - Whether to include a UTF-8 BOM in the CSV output.
 * @param options.includeHeader - Whether to include a header in the CSV output.
 * @param options.separator - Separate CSV fields with this symbol.
 * @param options.lineTerminator - String used to end each row.
 * @param options.quoteChar - Byte to use as the quoting character.
 * @param options.batchSize - Number of rows that will be processed per thread. Default: 1024.
 * @param options.datetimeFormat - A format string, with the specifiers defined by the
 *   [chrono](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) Rust crate.
 *   If no format is specified, the default fractional-second precision is inferred from
 *   the maximum time unit found in the frame's Datetime columns (if any).
 * @param options.dateFormat - A format string, with the specifiers defined by the
 *   [chrono](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) Rust crate.
 * @param options.timeFormat - A format string, with the specifiers defined by the
 *   [chrono](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) Rust crate.
 * @param options.floatPrecision - Number of decimal places to write, applied to both `Float32` and `Float64` datatypes.
 * @param options.nullValue - A string representing null values (defaults to the empty string).
 * @param options.quoteStyle - Determines the quoting strategy used: one of 'necessary', 'always', 'non_numeric', 'never'.
 *   - necessary (default): Puts quotes around fields only when necessary. They are necessary
 *     when fields contain a quote, separator, or record terminator, and when writing an empty
 *     record (which is indistinguishable from a record with one empty field).
 *   - always: Puts quotes around every field.
 *   - never: Never puts quotes around fields, even if that results in invalid CSV data
 *     (e.g. by not quoting strings containing the separator).
 *   - non_numeric: Puts quotes around all fields that are non-numeric. Namely, when writing
 *     a field that does not parse as a valid float or integer, quotes are used even if they
 *     aren't strictly necessary.
 * @param options.maintainOrder - Maintain the order in which data is processed.
 *   Setting this to `false` will be slightly faster.
 * @example
 * >>> const lf = pl.scanCSV("/path/to/my_larger_than_ram_file.csv")
 * >>> lf.sinkCSV("out.csv")
 */

sinkCSV(dest: string, options?: SinkCsvOptions): void;
}
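
As a usage sketch for this API (not part of the diff): assuming a hypothetical larger-than-RAM source file and column name, the whole pipeline can stay lazy and stream straight to disk without collecting:

import pl from "nodejs-polars";

// Scan lazily, transform, and sink the result to disk in streaming mode.
// "./big_input.csv" and the "amount" column are hypothetical.
const lf = pl.scanCSV("./big_input.csv").filter(pl.col("amount").gt(0));
lf.sinkCSV("./filtered.csv", { maintainOrder: false, batchSize: 4096 });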

const prepareGroupbyInputs = (by) => {
@@ -899,6 +956,14 @@ export const _LazyDataFrame = (_ldf: any): LazyDataFrame => {
withRowCount(name = "row_nr") {
return _LazyDataFrame(_ldf.withRowCount(name));
},
sinkCSV(dest, options = {}) {
  // Validate the destination up front, then default maintainOrder to false.
  if (typeof dest !== "string") {
    throw new TypeError("Expected a string destination");
  }
  options.maintainOrder = options.maintainOrder ?? false;
  _ldf.sinkCsv(dest, options);
},
};
};

20 changes: 19 additions & 1 deletion polars/types.ts
@@ -46,7 +46,25 @@ export interface WriteCsvOptions {
includeHeader?: boolean;
sep?: string;
}

/**
* Options for {@link LazyDataFrame.sinkCSV}
* @category Options
*/
export interface SinkCsvOptions {
includeHeader?: boolean;
quote?: string;
includeBom?: boolean;
separator?: string;
lineTerminator?: string;
quoteChar?: string;
batchSize?: number;
datetimeFormat?: string;
dateFormat?: string;
timeFormat?: string;
floatPrecision?: number;
nullValue?: string;
maintainOrder?: boolean;
}
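
A quick sketch of shaping an options object against this interface (all values illustrative). Every field is optional; unset fields fall back to the defaults applied on the Rust side (see src/lazy/dataframe.rs below), and maintainOrder is the only field the JS wrapper defaults explicitly, to false:

import pl from "nodejs-polars";

// An illustrative SinkCsvOptions-shaped object literal.
const opts = {
  includeHeader: true,
  separator: ";",
  lineTerminator: "\r\n",
  quoteChar: '"',
  batchSize: 2048,
  floatPrecision: 3,
  nullValue: "",
  maintainOrder: true,
};
pl.DataFrame({ x: [1.5, 2.25], y: ["a", "b"] }).lazy().sinkCSV("./typed.csv", opts);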
/**
* Options for {@link DataFrame.writeJSON}
* @category Options
17 changes: 17 additions & 0 deletions src/conversion.rs
@@ -569,6 +569,23 @@ pub struct WriteCsvOptions {
pub quote: Option<String>,
}

#[napi(object)]
pub struct SinkCsvOptions {
pub include_header: Option<bool>,
pub quote: Option<String>,
pub include_bom: Option<bool>,
pub separator: Option<String>,
pub line_terminator: Option<String>,
pub quote_char: Option<String>,
pub batch_size: Option<i64>,
pub datetime_format: Option<String>,
pub date_format: Option<String>,
pub time_format: Option<String>,
pub float_precision: Option<i64>,
pub null_value: Option<String>,
pub maintain_order: bool,
}

#[napi(object)]
pub struct Shape {
pub height: i64,
46 changes: 46 additions & 0 deletions src/lazy/dataframe.rs
@@ -8,6 +8,7 @@ use polars::prelude::{col, lit, ClosedWindow, CsvEncoding, DataFrame, Field, Joi
use polars_io::cloud::CloudOptions;
use polars_io::parquet::ParallelStrategy;
use std::collections::HashMap;
use std::path::PathBuf;

#[napi]
#[repr(transparent)]
@@ -544,6 +545,51 @@ impl JsLazyFrame {
pub fn unnest(&self, colss: Vec<String>) -> JsLazyFrame {
self.ldf.clone().unnest(colss).into()
}

#[napi(catch_unwind)]
pub fn sink_csv(&self, path: String, options: SinkCsvOptions) -> napi::Result<()> {
let quote_style = QuoteStyle::default();
let null_value = options
.null_value
.unwrap_or(SerializeOptions::default().null);
let float_precision: Option<usize> = options.float_precision.map(|fp| fp as usize);
let separator = options.separator.unwrap_or(",".to_owned()).as_bytes()[0];
let line_terminator = options.line_terminator.unwrap_or("\n".to_string());
let quote_char = options.quote_char.unwrap_or("\"".to_owned()).as_bytes()[0];
let date_format = options.date_format;
let time_format = options.time_format;
let datetime_format = options.datetime_format;

let serialize_options = SerializeOptions {
date_format,
time_format,
datetime_format,
float_precision,
separator,
quote_char,
null: null_value,
line_terminator,
quote_style,
};

let batch_size = options.batch_size.unwrap_or(1024) as usize;
let include_bom = options.include_bom.unwrap_or(false);
let include_header = options.include_header.unwrap_or(true);
let maintain_order = options.maintain_order;

let options = CsvWriterOptions {
include_bom,
include_header,
maintain_order,
batch_size,
serialize_options,
};

let path_buf: PathBuf = PathBuf::from(path);
let ldf = self.ldf.clone().with_comm_subplan_elim(false);
ldf.sink_csv(path_buf, options).map_err(JsPolarsErr::from)?;
Ok(())
}
}
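
With the error propagated via `?` above (rather than discarded with a `let _ =` binding), a failing sink should surface on the JavaScript side. A hedged sketch of that failure mode, assuming errors cross the napi boundary as ordinary JS exceptions:

import pl from "nodejs-polars";

// Sinking to an unwritable path is expected to throw rather than
// silently produce nothing. The directory name is hypothetical.
try {
  pl.DataFrame({ a: [1, 2] }).lazy().sinkCSV("/no_such_dir/out.csv");
} catch (err) {
  console.error("sink failed:", err);
}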

#[napi(object)]
