From 4bc772aa6536e547b6f0d5183a8bf636c19dd2ff Mon Sep 17 00:00:00 2001
From: Bing Li
Date: Fri, 1 Nov 2024 12:28:06 -0700
Subject: [PATCH] fix file size

---
 .../snowflake/io/CloudStorageOperations.scala | 19 ++++++++++++++++---
 .../spark/snowflake/io/StageWriter.scala      |  5 +++--
 2 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/src/main/scala/net/snowflake/spark/snowflake/io/CloudStorageOperations.scala b/src/main/scala/net/snowflake/spark/snowflake/io/CloudStorageOperations.scala
index 0605bcc7..a93d5f9b 100644
--- a/src/main/scala/net/snowflake/spark/snowflake/io/CloudStorageOperations.scala
+++ b/src/main/scala/net/snowflake/spark/snowflake/io/CloudStorageOperations.scala
@@ -43,6 +43,7 @@ import net.snowflake.spark.snowflake.Parameters.MergedParameters
 import net.snowflake.spark.snowflake.io.SupportedFormat.SupportedFormat
 import net.snowflake.spark.snowflake.DefaultJDBCWrapper.DataBaseOperations
 import net.snowflake.spark.snowflake.test.{TestHook, TestHookFlag}
+import org.apache.avro.Schema
 import org.apache.avro.file.DataFileWriter
 import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
 import org.apache.commons.io.IOUtils
@@ -485,6 +486,8 @@ sealed trait CloudStorage {
   protected val sfURL: String = param.sfURL
   protected val useExponentialBackoff: Boolean = param.useExponentialBackoff
 
+  protected var avroSchema: Option[String] = None
+
   // The first 10 sleep time in second will be like
   // 3, 6, 12, 24, 48, 96, 192, 300, 300, 300, etc
   protected def retrySleepTimeInMS(retry: Int): Int = {
@@ -658,7 +661,7 @@ sealed trait CloudStorage {
     try {
       format match {
         case SupportedFormat.PARQUET =>
-          val avroSchema = io.ParquetUtils.convertStructToAvro(schema)
+          val avroSchema = new Schema.Parser().parse(this.avroSchema.get)
           val config = new Configuration()
           config.setBoolean("parquet.avro.write-old-list-structure", false)
           val writer = AvroParquetWriter.builder[GenericData.Record](
@@ -672,7 +675,6 @@ sealed trait CloudStorage {
             writer.write(ParquetUtils.rowToAvroRecord(row, avroSchema, schema, param))
             rowCount += 1
           }
-          dataSize += writer.getDataSize
           writer.close()
         case _ =>
           val rows = input.asInstanceOf[Iterator[String]]
@@ -728,11 +730,12 @@
       )
     }
 
+    val dataSizeStr = if (dataSize == 0) "N/A" else Utils.getSizeString(dataSize)
     CloudStorageOperations.log.info(
       s"""${SnowflakeResultSetRDD.WORKER_LOG_PREFIX}:
          | Finish writing partition ID:$partitionID $fileName
          | write row count is $rowCount.
-         | Uncompressed data size is ${Utils.getSizeString(dataSize)}.
+         | Uncompressed data size is $dataSizeStr.
          | $processTimeInfo
          |""".stripMargin.filter(_ >= ' '))
 
@@ -758,6 +761,16 @@
          | partitions: directory=$directory ${format.toString} $compress
          |""".stripMargin.filter(_ >= ' '))
 
+    // 1. The Avro Schema is not serializable in Spark 3.1.1.
+    // 2. The Avro Schema can only be built once with the schema builder; building it
+    //    inside the mapPartition function leads to errors, e.g. decimal data cannot
+    //    be processed.
+    // Therefore, the schema is built once here, serialized to each partition as a
+    // JSON string, and deserialized back into an Avro schema inside the partition.
+    if (format == SupportedFormat.PARQUET) {
+      this.avroSchema = Some(io.ParquetUtils.convertStructToAvro(schema).toString())
+    }
+
     // Some explain for newbies on spark connector:
     // Bellow code is executed in distributed by spark FRAMEWORK
     // 1. The master node executes "data.mapPartitionsWithIndex()"
diff --git a/src/main/scala/net/snowflake/spark/snowflake/io/StageWriter.scala b/src/main/scala/net/snowflake/spark/snowflake/io/StageWriter.scala
index d0a8e1a8..cc23f314 100644
--- a/src/main/scala/net/snowflake/spark/snowflake/io/StageWriter.scala
+++ b/src/main/scala/net/snowflake/spark/snowflake/io/StageWriter.scala
@@ -562,10 +562,11 @@
         totalSize += fileUploadResult.fileSize
         totalRowCount += fileUploadResult.rowCount
       })
+      val fileSizeStr = if (totalSize == 0) "N/A" else Utils.getSizeString(totalSize)
       logAndAppend(progress, s"Total file count is ${fileUploadResults.size}, " +
         s"non-empty files count is ${expectedFileSet.size}, " +
-        s"total file size is ${Utils.getSizeString(totalSize)}, " +
-        s"total row count is ${Utils.getSizeString(totalRowCount)}.")
+        s"total file size is $fileSizeStr, " +
+        s"total row count is $totalRowCount.")
 
       // Indicate whether to use FILES clause in the copy command
       var useFilesClause = false
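
Note on the CloudStorageOperations.scala change: the Avro schema is now built once on the driver, carried to each partition as a JSON string (the new avroSchema: Option[String] field), and parsed back with new Schema.Parser().parse(...) inside the partition, because org.apache.avro.Schema is not serializable under Spark 3.1.1. Below is a minimal, self-contained sketch of that round trip, separate from the connector code; the object name and record fields are made up for illustration.

    import org.apache.avro.{Schema, SchemaBuilder}

    // Minimal sketch of the schema round trip used by this patch (illustrative names only).
    object AvroSchemaRoundTrip {
      def main(args: Array[String]): Unit = {
        // Driver side: build the Avro schema exactly once.
        val driverSchema: Schema = SchemaBuilder
          .record("example_row")
          .fields()
          .requiredString("name")
          .requiredDouble("amount")
          .endRecord()

        // Only this JSON string needs to be captured by the Spark closure,
        // since org.apache.avro.Schema itself is not serializable.
        val schemaJson: String = driverSchema.toString()

        // Executor side (inside mapPartitionsWithIndex in the connector):
        // rebuild the schema from the JSON string.
        val partitionSchema: Schema = new Schema.Parser().parse(schemaJson)

        // The round trip preserves the schema structurally.
        assert(partitionSchema == driverSchema)
        println(partitionSchema.toString(true))
      }
    }

This is also why the PARQUET branch can safely call this.avroSchema.get: the Option is populated on the driver before the distributed write starts, so it is always defined by the time a partition is processed.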