diff --git a/docs/docs/en/guide/task/flink.md b/docs/docs/en/guide/task/flink.md index b30af03efccb..88253a64b4f8 100644 --- a/docs/docs/en/guide/task/flink.md +++ b/docs/docs/en/guide/task/flink.md @@ -37,7 +37,7 @@ Flink task type, used to execute Flink programs. For Flink nodes: | Parallelism | Used to set the degree of parallelism for executing Flink tasks. | | Yarn queue | Used to set the yarn queue, use `default` queue by default. | | Main program parameters | Set the input parameters for the Flink program and support the substitution of custom parameter variables. | -| Optional parameters | Set the flink command options, such as `-D`, `-C`, `-yt`. | +| Optional parameters | Set the flink command options, such as `-D`, `-C`, `-yt`, and support the substitution of custom parameter variables, such as `-Dyarn.application.name=${job_name}` custom parameter job _name will be replaced. | | Custom parameter | It is a local user-defined parameter for Flink, and will replace the content with `${variable}` in the script. | ## Task Example diff --git a/docs/docs/zh/guide/task/flink.md b/docs/docs/zh/guide/task/flink.md index 478e07fae874..1d25eafca9f4 100644 --- a/docs/docs/zh/guide/task/flink.md +++ b/docs/docs/zh/guide/task/flink.md @@ -37,7 +37,7 @@ Flink 任务类型,用于执行 Flink 程序。对于 Flink 节点: | 并行度 | 用于设置执行 Flink 任务的并行度 | | Yarn 队列 | 用于设置 Yarn 队列,默认使用 default 队列 | | 主程序参数 | 设置 Flink 程序的输入参数,支持自定义参数变量的替换 | -| 选项参数 | 设置Flink命令的选项参数,例如`-D`, `-C`, `-yt` | +| 选项参数 | 设置Flink命令的选项参数,例如`-D`, `-C`, `-yt`,支持自定义参数变量的替换,例如`-Dyarn.application.name=${job_name}`,自定义参数job_name会被替换 | | 自定义参数 | 是 Flink 局部的用户自定义参数,会替换脚本中以 ${变量} 的内容 | ## 任务样例 diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java index fe374c9d8fed..6f02b54c8a5a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java @@ -112,7 +112,8 @@ private static List buildRunCommandLineForSql(TaskExecutionContext taskE String others = flinkParameters.getOthers(); if (StringUtils.isNotEmpty(others)) { - args.add(others); + Map paramsMap = taskExecutionContext.getPrepareParamsMap(); + args.add(ParameterUtils.convertParameterPlaceholders(others, ParameterUtils.convert(paramsMap))); } return args; } @@ -271,7 +272,8 @@ private static List buildRunCommandLineForOthers(TaskExecutionContext ta // -s -yqu -yat -yD -D if (StringUtils.isNotEmpty(others)) { - args.add(others); + Map paramsMap = taskExecutionContext.getPrepareParamsMap(); + args.add(ParameterUtils.convertParameterPlaceholders(others, ParameterUtils.convert(paramsMap))); } // determine yarn queue diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java index b53260bb87eb..e4ff2ec375b5 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java @@ -18,10 +18,14 @@ package org.apache.dolphinscheduler.plugin.task.flink; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.DataType; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -45,6 +49,7 @@ private FlinkParameters buildTestFlinkParametersWithDeployMode(FlinkDeployMode f flinkParameters.setAppName("demo-app-name"); flinkParameters.setJobManagerMemory("1024m"); flinkParameters.setTaskManagerMemory("1024m"); + flinkParameters.setOthers("-Dyarn.application.name=${job-name}"); return flinkParameters; } @@ -60,6 +65,12 @@ private TaskExecutionContext buildTestTaskExecutionContext() { ResourceContext resourceContext = new ResourceContext(); resourceContext.addResourceItem(resourceItem); taskExecutionContext.setResourceContext(resourceContext); + + Map parameters = new HashMap<>(); + parameters.put("job-name", + Property.builder().type(DataType.VARCHAR).prop("job-name").value("demo-app-name").build()); + taskExecutionContext.setPrepareParamsMap(parameters); + return taskExecutionContext; } @@ -69,7 +80,7 @@ public void testRunJarInApplicationMode() throws Exception { List commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); Assertions.assertEquals( - "${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", + "${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -Dyarn.application.name=demo-app-name -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine)); } @@ -81,7 +92,7 @@ public void testRunJarInClusterMode() { FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); Assertions.assertEquals( - "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", + "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -Dyarn.application.name=demo-app-name -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine1)); flinkParameters.setFlinkVersion("<1.10"); @@ -89,7 +100,7 @@ public void testRunJarInClusterMode() { FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); Assertions.assertEquals( - "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", + "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -Dyarn.application.name=demo-app-name -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine2)); flinkParameters.setFlinkVersion(">=1.12"); @@ -97,7 +108,7 @@ public void testRunJarInClusterMode() { FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); Assertions.assertEquals( - "${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", + "${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -Dyarn.application.name=demo-app-name -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine3)); } @@ -107,7 +118,7 @@ public void testRunJarInLocalMode() { List commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); Assertions.assertEquals( - "${FLINK_HOME}/bin/flink run -p 4 -sae -c org.example.Main /opt/job.jar", + "${FLINK_HOME}/bin/flink run -p 4 -sae -Dyarn.application.name=demo-app-name -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine)); } @@ -118,7 +129,7 @@ public void testRunSql() { List commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); Assertions.assertEquals( - "${FLINK_HOME}/bin/sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql", + "${FLINK_HOME}/bin/sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql -Dyarn.application.name=demo-app-name", joinStringListWithSpace(commandLine)); }