Adding ReadCsvOptions options
Bidek56 committed Jan 5, 2024 · 1 parent fe70b46 · commit f272d8f
Showing 3 changed files with 93 additions and 35 deletions.
24 changes: 23 additions & 1 deletion __tests__/io.test.ts
@@ -65,13 +65,35 @@ describe("read:csv", () => {
csvBuffer.toString("utf-8").slice(0, 22),
);
});
it("can read csv with ragged lines", () => {
const csvBuffer = Buffer.from("A\nB\nC,ragged\n", "utf-8");
let df = pl.readCSV(csvBuffer);
const expected = `shape: (2, 1)
┌─────┐
│ A │
│ --- │
│ str │
╞═════╡
│ B │
│ C │
└─────┘`;
expect(df.toString()).toEqual(expected);
const f = () => {
df = pl.readCSV(csvBuffer, { truncateRaggedLines: false });
};
expect(f).toThrow();
});
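As a usage note outside the test harness: truncateRaggedLines (default true) silently drops the extra fields of rows that are wider than the header, while passing false makes the reader raise on such rows instead. A minimal sketch, assuming the package is imported as nodejs-polars:

    import pl from "nodejs-polars";

    // The third row has two fields, but the header declares a single column.
    const ragged = Buffer.from("A\nB\nC,ragged\n", "utf-8");

    // Default behaviour: the trailing ",ragged" field is truncated away.
    const df = pl.readCSV(ragged); // shape (2, 1); column "A" holds ["B", "C"]

    // Strict behaviour: the ragged row raises a parse error instead.
    // pl.readCSV(ragged, { truncateRaggedLines: false }); // throws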
it("can load empty csv", () => {
const df = pl.readCSV(emptycsvpath, { raiseIfEmpty: false });
expect(df.shape).toEqual({ height: 0, width: 0 });
});
it("can parse datetimes", () => {
const csv = `timestamp,open,high
2021-01-01 00:00:00,0.00305500,0.00306000
2021-01-01 00:15:00,0.00298800,0.00300400
2021-01-01 00:30:00,0.00298300,0.00300100
2021-01-01 00:45:00,0.00299400,0.00304000`;
const df = pl.readCSV(csv, { parseDates: true });
const df = pl.readCSV(csv, { tryParseDates: true });
expect(df.dtypes.map((dt) => dt.toJSON())).toEqual([
pl.Datetime("us").toJSON(),
pl.Float64.toJSON(),
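The datetime test also picks up the rename of parseDates to tryParseDates, matching the underlying Rust reader's try_parse_dates. A minimal sketch of the renamed flag, with an illustrative two-column CSV:

    import pl from "nodejs-polars";

    const csv = "timestamp,value\n2021-01-01 00:00:00,1.5\n2021-01-01 00:15:00,2.5";
    const df = pl.readCSV(csv, { tryParseDates: true });
    // The timestamp column is inferred as Datetime("us") rather than a string.
    console.log(df.dtypes.map((dt) => dt.toJSON()));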
14 changes: 12 additions & 2 deletions polars/io.ts
Expand Up @@ -21,15 +21,20 @@ export interface ReadCsvOptions {
encoding: "utf8" | "utf8-lossy";
numThreads: number;
dtypes: Record<string, DataType>;
sampleSize: number;
lowMemory: boolean;
commentChar: string;
quoteChar: string;
eolChar: string;
nullValues: string | Array<string> | Record<string, string>;
chunkSize: number;
skipRows: number;
parseDates: boolean;
tryParseDates: boolean;
skipRowsAfterHeader: number;
rowCount: any;
raiseIfEmpty: boolean;
truncateRaggedLines: boolean;
missingIsNull: boolean;
}

const readCsvDefaultOptions: Partial<ReadCsvOptions> = {
@@ -38,12 +43,17 @@ const readCsvDefaultOptions: Partial<ReadCsvOptions> = {
ignoreErrors: true,
chunkSize: 10000,
skipRows: 0,
sampleSize: 1024,
sep: ",",
rechunk: false,
encoding: "utf8",
lowMemory: false,
parseDates: false,
tryParseDates: false,
skipRowsAfterHeader: 0,
raiseIfEmpty: true,
truncateRaggedLines: true,
missingIsNull: true,
eolChar: "\n",
};
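With the defaults above in place, the new options can be overridden per call. A sketch combining several of them ("data.csv" is a placeholder path, and the values are illustrative rather than recommendations):

    import pl from "nodejs-polars";

    const df = pl.readCSV("data.csv", {
      sampleSize: 4096,           // sample taken up front to estimate allocations
      missingIsNull: true,        // empty fields load as null rather than ""
      raiseIfEmpty: false,        // an empty file yields a 0x0 frame instead of throwing
      truncateRaggedLines: true,  // drop extra fields on over-long rows
      eolChar: "\n",              // first byte is used as the line terminator
    });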

export interface ReadJsonOptions {
90 changes: 58 additions & 32 deletions src/dataframe.rs
@@ -50,41 +50,51 @@ pub(crate) fn to_jsseries_collection(s: Vec<Series>) -> Vec<JsSeries> {

#[napi(object)]
pub struct ReadCsvOptions {
pub infer_schema_length: Option<u32>,
pub chunk_size: u32,
pub has_header: bool,
pub ignore_errors: bool,
/// Stop reading from the csv after this number of rows is reached
pub n_rows: Option<u32>,
// used by error ignore logic
pub max_records: Option<u32>,
pub skip_rows: u32,
pub sep: String,
pub rechunk: bool,
/// Optional indexes of the columns to project
pub projection: Option<Vec<u32>>,
/// Optional column names to project/select.
pub columns: Option<Vec<String>>,
pub separator: Option<u8>,
pub schema: Option<Wrap<Schema>>,
pub encoding: String,
pub n_threads: Option<u32>,
pub path: Option<String>,
pub dtypes: Option<HashMap<String, Wrap<DataType>>>,
pub sample_size: u32,
pub chunk_size: u32,
pub comment_char: Option<u8>,
pub null_values: Option<Wrap<NullValues>>,
pub path: Option<String>,
pub low_memory: bool,
pub comment_char: Option<String>,
pub quote_char: Option<String>,
pub parse_dates: bool,
pub skip_rows_after_header: u32,
pub try_parse_dates: bool,
pub row_count: Option<JsRowCount>,

/// Aggregates chunk afterwards to a single chunk.
pub rechunk: bool,
pub raise_if_empty: bool,
pub truncate_ragged_lines: bool,
pub missing_is_null: bool,
pub low_memory: bool,
pub has_header: bool,
pub ignore_errors: bool,
pub eol_char: String,
}
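Because the struct is exposed through #[napi(object)], napi-rs surfaces each snake_case Rust field to JavaScript under its camelCase name, which is how the ReadCsvOptions keys in polars/io.ts line up with the fields above. The correspondence for the newly added options, sketched from the TypeScript side:

    // TypeScript key             Rust field
    const opts = {
      tryParseDates: true,        // try_parse_dates
      truncateRaggedLines: true,  // truncate_ragged_lines
      missingIsNull: true,        // missing_is_null
      raiseIfEmpty: true,         // raise_if_empty
      eolChar: "\n",              // eol_char
    };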

#[napi(catch_unwind)]
pub fn read_csv(
path_or_buffer: Either<String, Buffer>,
options: ReadCsvOptions,
) -> napi::Result<JsDataFrame> {
let infer_schema_length = options.infer_schema_length.map(|i| i as usize);
let n_threads = options.n_threads.map(|i| i as usize);
let n_rows = options.n_rows.map(|i| i as usize);
let skip_rows = options.skip_rows as usize;
let chunk_size = options.chunk_size as usize;
let null_values = options.null_values.map(|w| w.0);
let comment_char = options.comment_char.map(|s| s.as_bytes()[0]);
let row_count = options.row_count.map(RowCount::from);
let projection = options
.projection
.map(|p: Vec<u32>| p.into_iter().map(|p| p as usize).collect());

let quote_char = if let Some(s) = options.quote_char {
if s.is_empty() {
@@ -115,47 +125,63 @@ pub fn read_csv(
let df = match path_or_buffer {
Either::A(path) => CsvReader::from_path(path)
.expect("unable to read file")
.infer_schema(infer_schema_length)
.infer_schema(Some(options.max_records.unwrap_or(100) as usize))
.with_projection(projection)
.has_header(options.has_header)
.with_n_rows(n_rows)
.with_separator(options.sep.as_bytes()[0])
.with_skip_rows(skip_rows)
.with_n_rows(options.n_rows.map(|i| i as usize))
.with_separator(options.separator.unwrap_or(b','))
.with_skip_rows(options.skip_rows as usize)
.with_ignore_errors(options.ignore_errors)
.with_rechunk(options.rechunk)
.with_chunk_size(chunk_size)
.with_chunk_size(options.chunk_size as usize)
.with_encoding(encoding)
.with_columns(options.columns)
.with_n_threads(n_threads)
.with_n_threads(options.n_threads.map(|i| i as usize))
.with_dtypes(overwrite_dtype.map(Arc::new))
.with_schema(options.schema.map(|schema| Arc::new(schema.0)))
.low_memory(options.low_memory)
.with_comment_char(comment_char)
.with_comment_char(options.comment_char)
.with_null_values(null_values)
.with_try_parse_dates(options.parse_dates)
.with_try_parse_dates(options.try_parse_dates)
.with_quote_char(quote_char)
.with_row_count(row_count)
.sample_size(options.sample_size as usize)
.with_skip_rows_after_header(options.skip_rows_after_header as usize)
.raise_if_empty(options.raise_if_empty)
.truncate_ragged_lines(options.truncate_ragged_lines)
.with_missing_is_null(options.missing_is_null)
.with_end_of_line_char(options.eol_char.as_bytes()[0])
.finish()
.map_err(JsPolarsErr::from)?,
Either::B(buffer) => {
let cursor = Cursor::new(buffer.as_ref());
CsvReader::new(cursor)
.infer_schema(infer_schema_length)
.infer_schema(Some(options.max_records.unwrap_or(100) as usize))
.with_projection(projection)
.has_header(options.has_header)
.with_n_rows(n_rows)
.with_separator(options.sep.as_bytes()[0])
.with_skip_rows(skip_rows)
.with_n_rows(options.n_rows.map(|i| i as usize))
.with_separator(options.separator.unwrap_or(b','))
.with_skip_rows(options.skip_rows as usize)
.with_ignore_errors(options.ignore_errors)
.with_rechunk(options.rechunk)
.with_chunk_size(chunk_size)
.with_chunk_size(options.chunk_size as usize)
.with_encoding(encoding)
.with_columns(options.columns)
.with_n_threads(n_threads)
.with_n_threads(options.n_threads.map(|i| i as usize))
.with_dtypes(overwrite_dtype.map(Arc::new))
.with_schema(options.schema.map(|schema| Arc::new(schema.0)))
.low_memory(options.low_memory)
.with_comment_char(comment_char)
.with_comment_char(options.comment_char)
.with_null_values(null_values)
.with_try_parse_dates(options.parse_dates)
.with_try_parse_dates(options.try_parse_dates)
.with_quote_char(quote_char)
.with_row_count(row_count)
.sample_size(options.sample_size as usize)
.with_skip_rows_after_header(options.skip_rows_after_header as usize)
.raise_if_empty(options.raise_if_empty)
.truncate_ragged_lines(options.truncate_ragged_lines)
.with_missing_is_null(options.missing_is_null)
.with_end_of_line_char(options.eol_char.as_bytes()[0])
.finish()
.map_err(JsPolarsErr::from)?
}
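Both branches now resolve the separator as options.separator.unwrap_or(b','), falling back to a comma when none is given, and take the first byte of eol_char as the line terminator. A quick sketch from the TypeScript side, using the user-facing sep option (presumably translated by the wrapper into the native separator byte):

    import pl from "nodejs-polars";

    const buf = Buffer.from("a;b\n1;2\n3;4\n", "utf-8");
    const df = pl.readCSV(buf, { sep: ";" }); // comma would be the fallback if sep were omitted
    // shape (2, 2) with columns "a" and "b"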
