From f1a29be844d59ebef4a1f662829453bc77b075f8 Mon Sep 17 00:00:00 2001 From: senmiaoliu Date: Fri, 10 Jan 2025 10:30:00 +0800 Subject: [PATCH] [KYUUBI #6843] [FOLLOWUP] Fix 'query-timeout-thread' thread leak ### Why are the changes needed? If the session manager's ThreadPoolExecutor refuses to execute the asyncOperation, then we need to shut down the query-timeout-thread in the catch block. This should also be done in JDBC and the CHAT engine. ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling? Closes #6873 from lsm1/branch-followup-6843. Closes #6843 aed9088c8 [senmiaoliu] fix query timeout checker leak in chat engine and jdbc engine Authored-by: senmiaoliu Signed-off-by: senmiaoliu (cherry picked from commit 622190197ddacd1a756ae9e0209c649840adbcc2) Signed-off-by: senmiaoliu --- .../chat/operation/ExecuteStatement.scala | 20 +++++++++++++++---- .../jdbc/operation/ExecuteStatement.scala | 17 +++++++++++++--- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ExecuteStatement.scala b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ExecuteStatement.scala index 754a519324f..f5d93d45d70 100644 --- a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ExecuteStatement.scala +++ b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ExecuteStatement.scala @@ -16,7 +16,9 @@ */ package org.apache.kyuubi.engine.chat.operation -import org.apache.kyuubi.Logging +import java.util.concurrent.RejectedExecutionException + +import org.apache.kyuubi.{KyuubiSQLException, Logging} import org.apache.kyuubi.engine.chat.provider.ChatProvider import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationState} import org.apache.kyuubi.operation.log.OperationLog @@ -41,9 +43,19 @@ class ExecuteStatement( executeStatement() } } - val chatSessionManager = session.sessionManager - val backgroundHandle = chatSessionManager.submitBackgroundOperation(asyncOperation) - setBackgroundHandle(backgroundHandle) + try { + val chatSessionManager = session.sessionManager + val backgroundHandle = chatSessionManager.submitBackgroundOperation(asyncOperation) + setBackgroundHandle(backgroundHandle) + } catch { + case rejected: RejectedExecutionException => + setState(OperationState.ERROR) + val ke = + KyuubiSQLException("Error submitting query in background, query rejected", rejected) + setOperationException(ke) + shutdownTimeoutMonitor() + throw ke + } } else { executeStatement() } diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala index af9e9a10274..d75c7f408cf 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala @@ -17,6 +17,7 @@ package org.apache.kyuubi.engine.jdbc.operation import java.sql.{Connection, Statement, Types} +import java.util.concurrent.RejectedExecutionException import org.apache.kyuubi.{KyuubiSQLException, Logging} import org.apache.kyuubi.engine.jdbc.schema.{Column, Row, Schema} @@ -50,9 +51,19 @@ class ExecuteStatement( executeStatement() } } - val jdbcSessionManager = session.sessionManager - val backgroundHandle = jdbcSessionManager.submitBackgroundOperation(asyncOperation) - setBackgroundHandle(backgroundHandle) + try { + val jdbcSessionManager = session.sessionManager + val backgroundHandle = jdbcSessionManager.submitBackgroundOperation(asyncOperation) + setBackgroundHandle(backgroundHandle) + } catch { + case rejected: RejectedExecutionException => + setState(OperationState.ERROR) + val ke = + KyuubiSQLException("Error submitting query in background, query rejected", rejected) + setOperationException(ke) + shutdownTimeoutMonitor() + throw ke + } } else { executeStatement() }