Skip to content

Commit

Permalink
fix style
Browse files Browse the repository at this point in the history
  • Loading branch information
lsm1 committed Mar 26, 2024
1 parent bf878c9 commit af19ce0
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.annotations.VisibleForTesting
import org.apache.hadoop.fs.Path
import org.apache.spark.{ui, SparkConf}
Expand Down Expand Up @@ -56,7 +57,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin

private val shutdown = new AtomicBoolean(false)
private val gracefulStopDeregistered = new AtomicBoolean(false)

private val objectMapper = new ObjectMapper
@volatile private var lifetimeTerminatingChecker: Option[ScheduledExecutorService] = None
@volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None
private lazy val engineSavePath =
Expand Down Expand Up @@ -188,18 +189,17 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
.flatMap { stage =>
statusTracker.getStageInfo(stage).map(_.numActiveTasks)
}.sum
val engineMetrics = Map(
"openSessionCount" -> openSessionCount,
"activeTask" -> activeTask,
"poolId" -> engineSpace.split("-").last)
info(s"Spark engine has $openSessionCount open sessions and $activeTask active tasks.")
val engineMetrics = objectMapper.createObjectNode()
.put("openSessionCount", openSessionCount)
.put("activeTask", activeTask)
.put("poolID", engineSpace.split("-").last.toInt).toString
DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
if (client.pathNonExists(metricsSpace)) {
client.create(metricsSpace, "PERSISTENT")
}
client.setData(
s"/metrics$engineSpace",
engineMetrics.map { case (k, v) => s"$k=$v" }.mkString(";").getBytes)
engineMetrics.getBytes)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import scala.collection.JavaConverters._
import scala.util.Random

import com.codahale.metrics.MetricRegistry
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.google.common.annotations.VisibleForTesting

import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException, Logging, Utils}
Expand Down Expand Up @@ -66,6 +68,8 @@ private[kyuubi] class EngineRef(

private val timeout: Long = conf.get(ENGINE_INIT_TIMEOUT)

private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)

// Share level of the engine
private val shareLevel: ShareLevel = ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL))

Expand Down Expand Up @@ -371,10 +375,10 @@ private[kyuubi] class EngineRef(
}
}

def getAdaptivePoolId(poolSize: Int): Int = {
private def getAdaptivePoolId(poolSize: Int): Int = {
val sessionThreshold = conf.get(ENGINE_POOL_ADAPTIVE_SESSION_THRESHOLD)
val metricsSpace =
s"/metrics/${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}/$user"
s"/metrics/${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_$engineType/$user"
DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
tryWithLock(client) {
if (client.pathNonExists(metricsSpace)) {
Expand All @@ -387,25 +391,23 @@ private[kyuubi] class EngineRef(
} else {
engineType match {
case SPARK_SQL =>
val engineMetricsMap = metrics.map(p =>
new String(client.getData(s"$metricsSpace/$p"))
.split(";")
.map(_.split("=", 2))
.filter(_.length == 2)
.map(kv => (kv.head, kv.last.toInt))
.toMap)
val engineMetricsMap = metrics.map { p =>
objectMapper.readValue(
new String(client.getData(s"$metricsSpace/$p")),
classOf[Map[String, Int]])
}
if (engineMetricsMap.isEmpty) {
return Random.nextInt(poolSize)
} else {
val sortedEngineMetrics = engineMetricsMap.sortBy { map =>
(
map.getOrElse("openSessionCount", sessionThreshold),
map.getOrElse("activeTask", 0))
}
val candidate = sortedEngineMetrics.head
if (candidate.contains("poolId") && (candidate(
"openSessionCount") < sessionThreshold || metrics.size == poolSize)) {
candidate("poolId")
val candidate = engineMetricsMap.filter(_.contains("poolID"))
.minBy { map =>
(
map.getOrElse("openSessionCount", sessionThreshold),
map.getOrElse("activeTask", 0))
}
if ((candidate.nonEmpty && candidate("openSessionCount") < sessionThreshold) ||
metrics.size == poolSize) {
candidate("poolID")
} else {
Random.nextInt(poolSize)
}
Expand Down

0 comments on commit af19ce0

Please sign in to comment.