Add snippet showing the impact of data type and compression algorithm on Parquet files #26

118 changes: 118 additions & 0 deletions spark3/parquet-compression.choose-types-wisely.scala
// Databricks notebook source
// Spark: 3.5.1
// Local: --driver-memory 1G --master 'local[2]' --conf spark.ui.retainedJobs=2

// COMMAND ----------

/*
This snippet shows how the data type chosen for numerical information and the compression algorithm can affect Spark.

# Symptom
Storage needs do not match expectations, for example the volume is higher in output after filtering than in input.

# Explanation
There is a difference between the type used when reading the data and the type used when writing it, causing a loss of compression performance.
*/

// COMMAND ----------

// We are going to demonstrate this by converting the same numerical data into different types,
// and writing it to Parquet using different compression algorithms.


// Here are the types we want to compare
val allNumericTypes = Seq("byte","short","int","long","double","float", "string","decimal(9,2)", "decimal(18,2)")

// Here are the compression codecs we want to compare.
// More are listed in the Spark SQL Parquet data source documentation: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option
val allParquetCompressions = Seq("none","gzip","snappy")

// Our initial data is generated as one million random numbers (Doubles between 0 and 1000000).
val rndDF = spark.sparkContext
  .parallelize(List.range(0, 1000000).map(_ => math.random * 1000000))
  .toDF


// COMMAND ----------
println (s"${"-"*20}")
println (s"Write data")

// let's write the data using all expected types and compressions
for (numericTypeName <- allNumericTypes) {
  for (parquetCompressionName <- allParquetCompressions) {
    val fileName = s"1M/Parquet_${numericTypeName}_${parquetCompressionName}"
    print(".")
    spark.sparkContext.setJobGroup("Write data", s"Write ${numericTypeName} with compression ${parquetCompressionName}")
    rndDF.selectExpr(s"cast(value as $numericTypeName)")
      .write
      .option("compression", parquetCompressionName)
      .format("parquet")
      .mode("overwrite")
      .save(fileName)
  }
}
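// A reviewer suggests reusing the random, disposable temporary output directory mechanism already
// present in the other notebooks (UUID-based) instead of the hard-coded "1M/" prefix. A minimal
// sketch of that idea, not wired into the loop above; the "trash" base directory name is an
// assumption, not necessarily the convention used by the other snippets:
val disposableOutputBase = s"trash/${java.util.UUID.randomUUID}"
// fileName would then become s"$disposableOutputBase/Parquet_${numericTypeName}_${parquetCompressionName}"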


// COMMAND ----------

// Now it's up to you to manually check the amount of storage required for each generated folder ...

// ... or to use Spark's Hadoop configuration to do it for you:
println (s"${"-"*20}")
println (s"Check written data size")
val hadoopFileSystem = org.apache.hadoop.fs.FileSystem.get(
  java.net.URI.create("./"),
  spark.sparkContext.hadoopConfiguration)

// Sum the part* file sizes, in kilobytes, per parent folder:
val sizeOnDisk = hadoopFileSystem.globStatus(new org.apache.hadoop.fs.Path("./1M/**/part*"))
  .filter(_.isFile)
  .map(o => (o.getLen / 1024, o.getPath.getParent.toString))
  .groupBy(_._2)
  .mapValues(_.map(_._1).sum)
  .toSeq
  .sortBy(_._2)

println("part* files sizes (in kB):")
sizeOnDisk.foreach( o=>println ( s"${o._1}\t${o._2}\tkB"))
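// As a reviewer points out, with implicits in scope the same summary can also be rendered as a
// DataFrame (which Databricks displays as a friendlier table). A minimal sketch; the column names
// are illustrative:
import spark.implicits._
sizeOnDisk.toDF("folder", "size_kB").show(false)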
// COMMAND ----------

// Now let's see how long it takes to read and process such data
// when using the different compression codecs and numerical data types.
println (s"${"-"*20}")
println (s"Read written data")
import scala.collection.mutable.ArrayBuffer
val readInformations: ArrayBuffer[(String,Long,Any)] = ArrayBuffer.empty
for (numericTypeName <- allNumericTypes) {
  for (parquetCompressionName <- allParquetCompressions) {
    val fileName = s"1M/Parquet_${numericTypeName}_${parquetCompressionName}"
    print(".")
    spark.sparkContext.setJobGroup("Read data", s"Read ${numericTypeName} with compression ${parquetCompressionName}")

    val t0 = System.nanoTime
    val aggValue = spark.read.parquet(fileName)
      .selectExpr(s"avg(value) as `avg_from_${numericTypeName}_${parquetCompressionName}`")
      .first
      .get(0)

    val t1 = System.nanoTime
    readInformations.append((fileName, (t1 - t0) / 1000000, aggValue))
  }
}


println("Time to read and calculating aggregated value")
readInformations.foreach(readInformation=> println ( s"${readInformation._1}\t${readInformation._2} ms\t(agg value was ${readInformation._3})" ))



// COMMAND ----------

// Now you should also check how the variety (cardinality) of values affects compression:
// TODO (reviewer suggestion): add some conclusions here so that readers can take action
// and understand what to expect from their change.
// Our initial source of data uses Doubles between 0 and 1000000:
//   .map(_ => math.random * 1000000)
// Maybe check what the best option is if you have only 1000 distinct values, for example:
//   .map(_ => math.floor(math.random * 1000) + 999000)
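// A minimal sketch of that low-cardinality experiment (the lowCardinalityDF name is new here,
// only illustrative): generate 1000 distinct values with the same pipeline, then re-run the
// write / size-check / read cells above with it in place of rndDF to compare results.
val lowCardinalityDF = spark.sparkContext
  .parallelize(List.range(0, 1000000).map(_ => math.floor(math.random * 1000) + 999000))
  .toDF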