Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #5509] Add Apache Impala JDBC engine dialect #6104

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#

org.apache.kyuubi.engine.jdbc.doris.DorisConnectionProvider
org.apache.kyuubi.engine.jdbc.impala.ImpalaConnectionProvider
org.apache.kyuubi.engine.jdbc.mysql.MySQLConnectionProvider
org.apache.kyuubi.engine.jdbc.phoenix.PhoenixConnectionProvider
org.apache.kyuubi.engine.jdbc.postgresql.PostgreSQLConnectionProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#

org.apache.kyuubi.engine.jdbc.dialect.DorisDialect
org.apache.kyuubi.engine.jdbc.dialect.ImpalaDialect
org.apache.kyuubi.engine.jdbc.dialect.MySQLDialect
org.apache.kyuubi.engine.jdbc.dialect.PhoenixDialect
org.apache.kyuubi.engine.jdbc.dialect.PostgreSQLDialect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ abstract class JdbcConnectionProvider extends SupportServiceLoader with Logging

val driverClass: String

def canHandle(providerClass: String): Boolean
def canHandle(providerClass: String): Boolean = {
driverClass.equalsIgnoreCase(providerClass)
}

def getConnection(kyuubiConf: KyuubiConf): Connection = {
val properties = new Properties()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.dialect

import java.util

import org.apache.commons.lang3.StringUtils

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.jdbc.impala.{ImpalaSchemaHelper, ImpalaTRowSetGenerator}
import org.apache.kyuubi.engine.jdbc.schema.{JdbcTRowSetGenerator, SchemaHelper}
import org.apache.kyuubi.session.Session

class ImpalaDialect extends JdbcDialect {

override def getTablesQuery(
catalog: String,
schema: String,
tableName: String,
tableTypes: util.List[String]): String = {
if (isPattern(schema)) {
throw KyuubiSQLException.featureNotSupported("Pattern-like schema names not supported")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

possible to add a nagetive test case for it? someday we may want to implement it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added tests for such cases to org.apache.kyuubi.engine.jdbc.impala.DialectSuite

}

val query = new StringBuilder("show tables ")

if (StringUtils.isNotEmpty(schema) && !isWildcardSetByKyuubi(schema)) {
query.append(s"in $schema ")
}

if (StringUtils.isNotEmpty(tableName)) {
query.append(s"like '${toImpalaRegex(tableName)}'")
}

query.toString()
}

override def getColumnsQuery(
session: Session,
catalogName: String,
schemaName: String,
tableName: String,
columnName: String): String = {
if (StringUtils.isEmpty(tableName)) {
throw KyuubiSQLException("Table name should not be empty")
}

if (isPattern(schemaName)) {
throw KyuubiSQLException.featureNotSupported("Pattern-like schema names not supported")
}

if (isPattern(tableName)) {
throw KyuubiSQLException.featureNotSupported("Pattern-like table names not supported")
}

val query = new StringBuilder("show column stats ")

if (StringUtils.isNotEmpty(schemaName) && !isWildcardSetByKyuubi(schemaName)) {
query.append(s"$schemaName.")
}

query.append(tableName)
query.toString()
}

override def getTRowSetGenerator(): JdbcTRowSetGenerator = new ImpalaTRowSetGenerator

override def getSchemaHelper(): SchemaHelper = new ImpalaSchemaHelper

override def name(): String = "impala"

private def isPattern(value: String): Boolean = {
value != null && !isWildcardSetByKyuubi(value) && value.contains("*")
}

private def isWildcardSetByKyuubi(pattern: String): Boolean = pattern == "%"

private def toImpalaRegex(pattern: String): String = {
pattern.replace("%", "*")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.impala

import org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider

class ImpalaConnectionProvider extends JdbcConnectionProvider {

override val name: String = classOf[ImpalaConnectionProvider].getName

override val driverClass: String = ImpalaConnectionProvider.driverClass
}

object ImpalaConnectionProvider {
// we should use kyuubi hive driver instead of original hive one in order
// to get fixed getMoreResults()
val driverClass: String = "org.apache.kyuubi.jdbc.KyuubiHiveDriver"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.impala

import org.apache.kyuubi.engine.jdbc.schema.SchemaHelper
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId

class ImpalaSchemaHelper extends SchemaHelper {
override protected def floatToTTypeId: TTypeId = {
TTypeId.DOUBLE_TYPE
}

override protected def realToTTypeId: TTypeId = {
TTypeId.DOUBLE_TYPE
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.impala

import org.apache.kyuubi.engine.jdbc.schema.DefaultJdbcTRowSetGenerator
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TColumnValue}

class ImpalaTRowSetGenerator extends DefaultJdbcTRowSetGenerator {
override def toFloatTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
asDoubleTColumn(rows, ordinal)

override def toFloatTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
asDoubleTColumnValue(row, ordinal)

override def toRealTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
asDoubleTColumn(rows, ordinal)

override def toRealTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
asDoubleTColumnValue(row, ordinal)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@ class MySQL8ConnectionProvider extends JdbcConnectionProvider {
override val name: String = classOf[MySQL8ConnectionProvider].getName

override val driverClass: String = MySQL8ConnectionProvider.driverClass

override def canHandle(providerClass: String): Boolean = {
driverClass.equalsIgnoreCase(providerClass)
}

}

object MySQL8ConnectionProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ class ExecuteStatement(
})
} else {
warn(s"Execute in full collect mode")
jdbcStatement.closeOnCompletion()
new ArrayFetchIterator(resultSetWrapper.toArray())
val arrayIter = new ArrayFetchIterator(resultSetWrapper.toArray())
jdbcStatement.close()
arrayIter
}
} else {
schema = Schema(List[Column](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,4 @@ class PhoenixConnectionProvider extends JdbcConnectionProvider {
override val name: String = classOf[PhoenixConnectionProvider].getName

override val driverClass: String = "org.apache.phoenix.queryserver.client.Driver"

override def canHandle(providerClass: String): Boolean = {
driverClass.equalsIgnoreCase(providerClass)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,4 @@ class PostgreSQLConnectionProvider extends JdbcConnectionProvider {

override val driverClass: String = "org.postgresql.Driver"

override def canHandle(providerClass: String): Boolean = {
driverClass.equalsIgnoreCase(providerClass)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ class DefaultJdbcTRowSetGenerator extends JdbcTRowSetGenerator {
case INTEGER => toIntegerTColumn(rows, ordinal)
case BIGINT => toBigIntTColumn(rows, ordinal)
case REAL => toRealTColumn(rows, ordinal)
case FLOAT => toFloatTColumn(rows, ordinal)
case DOUBLE => toDoubleTColumn(rows, ordinal)
case CHAR => toCharTColumn(rows, ordinal)
case VARCHAR => toVarcharTColumn(rows, ordinal)
case BOOLEAN => toBooleanTColumn(rows, ordinal)
case _ => toDefaultTColumn(rows, ordinal, sqlType)
}

Expand All @@ -47,9 +49,11 @@ class DefaultJdbcTRowSetGenerator extends JdbcTRowSetGenerator {
case INTEGER => toIntegerTColumnValue(row, ordinal)
case BIGINT => toBigIntTColumnValue(row, ordinal)
case REAL => toRealTColumnValue(row, ordinal)
case FLOAT => toFloatTColumnValue(row, ordinal)
case DOUBLE => toDoubleTColumnValue(row, ordinal)
case CHAR => toCharTColumnValue(row, ordinal)
case VARCHAR => toVarcharTColumnValue(row, ordinal)
case BOOLEAN => toBooleanTColumnValue(row, ordinal)
case otherType => toDefaultTColumnValue(row, ordinal, otherType)
}
}
Expand All @@ -63,6 +67,9 @@ class DefaultJdbcTRowSetGenerator extends JdbcTRowSetGenerator {
def toBitTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
asBooleanTColumn(rows, ordinal)

def toBooleanTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
asBooleanTColumn(rows, ordinal)

def toTinyIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
asShortTColumn(rows, ordinal)

Expand All @@ -78,6 +85,9 @@ class DefaultJdbcTRowSetGenerator extends JdbcTRowSetGenerator {
def toRealTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
asFloatTColumn(rows, ordinal)

def toFloatTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
asFloatTColumn(rows, ordinal)

def toDoubleTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
asDoubleTColumn(rows, ordinal)

Expand All @@ -92,6 +102,9 @@ class DefaultJdbcTRowSetGenerator extends JdbcTRowSetGenerator {
def toBitTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
asBooleanTColumnValue(row, ordinal)

def toBooleanTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
asBooleanTColumnValue(row, ordinal)

def toTinyIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
asShortTColumnValue(row, ordinal)

Expand All @@ -107,6 +120,9 @@ class DefaultJdbcTRowSetGenerator extends JdbcTRowSetGenerator {
def toRealTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
asFloatTColumnValue(row, ordinal)

def toFloatTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
asFloatTColumnValue(row, ordinal)

def toDoubleTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
asDoubleTColumnValue(row, ordinal)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ abstract class SchemaHelper {
case Types.DECIMAL =>
decimalToTTypeId

case Types.FLOAT =>
floatToTTypeId

case Types.BOOLEAN =>
booleanToTTypeId
Comment on lines +103 to +107
Copy link
Member

@pan3793 pan3793 Feb 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks like affects(or not?) all dialects, if so, change it in a seperate PR first.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I understand, other dialects don't use this types or use another JDBC types for the same types from db (for instance,Types.BIT for db boolean), so I suggest to left these changes in current PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm


// TODO add more type support
case _ =>
defaultToTTypeId
Expand All @@ -109,6 +115,10 @@ abstract class SchemaHelper {
TTypeId.BOOLEAN_TYPE
}

protected def booleanToTTypeId: TTypeId = {
TTypeId.BOOLEAN_TYPE
}

protected def tinyIntToTTypeId: TTypeId = {
TTypeId.TINYINT_TYPE
}
Expand All @@ -129,6 +139,10 @@ abstract class SchemaHelper {
TTypeId.FLOAT_TYPE
}

protected def floatToTTypeId: TTypeId = {
TTypeId.FLOAT_TYPE
}

protected def doubleToTTypeId: TTypeId = {
TTypeId.DOUBLE_TYPE
}
Expand Down
Loading
Loading