Commit
SONARPY-2489 Create rule S7189 PySpark DataFrames used multiple times should be cached or persisted
guillaume-dequenne-sonarsource committed Jan 29, 2025
1 parent 2dc3a33 commit e1fe321
Showing 3 changed files with 119 additions and 0 deletions.
2 changes: 2 additions & 0 deletions rules/S7189/metadata.json
@@ -0,0 +1,2 @@
{
}
23 changes: 23 additions & 0 deletions rules/S7189/python/metadata.json
@@ -0,0 +1,23 @@
{
  "title": "PySpark DataFrames used multiple times should be cached or persisted",
  "type": "CODE_SMELL",
  "status": "ready",
  "remediation": {
    "func": "Constant\/Issue",
    "constantCost": "5min"
  },
  "tags": [
  ],
  "defaultSeverity": "Major",
  "ruleSpecification": "RSPEC-7189",
  "sqKey": "S7189",
  "scope": "All",
  "defaultQualityProfiles": ["Sonar way"],
  "quickfix": "unknown",
  "code": {
    "impacts": {
      "MAINTAINABILITY": "MEDIUM"
    },
    "attribute": "EFFICIENT"
  }
}
94 changes: 94 additions & 0 deletions rules/S7189/python/rule.adoc
@@ -0,0 +1,94 @@
This rule raises an issue when a PySpark DataFrame is used multiple times without being cached or persisted.

== Why is this an issue?

In Spark, transformations on DataFrames are lazy: they are not executed until an action (such as `count` or `collect`) is called. If you perform multiple actions on the same DataFrame without caching or persisting it, Spark recomputes the entire lineage of transformations for each action. Caching or persisting the DataFrame stores the result of those transformations, avoiding the need to recompute them each time.
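
As a minimal sketch of this behavior (assuming a headerless `data.csv`, so Spark assigns default column names such as `_c0`):

[source,python]
----
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Example").getOrCreate()

df = spark.read.csv("data.csv").filter("_c0 IS NOT NULL")  # transformation: nothing is executed yet
df.count()    # action: reads the file and applies the filter
df.collect()  # action: reads the file and applies the filter all over again
----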

For this reason, DataFrames that are reused across multiple functions or operations should be cached with the `.cache()` method. This prevents unnecessary recomputation, which can be resource-intensive and time-consuming, lets you leverage Spark's in-memory computation capabilities, and reduces the need to read data from the original source repeatedly.

If the DataFrame is too large to fit into memory, consider using `persist()` with an appropriate storage level instead of `cache()`.
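
For example, a minimal sketch of persisting with an explicit storage level (reusing the same hypothetical `data.csv` source as the examples below) could look like this:

[source,python]
----
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel

spark = SparkSession.builder.appName("Example").getOrCreate()

# MEMORY_AND_DISK keeps what fits in memory and spills the remaining partitions to disk
df = spark.read.csv("data.csv").persist(StorageLevel.MEMORY_AND_DISK)
----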

== How to fix it

To fix this issue, make sure to either cache or persist `DataFrames` that are reused multiple times.

=== Code examples

==== Noncompliant code example

[source,python,diff-id=1,diff-type=noncompliant]
----
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Example").getOrCreate()

df = spark.read.csv("data.csv") # Noncompliant

def transform_data_1(df):
    # Some transformations
    return df.filter(df['value'] > 10)

def transform_data_2(df):
    # Some other transformations
    return df.groupBy('category').count()

def transform_data_3(df):
    # Some other transformations
    return df.groupBy('customerID').count()

result1 = transform_data_1(df)
result2 = transform_data_2(df)
result3 = transform_data_3(df)
----

==== Compliant solution

[source,python,diff-id=1,diff-type=compliant]
----
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Example").getOrCreate()

df = spark.read.csv("data.csv").cache() # Compliant

def transform_data_1(df):
    # Some transformations
    return df.filter(df['value'] > 10)

def transform_data_2(df):
    # Some other transformations
    return df.groupBy('category').count()

def transform_data_3(df):
    # Some other transformations
    return df.groupBy('customerID').count()

result1 = transform_data_1(df)
result2 = transform_data_2(df)
result3 = transform_data_3(df)
----

== Resources

=== Documentation

* Spark documentation - https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.cache.html[pyspark.sql.DataFrame.cache]

=== Articles & blog posts

* Spark By Examples - https://sparkbyexamples.com/pyspark/pyspark-cache-explained/[PySpark cache explained]

ifdef::env-github,rspecator-view[]

=== Implementation Specification

=== Message

Consider caching or persisting this DataFrame.

=== Highlighting

The API call reading and creating the initial DataFrame.

=== Quickfix

We can add the `.cache()` method to the DataFrame.
Quick fix message: `Cache the DataFrame`.

endif::env-github,rspecator-view[]
