Skip to content

Commit

Permalink
Fix unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Partho Sarthi <[email protected]>
  • Loading branch information
parthosa committed Jan 22, 2025
1 parent 2da10c5 commit bddc968
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,18 @@ case object Yarn extends SparkMaster
case object Kubernetes extends SparkMaster
case object Standalone extends SparkMaster

/**
 * Factory that maps the value of the "spark.master" property to a [[SparkMaster]].
 */
object SparkMaster {
  /**
   * Classify a Spark master URL.
   *
   * URLs that carry an explicit scheme ("k8s://" or "spark://") are classified by that scheme
   * first. Without this, a host name that happens to contain another keyword, e.g.
   * "k8s://https://yarn-gateway:6443" or "spark://local-node:7077", would be picked up by the
   * keyword checks below and reported as the wrong cluster manager.
   * Every other value falls back to the keyword heuristics, checked in order:
   * "yarn", "k8s", "local", "spark://".
   *
   * @param master optional value of the "spark.master" property
   * @return the matching master type, or None when the value is absent or not recognized
   */
  def apply(master: Option[String]): Option[SparkMaster] = {
    master.flatMap {
      // An explicit scheme is authoritative; check it before any substring heuristic.
      case url if url.startsWith("k8s://") => Some(Kubernetes)
      case url if url.startsWith("spark://") => Some(Standalone)
      // Keyword heuristics for values without a recognized scheme prefix.
      case url if url.contains("yarn") => Some(Yarn)
      case url if url.contains("k8s") => Some(Kubernetes)
      case url if url.contains("local") => Some(Local)
      case url if url.contains("spark://") => Some(Standalone)
      case _ => None
    }
  }
}

/**
* AutoTuner module that uses event logs and worker's system properties to recommend Spark
* RAPIDS configuration based on heuristics.
Expand Down Expand Up @@ -358,15 +370,7 @@ class AutoTuner(
private var filterByUpdatedPropertiesEnabled: Boolean = true

// Spark master type resolved from the application's "spark.master" property.
// NOTE(review): as shown here the body holds two expressions: the inline pattern match and
// the SparkMaster(...) call. Only the last expression is the value of the lazy val; the result
// of the match is discarded, leaving the warning log as its only visible effect.
// This looks like the old and new sides of a change rendered together -- confirm which one
// is intended before relying on either.
private lazy val sparkMaster: Option[SparkMaster] = {
appInfoProvider.getProperty("spark.master").flatMap {
case url if url.contains("yarn") => Some(Yarn)
case url if url.contains("k8s") => Some(Kubernetes)
case url if url.contains("local") => Some(Local)
case url if url.contains("spark://") => Some(Standalone)
case unknownUrl =>
// No known keyword matched: log the raw value and yield None.
logWarning(s"Unrecognized Spark master URL: $unknownUrl")
None
}
SparkMaster(appInfoProvider.getProperty("spark.master"))
}

private def isCalculationEnabled(prop: String) : Boolean = {
Expand Down Expand Up @@ -577,29 +581,6 @@ class AutoTuner(
appendComment(msg)
}

/**
 * Pick the property name used for the executor memory overhead.
 * The choice depends on the spark master and, on Kubernetes, on the Spark version.
 * @return "spark.executor.memoryOverhead" for any non-Kubernetes master or when the Spark
 *         version is unknown; on Kubernetes, "spark.executor.memoryOverheadFactor" when
 *         ToolUtils.isSpark330OrLater holds for the version, otherwise
 *         "spark.kubernetes.memoryOverheadFactor".
 */
private def memoryOverheadLabel: String = {
  val fallbackLabel = "spark.executor.memoryOverhead"
  if (sparkMaster.contains(Kubernetes)) {
    // The Spark version is only consulted on the Kubernetes path.
    appInfoProvider.getSparkVersion.fold(fallbackLabel) { ver =>
      if (ToolUtils.isSpark330OrLater(ver)) {
        "spark.executor.memoryOverheadFactor"
      } else {
        "spark.kubernetes.memoryOverheadFactor"
      }
    }
  } else {
    fallbackLabel
  }
}

