Skip to content

Commit

Permalink
[Improve] get flink config minor improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys committed Sep 15, 2024
1 parent 780a53a commit 5d9f5fb
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,25 @@ case class SubmitRequest(
lazy val flinkDefaultConfiguration: Configuration = {
Try(GlobalConfiguration.loadConfiguration(s"${flinkVersion.flinkHome}/conf")) match {
case Success(value) =>
value
.keySet()
.foreach(
k => {
val v = value.getString(k, null)
if (v != null) {
val result = v
.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", effectiveAppName)
.replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", id.toString)
value.setString(k, result)
}
})
value
executionMode match {
case ExecutionMode.YARN_SESSION | ExecutionMode.KUBERNETES_NATIVE_SESSION |
ExecutionMode.REMOTE =>
value
case _ =>
value
.keySet()
.foreach(
k => {
val v = value.getString(k, null)
if (v != null) {
val result = v
.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", effectiveAppName)
.replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", id.toString)
value.setString(k, result)
}
})
value
}
case _ => new Configuration()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
|""".stripMargin)

val flinkConfig = getFlinkK8sConfig(deployRequest)

replaceConfig(flinkConfig, "\\$\\{job(Name|name)}|\\$job(Name|name)", deployRequest.clusterName)

val kubeClient = FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")

var clusterDescriptor: KubernetesClusterDescriptor = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ object YarnSessionClient extends YarnClientTrait {
// app name
.safeSet(YarnConfigOptions.APPLICATION_NAME, deployRequest.clusterName)

replaceConfig(flinkConfig, "\\$\\{job(Name|name)}|\\$job(Name|name)", deployRequest.clusterName)

logInfo(s"""
|------------------------------------------------------------------
|Effective submit configuration: $flinkConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,4 +496,18 @@ trait FlinkClientTrait extends Logger {
})
}

def replaceConfig(flinkConfig: Configuration, regexp: String, replacement: String): Unit = {
flinkConfig
.keySet()
.foreach(
k => {
val v = flinkConfig.getString(k, null)
if (v != null) {
val result = v
.replaceAll(regexp, replacement)
flinkConfig.setString(k, result)
}
})
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ abstract class FlinkStreamTableTrait(

/** Recommended to use this Api to start tasks */
def start(name: String = null): JobExecutionResult = {
val appName = parameter.getAppName(name, true)
val appName = parameter.getAppName(name, required = true)
execute(appName)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ object SqlClient extends App {

private[this] val mode = sets.find(_.operands.head == ExecutionOptions.RUNTIME_MODE.key()) match {
case Some(e) =>
// 1) flink sql execution.runtime-mode has highest priority
// 1) flink sql execution.runtime-mode has the highest priority
val m = e.operands(1).toUpperCase()
arguments += s"--${ExecutionOptions.RUNTIME_MODE.key()} $m"
m
Expand Down

0 comments on commit 5d9f5fb

Please sign in to comment.