Commit 3224e53: refine

turboFei committed Jan 17, 2024
1 parent bce364f commit 3224e53
Showing 3 changed files with 31 additions and 13 deletions.
SparkSQLEngine.scala

@@ -38,10 +38,9 @@ import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_SUBMIT_TIME_KEY, KYUUBI_ENGINE_URL}
 import org.apache.kyuubi.engine.ShareLevel
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
 import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, currentEngine}
 import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, SparkEventHandlerRegister}
-import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
+import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
 import org.apache.kyuubi.events.EventBus
 import org.apache.kyuubi.ha.HighAvailabilityConf._
 import org.apache.kyuubi.ha.client.RetryPolicies
@@ -60,7 +59,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
 
   @volatile private var lifetimeTerminatingChecker: Option[ScheduledExecutorService] = None
   @volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None
-  @volatile private var engineSavePath: Option[String] = None
+  @volatile private var engineSavePath: Option[Path] = None
 
   override def initialize(conf: KyuubiConf): Unit = {
     val listener = new SparkSQLEngineListener(this)
@@ -92,9 +91,10 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
     }
 
     if (backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
-      val savePath = backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
-      engineSavePath = Some(s"$savePath/$engineId")
-      val path = new Path(engineSavePath.get)
+      engineSavePath =
+        Some(backendService.sessionManager.asInstanceOf[
+          SparkSQLSessionManager].getEngineResultSavePath())
+      val path = engineSavePath.get
       val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
       fs.mkdirs(path)
       fs.deleteOnExit(path)
@@ -114,8 +114,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
         exec,
         Duration(60, TimeUnit.SECONDS))
     })
-    engineSavePath.foreach { p =>
-      val path = new Path(p)
+    engineSavePath.foreach { path =>
      path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
     }
   }
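
The engine-level save directory is now created once in initialize and removed in stop. For readers unfamiliar with the Hadoop FileSystem calls involved, here is a minimal standalone sketch of the same sequence; the path and Configuration are illustrative stand-ins, not what the engine actually computes:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Illustrative stand-in for getEngineResultSavePath()
    val path = new Path("/tmp/kyuubi_result_save/engine-1")
    val fs: FileSystem = path.getFileSystem(new Configuration())
    fs.mkdirs(path) // creates the directory and any missing parents
    fs.deleteOnExit(path) // best-effort removal when the FileSystem closes
    // ... later, on engine stop, an explicit recursive delete:
    fs.delete(path, true)

Note that deleteOnExit is only a safety net tied to the FileSystem instance closing, which is why stop() still deletes the directory explicitly and recursively.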
ExecuteStatement.scala

@@ -28,9 +28,9 @@ import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
 import org.apache.spark.sql.types._
 
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_DIR, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
+import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
-import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
+import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
 import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState}
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
@@ -178,7 +178,10 @@ class ExecuteStatement(
           resultSaveThreshold,
           result)) {
         val sessionId = session.handle.identifier.toString
-        val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
+        val savePath =
+          session.sessionManager.asInstanceOf[SparkSQLSessionManager].getOperationResultSavePath(
+            session.handle,
+            handle)
         saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId")
         // Rename all col name to avoid duplicate columns
         val colName = range(0, result.schema.size).map(x => "col" + x)
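
The renaming context line above matters for the save-to-file feature: columnar writers such as Parquet reject duplicate column names (e.g. SELECT 1 AS c, 2 AS c), so every column is renamed positionally before writing. A self-contained sketch of the same idea, assuming a local SparkSession and an illustrative output path:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    val result = spark.sql("SELECT 1 AS c, 2 AS c") // duplicate column names
    // Rename each column positionally to col0, col1, ...
    val colName = (0 until result.schema.size).map(x => "col" + x)
    result.toDF(colName: _*).write.parquet("/tmp/result-demo")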
SparkSQLSessionManager.scala

@@ -31,6 +31,7 @@ import org.apache.kyuubi.engine.ShareLevel._
 import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
 import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
+import org.apache.kyuubi.operation.OperationHandle
 import org.apache.kyuubi.session._
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
 import org.apache.kyuubi.util.ThreadUtils
@@ -184,8 +185,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
     } finally {
       var resultPath: Path = null
       try {
-        resultPath = new Path(s"${conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)}/" +
-          s"$engineId/${sessionHandle.identifier}")
+        resultPath = getSessionResultSavePath(sessionHandle)
         val fs = resultPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
         if (fs.exists(resultPath)) {
           fs.delete(resultPath, true)
@@ -206,4 +206,20 @@
   }
 
   override protected def isServer: Boolean = false
+
+  private[spark] def getEngineResultSavePath(): Path = {
+    new Path(Seq(
+      conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR),
+      engineId).mkString(Path.SEPARATOR))
+  }
+
+  private def getSessionResultSavePath(sessionHandle: SessionHandle): Path = {
+    new Path(getEngineResultSavePath(), sessionHandle.identifier.toString)
+  }
+
+  private[spark] def getOperationResultSavePath(
+      sessionHandle: SessionHandle,
+      opHandle: OperationHandle): Path = {
+    new Path(getSessionResultSavePath(sessionHandle), opHandle.identifier.toString)
+  }
 }
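
Taken together, the new helpers fix a single layout, <OPERATION_RESULT_SAVE_TO_FILE_DIR>/<engineId>/<sessionId>/<operationId>, so engine stop, session close, and operation cleanup each delete exactly their own subtree. A small sketch of the composition, with hypothetical values standing in for the config value and the real identifiers:

    import org.apache.hadoop.fs.Path

    val saveDir = "/tmp/kyuubi_result_save" // stands in for OPERATION_RESULT_SAVE_TO_FILE_DIR
    val enginePath = new Path(Seq(saveDir, "engine-1").mkString(Path.SEPARATOR))
    val sessionPath = new Path(enginePath, "session-uuid") // sessionHandle.identifier
    val opPath = new Path(sessionPath, "operation-uuid") // opHandle.identifier
    // opPath: /tmp/kyuubi_result_save/engine-1/session-uuid/operation-uuid

Centralizing path construction in SparkSQLSessionManager replaces the string interpolation that previously had to stay in sync across SparkSQLEngine, ExecuteStatement, and the session close path.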
