Skip to content

Commit

Permalink
[Improve] load flink-config minor improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys committed Oct 7, 2024
1 parent dcef164 commit bbe7731
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ trait FlinkClientTrait extends Logger {
|""".stripMargin)

val flinkConfig = prepareConfig(submitRequest)

flinkConfig.toMap.foreach(c => logInfo(s"flinkConfig: ${c._1}: ${c._2}"))
setConfig(submitRequest, flinkConfig)

Try(doSubmit(submitRequest, flinkConfig)) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
.mergeWith(ParameterTool.fromMap(appConf))
.mergeWith(argsMap)

val flinkConf: Map[String, String] = {
parameter.get(KEY_FLINK_CONF(), null) match {
case flinkConf if flinkConf != null =>
PropertiesUtils
.loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf))
.filter(_._2.nonEmpty)
case _ => Map.empty
}
}

val envConfig = Configuration.fromMap(flinkConf ++ appFlinkConf)
val envConfig = Configuration.fromMap(appFlinkConf)
FlinkConfiguration(parameter, envConfig, null)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,17 +187,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
val tableConf = extractConfigByPrefix(configMap, KEY_FLINK_TABLE_PREFIX)
val sqlConf = extractConfigByPrefix(configMap, KEY_SQL_PREFIX)

val flinkConf: Map[String, String] = {
parameter.get(KEY_FLINK_CONF(), null) match {
case flinkConf if flinkConf != null =>
PropertiesUtils
.loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf))
.filter(_._2.nonEmpty)
case _ => Map.empty
}
}

val envConfig = Configuration.fromMap(flinkConf ++ appFlinkConf)
val envConfig = Configuration.fromMap(appFlinkConf)
val tableConfig = Configuration.fromMap(tableConf)

val parameterTool = ParameterTool
Expand Down

0 comments on commit bbe7731

Please sign in to comment.