/**
* Flow:
* if "spark.master" is standalone => Do Nothing
Expand All @@ -611,7 +592,8 @@ class AutoTuner(
*/
private def addRecommendationForMemoryOverhead(recomValue: String): Unit = {
if (!sparkMaster.contains(Standalone)) {
val memOverheadLookup = memoryOverheadLabel
val memOverheadLookup = autoTunerConfigsProvider.getMemoryOverheadLabel(sparkMaster,
appInfoProvider.getSparkVersion)
val pinnedPoolSizeLookup = "spark.rapids.memory.pinnedPool.size"
appendRecommendationForMemoryMB(memOverheadLookup, recomValue)
// if using k8s and pinned pool size is set, add a comment if memory overhead is missing
Expand Down Expand Up @@ -1472,6 +1454,32 @@ trait AutoTunerConfigsProvider extends Logging {
/**
 * Comment text stating that RapidsShuffleManager could not be recommended because the
 * Spark version cannot be determined.
 */
def shuffleManagerCommentForMissingVersion: String =
  "Could not recommend RapidsShuffleManager as Spark version cannot be determined."


/**
 * Resolve the property name used for the executor memory overhead from the given spark
 * master and Spark version.
 * @param sparkMaster the detected spark master, if any
 * @param sparkVersion the Spark version string, if known
 * @return on Kubernetes with a known version: "spark.executor.memoryOverheadFactor" when
 *         ToolUtils.isSpark330OrLater holds for the version, otherwise
 *         "spark.kubernetes.memoryOverheadFactor"; in every other case
 *         "spark.executor.memoryOverhead".
 */
def getMemoryOverheadLabel(
    sparkMaster: Option[SparkMaster],
    sparkVersion: Option[String]) : String = {
  (sparkMaster, sparkVersion) match {
    // Only Kubernetes with a known version uses one of the "Factor" labels.
    case (Some(Kubernetes), Some(version)) =>
      if (ToolUtils.isSpark330OrLater(version)) {
        "spark.executor.memoryOverheadFactor"
      } else {
        "spark.kubernetes.memoryOverheadFactor"
      }
    // Any other master, or Kubernetes without a version, uses the default label.
    case _ =>
      "spark.executor.memoryOverhead"
  }
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1921,9 +1921,10 @@ We recommend using nodes/workers with more memory. Need at least 17496MB memory.
// k8s value. The Autotuner should detect that the spark-master is k8s and
// should not comment on the missing memoryOverhead value since pinned pool is not set.
test(s"missing memoryOverhead comment is not included for k8s without pinned pool") {
val sparkMaster = "k8s://https://my-cluster-endpoint.example.com:6443"
val logEventsProps: mutable.Map[String, String] =
mutable.LinkedHashMap[String, String](
"spark.master" -> "k8s://https://my-cluster-endpoint.example.com:6443",
"spark.master" -> sparkMaster,
"spark.executor.cores" -> "16",
"spark.executor.instances" -> "1",
"spark.executor.memory" -> "80g",
Expand All @@ -1946,13 +1947,15 @@ We recommend using nodes/workers with more memory. Need at least 17496MB memory.
platform)
val (properties, comments) = autoTuner.getRecommendedProperties()
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
val memoryOverheadLabel = ProfilingAutoTunerConfigsProvider.getMemoryOverheadLabel(
SparkMaster(Some(sparkMaster)), Some(testSparkVersion))
// scalastyle:off line.size.limit
val expectedResults =
s"""|
|Spark Properties:
|--conf spark.executor.instances=8
|--conf spark.executor.memory=32768m
|--conf spark.executor.memoryOverheadFactor=13516m
|--conf $memoryOverheadLabel=13516m
|--conf spark.rapids.memory.pinnedPool.size=4096m
|--conf spark.rapids.shuffle.multiThreaded.reader.threads=24
|--conf spark.rapids.shuffle.multiThreaded.writer.threads=24
Expand All @@ -1965,7 +1968,7 @@ We recommend using nodes/workers with more memory. Need at least 17496MB memory.
|--conf spark.sql.files.maxPartitionBytes=4096m
|
|Comments:
|- 'spark.executor.memoryOverheadFactor' was not set.
|- '$memoryOverheadLabel' was not set.
|- 'spark.rapids.memory.pinnedPool.size' was not set.
|- 'spark.rapids.shuffle.multiThreaded.reader.threads' was not set.
|- 'spark.rapids.shuffle.multiThreaded.writer.threads' was not set.
Expand Down

0 comments on commit bddc968

Please sign in to comment.