diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java index 60cedc9787..2c31d77a36 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java @@ -42,4 +42,6 @@ public interface SavepointService extends IService { void removeApp(Application application); String getSavePointPath(Application app) throws Exception; + + String processPath(String path, String jobName, Long jobId); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 51246f5eed..ce5c86ed18 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -1336,7 +1336,11 @@ public void cancel(Application appParam) throws Exception { String customSavepoint = null; if (appParam.getRestoreOrTriggerSavepoint()) { customSavepoint = appParam.getSavepointPath(); - if (StringUtils.isBlank(customSavepoint)) { + if (customSavepoint == null) { + customSavepoint = + savepointService.processPath( + customSavepoint, application.getJobName(), application.getId()); + } else { customSavepoint = savepointService.getSavePointPath(appParam); } } @@ -1451,23 +1455,23 @@ public String checkSavepointPath(Application appParam) throws Exception { final String scheme = uri.getScheme(); final String pathPart = uri.getPath(); if (scheme == null) { - return "This state.savepoints.dir value " + return "This state savepoint dir value " + savepointPath + " scheme (hdfs://, file://, etc) of is null. Please specify the file system scheme explicitly in the URI."; } if (pathPart == null) { - return "This state.savepoints.dir value " + return "This state savepoint dir value " + savepointPath + " path part to store the checkpoint data in is null. Please specify a directory path for the checkpoint data."; } if (pathPart.isEmpty() || "/".equals(pathPart)) { - return "This state.savepoints.dir value " + return "This state savepoint dir value " + savepointPath + " Cannot use the root directory for checkpoints."; } return null; } else { - return "When custom savepoint is not set, state.savepoints.dir needs to be set in properties or flink-conf.yaml of application"; + return "When a custom savepoint is not set, state.savepoints.dir or execution.checkpointing.savepoint-dir needs to be configured in the properties or flink-conf.yaml of the application."; } } @@ -1660,7 +1664,7 @@ public void start(Application appParam, boolean auto) throws Exception { application.getJobName(), appConf, application.getApplicationType(), - getSavepointPath(appParam), + getSavepointPath(appParam, application.getJobName(), application.getId()), applicationArgs, buildResult, extraParameter, @@ -1877,19 +1881,20 @@ private Boolean checkJobName(String jobName) { return false; } - private String getSavepointPath(Application appParam) { + private String getSavepointPath(Application appParam, String jobName, Long jobId) { + String path = null; if (appParam.getRestoreOrTriggerSavepoint() != null && appParam.getRestoreOrTriggerSavepoint()) { if (StringUtils.isBlank(appParam.getSavepointPath())) { Savepoint savepoint = savepointService.getLatest(appParam.getId()); if (savepoint != null) { - return savepoint.getPath(); + path = savepoint.getPath(); } } else { - return appParam.getSavepointPath(); + path = appParam.getSavepointPath(); } } - return null; + return savepointService.processPath(path, jobName, jobId); } /** diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java index ead9934a8e..61b025b011 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java @@ -70,6 +70,7 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -95,7 +96,7 @@ public class SavepointServiceImpl extends ServiceImpl properties = + PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties()); - if (StringUtils.isBlank(savepointPath)) { - // for flink 1.20 - savepointPath = - PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties()) - .get(SAVEPOINT_DIRECTORY_NEW_KEY); - } + String savepointPath = + properties.getOrDefault( + CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), + properties.get(SAVEPOINT_DIRECTORY_NEW_KEY)); // Application conf configuration has the second priority. If it is a streampark|flinksql type // task, @@ -266,11 +264,10 @@ public String getSavePointPath(Application appParam) throws Exception { if (applicationConfig != null) { Map map = applicationConfig.readConfig(); if (FlinkUtils.isCheckpointEnabled(map)) { - savepointPath = map.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()); - if (StringUtils.isBlank(savepointPath)) { - // for flink 1.20 - savepointPath = map.get(SAVEPOINT_DIRECTORY_NEW_KEY); - } + savepointPath = + map.getOrDefault( + CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), + map.get(SAVEPOINT_DIRECTORY_NEW_KEY)); } } } @@ -290,11 +287,10 @@ public String getSavePointPath(Application appParam) throws Exception { application.getFlinkClusterId())); Map config = cluster.getFlinkConfig(); if (!config.isEmpty()) { - savepointPath = config.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()); - if (StringUtils.isBlank(savepointPath)) { - // for flink 1.20 - savepointPath = config.get(SAVEPOINT_DIRECTORY_NEW_KEY); - } + savepointPath = + config.getOrDefault( + CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), + config.get(SAVEPOINT_DIRECTORY_NEW_KEY)); } } } @@ -303,16 +299,25 @@ public String getSavePointPath(Application appParam) throws Exception { if (StringUtils.isBlank(savepointPath)) { // flink FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId()); + Properties flinkConfig = flinkEnv.getFlinkConfig(); savepointPath = - flinkEnv.getFlinkConfig().getProperty(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()); - - if (StringUtils.isBlank(savepointPath)) { - // for flink 1.20 - savepointPath = flinkEnv.getFlinkConfig().getProperty(SAVEPOINT_DIRECTORY_NEW_KEY); - } + flinkConfig.getProperty( + CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), + flinkConfig.getProperty(SAVEPOINT_DIRECTORY_NEW_KEY)); } - return savepointPath; + return processPath(savepointPath, application.getJobName(), application.getId()); + } + + @Override + public String processPath(String path, String jobName, Long jobId) { + if (StringUtils.isNotBlank(path)) { + return path.replaceAll("\\$job(Id|id)", jobId.toString()) + .replaceAll("\\$\\{job(Id|id)}", jobId.toString()) + .replaceAll("\\$job(Name|name)", jobName) + .replaceAll("\\$\\{job(Name|name)}", jobName); + } + return path; } @Override @@ -340,6 +345,9 @@ public void trigger(Long appId, @Nullable String savepointPath) { // infer savepoint String customSavepoint = this.getFinalSavepointDir(savepointPath, application); + if (StringUtils.isNotBlank(customSavepoint)) { + customSavepoint = processPath(customSavepoint, application.getJobName(), application.getId()); + } FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId()); String clusterId = getClusterId(application, cluster);