From 243c2c38352ed50ce994849df3fb0d2642d3a60d Mon Sep 17 00:00:00 2001 From: Bing Li <63471091+sfc-gh-bli@users.noreply.github.com> Date: Thu, 30 May 2024 11:05:55 -0700 Subject: [PATCH] SNOW-1291653 Disable abort_detached_query at Session Level by Default (#562) --- .../snowflake/ShareConnectionSuite.scala | 38 +++++++++++++++++++ .../spark/snowflake/Parameters.scala | 8 ++++ .../spark/snowflake/ServerConnection.scala | 5 +++ .../spark/snowflake/MiscSuite01.scala | 8 ++++ 4 files changed, 59 insertions(+) diff --git a/src/it/scala/net/snowflake/spark/snowflake/ShareConnectionSuite.scala b/src/it/scala/net/snowflake/spark/snowflake/ShareConnectionSuite.scala index e8de8c31..39212396 100644 --- a/src/it/scala/net/snowflake/spark/snowflake/ShareConnectionSuite.scala +++ b/src/it/scala/net/snowflake/spark/snowflake/ShareConnectionSuite.scala @@ -386,4 +386,42 @@ class ShareConnectionSuite extends IntegrationSuiteBase { assert(ServerConnection.jdbcConnectionCount.get() == oldJdbcConnectionCount) assert(ServerConnection.serverConnectionCount.get() > oldServerConnectionCount) } + + test("abort_detached_query") { + // run dummy query to create session + sparkSession + .read + .format(SNOWFLAKE_SOURCE_NAME) + .options(connectorOptionsNoTable) + .option("query", "select 1") + .load() + .show() + val conn1 = ServerConnection + .getServerConnection(Parameters.mergeParameters(connectorOptions)) + val result1 = conn1.jdbcConnection + .prepareStatement("show parameters like 'abort_detached_query'") + .executeQuery() + assert(result1.next()) + assert(result1.getString(1).equals("ABORT_DETACHED_QUERY")) + assert(!result1.getBoolean(2)) + + // run dummy query to create session + sparkSession + .read + .format(SNOWFLAKE_SOURCE_NAME) + .options(connectorOptionsNoTable) + .option("abort_detached_query", "true") + .option("query", "select 1") + .load() + .show() + val conn2 = ServerConnection + .getServerConnection(Parameters + .mergeParameters(connectorOptions + ("abort_detached_query" -> "true"))) + val result2 = conn2.jdbcConnection + .prepareStatement("show parameters like 'abort_detached_query'") + .executeQuery() + assert(result2.next()) + assert(result2.getString(1).equals("ABORT_DETACHED_QUERY")) + assert(result2.getBoolean(2)) + } } diff --git a/src/main/scala/net/snowflake/spark/snowflake/Parameters.scala b/src/main/scala/net/snowflake/spark/snowflake/Parameters.scala index 3d7f1fd9..91fafdb6 100644 --- a/src/main/scala/net/snowflake/spark/snowflake/Parameters.scala +++ b/src/main/scala/net/snowflake/spark/snowflake/Parameters.scala @@ -233,6 +233,11 @@ object Parameters { "support_share_connection" ) + // enable/disable ABORT_DETACHED_QUERY in the session level + val PARAM_ABORT_DETACHED_QUERY: String = knownParam( + "abort_detached_query" + ) + // preactions and postactions may affect the session level setting, so connection sharing // may be enabled only when the queries in preactions and postactions are in a white list. // force_skip_pre_post_action_check_for_session_sharing is introduced if users are sure that @@ -719,6 +724,9 @@ object Parameters { def supportShareConnection: Boolean = { isTrue(parameters.getOrElse(PARAM_SUPPORT_SHARE_CONNECTION, "true")) } + def abortDetachedQuery: Boolean = { + isTrue(parameters.getOrElse(PARAM_ABORT_DETACHED_QUERY, "false")) + } def forceSkipPrePostActionsCheck: Boolean = { isTrue(parameters.getOrElse( PARAM_FORCE_SKIP_PRE_POST_ACTION_CHECK_FOR_SESSION_SHARING, "false")) diff --git a/src/main/scala/net/snowflake/spark/snowflake/ServerConnection.scala b/src/main/scala/net/snowflake/spark/snowflake/ServerConnection.scala index 4965f556..16fcf760 100644 --- a/src/main/scala/net/snowflake/spark/snowflake/ServerConnection.scala +++ b/src/main/scala/net/snowflake/spark/snowflake/ServerConnection.scala @@ -256,6 +256,11 @@ private[snowflake] object ServerConnection { } } + // abort_detached_query + conn.createStatement().execute( + s"alter session set ABORT_DETACHED_QUERY = ${params.abortDetachedQuery}" + ) + // Setup query result format explicitly because this option is not supported // to be set with JDBC properties if (params.supportAWSStageEndPoint) { diff --git a/src/test/scala/net/snowflake/spark/snowflake/MiscSuite01.scala b/src/test/scala/net/snowflake/spark/snowflake/MiscSuite01.scala index 974e607c..d4b29a3c 100644 --- a/src/test/scala/net/snowflake/spark/snowflake/MiscSuite01.scala +++ b/src/test/scala/net/snowflake/spark/snowflake/MiscSuite01.scala @@ -515,4 +515,12 @@ class MiscSuite01 extends FunSuite with Matchers { assert(queryURL.startsWith("Cannot generate queryID URL for https://unrecognized_url")) } + test("abort_detached_query") { + val param = Parameters.MergedParameters(Map.empty) + assert(!param.abortDetachedQuery) + + val param1 = Parameters.MergedParameters(Map("abort_detached_query" -> "true")) + assert(param1.abortDetachedQuery) + } + }