Skip to content

Commit

Permalink
fix query timeout checker leak in chat engine and jdbc engine
Browse files Browse the repository at this point in the history
  • Loading branch information
lsm1 committed Dec 27, 2024
1 parent a051253 commit aed9088
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package org.apache.kyuubi.engine.chat.operation

import org.apache.kyuubi.Logging
import java.util.concurrent.RejectedExecutionException

import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.chat.provider.ChatProvider
import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationState}
import org.apache.kyuubi.operation.log.OperationLog
Expand All @@ -41,9 +43,19 @@ class ExecuteStatement(
executeStatement()
}
}
val chatSessionManager = session.sessionManager
val backgroundHandle = chatSessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(backgroundHandle)
try {
val chatSessionManager = session.sessionManager
val backgroundHandle = chatSessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
setState(OperationState.ERROR)
val ke =
KyuubiSQLException("Error submitting query in background, query rejected", rejected)
setOperationException(ke)
shutdownTimeoutMonitor()
throw ke
}
} else {
executeStatement()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kyuubi.engine.jdbc.operation

import java.sql.{Connection, Statement, Types}
import java.util.concurrent.RejectedExecutionException

import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.jdbc.schema.{Column, Row, Schema}
Expand Down Expand Up @@ -50,9 +51,19 @@ class ExecuteStatement(
executeStatement()
}
}
val jdbcSessionManager = session.sessionManager
val backgroundHandle = jdbcSessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(backgroundHandle)
try {
val jdbcSessionManager = session.sessionManager
val backgroundHandle = jdbcSessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
setState(OperationState.ERROR)
val ke =
KyuubiSQLException("Error submitting query in background, query rejected", rejected)
setOperationException(ke)
shutdownTimeoutMonitor()
throw ke
}
} else {
executeStatement()
}
Expand Down

0 comments on commit aed9088

Please sign in to comment.