
Commit

fix file size
sfc-gh-bli committed Nov 1, 2024
1 parent 6018282 commit 4bc772a
Showing 2 changed files with 19 additions and 5 deletions.
@@ -43,6 +43,7 @@ import net.snowflake.spark.snowflake.Parameters.MergedParameters
import net.snowflake.spark.snowflake.io.SupportedFormat.SupportedFormat
import net.snowflake.spark.snowflake.DefaultJDBCWrapper.DataBaseOperations
import net.snowflake.spark.snowflake.test.{TestHook, TestHookFlag}
+ import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.commons.io.IOUtils
@@ -485,6 +486,8 @@ sealed trait CloudStorage {
protected val sfURL: String = param.sfURL
protected val useExponentialBackoff: Boolean = param.useExponentialBackoff

+ protected var avroSchema: Option[String] = None

// The first 10 sleep times in seconds will be like
// 3, 6, 12, 24, 48, 96, 192, 300, 300, 300, etc.
protected def retrySleepTimeInMS(retry: Int): Int = {
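The body of retrySleepTimeInMS is collapsed in this view. A minimal sketch that reproduces the documented sequence (doubling from 3 seconds, capped at 300 seconds, returned in milliseconds) is shown below; the cap constant, the 0-based retry index, and the absence of jitter are assumptions rather than details taken from this diff.

object RetryBackoffSketch {
  // Hypothetical sketch: doubling backoff starting at 3s, capped at 300s, in milliseconds.
  // Assumes a 0-based retry index; the real implementation may also add jitter.
  def retrySleepTimeInMS(retry: Int): Int = {
    val seconds = math.min(300L, 3L << retry) // 3, 6, 12, ..., 192, 300, 300, ...
    (seconds * 1000L).toInt
  }

  def main(args: Array[String]): Unit =
    (0 until 10).foreach(i => println(s"retry $i -> ${retrySleepTimeInMS(i)} ms"))
}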
@@ -658,7 +661,7 @@
try {
format match {
case SupportedFormat.PARQUET =>
- val avroSchema = io.ParquetUtils.convertStructToAvro(schema)
+ val avroSchema = new Schema.Parser().parse(this.avroSchema.get)
val config = new Configuration()
config.setBoolean("parquet.avro.write-old-list-structure", false)
val writer = AvroParquetWriter.builder[GenericData.Record](
@@ -672,7 +675,6 @@
writer.write(ParquetUtils.rowToAvroRecord(row, avroSchema, schema, param))
rowCount += 1
}
- dataSize += writer.getDataSize
writer.close()
case _ =>
val rows = input.asInstanceOf[Iterator[String]]
@@ -728,11 +730,12 @@
)
}

+ val dataSizeStr = if (dataSize == 0) "N/A" else Utils.getSizeString(dataSize)
CloudStorageOperations.log.info(
s"""${SnowflakeResultSetRDD.WORKER_LOG_PREFIX}:
| Finish writing partition ID:$partitionID $fileName
| write row count is $rowCount.
- | Uncompressed data size is ${Utils.getSizeString(dataSize)}.
+ | Uncompressed data size is $dataSizeStr.
| $processTimeInfo
|""".stripMargin.filter(_ >= ' '))

@@ -758,6 +761,16 @@
| partitions: directory=$directory ${format.toString} $compress
|""".stripMargin.filter(_ >= ' '))

+ // 1. The Avro Schema is not serializable in Spark 3.1.1.
+ // 2. The Avro Schema can only be created once with the Schema builder; creating the
+ //    schema inside the mapPartitions function leads to errors (e.g. decimal data
+ //    cannot be processed).
+ // Therefore, the schema is created only once here, serialized to each partition as a
+ // JSON string, and deserialized back into an Avro schema inside each partition.
+ if (format == SupportedFormat.PARQUET) {
+   this.avroSchema = Some(io.ParquetUtils.convertStructToAvro(schema).toString())
+ }

// Some explanation for newcomers to the Spark connector:
// The code below is executed in a distributed fashion by the Spark framework:
// 1. The master node executes "data.mapPartitionsWithIndex()"
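To make the Avro-schema workaround described in the comments above concrete, here is a minimal, self-contained sketch of the pattern: the schema is built once on the driver, carried to each partition only as its JSON string, re-parsed with Schema.Parser inside the partition, and then used to write GenericData.Record values through AvroParquetWriter with the same "parquet.avro.write-old-list-structure" setting seen in the diff. The record fields, output path, and record-building code are illustrative assumptions, not taken from the connector.

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.GenericData
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter

object AvroSchemaRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // Driver side: build the schema once and keep only its JSON representation,
    // since org.apache.avro.Schema itself is not serializable in Spark 3.1.1.
    val schema: Schema = SchemaBuilder.record("row").fields()
      .requiredString("name")
      .requiredLong("id")
      .endRecord()
    val schemaJson: String = schema.toString()

    // Partition side: re-parse the JSON string into an Avro schema...
    val partitionSchema: Schema = new Schema.Parser().parse(schemaJson)

    // ...and use it to write records, mirroring the writer configuration in the diff.
    val config = new Configuration()
    config.setBoolean("parquet.avro.write-old-list-structure", false)
    val writer: ParquetWriter[GenericData.Record] =
      AvroParquetWriter.builder[GenericData.Record](new Path("/tmp/example.parquet"))
        .withSchema(partitionSchema)
        .withConf(config)
        .build()

    val record = new GenericData.Record(partitionSchema)
    record.put("name", "alice")
    record.put("id", 1L)
    writer.write(record)
    writer.close()
  }
}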
@@ -562,10 +562,11 @@ private[io] object StageWriter {
totalSize += fileUploadResult.fileSize
totalRowCount += fileUploadResult.rowCount
})
+ val fileSizeStr = if (totalSize == 0) "N/A" else Utils.getSizeString(totalSize)
logAndAppend(progress, s"Total file count is ${fileUploadResults.size}, " +
s"non-empty files count is ${expectedFileSet.size}, " +
s"total file size is ${Utils.getSizeString(totalSize)}, " +
s"total row count is ${Utils.getSizeString(totalRowCount)}.")
s"total file size is $fileSizeStr, " +
s"total row count is $totalRowCount.")

// Indicate whether to use FILES clause in the copy command
var useFilesClause = false
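For completeness, the size-reporting change itself is small: when the accumulated size is zero (which can now happen on the Parquet path, since the writer's size is no longer added to dataSize), the logs print "N/A" instead of a misleading zero-byte figure. Below is a minimal sketch of that guard, with a hypothetical formatter standing in for Utils.getSizeString, whose exact output format is an assumption.

object FileSizeLogSketch {
  // Hypothetical stand-in for Utils.getSizeString: human-readable byte counts.
  def formatSize(bytes: Long): String = {
    val units = Seq("Bytes", "KB", "MB", "GB", "TB")
    var size = bytes.toDouble
    var unit = 0
    while (size >= 1024 && unit < units.size - 1) {
      size /= 1024
      unit += 1
    }
    f"$size%.2f ${units(unit)}"
  }

  // The guard introduced by this commit: report "N/A" when the size is unknown.
  def sizeForLog(dataSize: Long): String =
    if (dataSize == 0) "N/A" else formatSize(dataSize)

  def main(args: Array[String]): Unit = {
    println(sizeForLog(0))    // N/A
    println(sizeForLog(1536)) // 1.50 KB
  }
}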
