From e2ddfcddd1b6d7690bc8e25c59c80e67a6302d83 Mon Sep 17 00:00:00 2001 From: zeyu10 Date: Mon, 21 Oct 2024 16:06:16 +0800 Subject: [PATCH] complete processInputMappingsWhenGetDescriptor --- .../service/DescriptorParseServiceImpl.java | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/service/DescriptorParseServiceImpl.java b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/service/DescriptorParseServiceImpl.java index 72e3fbc6..7ea07aef 100644 --- a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/service/DescriptorParseServiceImpl.java +++ b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/service/DescriptorParseServiceImpl.java @@ -304,8 +304,8 @@ private PassTask generateEndPassTask(DAG dag, Map taskMap) { public String processWhenGetDescriptor(String descriptor) { DAG dag = dagParser.parse(descriptor); Map taskMap = getTaskMapByDag(dag); - boolean taskExistsInput = taskMap.values().stream() - .anyMatch(task -> MapUtils.isNotEmpty(task.getInput())); + boolean taskExistsInput = taskMap.values().stream().anyMatch(task -> MapUtils.isNotEmpty(task.getInput())); + // 如果没有按照新版本配置,则不需要处理 if (!taskExistsInput) { return descriptor; } @@ -353,6 +353,21 @@ private void processOutputMappingsWhenGetDescriptor(BaseTask task) { * @param task 待处理的任务 */ private void processInputMappingsWhenGetDescriptor(BaseTask task) { - task.setInputMappings(null); + if (CollectionUtils.isEmpty(task.getInputMappings())) { + return; + } + Map input = task.getInput(); + Set targets = new HashSet<>(); + input.keySet().forEach(key -> targets.add("$.input." + key)); + List resInputMappings = new ArrayList<>(); + for (Mapping inputMapping : task.getInputMappings()) { + if (!targets.contains(inputMapping.getTarget())) { + resInputMappings.add(inputMapping); + } + } + if (CollectionUtils.isEmpty(resInputMappings)) { + resInputMappings = null; + } + task.setInputMappings(resInputMappings); } }