Commit 294adc9

nameexhaustion committed Jul 23, 2024
1 parent 80a806c

Showing 11 changed files with 218 additions and 125 deletions.
5 changes: 1 addition & 4 deletions crates/polars-io/src/utils/other.rs
@@ -168,10 +168,7 @@ pub(crate) fn update_row_counts3(dfs: &mut [DataFrame], heights: &[IdxSize], off
 }
 
 #[cfg(feature = "json")]
-pub(crate) fn overwrite_schema(
-    schema: &mut Schema,
-    overwriting_schema: &Schema,
-) -> PolarsResult<()> {
+pub fn overwrite_schema(schema: &mut Schema, overwriting_schema: &Schema) -> PolarsResult<()> {
     for (k, value) in overwriting_schema.iter() {
         *schema.try_get_mut(k)? = value.clone();
     }
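
Since `overwrite_schema` is now `pub`, it can be called from outside `polars-io`. A minimal behavioral sketch, assuming the function is re-exported under `polars_io::utils` (an assumption, not confirmed by this diff) and using the stock polars-core `Schema`/`Field` API: every key in the overwriting schema must already exist in the target, because `try_get_mut` errors on unknown column names.

use polars_core::prelude::*;
use polars_io::utils::overwrite_schema; // assumed re-export path

fn widen_id() -> PolarsResult<()> {
    let mut schema = Schema::from_iter([
        Field::new("id", DataType::Int32),
        Field::new("name", DataType::String),
    ]);
    // Only the listed columns are replaced; `name` keeps its dtype.
    let overwrite = Schema::from_iter([Field::new("id", DataType::Int64)]);
    overwrite_schema(&mut schema, &overwrite)?;
    assert_eq!(schema.get("id"), Some(&DataType::Int64));
    Ok(())
}
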
10 changes: 10 additions & 0 deletions crates/polars-lazy/src/scan/ndjson.rs
@@ -18,6 +18,7 @@ pub struct LazyJsonLineReader {
     pub(crate) low_memory: bool,
     pub(crate) rechunk: bool,
     pub(crate) schema: Option<SchemaRef>,
+    pub(crate) schema_overwrite: Option<SchemaRef>,
     pub(crate) row_index: Option<RowIndex>,
     pub(crate) infer_schema_length: Option<NonZeroUsize>,
     pub(crate) n_rows: Option<usize>,
@@ -38,6 +39,7 @@ impl LazyJsonLineReader {
             low_memory: false,
             rechunk: false,
             schema: None,
+            schema_overwrite: None,
             row_index: None,
             infer_schema_length: NonZeroUsize::new(100),
             ignore_errors: false,
@@ -82,6 +84,13 @@ impl LazyJsonLineReader {
         self
     }
 
+    /// Set the JSON file's schema
+    #[must_use]
+    pub fn with_schema_overwrite(mut self, schema_overwrite: Option<SchemaRef>) -> Self {
+        self.schema_overwrite = schema_overwrite;
+        self
+    }
+
     /// Reduce memory usage at the expense of performance
     #[must_use]
     pub fn low_memory(mut self, toggle: bool) -> Self {
@@ -129,6 +138,7 @@ impl LazyFileListReader for LazyJsonLineReader {
             low_memory: self.low_memory,
             ignore_errors: self.ignore_errors,
             schema: self.schema,
+            schema_overwrite: self.schema_overwrite,
         };
 
         let scan_type = FileScan::NDJson {
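
A hedged usage sketch of the new builder method, assuming `LazyJsonLineReader` is re-exported through the polars prelude with the `json` feature enabled and that `new` accepts a path as written (both assumptions, not shown in this diff); `finish()` comes from the `LazyFileListReader` impl above. Only the columns named in the overwrite change dtype; the rest keep their inferred types.

use std::sync::Arc;
use polars::prelude::*;

fn scan_ndjson_with_overwrite() -> PolarsResult<LazyFrame> {
    // Hypothetical file path; `id` is forced to Int64 while every
    // other column keeps the dtype inferred from the file.
    let overwrite: SchemaRef = Arc::new(Schema::from_iter([
        Field::new("id", DataType::Int64),
    ]));
    LazyJsonLineReader::new("data.ndjson")
        .with_schema_overwrite(Some(overwrite))
        .finish()
}
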
7 changes: 6 additions & 1 deletion crates/polars-plan/src/plans/conversion/scans.rs
@@ -325,7 +325,7 @@ pub(super) fn ndjson_file_info(
     };
     let mut reader = std::io::BufReader::new(f);
 
-    let (reader_schema, schema) = if let Some(schema) = ndjson_options.schema.take() {
+    let (mut reader_schema, schema) = if let Some(schema) = ndjson_options.schema.take() {
         if file_options.row_index.is_none() {
             (schema.clone(), schema.clone())
         } else {
@@ -340,6 +340,11 @@ pub(super) fn ndjson_file_info(
         prepare_schemas(schema, file_options.row_index.as_ref())
     };
 
+    if let Some(overwriting_schema) = &ndjson_options.schema_overwrite {
+        let schema = Arc::make_mut(&mut reader_schema);
+        overwrite_schema(schema, overwriting_schema)?;
+    }
+
     Ok(FileInfo::new(
         schema,
         Some(Either::Right(reader_schema)),
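
The hunk above mutates the shared `SchemaRef` through `Arc::make_mut`, the standard library's copy-on-write helper: the inner value is cloned only when the `Arc` has other holders, so existing references to the schema never observe the overwrite. A standalone sketch of that behavior using plain `std`:

use std::sync::Arc;

fn main() {
    let original: Arc<Vec<i32>> = Arc::new(vec![1, 2, 3]);
    let mut cow = Arc::clone(&original);

    // `original` is still alive, so make_mut clones before mutating.
    Arc::make_mut(&mut cow).push(4);

    assert_eq!(*original, vec![1, 2, 3]); // the shared value is unchanged
    assert_eq!(*cow, vec![1, 2, 3, 4]);
}
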
1 change: 1 addition & 0 deletions crates/polars-plan/src/plans/options.rs
@@ -359,4 +359,5 @@ pub struct NDJsonReadOptions {
     pub low_memory: bool,
     pub ignore_errors: bool,
     pub schema: Option<SchemaRef>,
+    pub schema_overwrite: Option<SchemaRef>,
 }
17 changes: 4 additions & 13 deletions py-polars/polars/io/_utils.py
@@ -200,9 +200,7 @@ def managed_file(file: Any) -> Iterator[Any]:
     # as fsspec needs requests to be installed
     # to read from http
     if looks_like_url(file):
-        return process_file_url(
-            file, encoding_str, bearer_token=storage_options.get("token")
-        )
+        return process_file_url(file, encoding_str)
     if _FSSPEC_AVAILABLE:
         from fsspec.utils import infer_storage_options
 
@@ -269,17 +267,10 @@ def looks_like_url(path: str) -> bool:
     return re.match("^(ht|f)tps?://", path, re.IGNORECASE) is not None
 
 
-def process_file_url(
-    path: str, encoding: str | None = None, bearer_token: str | None = None
-) -> BytesIO:
-    from urllib.request import Request, urlopen
-
-    request = Request(path)
-
-    if bearer_token is not None:
-        request.add_header("Authorization", f"Bearer {bearer_token}")
+def process_file_url(path: str, encoding: str | None = None) -> BytesIO:
+    from urllib.request import urlopen
 
-    with urlopen(request) as f:
+    with urlopen(path) as f:
         if not encoding or encoding in {"utf8", "utf8-lossy"}:
             return BytesIO(f.read())
         else:
82 changes: 62 additions & 20 deletions py-polars/polars/io/csv/functions.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import contextlib
+import os
 from io import BytesIO, StringIO
 from pathlib import Path
 from typing import IO, TYPE_CHECKING, Any, Callable, Mapping, Sequence
@@ -25,7 +26,7 @@
 from polars.io.csv.batched_reader import BatchedCsvReader
 
 with contextlib.suppress(ImportError):  # Module not available when building docs
-    from polars.polars import PyDataFrame, PyLazyFrame, expand_paths
+    from polars.polars import PyDataFrame, PyLazyFrame
 
 if TYPE_CHECKING:
     from polars import DataFrame, LazyFrame
@@ -420,23 +421,67 @@ def read_csv(
     if not infer_schema:
         infer_schema_length = 0
 
-    sources = [source]
-
-    if isinstance(source, (str, Path)) and str(source).startswith("hf://"):
-        sources = expand_paths([source], glob, [*storage_options.items()])
-    elif (
-        isinstance(source, Sequence)
-        and source
-        and isinstance(source[0], (str, Path))
-        and str(source[0]).startswith("hf://")
+    if (
+        # TODO: scan_csv doesn't support a "dtype slice" (i.e. list[DataType])
+        schema_overrides is None or isinstance(schema_overrides, dict)
+    ) and (
+        (
+            # Check that it is not a BytesIO object
+            isinstance(v := source, (str, Path))
+            or isinstance(source, Sequence)
+            and source
+            and isinstance(v := source[0], (str, Path))
+        )
+        and (
+            # HuggingFace only ⊂( ◜◒◝ )⊃
+            str(v).startswith("hf://")
+            # Also dispatch on FORCE_ASYNC, so that this codepath gets run
+            # through by our test suite during CI.
+            or os.getenv("POLARS_FORCE_ASYNC") == "1"
+            # We can't dispatch this for all paths due to a few reasons:
+            # * `scan_csv` does not support compressed files
+            # * The `storage_options` configuration keys are different between
+            #   fsspec and object_store
+        )
     ):
-        sources = expand_paths(source, glob, [*storage_options.items()])
-
-    results = []
-
-    for s in sources:
+        lf = _scan_csv_impl(
+            source,
+            has_header=has_header,
+            separator=separator,
+            comment_prefix=comment_prefix,
+            quote_char=quote_char,
+            skip_rows=skip_rows,
+            schema_overrides=schema_overrides,
+            schema=schema,
+            null_values=null_values,
+            missing_utf8_is_empty_string=missing_utf8_is_empty_string,
+            ignore_errors=ignore_errors,
+            try_parse_dates=try_parse_dates,
+            infer_schema_length=infer_schema_length,
+            n_rows=n_rows,
+            encoding=encoding if encoding == "utf8-lossy" else "utf8",
+            low_memory=low_memory,
+            rechunk=rechunk,
+            skip_rows_after_header=skip_rows_after_header,
+            row_index_name=row_index_name,
+            row_index_offset=row_index_offset,
+            eol_char=eol_char,
+            raise_if_empty=raise_if_empty,
+            truncate_ragged_lines=truncate_ragged_lines,
+            decimal_comma=decimal_comma,
+            glob=glob,
+        )
+
+        if columns:
+            lf = lf.select(columns)
+        elif projection:
+            lf = lf.select(F.nth(projection))
+
+        df = lf.collect()
+
+    else:
         with prepare_file_arg(
-            s,
+            source,
             encoding=encoding,
             use_pyarrow=False,
             raise_if_empty=raise_if_empty,
@@ -474,12 +519,9 @@ def read_csv(
                 glob=glob,
             )
 
-        if new_columns:
-            df = _update_columns(df, new_columns)
-
-        results.append(df)
-
-    return F.concat(results)
+    if new_columns:
+        return _update_columns(df, new_columns)
+    return df
 
 
 def _read_csv_impl(
(Diffs for the remaining changed files were not loaded.)
