Skip to content

Commit

Permalink
Merge branch 'master' into snow-1454968
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-bli committed May 30, 2024
2 parents f4d0307 + 243c2c3 commit 69aaed9
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -386,4 +386,42 @@ class ShareConnectionSuite extends IntegrationSuiteBase {
assert(ServerConnection.jdbcConnectionCount.get() == oldJdbcConnectionCount)
assert(ServerConnection.serverConnectionCount.get() > oldServerConnectionCount)
}

test("abort_detached_query") {
// run dummy query to create session
sparkSession
.read
.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("query", "select 1")
.load()
.show()
val conn1 = ServerConnection
.getServerConnection(Parameters.mergeParameters(connectorOptions))
val result1 = conn1.jdbcConnection
.prepareStatement("show parameters like 'abort_detached_query'")
.executeQuery()
assert(result1.next())
assert(result1.getString(1).equals("ABORT_DETACHED_QUERY"))
assert(!result1.getBoolean(2))

// run dummy query to create session
sparkSession
.read
.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("abort_detached_query", "true")
.option("query", "select 1")
.load()
.show()
val conn2 = ServerConnection
.getServerConnection(Parameters
.mergeParameters(connectorOptions + ("abort_detached_query" -> "true")))
val result2 = conn2.jdbcConnection
.prepareStatement("show parameters like 'abort_detached_query'")
.executeQuery()
assert(result2.next())
assert(result2.getString(1).equals("ABORT_DETACHED_QUERY"))
assert(result2.getBoolean(2))
}
}
8 changes: 8 additions & 0 deletions src/main/scala/net/snowflake/spark/snowflake/Parameters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ object Parameters {
"support_share_connection"
)

// enable/disable ABORT_DETACHED_QUERY in the session level
val PARAM_ABORT_DETACHED_QUERY: String = knownParam(
"abort_detached_query"
)

// preactions and postactions may affect the session level setting, so connection sharing
// may be enabled only when the queries in preactions and postactions are in a white list.
// force_skip_pre_post_action_check_for_session_sharing is introduced if users are sure that
Expand Down Expand Up @@ -719,6 +724,9 @@ object Parameters {
def supportShareConnection: Boolean = {
isTrue(parameters.getOrElse(PARAM_SUPPORT_SHARE_CONNECTION, "true"))
}
def abortDetachedQuery: Boolean = {
isTrue(parameters.getOrElse(PARAM_ABORT_DETACHED_QUERY, "false"))
}
def forceSkipPrePostActionsCheck: Boolean = {
isTrue(parameters.getOrElse(
PARAM_FORCE_SKIP_PRE_POST_ACTION_CHECK_FOR_SESSION_SHARING, "false"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,11 @@ private[snowflake] object ServerConnection {
}
}

// abort_detached_query
conn.createStatement().execute(
s"alter session set ABORT_DETACHED_QUERY = ${params.abortDetachedQuery}"
)

// Setup query result format explicitly because this option is not supported
// to be set with JDBC properties
if (params.supportAWSStageEndPoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,4 +515,12 @@ class MiscSuite01 extends FunSuite with Matchers {
assert(queryURL.startsWith("Cannot generate queryID URL for https://unrecognized_url"))
}

test("abort_detached_query") {
val param = Parameters.MergedParameters(Map.empty)
assert(!param.abortDetachedQuery)

val param1 = Parameters.MergedParameters(Map("abort_detached_query" -> "true"))
assert(param1.abortDetachedQuery)
}

}

0 comments on commit 69aaed9

Please sign in to comment.