Skip to content

Commit

Permalink
[KYUUBI #5464] JDBC Engine supports MySQL
Browse files Browse the repository at this point in the history
  • Loading branch information
wangjunbo committed Nov 9, 2023
1 parent aa71e08 commit 756f530
Show file tree
Hide file tree
Showing 16 changed files with 888 additions and 6 deletions.
6 changes: 6 additions & 0 deletions externals/kyuubi-jdbc-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.dimafeng</groupId>
<artifactId>testcontainers-scala-mysql_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>${hive.jdbc.artifact}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@
org.apache.kyuubi.engine.jdbc.doris.DorisConnectionProvider
org.apache.kyuubi.engine.jdbc.phoenix.PhoenixConnectionProvider
org.apache.kyuubi.engine.jdbc.postgresql.PostgreSQLConnectionProvider
org.apache.kyuubi.engine.jdbc.mysql.MySQLConnectionProvider
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@
org.apache.kyuubi.engine.jdbc.dialect.DorisDialect
org.apache.kyuubi.engine.jdbc.dialect.PhoenixDialect
org.apache.kyuubi.engine.jdbc.dialect.PostgreSQLDialect
org.apache.kyuubi.engine.jdbc.dialect.MySQLDialect
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.dialect
import java.sql.{Connection, ResultSet, Statement}
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.commons.lang3.StringUtils

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.jdbc.mysql.{MySQLRowSetHelper, MySQLSchemaHelper}
import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session

class MySQLDialect extends JdbcDialect {
override def createStatement(connection: Connection, fetchSize: Int): Statement = {
val statement =
connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
statement.setFetchSize(Integer.MIN_VALUE)
statement
}

override def getTablesQuery(
catalog: String,
schema: String,
tableName: String,
tableTypes: util.List[String]): String = {
val tTypes =
if (tableTypes == null || tableTypes.isEmpty) {
Set("BASE TABLE", "SYSTEM VIEW", "VIEW")
} else {
tableTypes.asScala.toSet
}
val query = new StringBuilder(
s"""
|SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, ENGINE,
|TABLE_ROWS, AVG_ROW_LENGTH, DATA_LENGTH,
|CREATE_TIME, UPDATE_TIME, TABLE_COLLATION, TABLE_COMMENT
|FROM INFORMATION_SCHEMA.TABLES
|""".stripMargin)

val filters = ArrayBuffer[String]()
if (StringUtils.isNotBlank(catalog)) {
filters += s"$TABLE_CATALOG = '$catalog'"
}

if (StringUtils.isNotBlank(schema)) {
filters += s"$TABLE_SCHEMA LIKE '$schema'"
}

if (StringUtils.isNotBlank(tableName)) {
filters += s"$TABLE_NAME LIKE '$tableName'"
}

if (tTypes.nonEmpty) {
filters += s"(${
tTypes.map { tableType => s"$TABLE_TYPE = '$tableType'" }
.mkString(" OR ")
})"
}

if (filters.nonEmpty) {
query.append(" WHERE ")
query.append(filters.mkString(" AND "))
}

query.toString()
}

override def getTableTypesOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getColumnsQuery(
session: Session,
catalogName: String,
schemaName: String,
tableName: String,
columnName: String): String = {
val query = new StringBuilder(
"""
|SELECT
|`TABLE_CATALOG`,`TABLE_SCHEMA`,`TABLE_NAME`, `COLUMN_NAME`,`ORDINAL_POSITION`,
|`COLUMN_DEFAULT`,`IS_NULLABLE`,`DATA_TYPE`,`CHARACTER_MAXIMUM_LENGTH`,
|`CHARACTER_OCTET_LENGTH`,`NUMERIC_PRECISION`,`NUMERIC_SCALE`,`DATETIME_PRECISION`,
|`CHARACTER_SET_NAME`,`COLLATION_NAME`,`COLUMN_TYPE`,`COLUMN_KEY`,`EXTRA`,`PRIVILEGES`,
|`COLUMN_COMMENT`,`GENERATION_EXPRESSION`
|FROM information_schema.columns
|""".stripMargin)

val filters = ArrayBuffer[String]()
if (StringUtils.isNotEmpty(catalogName)) {
filters += s"$TABLE_CATALOG = '$catalogName'"
}
if (StringUtils.isNotEmpty(schemaName)) {
filters += s"$TABLE_SCHEMA LIKE '$schemaName'"
}
if (StringUtils.isNotEmpty(tableName)) {
filters += s"$TABLE_NAME LIKE '$tableName'"
}
if (StringUtils.isNotEmpty(columnName)) {
filters += s"$COLUMN_NAME LIKE '$columnName'"
}

if (filters.nonEmpty) {
query.append(" WHERE ")
query.append(filters.mkString(" AND "))
}

query.toString()
}

