[Improve] cp/sp path improvement
wolfboys committed Sep 3, 2024
1 parent 705b7c1 commit 2e668cf
Showing 3 changed files with 52 additions and 37 deletions.
@@ -42,4 +42,6 @@ public interface SavepointService extends IService<Savepoint> {
void removeApp(Application application);

String getSavePointPath(Application app) throws Exception;

String processPath(String path, String jobName, Long jobId);
}
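The new SavepointService#processPath contract lets callers substitute job placeholders in a configured savepoint path before it is handed to Flink. A minimal sketch of the expected call, based on the implementation added later in this commit; the path, job name, and id below are hypothetical examples:

    // Resolves $jobId/$jobName (and ${jobId}/${jobName}) tokens in a configured path.
    String resolved =
        savepointService.processPath(
            "hdfs:///streampark/savepoints/$jobName/$jobId",
            application.getJobName(), // e.g. "order-etl"
            application.getId()); // e.g. 100001L
    // resolved -> hdfs:///streampark/savepoints/order-etl/100001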
@@ -1336,7 +1336,11 @@ public void cancel(Application appParam) throws Exception {
String customSavepoint = null;
if (appParam.getRestoreOrTriggerSavepoint()) {
customSavepoint = appParam.getSavepointPath();
if (StringUtils.isBlank(customSavepoint)) {
if (StringUtils.isNotBlank(customSavepoint)) {
customSavepoint =
savepointService.processPath(
customSavepoint, application.getJobName(), application.getId());
} else {
customSavepoint = savepointService.getSavePointPath(appParam);
}
}
@@ -1451,23 +1455,23 @@ public String checkSavepointPath(Application appParam) throws Exception {
final String scheme = uri.getScheme();
final String pathPart = uri.getPath();
if (scheme == null) {
return "This state.savepoints.dir value "
return "This state savepoint dir value "
+ savepointPath
+ " scheme (hdfs://, file://, etc) of is null. Please specify the file system scheme explicitly in the URI.";
}
if (pathPart == null) {
return "This state.savepoints.dir value "
return "This state savepoint dir value "
+ savepointPath
+ " path part to store the checkpoint data in is null. Please specify a directory path for the checkpoint data.";
}
if (pathPart.isEmpty() || "/".equals(pathPart)) {
return "This state.savepoints.dir value "
return "This state savepoint dir value "
+ savepointPath
+ " Cannot use the root directory for checkpoints.";
}
return null;
} else {
return "When custom savepoint is not set, state.savepoints.dir needs to be set in properties or flink-conf.yaml of application";
return "When a custom savepoint is not set, state.savepoints.dir or execution.checkpointing.savepoint-dir needs to be configured in the properties or flink-conf.yaml of the application.";
}
}

@@ -1660,7 +1664,7 @@ public void start(Application appParam, boolean auto) throws Exception {
application.getJobName(),
appConf,
application.getApplicationType(),
getSavepointPath(appParam),
getSavepointPath(appParam, application.getJobName(), application.getId()),
applicationArgs,
buildResult,
extraParameter,
@@ -1877,19 +1881,20 @@ private Boolean checkJobName(String jobName) {
return false;
}

private String getSavepointPath(Application appParam) {
private String getSavepointPath(Application appParam, String jobName, Long jobId) {
String path = null;
if (appParam.getRestoreOrTriggerSavepoint() != null
&& appParam.getRestoreOrTriggerSavepoint()) {
if (StringUtils.isBlank(appParam.getSavepointPath())) {
Savepoint savepoint = savepointService.getLatest(appParam.getId());
if (savepoint != null) {
return savepoint.getPath();
path = savepoint.getPath();
}
} else {
return appParam.getSavepointPath();
path = appParam.getSavepointPath();
}
}
return null;
return savepointService.processPath(path, jobName, jobId);
}

/**
@@ -70,6 +70,7 @@
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -95,7 +96,7 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint

@Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;

private static final String SAVEPOINT_DIRECTORY_NEW_KEY = "execution.checkpointing.dir";
private static final String SAVEPOINT_DIRECTORY_NEW_KEY = "execution.checkpointing.savepoint-dir";

private static final String MAX_RETAINED_CHECKPOINTS_NEW_KEY =
"execution.checkpointing.num-retained";
@@ -244,16 +245,13 @@ public String getSavePointPath(Application appParam) throws Exception {
Application application = applicationService.getById(appParam.getId());

// 1) dynamic properties have the highest priority, read the value set via -Dstate.savepoints.dir
String savepointPath =
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties())
.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
Map<String, String> properties =
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties());

if (StringUtils.isBlank(savepointPath)) {
// for flink 1.20
savepointPath =
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties())
.get(SAVEPOINT_DIRECTORY_NEW_KEY);
}
String savepointPath =
properties.getOrDefault(
CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
properties.get(SAVEPOINT_DIRECTORY_NEW_KEY));

// Application conf has the second priority. If it is a streampark|flinksql type
// task,
@@ -266,11 +264,10 @@ public String getSavePointPath(Application appParam) throws Exception {
if (applicationConfig != null) {
Map<String, String> map = applicationConfig.readConfig();
if (FlinkUtils.isCheckpointEnabled(map)) {
savepointPath = map.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
if (StringUtils.isBlank(savepointPath)) {
// for flink 1.20
savepointPath = map.get(SAVEPOINT_DIRECTORY_NEW_KEY);
}
savepointPath =
map.getOrDefault(
CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
map.get(SAVEPOINT_DIRECTORY_NEW_KEY));
}
}
}
@@ -290,11 +287,10 @@ public String getSavePointPath(Application appParam) throws Exception {
application.getFlinkClusterId()));
Map<String, String> config = cluster.getFlinkConfig();
if (!config.isEmpty()) {
savepointPath = config.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
if (StringUtils.isBlank(savepointPath)) {
// for flink 1.20
savepointPath = config.get(SAVEPOINT_DIRECTORY_NEW_KEY);
}
savepointPath =
config.getOrDefault(
CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
config.get(SAVEPOINT_DIRECTORY_NEW_KEY));
}
}
}
@@ -303,16 +299,25 @@ public String getSavePointPath(Application appParam) throws Exception {
if (StringUtils.isBlank(savepointPath)) {
// finally, fall back to the flink-conf.yaml of the application's Flink version
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
Properties flinkConfig = flinkEnv.getFlinkConfig();
savepointPath =
flinkEnv.getFlinkConfig().getProperty(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());

if (StringUtils.isBlank(savepointPath)) {
// for flink 1.20
savepointPath = flinkEnv.getFlinkConfig().getProperty(SAVEPOINT_DIRECTORY_NEW_KEY);
}
flinkConfig.getProperty(
CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
flinkConfig.getProperty(SAVEPOINT_DIRECTORY_NEW_KEY));
}

return savepointPath;
return processPath(savepointPath, application.getJobName(), application.getId());
}

@Override
public String processPath(String path, String jobName, Long jobId) {
if (StringUtils.isNotBlank(path)) {
return path.replaceAll("\\$job(Id|id)", jobId.toString())
.replaceAll("\\$\\{job(Id|id)}", jobId.toString())
.replaceAll("\\$job(Name|name)", jobName)
.replaceAll("\\$\\{job(Name|name)}", jobName);
}
return path;
}

@Override
@@ -340,6 +345,9 @@ public void trigger(Long appId, @Nullable String savepointPath) {

// infer savepoint
String customSavepoint = this.getFinalSavepointDir(savepointPath, application);
if (StringUtils.isNotBlank(customSavepoint)) {
customSavepoint = processPath(customSavepoint, application.getJobName(), application.getId());
}

FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
String clusterId = getClusterId(application, cluster);
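Taken together, the changes read the Flink 1.20 key execution.checkpointing.savepoint-dir as a fallback to state.savepoints.dir and then substitute job placeholders in the resolved path. A self-contained sketch of both behaviors, assuming hypothetical job metadata and paths (the helper below mirrors, but is not, SavepointServiceImpl#processPath):

import java.util.HashMap;
import java.util.Map;

public class SavepointPathDemo {

  // Mirrors the placeholder substitution added in SavepointServiceImpl#processPath.
  static String processPath(String path, String jobName, Long jobId) {
    if (path == null || path.trim().isEmpty()) {
      return path;
    }
    return path.replaceAll("\\$job(Id|id)", jobId.toString())
        .replaceAll("\\$\\{job(Id|id)}", jobId.toString())
        .replaceAll("\\$job(Name|name)", jobName)
        .replaceAll("\\$\\{job(Name|name)}", jobName);
  }

  public static void main(String[] args) {
    // Hypothetical job metadata.
    String jobName = "order-etl";
    Long jobId = 100001L;

    // Key fallback: the legacy key wins when present, the Flink 1.20 key is the fallback.
    Map<String, String> conf = new HashMap<>();
    conf.put("execution.checkpointing.savepoint-dir", "hdfs:///savepoints/${jobName}/$jobId");
    String savepointDir =
        conf.getOrDefault(
            "state.savepoints.dir", conf.get("execution.checkpointing.savepoint-dir"));

    // Placeholder substitution on the resolved directory.
    System.out.println(processPath(savepointDir, jobName, jobId));
    // prints: hdfs:///savepoints/order-etl/100001
  }
}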
