From d2987c94e5de15037b350a0479ce314a34e4f652 Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Mon, 8 Jan 2024 12:40:26 +0800 Subject: [PATCH 1/6] Support reading legacy parquet files written by Apache Sedona <= 1.3.1-incubating --- ...gacy-parquet-nested-columns.snappy.parquet | Bin 0 -> 8552 bytes .../parquet/GeoParquetFileFormat.scala | 25 ++++++----- .../parquet/GeoParquetReadSupport.scala | 9 ++-- .../GeoParquetRecordMaterializer.scala | 5 ++- .../parquet/GeoParquetRowConverter.scala | 40 +++++++++++++----- .../parquet/GeoParquetSchemaConverter.scala | 18 ++++---- .../datasources/parquet/GeoParquetUtils.scala | 37 ++++++++++++++++ .../apache/sedona/sql/geoparquetIOTests.scala | 14 ++++++ 8 files changed, 115 insertions(+), 33 deletions(-) create mode 100644 spark/common/src/test/resources/parquet/legacy-parquet-nested-columns.snappy.parquet diff --git a/spark/common/src/test/resources/parquet/legacy-parquet-nested-columns.snappy.parquet b/spark/common/src/test/resources/parquet/legacy-parquet-nested-columns.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..976a965e5aebab99c21f9ff8e1d3360e11adec6e GIT binary patch literal 8552 zcmeI2dvH|M9mmhzy}P-QJa&P*?zTzTU6K$mZj&sJ5R;f)$%8im0s-=1v)P2`CV}h* z5+VXyt+7Kpold7Zj-86_wA8U3CqrAu+77d=B~9_sR>dk+Fj_>UL_kDku-|ikcOP8A zN~eEOnEdkD-+BDb?|aUI)Dr3l3kiz|iwR2z zO9{&e%Lyw8^@Np#RfN@q2ErP`T0$dX9br9T1EGn~OlTpr5;hWif}svuu?@!m_z6rF zv*XSah@Fl>2df=z{4VOcOI zEEkp!D}c>{6~bJwB3Lo31m=d#hRuP^h0TMN!sf%uV7I}_VHL1S*e;Cg0llz0U<+W? zuo_q`jH_A*TLfDSTLN1OTLxPWTLC+O81>*v*ecj+SOaVgY%Q!2who5p`R%Rp=ydtw zGZ)7NJmC)tm>{EUWNhR%`#DUC{?dpbTr#lF|2%DA1DHF8K7!FuWtYBgmvs1XNR_(O z`y5ilF4Y*Bl&i~j&0FzeEy`AyK-6Ss31m)5RFCbCUmN~m9a5HHAS3T&A9((jekhv|6b!i zr}3ZH_%CSu7d8H0H2z;T{@*nIOB(-WjsJ?qKdkXz)%btc_^)aF*ERkTjsFjg|AxkY zQ{x}i_{TKLoX?`9lP`rE%3t3sLUUy|@m$u7NF|f8r_8`9I3@e_YM~iNpwxtNEYQ z=6^CN|DntCKS249#-l_I@cc*A{P)Mpi$-GkpVH=k>MHpU@%)df`M;aU|0vJ@X*K_6 z5+gjV=6_n7|LLUs@4hU5kxdhn1(0H4(jshH)hxS7@QOuIRmC4-l_-J=p$@C?fmX!_ zS5a{{S7BFG6sRf;5w2lO)o>^ge@xYIR?(1^*r#VJ9N1es4=US;jmI`BJLt_3#AvB- zO4(f%@kr59Z?+&-y`V~XDPB?a0wm;Z6QvIo1xZ7E$Q6jSJA`_~V*c0=D5>=5#AqYe z;PHsMW>rK+K`LwHIu5HkUQG;gSk-Y}tK&TBxSw?VC^iVYpFa(ol-;E3j>m~^;=Ac} zRm72adDZI>G4o^gkygV;$|@YB$2n=1YVb4e&EMl;13djV)%1@h!oR7ee?go6 zg$f5hq*Cs2^zuoX9?M@$xyRe8XTv1mTk%NIVJ=`?74UYvylR{Te5@7lu_B=<|qYzN!X!m&;$$FKBq7C=b= zH9pV>c7Pq=UT`ni33h_92%F$<;t|#x2H5)a0oH|rE;NV*xD)=JNdfMHe^*j~Ap9T? 
zumJ%!AV4D}I|u3=XjcNlc)NbI>qj(o zOkqE&Gy`r$ldWiS9|nZ+Kzq?@F9MB#Fdk?dT5ZFC&o&GDd0xT<>NH6W7|O)~VfUwe z#SJZ3IYWz?H`C-ah){z`Iq@c%oVKIM_C!v~U`VKNqy>+cIMRZHiC@kf)yr?`)Hi$joNe*{uA&SowpG@tbFz&7xL|{EpXV z6_uwwQT!!)@ylB|(d!DqvMiFkZxTjhwk*q`O!?Au_4AlJQGVVlpUY6o&o{^ymCuNK zq%JMA+1GbhYskMT(Cx!ruq@{3th%mtTwzrifqq+9k)%9qL$%am9<3oD2(UZNCJiZ7Uu%b4XthitK& zX_qTm{-Mk)%w3*Qw3RATq)ba}I`~)*3YPstmWK<_GXKd*S+w|8W$`eOvK7c3f-d>*)nD6` zzQP9_;{#guy(VBQNML(cHv=76)zNzZ|^(Am)y2)2iuWgCW^ zJ-%*U)zyw-wOO#YrxRXJe=z843!)Z}PoU5li|8yH+OeZB-jhF7@xJk)`rE@O^ap(* z4CB0FwxK|KZ;!8)C*H7NwW}6!0^z(_XMDKpb&flWuXnA@|8d7P?_kzN>hpYScb{E70PMxC;u2Shy`ZhX zE7)$IH!t8W@|PCPce_jH2K;kM{BD1TZ?4}xyKP>e!|g69c9)d+=g*#LYr?-6C;zZ6 LTtBYB|FQTtA)TC4 literal 0 HcmV?d00001 diff --git a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetFileFormat.scala b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetFileFormat.scala index 15bb8897ce..58a3adaf0b 100644 --- a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetFileFormat.scala +++ b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetFileFormat.scala @@ -294,7 +294,8 @@ class GeoParquetFileFormat(val spatialFilter: Option[GeoParquetSpatialFilter]) convertTz, enableVectorizedReader = false, datetimeRebaseMode, - int96RebaseMode) + int96RebaseMode, + options) val reader = if (pushed.isDefined && enableRecordFilter) { val parquetFilter = FilterCompat.get(pushed.get, null) new ParquetRecordReader[InternalRow](readSupport, parquetFilter) @@ -356,8 +357,9 @@ object GeoParquetFileFormat extends Logging { val converter = new GeoParquetToSparkSchemaConverter( keyValueMetaData = keyValueMetaData, assumeBinaryIsString = assumeBinaryIsString, - assumeInt96IsTimestamp = assumeInt96IsTimestamp) - readSchemaFromFooter(footer, keyValueMetaData, converter) + assumeInt96IsTimestamp = assumeInt96IsTimestamp, + parameters = parameters) + readSchemaFromFooter(footer, keyValueMetaData, converter, parameters) } } @@ -365,17 +367,19 @@ object GeoParquetFileFormat extends Logging { } private def readSchemaFromFooter(footer: Footer, keyValueMetaData: java.util.Map[String, String], - converter: GeoParquetToSparkSchemaConverter): StructType = { + converter: GeoParquetToSparkSchemaConverter, + parameters: Map[String, String]): StructType = { val fileMetaData = footer.getParquetMetadata.getFileMetaData fileMetaData .getKeyValueMetaData .asScala.toMap .get(ParquetReadSupport.SPARK_METADATA_KEY) - .flatMap(schema => deserializeSchemaString(schema, keyValueMetaData)) + .flatMap(schema => deserializeSchemaString(schema, keyValueMetaData, parameters)) .getOrElse(converter.convert(fileMetaData.getSchema)) } - private def deserializeSchemaString(schemaString: String, keyValueMetaData: java.util.Map[String, String]): Option[StructType] = { + private def deserializeSchemaString(schemaString: String, keyValueMetaData: java.util.Map[String, String], + parameters: Map[String, String]): Option[StructType] = { // Tries to deserialize the schema string as JSON first, then falls back to the case class // string parser (data generated by older versions of Spark SQL uses this format). 
val schemaOpt = Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover { @@ -392,14 +396,13 @@ object GeoParquetFileFormat extends Logging { Failure(cause) }.toOption - schemaOpt.map(schema => replaceGeometryColumnWithGeometryUDT(schema, keyValueMetaData)) + schemaOpt.map(schema => replaceGeometryColumnWithGeometryUDT(schema, keyValueMetaData, parameters)) } private def replaceGeometryColumnWithGeometryUDT(schema: StructType, - keyValueMetaData: java.util.Map[String, String]): StructType = { - val geoParquetMetaData: GeoParquetMetaData = GeoParquetMetaData.parseKeyValueMetaData(keyValueMetaData).getOrElse { - throw new IllegalArgumentException("GeoParquet file does not contain valid geo metadata") - } + keyValueMetaData: java.util.Map[String, String], + parameters: Map[String, String]): StructType = { + val geoParquetMetaData: GeoParquetMetaData = GeoParquetUtils.parseGeoParquetMetaData(keyValueMetaData, parameters) val fields = schema.fields.map { field => field.dataType match { case _: BinaryType if geoParquetMetaData.columns.contains(field.name) => diff --git a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetReadSupport.scala b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetReadSupport.scala index 275cf071d5..321b7b1b41 100644 --- a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetReadSupport.scala +++ b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetReadSupport.scala @@ -54,7 +54,9 @@ class GeoParquetReadSupport ( override val convertTz: Option[ZoneId], enableVectorizedReader: Boolean, datetimeRebaseMode: LegacyBehaviorPolicy.Value, - int96RebaseMode: LegacyBehaviorPolicy.Value) + int96RebaseMode: LegacyBehaviorPolicy.Value, + parameters: Map[String, String] + ) extends ParquetReadSupport with Logging { private var catalystRequestedSchema: StructType = _ @@ -121,10 +123,11 @@ class GeoParquetReadSupport ( new GeoParquetRecordMaterializer( parquetRequestedSchema, GeoParquetReadSupport.expandUDT(catalystRequestedSchema), - new GeoParquetToSparkSchemaConverter(keyValueMetaData, conf), + new GeoParquetToSparkSchemaConverter(keyValueMetaData, conf, parameters), convertTz, datetimeRebaseMode, - int96RebaseMode) + int96RebaseMode, + parameters) } } diff --git a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRecordMaterializer.scala b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRecordMaterializer.scala index 575ef7eeac..281144ce55 100644 --- a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRecordMaterializer.scala +++ b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRecordMaterializer.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.types.StructType * Gregorian calendar: mode + optional original time zone * @param int96RebaseSpec the specification of rebasing INT96 timestamp from Julian to Proleptic * Gregorian calendar + * @param parameters Options for reading GeoParquet files. For example, if legacyMode is enabled or not. 
*/ class GeoParquetRecordMaterializer( parquetSchema: MessageType, @@ -42,7 +43,8 @@ class GeoParquetRecordMaterializer( schemaConverter: GeoParquetToSparkSchemaConverter, convertTz: Option[ZoneId], datetimeRebaseMode: LegacyBehaviorPolicy.Value, - int96RebaseMode: LegacyBehaviorPolicy.Value) + int96RebaseMode: LegacyBehaviorPolicy.Value, + parameters: Map[String, String]) extends RecordMaterializer[InternalRow] { private val rootConverter = new GeoParquetRowConverter( schemaConverter, @@ -51,6 +53,7 @@ class GeoParquetRecordMaterializer( convertTz, datetimeRebaseMode, int96RebaseMode, + parameters, NoopUpdater) override def getCurrentRecord: InternalRow = rootConverter.currentRecord diff --git a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRowConverter.scala b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRowConverter.scala index 5c12bac07c..4eef6a03b0 100644 --- a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRowConverter.scala +++ b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRowConverter.scala @@ -36,6 +36,7 @@ import org.locationtech.jts.io.WKBReader import java.math.{BigDecimal, BigInteger} import java.time.{ZoneId, ZoneOffset} import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer /** @@ -75,6 +76,7 @@ import scala.collection.mutable.ArrayBuffer * calendar * @param int96RebaseMode the mode of rebasing INT96 timestamp from Julian to Proleptic Gregorian * calendar + * @param parameters Options for reading GeoParquet files. For example, if legacyMode is enabled or not. * @param updater An updater which propagates converted field values to the parent container */ private[parquet] class GeoParquetRowConverter( @@ -84,6 +86,7 @@ private[parquet] class GeoParquetRowConverter( convertTz: Option[ZoneId], datetimeRebaseMode: LegacyBehaviorPolicy.Value, int96RebaseMode: LegacyBehaviorPolicy.Value, + parameters: Map[String, String], updater: ParentContainerUpdater) extends ParquetGroupConverter(updater) with Logging { @@ -202,11 +205,28 @@ private[parquet] class GeoParquetRowConverter( new ParquetPrimitiveConverter(updater) case GeometryUDT => - new ParquetPrimitiveConverter(updater) { - override def addBinary(value: Binary): Unit = { - val wkbReader = new WKBReader() - val geom = wkbReader.read(value.getBytes) - updater.set(GeometryUDT.serialize(geom)) + if (parquetType.isPrimitive) { + new ParquetPrimitiveConverter(updater) { + override def addBinary(value: Binary): Unit = { + val wkbReader = new WKBReader() + val geom = wkbReader.read(value.getBytes) + updater.set(GeometryUDT.serialize(geom)) + } + } + } else { + if (GeoParquetUtils.isLegacyMode(parameters)) { + new ParquetArrayConverter(parquetType.asGroupType(), ArrayType(ByteType, containsNull = false), updater) { + override def end(): Unit = { + val wkbReader = new WKBReader() + val byteArray = currentArray.map(_.asInstanceOf[Byte]).toArray + val geom = wkbReader.read(byteArray) + updater.set(GeometryUDT.serialize(geom)) + } + } + } else { + throw new IllegalArgumentException( + s"Parquet type for geometry column is $parquetType. This parquet file could be written by " + + "Apache Sedona <= 1.3.1-incubating. 
Please use option(\"legacyMode\", \"true\") to read this file.") } } @@ -337,6 +357,7 @@ private[parquet] class GeoParquetRowConverter( convertTz, datetimeRebaseMode, int96RebaseMode, + parameters, wrappedUpdater) case t => @@ -474,13 +495,12 @@ private[parquet] class GeoParquetRowConverter( * * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists */ - private final class ParquetArrayConverter( - parquetSchema: GroupType, - catalystSchema: ArrayType, - updater: ParentContainerUpdater) + private class ParquetArrayConverter(parquetSchema: GroupType, + catalystSchema: ArrayType, + updater: ParentContainerUpdater) extends ParquetGroupConverter(updater) { - private[this] val currentArray = ArrayBuffer.empty[Any] + protected[this] val currentArray: mutable.ArrayBuffer[Any] = ArrayBuffer.empty[Any] private[this] val elementConverter: Converter = { val repeatedType = parquetSchema.getType(0) diff --git a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetSchemaConverter.scala b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetSchemaConverter.scala index 11bfcfcabf..bdc64bfdd8 100644 --- a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetSchemaConverter.scala +++ b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetSchemaConverter.scala @@ -40,25 +40,27 @@ import org.apache.spark.sql.types._ * [[StringType]] fields. * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL * [[TimestampType]] fields. + * @param parameters Options for reading GeoParquet files. */ class GeoParquetToSparkSchemaConverter( keyValueMetaData: java.util.Map[String, String], assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get, - assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get) { + assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get, + parameters: Map[String, String]) { - private val geoParquetMetaData: GeoParquetMetaData = GeoParquetMetaData.parseKeyValueMetaData(keyValueMetaData).getOrElse { - throw new IllegalArgumentException("GeoParquet file does not contain valid geo metadata") - } + private val geoParquetMetaData: GeoParquetMetaData = GeoParquetUtils.parseGeoParquetMetaData(keyValueMetaData, parameters) - def this(keyValueMetaData: java.util.Map[String, String], conf: SQLConf) = this( + def this(keyValueMetaData: java.util.Map[String, String], conf: SQLConf, parameters: Map[String, String]) = this( keyValueMetaData = keyValueMetaData, assumeBinaryIsString = conf.isParquetBinaryAsString, - assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp) + assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp, + parameters = parameters) - def this(keyValueMetaData: java.util.Map[String, String], conf: Configuration) = this( + def this(keyValueMetaData: java.util.Map[String, String], conf: Configuration, parameters: Map[String, String]) = this( keyValueMetaData = keyValueMetaData, assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean, - assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean) + assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean, + parameters = parameters) /** * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]]. 
diff --git a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetUtils.scala b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetUtils.scala index 5e8c9c98f7..bf475930f2 100644 --- a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetUtils.scala +++ b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetUtils.scala @@ -77,4 +77,41 @@ object GeoParquetUtils { file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE || file.getName == ParquetFileWriter.PARQUET_METADATA_FILE } + + /** + * Legacy mode option is for reading Parquet files written by old versions of Apache Sedona (<= 1.3.1-incubating). + * Such files are actually not GeoParquet files and do not have GeoParquet file metadata. Geometry fields were + * encoded as list of bytes and stored as group type in Parquet files. The Definition of GeometryUDT before 1.4.0 was: + * {{{ + * case class GeometryUDT extends UserDefinedType[Geometry] { + * override def sqlType: DataType = ArrayType(ByteType, containsNull = false) + * // ... + * }}} + * Since 1.4.0, the sqlType of GeometryUDT is changed to BinaryType. This is a breaking change for reading old Parquet + * files. To read old Parquet files, users need to use "geoparquet" format and set legacyMode to true. + * @param parameters user provided parameters for reading GeoParquet files using `.option()` method, e.g. + * `spark.read.format("geoparquet").option("legacyMode", "true").load("path")` + * @return true if legacyMode is set to true, false otherwise + */ + def isLegacyMode(parameters: Map[String, String]): Boolean = + parameters.getOrElse("legacyMode", "false").toBoolean + + /** + * Parse GeoParquet file metadata from Parquet file metadata. Legacy parquet files do not contain GeoParquet file + * metadata, so we'll simply return an empty GeoParquetMetaData object when legacy mode is enabled. 
+ * @param keyValueMetaData Parquet file metadata + * @param parameters user provided parameters for reading GeoParquet files + * @return GeoParquetMetaData object + */ + def parseGeoParquetMetaData(keyValueMetaData: java.util.Map[String, String], + parameters: Map[String, String]): GeoParquetMetaData = { + val isLegacyMode = GeoParquetUtils.isLegacyMode(parameters) + GeoParquetMetaData.parseKeyValueMetaData(keyValueMetaData).getOrElse { + if (isLegacyMode) { + GeoParquetMetaData(None, "", Map.empty) + } else { + throw new IllegalArgumentException("GeoParquet file does not contain valid geo metadata") + } + } + } }
diff --git a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala index 7b209e5429..c2d59756bf 100644 --- a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala +++ b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala @@ -51,6 +51,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { val geoparquetdatalocation2: String = resourceFolder + "geoparquet/example2.parquet" val geoparquetdatalocation3: String = resourceFolder + "geoparquet/example3.parquet" val geoparquetdatalocation4: String = resourceFolder + "geoparquet/example-1.0.0-beta.1.parquet" + val legacyparquetdatalocation: String = resourceFolder + "parquet/legacy-parquet-nested-columns.snappy.parquet" val geoparquetoutputlocation: String = resourceFolder + "geoparquet/geoparquet_output/" override def afterAll(): Unit = FileUtils.deleteDirectory(new File(geoparquetoutputlocation)) @@ -510,6 +511,19 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { sparkSession.sparkContext.removeSparkListener(sparkListener) } } + + it("Read legacy parquet files written by Apache Sedona <= 1.3.1-incubating") { + val df = sparkSession.read.format("geoparquet").option("legacyMode", "true").load(legacyparquetdatalocation) + val rows = df.collect() + assert(rows.nonEmpty) + rows.foreach { row => + assert(row.getAs[AnyRef]("geom").isInstanceOf[Geometry]) + assert(row.getAs[AnyRef]("struct_geom").isInstanceOf[Row]) + val structGeom = row.getAs[Row]("struct_geom") + assert(structGeom.getAs[AnyRef]("g0").isInstanceOf[Geometry]) + assert(structGeom.getAs[AnyRef]("g1").isInstanceOf[Geometry]) + } + } } def validateGeoParquetMetadata(path: String)(body: org.json4s.JValue => Unit): Unit = {
From bd71354d8d52b909d0d1b019e0cb29c254684ab0 Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Mon, 8 Jan 2024 13:07:52 +0800 Subject: [PATCH 2/6] Add documentation for legacy parquet support --- docs/api/sql/Reading-legacy-parquet.md | 47 ++++++++++++++++++++++++++ docs/tutorial/sql.md | 7 ++++ mkdocs.yml | 1 + 3 files changed, 55 insertions(+) create mode 100644 docs/api/sql/Reading-legacy-parquet.md
diff --git a/docs/api/sql/Reading-legacy-parquet.md b/docs/api/sql/Reading-legacy-parquet.md new file mode 100644 index 0000000000..dd4874181d --- /dev/null +++ b/docs/api/sql/Reading-legacy-parquet.md @@ -0,0 +1,47 @@ +Due to a breaking change in Apache Sedona 1.4.0 to the SQL type of `GeometryUDT` +([SEDONA-205](https://issues.apache.org/jira/browse/SEDONA-205)) as well as the +serialization format of geometry values ([SEDONA-207](https://issues.apache.org/jira/browse/SEDONA-207)), Parquet files +containing geometry columns written by Apache Sedona 1.3.1 or earlier cannot be read by Apache Sedona 1.4.0 or later. 
+Here is an example of exception when trying to read such files: + +``` +24/01/08 12:52:56 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 11) +org.apache.spark.sql.AnalysisException: Invalid Spark read type: expected required group geom (LIST) { + repeated group list { + required int32 element (INTEGER(8,true)); + } +} to be list type but found Some(BinaryType) + at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkConversionRequirement(ParquetSchemaConverter.scala:745) + at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertGroupField$3(ParquetSchemaConverter.scala:343) + at scala.Option.fold(Option.scala:251) + at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertGroupField(ParquetSchemaConverter.scala:324) + at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertField(ParquetSchemaConverter.scala:188) + at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertInternal$3(ParquetSchemaConverter.scala:147) + at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertInternal$3$adapted(ParquetSchemaConverter.scala:117) + at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) + at scala.collection.immutable.Range.foreach(Range.scala:158) + at scala.collection.TraversableLike.map(TraversableLike.scala:286) + at scala.collection.TraversableLike.map$(TraversableLike.scala:279) + at scala.collection.AbstractTraversable.map(Traversable.scala:108) + ... +``` + +To read such files, you can use `"geoparquet"` format with the `.option("legacyMode", "true")` option. Here is an example: + +=== "Scala/Java" + + ```scala + val df = sedona.read.format("geoparquet").option("legacyMode", "true").load("path/to/legacy-parquet-files") + ``` + +=== "Java" + + ```java + Dataset df = sedona.read.format("geoparquet").option("legacyMode", "true").load("path/to/legacy-parquet-files") + ``` + +=== "Python" + + ```python + df = sedona.read.format("geoparquet").option("legacyMode", "true").load("path/to/legacy-parquet-files") + ``` diff --git a/docs/tutorial/sql.md b/docs/tutorial/sql.md index f4095988a0..2da0dfd3fb 100644 --- a/docs/tutorial/sql.md +++ b/docs/tutorial/sql.md @@ -373,6 +373,13 @@ root Sedona supports spatial predicate push-down for GeoParquet files, please refer to the [SedonaSQL query optimizer](../api/sql/Optimizer.md) documentation for details. +GeoParquet file reader can also be used to read legacy Parquet files written by Apache Sedona 1.3.1-incubating or earlier. +Please refer to [Reading Legacy Parquet Files](../api/sql/Reading-legacy-parquet.md) for details. + +!!!warning + GeoParquet file reader does not work on Databricks runtime when Photon is enabled. Please disable Photon when using + GeoParquet file reader on Databricks runtime. + ### Inspect GeoParquet metadata Since v`1.5.1`, Sedona provides a Spark SQL data source `"geoparquet.metadata"` for inspecting GeoParquet metadata. 
The resulting dataframe contains diff --git a/mkdocs.yml b/mkdocs.yml index 8d44e4a042..766d77c53a 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -59,6 +59,7 @@ nav: - Aggregate function: api/sql/AggregateFunction.md - DataFrame Style functions: api/sql/DataFrameAPI.md - Query optimization: api/sql/Optimizer.md + - Reading Legacy Parquet Files: api/sql/Reading-legacy-parquet.md - Visualization: - SedonaPyDeck: api/sql/Visualization_SedonaPyDeck.md - SedonaKepler: api/sql/Visualization_SedonaKepler.md From 09ae2e18744ab25afa9e5f798dfaa16f1e97071b Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Mon, 8 Jan 2024 14:20:00 +0800 Subject: [PATCH 3/6] Port to Spark 3.4 and Spark 3.5 --- .../parquet/GeoParquetFileFormat.scala | 25 +++++++----- .../parquet/GeoParquetReadSupport.scala | 8 ++-- .../GeoParquetRecordMaterializer.scala | 5 ++- .../parquet/GeoParquetRowConverter.scala | 40 ++++++++++++++----- .../parquet/GeoParquetSchemaConverter.scala | 18 +++++---- .../datasources/parquet/GeoParquetUtils.scala | 37 +++++++++++++++++ .../apache/sedona/sql/geoparquetIOTests.scala | 14 +++++++ .../parquet/GeoParquetFileFormat.scala | 25 +++++++----- .../parquet/GeoParquetReadSupport.scala | 8 ++-- .../GeoParquetRecordMaterializer.scala | 5 ++- .../parquet/GeoParquetRowConverter.scala | 40 ++++++++++++++----- .../parquet/GeoParquetSchemaConverter.scala | 18 +++++---- .../datasources/parquet/GeoParquetUtils.scala | 37 +++++++++++++++++ .../apache/sedona/sql/geoparquetIOTests.scala | 14 +++++++ 14 files changed, 228 insertions(+), 66 deletions(-) diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetFileFormat.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetFileFormat.scala index c2d2c52604..7a0dba2c9c 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetFileFormat.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetFileFormat.scala @@ -299,7 +299,8 @@ class GeoParquetFileFormat(val spatialFilter: Option[GeoParquetSpatialFilter]) convertTz, enableVectorizedReader = false, datetimeRebaseMode, - int96RebaseMode) + int96RebaseMode, + options) val reader = if (pushed.isDefined && enableRecordFilter) { val parquetFilter = FilterCompat.get(pushed.get, null) new ParquetRecordReader[InternalRow](readSupport, parquetFilter) @@ -361,8 +362,9 @@ object GeoParquetFileFormat extends Logging { val converter = new GeoParquetToSparkSchemaConverter( keyValueMetaData = keyValueMetaData, assumeBinaryIsString = assumeBinaryIsString, - assumeInt96IsTimestamp = assumeInt96IsTimestamp) - readSchemaFromFooter(footer, keyValueMetaData, converter) + assumeInt96IsTimestamp = assumeInt96IsTimestamp, + parameters = parameters) + readSchemaFromFooter(footer, keyValueMetaData, converter, parameters) } } @@ -370,17 +372,19 @@ object GeoParquetFileFormat extends Logging { } private def readSchemaFromFooter(footer: Footer, keyValueMetaData: java.util.Map[String, String], - converter: GeoParquetToSparkSchemaConverter): StructType = { + converter: GeoParquetToSparkSchemaConverter, + parameters: Map[String, String]): StructType = { val fileMetaData = footer.getParquetMetadata.getFileMetaData fileMetaData .getKeyValueMetaData .asScala.toMap .get(ParquetReadSupport.SPARK_METADATA_KEY) - .flatMap(schema => deserializeSchemaString(schema, keyValueMetaData)) + .flatMap(schema => deserializeSchemaString(schema, keyValueMetaData, parameters)) 
.getOrElse(converter.convert(fileMetaData.getSchema)) } - private def deserializeSchemaString(schemaString: String, keyValueMetaData: java.util.Map[String, String]): Option[StructType] = { + private def deserializeSchemaString(schemaString: String, keyValueMetaData: java.util.Map[String, String], + parameters: Map[String, String]): Option[StructType] = { // Tries to deserialize the schema string as JSON first, then falls back to the case class // string parser (data generated by older versions of Spark SQL uses this format). val schemaOpt = Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover { @@ -397,14 +401,13 @@ object GeoParquetFileFormat extends Logging { Failure(cause) }.toOption - schemaOpt.map(schema => replaceGeometryColumnWithGeometryUDT(schema, keyValueMetaData)) + schemaOpt.map(schema => replaceGeometryColumnWithGeometryUDT(schema, keyValueMetaData, parameters)) } private def replaceGeometryColumnWithGeometryUDT(schema: StructType, - keyValueMetaData: java.util.Map[String, String]): StructType = { - val geoParquetMetaData: GeoParquetMetaData = GeoParquetMetaData.parseKeyValueMetaData(keyValueMetaData).getOrElse { - throw new IllegalArgumentException("GeoParquet file does not contain valid geo metadata") - } + keyValueMetaData: java.util.Map[String, String], + parameters: Map[String, String]): StructType = { + val geoParquetMetaData: GeoParquetMetaData = GeoParquetUtils.parseGeoParquetMetaData(keyValueMetaData, parameters) val fields = schema.fields.map { field => field.dataType match { case _: BinaryType if geoParquetMetaData.columns.contains(field.name) => diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetReadSupport.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetReadSupport.scala index 81125f9595..2921d78478 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetReadSupport.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetReadSupport.scala @@ -53,7 +53,8 @@ import scala.collection.JavaConverters._ class GeoParquetReadSupport (override val convertTz: Option[ZoneId], enableVectorizedReader: Boolean, datetimeRebaseMode: LegacyBehaviorPolicy.Value, - int96RebaseMode: LegacyBehaviorPolicy.Value) + int96RebaseMode: LegacyBehaviorPolicy.Value, + parameters: Map[String, String]) extends ParquetReadSupport with Logging { private var catalystRequestedSchema: StructType = _ @@ -122,10 +123,11 @@ class GeoParquetReadSupport (override val convertTz: Option[ZoneId], new GeoParquetRecordMaterializer( parquetRequestedSchema, GeoParquetReadSupport.expandUDT(catalystRequestedSchema), - new GeoParquetToSparkSchemaConverter(keyValueMetaData, conf), + new GeoParquetToSparkSchemaConverter(keyValueMetaData, conf, parameters), convertTz, datetimeRebaseMode, - int96RebaseMode) + int96RebaseMode, + parameters) } } diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRecordMaterializer.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRecordMaterializer.scala index 575ef7eeac..281144ce55 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRecordMaterializer.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRecordMaterializer.scala @@ -35,6 +35,7 @@ import 
org.apache.spark.sql.types.StructType * Gregorian calendar: mode + optional original time zone * @param int96RebaseSpec the specification of rebasing INT96 timestamp from Julian to Proleptic * Gregorian calendar + * @param parameters Options for reading GeoParquet files. For example, if legacyMode is enabled or not. */ class GeoParquetRecordMaterializer( parquetSchema: MessageType, @@ -42,7 +43,8 @@ class GeoParquetRecordMaterializer( schemaConverter: GeoParquetToSparkSchemaConverter, convertTz: Option[ZoneId], datetimeRebaseMode: LegacyBehaviorPolicy.Value, - int96RebaseMode: LegacyBehaviorPolicy.Value) + int96RebaseMode: LegacyBehaviorPolicy.Value, + parameters: Map[String, String]) extends RecordMaterializer[InternalRow] { private val rootConverter = new GeoParquetRowConverter( schemaConverter, @@ -51,6 +53,7 @@ class GeoParquetRecordMaterializer( convertTz, datetimeRebaseMode, int96RebaseMode, + parameters, NoopUpdater) override def getCurrentRecord: InternalRow = rootConverter.currentRecord diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRowConverter.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRowConverter.scala index ff8a19cdaa..ba8b851496 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRowConverter.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRowConverter.scala @@ -36,6 +36,7 @@ import org.locationtech.jts.io.WKBReader import java.math.{BigDecimal, BigInteger} import java.time.{ZoneId, ZoneOffset} import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer /** @@ -75,6 +76,7 @@ import scala.collection.mutable.ArrayBuffer * calendar * @param int96RebaseMode the mode of rebasing INT96 timestamp from Julian to Proleptic Gregorian * calendar + * @param parameters Options for reading GeoParquet files. For example, if legacyMode is enabled or not. 
* @param updater An updater which propagates converted field values to the parent container */ private[parquet] class GeoParquetRowConverter( @@ -84,6 +86,7 @@ private[parquet] class GeoParquetRowConverter( convertTz: Option[ZoneId], datetimeRebaseMode: LegacyBehaviorPolicy.Value, int96RebaseMode: LegacyBehaviorPolicy.Value, + parameters: Map[String, String], updater: ParentContainerUpdater) extends ParquetGroupConverter(updater) with Logging { @@ -202,11 +205,28 @@ private[parquet] class GeoParquetRowConverter( new ParquetPrimitiveConverter(updater) case GeometryUDT => - new ParquetPrimitiveConverter(updater) { - override def addBinary(value: Binary): Unit = { - val wkbReader = new WKBReader() - val geom = wkbReader.read(value.getBytes) - updater.set(GeometryUDT.serialize(geom)) + if (parquetType.isPrimitive) { + new ParquetPrimitiveConverter(updater) { + override def addBinary(value: Binary): Unit = { + val wkbReader = new WKBReader() + val geom = wkbReader.read(value.getBytes) + updater.set(GeometryUDT.serialize(geom)) + } + } + } else { + if (GeoParquetUtils.isLegacyMode(parameters)) { + new ParquetArrayConverter(parquetType.asGroupType(), ArrayType(ByteType, containsNull = false), updater) { + override def end(): Unit = { + val wkbReader = new WKBReader() + val byteArray = currentArray.map(_.asInstanceOf[Byte]).toArray + val geom = wkbReader.read(byteArray) + updater.set(GeometryUDT.serialize(geom)) + } + } + } else { + throw new IllegalArgumentException( + s"Parquet type for geometry column is $parquetType. This parquet file could be written by " + + "Apache Sedona <= 1.3.1-incubating. Please use option(\"legacyMode\", \"true\") to read this file.") } } @@ -337,6 +357,7 @@ private[parquet] class GeoParquetRowConverter( convertTz, datetimeRebaseMode, int96RebaseMode, + parameters, wrappedUpdater) case t => @@ -474,13 +495,12 @@ private[parquet] class GeoParquetRowConverter( * * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists */ - private final class ParquetArrayConverter( - parquetSchema: GroupType, - catalystSchema: ArrayType, - updater: ParentContainerUpdater) + private class ParquetArrayConverter(parquetSchema: GroupType, + catalystSchema: ArrayType, + updater: ParentContainerUpdater) extends ParquetGroupConverter(updater) { - private[this] val currentArray = ArrayBuffer.empty[Any] + protected[this] val currentArray: mutable.ArrayBuffer[Any] = ArrayBuffer.empty[Any] private[this] val elementConverter: Converter = { val repeatedType = parquetSchema.getType(0) diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetSchemaConverter.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetSchemaConverter.scala index 11bfcfcabf..bdc64bfdd8 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetSchemaConverter.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetSchemaConverter.scala @@ -40,25 +40,27 @@ import org.apache.spark.sql.types._ * [[StringType]] fields. * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL * [[TimestampType]] fields. + * @param parameters Options for reading GeoParquet files. 
*/ class GeoParquetToSparkSchemaConverter( keyValueMetaData: java.util.Map[String, String], assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get, - assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get) { + assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get, + parameters: Map[String, String]) { - private val geoParquetMetaData: GeoParquetMetaData = GeoParquetMetaData.parseKeyValueMetaData(keyValueMetaData).getOrElse { - throw new IllegalArgumentException("GeoParquet file does not contain valid geo metadata") - } + private val geoParquetMetaData: GeoParquetMetaData = GeoParquetUtils.parseGeoParquetMetaData(keyValueMetaData, parameters) - def this(keyValueMetaData: java.util.Map[String, String], conf: SQLConf) = this( + def this(keyValueMetaData: java.util.Map[String, String], conf: SQLConf, parameters: Map[String, String]) = this( keyValueMetaData = keyValueMetaData, assumeBinaryIsString = conf.isParquetBinaryAsString, - assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp) + assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp, + parameters = parameters) - def this(keyValueMetaData: java.util.Map[String, String], conf: Configuration) = this( + def this(keyValueMetaData: java.util.Map[String, String], conf: Configuration, parameters: Map[String, String]) = this( keyValueMetaData = keyValueMetaData, assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean, - assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean) + assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean, + parameters = parameters) /** * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]]. diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetUtils.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetUtils.scala index 5e8c9c98f7..bf475930f2 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetUtils.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetUtils.scala @@ -77,4 +77,41 @@ object GeoParquetUtils { file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE || file.getName == ParquetFileWriter.PARQUET_METADATA_FILE } + + /** + * Legacy mode option is for reading Parquet files written by old versions of Apache Sedona (<= 1.3.1-incubating). + * Such files are actually not GeoParquet files and do not have GeoParquet file metadata. Geometry fields were + * encoded as list of bytes and stored as group type in Parquet files. The Definition of GeometryUDT before 1.4.0 was: + * {{{ + * case class GeometryUDT extends UserDefinedType[Geometry] { + * override def sqlType: DataType = ArrayType(ByteType, containsNull = false) + * // ... + * }}} + * Since 1.4.0, the sqlType of GeometryUDT is changed to BinaryType. This is a breaking change for reading old Parquet + * files. To read old Parquet files, users need to use "geoparquet" format and set legacyMode to true. + * @param parameters user provided parameters for reading GeoParquet files using `.option()` method, e.g. 
+ * `spark.read.format("geoparquet").option("legacyMode", "true").load("path")` + * @return true if legacyMode is set to true, false otherwise + */ + def isLegacyMode(parameters: Map[String, String]): Boolean = + parameters.getOrElse("legacyMode", "false").toBoolean + + /** + * Parse GeoParquet file metadata from Parquet file metadata. Legacy parquet files do not contain GeoParquet file + * metadata, so we'll simply return an empty GeoParquetMetaData object when legacy mode is enabled. + * @param keyValueMetaData Parquet file metadata + * @param parameters user provided parameters for reading GeoParquet files + * @return GeoParquetMetaData object + */ + def parseGeoParquetMetaData(keyValueMetaData: java.util.Map[String, String], + parameters: Map[String, String]): GeoParquetMetaData = { + val isLegacyMode = GeoParquetUtils.isLegacyMode(parameters) + GeoParquetMetaData.parseKeyValueMetaData(keyValueMetaData).getOrElse { + if (isLegacyMode) { + GeoParquetMetaData(None, "", Map.empty) + } else { + throw new IllegalArgumentException("GeoParquet file does not contain valid geo metadata") + } + } + } } diff --git a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala index 7b209e5429..c2d59756bf 100644 --- a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala +++ b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala @@ -51,6 +51,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { val geoparquetdatalocation2: String = resourceFolder + "geoparquet/example2.parquet" val geoparquetdatalocation3: String = resourceFolder + "geoparquet/example3.parquet" val geoparquetdatalocation4: String = resourceFolder + "geoparquet/example-1.0.0-beta.1.parquet" + val legacyparquetdatalocation: String = resourceFolder + "parquet/legacy-parquet-nested-columns.snappy.parquet" val geoparquetoutputlocation: String = resourceFolder + "geoparquet/geoparquet_output/" override def afterAll(): Unit = FileUtils.deleteDirectory(new File(geoparquetoutputlocation)) @@ -510,6 +511,19 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { sparkSession.sparkContext.removeSparkListener(sparkListener) } } + + it("Ready legacy parquet files written by Apache Sedona <= 1.3.1-incubating") { + val df = sparkSession.read.format("geoparquet").option("legacyMode", "true").load(legacyparquetdatalocation) + val rows = df.collect() + assert(rows.nonEmpty) + rows.foreach { row => + assert(row.getAs[AnyRef]("geom").isInstanceOf[Geometry]) + assert(row.getAs[AnyRef]("struct_geom").isInstanceOf[Row]) + val structGeom = row.getAs[Row]("struct_geom") + assert(structGeom.getAs[AnyRef]("g0").isInstanceOf[Geometry]) + assert(structGeom.getAs[AnyRef]("g1").isInstanceOf[Geometry]) + } + } } def validateGeoParquetMetadata(path: String)(body: org.json4s.JValue => Unit): Unit = { diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetFileFormat.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetFileFormat.scala index 7afb90804b..e87f799260 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetFileFormat.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetFileFormat.scala @@ -300,7 +300,8 @@ class GeoParquetFileFormat(val spatialFilter: 
Option[GeoParquetSpatialFilter]) convertTz, enableVectorizedReader = false, datetimeRebaseSpec, - int96RebaseSpec) + int96RebaseSpec, + options) val reader = if (pushed.isDefined && enableRecordFilter) { val parquetFilter = FilterCompat.get(pushed.get, null) new ParquetRecordReader[InternalRow](readSupport, parquetFilter) @@ -370,8 +371,9 @@ object GeoParquetFileFormat extends Logging { val converter = new GeoParquetToSparkSchemaConverter( keyValueMetaData = keyValueMetaData, assumeBinaryIsString = assumeBinaryIsString, - assumeInt96IsTimestamp = assumeInt96IsTimestamp) - readSchemaFromFooter(footer, keyValueMetaData, converter) + assumeInt96IsTimestamp = assumeInt96IsTimestamp, + parameters = parameters) + readSchemaFromFooter(footer, keyValueMetaData, converter, parameters) } } @@ -379,17 +381,19 @@ object GeoParquetFileFormat extends Logging { } private def readSchemaFromFooter(footer: Footer, keyValueMetaData: java.util.Map[String, String], - converter: GeoParquetToSparkSchemaConverter): StructType = { + converter: GeoParquetToSparkSchemaConverter, + parameters: Map[String, String]): StructType = { val fileMetaData = footer.getParquetMetadata.getFileMetaData fileMetaData .getKeyValueMetaData .asScala.toMap .get(ParquetReadSupport.SPARK_METADATA_KEY) - .flatMap(schema => deserializeSchemaString(schema, keyValueMetaData)) + .flatMap(schema => deserializeSchemaString(schema, keyValueMetaData, parameters)) .getOrElse(converter.convert(fileMetaData.getSchema)) } - private def deserializeSchemaString(schemaString: String, keyValueMetaData: java.util.Map[String, String]): Option[StructType] = { + private def deserializeSchemaString(schemaString: String, keyValueMetaData: java.util.Map[String, String], + parameters: Map[String, String]): Option[StructType] = { // Tries to deserialize the schema string as JSON first, then falls back to the case class // string parser (data generated by older versions of Spark SQL uses this format). 
val schemaOpt = Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover { @@ -406,14 +410,13 @@ object GeoParquetFileFormat extends Logging { Failure(cause) }.toOption - schemaOpt.map(schema => replaceGeometryColumnWithGeometryUDT(schema, keyValueMetaData)) + schemaOpt.map(schema => replaceGeometryColumnWithGeometryUDT(schema, keyValueMetaData, parameters)) } private def replaceGeometryColumnWithGeometryUDT(schema: StructType, - keyValueMetaData: java.util.Map[String, String]): StructType = { - val geoParquetMetaData: GeoParquetMetaData = GeoParquetMetaData.parseKeyValueMetaData(keyValueMetaData).getOrElse { - throw new IllegalArgumentException("GeoParquet file does not contain valid geo metadata") - } + keyValueMetaData: java.util.Map[String, String], + parameters: Map[String, String]): StructType = { + val geoParquetMetaData: GeoParquetMetaData = GeoParquetUtils.parseGeoParquetMetaData(keyValueMetaData, parameters) val fields = schema.fields.map { field => field.dataType match { case _: BinaryType if geoParquetMetaData.columns.contains(field.name) => diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetReadSupport.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetReadSupport.scala index 0526c980de..cd224e793a 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetReadSupport.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetReadSupport.scala @@ -51,7 +51,8 @@ import scala.collection.JavaConverters._ class GeoParquetReadSupport (override val convertTz: Option[ZoneId], enableVectorizedReader: Boolean, datetimeRebaseSpec: RebaseSpec, - int96RebaseSpec: RebaseSpec) + int96RebaseSpec: RebaseSpec, + parameters: Map[String, String]) extends ParquetReadSupport with Logging { private var catalystRequestedSchema: StructType = _ @@ -85,10 +86,11 @@ class GeoParquetReadSupport (override val convertTz: Option[ZoneId], new GeoParquetRecordMaterializer( parquetRequestedSchema, GeoParquetReadSupport.expandUDT(catalystRequestedSchema), - new GeoParquetToSparkSchemaConverter(keyValueMetaData, conf), + new GeoParquetToSparkSchemaConverter(keyValueMetaData, conf, parameters), convertTz, datetimeRebaseSpec, - int96RebaseSpec) + int96RebaseSpec, + parameters) } } diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRecordMaterializer.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRecordMaterializer.scala index 6e83f95c87..aebe0d4260 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRecordMaterializer.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRecordMaterializer.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.types.StructType * Gregorian calendar: mode + optional original time zone * @param int96RebaseSpec the specification of rebasing INT96 timestamp from Julian to Proleptic * Gregorian calendar + * @param parameters Options for reading GeoParquet files. For example, if legacyMode is enabled or not. 
*/ class GeoParquetRecordMaterializer( parquetSchema: MessageType, @@ -42,7 +43,8 @@ class GeoParquetRecordMaterializer( schemaConverter: GeoParquetToSparkSchemaConverter, convertTz: Option[ZoneId], datetimeRebaseSpec: RebaseSpec, - int96RebaseSpec: RebaseSpec) + int96RebaseSpec: RebaseSpec, + parameters: Map[String, String]) extends RecordMaterializer[InternalRow] { private val rootConverter = new GeoParquetRowConverter( schemaConverter, @@ -51,6 +53,7 @@ class GeoParquetRecordMaterializer( convertTz, datetimeRebaseSpec, int96RebaseSpec, + parameters, NoopUpdater) override def getCurrentRecord: InternalRow = rootConverter.currentRecord diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRowConverter.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRowConverter.scala index d680ca0c26..cac718cb09 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRowConverter.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetRowConverter.scala @@ -38,6 +38,7 @@ import org.locationtech.jts.io.WKBReader import java.math.{BigDecimal, BigInteger} import java.time.{ZoneId, ZoneOffset} import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer /** @@ -77,6 +78,7 @@ import scala.collection.mutable.ArrayBuffer * Gregorian calendar: mode + optional original time zone * @param int96RebaseSpec the specification of rebasing INT96 timestamp from Julian to Proleptic * Gregorian calendar + * @param parameters Options for reading GeoParquet files. For example, if legacyMode is enabled or not. * @param updater An updater which propagates converted field values to the parent container */ private[parquet] class GeoParquetRowConverter( @@ -86,6 +88,7 @@ private[parquet] class GeoParquetRowConverter( convertTz: Option[ZoneId], datetimeRebaseSpec: RebaseSpec, int96RebaseSpec: RebaseSpec, + parameters: Map[String, String], updater: ParentContainerUpdater) extends ParquetGroupConverter(updater) with Logging { @@ -204,11 +207,28 @@ private[parquet] class GeoParquetRowConverter( new ParquetPrimitiveConverter(updater) case GeometryUDT => - new ParquetPrimitiveConverter(updater) { - override def addBinary(value: Binary): Unit = { - val wkbReader = new WKBReader() - val geom = wkbReader.read(value.getBytes) - updater.set(GeometryUDT.serialize(geom)) + if (parquetType.isPrimitive) { + new ParquetPrimitiveConverter(updater) { + override def addBinary(value: Binary): Unit = { + val wkbReader = new WKBReader() + val geom = wkbReader.read(value.getBytes) + updater.set(GeometryUDT.serialize(geom)) + } + } + } else { + if (GeoParquetUtils.isLegacyMode(parameters)) { + new ParquetArrayConverter(parquetType.asGroupType(), ArrayType(ByteType, containsNull = false), updater) { + override def end(): Unit = { + val wkbReader = new WKBReader() + val byteArray = currentArray.map(_.asInstanceOf[Byte]).toArray + val geom = wkbReader.read(byteArray) + updater.set(GeometryUDT.serialize(geom)) + } + } + } else { + throw new IllegalArgumentException( + s"Parquet type for geometry column is $parquetType. This parquet file could be written by " + + "Apache Sedona <= 1.3.1-incubating. 
Please use option(\"legacyMode\", \"true\") to read this file.") } } @@ -339,6 +359,7 @@ private[parquet] class GeoParquetRowConverter( convertTz, datetimeRebaseSpec, int96RebaseSpec, + parameters, wrappedUpdater) case t => @@ -476,13 +497,12 @@ private[parquet] class GeoParquetRowConverter( * * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists */ - private final class ParquetArrayConverter( - parquetSchema: GroupType, - catalystSchema: ArrayType, - updater: ParentContainerUpdater) + private class ParquetArrayConverter(parquetSchema: GroupType, + catalystSchema: ArrayType, + updater: ParentContainerUpdater) extends ParquetGroupConverter(updater) { - private[this] val currentArray = ArrayBuffer.empty[Any] + protected[this] val currentArray: mutable.ArrayBuffer[Any] = ArrayBuffer.empty[Any] private[this] val elementConverter: Converter = { val repeatedType = parquetSchema.getType(0) diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetSchemaConverter.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetSchemaConverter.scala index 11bfcfcabf..bdc64bfdd8 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetSchemaConverter.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetSchemaConverter.scala @@ -40,25 +40,27 @@ import org.apache.spark.sql.types._ * [[StringType]] fields. * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL * [[TimestampType]] fields. + * @param parameters Options for reading GeoParquet files. */ class GeoParquetToSparkSchemaConverter( keyValueMetaData: java.util.Map[String, String], assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get, - assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get) { + assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get, + parameters: Map[String, String]) { - private val geoParquetMetaData: GeoParquetMetaData = GeoParquetMetaData.parseKeyValueMetaData(keyValueMetaData).getOrElse { - throw new IllegalArgumentException("GeoParquet file does not contain valid geo metadata") - } + private val geoParquetMetaData: GeoParquetMetaData = GeoParquetUtils.parseGeoParquetMetaData(keyValueMetaData, parameters) - def this(keyValueMetaData: java.util.Map[String, String], conf: SQLConf) = this( + def this(keyValueMetaData: java.util.Map[String, String], conf: SQLConf, parameters: Map[String, String]) = this( keyValueMetaData = keyValueMetaData, assumeBinaryIsString = conf.isParquetBinaryAsString, - assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp) + assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp, + parameters = parameters) - def this(keyValueMetaData: java.util.Map[String, String], conf: Configuration) = this( + def this(keyValueMetaData: java.util.Map[String, String], conf: Configuration, parameters: Map[String, String]) = this( keyValueMetaData = keyValueMetaData, assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean, - assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean) + assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean, + parameters = parameters) /** * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]]. 
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetUtils.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetUtils.scala index 5e8c9c98f7..bf475930f2 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetUtils.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetUtils.scala @@ -77,4 +77,41 @@ object GeoParquetUtils { file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE || file.getName == ParquetFileWriter.PARQUET_METADATA_FILE } + + /** + * Legacy mode option is for reading Parquet files written by old versions of Apache Sedona (<= 1.3.1-incubating). + * Such files are actually not GeoParquet files and do not have GeoParquet file metadata. Geometry fields were + * encoded as list of bytes and stored as group type in Parquet files. The Definition of GeometryUDT before 1.4.0 was: + * {{{ + * case class GeometryUDT extends UserDefinedType[Geometry] { + * override def sqlType: DataType = ArrayType(ByteType, containsNull = false) + * // ... + * }}} + * Since 1.4.0, the sqlType of GeometryUDT is changed to BinaryType. This is a breaking change for reading old Parquet + * files. To read old Parquet files, users need to use "geoparquet" format and set legacyMode to true. + * @param parameters user provided parameters for reading GeoParquet files using `.option()` method, e.g. + * `spark.read.format("geoparquet").option("legacyMode", "true").load("path")` + * @return true if legacyMode is set to true, false otherwise + */ + def isLegacyMode(parameters: Map[String, String]): Boolean = + parameters.getOrElse("legacyMode", "false").toBoolean + + /** + * Parse GeoParquet file metadata from Parquet file metadata. Legacy parquet files do not contain GeoParquet file + * metadata, so we'll simply return an empty GeoParquetMetaData object when legacy mode is enabled. 
+   * @param keyValueMetaData Parquet file metadata
+   * @param parameters user-provided parameters for reading GeoParquet files
+   * @return GeoParquetMetaData object
+   */
+  def parseGeoParquetMetaData(keyValueMetaData: java.util.Map[String, String],
+                              parameters: Map[String, String]): GeoParquetMetaData = {
+    val isLegacyMode = GeoParquetUtils.isLegacyMode(parameters)
+    GeoParquetMetaData.parseKeyValueMetaData(keyValueMetaData).getOrElse {
+      if (isLegacyMode) {
+        GeoParquetMetaData(None, "", Map.empty)
+      } else {
+        throw new IllegalArgumentException("GeoParquet file does not contain valid geo metadata")
+      }
+    }
+  }
 }
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index 7b209e5429..c2d59756bf 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -51,6 +51,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
   val geoparquetdatalocation2: String = resourceFolder + "geoparquet/example2.parquet"
   val geoparquetdatalocation3: String = resourceFolder + "geoparquet/example3.parquet"
   val geoparquetdatalocation4: String = resourceFolder + "geoparquet/example-1.0.0-beta.1.parquet"
+  val legacyparquetdatalocation: String = resourceFolder + "parquet/legacy-parquet-nested-columns.snappy.parquet"
   val geoparquetoutputlocation: String = resourceFolder + "geoparquet/geoparquet_output/"
 
   override def afterAll(): Unit = FileUtils.deleteDirectory(new File(geoparquetoutputlocation))
@@ -510,6 +511,19 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
         sparkSession.sparkContext.removeSparkListener(sparkListener)
       }
     }
+
+    it("Read legacy parquet files written by Apache Sedona <= 1.3.1-incubating") {
+      val df = sparkSession.read.format("geoparquet").option("legacyMode", "true").load(legacyparquetdatalocation)
+      val rows = df.collect()
+      assert(rows.nonEmpty)
+      rows.foreach { row =>
+        assert(row.getAs[AnyRef]("geom").isInstanceOf[Geometry])
+        assert(row.getAs[AnyRef]("struct_geom").isInstanceOf[Row])
+        val structGeom = row.getAs[Row]("struct_geom")
+        assert(structGeom.getAs[AnyRef]("g0").isInstanceOf[Geometry])
+        assert(structGeom.getAs[AnyRef]("g1").isInstanceOf[Geometry])
+      }
+    }
   }
 
   def validateGeoParquetMetadata(path: String)(body: org.json4s.JValue => Unit): Unit = {

From 9f63d359d5e83f404e9e2eaa8388064199ddcfbb Mon Sep 17 00:00:00 2001
From: Kristin Cowalcijk
Date: Mon, 8 Jan 2024 14:19:49 +0800
Subject: [PATCH 4/6] Add python test

---
 python/tests/__init__.py            | 1 +
 python/tests/sql/test_geoparquet.py | 9 +++++++++
 2 files changed, 10 insertions(+)

diff --git a/python/tests/__init__.py b/python/tests/__init__.py
index 08e91d0863..96f6a2b860 100644
--- a/python/tests/__init__.py
+++ b/python/tests/__init__.py
@@ -39,6 +39,7 @@ geojson_id_input_location = os.path.join(tests_resource, "testContainsId.json")
 geoparquet_input_location = os.path.join(tests_resource, "geoparquet/example1.parquet")
 plain_parquet_input_location = os.path.join(tests_resource, "geoparquet/plain.parquet")
+legacy_parquet_input_location = os.path.join(tests_resource, "parquet/legacy-parquet-nested-columns.snappy.parquet")
 google_buildings_input_location = os.path.join(tests_resource, "813_buildings_test.csv")
 chicago_crimes_input_location = os.path.join(tests_resource, "Chicago_Crimes.csv")
 world_map_raster_input_location = os.path.join(tests_resource,
"raster/raster_with_no_data/test5.tiff") diff --git a/python/tests/sql/test_geoparquet.py b/python/tests/sql/test_geoparquet.py index 478c8aacd7..a7f5aa0fe3 100644 --- a/python/tests/sql/test_geoparquet.py +++ b/python/tests/sql/test_geoparquet.py @@ -82,3 +82,12 @@ def test_inspect_geoparquet_metadata(self): assert column_metadata['encoding'] == 'WKB' assert len(column_metadata['bbox']) == 4 assert isinstance(json.loads(column_metadata['crs']), dict) + + def test_reading_legacy_parquet_files(self): + df = self.spark.read.format("geoparquet").option("legacyMode", "true").load(legacy_parquet_input_location) + rows = df.collect() + assert len(rows) > 0 + for row in rows: + assert isinstance(row['geom'], BaseGeometry) + assert isinstance(row['struct_geom']['g0'], BaseGeometry) + assert isinstance(row['struct_geom']['g1'], BaseGeometry) From 497237d2f9e9a5f1c9fbbfa9aacf1c1ea0ec08ef Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Mon, 8 Jan 2024 14:45:22 +0800 Subject: [PATCH 5/6] Refine documentation --- docs/api/sql/Reading-legacy-parquet.md | 11 +++++++++-- docs/tutorial/sql.md | 2 +- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/docs/api/sql/Reading-legacy-parquet.md b/docs/api/sql/Reading-legacy-parquet.md index dd4874181d..1c810e2f98 100644 --- a/docs/api/sql/Reading-legacy-parquet.md +++ b/docs/api/sql/Reading-legacy-parquet.md @@ -2,7 +2,14 @@ Due to a breaking change in Apache Sedona 1.4.0 to the SQL type of `GeometryUDT` ([SEDONA-205](https://issues.apache.org/jira/browse/SEDONA-205)) as well as the serialization format of geometry values ([SEDONA-207](https://issues.apache.org/jira/browse/SEDONA-207)), Parquet files containing geometry columns written by Apache Sedona 1.3.1 or earlier cannot be read by Apache Sedona 1.4.0 or later. -Here is an example of exception when trying to read such files: + +For parquet files written by `"parquet"` format when using Apache Sedona 1.3.1-incubating or earlier: + +```python +df.write.format("parquet").save("path/to/parquet/files") +``` + +Reading such files with Apache Sedona 1.4.0 or later using `spark.read.format("parquet").load("path/to/parquet/files")` will result in an exception: ``` 24/01/08 12:52:56 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 11) @@ -26,7 +33,7 @@ org.apache.spark.sql.AnalysisException: Invalid Spark read type: expected requir ... ``` -To read such files, you can use `"geoparquet"` format with the `.option("legacyMode", "true")` option. Here is an example: +Since v1.5.1, GeoParquet supports reading legacy Parquet files. you can use `"geoparquet"` format with the `.option("legacyMode", "true")` option. Here is an example: === "Scala/Java" diff --git a/docs/tutorial/sql.md b/docs/tutorial/sql.md index 2da0dfd3fb..e29d3596e5 100644 --- a/docs/tutorial/sql.md +++ b/docs/tutorial/sql.md @@ -378,7 +378,7 @@ Please refer to [Reading Legacy Parquet Files](../api/sql/Reading-legacy-parquet !!!warning GeoParquet file reader does not work on Databricks runtime when Photon is enabled. Please disable Photon when using - GeoParquet file reader on Databricks runtime. + GeoParquet file reader on Databricks runtime. 
 ### Inspect GeoParquet metadata
 

From cfdf514f0cdc73b42bf257124631cc2aa421a933 Mon Sep 17 00:00:00 2001
From: Kristin Cowalcijk
Date: Mon, 8 Jan 2024 15:00:14 +0800
Subject: [PATCH 6/6] Fix python tests

---
 python/tests/sql/test_geoparquet.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/tests/sql/test_geoparquet.py b/python/tests/sql/test_geoparquet.py
index a7f5aa0fe3..969d9cfdc0 100644
--- a/python/tests/sql/test_geoparquet.py
+++ b/python/tests/sql/test_geoparquet.py
@@ -28,6 +28,7 @@
 from tests.test_base import TestBase
 from tests import geoparquet_input_location
 from tests import plain_parquet_input_location
+from tests import legacy_parquet_input_location
 
 
 class TestGeoParquet(TestBase):
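For reference, a self-contained sketch of the metadata fallback introduced by `GeoParquetUtils.parseGeoParquetMetaData` in this series. `GeoMetaData` and the helper names below are simplified, illustrative stand-ins for the real `GeoParquetMetaData` and `GeoParquetUtils` members; only the control flow mirrors the patch:

```scala
// Simplified stand-in for GeoParquetMetaData, for illustration only.
case class GeoMetaData(primaryColumn: Option[String], version: String, columns: Map[String, String])

// Mirrors GeoParquetUtils.isLegacyMode: the option defaults to "false".
def isLegacyMode(parameters: Map[String, String]): Boolean =
  parameters.getOrElse("legacyMode", "false").toBoolean

// When the footer carries no "geo" metadata, legacy mode substitutes an empty
// metadata object instead of failing, so legacy Sedona files can still be read.
def resolveGeoMetaData(parsed: Option[GeoMetaData],
                       parameters: Map[String, String]): GeoMetaData =
  parsed.getOrElse {
    if (isLegacyMode(parameters)) GeoMetaData(None, "", Map.empty)
    else throw new IllegalArgumentException(
      "GeoParquet file does not contain valid geo metadata")
  }

// Example: a legacy file (no geo metadata) read with legacyMode=true yields empty metadata,
// while the same file without the option would throw.
val meta = resolveGeoMetaData(parsed = None, parameters = Map("legacyMode" -> "true"))
```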