Skip to content

Commit

Permalink
prevent leak
Browse files Browse the repository at this point in the history
  • Loading branch information
turboFei committed Jan 17, 2024
1 parent e9e2d18 commit bce364f
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class ExecuteStatement(
})
} else {
val resultSaveEnabled = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
lazy val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
if (hasResultSet && resultSaveEnabled && shouldSaveResultToFs(
resultMaxRows,
resultSaveThreshold,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,17 +181,24 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
} catch {
case e: KyuubiSQLException =>
warn(s"Error closing session ${sessionHandle}", e)
} finally {
var resultPath: Path = null
try {
resultPath = new Path(s"${conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)}/" +
s"$engineId/${sessionHandle.identifier}")
val fs = resultPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
if (fs.exists(resultPath)) {
fs.delete(resultPath, true)
info(s"Deleted session result path: $resultPath")
}
} catch {
case e: Throwable => error(s"Error cleaning session result path: $resultPath", e)
}
}
if (shareLevel == ShareLevel.CONNECTION) {
info("Session stopped due to shared level is Connection.")
stopSession()
}
if (conf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
val path = new Path(s"${conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)}/" +
s"$engineId/${sessionHandle.identifier}")
path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
info(s"Delete session result file $path")
}
}

private def stopSession(): Unit = {
Expand Down

0 comments on commit bce364f

Please sign in to comment.