Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into upgrade-biome
Browse files Browse the repository at this point in the history
  • Loading branch information
Bidek56 committed Jul 11, 2024
2 parents 45270cb + 3ef1a59 commit 49b686d
Show file tree
Hide file tree
Showing 11 changed files with 1,683 additions and 1,530 deletions.
874 changes: 0 additions & 874 deletions .yarn/releases/yarn-3.6.1.cjs

This file was deleted.

894 changes: 894 additions & 0 deletions .yarn/releases/yarn-4.3.1.cjs

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion .yarnrc.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
compressionLevel: mixed

enableGlobalCache: false

nodeLinker: node-modules
yarnPath: .yarn/releases/yarn-3.6.1.cjs

yarnPath: .yarn/releases/yarn-4.3.1.cjs
10 changes: 9 additions & 1 deletion __tests__/lazyframe.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ describe("lazyframe", () => {
const actual = await expected.lazy().collect();
expect(actual).toFrameEqual(expected);
});
test("collect:streaming", async () => {
const expected = pl.DataFrame({
foo: [1, 2],
bar: ["a", "b"],
});
const actual = await expected.lazy().collect({ streaming: true });
expect(actual).toFrameEqual(expected);
});
test("describeOptimizedPlan", () => {
const df = pl
.DataFrame({
Expand Down Expand Up @@ -1268,7 +1276,7 @@ describe("lazyframe", () => {
.lazy();
ldf.sinkCSV("./test.csv");
const newDF: pl.DataFrame = pl.readCSV("./test.csv");
const actualDf: pl.DataFrame = await ldf.collect();
const actualDf: pl.DataFrame = await ldf.collect({ streaming: true });
expect(newDF.sort("foo")).toFrameEqual(actualDf);
fs.rmSync("./test.csv");
});
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
"typedoc": "^0.26.3",
"typescript": "5.5.3"
},
"packageManager": "yarn@3.6.1",
"packageManager": "yarn@4.3.1",
"workspaces": [
"benches"
]
Expand Down
4 changes: 2 additions & 2 deletions polars/dataframe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2351,7 +2351,7 @@ export const _DataFrame = (_df: any): DataFrame => {
return _DataFrame(_df)
.lazy()
.sort(arg, descending, nulls_last, maintain_order)
.collectSync({ noOptimization: true, stringCache: false });
.collectSync({ noOptimization: true });
}
return wrap("sort", arg, descending, nulls_last, maintain_order);
},
Expand Down Expand Up @@ -2571,7 +2571,7 @@ export const _DataFrame = (_df: any): DataFrame => {
}
return this.lazy()
.withColumns(columns)
.collectSync({ noOptimization: true, stringCache: false });
.collectSync({ noOptimization: true });
},
withColumnRenamed(opt, replacement?) {
if (typeof opt === "string") {
Expand Down
66 changes: 57 additions & 9 deletions polars/lazy/dataframe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,16 @@ export interface LazyDataFrame extends Serialize, GroupByOps<LazyGroupBy> {
* @param predicatePushdown - Do predicate pushdown optimization.
* @param projectionPushdown - Do projection pushdown optimization.
* @param simplifyExpression - Run simplify expressions optimization.
* @param stringCache - Use a global string cache in this query.
* This is needed if you want to join on categorical columns.
* Caution!
* * If you already have set a global string cache, set this to `false` as this will reset the
* * global cache when the query is finished.
* @param noOptimization - Turn off optimizations.
* @param commSubplanElim - Will try to cache branching subplans that occur on self-joins or unions.
* @param commSubexprElim - Common subexpressions will be cached and reused.
* @param streaming - Process the query in batches to handle larger-than-memory data.
If set to `False` (default), the entire query is processed in a single
batch.
.. warning::
Streaming mode is considered **unstable**. It may be changed
at any point without it being considered a breaking change.
* @return DataFrame
*
*/
Expand Down Expand Up @@ -113,7 +117,16 @@ export interface LazyDataFrame extends Serialize, GroupByOps<LazyGroupBy> {
* @param opts.predicatePushdown - Do predicate pushdown optimization.
* @param opts.projectionPushdown - Do projection pushdown optimization.
* @param opts.simplifyExpression - Run simplify expressions optimization.
* @param opts.stringCache - Use a global string cache in this query.
* @param opts.commSubplanElim - Will try to cache branching subplans that occur on self-joins or unions.
* @param opts.commSubexprElim - Common subexpressions will be cached and reused.
* @param opts.streaming - Process the query in batches to handle larger-than-memory data.
If set to `False` (default), the entire query is processed in a single
batch.
.. warning::
Streaming mode is considered **unstable**. It may be changed
at any point without it being considered a breaking change.
*
*/
fetch(numRows?: number): Promise<DataFrame>;
fetch(numRows: number, opts: LazyOptions): Promise<DataFrame>;
Expand Down Expand Up @@ -629,7 +642,30 @@ export const _LazyDataFrame = (_ldf: any): LazyDataFrame => {
collectSync() {
return _DataFrame(_ldf.collectSync());
},
collect() {
collect(opts?) {
if (opts?.noOptimization) {
opts.predicatePushdown = false;
opts.projectionPushdown = false;
opts.slicePushdown = false;
opts.commSubplanElim = false;
opts.commSubexprElim = false;
}

if (opts?.streaming) opts.commSubplanElim = false;

if (opts) {
_ldf = _ldf.optimizationToggle(
opts.typeCoercion,
opts.predicatePushdown,
opts.projectionPushdown,
opts.simplifyExpression,
opts.slicePushdown,
opts.commSubplanElim,
opts.commSubexprElim,
opts.streaming,
);
}

return _ldf.collect().then(_DataFrame);
},
drop(...cols) {
Expand Down Expand Up @@ -679,15 +715,21 @@ export const _LazyDataFrame = (_ldf: any): LazyDataFrame => {
if (opts?.noOptimization) {
opts.predicatePushdown = false;
opts.projectionPushdown = false;
opts.slicePushdown = false;
opts.commSubplanElim = false;
opts.commSubexprElim = false;
}
if (opts?.streaming) opts.commSubplanElim = false;
if (opts) {
_ldf = _ldf.optimizationToggle(
opts.typeCoercion,
opts.predicatePushdown,
opts.projectionPushdown,
opts.simplifyExpr,
opts.stringCache,
opts.slicePushdown,
opts.commSubplanElim,
opts.commSubexprElim,
opts.streaming,
);
}

Expand All @@ -697,15 +739,21 @@ export const _LazyDataFrame = (_ldf: any): LazyDataFrame => {
if (opts?.noOptimization) {
opts.predicatePushdown = false;
opts.projectionPushdown = false;
opts.slicePushdown = false;
opts.commSubplanElim = false;
opts.commSubexprElim = false;
}
if (opts?.streaming) opts.commSubplanElim = false;
if (opts) {
_ldf = _ldf.optimizationToggle(
opts.typeCoercion,
opts.predicatePushdown,
opts.projectionPushdown,
opts.simplifyExpr,
opts.stringCache,
opts.slicePushdown,
opts.commSubplanElim,
opts.commSubexprElim,
opts.streaming,
);
}

Expand Down
5 changes: 4 additions & 1 deletion polars/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,11 @@ export type LazyOptions = {
predicatePushdown?: boolean;
projectionPushdown?: boolean;
simplifyExpression?: boolean;
stringCache?: boolean;
slicePushdown?: boolean;
noOptimization?: boolean;
commSubplanElim?: boolean;
commSubexprElim?: boolean;
streaming?: boolean;
};

/**
Expand Down
5 changes: 4 additions & 1 deletion src/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl ToSeries for JsUnknown {

impl ToNapiValue for Wrap<&Series> {
unsafe fn to_napi_value(napi_env: sys::napi_env, val: Self) -> napi::Result<sys::napi_value> {
let s = val.0;
let s = val.0.rechunk();
let len = s.len();
let dtype = s.dtype();
let env = Env::from_raw(napi_env);
Expand All @@ -79,8 +79,10 @@ impl ToNapiValue for Wrap<&Series> {
DataType::Struct(_) => {
let ca = s.struct_().map_err(JsPolarsErr::from)?;
let df: DataFrame = ca.clone().into();

let (height, _) = df.shape();
let mut rows = env.create_array(height as u32)?;

for idx in 0..height {
let mut row = env.create_object()?;
for col in df.get_columns() {
Expand All @@ -94,6 +96,7 @@ impl ToNapiValue for Wrap<&Series> {
}
_ => {
let mut arr = env.create_array(len as u32)?;

for (idx, val) in s.iter().enumerate() {
arr.set(idx as u32, Wrap(val))?;
}
Expand Down
13 changes: 11 additions & 2 deletions src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,22 +113,31 @@ impl JsLazyFrame {
predicate_pushdown: Option<bool>,
projection_pushdown: Option<bool>,
simplify_expr: Option<bool>,
_string_cache: Option<bool>,
slice_pushdown: Option<bool>,
comm_subplan_elim: Option<bool>,
comm_subexpr_elim: Option<bool>,
streaming: Option<bool>,
) -> JsLazyFrame {
let type_coercion = type_coercion.unwrap_or(true);
let predicate_pushdown = predicate_pushdown.unwrap_or(true);
let projection_pushdown = projection_pushdown.unwrap_or(true);
let simplify_expr = simplify_expr.unwrap_or(true);
let slice_pushdown = slice_pushdown.unwrap_or(true);
let comm_subplan_elim = comm_subplan_elim.unwrap_or(true);
let comm_subexpr_elim = comm_subexpr_elim.unwrap_or(true);
let streaming = streaming.unwrap_or(false);

let ldf = self.ldf.clone();
let ldf = ldf
.with_type_coercion(type_coercion)
.with_predicate_pushdown(predicate_pushdown)
.with_simplify_expr(simplify_expr)
.with_slice_pushdown(slice_pushdown)
.with_projection_pushdown(projection_pushdown);
.with_streaming(streaming)
.with_projection_pushdown(projection_pushdown)
.with_comm_subplan_elim(comm_subplan_elim)
.with_comm_subexpr_elim(comm_subexpr_elim);

ldf.into()
}
#[napi(catch_unwind)]
Expand Down
Loading

0 comments on commit 49b686d

Please sign in to comment.