feat: Support sub json path for JsonReader #21226

Open · wants to merge 1 commit into base: main
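In short, this PR adds an optional sub JSON path to `JsonReader`: before deserialization, the reader first walks a slash-separated key path into the parsed document and builds the `DataFrame` from the nested value found there. A minimal sketch of the new builder call (adapted from the doc example added in this diff; the `json` input and cursor are illustrative):

let df = JsonReader::new(std::io::Cursor::new(json))
    .with_sub_json_path("data/data_data")
    .finish()?;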
191 changes: 125 additions & 66 deletions crates/polars-io/src/json/mod.rs
@@ -75,6 +75,7 @@ use polars_error::{polars_bail, PolarsResult};
use polars_json::json::write::FallibleStreamingIterator;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use simd_json::prelude::*;
use simd_json::BorrowedValue;

use crate::mmap::{MmapBytesReader, ReaderBytes};
@@ -223,6 +224,7 @@ where
schema: Option<SchemaRef>,
schema_overwrite: Option<&'a Schema>,
json_format: JsonFormat,
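/// Optional slash-separated path to a nested value; when set, only that part of the JSON is read.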
sub_json_path: Option<&'a str>,
}

pub fn remove_bom(bytes: &[u8]) -> PolarsResult<&[u8]> {
@@ -251,6 +253,7 @@ where
schema: None,
schema_overwrite: None,
json_format: JsonFormat::Json,
sub_json_path: None,
}
}

@@ -261,7 +264,7 @@ where

/// Take the SerReader and return a parsed DataFrame.
///
/// Because JSON values specify their types (number, string, etc), no upcasting or conversion is performed between
/// Because JSON values specify their types (number, string, etc.), no upcasting or conversion is performed between
/// incompatible types in the input. In the event that a column contains mixed dtypes, it is unspecified whether an
/// error is returned or whether elements of incompatible dtypes are replaced with `null`.
fn finish(mut self) -> PolarsResult<DataFrame> {
@@ -281,74 +284,21 @@ where
} else {
simd_json::to_borrowed_value(owned).map_err(to_compute_err)?
};
if let BorrowedValue::Array(array) = &json_value {
if array.is_empty() & self.schema.is_none() & self.schema_overwrite.is_none() {
return Ok(DataFrame::empty());
}
}

let allow_extra_fields_in_struct = self.schema.is_some();

// struct type
let dtype = if let Some(mut schema) = self.schema {
if let Some(overwrite) = self.schema_overwrite {
let mut_schema = Arc::make_mut(&mut schema);
overwrite_schema(mut_schema, overwrite)?;
}
let mut json_value = &json_value;

DataType::Struct(schema.iter_fields().collect()).to_arrow(CompatLevel::newest())
} else {
// infer
let inner_dtype = if let BorrowedValue::Array(values) = &json_value {
infer::json_values_to_supertype(
values,
self.infer_schema_len
.unwrap_or(NonZeroUsize::new(usize::MAX).unwrap()),
)?
.to_arrow(CompatLevel::newest())
} else {
polars_json::json::infer(&json_value)?
};

if let Some(overwrite) = self.schema_overwrite {
let ArrowDataType::Struct(fields) = inner_dtype else {
polars_bail!(ComputeError: "can only deserialize json objects")
};

let mut schema = Schema::from_iter(fields.iter().map(Into::<Field>::into));
overwrite_schema(&mut schema, overwrite)?;

DataType::Struct(
schema
.into_iter()
.map(|(name, dt)| Field::new(name, dt))
.collect(),
)
.to_arrow(CompatLevel::newest())
} else {
inner_dtype
if let Some(sub_json_path) = self.sub_json_path {
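// Walk the slash-separated path one object key at a time; if any step fails to
// resolve, reading aborts with a NoData error.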
for section in sub_json_path.split('/') {
json_value = json_value.get(section).ok_or_else(|| polars_err!(NoData: "cannot find data at path: {}", sub_json_path))?;
}
};

let dtype = if let BorrowedValue::Array(_) = &json_value {
ArrowDataType::LargeList(Box::new(arrow::datatypes::Field::new(
PlSmallStr::from_static("item"),
dtype,
true,
)))
} else {
dtype
};
}

let arr = polars_json::json::deserialize(
&json_value,
dtype,
allow_extra_fields_in_struct,
)?;
let arr = arr.as_any().downcast_ref::<StructArray>().ok_or_else(
|| polars_err!(ComputeError: "can only deserialize json objects"),
)?;
DataFrame::try_from(arr.clone())
json_to_data_frame(
json_value,
self.schema,
self.schema_overwrite,
self.infer_schema_len,
)
},
JsonFormat::JsonLines => {
let mut json_reader = CoreJsonReader::new(
@@ -383,6 +333,79 @@ where
}
}

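/// Convert an already-parsed JSON value into a `DataFrame`, using the provided `schema`
/// (optionally patched by `schema_overwrite`) or inferring one from at most `infer_schema_len`
/// array values.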
pub fn json_to_data_frame(
json_value: &simd_json::BorrowedValue,
schema: Option<SchemaRef>,
schema_overwrite: Option<&Schema>,
infer_schema_len: Option<NonZeroUsize>,
) -> PolarsResult<DataFrame> {
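// An empty top-level array with no schema information maps to an empty DataFrame.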
if let BorrowedValue::Array(array) = json_value {
if array.is_empty() & schema.is_none() & schema_overwrite.is_none() {
return Ok(DataFrame::empty());
}
}

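// When the caller supplies a schema, JSON fields that are not in the schema are
// tolerated rather than raising an error; an inferred schema covers every field anyway.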
let allow_extra_fields_in_struct = schema.is_some();

// struct type
let dtype = if let Some(mut schema) = schema {
if let Some(overwrite) = schema_overwrite {
let mut_schema = Arc::make_mut(&mut schema);
overwrite_schema(mut_schema, overwrite)?;
}

DataType::Struct(schema.iter_fields().collect()).to_arrow(CompatLevel::newest())
} else {
// infer
let inner_dtype = if let BorrowedValue::Array(values) = json_value {
infer::json_values_to_supertype(
values,
infer_schema_len.unwrap_or(NonZeroUsize::new(usize::MAX).unwrap()),
)?
.to_arrow(CompatLevel::newest())
} else {
polars_json::json::infer(json_value)?
};

if let Some(overwrite) = schema_overwrite {
let ArrowDataType::Struct(fields) = inner_dtype else {
polars_bail!(ComputeError: "can only deserialize json objects")
};

let mut schema = Schema::from_iter(fields.iter().map(Into::<Field>::into));
overwrite_schema(&mut schema, overwrite)?;

DataType::Struct(
schema
.into_iter()
.map(|(name, dt)| Field::new(name, dt))
.collect(),
)
.to_arrow(CompatLevel::newest())
} else {
inner_dtype
}
};

let dtype = if let BorrowedValue::Array(_) = &json_value {
ArrowDataType::LargeList(Box::new(arrow::datatypes::Field::new(
PlSmallStr::from_static("item"),
dtype,
true,
)))
} else {
dtype
};

let arr = polars_json::json::deserialize(json_value, dtype, allow_extra_fields_in_struct)?;
let arr = arr
.as_any()
.downcast_ref::<StructArray>()
.ok_or_else(|| polars_err!(ComputeError: "can only deserialize json objects"))?;

DataFrame::try_from(arr.clone())
}
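A minimal sketch (not part of the diff) of driving the newly extracted helper directly from within this module, assuming no schema is supplied so the dtype is inferred; the byte buffer must be mutable because `simd_json` parses in place:

let mut bytes = br#"[{"a": 1, "b": 2}]"#.to_vec();
let json_value = simd_json::to_borrowed_value(&mut bytes).map_err(to_compute_err)?;
let df = json_to_data_frame(&json_value, None, None, None)?;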

impl<'a, R> JsonReader<'a, R>
where
R: MmapBytesReader,
@@ -423,7 +446,7 @@ where
/// Set the reader's column projection: the names of the columns to keep after deserialization. If `None`, all
/// columns are kept.
///
/// Setting `projection` to the columns you want to keep is more efficient than deserializing all of the columns and
/// Setting `projection` to the columns you want to keep is more efficient than deserializing all the columns and
/// then dropping the ones you don't want.
pub fn with_projection(mut self, projection: Option<Vec<PlSmallStr>>) -> Self {
self.projection = projection;
@@ -440,4 +463,40 @@ where
self.ignore_errors = ignore;
self
}

/// Set the path to a nested JSON value. The path is a slash-separated sequence of object keys,
/// and only the value it points to is read.
///
/// # Example
///
/// ```
/// use polars_io::SerReader;
///
/// let json = r#"{
/// "data": {
/// "data_data": [
/// {"a": 1, "b": 2},
/// {"a": 3, "b": 4}
/// ]
/// },
/// "other": {"foo": 1, "bar": "2"}
/// }"#;
///
/// let df = polars_io::json::JsonReader::new(std::io::Cursor::new(json))
/// .with_sub_json_path("data/data_data")
/// .finish()?;
///
/// let reference = polars_core::df! {
/// "a" => [1, 3],
/// "b" => [2, 4],
/// }?;
///
/// assert_eq!(reference, df);
///
/// # Ok::<(), polars_error::PolarsError>(())
/// ```
pub fn with_sub_json_path(mut self, sub_json_path: &'a str) -> Self {
self.sub_json_path = Some(sub_json_path);
self
}
}