From 756f5303c171e674c6244d828599197c099b920b Mon Sep 17 00:00:00 2001 From: wangjunbo Date: Wed, 1 Nov 2023 16:53:22 +0800 Subject: [PATCH] [KYUUBI #5464] JDBC Engine supports MySQL --- externals/kyuubi-jdbc-engine/pom.xml | 6 + ...ine.jdbc.connection.JdbcConnectionProvider | 1 + ...che.kyuubi.engine.jdbc.dialect.JdbcDialect | 1 + .../engine/jdbc/dialect/MySQLDialect.scala | 154 +++++++++++ .../jdbc/mysql/MySQLConnectionProvider.scala | 22 ++ .../engine/jdbc/mysql/MySQLRowSetHelper.scala | 69 +++++ .../engine/jdbc/mysql/MySQLSchemaHelper.scala | 21 ++ .../engine/jdbc/schema/RowSetHelper.scala | 14 +- .../jdbc/mysql/MySQLOperationSuite.scala | 253 ++++++++++++++++++ .../jdbc/mysql/OperationWithEngineSuite.scala | 79 ++++++ .../engine/jdbc/mysql/SessionSuite.scala | 39 +++ .../engine/jdbc/mysql/StatementSuite.scala | 95 +++++++ .../engine/jdbc/mysql/WithMySQLEngine.scala | 48 ++++ .../jdbc/mysql/OperationWithServerSuite.scala | 27 ++ .../WithKyuubiServerAndMySQLContainer.scala | 59 ++++ pom.xml | 6 + 16 files changed, 888 insertions(+), 6 deletions(-) create mode 100644 externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/MySQLDialect.scala create mode 100644 externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLConnectionProvider.scala create mode 100644 externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLRowSetHelper.scala create mode 100644 externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLSchemaHelper.scala create mode 100644 externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLOperationSuite.scala create mode 100644 externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/OperationWithEngineSuite.scala create mode 100644 externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/SessionSuite.scala create mode 100644 externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/StatementSuite.scala create mode 100644 externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/WithMySQLEngine.scala create mode 100644 integration-tests/kyuubi-jdbc-it/src/test/scala/org/apache/kyuubi/it/jdbc/mysql/OperationWithServerSuite.scala create mode 100644 integration-tests/kyuubi-jdbc-it/src/test/scala/org/apache/kyuubi/it/jdbc/mysql/WithKyuubiServerAndMySQLContainer.scala diff --git a/externals/kyuubi-jdbc-engine/pom.xml b/externals/kyuubi-jdbc-engine/pom.xml index 69870c27870..0ec905f202d 100644 --- a/externals/kyuubi-jdbc-engine/pom.xml +++ b/externals/kyuubi-jdbc-engine/pom.xml @@ -58,6 +58,12 @@ test + + com.dimafeng + testcontainers-scala-mysql_${scala.binary.version} + test + + org.apache.kyuubi ${hive.jdbc.artifact} diff --git a/externals/kyuubi-jdbc-engine/src/main/resources/META-INF/services/org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider b/externals/kyuubi-jdbc-engine/src/main/resources/META-INF/services/org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider index 326088aae06..35282e7224a 100644 --- a/externals/kyuubi-jdbc-engine/src/main/resources/META-INF/services/org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider +++ b/externals/kyuubi-jdbc-engine/src/main/resources/META-INF/services/org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider @@ -18,3 +18,4 @@ org.apache.kyuubi.engine.jdbc.doris.DorisConnectionProvider org.apache.kyuubi.engine.jdbc.phoenix.PhoenixConnectionProvider org.apache.kyuubi.engine.jdbc.postgresql.PostgreSQLConnectionProvider +org.apache.kyuubi.engine.jdbc.mysql.MySQLConnectionProvider \ No newline at end of file diff --git a/externals/kyuubi-jdbc-engine/src/main/resources/META-INF/services/org.apache.kyuubi.engine.jdbc.dialect.JdbcDialect b/externals/kyuubi-jdbc-engine/src/main/resources/META-INF/services/org.apache.kyuubi.engine.jdbc.dialect.JdbcDialect index 03cc66e51be..e1a6c5c5967 100644 --- a/externals/kyuubi-jdbc-engine/src/main/resources/META-INF/services/org.apache.kyuubi.engine.jdbc.dialect.JdbcDialect +++ b/externals/kyuubi-jdbc-engine/src/main/resources/META-INF/services/org.apache.kyuubi.engine.jdbc.dialect.JdbcDialect @@ -18,3 +18,4 @@ org.apache.kyuubi.engine.jdbc.dialect.DorisDialect org.apache.kyuubi.engine.jdbc.dialect.PhoenixDialect org.apache.kyuubi.engine.jdbc.dialect.PostgreSQLDialect +org.apache.kyuubi.engine.jdbc.dialect.MySQLDialect \ No newline at end of file diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/MySQLDialect.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/MySQLDialect.scala new file mode 100644 index 00000000000..db31e51c1a1 --- /dev/null +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/MySQLDialect.scala @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.jdbc.dialect +import java.sql.{Connection, ResultSet, Statement} +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import org.apache.commons.lang3.StringUtils + +import org.apache.kyuubi.KyuubiSQLException +import org.apache.kyuubi.engine.jdbc.mysql.{MySQLRowSetHelper, MySQLSchemaHelper} +import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper} +import org.apache.kyuubi.operation.Operation +import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ +import org.apache.kyuubi.session.Session + +class MySQLDialect extends JdbcDialect { + override def createStatement(connection: Connection, fetchSize: Int): Statement = { + val statement = + connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) + statement.setFetchSize(Integer.MIN_VALUE) + statement + } + + override def getTablesQuery( + catalog: String, + schema: String, + tableName: String, + tableTypes: util.List[String]): String = { + val tTypes = + if (tableTypes == null || tableTypes.isEmpty) { + Set("BASE TABLE", "SYSTEM VIEW", "VIEW") + } else { + tableTypes.asScala.toSet + } + val query = new StringBuilder( + s""" + |SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, ENGINE, + |TABLE_ROWS, AVG_ROW_LENGTH, DATA_LENGTH, + |CREATE_TIME, UPDATE_TIME, TABLE_COLLATION, TABLE_COMMENT + |FROM INFORMATION_SCHEMA.TABLES + |""".stripMargin) + + val filters = ArrayBuffer[String]() + if (StringUtils.isNotBlank(catalog)) { + filters += s"$TABLE_CATALOG = '$catalog'" + } + + if (StringUtils.isNotBlank(schema)) { + filters += s"$TABLE_SCHEMA LIKE '$schema'" + } + + if (StringUtils.isNotBlank(tableName)) { + filters += s"$TABLE_NAME LIKE '$tableName'" + } + + if (tTypes.nonEmpty) { + filters += s"(${ + tTypes.map { tableType => s"$TABLE_TYPE = '$tableType'" } + .mkString(" OR ") + })" + } + + if (filters.nonEmpty) { + query.append(" WHERE ") + query.append(filters.mkString(" AND ")) + } + + query.toString() + } + + override def getTableTypesOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def getColumnsQuery( + session: Session, + catalogName: String, + schemaName: String, + tableName: String, + columnName: String): String = { + val query = new StringBuilder( + """ + |SELECT + |`TABLE_CATALOG`,`TABLE_SCHEMA`,`TABLE_NAME`, `COLUMN_NAME`,`ORDINAL_POSITION`, + |`COLUMN_DEFAULT`,`IS_NULLABLE`,`DATA_TYPE`,`CHARACTER_MAXIMUM_LENGTH`, + |`CHARACTER_OCTET_LENGTH`,`NUMERIC_PRECISION`,`NUMERIC_SCALE`,`DATETIME_PRECISION`, + |`CHARACTER_SET_NAME`,`COLLATION_NAME`,`COLUMN_TYPE`,`COLUMN_KEY`,`EXTRA`,`PRIVILEGES`, + |`COLUMN_COMMENT`,`GENERATION_EXPRESSION` + |FROM information_schema.columns + |""".stripMargin) + + val filters = ArrayBuffer[String]() + if (StringUtils.isNotEmpty(catalogName)) { + filters += s"$TABLE_CATALOG = '$catalogName'" + } + if (StringUtils.isNotEmpty(schemaName)) { + filters += s"$TABLE_SCHEMA LIKE '$schemaName'" + } + if (StringUtils.isNotEmpty(tableName)) { + filters += s"$TABLE_NAME LIKE '$tableName'" + } + if (StringUtils.isNotEmpty(columnName)) { + filters += s"$COLUMN_NAME LIKE '$columnName'" + } + + if (filters.nonEmpty) { + query.append(" WHERE ") + query.append(filters.mkString(" AND ")) + } + + query.toString() + } + + override def getFunctionsOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def getPrimaryKeysOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def getCrossReferenceOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def getRowSetHelper(): RowSetHelper = { + new MySQLRowSetHelper + } + + override def getSchemaHelper(): SchemaHelper = { + new MySQLSchemaHelper + } + + override def name(): String = { + "mysql" + } +} diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLConnectionProvider.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLConnectionProvider.scala new file mode 100644 index 00000000000..249ea0c31f6 --- /dev/null +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLConnectionProvider.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.jdbc.mysql + +class MySQLConnectionProvider extends Mysql8ConnectionProvider { + + override val name: String = classOf[MySQLConnectionProvider].getSimpleName +} diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLRowSetHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLRowSetHelper.scala new file mode 100644 index 00000000000..1c85cb009dd --- /dev/null +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLRowSetHelper.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.jdbc.mysql + +import java.sql.Types + +import org.apache.hive.service.rpc.thrift.{TColumn, TColumnValue} + +import org.apache.kyuubi.engine.jdbc.schema.RowSetHelper + +class MySQLRowSetHelper extends RowSetHelper { + + override def toTinyIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = + toIntegerTColumn(rows, ordinal) + + override def toSmallIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = + toIntegerTColumn(rows, ordinal) + + override def toTinyIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue = + toIntegerTColumnValue(row, ordinal) + + override def toSmallIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue = + toIntegerTColumnValue(row, ordinal) + + override protected def toIntegerTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = { + val colHead = if (rows.isEmpty) None else rows.head(ordinal) + colHead match { + case v: Integer => super.toIntegerTColumn(rows, ordinal) + case v: java.lang.Long => super.toBigIntTColumn(rows, ordinal) + case _ => super.toDefaultTColumn(rows, ordinal, Types.INTEGER) + } + } + + override protected def toIntegerTColumnValue(row: List[Any], ordinal: Int): TColumnValue = { + row(ordinal) match { + case v: Integer => super.toIntegerTColumnValue(row, ordinal) + case v: java.lang.Long => super.toBigIntTColumnValue(row, ordinal) + case _ => super.toDefaultTColumnValue(row, ordinal, Types.INTEGER) + } + } + + override protected def toBigIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = { + val colHead = if (rows.isEmpty) None else rows.head(ordinal) + colHead match { + case v: java.lang.Long => super.toBigIntTColumn(rows, ordinal) + case _ => super.toDefaultTColumn(rows, ordinal, Types.BIGINT) + } + } + + override protected def toBigIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue = + row(ordinal) match { + case v: java.lang.Long => super.toBigIntTColumnValue(row, ordinal) + case _ => super.toDefaultTColumnValue(row, ordinal, Types.BIGINT) + } +} diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLSchemaHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLSchemaHelper.scala new file mode 100644 index 00000000000..b7351b26b3e --- /dev/null +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLSchemaHelper.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.jdbc.mysql + +import org.apache.kyuubi.engine.jdbc.schema.SchemaHelper + +class MySQLSchemaHelper extends SchemaHelper {} diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/RowSetHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/RowSetHelper.scala index 74b4cec108d..714b3bb7e76 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/RowSetHelper.scala +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/RowSetHelper.scala @@ -49,7 +49,7 @@ abstract class RowSetHelper { val columnSize = row.size var j = 0 while (j < columnSize) { - val columnValue = toTColumnValue(j, row, columns) + val columnValue = toTColumnValue(j, row, columns(i).sqlType) tRow.addToColVals(columnValue) j += 1 } @@ -110,8 +110,8 @@ abstract class RowSetHelper { } } - protected def toTColumnValue(ordinal: Int, row: List[Any], types: List[Column]): TColumnValue = { - types(ordinal).sqlType match { + protected def toTColumnValue(ordinal: Int, row: List[Any], sqlType: Int): TColumnValue = { + sqlType match { case Types.BIT => toBitTColumnValue(row, ordinal) @@ -140,7 +140,7 @@ abstract class RowSetHelper { toVarcharTColumnValue(row, ordinal) case _ => - toDefaultTColumnValue(row, ordinal, types) + toDefaultTColumnValue(row, ordinal, sqlType) } } @@ -299,11 +299,11 @@ abstract class RowSetHelper { protected def toDefaultTColumnValue( row: List[Any], ordinal: Int, - types: List[Column]): TColumnValue = { + sqlType: Int): TColumnValue = { val tStrValue = new TStringValue if (row(ordinal) != null) { tStrValue.setValue( - toHiveString(row(ordinal), types(ordinal).sqlType)) + toHiveString(row(ordinal), sqlType)) } TColumnValue.stringVal(tStrValue) } @@ -316,6 +316,8 @@ abstract class RowSetHelper { formatLocalDateTime(dateTime) case (decimal: java.math.BigDecimal, Types.DECIMAL) => decimal.toPlainString + case (bigint: java.math.BigInteger, Types.BIGINT) => + bigint.toString() case (other, _) => other.toString } diff --git a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLOperationSuite.scala b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLOperationSuite.scala new file mode 100644 index 00000000000..ffd7c0a0fe8 --- /dev/null +++ b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLOperationSuite.scala @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.jdbc.mysql + +import java.sql.ResultSet + +import scala.collection.mutable.ArrayBuffer + +import org.apache.kyuubi.operation.HiveJDBCTestHelper +import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ + +abstract class MySQLOperationSuite extends WithMySQLEngine with HiveJDBCTestHelper { + test("mysql - get tables") { + case class Table(catalog: String, schema: String, tableName: String, tableType: String) + + withJdbcStatement() { statement => + val meta = statement.getConnection.getMetaData + val resultBuffer = ArrayBuffer[Table]() + + var tables = meta.getTables(null, null, null, null) + while (tables.next()) { + resultBuffer += + Table( + tables.getString(TABLE_CATALOG), + tables.getString(TABLE_SCHEMA), + tables.getString(TABLE_NAME), + tables.getString(TABLE_TYPE)) + } + assert(resultBuffer.contains(Table("def", "information_schema", "TABLES", "SYSTEM VIEW"))) + assert(resultBuffer.contains(Table("def", "information_schema", "VIEWS", "SYSTEM VIEW"))) + resultBuffer.clear() + + statement.execute("create database if not exists db1") + statement.execute("create table db1.test1(id bigint)" + + "ENGINE=InnoDB DEFAULT CHARSET=utf8;") + statement.execute("create table db1.test2(id bigint)" + + "ENGINE=InnoDB DEFAULT CHARSET=utf8;") + + statement.execute("create database if not exists db2") + statement.execute("create table db2.test1(id bigint)" + + "ENGINE=InnoDB DEFAULT CHARSET=utf8;") + statement.execute("create table db2.test2(id bigint)" + + "ENGINE=InnoDB DEFAULT CHARSET=utf8;") + + statement.execute("create view db1.view1 (k1) as select id from db1.test1") + + tables = meta.getTables(null, "db1", "test1", Array("BASE TABLE")) + while (tables.next()) { + val table = Table( + tables.getString(TABLE_CATALOG), + tables.getString(TABLE_SCHEMA), + tables.getString(TABLE_NAME), + tables.getString(TABLE_TYPE)) + assert(table == Table("def", "db1", "test1", "BASE TABLE")) + } + + tables = meta.getTables("def", "db1", null, null) + while (tables.next()) { + resultBuffer += Table( + tables.getString(TABLE_CATALOG), + tables.getString(TABLE_SCHEMA), + tables.getString(TABLE_NAME), + tables.getString(TABLE_TYPE)) + } + assert(resultBuffer.contains(Table("def", "db1", "test2", "BASE TABLE"))) + resultBuffer.clear() + + tables = meta.getTables(null, null, "test1", null) + while (tables.next()) { + resultBuffer += Table( + tables.getString(TABLE_CATALOG), + tables.getString(TABLE_SCHEMA), + tables.getString(TABLE_NAME), + tables.getString(TABLE_TYPE)) + } + assert(resultBuffer.contains(Table("def", "db1", "test1", "BASE TABLE"))) + assert(resultBuffer.contains(Table("def", "db2", "test1", "BASE TABLE"))) + resultBuffer.clear() + + tables = meta.getTables(null, "db%", "test1", null) + while (tables.next()) { + resultBuffer += Table( + tables.getString(TABLE_CATALOG), + tables.getString(TABLE_SCHEMA), + tables.getString(TABLE_NAME), + tables.getString(TABLE_TYPE)) + } + assert(resultBuffer.contains(Table("def", "db1", "test1", "BASE TABLE"))) + assert(resultBuffer.contains(Table("def", "db2", "test1", "BASE TABLE"))) + resultBuffer.clear() + + tables = meta.getTables(null, "db2", "test%", null) + while (tables.next()) { + resultBuffer += Table( + tables.getString(TABLE_CATALOG), + tables.getString(TABLE_SCHEMA), + tables.getString(TABLE_NAME), + tables.getString(TABLE_TYPE)) + } + assert(resultBuffer.contains(Table("def", "db2", "test1", "BASE TABLE"))) + assert(resultBuffer.contains(Table("def", "db2", "test2", "BASE TABLE"))) + resultBuffer.clear() + + tables = meta.getTables(null, "fake_db", "test1", null) + assert(!tables.next()) + + tables = meta.getTables(null, "db1", null, Array("VIEW")) + while (tables.next()) { + val table = Table( + tables.getString(TABLE_CATALOG), + tables.getString(TABLE_SCHEMA), + tables.getString(TABLE_NAME), + tables.getString(TABLE_TYPE)) + assert(table == Table("def", "db1", "view1", "VIEW")) + } + + tables = meta.getTables(null, null, null, Array("VIEW", "BASE TABLE")) + while (tables.next()) { + resultBuffer += Table( + tables.getString(TABLE_CATALOG), + tables.getString(TABLE_SCHEMA), + tables.getString(TABLE_NAME), + tables.getString(TABLE_TYPE)) + } + assert(resultBuffer.contains(Table("def", "db1", "test1", "BASE TABLE"))) + assert(resultBuffer.contains(Table("def", "db1", "test2", "BASE TABLE"))) + assert(resultBuffer.contains(Table("def", "db2", "test1", "BASE TABLE"))) + assert(resultBuffer.contains(Table("def", "db2", "test2", "BASE TABLE"))) + assert(resultBuffer.contains(Table("def", "db1", "view1", "VIEW"))) + resultBuffer.clear() + + statement.execute("drop view db1.view1") + statement.execute("drop table db1.test1") + statement.execute("drop table db1.test2") + statement.execute("drop table db2.test1") + statement.execute("drop table db2.test2") + statement.execute("drop database db1") + statement.execute("drop database db2") + } + } + + test("mysql - get columns") { + case class Column(tableSchema: String, tableName: String, columnName: String) + + def buildColumn(resultSet: ResultSet): Column = { + val schema = resultSet.getString(TABLE_SCHEMA) + val tableName = resultSet.getString(TABLE_NAME) + val columnName = resultSet.getString(COLUMN_NAME) + val column = Column(schema, tableName, columnName) + column + } + + withJdbcStatement() { statement => + val metadata = statement.getConnection.getMetaData + statement.execute("create database if not exists db1") + statement.execute("create table if not exists db1.test1" + + "(id bigint, str1 varchar(255), str2 varchar(255), age int)" + + "ENGINE=InnoDB DEFAULT CHARSET=utf8;") + statement.execute("create table if not exists db1.test2" + + "(id bigint, str1 varchar(255), str2 varchar(255), age int)" + + "ENGINE=InnoDB DEFAULT CHARSET=utf8;") + + statement.execute("create database if not exists db2") + + statement.execute("create table if not exists db2.test1" + + "(id bigint, str1 varchar(255), str2 varchar(255), age int)" + + "ENGINE=InnoDB DEFAULT CHARSET=utf8;") + + val resultBuffer = ArrayBuffer[Column]() + val resultSet1 = metadata.getColumns(null, "db1", null, null) + while (resultSet1.next()) { + val column = buildColumn(resultSet1) + resultBuffer += column + } + + assert(resultBuffer.contains(Column("db1", "test1", "id"))) + assert(resultBuffer.contains(Column("db1", "test1", "str1"))) + assert(resultBuffer.contains(Column("db1", "test1", "str2"))) + assert(resultBuffer.contains(Column("db1", "test1", "age"))) + + assert(resultBuffer.contains(Column("db1", "test2", "id"))) + assert(resultBuffer.contains(Column("db1", "test2", "str1"))) + assert(resultBuffer.contains(Column("db1", "test2", "str2"))) + assert(resultBuffer.contains(Column("db1", "test2", "age"))) + + resultBuffer.clear() + + val resultSet2 = metadata.getColumns(null, null, "test1", null) + while (resultSet2.next()) { + val column = buildColumn(resultSet2) + resultBuffer += column + } + + assert(resultBuffer.contains(Column("db1", "test1", "id"))) + assert(resultBuffer.contains(Column("db1", "test1", "str1"))) + assert(resultBuffer.contains(Column("db1", "test1", "str2"))) + assert(resultBuffer.contains(Column("db1", "test1", "age"))) + + assert(resultBuffer.contains(Column("db2", "test1", "id"))) + assert(resultBuffer.contains(Column("db2", "test1", "str1"))) + assert(resultBuffer.contains(Column("db2", "test1", "str2"))) + assert(resultBuffer.contains(Column("db2", "test1", "age"))) + + resultBuffer.clear() + + val resultSet3 = metadata.getColumns(null, null, null, "age") + while (resultSet3.next()) { + val column = buildColumn(resultSet3) + resultBuffer += column + } + + assert(resultBuffer.contains(Column("db1", "test1", "age"))) + assert(resultBuffer.contains(Column("db1", "test2", "age"))) + assert(resultBuffer.contains(Column("db2", "test1", "age"))) + + resultBuffer.clear() + + val resultSet4 = metadata.getColumns(null, "d%1", "t%1", "str%") + while (resultSet4.next()) { + val column = buildColumn(resultSet4) + resultBuffer += column + } + + assert(resultBuffer.contains(Column("db1", "test1", "str1"))) + assert(resultBuffer.contains(Column("db1", "test1", "str2"))) + + resultBuffer.clear() + + val resultSet5 = metadata.getColumns(null, "d%1", "t%1", "fake") + assert(!resultSet5.next()) + + statement.execute("drop table db1.test1") + statement.execute("drop table db1.test2") + statement.execute("drop database db1") + statement.execute("drop table db2.test1") + statement.execute("drop database db2") + } + } +} diff --git a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/OperationWithEngineSuite.scala b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/OperationWithEngineSuite.scala new file mode 100644 index 00000000000..4cf76427d60 --- /dev/null +++ b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/OperationWithEngineSuite.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.jdbc.mysql + +import org.apache.hive.service.rpc.thrift._ + +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider +import org.apache.kyuubi.operation.HiveJDBCTestHelper + +class OperationWithEngineSuite extends MySQLOperationSuite with HiveJDBCTestHelper { + + override protected def jdbcUrl: String = jdbcConnectionUrl + + test("Test for Jdbc engine getInfo") { + val metaData = ConnectionProvider.create(kyuubiConf).getMetaData + + withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE"))()() { + withSessionHandle { (client, handle) => + val req = new TGetInfoReq() + req.setSessionHandle(handle) + req.setInfoType(TGetInfoType.CLI_DBMS_NAME) + assert(client.GetInfo(req).getInfoValue.getStringValue == metaData.getDatabaseProductName) + + val req2 = new TGetInfoReq() + req2.setSessionHandle(handle) + req2.setInfoType(TGetInfoType.CLI_DBMS_VER) + assert( + client.GetInfo(req2).getInfoValue.getStringValue == metaData.getDatabaseProductVersion) + + val req3 = new TGetInfoReq() + req3.setSessionHandle(handle) + req3.setInfoType(TGetInfoType.CLI_MAX_COLUMN_NAME_LEN) + assert(client.GetInfo(req3).getInfoValue.getLenValue == metaData.getMaxColumnNameLength) + + val req4 = new TGetInfoReq() + req4.setSessionHandle(handle) + req4.setInfoType(TGetInfoType.CLI_MAX_SCHEMA_NAME_LEN) + assert(client.GetInfo(req4).getInfoValue.getLenValue == metaData.getMaxSchemaNameLength) + + val req5 = new TGetInfoReq() + req5.setSessionHandle(handle) + req5.setInfoType(TGetInfoType.CLI_MAX_TABLE_NAME_LEN) + assert(client.GetInfo(req5).getInfoValue.getLenValue == metaData.getMaxTableNameLength) + } + } + } + + test("JDBC ExecuteStatement operation should contain operationLog") { + withSessionHandle { (client, handle) => + val tExecuteStatementReq = new TExecuteStatementReq() + tExecuteStatementReq.setSessionHandle(handle) + tExecuteStatementReq.setStatement("SELECT 1") + val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq) + + val tFetchResultsReq = new TFetchResultsReq() + tFetchResultsReq.setOperationHandle(tExecuteStatementResp.getOperationHandle) + tFetchResultsReq.setFetchType(1) + tFetchResultsReq.setMaxRows(1) + + val tFetchResultsResp = client.FetchResults(tFetchResultsReq) + assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + } + } +} diff --git a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/SessionSuite.scala b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/SessionSuite.scala new file mode 100644 index 00000000000..65107603d77 --- /dev/null +++ b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/SessionSuite.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.jdbc.mysql + +import org.apache.kyuubi.operation.HiveJDBCTestHelper + +class SessionSuite extends WithMySQLEngine with HiveJDBCTestHelper { + + test("test session") { + withJdbcStatement() { statement => + val resultSet = statement.executeQuery( + "select '1' as id") + val metadata = resultSet.getMetaData + for (i <- 1 to metadata.getColumnCount) { + assert(metadata.getColumnName(i) == "id") + } + while (resultSet.next()) { + val id = resultSet.getObject(1) + assert(id == "1") + } + } + } + + override protected def jdbcUrl: String = jdbcConnectionUrl +} diff --git a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/StatementSuite.scala b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/StatementSuite.scala new file mode 100644 index 00000000000..56ae737fc80 --- /dev/null +++ b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/StatementSuite.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.jdbc.mysql + +import java.sql.{Date, Timestamp} + +import org.apache.kyuubi.operation.HiveJDBCTestHelper + +class StatementSuite extends WithMySQLEngine with HiveJDBCTestHelper { + + test("test select") { + withJdbcStatement("test1") { statement => + statement.execute("create database if not exists db1") + statement.execute("use db1") + statement.execute("create table db1.test1(id bigint, name varchar(255), age int, " + + "PRIMARY KEY ( `id` ))" + + "ENGINE=InnoDB " + + "DEFAULT CHARSET=utf8;") + statement.execute("insert into db1.test1 values(1, 'a', 11)") + + val resultSet1 = statement.executeQuery("select * from db1.test1") + while (resultSet1.next()) { + val id = resultSet1.getObject(1) + assert(id == 1) + val name = resultSet1.getObject(2) + assert(name == "a") + val age = resultSet1.getObject(3) + assert(age == 11) + } + } + } + + test("test types") { + withJdbcStatement("test1") { statement => + statement.execute("create database if not exists db1") + statement.execute("use db1") + statement.execute("create table db1.type_test(" + + "id bigint, " + + "tiny_col tinyint, smallint_col smallint, " + + "int_col int, bigint_col bigint, " + + "decimal_col decimal(27, 9)," + + "date_col date, datetime_col datetime, timestamp_col timestamp," + + "char_col char, varchar_col varchar(255), " + + "boolean_col boolean, " + + "double_col double, float_col float," + + "PRIMARY KEY ( `id` )) " + + "ENGINE=InnoDB " + + "DEFAULT CHARSET=utf8") + statement.execute("insert into db1.type_test" + + "(id, " + + "tiny_col, smallint_col, int_col, bigint_col, " + + "decimal_col, " + + "date_col, datetime_col, timestamp_col," + + "char_col, varchar_col, " + + "boolean_col, " + + "double_col, float_col) " + + "VALUES (1, 2, 3, 4, 5, 6.6, '2023-10-23', '2023-10-23 15:31:45', " + + "'2023-10-23 15:31:45', 'a', 'Hello', true, 7.7, 8.8)") + + val resultSet1 = statement.executeQuery("select * from db1.type_test") + while (resultSet1.next()) { + assert(resultSet1.getObject(1) == 1) + assert(resultSet1.getObject(2) == 2) + assert(resultSet1.getObject(3) == 3) + assert(resultSet1.getObject(4) == 4) + assert(resultSet1.getObject(5) == 5) + assert(resultSet1.getObject(6) == new java.math.BigDecimal("6.600000000")) + assert(resultSet1.getObject(7) == Date.valueOf("2023-10-23")) + assert(resultSet1.getObject(8) == Timestamp.valueOf("2023-10-23 15:31:45")) + assert(resultSet1.getObject(9) == Timestamp.valueOf("2023-10-23 15:31:45")) + assert(resultSet1.getObject(10) == "a") + assert(resultSet1.getObject(11) == "Hello") + assert(resultSet1.getObject(12) == true) + assert(resultSet1.getObject(13) == 7.7) + assert(resultSet1.getObject(14) == 8.8) + } + } + } + + override protected def jdbcUrl: String = jdbcConnectionUrl +} diff --git a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/WithMySQLEngine.scala b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/WithMySQLEngine.scala new file mode 100644 index 00000000000..d5ff3f66e51 --- /dev/null +++ b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/WithMySQLEngine.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.jdbc.mysql + +import com.dimafeng.testcontainers.MySQLContainer +import com.dimafeng.testcontainers.scalatest.TestContainerForAll +import org.testcontainers.utility.DockerImageName + +import org.apache.kyuubi.config.KyuubiConf._ +import org.apache.kyuubi.engine.jdbc.WithJdbcEngine + +trait WithMySQLEngine extends WithJdbcEngine with TestContainerForAll { + + private val mysqlDockerImage = "mysql:8.0.32" + + override val containerDef = MySQLContainer.Def( + dockerImageName = DockerImageName.parse(mysqlDockerImage), + username = "root", + password = "kyuubi") + + override def withKyuubiConf: Map[String, String] = withContainers { mysqlContainer => + Map( + ENGINE_SHARE_LEVEL.key -> "SERVER", + ENGINE_JDBC_CONNECTION_URL.key -> mysqlContainer.jdbcUrl, + ENGINE_JDBC_CONNECTION_USER.key -> "root", + ENGINE_JDBC_CONNECTION_PASSWORD.key -> "kyuubi", + ENGINE_TYPE.key -> "jdbc", + ENGINE_JDBC_SHORT_NAME.key -> "mysql", + ENGINE_JDBC_DRIVER_CLASS.key -> "com.mysql.cj.jdbc.Driver", + ENGINE_JDBC_CONNECTION_PROVIDER.key -> + "MySQLConnectionProvider") + } + +} diff --git a/integration-tests/kyuubi-jdbc-it/src/test/scala/org/apache/kyuubi/it/jdbc/mysql/OperationWithServerSuite.scala b/integration-tests/kyuubi-jdbc-it/src/test/scala/org/apache/kyuubi/it/jdbc/mysql/OperationWithServerSuite.scala new file mode 100644 index 00000000000..263de3d1528 --- /dev/null +++ b/integration-tests/kyuubi-jdbc-it/src/test/scala/org/apache/kyuubi/it/jdbc/mysql/OperationWithServerSuite.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.it.jdbc.mysql + +import org.apache.kyuubi.engine.jdbc.mysql.MySQLOperationSuite + +class OperationWithServerSuite extends MySQLOperationSuite + with WithKyuubiServerAndMySQLContainer { + + override protected def jdbcUrl: String = getJdbcUrl + +} diff --git a/integration-tests/kyuubi-jdbc-it/src/test/scala/org/apache/kyuubi/it/jdbc/mysql/WithKyuubiServerAndMySQLContainer.scala b/integration-tests/kyuubi-jdbc-it/src/test/scala/org/apache/kyuubi/it/jdbc/mysql/WithKyuubiServerAndMySQLContainer.scala new file mode 100644 index 00000000000..da94df8e799 --- /dev/null +++ b/integration-tests/kyuubi-jdbc-it/src/test/scala/org/apache/kyuubi/it/jdbc/mysql/WithKyuubiServerAndMySQLContainer.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.it.jdbc.mysql + +import java.nio.file.{Files, Path, Paths} + +import org.apache.kyuubi.{Utils, WithKyuubiServer} +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_EXTRA_CLASSPATH, KYUUBI_ENGINE_ENV_PREFIX, KYUUBI_HOME} +import org.apache.kyuubi.engine.jdbc.mysql.WithMySQLEngine + +trait WithKyuubiServerAndMySQLContainer extends WithKyuubiServer with WithMySQLEngine { + + private val kyuubiHome: String = Utils + .getCodeSourceLocation(getClass).split("integration-tests").head + + private val mysqlJdbcConnectorPath: String = { + val keyword = "mysql-connector" + + val jarsDir = Paths.get(kyuubiHome) + .resolve("integration-tests") + .resolve("kyuubi-jdbc-it") + .resolve("target") + + Files.list(jarsDir) + .filter { p: Path => p.getFileName.toString contains keyword } + .findFirst + .orElseThrow { () => new IllegalStateException(s"Can not find $keyword in $jarsDir.") } + .toAbsolutePath + .toString + } + + override protected val conf: KyuubiConf = { + KyuubiConf() + .set(s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME", kyuubiHome) + .set(ENGINE_JDBC_EXTRA_CLASSPATH, mysqlJdbcConnectorPath) + } + + override def beforeAll(): Unit = { + val configs = withKyuubiConf + configs.foreach(config => conf.set(config._1, config._2)) + super.beforeAll() + } +} diff --git a/pom.xml b/pom.xml index 0917dfd217e..e15b4370146 100644 --- a/pom.xml +++ b/pom.xml @@ -566,6 +566,12 @@ ${testcontainers-scala.version} + + com.dimafeng + testcontainers-scala-mysql_${scala.binary.version} + ${testcontainers-scala.version} + + com.dimafeng testcontainers-scala-trino_${scala.binary.version}