Skip to content

Commit

Permalink
set jdbc fetch size in operation manager
Browse files Browse the repository at this point in the history
  • Loading branch information
lsm1 committed Nov 10, 2023
1 parent 5dd3fb9 commit 0d6e858
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 13 deletions.
2 changes: 1 addition & 1 deletion docs/configuration/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.engine.jdbc.connection.user | <undefined> | The user is used for connecting to server | string | 1.6.0 |
| kyuubi.engine.jdbc.driver.class | <undefined> | The driver class for JDBC engine connection | string | 1.6.0 |
| kyuubi.engine.jdbc.extra.classpath | <undefined> | The extra classpath for the JDBC query engine, for configuring the location of the JDBC driver and etc. | string | 1.6.0 |
| kyuubi.engine.jdbc.fetch.size | 1000 | The default fetch size of JDBC engine | int | 1.9.0 |
| kyuubi.engine.jdbc.fetch.size | 1000 | The fetch size of JDBC engine | int | 1.9.0 |
| kyuubi.engine.jdbc.initialize.sql | SELECT 1 | SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. i.e. use `SELECT 1` to eagerly active JDBCClient. | seq | 1.8.0 |
| kyuubi.engine.jdbc.java.options | <undefined> | The extra Java options for the JDBC query engine | string | 1.6.0 |
| kyuubi.engine.jdbc.memory | 1g | The heap memory for the JDBC query engine | string | 1.6.0 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import java.util

import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_CONNECTION_URL, ENGINE_JDBC_FETCH_SIZE, ENGINE_JDBC_SHORT_NAME}
import org.apache.kyuubi.engine.jdbc.dialect.JdbcDialects.defaultFetchSize
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_CONNECTION_URL, ENGINE_JDBC_SHORT_NAME}
import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
import org.apache.kyuubi.engine.jdbc.util.SupportServiceLoader
import org.apache.kyuubi.operation.Operation
Expand All @@ -31,7 +30,7 @@ import org.apache.kyuubi.util.reflect.ReflectUtils._

abstract class JdbcDialect extends SupportServiceLoader with Logging {

def createStatement(connection: Connection, fetchSize: Int = defaultFetchSize): Statement = {
def createStatement(connection: Connection, fetchSize: Int = 1000): Statement = {
val statement =
connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
statement.setFetchSize(fetchSize)
Expand Down Expand Up @@ -86,9 +85,7 @@ abstract class JdbcDialect extends SupportServiceLoader with Logging {

object JdbcDialects extends Logging {

var defaultFetchSize: Int = 0
def get(conf: KyuubiConf): JdbcDialect = {
defaultFetchSize = conf.get(ENGINE_JDBC_FETCH_SIZE)
val shortName: String = conf.get(ENGINE_JDBC_SHORT_NAME).getOrElse {
val url = conf.get(ENGINE_JDBC_CONNECTION_URL).get
assert(url.length > 5 && url.substring(5).contains(":"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class ExecuteStatement(
override val statement: String,
override val shouldRunAsync: Boolean,
queryTimeout: Long,
incrementalCollect: Boolean)
incrementalCollect: Boolean,
fetchSize: Int)
extends JdbcOperation(session) with Logging {

private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
Expand All @@ -58,7 +59,7 @@ class ExecuteStatement(
var jdbcStatement: Statement = null
try {
val connection: Connection = session.asInstanceOf[JdbcSessionImpl].sessionConnection
jdbcStatement = dialect.createStatement(connection)
jdbcStatement = dialect.createStatement(connection, fetchSize)
val hasResult = jdbcStatement.execute(statement)
if (hasResult) {
val resultSetWrapper = new ResultSetWrapper(jdbcStatement)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.util

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.OPERATION_INCREMENTAL_COLLECT
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_FETCH_SIZE, OPERATION_INCREMENTAL_COLLECT}
import org.apache.kyuubi.engine.jdbc.dialect.{JdbcDialect, JdbcDialects}
import org.apache.kyuubi.engine.jdbc.session.JdbcSessionImpl
import org.apache.kyuubi.engine.jdbc.util.SupportServiceLoader
Expand All @@ -44,13 +44,17 @@ class JdbcOperationManager(conf: KyuubiConf) extends OperationManager("JdbcOpera
val incrementalCollect = normalizedConf.get(OPERATION_INCREMENTAL_COLLECT.key).map(
_.toBoolean).getOrElse(
session.sessionManager.getConf.get(OPERATION_INCREMENTAL_COLLECT))
val fetchSize = normalizedConf.get(ENGINE_JDBC_FETCH_SIZE.key).map(
_.toInt).getOrElse(
session.sessionManager.getConf.get(ENGINE_JDBC_FETCH_SIZE))
val executeStatement =
new ExecuteStatement(
session,
statement,
runAsync,
queryTimeout,
incrementalCollect)
incrementalCollect,
fetchSize)
addOperation(executeStatement)
}

Expand Down Expand Up @@ -79,8 +83,12 @@ class JdbcOperationManager(conf: KyuubiConf) extends OperationManager("JdbcOpera
tableName: String,
tableTypes: util.List[String]): Operation = {
val query = dialect.getTablesQuery(catalogName, schemaName, tableName, tableTypes)
val normalizedConf = session.asInstanceOf[JdbcSessionImpl].normalizedConf
val fetchSize = normalizedConf.get(ENGINE_JDBC_FETCH_SIZE.key).map(
_.toInt).getOrElse(
session.sessionManager.getConf.get(ENGINE_JDBC_FETCH_SIZE))
val executeStatement =
new ExecuteStatement(session, query, false, 0L, true)
new ExecuteStatement(session, query, false, 0L, true, fetchSize)
addOperation(executeStatement)
}

Expand All @@ -96,8 +104,12 @@ class JdbcOperationManager(conf: KyuubiConf) extends OperationManager("JdbcOpera
tableName: String,
columnName: String): Operation = {
val query = dialect.getColumnsQuery(session, catalogName, schemaName, tableName, columnName)
val normalizedConf = session.asInstanceOf[JdbcSessionImpl].normalizedConf
val fetchSize = normalizedConf.get(ENGINE_JDBC_FETCH_SIZE.key).map(
_.toInt).getOrElse(
session.sessionManager.getConf.get(ENGINE_JDBC_FETCH_SIZE))
val executeStatement =
new ExecuteStatement(session, query, false, 0L, true)
new ExecuteStatement(session, query, false, 0L, true, fetchSize)
addOperation(executeStatement)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2810,7 +2810,7 @@ object KyuubiConf {

val ENGINE_JDBC_FETCH_SIZE: ConfigEntry[Int] =
buildConf("kyuubi.engine.jdbc.fetch.size")
.doc("The default fetch size of JDBC engine")
.doc("The fetch size of JDBC engine")
.version("1.9.0")
.intConf
.createWithDefault(1000)
Expand Down

0 comments on commit 0d6e858

Please sign in to comment.