From ca3a5fe3693f45898f05e96d0b8bf828e42b39c5 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Fri, 19 Jul 2024 14:51:44 +1000 Subject: [PATCH] c --- crates/polars-lazy/src/scan/ndjson.rs | 56 ++++++----------- .../src/executors/scan/ndjson.rs | 29 ++++++++- crates/polars-mem-engine/src/planner/lp.rs | 2 +- .../src/plans/conversion/dsl_to_ir.rs | 38 +++++++----- .../polars-plan/src/plans/conversion/scans.rs | 54 ++++++++++++++--- crates/polars-plan/src/plans/file_scan.rs | 24 +++++++- .../polars-plan/src/plans/functions/count.rs | 60 +++++++++++++++++-- py-polars/polars/io/ndjson.py | 34 ++++++++++- py-polars/src/lazyframe/mod.rs | 36 ++++++++++- py-polars/src/lazyframe/visitor/nodes.rs | 3 +- .../tests/unit/io/test_lazy_count_star.py | 13 ++++ py-polars/tests/unit/io/test_scan.py | 5 +- 12 files changed, 276 insertions(+), 78 deletions(-) diff --git a/crates/polars-lazy/src/scan/ndjson.rs b/crates/polars-lazy/src/scan/ndjson.rs index 69dbf0e6d678c..029c5253cbb44 100644 --- a/crates/polars-lazy/src/scan/ndjson.rs +++ b/crates/polars-lazy/src/scan/ndjson.rs @@ -3,6 +3,7 @@ use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex, RwLock}; use polars_core::prelude::*; +use polars_io::cloud::CloudOptions; use polars_io::RowIndex; use polars_plan::plans::{DslPlan, FileScan}; use polars_plan::prelude::{FileScanOptions, NDJsonReadOptions}; @@ -22,6 +23,7 @@ pub struct LazyJsonLineReader { pub(crate) n_rows: Option, pub(crate) ignore_errors: bool, pub(crate) include_file_paths: Option>, + pub(crate) cloud_options: Option, } impl LazyJsonLineReader { @@ -41,6 +43,7 @@ impl LazyJsonLineReader { ignore_errors: false, n_rows: None, include_file_paths: None, + cloud_options: None, } } /// Add a row index column. @@ -92,6 +95,11 @@ impl LazyJsonLineReader { self } + pub fn with_cloud_options(mut self, cloud_options: Option) -> Self { + self.cloud_options = cloud_options; + self + } + pub fn with_include_file_paths(mut self, include_file_paths: Option>) -> Self { self.include_file_paths = include_file_paths; self @@ -100,10 +108,6 @@ impl LazyJsonLineReader { impl LazyFileListReader for LazyJsonLineReader { fn finish(self) -> PolarsResult { - if !self.glob() { - return self.finish_no_glob(); - } - let paths = Arc::new(Mutex::new((self.paths, false))); let file_options = FileScanOptions { @@ -127,7 +131,10 @@ impl LazyFileListReader for LazyJsonLineReader { schema: self.schema, }; - let scan_type = FileScan::NDJson { options }; + let scan_type = FileScan::NDJson { + options, + cloud_options: self.cloud_options, + }; Ok(LazyFrame::from(DslPlan::Scan { paths, @@ -140,39 +147,7 @@ impl LazyFileListReader for LazyJsonLineReader { } fn finish_no_glob(self) -> PolarsResult { - let paths = Arc::new(Mutex::new((self.paths, false))); - - let file_options = FileScanOptions { - n_rows: self.n_rows, - with_columns: None, - cache: false, - row_index: self.row_index.clone(), - rechunk: self.rechunk, - file_counter: 0, - hive_options: Default::default(), - glob: false, - include_file_paths: None, - }; - - let options = NDJsonReadOptions { - n_threads: None, - infer_schema_length: self.infer_schema_length, - chunk_size: NonZeroUsize::new(1 << 18).unwrap(), - low_memory: self.low_memory, - ignore_errors: self.ignore_errors, - schema: self.schema, - }; - - let scan_type = FileScan::NDJson { options }; - - Ok(LazyFrame::from(DslPlan::Scan { - paths, - file_info: Arc::new(RwLock::new(None)), - hive_parts: None, - predicate: None, - file_options, - scan_type, - })) + unreachable!(); } fn paths(&self) -> &[PathBuf] { @@ -215,4 +190,9 @@ impl LazyFileListReader for LazyJsonLineReader { fn row_index(&self) -> Option<&RowIndex> { self.row_index.as_ref() } + + /// [CloudOptions] used to list files. + fn cloud_options(&self) -> Option<&CloudOptions> { + self.cloud_options.as_ref() + } } diff --git a/crates/polars-mem-engine/src/executors/scan/ndjson.rs b/crates/polars-mem-engine/src/executors/scan/ndjson.rs index 38e5c3965fd95..8a0dc40bb3734 100644 --- a/crates/polars-mem-engine/src/executors/scan/ndjson.rs +++ b/crates/polars-mem-engine/src/executors/scan/ndjson.rs @@ -1,5 +1,6 @@ use std::path::PathBuf; +use polars_core::config; use polars_core::utils::accumulate_dataframes_vertical; use super::*; @@ -38,6 +39,14 @@ impl JsonExec { .as_ref() .unwrap_right(); + let verbose = config::verbose(); + let force_async = config::force_async(); + let run_async = force_async || is_cloud_url(self.paths.first().unwrap()); + + if force_async && verbose { + eprintln!("ASYNC READING FORCED"); + } + let mut n_rows = self.file_scan_options.n_rows; // Avoid panicking @@ -60,9 +69,23 @@ impl JsonExec { return None; } - let reader = match JsonLineReader::from_path(p) { - Ok(r) => r, - Err(e) => return Some(Err(e)), + let reader = if run_async { + JsonLineReader::new( + match polars_io::file_cache::FILE_CACHE + .get_entry(p.to_str().unwrap()) + // Safety: This was initialized by schema inference. + .unwrap() + .try_open_assume_latest() + { + Ok(v) => v, + Err(e) => return Some(Err(e)), + }, + ) + } else { + match JsonLineReader::from_path(p) { + Ok(r) => r, + Err(e) => return Some(Err(e)), + } }; let row_index = self.file_scan_options.row_index.as_mut(); diff --git a/crates/polars-mem-engine/src/planner/lp.rs b/crates/polars-mem-engine/src/planner/lp.rs index b88ecaa1d7fe7..693c6e92ffc07 100644 --- a/crates/polars-mem-engine/src/planner/lp.rs +++ b/crates/polars-mem-engine/src/planner/lp.rs @@ -292,7 +292,7 @@ fn create_physical_plan_impl( metadata, ))), #[cfg(feature = "json")] - FileScan::NDJson { options } => Ok(Box::new(executors::JsonExec::new( + FileScan::NDJson { options, .. } => Ok(Box::new(executors::JsonExec::new( paths, options, file_options, diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs index eed5ecb1fdc4e..4ed934d1423b7 100644 --- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs @@ -151,10 +151,16 @@ pub fn to_alp_impl( .map_err(|e| e.context(failed_here!(csv scan)))? }, #[cfg(feature = "json")] - FileScan::NDJson { options } => { - scans::ndjson_file_info(&paths, &file_options, options) - .map_err(|e| e.context(failed_here!(ndjson scan)))? - }, + FileScan::NDJson { + options, + cloud_options, + } => scans::ndjson_file_info( + &paths, + &file_options, + options, + cloud_options.as_ref(), + ) + .map_err(|e| e.context(failed_here!(ndjson scan)))?, // FileInfo should be set. FileScan::Anonymous { .. } => unreachable!(), } @@ -749,21 +755,23 @@ fn expand_scan_paths( } { - let paths_expanded = match scan_type { + let paths_expanded = match &scan_type { #[cfg(feature = "parquet")] - FileScan::Parquet { - ref cloud_options, .. - } => expand_scan_paths_with_hive_update(&lock.0, file_options, cloud_options)?, + FileScan::Parquet { cloud_options, .. } => { + expand_scan_paths_with_hive_update(&lock.0, file_options, cloud_options)? + }, #[cfg(feature = "ipc")] - FileScan::Ipc { - ref cloud_options, .. - } => expand_scan_paths_with_hive_update(&lock.0, file_options, cloud_options)?, + FileScan::Ipc { cloud_options, .. } => { + expand_scan_paths_with_hive_update(&lock.0, file_options, cloud_options)? + }, #[cfg(feature = "csv")] - FileScan::Csv { - ref cloud_options, .. - } => expand_paths(&lock.0, file_options.glob, cloud_options.as_ref())?, + FileScan::Csv { cloud_options, .. } => { + expand_paths(&lock.0, file_options.glob, cloud_options.as_ref())? + }, #[cfg(feature = "json")] - FileScan::NDJson { .. } => expand_paths(&lock.0, file_options.glob, None)?, + FileScan::NDJson { cloud_options, .. } => { + expand_paths(&lock.0, file_options.glob, cloud_options.as_ref())? + }, FileScan::Anonymous { .. } => unreachable!(), // Invariant: Anonymous scans are already expanded. }; diff --git a/crates/polars-plan/src/plans/conversion/scans.rs b/crates/polars-plan/src/plans/conversion/scans.rs index 6c158aaf7b4f4..78c47bb32f9ec 100644 --- a/crates/polars-plan/src/plans/conversion/scans.rs +++ b/crates/polars-plan/src/plans/conversion/scans.rs @@ -9,7 +9,7 @@ use polars_io::RowIndex; use super::*; -fn get_path(paths: &[PathBuf]) -> PolarsResult<&PathBuf> { +fn get_first_path(paths: &[PathBuf]) -> PolarsResult<&PathBuf> { // Use first path to get schema. paths .first() @@ -42,7 +42,7 @@ pub(super) fn parquet_file_info( file_options: &FileScanOptions, cloud_options: Option<&polars_io::cloud::CloudOptions>, ) -> PolarsResult<(FileInfo, Option)> { - let path = get_path(paths)?; + let path = get_first_path(paths)?; let (schema, reader_schema, num_rows, metadata) = if is_cloud_url(path) { #[cfg(not(feature = "cloud"))] @@ -92,7 +92,7 @@ pub(super) fn ipc_file_info( file_options: &FileScanOptions, cloud_options: Option<&polars_io::cloud::CloudOptions>, ) -> PolarsResult<(FileInfo, arrow::io::ipc::read::FileMetadata)> { - let path = get_path(paths)?; + let path = get_first_path(paths)?; let metadata = if is_cloud_url(path) { #[cfg(not(feature = "cloud"))] @@ -145,7 +145,7 @@ pub(super) fn csv_file_info( // * See if we can do this without downloading the entire file // prints the error message if paths is empty. - let first_path = get_path(paths)?; + let first_path = get_first_path(paths)?; let run_async = is_cloud_url(first_path) || config::force_async(); let cache_entries = { @@ -177,7 +177,7 @@ pub(super) fn csv_file_info( #[cfg(feature = "cloud")] { let entry: &Arc = - cache_entries.as_ref().unwrap().get(i).unwrap(); + &cache_entries.as_ref().unwrap()[0]; entry.try_open_check_latest()? } #[cfg(not(feature = "cloud"))] @@ -279,10 +279,50 @@ pub(super) fn ndjson_file_info( paths: &[PathBuf], file_options: &FileScanOptions, ndjson_options: &mut NDJsonReadOptions, + cloud_options: Option<&polars_io::cloud::CloudOptions>, ) -> PolarsResult { - let path = get_path(paths)?; + use polars_core::config; + + let run_async = !paths.is_empty() && is_cloud_url(&paths[0]) || config::force_async(); - let f = polars_utils::open_file(path)?; + let cache_entries = { + #[cfg(feature = "cloud")] + { + if run_async { + Some(polars_io::file_cache::init_entries_from_uri_list( + paths + .iter() + .map(|path| Arc::from(path.to_str().unwrap())) + .collect::>() + .as_slice(), + cloud_options, + )?) + } else { + None + } + } + #[cfg(not(feature = "cloud"))] + { + if run_async { + panic!("required feature `cloud` is not enabled") + } + } + }; + + let first_path = get_first_path(paths)?; + + let f = if run_async { + #[cfg(feature = "cloud")] + { + cache_entries.unwrap()[0].try_open_check_latest()? + } + #[cfg(not(feature = "cloud"))] + { + panic!("required feature `cloud` is not enabled") + } + } else { + polars_utils::open_file(first_path)? + }; let mut reader = std::io::BufReader::new(f); let (reader_schema, schema) = if let Some(schema) = ndjson_options.schema.take() { diff --git a/crates/polars-plan/src/plans/file_scan.rs b/crates/polars-plan/src/plans/file_scan.rs index e0794b8a14694..73ae85d936469 100644 --- a/crates/polars-plan/src/plans/file_scan.rs +++ b/crates/polars-plan/src/plans/file_scan.rs @@ -34,7 +34,10 @@ pub enum FileScan { metadata: Option, }, #[cfg(feature = "json")] - NDJson { options: NDJsonReadOptions }, + NDJson { + options: NDJsonReadOptions, + cloud_options: Option, + }, #[cfg_attr(feature = "serde", serde(skip))] Anonymous { options: Arc, @@ -83,7 +86,16 @@ impl PartialEq for FileScan { }, ) => l == r && c_l == c_r, #[cfg(feature = "json")] - (FileScan::NDJson { options: l }, FileScan::NDJson { options: r }) => l == r, + ( + FileScan::NDJson { + options: l, + cloud_options: c_l, + }, + FileScan::NDJson { + options: r, + cloud_options: c_r, + }, + ) => l == r && c_l == c_r, _ => false, } } @@ -122,7 +134,13 @@ impl Hash for FileScan { cloud_options.hash(state); }, #[cfg(feature = "json")] - FileScan::NDJson { options } => options.hash(state), + FileScan::NDJson { + options, + cloud_options, + } => { + options.hash(state); + cloud_options.hash(state) + }, FileScan::Anonymous { options, .. } => options.hash(state), } } diff --git a/crates/polars-plan/src/plans/functions/count.rs b/crates/polars-plan/src/plans/functions/count.rs index b0378eb4e00ef..04fa24a631acb 100644 --- a/crates/polars-plan/src/plans/functions/count.rs +++ b/crates/polars-plan/src/plans/functions/count.rs @@ -72,7 +72,10 @@ pub fn count_rows(paths: &Arc<[PathBuf]>, scan_type: &FileScan) -> PolarsResult< metadata.as_ref(), ), #[cfg(feature = "json")] - FileScan::NDJson { options } => count_rows_ndjson(paths), + FileScan::NDJson { + options, + cloud_options, + } => count_rows_ndjson(paths, cloud_options.as_ref()), FileScan::Anonymous { .. } => { unreachable!() }, @@ -181,11 +184,56 @@ async fn count_rows_cloud_ipc( } #[cfg(feature = "json")] -pub(super) fn count_rows_ndjson(paths: &Arc<[PathBuf]>) -> PolarsResult { - paths - .iter() - .map(|path| { - let reader = polars_io::ndjson::core::JsonLineReader::from_path(path)?; +pub(super) fn count_rows_ndjson( + paths: &Arc<[PathBuf]>, + cloud_options: Option<&CloudOptions>, +) -> PolarsResult { + use polars_core::config; + + let run_async = !paths.is_empty() && is_cloud_url(&paths[0]) || config::force_async(); + + let cache_entries = { + #[cfg(feature = "cloud")] + { + if run_async { + Some(polars_io::file_cache::init_entries_from_uri_list( + paths + .iter() + .map(|path| Arc::from(path.to_str().unwrap())) + .collect::>() + .as_slice(), + cloud_options, + )?) + } else { + None + } + } + #[cfg(not(feature = "cloud"))] + { + if run_async { + panic!("required feature `cloud` is not enabled") + } + } + }; + + (0..paths.len()) + .map(|i| { + let f = if run_async { + #[cfg(feature = "cloud")] + { + let entry: &Arc = + &cache_entries.as_ref().unwrap()[0]; + entry.try_open_check_latest()? + } + #[cfg(not(feature = "cloud"))] + { + panic!("required feature `cloud` is not enabled") + } + } else { + polars_utils::open_file(&paths[i])? + }; + + let reader = polars_io::ndjson::core::JsonLineReader::new(f); reader.count() }) .sum() diff --git a/py-polars/polars/io/ndjson.py b/py-polars/polars/io/ndjson.py index 3b12b263a19df..373d0de5248f4 100644 --- a/py-polars/polars/io/ndjson.py +++ b/py-polars/polars/io/ndjson.py @@ -3,7 +3,7 @@ import contextlib from io import BytesIO, StringIO from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from polars._utils.deprecation import deprecate_renamed_parameter from polars._utils.various import normalize_filepath @@ -99,6 +99,9 @@ def scan_ndjson( row_index_name: str | None = None, row_index_offset: int = 0, ignore_errors: bool = False, + storage_options: dict[str, Any] | None = None, + retries: int = 2, + file_cache_ttl: int | None = None, include_file_paths: str | None = None, ) -> LazyFrame: """ @@ -139,6 +142,26 @@ def scan_ndjson( Offset to start the row index column (only use if the name is set) ignore_errors Return `Null` if parsing fails because of schema mismatches. + storage_options + Options that indicate how to connect to a cloud provider. + + The cloud providers currently supported are AWS, GCP, and Azure. + See supported keys here: + + * `aws `_ + * `gcp `_ + * `azure `_ + * Hugging Face (`hf://`): Accepts an API key under the `token` parameter: \ + `{'token': '...'}`, or by setting the `HF_TOKEN` environment variable. + + If `storage_options` is not provided, Polars will try to infer the information + from environment variables. + retries + Number of retries if accessing a cloud instance fails. + file_cache_ttl + Amount of time to keep downloaded cloud files since their last access time, + in seconds. Uses the `POLARS_FILE_CACHE_TTL` environment variable + (which defaults to 1 hour) if not given. include_file_paths Include the path of the source file(s) as a column with this name. """ @@ -154,6 +177,12 @@ def scan_ndjson( msg = "'infer_schema_length' should be positive" raise ValueError(msg) + if storage_options: + storage_options = list(storage_options.items()) # type: ignore[assignment] + else: + # Handle empty dict input + storage_options = None + pylf = PyLazyFrame.new_from_ndjson( source, sources, @@ -166,5 +195,8 @@ def scan_ndjson( parse_row_index_args(row_index_name, row_index_offset), ignore_errors, include_file_paths=include_file_paths, + retries=retries, + cloud_options=storage_options, + file_cache_ttl=file_cache_ttl, ) return wrap_ldf(pylf) diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs index 1dc86db3905a9..47a82e8813879 100644 --- a/py-polars/src/lazyframe/mod.rs +++ b/py-polars/src/lazyframe/mod.rs @@ -44,7 +44,10 @@ impl PyLazyFrame { #[staticmethod] #[cfg(feature = "json")] #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (path, paths, infer_schema_length, schema, batch_size, n_rows, low_memory, rechunk, row_index, ignore_errors, include_file_paths))] + #[pyo3(signature = ( + path, paths, infer_schema_length, schema, batch_size, n_rows, low_memory, rechunk, + row_index, ignore_errors, include_file_paths, cloud_options, retries, file_cache_ttl + ))] fn new_from_ndjson( path: Option, paths: Vec, @@ -57,12 +60,42 @@ impl PyLazyFrame { row_index: Option<(String, IdxSize)>, ignore_errors: bool, include_file_paths: Option, + cloud_options: Option>, + retries: usize, + file_cache_ttl: Option, ) -> PyResult { let row_index = row_index.map(|(name, offset)| RowIndex { name: Arc::from(name.as_str()), offset, }); + #[cfg(feature = "cloud")] + let cloud_options = { + let first_path = if let Some(path) = &path { + path + } else { + paths + .first() + .ok_or_else(|| PyValueError::new_err("expected a path argument"))? + }; + + let first_path_url = first_path.to_string_lossy(); + + let mut cloud_options = if let Some(opts) = cloud_options { + parse_cloud_options(&first_path_url, opts)? + } else { + parse_cloud_options(&first_path_url, vec![])? + }; + + cloud_options = cloud_options.with_max_retries(retries); + + if let Some(file_cache_ttl) = file_cache_ttl { + cloud_options.file_cache_ttl = file_cache_ttl; + } + + Some(cloud_options) + }; + let r = if let Some(path) = &path { LazyJsonLineReader::new(path) } else { @@ -79,6 +112,7 @@ impl PyLazyFrame { .with_row_index(row_index) .with_ignore_errors(ignore_errors) .with_include_file_paths(include_file_paths.map(Arc::from)) + .with_cloud_options(cloud_options) .finish() .map_err(PyPolarsErr::from)?; diff --git a/py-polars/src/lazyframe/visitor/nodes.rs b/py-polars/src/lazyframe/visitor/nodes.rs index 5841a0bd280c3..ca9fbd067a92d 100644 --- a/py-polars/src/lazyframe/visitor/nodes.rs +++ b/py-polars/src/lazyframe/visitor/nodes.rs @@ -344,7 +344,8 @@ pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult { #[cfg(feature = "ipc")] FileScan::Ipc { .. } => return Err(PyNotImplementedError::new_err("ipc scan")), #[cfg(feature = "json")] - FileScan::NDJson { options } => { + FileScan::NDJson { options, .. } => { + // TODO: Also pass cloud_options let options = serde_json::to_string(options) .map_err(|err| PyValueError::new_err(format!("{err:?}")))?; ("ndjson", options).into_py(py) diff --git a/py-polars/tests/unit/io/test_lazy_count_star.py b/py-polars/tests/unit/io/test_lazy_count_star.py index 7ab69ad73aab4..fa06b4695caeb 100644 --- a/py-polars/tests/unit/io/test_lazy_count_star.py +++ b/py-polars/tests/unit/io/test_lazy_count_star.py @@ -69,3 +69,16 @@ def test_count_ipc(io_files_path: Path, path: str, n_rows: int) -> None: # Check if we are using our fast count star assert "FAST COUNT(*)" in lf.explain() assert_frame_equal(lf.collect(), expected) + + +@pytest.mark.parametrize( + ("path", "n_rows"), [("foods1.ndjson", 27), ("foods*.ndjson", 27 * 2)] +) +def test_count_ndjson(io_files_path: Path, path: str, n_rows: int) -> None: + lf = pl.scan_ndjson(io_files_path / path).select(pl.len()) + + expected = pl.DataFrame(pl.Series("len", [n_rows], dtype=pl.UInt32)) + + # Check if we are using our fast count star + assert "FAST COUNT(*)" in lf.explain() + assert_frame_equal(lf.collect(), expected) diff --git a/py-polars/tests/unit/io/test_scan.py b/py-polars/tests/unit/io/test_scan.py index 637cefc66b2a3..b5d389a850d8d 100644 --- a/py-polars/tests/unit/io/test_scan.py +++ b/py-polars/tests/unit/io/test_scan.py @@ -481,6 +481,7 @@ def test_scan_limit_0_does_not_panic( (pl.scan_csv, pl.DataFrame.write_csv), (pl.scan_parquet, pl.DataFrame.write_parquet), (pl.scan_ipc, pl.DataFrame.write_ipc), + (pl.scan_ndjson, pl.DataFrame.write_ndjson), ], ) @pytest.mark.parametrize( @@ -515,7 +516,7 @@ def test_scan_directory( scan = scan_func - if scan_func is pl.scan_csv: + if scan_func in [pl.scan_csv, pl.scan_ndjson]: scan = partial(scan, schema=df.schema) if scan_func is pl.scan_parquet: @@ -660,7 +661,7 @@ def test_scan_include_file_name( scan_func(tmp_path, include_file_paths="x").collect(streaming=streaming) # type: ignore[call-arg] f = scan_func - if scan_func is pl.scan_csv: + if scan_func in [pl.scan_csv, pl.scan_ndjson]: f = partial(f, schema=df.drop("path").schema) lf: pl.LazyFrame = f(tmp_path, include_file_paths="path") # type: ignore[call-arg]