Skip to content

Commit

Permalink
complete processInputMappingsWhenGetDescriptor
Browse files Browse the repository at this point in the history
  • Loading branch information
zeyu10 committed Oct 21, 2024
1 parent fc33979 commit e2ddfcd
Showing 1 changed file with 18 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,8 @@ private PassTask generateEndPassTask(DAG dag, Map<String, BaseTask> taskMap) {
public String processWhenGetDescriptor(String descriptor) {
DAG dag = dagParser.parse(descriptor);
Map<String, BaseTask> taskMap = getTaskMapByDag(dag);
boolean taskExistsInput = taskMap.values().stream()
.anyMatch(task -> MapUtils.isNotEmpty(task.getInput()));
boolean taskExistsInput = taskMap.values().stream().anyMatch(task -> MapUtils.isNotEmpty(task.getInput()));
// 如果没有按照新版本配置,则不需要处理
if (!taskExistsInput) {
return descriptor;
}
Expand Down Expand Up @@ -353,6 +353,21 @@ private void processOutputMappingsWhenGetDescriptor(BaseTask task) {
* @param task 待处理的任务
*/
private void processInputMappingsWhenGetDescriptor(BaseTask task) {
task.setInputMappings(null);
if (CollectionUtils.isEmpty(task.getInputMappings())) {
return;
}
Map<String, Object> input = task.getInput();
Set<String> targets = new HashSet<>();
input.keySet().forEach(key -> targets.add("$.input." + key));
List<Mapping> resInputMappings = new ArrayList<>();
for (Mapping inputMapping : task.getInputMappings()) {
if (!targets.contains(inputMapping.getTarget())) {
resInputMappings.add(inputMapping);
}
}
if (CollectionUtils.isEmpty(resInputMappings)) {
resInputMappings = null;
}
task.setInputMappings(resInputMappings);
}
}

0 comments on commit e2ddfcd

Please sign in to comment.