Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #5505][FLINK] Support HELP command #5585

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions externals/kyuubi-flink-sql-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<scope>provided</scope>
</dependency>

<!-- tests -->
<dependency>
<groupId>org.apache.kyuubi</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@

package org.apache.kyuubi.engine.flink.operation

import java.util.Optional

import scala.concurrent.duration.Duration

import org.apache.flink.api.common.JobID
import org.apache.flink.table.api.TableException
import org.apache.flink.table.gateway.api.operation.OperationHandle
import org.apache.flink.table.operations.Operation
import org.apache.flink.table.operations.command.HelpOperation

import org.apache.kyuubi.Logging
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.engine.flink.result.ResultSetUtil
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.reflect.{DynConstructors, DynFields, DynMethods}

class ExecuteStatement(
session: Session,
Expand Down Expand Up @@ -59,6 +65,14 @@ class ExecuteStatement(
private def executeStatement(): Unit = {
try {
setState(OperationState.RUNNING)

val operation = parseExtendedStatement(statement)
if (operation.isPresent && operation.get().isInstanceOf[HelpOperation]) {
resultSet = ResultSetUtil.helpMessageResultSet
setState(OperationState.FINISHED)
return
}

val resultFetcher = executor.executeStatement(
new OperationHandle(getHandle.identifier),
statement)
Expand All @@ -71,4 +85,36 @@ class ExecuteStatement(
shutdownTimeoutMonitor()
}
}

private def parseExtendedStatement(statement: String): Optional[Operation] = {
val plannerModuleClassLoader: ClassLoader = getPlannerModuleClassLoader
val extendedParser: AnyRef =
DynConstructors.builder()
.loader(plannerModuleClassLoader)
.impl("org.apache.flink.table.planner.parse.ExtendedParser")
.build().newInstance()
DynMethods.builder("parse")
.hiddenImpl(extendedParser.getClass, classOf[String])
.buildChecked()
.invoke(extendedParser, statement)
}

private def getPlannerModuleClassLoader: ClassLoader = {
try {
val plannerModule = DynMethods.builder("getInstance")
.hiddenImpl("org.apache.flink.table.planner.loader.PlannerModule")
.buildStaticChecked()
.invoke().asInstanceOf[AnyRef]

DynFields.builder()
.hiddenImpl(plannerModule.getClass, "submoduleClassLoader")
.build[ClassLoader].bind(plannerModule).get
} catch {
case e: Exception =>
throw new TableException(
"Error obtaining Flink planner module ClassLoader. " +
"Make sure a flink-table-planner-loader.jar is on the classpath",
e)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.flink.result

import scala.collection.mutable.ListBuffer

import org.apache.flink.util.Preconditions
import org.jline.utils.{AttributedString, AttributedStringBuilder, AttributedStyle}

/**
* Utility class that contains all strings for Flink SQL commands and messages.
*/
object CommandStrings {
private val CMD_DESC_DELIMITER = "\t\t"

private class SQLCommandsDescriptions {
private var commandMaxLength = -1
private val commandsDescriptionList = ListBuffer[(String, String)]()

def commandDescription(command: String, description: String): SQLCommandsDescriptions = {
Preconditions.checkState(
command.nonEmpty,
s"content of command must not be empty.",
Seq(): _*)
Preconditions.checkState(
description.nonEmpty,
s"content of command's description must not be empty.",
Seq(): _*)

updateMaxCommandLength(command.length)
commandsDescriptionList += ((command, description))
this
}

private def updateMaxCommandLength(newLength: Int): Unit = {
Preconditions.checkState(newLength > 0)
if (commandMaxLength < newLength) {
commandMaxLength = newLength
}
}

private def formatDescription(input: String): String = {
val maxLineLength = 160
val newLinePrefix = " " * commandMaxLength + CMD_DESC_DELIMITER
val words = input.split(" ")

val (lastLine, lines) = words.foldLeft(("", List[String]())) {
case ((line, lines), word) =>
val newLine = if (line.isEmpty) word else line + " " + word
if (newLine.length > maxLineLength) (word, lines :+ line) else (newLine, lines)
}

(lines :+ lastLine).mkString("\n" + newLinePrefix)
}

def build(): AttributedString = {
val attributedStringBuilder = new AttributedStringBuilder
if (commandsDescriptionList.nonEmpty) {
commandsDescriptionList.foreach {
case (cmd, cmdDesc) =>
attributedStringBuilder
.style(AttributedStyle.DEFAULT.bold())
.append(cmd.padTo(commandMaxLength, " ").mkString)
.append(CMD_DESC_DELIMITER)
.style(AttributedStyle.DEFAULT)
.append(formatDescription(cmdDesc))
.append('\n')
}
}
attributedStringBuilder.toAttributedString
}
}

// scalastyle:off line.size.limit
val MESSAGE_HELP: AttributedString =
new AttributedStringBuilder()
.append("The following commands are available:\n\n")
.append(COMMANDS_DESCRIPTIONS)
.style(AttributedStyle.DEFAULT.underline())
.append("\nHint")
.style(AttributedStyle.DEFAULT)
.append(
": Make sure that a statement ends with \";\" for finalizing (multi-line) statements.")
// About Documentation Link.
.style(AttributedStyle.DEFAULT)
.append(
"\nThe above list includes only the most frequently used statements.\nYou can also type any Flink SQL statement, please visit https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/overview/ for more details.")
.toAttributedString

def COMMANDS_DESCRIPTIONS: AttributedString =
new SQLCommandsDescriptions()
.commandDescription(
"HELP",
"Prints the available commands or the detailed description of a specified command.")
.commandDescription(
"SET",
"Sets a session configuration property. Syntax: \"SET '<key>'='<value>';\". Use \"SET;\" for listing all properties.")
.commandDescription(
"RESET",
"Resets a session configuration property. Syntax: \"RESET '<key>';\". Use \"RESET;\" for reset all session properties.")
.commandDescription(
"INSERT INTO",
"Inserts the results of a SQL SELECT query into a declared table sink.")
.commandDescription(
"INSERT OVERWRITE",
"Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.")
.commandDescription(
"SELECT",
"Executes a SQL SELECT query on the Flink cluster.")
.commandDescription(
"EXPLAIN",
"Describes the execution plan of a query or table with the given name.")
.commandDescription(
"BEGIN STATEMENT SET",
"Begins a statement set. Syntax: \"BEGIN STATEMENT SET;\"")
.commandDescription("END", "Ends a statement set. Syntax: \"END;\"")
.commandDescription(
"ADD JAR",
"Adds the specified jar file to the submitted jobs' classloader. Syntax: \"ADD JAR '<path_to_filename>.jar'\"")
.commandDescription(
"SHOW JARS",
"Shows the list of user-specified jar dependencies. This list is impacted by the ADD JAR commands.")
.commandDescription(
"SHOW CATALOGS",
"Shows the list of registered catalogs.")
.commandDescription(
"SHOW CURRENT CATALOG",
"Shows the name of the current catalog.")
.commandDescription(
"SHOW DATABASES",
"Shows all databases in the current catalog.")
.commandDescription(
"SHOW CURRENT DATABASE",
"Shows the name of the current database.")
.commandDescription(
"SHOW TABLES",
"Shows all tables for an optionally specified database. Syntax: \"SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE <sql_like_pattern> ]\"")
.commandDescription(
"SHOW CREATE TABLE",
"Shows the CREATE TABLE statement that creates the specified table.")
.commandDescription(
"SHOW COLUMNS",
"Shows all columns of a table with the given name. Syntax: \"SHOW COLUMNS ( FROM | IN ) [[catalog_name.]database.]<table_name> [ [NOT] LIKE <sql_like_pattern>]\"")
.commandDescription(
"SHOW VIEWS",
"Shows all views in the current catalog and the current database.")
.commandDescription(
"SHOW CREATE VIEW",
"Shows the CREATE VIEW statement that creates the specified view. Syntax: \"SHOW CREATE VIEW [catalog_name.][db_name.]view_name\"")
.commandDescription(
"SHOW FUNCTIONS",
"Shows all user-defined and built-in functions in the current catalog and current database. Use \"SHOW USER FUNCTIONS\" for listing all user-defined functions in the current catalog and current database.")
.commandDescription(
"SHOW MODULES",
"Shows all enabled module names with resolution order.")
.commandDescription(
"USE CATALOG",
"Sets the current catalog. All subsequent commands that do not explicitly specify a catalog will use this one. If the provided catalog does not exist, an exception is thrown. The default current catalog is default_catalog. Syntax: \"USE CATALOG catalog_name\"")
.commandDescription(
"USE",
"Sets the current database. All subsequent commands that do not explicitly specify a database will use this one. If the provided database does not exist, an exception is thrown. The default current database is default_database. Syntax: \"USE [catalog_name.]database_name\"")
.commandDescription(
"DESC",
"Describes the schema of a table with the given name. Syntax: \"{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name\"")
.commandDescription(
"ANALYZE",
"ANALYZE statements are used to collect statistics for existing tables and store the result to catalog. Only supports in batch mode. Syntax: \"ANALYZE TABLE [catalog_name.][db_name.]table_name PARTITION(partcol1[=val1] [, partcol2[=val2], ...]) COMPUTE STATISTICS [FOR COLUMNS col1 [, col2, ...] | FOR ALL COLUMNS]\"")
.commandDescription(
"ALTER TABLE",
"Renames a table or change a table's properties. Syntax: \"ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name\", the other syntax: \"ALTER TABLE [catalog_name.][db_name.]table_name SET ( key1=val1[, key2=val2, ...] )\"")
.commandDescription(
"ALTER VIEW",
"Renames a given view to a new name within the same catalog and database. Syntax: \"ALTER VIEW [catalog_name.][db_name.]view_name RENAME TO new_view_name\"")
.commandDescription(
"ALTER DATABASE",
"Changes a database's properties. Syntax: \"ALTER DATABASE [catalog_name.]db_name SET ( key1=val1[, key2=val2, ...] )\"")
.commandDescription(
"ALTER FUNCTION",
"Changes a catalog function with the new identifier and optional language tag. Syntax: \"ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name AS identifier [LANGUAGE JAVA|SCALA|PYTHON]\"")
.commandDescription(
"DROP CATALOG",
"Drops a catalog with the given catalog name. Syntax: \"DROP CATALOG [IF EXISTS] catalog_name\"")
.commandDescription(
"DROP DATABASE",
"Drops a database with the given database name. Syntax: \"DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]\"")
.commandDescription(
"DROP TABLE",
"Drops a table with the given table name. Syntax: \"DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name\"")
.commandDescription(
"DROP VIEW",
"Drops a view with the given view name. Syntax: \"DROP [TEMPORARY] VIEW [IF EXISTS] [catalog_name.][db_name.]view_name\"")
.commandDescription(
"DROP FUNCTION",
"Drops a catalog function with the given function name. Syntax: \"DROP [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name\"")
.commandDescription(
"CREATE CATALOG",
"Creates a catalog with the given catalog properties. Syntax: \"CREATE CATALOG catalog_name WITH ( 'key1'='value1'[, 'key2'='value2', ...] )\"")
.commandDescription(
"CREATE DATABASE",
"Creates a database with the given database properties. Syntax: \"CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name [COMMENT 'database_comment'] [WITH ( 'key1'='value1'[, 'key2'='value2', ...] )]\"")
.commandDescription(
"CREATE TABLE",
"Creates a table with the given table properties. Syntax: \"CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name ( { col_name data_type [COMMENT col_comment] [column_constraint] | table_constraint } [,...] ) [COMMENT table_comment] [PARTITIONED BY (col_name, col_name, ...)] [WITH ( 'key1'='value1'[, 'key2'='value2', ...] )] \"")
.commandDescription(
"CREATE VIEW",
"Creates a view with the given view expression. Syntax: \"CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name [(column_name [,...])] [COMMENT view_comment] AS query_expression\"")
.commandDescription(
"CREATE FUNCTION",
"Creates a catalog function with the given function properties. Syntax: \"CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS] [catalog_name.][db_name.]function_name AS identifier [LANGUAGE JAVA|SCALA|PYTHON] [USING JAR '<path_to_filename>.jar' [, JAR '<path_to_filename>.jar']* ]\"")
.commandDescription(
"SHOW JOBS",
"Show the jobs in the Flink cluster. Supports in version 1.17 and later.")
.commandDescription(
"STOP JOB",
"Stop the job with the given job ID. Supports in version 1.17 and later. Syntax: \"STOP JOB '<job_id>' [WITH SAVEPOINT] [WITH DRAIN]\"")
.commandDescription(
"UPDATE",
"Performs row-level updating on the target table. Only supports in batch mode. Supports in version 1.17 and later. Syntax: \"UPDATE [catalog_name.][db_name.]table_name SET col_name1 = col_val1 [, col_name2 = col_val2 ...] [WHERE condition]\"")
.commandDescription(
"DELETE",
"Performs row-level deleting on the target table. Only supports in batch mode. Supports in version 1.17 and later. Syntax: \"DELETE FROM [catalog_name.][db_name.]table_name [WHERE condition]\"")
.commandDescription(
"TRUNCATE TABLE",
"Truncates the target table. Only supports in batch mode. Supports in version 1.18 and later. Syntax: \"TRUNCATE TABLE [catalog_name.][db_name.]table_name\"")
.commandDescription(
"CALL",
"Calls a stored procedure. Supports in version 1.18 and later. Syntax: \"CALL [catalog_name.][database_name.]procedure_name ([ expression [, expression]* ] )\"")
.build()
// scalastyle:on
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ object ResultSetUtil {
.data(Array[Row](Row.of("OK")))
.build

def helpMessageResultSet: ResultSet =
ResultSet.builder
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(Column.physical("result", DataTypes.STRING))
.data(Array[Row](Row.of(CommandStrings.MESSAGE_HELP.toString)))
.build

def fromResultFetcher(
resultFetcher: ResultFetcher,
maxRows: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.flink.FlinkEngineUtils.FLINK_RUNTIME_VERSION
import org.apache.kyuubi.engine.flink.WithFlinkTestResources
import org.apache.kyuubi.engine.flink.result.Constants
import org.apache.kyuubi.engine.flink.result.{CommandStrings, Constants}
import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
import org.apache.kyuubi.jdbc.hive.{KyuubiSQLException, KyuubiStatement}
import org.apache.kyuubi.jdbc.hive.common.TimestampTZ
Expand Down Expand Up @@ -1265,4 +1265,14 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
})
assert(exception.getMessage === "Futures timed out after [60000 milliseconds]")
}

test("execute statement - help") {
withJdbcStatement() { stmt =>
val resultSet = stmt.executeQuery("help")
val metadata = resultSet.getMetaData
assert(metadata.getColumnName(1) === "result")
assert(resultSet.next())
assert(resultSet.getString(1).equals(CommandStrings.MESSAGE_HELP.toString))
}
}
}
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1385,6 +1385,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-client</artifactId>
Expand Down