Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into rs-0.42
Browse files Browse the repository at this point in the history
  • Loading branch information
Bidek56 committed Aug 16, 2024
2 parents 0b33e39 + b243288 commit 94836c5
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ features = [
"parquet",
"to_dummies",
"ipc",
"ipc_streaming",
"avro",
"list_eval",
"arg_where",
Expand Down
43 changes: 43 additions & 0 deletions __tests__/dataframe.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1354,6 +1354,49 @@ describe("dataframe", () => {
"e|b": [null, "y"],
});
expect(actual).toFrameEqual(expected, true);

df = pl.DataFrame({
foo: ["A", "A", "B", "B", "C"],
N: [1, 2, 2, 4, 2],
bar: ["k", "l", "m", "n", "o"],
});
actual = df.pivot(["N"], {
index: "foo",
on: "bar",
aggregateFunc: "first",
});
expected = pl.DataFrame({
foo: ["A", "B", "C"],
k: [1, null, null],
l: [2, null, null],
m: [null, 2, null],
n: [null, 4, null],
o: [null, null, 2],
});
expect(actual).toFrameEqual(expected, true);

df = pl.DataFrame({
ix: [1, 1, 2, 2, 1, 2],
col: ["a", "a", "a", "a", "b", "b"],
foo: [0, 1, 2, 2, 7, 1],
bar: [0, 2, 0, 0, 9, 4],
});

actual = df.pivot(["foo", "bar"], {
index: "ix",
on: "col",
aggregateFunc: "sum",
separator: "/",
});

expected = pl.DataFrame({
ix: [1, 2],
"foo/a": [1, 4],
"foo/b": [7, 1],
"bar/a": [2, 0],
"bar/b": [9, 4],
});
expect(actual).toFrameEqual(expected, true);
});
});
describe("join", () => {
Expand Down
36 changes: 36 additions & 0 deletions __tests__/io.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,42 @@ describe("ipc", () => {
expect(ipcDF).toFrameEqual(csvDF);
});
});
describe("ipc stream", () => {
beforeEach(() => {
pl.readCSV(csvpath).writeIPCStream(ipcpath);
});
afterEach(() => {
fs.rmSync(ipcpath);
});

test("read", () => {
const df = pl.readIPCStream(ipcpath);
expect(df.shape).toEqual({ height: 27, width: 4 });
});
test("read/write:buffer", () => {
const buff = pl.readCSV(csvpath).writeIPCStream();
const df = pl.readIPCStream(buff);
expect(df.shape).toEqual({ height: 27, width: 4 });
});
test("read:compressed", () => {
const csvDF = pl.readCSV(csvpath);
csvDF.writeIPCStream(ipcpath, { compression: "lz4" });
const ipcDF = pl.readIPCStream(ipcpath);
expect(ipcDF).toFrameEqual(csvDF);
});

test("read:options", () => {
const df = pl.readIPCStream(ipcpath, { nRows: 4 });
expect(df.shape).toEqual({ height: 4, width: 4 });
});

test("writeIPCStream", () => {
const csvDF = pl.readCSV(csvpath);
csvDF.writeIPCStream(ipcpath);
const ipcDF = pl.readIPCStream(ipcpath);
expect(ipcDF).toFrameEqual(csvDF);
});
});

