From d60a0da9fe6ec6dcb2022f087406751108a67a52 Mon Sep 17 00:00:00 2001
From: Fei Wang <fwang12@ebay.com>
Date: Tue, 16 Jan 2024 20:46:43 -0800
Subject: [PATCH] Centralize operation result save path construction in
 SparkSQLSessionManager helpers

---
 .../kyuubi/engine/spark/SparkSQLEngine.scala  | 10 +++++-----
 .../spark/operation/ExecuteStatement.scala    |  9 ++++++---
 .../session/SparkSQLSessionManager.scala      | 20 +++++++++++++++++--
 3 files changed, 29 insertions(+), 10 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 3dc771e6ccf..aa5f116d89d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -41,7 +41,7 @@ import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
 import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, currentEngine}
 import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, SparkEventHandlerRegister}
-import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
+import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
 import org.apache.kyuubi.events.EventBus
 import org.apache.kyuubi.ha.HighAvailabilityConf._
 import org.apache.kyuubi.ha.client.RetryPolicies
@@ -60,7 +60,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
 
   @volatile private var lifetimeTerminatingChecker: Option[ScheduledExecutorService] = None
   @volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None
-  @volatile private var engineSavePath: Option[String] = None
+  @volatile private var engineSavePath: Option[Path] = None
 
   override def initialize(conf: KyuubiConf): Unit = {
     val listener = new SparkSQLEngineListener(this)
@@ -92,9 +92,9 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
     }
 
     if (backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
-      val savePath = backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
-      engineSavePath = Some(s"$savePath/$engineId")
-      val path = new Path(engineSavePath.get)
+      engineSavePath = Some(
+        backendService.sessionManager.asInstanceOf[SparkSQLSessionManager].getEngineResultSavePath())
+      val path = engineSavePath.get
       val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
       fs.mkdirs(path)
       fs.deleteOnExit(path)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 3b72132dd44..0978d0adebc 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -28,9 +28,9 @@ import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
 import org.apache.spark.sql.types._
 
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_DIR, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
+import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
-import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
+import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
 import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState}
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
@@ -178,7 +178,9 @@ class ExecuteStatement(
           resultSaveThreshold,
           result)) {
-        val sessionId = session.handle.identifier.toString
-        val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
-        saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId")
+        val savePath =
+          session.sessionManager.asInstanceOf[SparkSQLSessionManager].getOperationResultSavePath(
+            session.handle,
+            handle)
+        saveFileName = Some(savePath.toUri.toString)
         // Rename all col name to avoid duplicate columns
         val colName = range(0, result.schema.size).map(x => "col" + x)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index cadd33ff001..d07e8689116 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -31,6 +31,7 @@ import org.apache.kyuubi.engine.ShareLevel._
 import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
 import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
+import org.apache.kyuubi.operation.OperationHandle
 import org.apache.kyuubi.session._
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
 import org.apache.kyuubi.util.ThreadUtils
@@ -184,8 +185,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
     } finally {
       var resultPath: Path = null
       try {
-        resultPath = new Path(s"${conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)}/" +
-          s"$engineId/${sessionHandle.identifier}")
+        resultPath = getSessionResultSavePath(sessionHandle)
         val fs = resultPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
         if (fs.exists(resultPath)) {
           fs.delete(resultPath, true)
@@ -206,4 +206,20 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
   }
 
   override protected def isServer: Boolean = false
+
+  private[spark] def getEngineResultSavePath(): Path = {
+    new Path(Seq(
+      conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR),
+      engineId).mkString(Path.SEPARATOR))
+  }
+
+  private def getSessionResultSavePath(sessionHandle: SessionHandle): Path = {
+    new Path(getEngineResultSavePath(), sessionHandle.identifier.toString)
+  }
+
+  private[spark] def getOperationResultSavePath(
+      sessionHandle: SessionHandle,
+      opHandle: OperationHandle): Path = {
+    new Path(getSessionResultSavePath(sessionHandle), opHandle.identifier.toString)
+  }
 }