[KYUUBI #6249] Drop support for Flink 1.16.
# 🔍 Description
## Issue References 🔗

This pull request fixes #6249.

## Describe Your Solution 🔧

As planned, Flink 1.16 is no longer supported; this PR removes the Flink 1.16 support code, along with the related CI jobs, Maven profile, and documentation entries.
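
For illustration, here is a minimal, self-contained sketch of the version gate after this change. It mirrors `FlinkEngineUtils.checkFlinkVersion` from the diff below, but the version parsing and helper names here are simplified stand-ins, not the actual Kyuubi code: a Flink 1.16 runtime is now rejected outright instead of triggering a deprecation warning.

```scala
// Sketch only: post-change version gate, with a hypothetical stand-in
// for SemanticVersion / EnvironmentInformation.
object VersionGateSketch {
  private val supportedFlinkVersions = Set("1.17", "1.18", "1.19")

  def checkFlinkVersion(flinkVersion: String): Unit = {
    // reduce "1.16.3" to its major.minor prefix, e.g. "1.16"
    val majorMinor = flinkVersion.split('.').take(2).mkString(".")
    if (supportedFlinkVersions.contains(majorMinor)) {
      println(s"The current Flink version is $flinkVersion")
    } else {
      throw new UnsupportedOperationException(
        s"You are using unsupported Flink version $flinkVersion")
    }
  }

  def main(args: Array[String]): Unit = {
    checkFlinkVersion("1.17.2") // accepted
    // Flink 1.16 now fails fast; before this PR it only logged a warning
    try checkFlinkVersion("1.16.3")
    catch { case e: UnsupportedOperationException => println(e.getMessage) }
  }
}
```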

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [x] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

Pass GitHub Actions (GHA) CI.

---

# Checklist 📝

- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6259 from slfan1989/drop_flink1.16_support.

Closes #6249

808acc2 [Cheng Pan] Update docs/deployment/migration-guide.md
fc5ecf6 [Shilun Fan] [KYUUBI #6249] Fix CheckStyle.
8d8f9de [Shilun Fan] [KYUUBI #6249] Fix CheckStyle.
7a1d974 [Shilun Fan] [KYUUBI #6249] Drop support for Flink 1.16.

Lead-authored-by: Shilun Fan <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
slfan1989 and pan3793 committed Apr 6, 2024
1 parent f4504ea commit a632edc
Showing 11 changed files with 19 additions and 134 deletions.
4 changes: 0 additions & 4 deletions .github/workflows/master.yml
@@ -191,10 +191,6 @@ jobs:
flink-archive: [ "" ]
comment: [ "normal" ]
include:
- java: 8
flink: '1.17'
flink-archive: '-Dflink.archive.mirror=https://archive.apache.org/dist/flink/flink-1.16.3 -Dflink.archive.name=flink-1.16.3-bin-scala_2.12.tgz'
comment: 'verify-on-flink-1.16-binary'
- java: 8
flink: '1.17'
flink-archive: '-Dflink.archive.mirror=https://archive.apache.org/dist/flink/flink-1.18.1 -Dflink.archive.name=flink-1.18.1-bin-scala_2.12.tgz'
1 change: 1 addition & 0 deletions docs/deployment/migration-guide.md
@@ -20,6 +20,7 @@
## Upgrading from Kyuubi 1.9 to 1.10

* Since Kyuubi 1.10, `beeline` is deprecated and will be removed in the future, please use `kyuubi-beeline` instead.
* Since Kyuubi 1.10, the support of Flink engine for Flink 1.16 is removed.

## Upgrading from Kyuubi 1.8 to 1.9

2 changes: 1 addition & 1 deletion docs/quick_start/quick_start.rst
@@ -44,7 +44,7 @@ pre-installed and the ``JAVA_HOME`` is correctly set to each component.
Engine lib - Kyuubi Engine
Beeline - Kyuubi Beeline
**Spark** Engine 3.1 to 3.5 A Spark distribution
**Flink** Engine 1.16 to 1.19 A Flink distribution
**Flink** Engine 1.17 to 1.19 A Flink distribution
**Trino** Engine N/A A Trino cluster allows to access via trino-client v411
**Doris** Engine N/A A Doris cluster
**Hive** Engine - 2.1-cdh6/2.3/3.1 - A Hive distribution
@@ -22,12 +22,10 @@ import java.lang.{Boolean => JBoolean}
import java.net.URL
import java.util.{ArrayList => JArrayList, Collections => JCollections, List => JList}

import scala.collection.JavaConverters._
import scala.collection.convert.ImplicitConversions._

import org.apache.commons.cli.{CommandLine, DefaultParser, Options}
import org.apache.flink.api.common.JobID
import org.apache.flink.client.cli.{CustomCommandLine, DefaultCLI, GenericCLI}
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.util.EnvironmentInformation
@@ -41,25 +39,21 @@ import org.apache.flink.util.JarUtils

import org.apache.kyuubi.{KyuubiException, Logging}
import org.apache.kyuubi.util.SemanticVersion
import org.apache.kyuubi.util.reflect._
import org.apache.kyuubi.util.reflect.ReflectUtils._

object FlinkEngineUtils extends Logging {

val EMBEDDED_MODE_CLIENT_OPTIONS: Options = getEmbeddedModeClientOptions(new Options)

private def SUPPORTED_FLINK_VERSIONS =
Set("1.16", "1.17", "1.18", "1.19").map(SemanticVersion.apply)
Set("1.17", "1.18", "1.19").map(SemanticVersion.apply)

val FLINK_RUNTIME_VERSION: SemanticVersion = SemanticVersion(EnvironmentInformation.getVersion)

def checkFlinkVersion(): Unit = {
val flinkVersion = EnvironmentInformation.getVersion
if (SUPPORTED_FLINK_VERSIONS.contains(FLINK_RUNTIME_VERSION)) {
info(s"The current Flink version is $flinkVersion")
if (FlinkEngineUtils.FLINK_RUNTIME_VERSION === "1.16") {
warn("The support for Flink 1.16 is deprecated, and will be removed in the next version.")
}
} else {
throw new UnsupportedOperationException(
s"You are using unsupported Flink version $flinkVersion, " +
@@ -127,17 +121,6 @@
(classOf[JList[URL]], dependencies),
(classOf[Boolean], JBoolean.TRUE),
(classOf[Boolean], JBoolean.FALSE))
} else if (FLINK_RUNTIME_VERSION === "1.16") {
val commandLines: JList[CustomCommandLine] =
Seq(new GenericCLI(flinkConf, flinkConfDir), new DefaultCLI).asJava
DynConstructors.builder()
.impl(
classOf[DefaultContext],
classOf[Configuration],
classOf[JList[CustomCommandLine]])
.build()
.newInstance(flinkConf, commandLines)
.asInstanceOf[DefaultContext]
} else {
throw new KyuubiException(
s"Flink version ${EnvironmentInformation.getVersion} are not supported currently.")
@@ -147,9 +130,6 @@
def getSessionContext(session: Session): SessionContext = getField(session, "sessionContext")

def getResultJobId(resultFetch: ResultFetcher): Option[JobID] = {
if (FLINK_RUNTIME_VERSION <= "1.16") {
return None
}
try {
Option(getField[JobID](resultFetch, "jobID"))
} catch {
@@ -29,12 +29,12 @@ import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.catalog.ResolvedSchema
import org.apache.flink.table.data.RowData
import org.apache.flink.table.data.conversion.DataStructureConverters
import org.apache.flink.table.gateway.api.results.ResultSet.ResultType
import org.apache.flink.table.gateway.service.result.ResultFetcher
import org.apache.flink.table.types.DataType
import org.apache.flink.types.Row

import org.apache.kyuubi.Logging
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.engine.flink.shim.FlinkResultSet
import org.apache.kyuubi.operation.FetchIterator
import org.apache.kyuubi.util.reflect.DynFields
@@ -60,12 +60,10 @@ class IncrementalResultFetchIterator(

val FETCH_INTERVAL_MS: Long = 1000

// for Flink 1.16 and below, isQueryResult is not supported
val isQueryResult: Boolean =
FlinkEngineUtils.FLINK_RUNTIME_VERSION < "1.17" ||
DynFields.builder
.hiddenImpl(classOf[ResultFetcher], "isQueryResult")
.build[Boolean](resultFetcher).get()
DynFields.builder
.hiddenImpl(classOf[ResultFetcher], "isQueryResult")
.build[Boolean](resultFetcher).get()

val effectiveMaxRows: Int = if (isQueryResult) maxRows else Int.MaxValue

@@ -91,16 +89,15 @@
while (!fetched && !Thread.interrupted()) {
val rs = resultFetcher.fetchResults(token, effectiveMaxRows - bufferedRows.length)
val flinkRs = new FlinkResultSet(rs)
// TODO: replace string-based match when Flink 1.16 support is dropped
flinkRs.getResultType.name() match {
case "EOS" =>
flinkRs.getResultType match {
case ResultType.EOS =>
debug("EOS received, no more data to fetch.")
fetched = true
hasNext = false
case "NOT_READY" =>
case ResultType.NOT_READY =>
// if flink jobs are not ready, continue to retry
debug("Result not ready, retrying...")
case "PAYLOAD" =>
case ResultType.PAYLOAD =>
val fetchedData = flinkRs.getData
// if no data fetched, continue to retry
if (!fetchedData.isEmpty) {
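
The hunk above resolves the old TODO: once Flink 1.16 is gone, the string-based match on `getResultType.name()` becomes a direct match on the `ResultType` enum. A self-contained sketch of the idea, using a sealed-trait stand-in rather than Flink's actual Java enum:

```scala
// Illustrative only: a sealed-trait stand-in for Flink's ResultType enum.
// With a sealed hierarchy the compiler can warn on a missing case, which
// string matching never could.
object ResultTypeMatchSketch {
  sealed trait ResultType
  case object EOS extends ResultType       // stream finished
  case object NOT_READY extends ResultType // job still starting, retry
  case object PAYLOAD extends ResultType   // rows are available

  def describe(rt: ResultType): String = rt match {
    case EOS       => "EOS received, no more data to fetch."
    case NOT_READY => "Result not ready, retrying..."
    case PAYLOAD   => "Payload received, rows available."
  }

  def main(args: Array[String]): Unit =
    Seq(EOS, NOT_READY, PAYLOAD).map(describe).foreach(println)
}
```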
@@ -17,19 +17,18 @@

package org.apache.kyuubi.engine.flink.session

import scala.collection.JavaConverters._
import scala.collection.JavaConverters.mapAsJavaMap

import org.apache.flink.table.gateway.api.session.SessionEnvironment
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion
import org.apache.flink.table.gateway.service.context.DefaultContext
import org.apache.flink.table.gateway.service.session.SessionManagerImpl

import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.flink.FlinkSQLEngine
import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
import org.apache.kyuubi.engine.flink.shim.FlinkSessionManager
import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager}
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion

@@ -41,7 +40,7 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
private lazy val shareLevel = ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL))

val operationManager = new FlinkSQLOperationManager()
val sessionManager = new FlinkSessionManager(engineContext)
val sessionManager = new SessionManagerImpl(engineContext)

override def start(): Unit = {
super.start()

This file was deleted.

@@ -21,39 +21,19 @@ import java.util

import scala.collection.mutable.ArrayBuffer

import org.apache.flink.configuration.Configuration
import org.apache.flink.table.functions.{ScalarFunction, UserDefinedFunction}
import org.apache.flink.table.gateway.service.context.SessionContext

import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME, KYUUBI_SESSION_USER_KEY}
import org.apache.kyuubi.engine.flink.FlinkEngineUtils.FLINK_RUNTIME_VERSION
import org.apache.kyuubi.util.reflect.DynMethods

object KDFRegistry {

def createKyuubiDefinedFunctions(sessionContext: SessionContext): Array[KyuubiDefinedFunction] = {

val kyuubiDefinedFunctions = new ArrayBuffer[KyuubiDefinedFunction]

val flinkConfigMap: util.Map[String, String] = {
if (FLINK_RUNTIME_VERSION === "1.16") {
DynMethods
.builder("getConfigMap")
.impl(classOf[SessionContext])
.build()
.invoke(sessionContext)
.asInstanceOf[util.Map[String, String]]
} else {
DynMethods
.builder("getSessionConf")
.impl(classOf[SessionContext])
.build()
.invoke(sessionContext)
.asInstanceOf[Configuration]
.toMap
}
}
val flinkConfigMap: util.Map[String, String] = sessionContext.getSessionConf.toMap

val kyuubi_version: KyuubiDefinedFunction = create(
"kyuubi_version",
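
The KDFRegistry hunk above drops the reflective `DynMethods` dispatch: with every supported Flink version (1.17+) exposing `getSessionConf`, a plain method call suffices. A minimal sketch with a hypothetical stand-in `SessionContext` (not Flink's actual class):

```scala
// Sketch only: a stand-in SessionContext illustrating why the reflective
// version switch collapses into one direct accessor call.
object DirectConfSketch {
  // hypothetical stand-in for Flink's SessionContext on 1.17+
  final class SessionContext(conf: Map[String, String]) {
    def getSessionConf: Map[String, String] = conf
  }

  def main(args: Array[String]): Unit = {
    val ctx = new SessionContext(Map("table.exec.state.ttl" -> "1h"))
    // no FLINK_RUNTIME_VERSION check, no DynMethods reflection needed
    val flinkConfigMap = ctx.getSessionConf
    println(flinkConfigMap)
  }
}
```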
@@ -1262,12 +1262,9 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
assert(stmt.asInstanceOf[KyuubiStatement].getQueryId === null)
stmt.executeQuery("insert into tbl_a values (1)")
val queryId = stmt.asInstanceOf[KyuubiStatement].getQueryId
// Flink 1.16 doesn't support query id via ResultFetcher
if (FLINK_RUNTIME_VERSION >= "1.17") {
assert(queryId !== null)
// parse the string to check if it's valid Flink job id
assert(JobID.fromHexString(queryId) !== null)
}
assert(queryId !== null)
// parse the string to check if it's valid Flink job id
assert(JobID.fromHexString(queryId) !== null)
}
}

@@ -50,8 +50,8 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite
private val tempFlinkHome = Files.createTempDirectory("flink-home").toFile
private val tempOpt =
Files.createDirectories(Paths.get(tempFlinkHome.toPath.toString, "opt")).toFile
Files.createFile(Paths.get(tempOpt.toPath.toString, "flink-sql-client-1.16.3.jar"))
Files.createFile(Paths.get(tempOpt.toPath.toString, "flink-sql-gateway-1.16.3.jar"))
Files.createFile(Paths.get(tempOpt.toPath.toString, "flink-sql-client-1.17.2.jar"))
Files.createFile(Paths.get(tempOpt.toPath.toString, "flink-sql-gateway-1.17.2.jar"))
private val tempUsrLib =
Files.createDirectories(Paths.get(tempFlinkHome.toPath.toString, "usrlib")).toFile
private val tempUdfJar =
7 changes: 0 additions & 7 deletions pom.xml
@@ -2283,13 +2283,6 @@
</repositories>
</profile>

<profile>
<id>flink-1.16</id>
<properties>
<flink.version>1.16.3</flink.version>
</properties>
</profile>

<profile>
<id>flink-1.17</id>
<properties>
