Skip to content

Commit

Permalink
[Improve] read flinkConf bug fixed.
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys committed Oct 5, 2024
1 parent 9d5263d commit 5b9e809
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,26 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
throw new IllegalArgumentException(
"[StreamPark] Usage:can't fond config, please set \"--conf $path \" in main arguments")
}
val flinkConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
val appFlinkConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
// config priority: explicitly specified priority > project profiles > system profiles
val parameter = ParameterTool
.fromSystemProperties()
.mergeWith(ParameterTool.fromMap(flinkConf))
.mergeWith(ParameterTool.fromMap(appFlinkConf))
.mergeWith(ParameterTool.fromMap(appConf))
.mergeWith(argsMap)

val envConfig = Configuration.fromMap(flinkConf)
val flinkConf: Map[String, String] = {
parameter.get(KEY_FLINK_CONF(), null) match {
case flinkConf if flinkConf != null =>
PropertiesUtils
.loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf))
.filter(_._2.nonEmpty)
case _ => Map.empty
}
}

val envConfig = Configuration.fromMap(flinkConf + appFlinkConf)
FlinkConfiguration(parameter, envConfig, null)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.streampark.flink.core
import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.enums.{ApiType, PlannerType}
import org.apache.streampark.common.enums.ApiType.ApiType
import org.apache.streampark.common.util.DeflaterUtils
import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
import org.apache.streampark.flink.core.EnhancerImplicit._

import org.apache.flink.api.java.utils.ParameterTool
Expand Down Expand Up @@ -182,17 +182,27 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
FlinkConfiguration(cliParameterTool, new Configuration(), new Configuration())
} else {
// config priority: explicitly specified priority > project profiles > system profiles
val flinkConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
val appFlinkConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
val tableConf = extractConfigByPrefix(configMap, KEY_FLINK_TABLE_PREFIX)
val sqlConf = extractConfigByPrefix(configMap, KEY_SQL_PREFIX)

val envConfig = Configuration.fromMap(flinkConf)
val flinkConf: Map[String, String] = {
parameter.get(KEY_FLINK_CONF(), null) match {
case flinkConf if flinkConf != null =>
PropertiesUtils
.loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf))
.filter(_._2.nonEmpty)
case _ => Map.empty
}
}

val envConfig = Configuration.fromMap(flinkConf + appFlinkConf)
val tableConfig = Configuration.fromMap(tableConf)

val parameterTool = ParameterTool
.fromSystemProperties()
.mergeWith(ParameterTool.fromMap(flinkConf))
.mergeWith(ParameterTool.fromMap(appFlinkConf))
.mergeWith(ParameterTool.fromMap(appConf))
.mergeWith(ParameterTool.fromMap(tableConf))
.mergeWith(ParameterTool.fromMap(sqlConf))
Expand Down

0 comments on commit 5b9e809

Please sign in to comment.