Skip to content

Commit

Permalink
[KYUUBI apache#5906] [JDBC] Rebase Doris dialect implementation
Browse files Browse the repository at this point in the history
# 🔍 Description
## Issue References 🔗

As decribed.

## Describe Your Solution 🔧

As Doris is MySQL protocol compatible , use MySQLDialect as base class to simplify Doris dialect implementation

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️
No behaviour changes.

#### Behavior With This Pull Request 🎉
No behaviour changes.

#### Related Unit Tests

---

# Checklists
## 📝 Author Self Checklist

- [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [x] I have performed a self-review
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

## 📝 Committer Pre-Merge Checklist

- [ ] Pull request title is okay.
- [ ] No license issues.
- [ ] Milestone correctly set?
- [ ] Test coverage is ok
- [ ] Assignees are selected.
- [ ] Minimum number of approvals
- [ ] No changes are requested

**Be nice. Be informative.**

Closes apache#5906 from bowenliang123/jdbc-doris-simp.

Closes apache#5906

2e1420a [liangbowen] skip
e095d28 [liangbowen] Revert "simplify MySQLTRowSetGenerator"
7d59e2d [liangbowen] simplify MySQLTRowSetGenerator
9214388 [Bowen Liang] simplify StarRocksSchemaHelper
5f91042 [Bowen Liang] simplify DorisSchemaHelper
b90f26d [Bowen Liang] simplify DorisSchemaHelper
45ccdfe [Bowen Liang] update
5b4a2c2 [Bowen Liang] import
f7fdd35 [Bowen Liang] simplify DorisDialect

Lead-authored-by: Bowen Liang <[email protected]>
Co-authored-by: liangbowen <[email protected]>
Signed-off-by: liangbowen <[email protected]>
  • Loading branch information
bowenliang123 committed Dec 22, 2023
1 parent 97a0ba4 commit 86b88b7
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,118 +15,15 @@
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.dialect
import java.sql.{Connection, Statement}
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.commons.lang3.StringUtils

import org.apache.kyuubi.engine.jdbc.doris.{DorisSchemaHelper, DorisTRowSetGenerator}
import org.apache.kyuubi.engine.jdbc.schema.{JdbcTRowSetGenerator, SchemaHelper}
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session

class DorisDialect extends JdbcDialect {

override def createStatement(connection: Connection, fetchSize: Int): Statement = {
val statement = super.createStatement(connection, fetchSize)
statement.setFetchSize(Integer.MIN_VALUE)
statement
}

override def getTablesQuery(
catalog: String,
schema: String,
tableName: String,
tableTypes: util.List[String]): String = {
val tTypes =
if (tableTypes == null || tableTypes.isEmpty) {
Set("BASE TABLE", "SYSTEM VIEW", "VIEW")
} else {
tableTypes.asScala.toSet
}
val query = new StringBuilder(
s"""
|SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, ENGINE,
|TABLE_ROWS, AVG_ROW_LENGTH, DATA_LENGTH,
|CREATE_TIME, UPDATE_TIME, TABLE_COLLATION, TABLE_COMMENT
|FROM INFORMATION_SCHEMA.TABLES
|""".stripMargin)

val filters = ArrayBuffer[String]()
if (StringUtils.isNotBlank(catalog)) {
filters += s"$TABLE_CATALOG = '$catalog'"
}

if (StringUtils.isNotBlank(schema)) {
filters += s"$TABLE_SCHEMA LIKE '$schema'"
}

if (StringUtils.isNotBlank(tableName)) {
filters += s"$TABLE_NAME LIKE '$tableName'"
}
class DorisDialect extends MySQLDialect {

if (tTypes.nonEmpty) {
filters += s"(${tTypes.map { tableType => s"$TABLE_TYPE = '$tableType'" }
.mkString(" OR ")})"
}

if (filters.nonEmpty) {
query.append(" WHERE ")
query.append(filters.mkString(" AND "))
}

query.toString()
}

override def getColumnsQuery(
session: Session,
catalogName: String,
schemaName: String,
tableName: String,
columnName: String): String = {
val query = new StringBuilder(
"""
|SELECT
|`TABLE_CATALOG`,`TABLE_SCHEMA`,`TABLE_NAME`, `COLUMN_NAME`,`ORDINAL_POSITION`,
|`COLUMN_DEFAULT`,`IS_NULLABLE`,`DATA_TYPE`,`CHARACTER_MAXIMUM_LENGTH`,
|`CHARACTER_OCTET_LENGTH`,`NUMERIC_PRECISION`,`NUMERIC_SCALE`,`DATETIME_PRECISION`,
|`CHARACTER_SET_NAME`,`COLLATION_NAME`,`COLUMN_TYPE`,`COLUMN_KEY`,`EXTRA`,`PRIVILEGES`,
|`COLUMN_COMMENT`,`COLUMN_SIZE`,`DECIMAL_DIGITS`,`GENERATION_EXPRESSION`,`SRS_ID`
|FROM information_schema.columns
|""".stripMargin)

val filters = ArrayBuffer[String]()
if (StringUtils.isNotEmpty(catalogName)) {
filters += s"$TABLE_CATALOG = '$catalogName'"
}
if (StringUtils.isNotEmpty(schemaName)) {
filters += s"$TABLE_SCHEMA LIKE '$schemaName'"
}
if (StringUtils.isNotEmpty(tableName)) {
filters += s"$TABLE_NAME LIKE '$tableName'"
}
if (StringUtils.isNotEmpty(columnName)) {
filters += s"$COLUMN_NAME LIKE '$columnName'"
}

if (filters.nonEmpty) {
query.append(" WHERE ")
query.append(filters.mkString(" AND "))
}

query.toString()
}
override def name(): String = "doris"

override def getTRowSetGenerator(): JdbcTRowSetGenerator = new DorisTRowSetGenerator

override def getSchemaHelper(): SchemaHelper = {
new DorisSchemaHelper
}

override def name(): String = {
"doris"
}
override def getSchemaHelper(): SchemaHelper = new DorisSchemaHelper
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@
*/
package org.apache.kyuubi.engine.jdbc.doris

import org.apache.kyuubi.engine.jdbc.schema.SchemaHelper
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
import org.apache.kyuubi.engine.jdbc.mysql.MySQLSchemaHelper

class DorisSchemaHelper extends SchemaHelper {

override def tinyIntToTTypeId: TTypeId = TTypeId.INT_TYPE

override def smallIntToTTypeId: TTypeId = TTypeId.INT_TYPE
}
class DorisSchemaHelper extends MySQLSchemaHelper {}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ class DefaultJdbcTRowSetGenerator extends JdbcTRowSetGenerator {
case (date: Date, DATE) => formatDate(date)
case (dateTime: LocalDateTime, TIMESTAMP) => formatLocalDateTime(dateTime)
case (decimal: java.math.BigDecimal, DECIMAL) => decimal.toPlainString
case (bigint: java.math.BigInteger, BIGINT) => bigint.toString()
case (other, _) => other.toString
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@
*/
package org.apache.kyuubi.engine.jdbc.starrocks

import org.apache.kyuubi.engine.jdbc.schema.SchemaHelper
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
import org.apache.kyuubi.engine.jdbc.mysql.MySQLSchemaHelper

class StarRocksSchemaHelper extends SchemaHelper {

override def tinyIntToTTypeId: TTypeId = TTypeId.INT_TYPE

override def smallIntToTTypeId: TTypeId = TTypeId.INT_TYPE
}
class StarRocksSchemaHelper extends MySQLSchemaHelper {}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@ import org.apache.kyuubi.engine.jdbc.WithJdbcServerContainer

trait WithDorisContainer extends WithJdbcServerContainer {

private val DORIS_FE_PORT = 9030

private val DORIS_BE_PORT = 8040
private val DORIS_FE_MYSQL_PORT = 9030
private val DORIS_BE_HTTTP_PORT = 8040

private val DORIS_FE_SERVICE_NAME = "doris-fe"

private val DORIS_BE_SERVICE_NAME = "doris-be"

override val containerDef: DockerComposeContainer.Def =
Expand All @@ -43,19 +41,18 @@ trait WithDorisContainer extends WithJdbcServerContainer {
exposedServices = Seq[ExposedService](
ExposedService(
DORIS_FE_SERVICE_NAME,
DORIS_FE_PORT,
DORIS_FE_MYSQL_PORT,
waitStrategy =
new DockerHealthcheckWaitStrategy().withStartupTimeout(Duration.ofMinutes(5))),
ExposedService(
DORIS_BE_SERVICE_NAME,
DORIS_BE_PORT,
DORIS_BE_HTTTP_PORT,
waitStrategy =
new DockerHealthcheckWaitStrategy().withStartupTimeout(Duration.ofMinutes(5)))))

protected def feUrl: String = withContainers { container =>
val feHost: String = container.getServiceHost(DORIS_FE_SERVICE_NAME, DORIS_FE_PORT)
val fePort: Int = container.getServicePort(DORIS_FE_SERVICE_NAME, DORIS_FE_PORT)
val url = s"$feHost:$fePort"
url
protected def feJdbcUrl: String = withContainers { container =>
val feHost: String = container.getServiceHost(DORIS_FE_SERVICE_NAME, DORIS_FE_MYSQL_PORT)
val fePort: Int = container.getServicePort(DORIS_FE_SERVICE_NAME, DORIS_FE_MYSQL_PORT)
s"jdbc:mysql://$feHost:$fePort"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ trait WithDorisEngine extends WithJdbcEngine with WithDorisContainer {

override def withKyuubiConf: Map[String, String] = Map(
ENGINE_SHARE_LEVEL.key -> "SERVER",
ENGINE_JDBC_CONNECTION_URL.key -> s"jdbc:mysql://$feUrl",
ENGINE_JDBC_CONNECTION_URL.key -> feJdbcUrl,
ENGINE_JDBC_CONNECTION_USER.key -> "root",
ENGINE_JDBC_CONNECTION_PASSWORD.key -> "",
ENGINE_TYPE.key -> "jdbc",
Expand Down

0 comments on commit 86b88b7

Please sign in to comment.