From eae13e874fcd65bbb0cebe15e4d6813514b45b72 Mon Sep 17 00:00:00 2001
From: Brian Brown
Date: Fri, 16 Aug 2024 00:02:06 -0400
Subject: [PATCH 1/2] fix: update pivot expr on to first param (#252)

The pivot function now takes `on` as the first param instead of `index`, as
noted in the [python 1.0 upgrade documentation](https://docs.pola.rs/releases/upgrade/1/#update-resulting-column-names-in-pivot-when-pivoting-by-multiple-values):

> columns has been renamed to on, and is now the first positional argument.

Added some test coverage from the examples in the docs to capture this. The
existing test coverage used the same column for both `index` and `on`, so the
swapped arguments went unnoticed.
---
 __tests__/dataframe.test.ts | 43 +++++++++++++++++++++++++++++++++++++
 polars/dataframe.ts         |  2 +-
 src/dataframe.rs            |  6 +++---
 3 files changed, 47 insertions(+), 4 deletions(-)

diff --git a/__tests__/dataframe.test.ts b/__tests__/dataframe.test.ts
index f5a41d6a7..5934c84a0 100644
--- a/__tests__/dataframe.test.ts
+++ b/__tests__/dataframe.test.ts
@@ -1354,6 +1354,49 @@ describe("dataframe", () => {
       "e|b": [null, "y"],
     });
     expect(actual).toFrameEqual(expected, true);
+
+    df = pl.DataFrame({
+      foo: ["A", "A", "B", "B", "C"],
+      N: [1, 2, 2, 4, 2],
+      bar: ["k", "l", "m", "n", "o"],
+    });
+    actual = df.pivot(["N"], {
+      index: "foo",
+      on: "bar",
+      aggregateFunc: "first",
+    });
+    expected = pl.DataFrame({
+      foo: ["A", "B", "C"],
+      k: [1, null, null],
+      l: [2, null, null],
+      m: [null, 2, null],
+      n: [null, 4, null],
+      o: [null, null, 2],
+    });
+    expect(actual).toFrameEqual(expected, true);
+
+    df = pl.DataFrame({
+      ix: [1, 1, 2, 2, 1, 2],
+      col: ["a", "a", "a", "a", "b", "b"],
+      foo: [0, 1, 2, 2, 7, 1],
+      bar: [0, 2, 0, 0, 9, 4],
+    });
+
+    actual = df.pivot(["foo", "bar"], {
+      index: "ix",
+      on: "col",
+      aggregateFunc: "sum",
+      separator: "/",
+    });
+
+    expected = pl.DataFrame({
+      ix: [1, 2],
+      "foo/a": [1, 4],
+      "foo/b": [7, 1],
+      "bar/a": [2, 0],
+      "bar/b": [9, 4],
+    });
+    expect(actual).toFrameEqual(expected, true);
   });
 });
 describe("join", () => {
diff --git a/polars/dataframe.ts b/polars/dataframe.ts
index 615fec9d9..e2b85dbb6 100644
--- a/polars/dataframe.ts
+++ b/polars/dataframe.ts
@@ -2237,8 +2237,8 @@ export const _DataFrame = (_df: any): DataFrame => {
     return _DataFrame(
       _df.pivotExpr(
         values,
-        index,
         on,
+        index,
         fn,
         maintainOrder,
         sortColumns,
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 6aecb6f6d..f68650468 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -895,8 +895,8 @@ impl JsDataFrame {
     pub fn pivot_expr(
         &self,
         values: Vec<String>,
+        on: Vec<String>,
         index: Vec<String>,
-        columns: Vec<String>,
         aggregate_expr: Option<Wrap<Expr>>,
         maintain_order: bool,
         sort_columns: bool,
@@ -908,8 +908,8 @@ impl JsDataFrame {
         };
         fun(
             &self.df,
-            index,
-            Some(columns),
+            on,
+            Some(index),
             Some(values),
             sort_columns,
             aggregate_expr.map(|e| e.0 as Expr),

From b24328863f0743de851e722c92a2a29d386d2ddb Mon Sep 17 00:00:00 2001
From: Nicholas Roberts
Date: Fri, 16 Aug 2024 14:02:23 +1000
Subject: [PATCH 2/2] Replicate py-polars API surface for streaming IPC
 formats (#249)

TLDR: Solves https://github.com/pola-rs/nodejs-polars/issues/109

The IPC stream methods are more or less straight copies of the IPC file
(Feather) ones, swapping out `IpcReader`/`IpcWriter` for their streaming
equivalents; the API should be identical to py-polars (with the exception of
file-like objects as input for `read_ipc`/`read_ipc_stream` - not much point
adding that until streaming IO is exposed upstream).
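The round trip mirrors the existing Feather methods one-for-one; a minimal
sketch of the new surface (illustrative frame and values only, assuming the
package's default `pl` import):

```ts
import pl from "nodejs-polars";

// Any frame works; this one exists purely for illustration.
const df = pl.DataFrame({ foo: [1, 2, 3], bar: ["a", "b", "c"] });

// With no destination argument, writeIPCStream returns a Buffer holding the
// Arrow IPC stream; a file path or Writable can be passed instead.
const buf = df.writeIPCStream({ compression: "lz4" });

// readIPCStream accepts a path or a buffer, plus the usual read options.
const out = pl.readIPCStream(buf, { nRows: 2 });
console.log(out.shape); // { height: 2, width: 2 }
```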
I've left the docstrings basically untouched; let me know if you want those
tweaked (the `@param`s appear to have drifted over time).
---
 Cargo.toml           |  1 +
 __tests__/io.test.ts | 36 +++++++++++++++++++++++
 polars/dataframe.ts  | 40 ++++++++++++++++++++++---
 polars/index.ts      |  2 ++
 polars/io.ts         | 34 ++++++++++++++++++++--
 src/dataframe.rs     | 69 ++++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 176 insertions(+), 6 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 750b1edc6..021e8087f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -84,6 +84,7 @@ features = [
   "parquet",
   "to_dummies",
   "ipc",
+  "ipc_streaming",
   "avro",
   "list_eval",
   "arg_where",
diff --git a/__tests__/io.test.ts b/__tests__/io.test.ts
index 29fd8b4bf..7fdc9b0a1 100644
--- a/__tests__/io.test.ts
+++ b/__tests__/io.test.ts
@@ -334,6 +334,42 @@ describe("ipc", () => {
     expect(ipcDF).toFrameEqual(csvDF);
   });
 });
+describe("ipc stream", () => {
+  beforeEach(() => {
+    pl.readCSV(csvpath).writeIPCStream(ipcpath);
+  });
+  afterEach(() => {
+    fs.rmSync(ipcpath);
+  });
+
+  test("read", () => {
+    const df = pl.readIPCStream(ipcpath);
+    expect(df.shape).toEqual({ height: 27, width: 4 });
+  });
+  test("read/write:buffer", () => {
+    const buff = pl.readCSV(csvpath).writeIPCStream();
+    const df = pl.readIPCStream(buff);
+    expect(df.shape).toEqual({ height: 27, width: 4 });
+  });
+  test("read:compressed", () => {
+    const csvDF = pl.readCSV(csvpath);
+    csvDF.writeIPCStream(ipcpath, { compression: "lz4" });
+    const ipcDF = pl.readIPCStream(ipcpath);
+    expect(ipcDF).toFrameEqual(csvDF);
+  });
+
+  test("read:options", () => {
+    const df = pl.readIPCStream(ipcpath, { nRows: 4 });
+    expect(df.shape).toEqual({ height: 4, width: 4 });
+  });
+
+  test("writeIPCStream", () => {
+    const csvDF = pl.readCSV(csvpath);
+    csvDF.writeIPCStream(ipcpath);
+    const ipcDF = pl.readIPCStream(ipcpath);
+    expect(ipcDF).toFrameEqual(csvDF);
+  });
+});
 describe("avro", () => {
   beforeEach(() => {
diff --git a/polars/dataframe.ts b/polars/dataframe.ts
index e2b85dbb6..1ba053bac 100644
--- a/polars/dataframe.ts
+++ b/polars/dataframe.ts
@@ -141,17 +141,29 @@ interface WriteMethods {
     options?: { format: "lines" | "json" },
   ): void;
   /**
-   * Write to Arrow IPC binary stream, or a feather file.
-   * @param file File path to which the file should be written.
+   * Write to an Arrow IPC feather file, either to a file path or to a write stream.
+   * @param destination File path to which the file should be written, or a writable stream.
    * @param options.compression Compression method *defaults to "uncompressed"*
    * @category IO
    */
   writeIPC(options?: WriteIPCOptions): Buffer;
   writeIPC(destination: string | Writable, options?: WriteIPCOptions): void;
+  /**
+   * Write to an Arrow IPC stream file, either to a file path or to a write stream.
+   * @param destination File path to which the file should be written, or a writable stream.
+   * @param options.compression Compression method *defaults to "uncompressed"*
+   * @category IO
+   */
+  writeIPCStream(options?: WriteIPCOptions): Buffer;
+  writeIPCStream(
+    destination: string | Writable,
+    options?: WriteIPCOptions,
+  ): void;
+
   /**
    * Write the DataFrame to disk in parquet format.
-   * @param file File path to which the file should be written.
+   * @param destination File path to which the file should be written, or a writable stream.
    * @param options.compression Compression method *defaults to "uncompressed"*
    * @category IO
    */
@@ -163,7 +175,7 @@ interface WriteMethods {
   /**
    * Write the DataFrame to disk in avro format.
-   * @param file File path to which the file should be written.
+   * @param destination File path to which the file should be written, or a writable stream.
    * @param options.compression Compression method *defaults to "uncompressed"*
    * @category IO
    */
@@ -2511,6 +2523,26 @@ export const _DataFrame = (_df: any): DataFrame => {
     return Buffer.concat(buffers);
   },
+  writeIPCStream(dest?, options = { compression: "uncompressed" }) {
+    if (dest instanceof Writable || typeof dest === "string") {
+      return _df.writeIpcStream(dest, options.compression) as any;
+    }
+    const buffers: Buffer[] = [];
+    const writeStream = new Stream.Writable({
+      write(chunk, _encoding, callback) {
+        buffers.push(chunk);
+        callback(null);
+      },
+    });
+
+    _df.writeIpcStream(
+      writeStream,
+      dest?.compression ?? options?.compression,
+    );
+    writeStream.end("");
+
+    return Buffer.concat(buffers);
+  },
   toSeries: (index = 0) => _Series(_df.selectAtIdx(index) as any) as any,
   toStruct(name) {
     return _Series(_df.toStruct(name));
diff --git a/polars/index.ts b/polars/index.ts
index 31090053b..a0a6af301 100644
--- a/polars/index.ts
+++ b/polars/index.ts
@@ -44,6 +44,7 @@ export namespace pl {
   export import readRecords = io.readRecords;
   export import readCSV = io.readCSV;
   export import readIPC = io.readIPC;
+  export import readIPCStream = io.readIPCStream;
   export import readJSON = io.readJSON;
   export import readParquet = io.readParquet;
   export import readAvro = io.readAvro;
@@ -188,6 +189,7 @@ export import scanParquet = io.scanParquet;
 export import readRecords = io.readRecords;
 export import readCSV = io.readCSV;
 export import readIPC = io.readIPC;
+export import readIPCStream = io.readIPCStream;
 export import readJSON = io.readJSON;
 export import readParquet = io.readParquet;
 export import readAvro = io.readAvro;
diff --git a/polars/io.ts b/polars/io.ts
index d90f79b42..f25af201c 100644
--- a/polars/io.ts
+++ b/polars/io.ts
@@ -531,7 +531,7 @@ export interface ReadIPCOptions {
 }
 
 /**
- * __Read into a DataFrame from Arrow IPC (Feather v2) file.__
+ * __Read into a DataFrame from Arrow IPC file (Feather v2).__
  * ___
  * @param pathOrBody - path or buffer or string
  *   - path: Path to a file or a file like string. Any valid filepath can be used. Example: `file.ipc`.
  *   - body: String or buffer to be read as Arrow IPC
  * @param options.columns Columns to select. Accepts a list of column names.
  * @param options.nRows Stop reading from IPC file after reading ``nRows``.
@@ -558,6 +558,36 @@ export function readIPC(pathOrBody, options = {}) {
   throw new Error("must supply either a path or body");
 }
+/**
+ * __Read into a DataFrame from Arrow IPC stream.__
+ * ___
+ * @param pathOrBody - path or buffer or string
+ *   - path: Path to a file or a file like string. Any valid filepath can be used. Example: `file.ipc`.
+ *   - body: String or buffer to be read as Arrow IPC
+ * @param options.columns Columns to select. Accepts a list of column names.
+ * @param options.nRows Stop reading from IPC stream after reading ``nRows``.
+ */
+export function readIPCStream(
+  pathOrBody: string | Buffer,
+  options?: Partial<ReadIPCOptions>,
+): DataFrame;
+export function readIPCStream(pathOrBody, options = {}) {
+  if (Buffer.isBuffer(pathOrBody)) {
+    return _DataFrame(pli.readIpcStream(pathOrBody, options));
+  }
+
+  if (typeof pathOrBody === "string") {
+    const inline = !isPath(pathOrBody, [".ipc"]);
+    if (inline) {
+      return _DataFrame(
+        pli.readIpcStream(Buffer.from(pathOrBody, "utf-8"), options),
+      );
+    }
+    return _DataFrame(pli.readIpcStream(pathOrBody, options));
+  }
+  throw new Error("must supply either a path or body");
+}
+
 export interface ScanIPCOptions {
   nRows: number;
   cache: boolean;
 }
@@ -565,7 +595,7 @@ export interface ScanIPCOptions {
 
 /**
- * __Lazily read from an Arrow IPC (Feather v2) file or multiple files via glob patterns.__
+ * __Lazily read from an Arrow IPC file (Feather v2) or multiple files via glob patterns.__
  * ___
  * @param path Path to an IPC file.
  * @param options.nRows Stop reading from IPC file after reading ``nRows``
diff --git a/src/dataframe.rs b/src/dataframe.rs
index f68650468..1472c258b 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -349,6 +349,43 @@ pub fn read_ipc(
     Ok(JsDataFrame::new(df))
 }
 
+#[napi(catch_unwind)]
+pub fn read_ipc_stream(
+    path_or_buffer: Either<String, Buffer>,
+    options: ReadIpcOptions,
+) -> napi::Result<JsDataFrame> {
+    let columns = options.columns;
+    let projection = options
+        .projection
+        .map(|projection| projection.into_iter().map(|p| p as usize).collect());
+    let row_count = options.row_count.map(|rc| rc.into());
+    let n_rows = options.n_rows.map(|nr| nr as usize);
+
+    let result = match path_or_buffer {
+        Either::A(path) => {
+            let f = File::open(&path)?;
+            let reader = BufReader::new(f);
+            IpcStreamReader::new(reader)
+                .with_projection(projection)
+                .with_columns(columns)
+                .with_n_rows(n_rows)
+                .with_row_index(row_count)
+                .finish()
+        }
+        Either::B(buf) => {
+            let cursor = Cursor::new(buf.as_ref());
+            IpcStreamReader::new(cursor)
+                .with_projection(projection)
+                .with_columns(columns)
+                .with_n_rows(n_rows)
+                .with_row_index(row_count)
+                .finish()
+        }
+    };
+    let df = result.map_err(JsPolarsErr::from)?;
+    Ok(JsDataFrame::new(df))
+}
+
 #[napi(object)]
 pub struct ReadAvroOptions {
     pub columns: Option<Vec<String>>,
@@ -1426,6 +1463,38 @@ impl JsDataFrame {
         Ok(())
     }
     #[napi(catch_unwind)]
+    pub fn write_ipc_stream(
+        &mut self,
+        path_or_buffer: JsUnknown,
+        compression: Wrap<Option<IpcCompression>>,
+        env: Env,
+    ) -> napi::Result<()> {
+        let compression = compression.0;
+
+        match path_or_buffer.get_type()? {
+            ValueType::String => {
+                let path: napi::JsString = unsafe { path_or_buffer.cast() };
+                let path = path.into_utf8()?.into_owned()?;
+                let f = std::fs::File::create(path).unwrap();
+                let f = BufWriter::new(f);
+                IpcStreamWriter::new(f)
+                    .with_compression(compression)
+                    .finish(&mut self.df)
+                    .map_err(JsPolarsErr::from)?;
+            }
+            ValueType::Object => {
+                let inner: napi::JsObject = unsafe { path_or_buffer.cast() };
+                let writeable = JsWriteStream { inner, env: &env };
+                IpcStreamWriter::new(writeable)
+                    .with_compression(compression)
+                    .finish(&mut self.df)
+                    .map_err(JsPolarsErr::from)?;
+            }
+            _ => panic!(),
+        };
+        Ok(())
+    }
+    #[napi(catch_unwind)]
     pub fn write_json(
         &mut self,
         path_or_buffer: JsUnknown,