Commit

Skip retry after application closed
sfc-gh-bli committed Feb 2, 2024
1 parent d7f8e98 commit 47d1bda
Showing 2 changed files with 19 additions and 3 deletions.
@@ -153,10 +153,19 @@ private[snowflake] case class SnowflakeRelation(
   // without first executing it.
   private def getRDD[T: ClassTag](statement: SnowflakeSQLStatement,
                                   resultSchema: StructType): RDD[T] = {
-    if (params.useCopyUnload) {
-      getSnowflakeRDD(statement, resultSchema)
+    val appId = sqlContext.sparkContext.applicationId
+    if (SparkConnectorContext.closedApplicationIDs.contains(appId)) {
+      // Don't execute any Snowflake queries if the Spark application was closed.
+      // Spark triggers the `onApplicationEnd` listener before it stops the tasks,
+      // and the connector cancels all running SQL queries in that listener.
+      // Spark would then re-run the canceled Snowflake SQL queries in its retries.
+      throw new IllegalStateException(s"Spark Application ($appId) was closed")
     } else {
-      getSnowflakeResultSetRDD(statement, resultSchema)
+      if (params.useCopyUnload) {
+        getSnowflakeRDD(statement, resultSchema)
+      } else {
+        getSnowflakeResultSetRDD(statement, resultSchema)
+      }
     }
   }
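In isolation, the guard added above is a fail-fast check: before building an RDD, the connector consults a shared set of closed application IDs and refuses to issue new Snowflake queries for an application that has already ended. Below is a minimal sketch of that pattern; ClosedAppGuard and runUnlessClosed are illustrative names, not the connector's API.

import scala.collection.mutable

object ClosedAppGuard {
  // Stand-in for SparkConnectorContext.closedApplicationIDs.
  val closedApplicationIDs: mutable.Set[String] = mutable.HashSet.empty[String]

  // Run `work` only while the application is still open; otherwise fail fast,
  // so a retried task cannot re-issue a query that was already canceled.
  def runUnlessClosed[T](appId: String)(work: => T): T =
    if (closedApplicationIDs.contains(appId)) {
      throw new IllegalStateException(s"Spark Application ($appId) was closed")
    } else {
      work
    }
}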

@@ -32,6 +32,9 @@ object SparkConnectorContext {
   // The key is the application ID, the value is the set of running queries.
   private val runningQueries = mutable.Map[String, mutable.Set[RunningQuery]]()
 
+  // Save the IDs of all closed applications, so that Spark's retries can be skipped after an application is closed.
+  private[snowflake] val closedApplicationIDs = mutable.HashSet.empty[String]
+
   private[snowflake] def getRunningQueries = runningQueries
 
   // Register spark listener to cancel any running queries if application fails.
@@ -44,6 +47,10 @@ object SparkConnectorContext {
       runningQueries.put(appId, mutable.Set.empty)
       sparkContext.addSparkListener(new SparkListener {
         override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+          // Add the application ID to the block list.
+          // When Spark retries the queries of a closed application,
+          // the connector will skip them.
+          closedApplicationIDs.add(appId)
           try {
             cancelRunningQueries(appId)
             // Close all cached connections
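The listener registration in this file is the other half of the mechanism: when `onApplicationEnd` fires, the application ID is recorded before any cleanup, so tasks retried during shutdown hit the guard shown earlier. The sketch below uses the real Spark listener API, but the ClosedAppGuard object comes from the previous (made-up) sketch, not from the connector.

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

object ClosedAppListener {
  def register(sparkContext: SparkContext): Unit = {
    val appId = sparkContext.applicationId
    sparkContext.addSparkListener(new SparkListener {
      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
        // Mark the application as closed before canceling queries or closing
        // connections, so retried tasks see the flag and skip their queries.
        ClosedAppGuard.closedApplicationIDs.add(appId)
      }
    })
  }
}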
