Skip to content

Commit

Permalink
handle session exception for all kyuubi operations
Browse files Browse the repository at this point in the history
save

Revert "save"

This reverts commit c3f8fca.

session
  • Loading branch information
turboFei committed Nov 18, 2023
1 parent 3478fc9 commit 5d9d4dc
Show file tree
Hide file tree
Showing 14 changed files with 21 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class BatchJobSubmission(
OperationLog.removeCurrentOperationLog()
}

override protected def runInternal(): Unit = session.handleSessionException {
override protected def runKyuubiOperationInternal(): Unit = {
val asyncOperation: Runnable = () => {
try {
metadata match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class ExecuteStatement(
}
}

override protected def runInternal(): Unit = {
override protected def runKyuubiOperationInternal(): Unit = {
if (isTimeoutMonitorEnabled) {
addTimeoutMonitor(queryTimeout)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ExecutedCommandExec(
OperationLog.removeCurrentOperationLog()
}

override protected def runInternal(): Unit = session.handleSessionException {
override protected def runKyuubiOperationInternal(): Unit = {
val asyncOperation: Runnable = () => {
setState(OperationState.RUNNING)
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.kyuubi.session.Session

class GetCatalogs(session: Session) extends KyuubiOperation(session) {

override protected def runInternal(): Unit = {
override protected def runKyuubiOperationInternal(): Unit = {
try {
_remoteOpHandle = client.getCatalogs
} catch onError()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class GetColumns(
columnName: String)
extends KyuubiOperation(session) {

override protected def runInternal(): Unit = {
override protected def runKyuubiOperationInternal(): Unit = {
try {
_remoteOpHandle = client.getColumns(catalogName, schemaName, tableName, columnName)
} catch onError()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class GetCrossReference(
foreignTable: String)
extends KyuubiOperation(session) {

override protected def runInternal(): Unit = {
override protected def runKyuubiOperationInternal(): Unit = {
try {
_remoteOpHandle = client.getCrossReference(
primaryCatalog,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class GetFunctions(
functionName: String)
extends KyuubiOperation(session) {

override protected def runInternal(): Unit = {
override protected def runKyuubiOperationInternal(): Unit = {
try {
_remoteOpHandle = client.getFunctions(catalogName, schemaName, functionName)
} catch onError()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class GetPrimaryKeys(
tableName: String)
extends KyuubiOperation(session) {

override protected def runInternal(): Unit = {
override protected def runKyuubiOperationInternal(): Unit = {
try {
_remoteOpHandle = client.getPrimaryKeys(catalogName, schemaName, tableName)
} catch onError()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class GetSchemas(
schemaName: String)
extends KyuubiOperation(session) {

override protected def runInternal(): Unit = {
override protected def runKyuubiOperationInternal(): Unit = {
try {
_remoteOpHandle = client.getSchemas(catalogName, schemaName)
} catch onError()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.kyuubi.session.Session

class GetTableTypes(session: Session) extends KyuubiOperation(session) {

override protected def runInternal(): Unit = {
override protected def runKyuubiOperationInternal(): Unit = {
try {
_remoteOpHandle = client.getTableTypes
} catch onError()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class GetTables(
tableTypes: java.util.List[String])
extends KyuubiOperation(session) {

override protected def runInternal(): Unit = {
override protected def runKyuubiOperationInternal(): Unit = {
try {
_remoteOpHandle = client.getTables(catalogName, schemaName, tableName, tableTypes)
} catch onError()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.kyuubi.session.Session

class GetTypeInfo(session: Session) extends KyuubiOperation(session) {

override protected def runInternal(): Unit = {
override protected def runKyuubiOperationInternal(): Unit = {
try {
_remoteOpHandle = client.getTypeInfo
} catch onError()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.kyuubi.metrics.MetricsConstants.{OPERATION_FAIL, OPERATION_OPE
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, Session}
import org.apache.kyuubi.session.{KyuubiSession, KyuubiSessionImpl, KyuubiSessionManager, Session}
import org.apache.kyuubi.util.ThriftUtils

abstract class KyuubiOperation(session: Session) extends AbstractOperation(session) {
Expand Down Expand Up @@ -100,6 +100,13 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
}
}

final override protected def runInternal(): Unit =
session.asInstanceOf[KyuubiSession].handleSessionException {
runKyuubiOperationInternal()
}

protected def runKyuubiOperationInternal(): Unit

override protected def beforeRun(): Unit = {
setHasResultSet(true)
setState(OperationState.RUNNING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Bool
OperationLog.removeCurrentOperationLog()
}

override protected def runInternal(): Unit = session.handleSessionException {
override protected def runKyuubiOperationInternal(): Unit = {
val asyncOperation: Runnable = () => {
setState(OperationState.RUNNING)
try {
Expand Down

0 comments on commit 5d9d4dc

Please sign in to comment.