Skip to content

Commit

Permalink
fix: Mark non-fulfilled taskSetNodes error when agent pod failed. Fixes
Browse files Browse the repository at this point in the history
#12703 (#12723)

Signed-off-by: oninowang <[email protected]>
  • Loading branch information
jswxstw authored and Joibel committed Sep 20, 2024
1 parent 74708c7 commit f197662
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,11 @@ rules:
- create
- get
- delete
- apiGroups:
- ""
resources:
- secrets
verbs:
- get
resourceNames:
- argo-workflows-agent-ca-certificates
8 changes: 8 additions & 0 deletions manifests/quick-start-minimal.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions manifests/quick-start-mysql.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions manifests/quick-start-postgres.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 12 additions & 9 deletions workflow/controller/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,21 @@ func (woc *wfOperationCtx) reconcileAgentPod(ctx context.Context) error {
}
// Check Pod is just created
if pod.Status.Phase != "" {
woc.updateAgentPodStatus(ctx, pod)
woc.updateAgentPodStatus(pod)
}
return nil
}

func (woc *wfOperationCtx) updateAgentPodStatus(ctx context.Context, pod *apiv1.Pod) {
func (woc *wfOperationCtx) updateAgentPodStatus(pod *apiv1.Pod) {
woc.log.Info("updateAgentPodStatus")
newPhase, message := assessAgentPodStatus(pod)
if newPhase == wfv1.WorkflowFailed || newPhase == wfv1.WorkflowError {
woc.markWorkflowError(ctx, fmt.Errorf("agent pod failed with reason %s", message))
if newPhase == wfv1.NodeFailed || newPhase == wfv1.NodeError {
woc.markTaskSetNodesError(fmt.Errorf(`agent pod failed with reason:"%s"`, message))
}
}

func assessAgentPodStatus(pod *apiv1.Pod) (wfv1.WorkflowPhase, string) {
var newPhase wfv1.WorkflowPhase
func assessAgentPodStatus(pod *apiv1.Pod) (wfv1.NodePhase, string) {
var newPhase wfv1.NodePhase
var message string
log.WithField("namespace", pod.Namespace).
WithField("podName", pod.Name).
Expand All @@ -63,10 +63,10 @@ func assessAgentPodStatus(pod *apiv1.Pod) (wfv1.WorkflowPhase, string) {
case apiv1.PodSucceeded, apiv1.PodRunning, apiv1.PodPending:
return "", ""
case apiv1.PodFailed:
newPhase = wfv1.WorkflowFailed
newPhase = wfv1.NodeFailed
message = pod.Status.Message
default:
newPhase = wfv1.WorkflowError
newPhase = wfv1.NodeError
message = fmt.Sprintf("Unexpected pod phase for %s: %s", pod.ObjectMeta.Name, pod.Status.Phase)
}
return newPhase, message
Expand Down Expand Up @@ -263,7 +263,10 @@ func (woc *wfOperationCtx) createAgentPod(ctx context.Context) (*apiv1.Pod, erro
if err != nil {
log.WithError(err).Info("Failed to create Agent pod")
if apierr.IsAlreadyExists(err) {
return created, nil
// get a reference to the currently existing Pod since the created pod returned before was nil.
if existing, err := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.ObjectMeta.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}); err == nil {
return existing, nil
}
}
return nil, errors.InternalWrapError(fmt.Errorf("failed to create Agent pod. Reason: %v", err))
}
Expand Down
6 changes: 3 additions & 3 deletions workflow/controller/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestAssessAgentPodStatus(t *testing.T) {
Status: apiv1.PodStatus{Phase: apiv1.PodFailed},
}
nodeStatus, msg := assessAgentPodStatus(pod1)
assert.Equal(t, wfv1.WorkflowFailed, nodeStatus)
assert.Equal(t, wfv1.NodeFailed, nodeStatus)
assert.Equal(t, "", msg)
})
t.Run("Running", func(t *testing.T) {
Expand All @@ -156,15 +156,15 @@ func TestAssessAgentPodStatus(t *testing.T) {
}

nodeStatus, msg := assessAgentPodStatus(pod1)
assert.Equal(t, wfv1.WorkflowPhase(""), nodeStatus)
assert.Equal(t, wfv1.NodePhase(""), nodeStatus)
assert.Equal(t, "", msg)
})
t.Run("Success", func(t *testing.T) {
pod1 := &apiv1.Pod{
Status: apiv1.PodStatus{Phase: apiv1.PodSucceeded},
}
nodeStatus, msg := assessAgentPodStatus(pod1)
assert.Equal(t, wfv1.WorkflowPhase(""), nodeStatus)
assert.Equal(t, wfv1.NodePhase(""), nodeStatus)
assert.Equal(t, "", msg)
})

Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) (error, bool)
return
}
if woc.isAgentPod(pod) {
woc.updateAgentPodStatus(ctx, pod)
woc.updateAgentPodStatus(pod)
return
}
nodeID := woc.nodeID(pod)
Expand Down
95 changes: 51 additions & 44 deletions workflow/controller/operator_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package controller

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

