diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 3dc771e6ccf..bbd9b6dc344 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -38,10 +38,9 @@ import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_SUBMIT_TIME_KEY, KYUUBI_ENGINE_URL}
 import org.apache.kyuubi.engine.ShareLevel
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
 import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, currentEngine}
 import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, SparkEventHandlerRegister}
-import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
+import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
 import org.apache.kyuubi.events.EventBus
 import org.apache.kyuubi.ha.HighAvailabilityConf._
 import org.apache.kyuubi.ha.client.RetryPolicies
@@ -60,7 +59,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
 
   @volatile private var lifetimeTerminatingChecker: Option[ScheduledExecutorService] = None
   @volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None
-  @volatile private var engineSavePath: Option[String] = None
+  @volatile private var engineSavePath: Option[Path] = None
 
   override def initialize(conf: KyuubiConf): Unit = {
     val listener = new SparkSQLEngineListener(this)
@@ -92,9 +91,10 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
     }
 
     if (backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
-      val savePath = backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
-      engineSavePath = Some(s"$savePath/$engineId")
-      val path = new Path(engineSavePath.get)
+      engineSavePath =
+        Some(backendService.sessionManager.asInstanceOf[
+          SparkSQLSessionManager].getEngineResultSavePath())
+      val path = engineSavePath.get
       val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
       fs.mkdirs(path)
       fs.deleteOnExit(path)
@@ -114,8 +114,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
         exec,
         Duration(60, TimeUnit.SECONDS))
     })
-    engineSavePath.foreach { p =>
-      val path = new Path(p)
+    engineSavePath.foreach { path =>
       path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
     }
   }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 3b72132dd44..0978d0adebc 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -28,9 +28,9 @@ import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
 import org.apache.spark.sql.types._
 
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_DIR, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
+import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
-import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
+import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
 import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState}
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
@@ -178,7 +178,10 @@ class ExecuteStatement(
           resultSaveThreshold,
           result)) {
         val sessionId = session.handle.identifier.toString
-        val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
+        val savePath =
+          session.sessionManager.asInstanceOf[SparkSQLSessionManager].getOperationResultSavePath(
+            session.handle,
+            handle)
         saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId")
         // Rename all col name to avoid duplicate columns
         val colName = range(0, result.schema.size).map(x => "col" + x)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index cadd33ff001..d07e8689116 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -31,6 +31,7 @@ import org.apache.kyuubi.engine.ShareLevel._
 import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
 import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
+import org.apache.kyuubi.operation.OperationHandle
 import org.apache.kyuubi.session._
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
 import org.apache.kyuubi.util.ThreadUtils
@@ -184,8 +185,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
     } finally {
       var resultPath: Path = null
       try {
-        resultPath = new Path(s"${conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)}/" +
-          s"$engineId/${sessionHandle.identifier}")
+        resultPath = getSessionResultSavePath(sessionHandle)
         val fs = resultPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
         if (fs.exists(resultPath)) {
           fs.delete(resultPath, true)
         }
@@ -206,4 +206,20 @@
   }
 
   override protected def isServer: Boolean = false
+
+  private[spark] def getEngineResultSavePath(): Path = {
+    new Path(Seq(
+      conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR),
+      engineId).mkString(Path.SEPARATOR))
+  }
+
+  private def getSessionResultSavePath(sessionHandle: SessionHandle): Path = {
+    new Path(getEngineResultSavePath(), sessionHandle.identifier.toString)
+  }
+
+  private[spark] def getOperationResultSavePath(
+      sessionHandle: SessionHandle,
+      opHandle: OperationHandle): Path = {
+    new Path(getSessionResultSavePath(sessionHandle), opHandle.identifier.toString)
+  }
 }
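
Note (not part of the patch): a minimal, self-contained sketch of the directory layout the new getEngineResultSavePath / getSessionResultSavePath / getOperationResultSavePath helpers produce, i.e. <OPERATION_RESULT_SAVE_TO_FILE_DIR>/<engineId>/<sessionId>/<operationId>. The object name and the sample values in main are hypothetical and only illustrate the nesting; it assumes hadoop-common (org.apache.hadoop.fs.Path) on the classpath and does not depend on Kyuubi classes.

import org.apache.hadoop.fs.Path

object ResultSavePathLayoutExample {
  // <dir>/<engineId>, mirroring getEngineResultSavePath()
  def engineResultSavePath(saveDir: String, engineId: String): Path =
    new Path(Seq(saveDir, engineId).mkString(Path.SEPARATOR))

  // <dir>/<engineId>/<sessionId>, mirroring getSessionResultSavePath(sessionHandle)
  def sessionResultSavePath(saveDir: String, engineId: String, sessionId: String): Path =
    new Path(engineResultSavePath(saveDir, engineId), sessionId)

  // <dir>/<engineId>/<sessionId>/<operationId>, mirroring getOperationResultSavePath(sessionHandle, opHandle)
  def operationResultSavePath(
      saveDir: String,
      engineId: String,
      sessionId: String,
      operationId: String): Path =
    new Path(sessionResultSavePath(saveDir, engineId, sessionId), operationId)

  def main(args: Array[String]): Unit = {
    // Hypothetical sample identifiers, for illustration only.
    val p = operationResultSavePath(
      "/tmp/kyuubi_result",
      "spark-engine-1",
      "6a2f0c3e-0000-0000-0000-000000000001",
      "6a2f0c3e-0000-0000-0000-000000000002")
    // Prints /tmp/kyuubi_result/spark-engine-1/6a2f...0001/6a2f...0002
    println(p)
  }
}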