describe("avro", () => {
beforeEach(() => {
Expand Down
42 changes: 37 additions & 5 deletions polars/dataframe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,29 @@ interface WriteMethods {
options?: { format: "lines" | "json" },
): void;
/**
* Write to Arrow IPC binary stream, or a feather file.
* @param file File path to which the file should be written.
* Write to Arrow IPC feather file, either to a file path or to a write stream.
* @param destination File path to which the file should be written, or writable.
* @param options.compression Compression method *defaults to "uncompressed"*
* @category IO
*/
writeIPC(options?: WriteIPCOptions): Buffer;
writeIPC(destination: string | Writable, options?: WriteIPCOptions): void;

/**
* Write to Arrow IPC stream file, either to a file path or to a write stream.
* @param destination File path to which the file should be written, or writable.
* @param options.compression Compression method *defaults to "uncompressed"*
* @category IO
*/
writeIPCStream(options?: WriteIPCOptions): Buffer;
writeIPCStream(
destination: string | Writable,
options?: WriteIPCOptions,
): void;

/**
* Write the DataFrame disk in parquet format.
* @param file File path to which the file should be written.
* @param destination File path to which the file should be written, or writable.
* @param options.compression Compression method *defaults to "uncompressed"*
* @category IO
*/
Expand All @@ -163,7 +175,7 @@ interface WriteMethods {

/**
* Write the DataFrame disk in avro format.
* @param file File path to which the file should be written.
* @param destination File path to which the file should be written, or writable.
* @param options.compression Compression method *defaults to "uncompressed"*
* @category IO
*/
Expand Down Expand Up @@ -2237,8 +2249,8 @@ export const _DataFrame = (_df: any): DataFrame => {
return _DataFrame(
_df.pivotExpr(
values,
index,
on,
index,
fn,
maintainOrder,
sortColumns,
Expand Down Expand Up @@ -2511,6 +2523,26 @@ export const _DataFrame = (_df: any): DataFrame => {

return Buffer.concat(buffers);
},
writeIPCStream(dest?, options = { compression: "uncompressed" }) {
if (dest instanceof Writable || typeof dest === "string") {
return _df.writeIpcStream(dest, options.compression) as any;
}
const buffers: Buffer[] = [];
const writeStream = new Stream.Writable({
write(chunk, _encoding, callback) {
buffers.push(chunk);
callback(null);
},
});

_df.writeIpcStream(
writeStream,
dest?.compression ?? options?.compression,
);
writeStream.end("");

return Buffer.concat(buffers);
},
toSeries: (index = 0) => _Series(_df.selectAtIdx(index) as any) as any,
toStruct(name) {
return _Series(_df.toStruct(name));
Expand Down
2 changes: 2 additions & 0 deletions polars/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export namespace pl {
export import readRecords = io.readRecords;
export import readCSV = io.readCSV;
export import readIPC = io.readIPC;
export import readIPCStream = io.readIPCStream;
export import readJSON = io.readJSON;
export import readParquet = io.readParquet;
export import readAvro = io.readAvro;
Expand Down Expand Up @@ -188,6 +189,7 @@ export import scanParquet = io.scanParquet;
export import readRecords = io.readRecords;
export import readCSV = io.readCSV;
export import readIPC = io.readIPC;
export import readIPCStream = io.readIPCStream;
export import readJSON = io.readJSON;
export import readParquet = io.readParquet;
export import readAvro = io.readAvro;
Expand Down
34 changes: 32 additions & 2 deletions polars/io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ export interface ReadIPCOptions {
}

/**
* __Read into a DataFrame from Arrow IPC (Feather v2) file.__
* __Read into a DataFrame from Arrow IPC file (Feather v2).__
* ___
* @param pathOrBody - path or buffer or string
* - path: Path to a file or a file like string. Any valid filepath can be used. Example: `file.ipc`.
Expand All @@ -558,14 +558,44 @@ export function readIPC(pathOrBody, options = {}) {
throw new Error("must supply either a path or body");
}

/**
* __Read into a DataFrame from Arrow IPC stream.__
* ___
* @param pathOrBody - path or buffer or string
* - path: Path to a file or a file like string. Any valid filepath can be used. Example: `file.ipc`.
* - body: String or buffer to be read as Arrow IPC
* @param options.columns Columns to select. Accepts a list of column names.
* @param options.nRows Stop reading from parquet file after reading ``nRows``.
*/
export function readIPCStream(
pathOrBody: string | Buffer,
options?: Partial<ReadIPCOptions>,
): DataFrame;
export function readIPCStream(pathOrBody, options = {}) {
if (Buffer.isBuffer(pathOrBody)) {
return _DataFrame(pli.readIpcStream(pathOrBody, options));
}

if (typeof pathOrBody === "string") {
const inline = !isPath(pathOrBody, [".ipc"]);
if (inline) {
return _DataFrame(
pli.readIpcStream(Buffer.from(pathOrBody, "utf-8"), options),
);
}
return _DataFrame(pli.readIpcStream(pathOrBody, options));
}
throw new Error("must supply either a path or body");
}

export interface ScanIPCOptions {
nRows: number;
cache: boolean;
rechunk: boolean;
}

/**
* __Lazily read from an Arrow IPC (Feather v2) file or multiple files via glob patterns.__
* __Lazily read from an Arrow IPC file (Feather v2) or multiple files via glob patterns.__
* ___
* @param path Path to a IPC file.
* @param options.nRows Stop reading from IPC file after reading ``nRows``
Expand Down
75 changes: 72 additions & 3 deletions src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,43 @@ pub fn read_ipc(
Ok(JsDataFrame::new(df))
}

#[napi(catch_unwind)]
pub fn read_ipc_stream(
path_or_buffer: Either<String, Buffer>,
options: ReadIpcOptions,
) -> napi::Result<JsDataFrame> {
let columns = options.columns;
let projection = options
.projection
.map(|projection| projection.into_iter().map(|p| p as usize).collect());
let row_count = options.row_count.map(|rc| rc.into());
let n_rows = options.n_rows.map(|nr| nr as usize);

let result = match path_or_buffer {
Either::A(path) => {
let f = File::open(&path)?;
let reader = BufReader::new(f);
IpcStreamReader::new(reader)
.with_projection(projection)
.with_columns(columns)
.with_n_rows(n_rows)
.with_row_index(row_count)
.finish()
}
Either::B(buf) => {
let cursor = Cursor::new(buf.as_ref());
IpcStreamReader::new(cursor)
.with_projection(projection)
.with_columns(columns)
.with_n_rows(n_rows)
.with_row_index(row_count)
.finish()
}
};
let df = result.map_err(JsPolarsErr::from)?;
Ok(JsDataFrame::new(df))
}

#[napi(object)]
pub struct ReadAvroOptions {
pub columns: Option<Vec<String>>,
Expand Down Expand Up @@ -895,8 +932,8 @@ impl JsDataFrame {
pub fn pivot_expr(
&self,
values: Vec<String>,
on: Vec<String>,
index: Vec<String>,
columns: Vec<String>,
aggregate_expr: Option<Wrap<polars::prelude::Expr>>,
maintain_order: bool,
sort_columns: bool,
Expand All @@ -908,8 +945,8 @@ impl JsDataFrame {
};
fun(
&self.df,
index,
Some(columns),
on,
Some(index),
Some(values),
sort_columns,
aggregate_expr.map(|e| e.0 as Expr),
Expand Down Expand Up @@ -1424,6 +1461,38 @@ impl JsDataFrame {
Ok(())
}
#[napi(catch_unwind)]
pub fn write_ipc_stream(
&mut self,
path_or_buffer: JsUnknown,
compression: Wrap<Option<IpcCompression>>,
env: Env,
) -> napi::Result<()> {
let compression = compression.0;

match path_or_buffer.get_type()? {
ValueType::String => {
let path: napi::JsString = unsafe { path_or_buffer.cast() };
let path = path.into_utf8()?.into_owned()?;
let f = std::fs::File::create(path).unwrap();
let f = BufWriter::new(f);
IpcStreamWriter::new(f)
.with_compression(compression)
.finish(&mut self.df)
.map_err(JsPolarsErr::from)?;
}
ValueType::Object => {
let inner: napi::JsObject = unsafe { path_or_buffer.cast() };
let writeable = JsWriteStream { inner, env: &env };
IpcStreamWriter::new(writeable)
.with_compression(compression)
.finish(&mut self.df)
.map_err(JsPolarsErr::from)?;
}
_ => panic!(),
};
Ok(())
}
#[napi(catch_unwind)]
pub fn write_json(
&mut self,
path_or_buffer: JsUnknown,
Expand Down

0 comments on commit 94836c5

Please sign in to comment.