var httpwf = `apiVersion: argoproj.io/v1alpha1
Expand All @@ -22,61 +24,66 @@ spec:
- name: http
http:
url: https://www.google.com/
`

var taskSet = `apiVersion: argoproj.io/v1alpha1
kind: WorkflowTaskSet
metadata:
creationTimestamp: "2021-04-23T21:49:05Z"
generation: 1
name: hello-world
namespace: default
ownerReferences:
- apiVersion: argoproj.io/v1alpha1
kind: Workflow
name: hello-world
uid: 0b451726-8ddd-4ba3-8d69-c3b5b43e93a3
resourceVersion: "11581184"
selfLink: /apis/argoproj.io/v1alpha1/namespaces/default/workflowtasksets/hello-world
uid: b80385b8-8b72-4f13-af6d-f429a2cad443
spec:
tasks:
http-template-nxvtg-1265710817:
http:
url: http://www.google.com
status:
nodes:
hello-world:
phase: Succeed
outputs:
parameters:
- name: test
value: "welcome"
`

func TestHTTPTemplate(t *testing.T) {
var ts v1alpha1.WorkflowTaskSet
err := yaml.UnmarshalStrict([]byte(taskSet), &ts)
wf := v1alpha1.MustUnmarshalWorkflow(httpwf)
cancel, controller := newController(wf, ts)
wf := wfv1.MustUnmarshalWorkflow(httpwf)
cancel, controller := newController(wf, defaultServiceAccount)
defer cancel()

assert.NoError(t, err)
t.Run("ExecuteHTTPTemplate", func(t *testing.T) {
ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
pods, err := controller.kubeclientset.CoreV1().Pods(woc.wf.Namespace).List(ctx, metav1.ListOptions{})
pod, err := controller.kubeclientset.CoreV1().Pods(woc.wf.Namespace).Get(ctx, woc.getAgentPodName(), metav1.GetOptions{})
assert.NoError(t, err)
for _, pod := range pods.Items {
assert.Equal(t, pod.Name, "hello-world-1340600742-agent")
}
// tss, err :=controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets(wf.Namespace).List(ctx, metav1.ListOptions{})
assert.NotNil(t, pod)
ts, err := controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets(wf.Namespace).Get(ctx, "hello-world", metav1.GetOptions{})
assert.NoError(t, err)
assert.NotNil(t, ts)
assert.Len(t, ts.Spec.Tasks, 1)

// simulate agent pod failure scenario
pod.Status.Phase = v1.PodFailed
pod.Status.Message = "manual termination"
pod, err = controller.kubeclientset.CoreV1().Pods(woc.wf.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{})
assert.Nil(t, err)
assert.Equal(t, v1.PodFailed, pod.Status.Phase)
// sleep 1 second to wait for informer getting pod info
time.Sleep(time.Second)
woc.operate(ctx)
assert.Equal(t, wfv1.WorkflowError, woc.wf.Status.Phase)
assert.Equal(t, `agent pod failed with reason:"manual termination"`, woc.wf.Status.Message)
assert.Len(t, woc.wf.Status.Nodes, 1)
assert.Equal(t, wfv1.NodeError, woc.wf.Status.Nodes["hello-world"].Phase)
assert.Equal(t, `agent pod failed with reason:"manual termination"`, woc.wf.Status.Nodes["hello-world"].Message)
ts, err = controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets(wf.Namespace).Get(ctx, "hello-world", metav1.GetOptions{})
assert.NoError(t, err)
assert.NotNil(t, ts)
assert.Empty(t, ts.Spec.Tasks)
assert.Empty(t, ts.Status.Nodes)
})
}