override def getFunctionsOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getPrimaryKeysOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getCrossReferenceOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getRowSetHelper(): RowSetHelper = {
new MySQLRowSetHelper
}

override def getSchemaHelper(): SchemaHelper = {
new MySQLSchemaHelper
}

override def name(): String = {
"mysql"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.mysql

class MySQLConnectionProvider extends Mysql8ConnectionProvider {

override val name: String = classOf[MySQLConnectionProvider].getSimpleName
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.mysql

import java.sql.Types

import org.apache.hive.service.rpc.thrift.{TColumn, TColumnValue}

import org.apache.kyuubi.engine.jdbc.schema.RowSetHelper

class MySQLRowSetHelper extends RowSetHelper {

override def toTinyIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
toIntegerTColumn(rows, ordinal)

override def toSmallIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
toIntegerTColumn(rows, ordinal)

override def toTinyIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
toIntegerTColumnValue(row, ordinal)

override def toSmallIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
toIntegerTColumnValue(row, ordinal)

override protected def toIntegerTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
val colHead = if (rows.isEmpty) None else rows.head(ordinal)
colHead match {
case v: Integer => super.toIntegerTColumn(rows, ordinal)
case v: java.lang.Long => super.toBigIntTColumn(rows, ordinal)
case _ => super.toDefaultTColumn(rows, ordinal, Types.INTEGER)
}
}

override protected def toIntegerTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
row(ordinal) match {
case v: Integer => super.toIntegerTColumnValue(row, ordinal)
case v: java.lang.Long => super.toBigIntTColumnValue(row, ordinal)
case _ => super.toDefaultTColumnValue(row, ordinal, Types.INTEGER)
}
}

override protected def toBigIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
val colHead = if (rows.isEmpty) None else rows.head(ordinal)
colHead match {
case v: java.lang.Long => super.toBigIntTColumn(rows, ordinal)
case _ => super.toDefaultTColumn(rows, ordinal, Types.BIGINT)
}
}

override protected def toBigIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
row(ordinal) match {
case v: java.lang.Long => super.toBigIntTColumnValue(row, ordinal)
case _ => super.toDefaultTColumnValue(row, ordinal, Types.BIGINT)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.mysql

import org.apache.kyuubi.engine.jdbc.schema.SchemaHelper

class MySQLSchemaHelper extends SchemaHelper {}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ abstract class RowSetHelper {
val columnSize = row.size
var j = 0
while (j < columnSize) {
val columnValue = toTColumnValue(j, row, columns)
val columnValue = toTColumnValue(j, row, columns(i).sqlType)
tRow.addToColVals(columnValue)
j += 1
}
Expand Down Expand Up @@ -110,8 +110,8 @@ abstract class RowSetHelper {
}
}

protected def toTColumnValue(ordinal: Int, row: List[Any], types: List[Column]): TColumnValue = {
types(ordinal).sqlType match {
protected def toTColumnValue(ordinal: Int, row: List[Any], sqlType: Int): TColumnValue = {
sqlType match {
case Types.BIT =>
toBitTColumnValue(row, ordinal)

Expand Down Expand Up @@ -140,7 +140,7 @@ abstract class RowSetHelper {
toVarcharTColumnValue(row, ordinal)

case _ =>
toDefaultTColumnValue(row, ordinal, types)
toDefaultTColumnValue(row, ordinal, sqlType)
}
}

Expand Down Expand Up @@ -299,11 +299,11 @@ abstract class RowSetHelper {
protected def toDefaultTColumnValue(
row: List[Any],
ordinal: Int,
types: List[Column]): TColumnValue = {
sqlType: Int): TColumnValue = {
val tStrValue = new TStringValue
if (row(ordinal) != null) {
tStrValue.setValue(
toHiveString(row(ordinal), types(ordinal).sqlType))
toHiveString(row(ordinal), sqlType))
}
TColumnValue.stringVal(tStrValue)
}
Expand All @@ -316,6 +316,8 @@ abstract class RowSetHelper {
formatLocalDateTime(dateTime)
case (decimal: java.math.BigDecimal, Types.DECIMAL) =>
decimal.toPlainString
case (bigint: java.math.BigInteger, Types.BIGINT) =>
bigint.toString()
case (other, _) =>
other.toString
}
Expand Down
Loading

0 comments on commit 756f530

Please sign in to comment.