Add snippet for the impact of data type and compression algorithm on parquet files #26
// Databricks notebook source
// Spark: 3.5.1
// Local: --driver-memory 1G --master 'local[2]' --conf spark.ui.retainedJobs=2

// COMMAND ----------
/*
This snippet shows how the data type used for numerical information and the compression can affect Spark.

# Symptom
Storage needs do not match expectations, for example the output is larger after filtering than the input.

# Explanation
There is a difference between the type used when reading the data and the type used when writing it, causing a loss of compression performance.
*/
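
// COMMAND ----------

// A minimal, self-contained sketch of how the symptom can appear (not part of the original snippet;
// directory names are illustrative). The data is written as int, read back, filtered, and unintentionally
// widened to double before being written again:
spark.range(1000000).selectExpr("cast(id as int) as value")
  .write.mode("overwrite").parquet("sketch_symptom/input_int")
spark.read.parquet("sketch_symptom/input_int")
  .filter("value % 2 = 0")
  .selectExpr("cast(value as double) as value") // the widening happens here
  .write.mode("overwrite").parquet("sketch_symptom/output_double")
// Comparing the two directories on disk shows how the output can stay surprisingly large
// even though half of the rows were filtered out.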

// COMMAND ----------

// We are going to demonstrate this by converting the same numerical data into different types,
// and writing it in parquet using different compressions.

// Here are the types we want to compare:
val allNumericTypes = Seq("byte", "short", "int", "long", "double", "float", "string", "decimal(9,2)", "decimal(18,2)")

// Here are the compressions we want to compare.
// There are more in the Spark SQL Parquet data source documentation: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option
val allParquetCompressions = Seq("none", "gzip", "snappy")

// Our initial data is generated as one million random numbers.
val rndDF = spark.sparkContext
  .parallelize(
    List.range(0, 1000000)
      .map(i => math.random * 1000000)
  )
  .toDF
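// (note: outside a notebook or spark-shell, calling .toDF on an RDD also requires "import spark.implicits._")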

// COMMAND ----------

println("-" * 20)
println("Write data")

// Let's write the data using all expected types and compressions
for (numericTypeName <- allNumericTypes) {
  for (parquetCompressionName <- allParquetCompressions) {
    val fileName = s"1M/Parquet_${numericTypeName}_${parquetCompressionName}"
    print(".")
    spark.sparkContext.setJobGroup("Write data", s"Write ${numericTypeName} with compression ${parquetCompressionName}")
    rndDF.selectExpr(s"cast(value as $numericTypeName)")
      .write
      .option("compression", parquetCompressionName)
      .format("parquet")
      .mode("overwrite")
      .save(fileName)
  }
}

Review comment: I'd reuse the mechanism for creating random and disposable temporary directories that is already present in the other notebooks. This way temporary files land in the predefined and already set up trash directory. See the other snippets for reference (they use an uuid, you can use the same imports and snippet); a sketch follows below.
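
// COMMAND ----------

// A possible sketch of the reviewer's suggestion above (the trash directory path and naming below are
// assumptions for illustration; the exact helper used in the other notebooks is not reproduced here):
import java.util.UUID
val trashBaseDir = s"/tmp/spark_snippets_trash/${UUID.randomUUID}"
// The write loop would then save to s"$trashBaseDir/Parquet_${numericTypeName}_${parquetCompressionName}",
// and the whole directory could be removed at the end of the notebook, for example with:
//   org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
//     .delete(new org.apache.hadoop.fs.Path(trashBaseDir), true)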

// COMMAND ----------

// Now it's up to you to manually check the amount of storage required for each generated folder ...
// ... or to use Spark's Hadoop configuration to do it for you:
println("-" * 20)
println("Check written data size")
val hadoopFileSystem = org.apache.hadoop.fs.FileSystem.get(
  java.net.URI.create("./"),
  spark.sparkContext.hadoopConfiguration)

// Sum the file sizes in kilobytes, per parent folder:
val sizeOnDisk = hadoopFileSystem.globStatus(new org.apache.hadoop.fs.Path("./1M/**/part*"))
  .filter(o => o.isFile)
  .map(o => (o.getLen / 1024, o.getPath.getParent.toString))
  .groupBy(_._2)
  .mapValues(_.map(_._1).sum)
  .toSeq
  .sortBy(_._2)

println("part* files sizes (in kB):")
sizeOnDisk.foreach(o => println(s"${o._1}\t${o._2} kB"))

// COMMAND ----------

// Now we can also check the effect of choosing a specific numeric type on the values we read back,
// and the effect of type and compression on the time needed to read and aggregate them.
println("-" * 20)
println("Read written data")
import scala.collection.mutable.ArrayBuffer
val readInformations: ArrayBuffer[(String, Long, Any)] = ArrayBuffer.empty
for (numericTypeName <- allNumericTypes) {
  for (parquetCompressionName <- allParquetCompressions) {
    val fileName = s"1M/Parquet_${numericTypeName}_${parquetCompressionName}"
    print(".")
    spark.sparkContext.setJobGroup("Read data", s"Read ${numericTypeName} with compression ${parquetCompressionName}")

    val t0 = System.nanoTime
    val aggValue = spark.read.parquet(fileName)
      .selectExpr(s"avg(value) as `avg_from_${numericTypeName}_${parquetCompressionName}`")
      .first
      .get(0)

    val t1 = System.nanoTime
    readInformations.append((fileName, (t1 - t0) / 1000000, aggValue))
  }
}

println("Time to read and compute the aggregated value:")
readInformations.foreach(readInformation => println(s"${readInformation._1}\t${readInformation._2} ms\t(agg value was ${readInformation._3})"))

// COMMAND ----------

// Now you should also check how the variety of values affects compression:
// our initial source of data uses Doubles between 0 and 1000000
//   .map(i => math.random * 1000000)
// maybe you could check what the best option is if you only have 1000 distinct values, for example:
//   .map(i => math.floor(math.random * 1000) + 999000)
// (a sketch of this variant follows below)

Review comment: I'd encourage you to add some conclusions here so that people reading the snippet can take action and understand what to expect with their change.
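
// COMMAND ----------

// A possible sketch of the suggested variant (not part of the original snippet; the directory name below
// is illustrative). Only the value generation changes: 1000 distinct values instead of one million
// distinct doubles, which gives parquet's dictionary encoding a much better chance to kick in.
val lowCardinalityDF = spark.sparkContext
  .parallelize(
    List.range(0, 1000000)
      .map(i => math.floor(math.random * 1000) + 999000)
  )
  .toDF

for (numericTypeName <- allNumericTypes) {
  for (parquetCompressionName <- allParquetCompressions) {
    spark.sparkContext.setJobGroup("Write low-cardinality data", s"Write ${numericTypeName} with compression ${parquetCompressionName}")
    lowCardinalityDF.selectExpr(s"cast(value as $numericTypeName)")
      .write
      .option("compression", parquetCompressionName)
      .format("parquet")
      .mode("overwrite")
      .save(s"1M_low_cardinality/Parquet_${numericTypeName}_${parquetCompressionName}")
  }
}
// The same size check as above (with the glob pattern pointing at 1M_low_cardinality) can then be used
// to compare the results against the high-cardinality run and draw conclusions for your own data.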