func TestHTTPTemplateWithoutServiceAccount(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(httpwf)
cancel, controller := newController(wf)
defer cancel()

t.Run("ExecuteHTTPTemplateWithoutServiceAccount", func(t *testing.T) {
ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
_, err := controller.kubeclientset.CoreV1().Pods(woc.wf.Namespace).Get(ctx, woc.getAgentPodName(), metav1.GetOptions{})
assert.Error(t, err, fmt.Sprintf(`pods "%s" not found`, woc.getAgentPodName()))
ts, err := controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets(wf.Namespace).Get(ctx, "hello-world", metav1.GetOptions{})
assert.NoError(t, err)
assert.NotNil(t, ts)
assert.Empty(t, ts.Spec.Tasks)
assert.Empty(t, ts.Status.Nodes)
assert.Len(t, woc.wf.Status.Nodes, 1)
assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
assert.Equal(t, wfv1.NodeError, woc.wf.Status.Nodes["hello-world"].Phase)
assert.Equal(t, `create agent pod failed with reason:"failed to get token volumes: serviceaccounts "default" not found"`, woc.wf.Status.Nodes["hello-world"].Message)
})
}
13 changes: 11 additions & 2 deletions workflow/controller/taskset.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (woc *wfOperationCtx) mergePatchTaskSet(ctx context.Context, patch interfac
func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() (tasksPatch map[string]interface{}, nodesPatch map[string]interface{}) {
deletedNode := make(map[string]interface{})
for _, node := range woc.wf.Status.Nodes {
if (node.Type == wfv1.NodeTypeHTTP || node.Type == wfv1.NodeTypePlugin) && node.Fulfilled() {
if taskSetNode(node) && node.Fulfilled() {
deletedNode[node.ID] = nil
}
}
Expand All @@ -52,6 +52,15 @@ func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() (tasksPatch map[string]in
}
return
}

func (woc *wfOperationCtx) markTaskSetNodesError(err error) {
for _, node := range woc.wf.Status.Nodes {
if taskSetNode(node) && !node.Fulfilled() {
woc.markNodeError(node.Name, err)
}
}
}

func taskSetNode(n wfv1.NodeStatus) bool {
return n.Type == wfv1.NodeTypeHTTP || n.Type == wfv1.NodeTypePlugin
}
Expand Down Expand Up @@ -96,7 +105,7 @@ func (woc *wfOperationCtx) taskSetReconciliation(ctx context.Context) {
}
if err := woc.reconcileAgentPod(ctx); err != nil {
woc.log.WithError(err).Error("error in agent pod reconciliation")
woc.markWorkflowError(ctx, err)
woc.markTaskSetNodesError(fmt.Errorf(`create agent pod failed with reason:"%s"`, err))
return
}
}
Expand Down
5 changes: 4 additions & 1 deletion workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,10 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
// workflow pod names are deterministic. We can get here if the
// controller fails to persist the workflow after creating the pod.
woc.log.Infof("Failed pod %s (%s) creation: already exists", nodeName, pod.Name)
return created, nil
// get a reference to the currently existing Pod since the created pod returned before was nil.
if existing, err = woc.controller.kubeclientset.CoreV1().Pods(woc.wf.ObjectMeta.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}); err == nil {
return existing, nil
}
}
if errorsutil.IsTransientErr(err) {
return nil, err
Expand Down

0 comments on commit f197662

Please sign in to comment.