diff --git a/crates/polars-io/src/json/mod.rs b/crates/polars-io/src/json/mod.rs index df2ff8bb9c28..a1d95722e8ff 100644 --- a/crates/polars-io/src/json/mod.rs +++ b/crates/polars-io/src/json/mod.rs @@ -75,6 +75,7 @@ use polars_error::{polars_bail, PolarsResult}; use polars_json::json::write::FallibleStreamingIterator; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; +use simd_json::prelude::*; use simd_json::BorrowedValue; use crate::mmap::{MmapBytesReader, ReaderBytes}; @@ -223,6 +224,7 @@ where schema: Option, schema_overwrite: Option<&'a Schema>, json_format: JsonFormat, + sub_json_path: Option<&'a str>, } pub fn remove_bom(bytes: &[u8]) -> PolarsResult<&[u8]> { @@ -251,6 +253,7 @@ where schema: None, schema_overwrite: None, json_format: JsonFormat::Json, + sub_json_path: None, } } @@ -261,7 +264,7 @@ where /// Take the SerReader and return a parsed DataFrame. /// - /// Because JSON values specify their types (number, string, etc), no upcasting or conversion is performed between + /// Because JSON values specify their types (number, string, etc.), no upcasting or conversion is performed between /// incompatible types in the input. In the event that a column contains mixed dtypes, is it unspecified whether an /// error is returned or whether elements of incompatible dtypes are replaced with `null`. fn finish(mut self) -> PolarsResult { @@ -281,74 +284,21 @@ where } else { simd_json::to_borrowed_value(owned).map_err(to_compute_err)? }; - if let BorrowedValue::Array(array) = &json_value { - if array.is_empty() & self.schema.is_none() & self.schema_overwrite.is_none() { - return Ok(DataFrame::empty()); - } - } - - let allow_extra_fields_in_struct = self.schema.is_some(); - // struct type - let dtype = if let Some(mut schema) = self.schema { - if let Some(overwrite) = self.schema_overwrite { - let mut_schema = Arc::make_mut(&mut schema); - overwrite_schema(mut_schema, overwrite)?; - } + let mut json_value = &json_value; - DataType::Struct(schema.iter_fields().collect()).to_arrow(CompatLevel::newest()) - } else { - // infer - let inner_dtype = if let BorrowedValue::Array(values) = &json_value { - infer::json_values_to_supertype( - values, - self.infer_schema_len - .unwrap_or(NonZeroUsize::new(usize::MAX).unwrap()), - )? - .to_arrow(CompatLevel::newest()) - } else { - polars_json::json::infer(&json_value)? - }; - - if let Some(overwrite) = self.schema_overwrite { - let ArrowDataType::Struct(fields) = inner_dtype else { - polars_bail!(ComputeError: "can only deserialize json objects") - }; - - let mut schema = Schema::from_iter(fields.iter().map(Into::::into)); - overwrite_schema(&mut schema, overwrite)?; - - DataType::Struct( - schema - .into_iter() - .map(|(name, dt)| Field::new(name, dt)) - .collect(), - ) - .to_arrow(CompatLevel::newest()) - } else { - inner_dtype + if let Some(sub_json_path) = self.sub_json_path { + for section in sub_json_path.split('/') { + json_value = json_value.get(section).ok_or_else(|| polars_err!(NoData: "Can not find the data by path: {}", sub_json_path))?; } - }; - - let dtype = if let BorrowedValue::Array(_) = &json_value { - ArrowDataType::LargeList(Box::new(arrow::datatypes::Field::new( - PlSmallStr::from_static("item"), - dtype, - true, - ))) - } else { - dtype - }; + } - let arr = polars_json::json::deserialize( - &json_value, - dtype, - allow_extra_fields_in_struct, - )?; - let arr = arr.as_any().downcast_ref::().ok_or_else( - || polars_err!(ComputeError: "can only deserialize json objects"), - )?; - DataFrame::try_from(arr.clone()) + json_to_data_frame( + json_value, + self.schema, + self.schema_overwrite, + self.infer_schema_len, + ) }, JsonFormat::JsonLines => { let mut json_reader = CoreJsonReader::new( @@ -383,6 +333,79 @@ where } } +pub fn json_to_data_frame( + json_value: &simd_json::BorrowedValue, + schema: Option, + schema_overwrite: Option<&Schema>, + infer_schema_len: Option, +) -> PolarsResult { + if let BorrowedValue::Array(array) = json_value { + if array.is_empty() & schema.is_none() & schema_overwrite.is_none() { + return Ok(DataFrame::empty()); + } + } + + let allow_extra_fields_in_struct = schema.is_some(); + + // struct type + let dtype = if let Some(mut schema) = schema { + if let Some(overwrite) = schema_overwrite { + let mut_schema = Arc::make_mut(&mut schema); + overwrite_schema(mut_schema, overwrite)?; + } + + DataType::Struct(schema.iter_fields().collect()).to_arrow(CompatLevel::newest()) + } else { + // infer + let inner_dtype = if let BorrowedValue::Array(values) = json_value { + infer::json_values_to_supertype( + values, + infer_schema_len.unwrap_or(NonZeroUsize::new(usize::MAX).unwrap()), + )? + .to_arrow(CompatLevel::newest()) + } else { + polars_json::json::infer(json_value)? + }; + + if let Some(overwrite) = schema_overwrite { + let ArrowDataType::Struct(fields) = inner_dtype else { + polars_bail!(ComputeError: "can only deserialize json objects") + }; + + let mut schema = Schema::from_iter(fields.iter().map(Into::::into)); + overwrite_schema(&mut schema, overwrite)?; + + DataType::Struct( + schema + .into_iter() + .map(|(name, dt)| Field::new(name, dt)) + .collect(), + ) + .to_arrow(CompatLevel::newest()) + } else { + inner_dtype + } + }; + + let dtype = if let BorrowedValue::Array(_) = &json_value { + ArrowDataType::LargeList(Box::new(arrow::datatypes::Field::new( + PlSmallStr::from_static("item"), + dtype, + true, + ))) + } else { + dtype + }; + + let arr = polars_json::json::deserialize(json_value, dtype, allow_extra_fields_in_struct)?; + let arr = arr + .as_any() + .downcast_ref::() + .ok_or_else(|| polars_err!(ComputeError: "can only deserialize json objects"))?; + + DataFrame::try_from(arr.clone()) +} + impl<'a, R> JsonReader<'a, R> where R: MmapBytesReader, @@ -423,7 +446,7 @@ where /// Set the reader's column projection: the names of the columns to keep after deserialization. If `None`, all /// columns are kept. /// - /// Setting `projection` to the columns you want to keep is more efficient than deserializing all of the columns and + /// Setting `projection` to the columns you want to keep is more efficient than deserializing all the columns and /// then dropping the ones you don't want. pub fn with_projection(mut self, projection: Option>) -> Self { self.projection = projection; @@ -440,4 +463,40 @@ where self.ignore_errors = ignore; self } + + /// Set the path to the sub JSON. This is used when reading the partial data from the JSON. + /// + /// # Example + /// + /// ``` + /// use polars_io::SerReader; + /// + /// let json = r#"{ + /// "data": { + /// "data_data": [ + /// {"a": 1, "b": 2}, + /// {"a": 3, "b": 4} + /// ] + /// }, + /// "other": {"foo": 1, "bar": "2"} + /// }"#; + /// + /// let df = polars_io::json::JsonReader::new(std::io::Cursor::new(json)) + /// .with_sub_json_path("data/data_data") + /// .finish()?; + /// + /// let reference = polars_core::df! { + /// "a" => [1, 3], + /// "b" => [2, 4], + /// }?; + /// + /// assert_eq!(reference, df); + /// + /// # Ok::<(), polars_error::PolarsError>(()) + /// ``` + /// + pub fn with_sub_json_path(mut self, sub_json_path: &'a str) -> Self { + self.sub_json_path = Some(sub_json_path); + self + } }