From 294adc94cf92d0882b724ed110835af2fb1c1913 Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Tue, 23 Jul 2024 14:51:07 +1000
Subject: [PATCH] c

---
 crates/polars-io/src/utils/other.rs           |   5 +-
 crates/polars-lazy/src/scan/ndjson.rs         |  10 ++
 .../polars-plan/src/plans/conversion/scans.rs |   7 +-
 crates/polars-plan/src/plans/options.rs       |   1 +
 py-polars/polars/io/_utils.py                 |  17 +--
 py-polars/polars/io/csv/functions.py          |  82 +++++++++---
 py-polars/polars/io/ndjson.py                 | 126 +++++++++++++++---
 py-polars/src/lazyframe/mod.rs                |   4 +-
 py-polars/src/lib.rs                          |   4 -
 py-polars/src/path_utils.rs                   |  45 -------
 py-polars/tests/unit/io/test_csv.py           |  42 +++---
 11 files changed, 218 insertions(+), 125 deletions(-)
 delete mode 100644 py-polars/src/path_utils.rs

diff --git a/crates/polars-io/src/utils/other.rs b/crates/polars-io/src/utils/other.rs
index a573c3b89f9b3..648c678bf8149 100644
--- a/crates/polars-io/src/utils/other.rs
+++ b/crates/polars-io/src/utils/other.rs
@@ -168,10 +168,7 @@ pub(crate) fn update_row_counts3(dfs: &mut [DataFrame], heights: &[IdxSize], off
 }
 
 #[cfg(feature = "json")]
-pub(crate) fn overwrite_schema(
-    schema: &mut Schema,
-    overwriting_schema: &Schema,
-) -> PolarsResult<()> {
+pub fn overwrite_schema(schema: &mut Schema, overwriting_schema: &Schema) -> PolarsResult<()> {
     for (k, value) in overwriting_schema.iter() {
         *schema.try_get_mut(k)? = value.clone();
     }
diff --git a/crates/polars-lazy/src/scan/ndjson.rs b/crates/polars-lazy/src/scan/ndjson.rs
index 029c5253cbb44..fcc110248607e 100644
--- a/crates/polars-lazy/src/scan/ndjson.rs
+++ b/crates/polars-lazy/src/scan/ndjson.rs
@@ -18,6 +18,7 @@ pub struct LazyJsonLineReader {
     pub(crate) low_memory: bool,
     pub(crate) rechunk: bool,
     pub(crate) schema: Option<SchemaRef>,
+    pub(crate) schema_overwrite: Option<SchemaRef>,
     pub(crate) row_index: Option<RowIndex>,
     pub(crate) infer_schema_length: Option<NonZeroUsize>,
     pub(crate) n_rows: Option<usize>,
@@ -38,6 +39,7 @@ impl LazyJsonLineReader {
             low_memory: false,
             rechunk: false,
             schema: None,
+            schema_overwrite: None,
             row_index: None,
             infer_schema_length: NonZeroUsize::new(100),
             ignore_errors: false,
@@ -82,6 +84,13 @@ impl LazyJsonLineReader {
         self
     }
 
+    /// Overwrite parts of the inferred schema
+    #[must_use]
+    pub fn with_schema_overwrite(mut self, schema_overwrite: Option<SchemaRef>) -> Self {
+        self.schema_overwrite = schema_overwrite;
+        self
+    }
+
     /// Reduce memory usage at the expense of performance
     #[must_use]
    pub fn low_memory(mut self, toggle: bool) -> Self {
@@ -129,6 +138,7 @@ impl LazyFileListReader for LazyJsonLineReader {
             low_memory: self.low_memory,
             ignore_errors: self.ignore_errors,
             schema: self.schema,
+            schema_overwrite: self.schema_overwrite,
         };
 
         let scan_type = FileScan::NDJson {
diff --git a/crates/polars-plan/src/plans/conversion/scans.rs b/crates/polars-plan/src/plans/conversion/scans.rs
index 8a8404e296225..9ecfa85382a29 100644
--- a/crates/polars-plan/src/plans/conversion/scans.rs
+++ b/crates/polars-plan/src/plans/conversion/scans.rs
@@ -325,7 +325,7 @@ pub(super) fn ndjson_file_info(
     };
 
     let mut reader = std::io::BufReader::new(f);
-    let (reader_schema, schema) = if let Some(schema) = ndjson_options.schema.take() {
+    let (mut reader_schema, schema) = if let Some(schema) = ndjson_options.schema.take() {
         if file_options.row_index.is_none() {
             (schema.clone(), schema.clone())
         } else {
@@ -340,6 +340,11 @@ pub(super) fn ndjson_file_info(
         prepare_schemas(schema, file_options.row_index.as_ref())
     };
 
+    if let Some(overwriting_schema) = &ndjson_options.schema_overwrite {
+        let schema = Arc::make_mut(&mut reader_schema);
+        overwrite_schema(schema, overwriting_schema)?;
+    }
+
     Ok(FileInfo::new(
         schema,
         Some(Either::Right(reader_schema)),
diff --git a/crates/polars-plan/src/plans/options.rs b/crates/polars-plan/src/plans/options.rs
index 75b581d2e6b8f..dfae2c5917ec1 100644
--- a/crates/polars-plan/src/plans/options.rs
+++ b/crates/polars-plan/src/plans/options.rs
@@ -359,4 +359,5 @@ pub struct NDJsonReadOptions {
     pub low_memory: bool,
     pub ignore_errors: bool,
     pub schema: Option<SchemaRef>,
+    pub schema_overwrite: Option<SchemaRef>,
 }
diff --git a/py-polars/polars/io/_utils.py b/py-polars/polars/io/_utils.py
index 0f262a92ab799..e8971b08660da 100644
--- a/py-polars/polars/io/_utils.py
+++ b/py-polars/polars/io/_utils.py
@@ -200,9 +200,7 @@ def managed_file(file: Any) -> Iterator[Any]:
         # as fsspec needs requests to be installed
         # to read from http
         if looks_like_url(file):
-            return process_file_url(
-                file, encoding_str, bearer_token=storage_options.get("token")
-            )
+            return process_file_url(file, encoding_str)
 
         if _FSSPEC_AVAILABLE:
             from fsspec.utils import infer_storage_options
@@ -269,17 +267,10 @@ def looks_like_url(path: str) -> bool:
     return re.match("^(ht|f)tps?://", path, re.IGNORECASE) is not None
 
 
-def process_file_url(
-    path: str, encoding: str | None = None, bearer_token: str | None = None
-) -> BytesIO:
-    from urllib.request import Request, urlopen
-
-    request = Request(path)
-
-    if bearer_token is not None:
-        request.add_header("Authorization", f"Bearer {bearer_token}")
+def process_file_url(path: str, encoding: str | None = None) -> BytesIO:
+    from urllib.request import urlopen
 
-    with urlopen(request) as f:
+    with urlopen(path) as f:
         if not encoding or encoding in {"utf8", "utf8-lossy"}:
             return BytesIO(f.read())
         else:
diff --git a/py-polars/polars/io/csv/functions.py b/py-polars/polars/io/csv/functions.py
index 1150bafd5fa64..a0340a16fd0e7 100644
--- a/py-polars/polars/io/csv/functions.py
+++ b/py-polars/polars/io/csv/functions.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import contextlib
+import os
 from io import BytesIO, StringIO
 from pathlib import Path
 from typing import IO, TYPE_CHECKING, Any, Callable, Mapping, Sequence
@@ -25,7 +26,7 @@
     from polars.io.csv.batched_reader import BatchedCsvReader
 
 with contextlib.suppress(ImportError):  # Module not available when building docs
-    from polars.polars import PyDataFrame, PyLazyFrame, expand_paths
+    from polars.polars import PyDataFrame, PyLazyFrame
 
 if TYPE_CHECKING:
     from polars import DataFrame, LazyFrame
@@ -420,23 +421,67 @@ def read_csv(
     if not infer_schema:
         infer_schema_length = 0
 
-    sources = [source]
-
-    if isinstance(source, (str, Path)) and str(source).startswith("hf://"):
-        sources = expand_paths([source], glob, [*storage_options.items()])
-    elif (
-        isinstance(source, Sequence)
-        and source
-        and isinstance(source[0], (str, Path))
-        and str(source[0]).startswith("hf://")
+    if (
+        # TODO: scan_csv doesn't support a "dtype slice" (i.e. list[DataType])
+        schema_overrides is None or isinstance(schema_overrides, dict)
+    ) and (
+        (
+            # Check that it is not a BytesIO object
+            isinstance(v := source, (str, Path))
+            or isinstance(source, Sequence)
+            and source
+            and isinstance(v := source[0], (str, Path))
+        )
+        and (
+            # HuggingFace only ⊂( ◜◒◝ )⊃
+            str(v).startswith("hf://")
+            # Also dispatch on FORCE_ASYNC, so that this codepath gets run
+            # through by our test suite during CI.
+            or os.getenv("POLARS_FORCE_ASYNC") == "1"
+            # We can't dispatch this for all paths due to a few reasons:
+            # * `scan_csv` does not support compressed files
+            # * The `storage_options` configuration keys are different between
+            #   fsspec and object_store
+        )
     ):
-        sources = expand_paths(source, glob, [*storage_options.items()])
+        lf = _scan_csv_impl(
+            source,
+            has_header=has_header,
+            separator=separator,
+            comment_prefix=comment_prefix,
+            quote_char=quote_char,
+            skip_rows=skip_rows,
+            schema_overrides=schema_overrides,
+            schema=schema,
+            null_values=null_values,
+            missing_utf8_is_empty_string=missing_utf8_is_empty_string,
+            ignore_errors=ignore_errors,
+            try_parse_dates=try_parse_dates,
+            infer_schema_length=infer_schema_length,
+            n_rows=n_rows,
+            encoding=encoding if encoding == "utf8-lossy" else "utf8",
+            low_memory=low_memory,
+            rechunk=rechunk,
+            skip_rows_after_header=skip_rows_after_header,
+            row_index_name=row_index_name,
+            row_index_offset=row_index_offset,
+            eol_char=eol_char,
+            raise_if_empty=raise_if_empty,
+            truncate_ragged_lines=truncate_ragged_lines,
+            decimal_comma=decimal_comma,
+            glob=glob,
+        )
+
+        if columns:
+            lf = lf.select(columns)
+        elif projection:
+            lf = lf.select(F.nth(projection))
 
-    results = []
+        df = lf.collect()
 
-    for s in sources:
+    else:
         with prepare_file_arg(
-            s,
+            source,
             encoding=encoding,
             use_pyarrow=False,
             raise_if_empty=raise_if_empty,
@@ -474,12 +519,9 @@ def read_csv(
             glob=glob,
         )
 
-            if new_columns:
-                df = _update_columns(df, new_columns)
-
-            results.append(df)
-
-    return F.concat(results)
+    if new_columns:
+        return _update_columns(df, new_columns)
+    return df
 
 
 def _read_csv_impl(
diff --git a/py-polars/polars/io/ndjson.py b/py-polars/polars/io/ndjson.py
index 373d0de5248f4..ece2295524aa3 100644
--- a/py-polars/polars/io/ndjson.py
+++ b/py-polars/polars/io/ndjson.py
@@ -3,7 +3,7 @@
 import contextlib
 from io import BytesIO, StringIO
 from pathlib import Path
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, Sequence
 
 from polars._utils.deprecation import deprecate_renamed_parameter
 from polars._utils.various import normalize_filepath
@@ -22,11 +22,22 @@
 
 
 def read_ndjson(
-    source: str | Path | IOBase | bytes,
+    source: str | Path | list[str] | list[Path] | IOBase | bytes,
     *,
     schema: SchemaDefinition | None = None,
     schema_overrides: SchemaDefinition | None = None,
+    infer_schema_length: int | None = N_INFER_DEFAULT,
+    batch_size: int | None = 1024,
+    n_rows: int | None = None,
+    low_memory: bool = False,
+    rechunk: bool = False,
+    row_index_name: str | None = None,
+    row_index_offset: int = 0,
     ignore_errors: bool = False,
+    storage_options: dict[str, Any] | None = None,
+    retries: int = 2,
+    file_cache_ttl: int | None = None,
+    include_file_paths: str | None = None,
 ) -> DataFrame:
     r"""
     Read into a DataFrame from a newline delimited JSON file.
@@ -52,8 +63,46 @@ def read_ndjson(
     schema_overrides : dict, default None
         Support type specification or override of one or more columns; note that
         any dtypes inferred from the schema param will be overridden.
+    infer_schema_length
+        The maximum number of rows to scan for schema inference.
+        If set to `None`, the full data may be scanned *(this is slow)*.
+    batch_size
+        Number of rows to read in each batch.
+    n_rows
+        Stop reading from JSON file after reading `n_rows`.
+    low_memory
+        Reduce memory pressure at the expense of performance.
+    rechunk
+        Reallocate to contiguous memory when all chunks/files are parsed.
+    row_index_name
+        If not None, this will insert a row index column with the given name into
+        the DataFrame.
+    row_index_offset
+        Offset to start the row index column (only use if the name is set).
+    ignore_errors
+        Return `Null` if parsing fails because of schema mismatches.
+    storage_options
+        Options that indicate how to connect to a cloud provider.
+
+        The cloud providers currently supported are AWS, GCP, and Azure.
+        See supported keys here:
+
+        * `aws <https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html>`_
+        * `gcp <https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html>`_
+        * `azure <https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html>`_
+        * Hugging Face (`hf://`): Accepts an API key under the `token` parameter: \
+          `{'token': '...'}`, or by setting the `HF_TOKEN` environment variable.
+
+        If `storage_options` is not provided, Polars will try to infer the information
+        from environment variables.
+    retries
+        Number of retries if accessing a cloud instance fails.
+    file_cache_ttl
+        Amount of time to keep downloaded cloud files since their last access time,
+        in seconds. Uses the `POLARS_FILE_CACHE_TTL` environment variable
+        (which defaults to 1 hour) if not given.
+    include_file_paths
+        Include the path of the source file(s) as a column with this name.
 
     Examples
     --------
     >>> from io import StringIO
     >>> json_str = '{"foo":1,"bar":6}\n{"foo":2,"bar":7}\n{"foo":3,"bar":8}\n'
     >>> pl.read_ndjson(StringIO(json_str))
     shape: (3, 2)
     ┌─────┬─────┐
     │ foo ┆ bar │
     │ --- ┆ --- │
     │ i64 ┆ i64 │
     ╞═════╪═════╡
     │ 1   ┆ 6   │
     │ 2   ┆ 7   │
     │ 3   ┆ 8   │
     └─────┴─────┘
     """
-    if isinstance(source, StringIO):
-        source = BytesIO(source.getvalue().encode())
-    elif isinstance(source, (str, Path)):
-        source = normalize_filepath(source)
-    pydf = PyDataFrame.read_ndjson(
+    if not (
+        isinstance(source, (str, Path))
+        or isinstance(source, Sequence)
+        and source
+        and isinstance(source[0], (str, Path))
+    ):
+        # TODO: A lot of the parameters aren't applied for BytesIO
+        if isinstance(source, StringIO):
+            source = BytesIO(source.getvalue().encode())
+
+        pydf = PyDataFrame.read_ndjson(
+            source,
+            ignore_errors=ignore_errors,
+            schema=schema,
+            schema_overrides=schema_overrides,
+        )
+
+        df = wrap_df(pydf)
+
+        if n_rows:
+            df = df.head(n_rows)
+
+        return df
+
+    return scan_ndjson(
         source,
-        ignore_errors=ignore_errors,
         schema=schema,
         schema_overrides=schema_overrides,
-    )
-    return wrap_df(pydf)
+        infer_schema_length=infer_schema_length,
+        batch_size=batch_size,
+        n_rows=n_rows,
+        low_memory=low_memory,
+        rechunk=rechunk,
+        row_index_name=row_index_name,
+        row_index_offset=row_index_offset,
+        ignore_errors=ignore_errors,
+        include_file_paths=include_file_paths,
+        retries=retries,
+        storage_options=storage_options,
+        file_cache_ttl=file_cache_ttl,
+    ).collect()
 
 
-@deprecate_renamed_parameter("row_count_name", "row_index_name", version="0.20.4")
-@deprecate_renamed_parameter("row_count_offset", "row_index_offset", version="0.20.4")
 def scan_ndjson(
     source: str | Path | list[str] | list[Path],
     *,
     schema: SchemaDefinition | None = None,
+    schema_overrides: SchemaDefinition | None = None,
     infer_schema_length: int | None = N_INFER_DEFAULT,
     batch_size: int | None = 1024,
     n_rows: int | None = None,
@@ -124,6 +202,9 @@ def scan_ndjson(
         If you supply a list of column names that does not match the names in the
         underlying data, the names given here will overwrite them. The number of
         names given in the schema should match the underlying data dimensions.
+    schema_overrides : dict, default None
+        Support type specification or override of one or more columns; note that
+        any dtypes inferred from the schema param will be overridden.
     infer_schema_length
         The maximum number of rows to scan for schema inference.
         If set to `None`, the full data may be scanned *(this is slow)*.
@@ -184,16 +265,17 @@ def scan_ndjson(
         storage_options = None
 
     pylf = PyLazyFrame.new_from_ndjson(
-        source,
-        sources,
-        infer_schema_length,
-        schema,
-        batch_size,
-        n_rows,
-        low_memory,
-        rechunk,
-        parse_row_index_args(row_index_name, row_index_offset),
-        ignore_errors,
+        path=source,
+        paths=sources,
+        infer_schema_length=infer_schema_length,
+        schema=schema,
+        schema_overrides=schema_overrides,
+        batch_size=batch_size,
+        n_rows=n_rows,
+        low_memory=low_memory,
+        rechunk=rechunk,
+        row_index=parse_row_index_args(row_index_name, row_index_offset),
+        ignore_errors=ignore_errors,
         include_file_paths=include_file_paths,
         retries=retries,
         cloud_options=storage_options,
diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs
index 91947564d81e6..85b4fa8f37100 100644
--- a/py-polars/src/lazyframe/mod.rs
+++ b/py-polars/src/lazyframe/mod.rs
@@ -45,7 +45,7 @@ impl PyLazyFrame {
     #[cfg(feature = "json")]
     #[allow(clippy::too_many_arguments)]
     #[pyo3(signature = (
-        path, paths, infer_schema_length, schema, batch_size, n_rows, low_memory, rechunk,
+        path, paths, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
         row_index, ignore_errors, include_file_paths, cloud_options, retries, file_cache_ttl
     ))]
     fn new_from_ndjson(
@@ -53,6 +53,7 @@ impl PyLazyFrame {
         paths: Vec<PathBuf>,
         infer_schema_length: Option<usize>,
         schema: Option<Wrap<Schema>>,
+        schema_overrides: Option<Wrap<Schema>>,
         batch_size: Option<NonZeroUsize>,
         n_rows: Option<usize>,
         low_memory: bool,
@@ -109,6 +110,7 @@ impl PyLazyFrame {
             .low_memory(low_memory)
             .with_rechunk(rechunk)
             .with_schema(schema.map(|schema| Arc::new(schema.0)))
+            .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
             .with_row_index(row_index)
             .with_ignore_errors(ignore_errors)
             .with_include_file_paths(include_file_paths.map(Arc::from))
diff --git a/py-polars/src/lib.rs b/py-polars/src/lib.rs
index a79658d570390..eea76cbc7ad7e 100644
--- a/py-polars/src/lib.rs
+++ b/py-polars/src/lib.rs
@@ -34,7 +34,6 @@ mod memory;
 mod object;
 #[cfg(feature = "object")]
 mod on_startup;
-mod path_utils;
 mod prelude;
 mod py_modules;
 mod series;
@@ -129,9 +128,6 @@ fn polars(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
     #[cfg(feature = "sql")]
     m.add_class::<PySQLContext>().unwrap();
 
-    m.add_wrapped(wrap_pyfunction!(path_utils::expand_paths))
-        .unwrap();
-
     // Submodules
     // LogicalPlan objects
     m.add_wrapped(wrap_pymodule!(_ir_nodes))?;
diff --git a/py-polars/src/path_utils.rs b/py-polars/src/path_utils.rs
deleted file mode 100644
index f74a4a0403295..0000000000000
--- a/py-polars/src/path_utils.rs
+++ /dev/null
@@ -1,45 +0,0 @@
-use std::path::PathBuf;
-
-use pyo3::exceptions::PyValueError;
-use pyo3::prelude::*;
-use pyo3::pybacked::PyBackedStr;
-
-use crate::conversion::parse_cloud_options;
-use crate::error::PyPolarsErr;
-
-#[pyfunction]
-pub fn expand_paths(
-    paths: Vec<PyBackedStr>,
-    glob: bool,
-    cloud_options: Option<Vec<(String, PyBackedStr)>>,
-) -> PyResult<Vec<String>> {
-    let paths = paths
-        .iter()
-        .map(|x| PathBuf::from(AsRef::<str>::as_ref(x)))
-        .collect::<Vec<_>>();
-
-    #[cfg(feature = "cloud")]
-    let cloud_options = {
-        let first_path = paths
-            .first()
-            .ok_or_else(|| PyValueError::new_err("expected a path argument"))?;
-
-        let first_path_url = first_path.to_string_lossy();
-
-        let cloud_options = if let Some(opts) = cloud_options {
-            parse_cloud_options(&first_path_url, opts)?
-        } else {
-            parse_cloud_options(&first_path_url, vec![])?
-        };
-
-        Some(cloud_options)
-    };
-
-    Ok(
-        polars_io::expand_paths(&paths, glob, cloud_options.as_ref())
-            .map_err(PyPolarsErr::from)?
-            .iter()
-            .map(|x| x.to_str().unwrap().to_string())
-            .collect(),
-    )
-}
diff --git a/py-polars/tests/unit/io/test_csv.py b/py-polars/tests/unit/io/test_csv.py
index 772a1ea74d7f7..74f7402cf2eca 100644
--- a/py-polars/tests/unit/io/test_csv.py
+++ b/py-polars/tests/unit/io/test_csv.py
@@ -527,7 +527,9 @@ def test_column_rename_and_dtype_overwrite() -> None:
     assert df.dtypes == [pl.String, pl.Int64, pl.Float32]
 
 
-def test_compressed_csv(io_files_path: Path) -> None:
+def test_compressed_csv(io_files_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.setenv("POLARS_FORCE_ASYNC", "0")
+
     # gzip compression
     csv = textwrap.dedent(
         """\
@@ -868,18 +870,24 @@ def test_csv_globbing(io_files_path: Path) -> None:
     df = pl.read_csv(path)
     assert df.shape == (135, 4)
 
-    with pytest.raises(ValueError):
-        _ = pl.read_csv(path, columns=[0, 1])
+    with pytest.MonkeyPatch.context() as mp:
+        mp.setenv("POLARS_FORCE_ASYNC", "0")
+
+        with pytest.raises(ValueError):
+            _ = pl.read_csv(path, columns=[0, 1])
 
     df = pl.read_csv(path, columns=["category", "sugars_g"])
     assert df.shape == (135, 2)
     assert df.row(-1) == ("seafood", 1)
     assert df.row(0) == ("vegetables", 2)
 
-    with pytest.raises(ValueError):
-        _ = pl.read_csv(
-            path, schema_overrides=[pl.String, pl.Int64, pl.Int64, pl.Int64]
-        )
+    with pytest.MonkeyPatch.context() as mp:
+        mp.setenv("POLARS_FORCE_ASYNC", "0")
+
+        with pytest.raises(ValueError):
+            _ = pl.read_csv(
+                path, schema_overrides=[pl.String, pl.Int64, pl.Int64, pl.Int64]
+            )
 
     dtypes = {
         "category": pl.String,
@@ -2170,14 +2178,18 @@ def test_csv_float_decimal() -> None:
         pl.read_csv(floats, decimal_comma=True)
 
 
-def test_fsspec_not_available(monkeypatch: pytest.MonkeyPatch) -> None:
-    monkeypatch.setattr("polars.io._utils._FSSPEC_AVAILABLE", False)
-    with pytest.raises(
-        ImportError, match=r"`fsspec` is required for `storage_options` argument"
-    ):
-        pl.read_csv(
-            "s3://foods/cabbage.csv", storage_options={"key": "key", "secret": "secret"}
-        )
+def test_fsspec_not_available() -> None:
+    with pytest.MonkeyPatch.context() as mp:
+        mp.setenv("POLARS_FORCE_ASYNC", "0")
+        mp.setattr("polars.io._utils._FSSPEC_AVAILABLE", False)
+
+        with pytest.raises(
+            ImportError, match=r"`fsspec` is required for `storage_options` argument"
+        ):
+            pl.read_csv(
+                "s3://foods/cabbage.csv",
+                storage_options={"key": "key", "secret": "secret"},
+            )
 
 
 @pytest.mark.write_disk()
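
Illustrative usage sketch of the user-facing behavior introduced by this patch (not part of the patch itself; the file path and column name below are hypothetical):

    import polars as pl

    # `scan_ndjson` (and `read_ndjson`) now accept `schema_overrides`, which is
    # forwarded to the new `LazyJsonLineReader::with_schema_overwrite` on the Rust
    # side: dtypes are inferred as usual, except for the columns listed here.
    lf = pl.scan_ndjson(
        "data.ndjson",                      # hypothetical local file
        schema_overrides={"id": pl.Int64},  # override only the `id` column's dtype
    )
    df = lf.collect()

    # `read_ndjson` also gains the scan parameters (n_rows, row index, retries,
    # storage_options, ...) and, for str/Path sources, now dispatches to
    # `scan_ndjson(...).collect()` internally instead of the eager reader.
    df = pl.read_ndjson("data.ndjson", n_rows=100)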