[SEDONA-465] Support reading legacy parquet files written by Apache Sedona <= 1.3.1-incubating #1190

Merged 6 commits on Jan 8, 2024
54 changes: 54 additions & 0 deletions docs/api/sql/Reading-legacy-parquet.md
@@ -0,0 +1,54 @@
Due to breaking changes in Apache Sedona 1.4.0 to both the SQL type of `GeometryUDT`
([SEDONA-205](https://issues.apache.org/jira/browse/SEDONA-205)) and the
serialization format of geometry values ([SEDONA-207](https://issues.apache.org/jira/browse/SEDONA-207)), Parquet files
containing geometry columns written by Apache Sedona 1.3.1-incubating or earlier cannot be read by Apache Sedona 1.4.0 or later.

Consider Parquet files written with the plain `"parquet"` format by Apache Sedona 1.3.1-incubating or earlier:

```python
df.write.format("parquet").save("path/to/parquet/files")
```

Reading such files with Apache Sedona 1.4.0 or later using `spark.read.format("parquet").load("path/to/parquet/files")` will result in an exception:

```
24/01/08 12:52:56 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 11)
org.apache.spark.sql.AnalysisException: Invalid Spark read type: expected required group geom (LIST) {
repeated group list {
required int32 element (INTEGER(8,true));
}
} to be list type but found Some(BinaryType)
at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkConversionRequirement(ParquetSchemaConverter.scala:745)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertGroupField$3(ParquetSchemaConverter.scala:343)
at scala.Option.fold(Option.scala:251)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertGroupField(ParquetSchemaConverter.scala:324)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertField(ParquetSchemaConverter.scala:188)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertInternal$3(ParquetSchemaConverter.scala:147)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertInternal$3$adapted(ParquetSchemaConverter.scala:117)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.immutable.Range.foreach(Range.scala:158)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
...
```

This happens because the legacy `GeometryUDT` used `ArrayType(ByteType)` as its SQL type, so geometry values were written as Parquet lists of bytes, whereas Apache Sedona 1.4.0 and later expect `BinaryType`.

Since v1.5.1, the GeoParquet reader supports reading such legacy Parquet files: use the `"geoparquet"` format with the `.option("legacyMode", "true")` option. Here is an example:

=== "Scala/Java"

```scala
val df = sedona.read.format("geoparquet").option("legacyMode", "true").load("path/to/legacy-parquet-files")
```

=== "Java"

```java
Dataset<Row> df = sedona.read.format("geoparquet").option("legacyMode", "true").load("path/to/legacy-parquet-files")
```

=== "Python"

```python
df = sedona.read.format("geoparquet").option("legacyMode", "true").load("path/to/legacy-parquet-files")
```
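
Once the data has been loaded with `legacyMode`, a natural follow-up (shown here only as a minimal sketch; both paths are placeholders) is to rewrite it in GeoParquet format so that future reads no longer need the option:

```scala
// Sketch: read the legacy files once with legacyMode, then persist them as GeoParquet.
// Both paths are placeholders.
val legacyDf = sedona.read.format("geoparquet").option("legacyMode", "true").load("path/to/legacy-parquet-files")
legacyDf.write.format("geoparquet").save("path/to/geoparquet-files")
```

After this one-time rewrite, the new files can be read with a plain `sedona.read.format("geoparquet").load(...)` call.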
7 changes: 7 additions & 0 deletions docs/tutorial/sql.md
@@ -373,6 +373,13 @@ root

Sedona supports spatial predicate push-down for GeoParquet files; please refer to the [SedonaSQL query optimizer](../api/sql/Optimizer.md) documentation for details.

The GeoParquet file reader can also be used to read legacy Parquet files written by Apache Sedona 1.3.1-incubating or earlier.
Please refer to [Reading Legacy Parquet Files](../api/sql/Reading-legacy-parquet.md) for details.

!!!warning
    The GeoParquet file reader does not work on the Databricks runtime when Photon is enabled. Please disable Photon when
    using the GeoParquet file reader on the Databricks runtime.

### Inspect GeoParquet metadata

Since v`1.5.1`, Sedona provides a Spark SQL data source `"geoparquet.metadata"` for inspecting GeoParquet metadata. The resulting dataframe contains
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -59,6 +59,7 @@ nav:
- Aggregate function: api/sql/AggregateFunction.md
- DataFrame Style functions: api/sql/DataFrameAPI.md
- Query optimization: api/sql/Optimizer.md
- Reading Legacy Parquet Files: api/sql/Reading-legacy-parquet.md
- Visualization:
- SedonaPyDeck: api/sql/Visualization_SedonaPyDeck.md
- SedonaKepler: api/sql/Visualization_SedonaKepler.md
1 change: 1 addition & 0 deletions python/tests/__init__.py
@@ -39,6 +39,7 @@
geojson_id_input_location = os.path.join(tests_resource, "testContainsId.json")
geoparquet_input_location = os.path.join(tests_resource, "geoparquet/example1.parquet")
plain_parquet_input_location = os.path.join(tests_resource, "geoparquet/plain.parquet")
legacy_parquet_input_location = os.path.join(tests_resource, "parquet/legacy-parquet-nested-columns.snappy.parquet")
google_buildings_input_location = os.path.join(tests_resource, "813_buildings_test.csv")
chicago_crimes_input_location = os.path.join(tests_resource, "Chicago_Crimes.csv")
world_map_raster_input_location = os.path.join(tests_resource, "raster/raster_with_no_data/test5.tiff")
10 changes: 10 additions & 0 deletions python/tests/sql/test_geoparquet.py
@@ -28,6 +28,7 @@
from tests.test_base import TestBase
from tests import geoparquet_input_location
from tests import plain_parquet_input_location
from tests import legacy_parquet_input_location


class TestGeoParquet(TestBase):
@@ -82,3 +83,12 @@ def test_inspect_geoparquet_metadata(self):
assert column_metadata['encoding'] == 'WKB'
assert len(column_metadata['bbox']) == 4
assert isinstance(json.loads(column_metadata['crs']), dict)

def test_reading_legacy_parquet_files(self):
df = self.spark.read.format("geoparquet").option("legacyMode", "true").load(legacy_parquet_input_location)
rows = df.collect()
assert len(rows) > 0
for row in rows:
assert isinstance(row['geom'], BaseGeometry)
assert isinstance(row['struct_geom']['g0'], BaseGeometry)
assert isinstance(row['struct_geom']['g1'], BaseGeometry)
@@ -294,7 +294,8 @@ class GeoParquetFileFormat(val spatialFilter: Option[GeoParquetSpatialFilter])
convertTz,
enableVectorizedReader = false,
datetimeRebaseMode,
int96RebaseMode)
int96RebaseMode,
options)
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
@@ -356,26 +357,29 @@ object GeoParquetFileFormat extends Logging {
val converter = new GeoParquetToSparkSchemaConverter(
keyValueMetaData = keyValueMetaData,
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp)
readSchemaFromFooter(footer, keyValueMetaData, converter)
assumeInt96IsTimestamp = assumeInt96IsTimestamp,
parameters = parameters)
readSchemaFromFooter(footer, keyValueMetaData, converter, parameters)
}
}

GeoSchemaMergeUtils.mergeSchemasInParallel(sparkSession, parameters, filesToTouch, reader)
}

private def readSchemaFromFooter(footer: Footer, keyValueMetaData: java.util.Map[String, String],
converter: GeoParquetToSparkSchemaConverter): StructType = {
converter: GeoParquetToSparkSchemaConverter,
parameters: Map[String, String]): StructType = {
val fileMetaData = footer.getParquetMetadata.getFileMetaData
fileMetaData
.getKeyValueMetaData
.asScala.toMap
.get(ParquetReadSupport.SPARK_METADATA_KEY)
.flatMap(schema => deserializeSchemaString(schema, keyValueMetaData))
.flatMap(schema => deserializeSchemaString(schema, keyValueMetaData, parameters))
.getOrElse(converter.convert(fileMetaData.getSchema))
}

private def deserializeSchemaString(schemaString: String, keyValueMetaData: java.util.Map[String, String]): Option[StructType] = {
private def deserializeSchemaString(schemaString: String, keyValueMetaData: java.util.Map[String, String],
parameters: Map[String, String]): Option[StructType] = {
// Tries to deserialize the schema string as JSON first, then falls back to the case class
// string parser (data generated by older versions of Spark SQL uses this format).
val schemaOpt = Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover {
@@ -392,14 +396,13 @@
Failure(cause)
}.toOption

schemaOpt.map(schema => replaceGeometryColumnWithGeometryUDT(schema, keyValueMetaData))
schemaOpt.map(schema => replaceGeometryColumnWithGeometryUDT(schema, keyValueMetaData, parameters))
}

private def replaceGeometryColumnWithGeometryUDT(schema: StructType,
keyValueMetaData: java.util.Map[String, String]): StructType = {
val geoParquetMetaData: GeoParquetMetaData = GeoParquetMetaData.parseKeyValueMetaData(keyValueMetaData).getOrElse {
throw new IllegalArgumentException("GeoParquet file does not contain valid geo metadata")
}
keyValueMetaData: java.util.Map[String, String],
parameters: Map[String, String]): StructType = {
val geoParquetMetaData: GeoParquetMetaData = GeoParquetUtils.parseGeoParquetMetaData(keyValueMetaData, parameters)
val fields = schema.fields.map { field =>
field.dataType match {
case _: BinaryType if geoParquetMetaData.columns.contains(field.name) =>
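
The `GeoParquetUtils.parseGeoParquetMetaData` and `GeoParquetUtils.isLegacyMode` helpers referenced above are not shown in this diff. Below is a minimal sketch of the behavior implied by these call sites, assuming `legacyMode` is read directly from the data source options and that a missing `geo` key is tolerated only in legacy mode; the names and details here are assumptions, not the PR's actual implementation.

```scala
// Hypothetical sketch; not the PR's actual GeoParquetUtils.
object GeoParquetLegacySketch {
  // The option arrives verbatim from .option("legacyMode", "true"); default is false.
  def isLegacyMode(parameters: Map[String, String]): Boolean =
    parameters.get("legacyMode").exists(_.equalsIgnoreCase("true"))

  // Files written by Sedona <= 1.3.1-incubating carry no "geo" key-value metadata,
  // so in legacy mode its absence is tolerated instead of raising the
  // IllegalArgumentException that the replaced code threw.
  def geoMetadataJson(keyValueMetaData: java.util.Map[String, String],
                      parameters: Map[String, String]): Option[String] =
    Option(keyValueMetaData.get("geo")) match {
      case some @ Some(_) => some
      case None if isLegacyMode(parameters) => None
      case None =>
        throw new IllegalArgumentException("GeoParquet file does not contain valid geo metadata")
    }
}
```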
@@ -54,7 +54,9 @@ class GeoParquetReadSupport (
override val convertTz: Option[ZoneId],
enableVectorizedReader: Boolean,
datetimeRebaseMode: LegacyBehaviorPolicy.Value,
int96RebaseMode: LegacyBehaviorPolicy.Value)
int96RebaseMode: LegacyBehaviorPolicy.Value,
parameters: Map[String, String]
)
extends ParquetReadSupport with Logging {
private var catalystRequestedSchema: StructType = _

@@ -121,10 +123,11 @@
new GeoParquetRecordMaterializer(
parquetRequestedSchema,
GeoParquetReadSupport.expandUDT(catalystRequestedSchema),
new GeoParquetToSparkSchemaConverter(keyValueMetaData, conf),
new GeoParquetToSparkSchemaConverter(keyValueMetaData, conf, parameters),
convertTz,
datetimeRebaseMode,
int96RebaseMode)
int96RebaseMode,
parameters)
}
}

@@ -35,14 +35,16 @@ import org.apache.spark.sql.types.StructType
* Gregorian calendar: mode + optional original time zone
* @param int96RebaseSpec the specification of rebasing INT96 timestamp from Julian to Proleptic
* Gregorian calendar
 * @param parameters Options for reading GeoParquet files, e.g., whether legacyMode is enabled.
*/
class GeoParquetRecordMaterializer(
parquetSchema: MessageType,
catalystSchema: StructType,
schemaConverter: GeoParquetToSparkSchemaConverter,
convertTz: Option[ZoneId],
datetimeRebaseMode: LegacyBehaviorPolicy.Value,
int96RebaseMode: LegacyBehaviorPolicy.Value)
int96RebaseMode: LegacyBehaviorPolicy.Value,
parameters: Map[String, String])
extends RecordMaterializer[InternalRow] {
private val rootConverter = new GeoParquetRowConverter(
schemaConverter,
Expand All @@ -51,6 +53,7 @@ class GeoParquetRecordMaterializer(
convertTz,
datetimeRebaseMode,
int96RebaseMode,
parameters,
NoopUpdater)

override def getCurrentRecord: InternalRow = rootConverter.currentRecord
@@ -36,6 +36,7 @@ import org.locationtech.jts.io.WKBReader
import java.math.{BigDecimal, BigInteger}
import java.time.{ZoneId, ZoneOffset}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
@@ -75,6 +76,7 @@ import scala.collection.mutable.ArrayBuffer
* calendar
* @param int96RebaseMode the mode of rebasing INT96 timestamp from Julian to Proleptic Gregorian
* calendar
 * @param parameters Options for reading GeoParquet files, e.g., whether legacyMode is enabled.
* @param updater An updater which propagates converted field values to the parent container
*/
private[parquet] class GeoParquetRowConverter(
@@ -84,6 +86,7 @@ private[parquet] class GeoParquetRowConverter(
convertTz: Option[ZoneId],
datetimeRebaseMode: LegacyBehaviorPolicy.Value,
int96RebaseMode: LegacyBehaviorPolicy.Value,
parameters: Map[String, String],
updater: ParentContainerUpdater)
extends ParquetGroupConverter(updater) with Logging {

@@ -202,11 +205,28 @@ private[parquet] class GeoParquetRowConverter(
new ParquetPrimitiveConverter(updater)

case GeometryUDT =>
new ParquetPrimitiveConverter(updater) {
override def addBinary(value: Binary): Unit = {
val wkbReader = new WKBReader()
val geom = wkbReader.read(value.getBytes)
updater.set(GeometryUDT.serialize(geom))
if (parquetType.isPrimitive) {
new ParquetPrimitiveConverter(updater) {
override def addBinary(value: Binary): Unit = {
val wkbReader = new WKBReader()
val geom = wkbReader.read(value.getBytes)
updater.set(GeometryUDT.serialize(geom))
}
}
} else {
if (GeoParquetUtils.isLegacyMode(parameters)) {
new ParquetArrayConverter(parquetType.asGroupType(), ArrayType(ByteType, containsNull = false), updater) {
override def end(): Unit = {
val wkbReader = new WKBReader()
val byteArray = currentArray.map(_.asInstanceOf[Byte]).toArray
val geom = wkbReader.read(byteArray)
updater.set(GeometryUDT.serialize(geom))
}
}
} else {
throw new IllegalArgumentException(
s"Parquet type for geometry column is $parquetType. This parquet file could be written by " +
"Apache Sedona <= 1.3.1-incubating. Please use option(\"legacyMode\", \"true\") to read this file.")
}
}

Expand Down Expand Up @@ -337,6 +357,7 @@ private[parquet] class GeoParquetRowConverter(
convertTz,
datetimeRebaseMode,
int96RebaseMode,
parameters,
wrappedUpdater)

case t =>
@@ -474,13 +495,12 @@ private[parquet] class GeoParquetRowConverter(
*
* @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
*/
private final class ParquetArrayConverter(
parquetSchema: GroupType,
catalystSchema: ArrayType,
updater: ParentContainerUpdater)
private class ParquetArrayConverter(parquetSchema: GroupType,
catalystSchema: ArrayType,
updater: ParentContainerUpdater)
extends ParquetGroupConverter(updater) {

private[this] val currentArray = ArrayBuffer.empty[Any]
protected[this] val currentArray: mutable.ArrayBuffer[Any] = ArrayBuffer.empty[Any]

private[this] val elementConverter: Converter = {
val repeatedType = parquetSchema.getType(0)
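
For context on the `GeometryUDT` branch added above: in the legacy layout each geometry value arrives as a Parquet list of int8 elements containing its WKB bytes (matching the `repeated group list { required int32 element (INTEGER(8,true)) }` group in the exception shown in the new documentation page), and the overridden `end()` reassembles those elements into a byte array before WKB parsing. A small standalone round-trip of that decoding step, using JTS only; this is illustrative and not part of the diff.

```scala
import org.locationtech.jts.geom.{Coordinate, GeometryFactory}
import org.locationtech.jts.io.{WKBReader, WKBWriter}

import scala.collection.mutable.ArrayBuffer

// Simulate the repeated int8 elements the converter collects for one geometry value,
// then apply the same reassembly as the overridden end(): cast to Byte and parse as WKB.
val point = new GeometryFactory().createPoint(new Coordinate(1.0, 2.0))
val wkbBytes: Array[Byte] = new WKBWriter().write(point)

val currentArray = ArrayBuffer.empty[Any]
wkbBytes.foreach(b => currentArray += b)

val reassembled: Array[Byte] = currentArray.map(_.asInstanceOf[Byte]).toArray
val geom = new WKBReader().read(reassembled)
assert(geom.equalsExact(point))
```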
@@ -40,25 +40,27 @@ import org.apache.spark.sql.types._
* [[StringType]] fields.
* @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL
* [[TimestampType]] fields.
* @param parameters Options for reading GeoParquet files.
*/
class GeoParquetToSparkSchemaConverter(
keyValueMetaData: java.util.Map[String, String],
assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get) {
assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
parameters: Map[String, String]) {

private val geoParquetMetaData: GeoParquetMetaData = GeoParquetMetaData.parseKeyValueMetaData(keyValueMetaData).getOrElse {
throw new IllegalArgumentException("GeoParquet file does not contain valid geo metadata")
}
private val geoParquetMetaData: GeoParquetMetaData = GeoParquetUtils.parseGeoParquetMetaData(keyValueMetaData, parameters)

def this(keyValueMetaData: java.util.Map[String, String], conf: SQLConf) = this(
def this(keyValueMetaData: java.util.Map[String, String], conf: SQLConf, parameters: Map[String, String]) = this(
keyValueMetaData = keyValueMetaData,
assumeBinaryIsString = conf.isParquetBinaryAsString,
assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp)
assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
parameters = parameters)

def this(keyValueMetaData: java.util.Map[String, String], conf: Configuration) = this(
def this(keyValueMetaData: java.util.Map[String, String], conf: Configuration, parameters: Map[String, String]) = this(
keyValueMetaData = keyValueMetaData,
assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean)
assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
parameters = parameters)

/**
* Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].