[Bug] [master] Sub task of sub_process can't submit to run #14688

Closed
3 tasks done
thirdparty-core opened this issue Aug 2, 2023 · 5 comments
Labels
bug Something isn't working

Comments

@thirdparty-core

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

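In WorkflowExecuteRunnable#submitStandByTask, when the taskInstance peeked from readyToSubmitTaskQueue has a dependResult of WAITING, peek() does not remove that taskInstance from readyToSubmitTaskQueue, so the loop never gets to peek and evaluate the other taskInstances in the queue. When the processInstance of a sub_process is being handled, this means that in some cases the sub_process's child tasks never run, while the sub_process itself stays in the running state indefinitely.
The code is as follows: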

        int length = readyToSubmitTaskQueue.size();
        for (int i = 0; i < length; i++) {
            // peek() returns the head of the queue without removing it
            TaskInstance task = readyToSubmitTaskQueue.peek();
            if (task == null) {
                continue;
            }
            // stop tasks which is retrying if forced success happens
            if (task.taskCanRetry()) {
                TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
                if (retryTask != null && retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
                    task.setState(retryTask.getState());
                    logger.info("task: {} has been forced success, put it into complete task list and stop retrying",
                                task.getName());
                    removeTaskFromStandbyList(task);
                    completeTaskMap.put(task.getTaskCode(), task.getId());
                    taskInstanceMap.put(task.getId(), task);
                    submitPostNode(Long.toString(task.getTaskCode()));
                    continue;
                }
            }
            //init varPool only this task is the first time running
            if (task.isFirstRun()) {
                //get pre task ,get all the task varPool to this task
                Set<String> preTask = dag.getPreviousNodes(Long.toString(task.getTaskCode()));
                getPreVarPool(task, preTask);
            }
            DependResult dependResult = getDependResultForTask(task);
            if (DependResult.SUCCESS == dependResult) {
                Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task);
                if (!taskInstanceOptional.isPresent()) {
                    this.taskFailedSubmit = true;
                    // Remove and add to complete map and error map
                    if (!removeTaskFromStandbyList(task)) {
                        logger.error(
                            "Task submit failed, remove from standby list failed, workflowInstanceId: {}, taskCode: {}",
                            processInstance.getId(),
                            task.getTaskCode());
                    }
                    completeTaskMap.put(task.getTaskCode(), task.getId());
                    taskInstanceMap.put(task.getId(), task);
                    errorTaskMap.put(task.getTaskCode(), task.getId());
                    activeTaskProcessorMaps.remove(task.getTaskCode());
                    logger.error("Task submitted failed, workflowInstanceId: {}, taskInstanceId: {}, taskCode: {}",
                                 task.getProcessInstanceId(),
                                 task.getId(),
                                 task.getTaskCode());
                } else {
                    removeTaskFromStandbyList(task);
                }
            } else if (DependResult.FAILED == dependResult) {
                // if the dependency fails, the current node is not submitted and the state changes to failure.
                dependFailedTaskMap.put(task.getTaskCode(), task.getId());
                removeTaskFromStandbyList(task);
                logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}",
                            task.getId(),
                            dependResult);
            } else if (DependResult.NON_EXEC == dependResult) {
                // for some reasons(depend task pause/stop) this task would not be submit
                removeTaskFromStandbyList(task);
                logger.info("Remove task due to depend result not executed, taskInstanceId:{} depend result : {}",
                            task.getId(),
                            dependResult);
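            }
            // DependResult.WAITING has no branch: nothing is removed, so the next iteration peeks the same task again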
        }

So, isn't the for-loop logic here flawed?
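To illustrate the blocking behavior outside of DolphinScheduler, here is a minimal, hypothetical sketch that keeps only the loop shape of the snippet above. Assumptions: a plain `ArrayDeque<String>` stands in for `readyToSubmitTaskQueue`, task names stand in for `TaskInstance`, and a set of names marks which tasks are still WAITING. `PeekLoopDemo` and `submitWithPeek` are illustrative names, not DolphinScheduler APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;

public class PeekLoopDemo {

    // Hypothetical stand-in for DependResult; only the two values needed here.
    enum DependResult { SUCCESS, WAITING }

    /**
     * Mimics the loop shape of submitStandByTask: it iterates size() times
     * but only ever peek()s the head. A task is removed only when its
     * dependency check succeeds.
     */
    static List<String> submitWithPeek(Queue<String> queue, Set<String> waiting) {
        List<String> submitted = new ArrayList<>();
        int length = queue.size();
        for (int i = 0; i < length; i++) {
            String task = queue.peek();
            if (task == null) {
                continue;
            }
            DependResult result = waiting.contains(task) ? DependResult.WAITING : DependResult.SUCCESS;
            if (result == DependResult.SUCCESS) {
                queue.remove(task);   // plays the role of removeTaskFromStandbyList(task)
                submitted.add(task);
            }
            // WAITING: nothing is removed, so the next iteration sees the same head again
        }
        return submitted;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(List.of("A", "B", "C"));
        List<String> submitted = submitWithPeek(queue, Set.of("A"));
        System.out.println("submitted=" + submitted + ", still queued=" + queue);
        // prints: submitted=[], still queued=[A, B, C]
    }
}
```

Because task A is WAITING, all three iterations inspect A and neither B nor C is ever evaluated in this pass, even if their dependencies are already satisfied.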

What you expected to happen

A taskInstance that is still WAITING should not block the evaluation of the other taskInstances in readyToSubmitTaskQueue.
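One possible non-blocking variant, sketched under the same simplified assumptions (hypothetical class and method names, an `ArrayDeque<String>` in place of the real queue), takes each task off the head with `poll()` and puts WAITING tasks back after the pass. This is only an illustration; the thread does not say how the issue was eventually fixed upstream.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;

public class PollLoopSketch {

    // Hypothetical stand-in for DependResult; only the two values needed here.
    enum DependResult { SUCCESS, WAITING }

    /**
     * Visits every task that was queued at the start of the pass exactly once.
     * Each task is taken off the head with poll(); tasks that are still WAITING
     * are collected and appended back after the pass, in their original
     * relative order, so they are re-checked on the next call.
     */
    static List<String> submitWithPoll(Queue<String> queue, Set<String> waiting) {
        List<String> submitted = new ArrayList<>();
        List<String> stillWaiting = new ArrayList<>();
        int length = queue.size();
        for (int i = 0; i < length; i++) {
            String task = queue.poll();
            if (task == null) {
                break; // queue drained earlier than expected
            }
            DependResult result = waiting.contains(task) ? DependResult.WAITING : DependResult.SUCCESS;
            if (result == DependResult.SUCCESS) {
                submitted.add(task);
            } else {
                stillWaiting.add(task);
            }
        }
        queue.addAll(stillWaiting); // re-queue WAITING tasks for the next scheduling round
        return submitted;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(List.of("A", "B", "C"));
        List<String> submitted = submitWithPoll(queue, Set.of("A"));
        System.out.println("submitted=" + submitted + ", still queued=" + queue);
        // prints: submitted=[B, C], still queued=[A]
    }
}
```

Design note: B and C are submitted while A keeps waiting, so tasks are no longer submitted strictly in queue order. That is the trade-off the first reply in this thread objects to; in the real method, the per-task dependency check (`getDependResultForTask` in the snippet above) would still run before each submission.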

How to reproduce

no

Anything else

No response

Version

3.1.x

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

thirdparty-core added the bug (Something isn't working) and Waiting for reply labels Aug 2, 2023
@MyLoveZhou

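I think it is actually reasonable, because this way tasks are only executed in order. Letting the remaining task instances be scanned as well would be a loose approach: it could lead to a later task running while this task has not run. As for the sub-workflow in sub_process, the problem is probably that its preconditions were not met. It is also possible that I misunderstood you; could you provide more information? Screenshots would be best.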

@chenshuai1995
Contributor

I have also run into this problem, but haven't solved it. Do you have a good idea?

@sean1205

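Check whether your workflow DAG has any intermediate node set to forbidden (disabled) execution. In our case, we found that in a fairly complex sub-workflow, a non-leaf node configured as forbidden caused an infinite loop.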

@SbloodyS
Member

Fixed.

SbloodyS removed the Waiting for reply label Jul 16, 2024
Development

No branches or pull requests

5 participants