diff --git a/manifests/cluster-install/workflow-controller-rbac/workflow-controller-clusterrole.yaml b/manifests/cluster-install/workflow-controller-rbac/workflow-controller-clusterrole.yaml index b547e0097382..b66dbc948afc 100644 --- a/manifests/cluster-install/workflow-controller-rbac/workflow-controller-clusterrole.yaml +++ b/manifests/cluster-install/workflow-controller-rbac/workflow-controller-clusterrole.yaml @@ -103,3 +103,11 @@ rules: - create - get - delete +- apiGroups: + - "" + resources: + - secrets + verbs: + - get + resourceNames: + - argo-workflows-agent-ca-certificates \ No newline at end of file diff --git a/manifests/quick-start-minimal.yaml b/manifests/quick-start-minimal.yaml index 371158156f1a..172bf84c8c18 100644 --- a/manifests/quick-start-minimal.yaml +++ b/manifests/quick-start-minimal.yaml @@ -1244,6 +1244,14 @@ rules: - create - get - delete +- apiGroups: + - "" + resourceNames: + - argo-workflows-agent-ca-certificates + resources: + - secrets + verbs: + - get --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole diff --git a/manifests/quick-start-mysql.yaml b/manifests/quick-start-mysql.yaml index 568a948fa55a..7364d6068ef8 100644 --- a/manifests/quick-start-mysql.yaml +++ b/manifests/quick-start-mysql.yaml @@ -1244,6 +1244,14 @@ rules: - create - get - delete +- apiGroups: + - "" + resourceNames: + - argo-workflows-agent-ca-certificates + resources: + - secrets + verbs: + - get --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole diff --git a/manifests/quick-start-postgres.yaml b/manifests/quick-start-postgres.yaml index 8c7c488f09c1..de61aad33253 100644 --- a/manifests/quick-start-postgres.yaml +++ b/manifests/quick-start-postgres.yaml @@ -1244,6 +1244,14 @@ rules: - create - get - delete +- apiGroups: + - "" + resourceNames: + - argo-workflows-agent-ca-certificates + resources: + - secrets + verbs: + - get --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole diff --git a/workflow/controller/agent.go b/workflow/controller/agent.go index f9d0f0122c7a..bea1dec654ae 100644 --- a/workflow/controller/agent.go +++ b/workflow/controller/agent.go @@ -40,21 +40,21 @@ func (woc *wfOperationCtx) reconcileAgentPod(ctx context.Context) error { } // Check Pod is just created if pod.Status.Phase != "" { - woc.updateAgentPodStatus(ctx, pod) + woc.updateAgentPodStatus(pod) } return nil } -func (woc *wfOperationCtx) updateAgentPodStatus(ctx context.Context, pod *apiv1.Pod) { +func (woc *wfOperationCtx) updateAgentPodStatus(pod *apiv1.Pod) { woc.log.Info("updateAgentPodStatus") newPhase, message := assessAgentPodStatus(pod) - if newPhase == wfv1.WorkflowFailed || newPhase == wfv1.WorkflowError { - woc.markWorkflowError(ctx, fmt.Errorf("agent pod failed with reason %s", message)) + if newPhase == wfv1.NodeFailed || newPhase == wfv1.NodeError { + woc.markTaskSetNodesError(fmt.Errorf(`agent pod failed with reason:"%s"`, message)) } } -func assessAgentPodStatus(pod *apiv1.Pod) (wfv1.WorkflowPhase, string) { - var newPhase wfv1.WorkflowPhase +func assessAgentPodStatus(pod *apiv1.Pod) (wfv1.NodePhase, string) { + var newPhase wfv1.NodePhase var message string log.WithField("namespace", pod.Namespace). WithField("podName", pod.Name). @@ -63,10 +63,10 @@ func assessAgentPodStatus(pod *apiv1.Pod) (wfv1.WorkflowPhase, string) { case apiv1.PodSucceeded, apiv1.PodRunning, apiv1.PodPending: return "", "" case apiv1.PodFailed: - newPhase = wfv1.WorkflowFailed + newPhase = wfv1.NodeFailed message = pod.Status.Message default: - newPhase = wfv1.WorkflowError + newPhase = wfv1.NodeError message = fmt.Sprintf("Unexpected pod phase for %s: %s", pod.ObjectMeta.Name, pod.Status.Phase) } return newPhase, message @@ -263,7 +263,10 @@ func (woc *wfOperationCtx) createAgentPod(ctx context.Context) (*apiv1.Pod, erro if err != nil { log.WithError(err).Info("Failed to create Agent pod") if apierr.IsAlreadyExists(err) { - return created, nil + // get a reference to the currently existing Pod since the created pod returned before was nil. + if existing, err := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.ObjectMeta.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}); err == nil { + return existing, nil + } } return nil, errors.InternalWrapError(fmt.Errorf("failed to create Agent pod. Reason: %v", err)) } diff --git a/workflow/controller/agent_test.go b/workflow/controller/agent_test.go index e563b318714e..64fedbf458c6 100644 --- a/workflow/controller/agent_test.go +++ b/workflow/controller/agent_test.go @@ -147,7 +147,7 @@ func TestAssessAgentPodStatus(t *testing.T) { Status: apiv1.PodStatus{Phase: apiv1.PodFailed}, } nodeStatus, msg := assessAgentPodStatus(pod1) - assert.Equal(t, wfv1.WorkflowFailed, nodeStatus) + assert.Equal(t, wfv1.NodeFailed, nodeStatus) assert.Equal(t, "", msg) }) t.Run("Running", func(t *testing.T) { @@ -156,7 +156,7 @@ func TestAssessAgentPodStatus(t *testing.T) { } nodeStatus, msg := assessAgentPodStatus(pod1) - assert.Equal(t, wfv1.WorkflowPhase(""), nodeStatus) + assert.Equal(t, wfv1.NodePhase(""), nodeStatus) assert.Equal(t, "", msg) }) t.Run("Success", func(t *testing.T) { @@ -164,7 +164,7 @@ func TestAssessAgentPodStatus(t *testing.T) { Status: apiv1.PodStatus{Phase: apiv1.PodSucceeded}, } nodeStatus, msg := assessAgentPodStatus(pod1) - assert.Equal(t, wfv1.WorkflowPhase(""), nodeStatus) + assert.Equal(t, wfv1.NodePhase(""), nodeStatus) assert.Equal(t, "", msg) }) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index f3b880b24e2b..1a538fe50871 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1168,7 +1168,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) (error, bool) return } if woc.isAgentPod(pod) { - woc.updateAgentPodStatus(ctx, pod) + woc.updateAgentPodStatus(pod) return } nodeID := woc.nodeID(pod) diff --git a/workflow/controller/operator_agent_test.go b/workflow/controller/operator_agent_test.go index 4be63a2255fa..bbf64cac6be6 100644 --- a/workflow/controller/operator_agent_test.go +++ b/workflow/controller/operator_agent_test.go @@ -2,13 +2,15 @@ package controller import ( "context" + "fmt" "testing" + "time" "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/yaml" - "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" ) var httpwf = `apiVersion: argoproj.io/v1alpha1 @@ -22,61 +24,66 @@ spec: - name: http http: url: https://www.google.com/ - -` - -var taskSet = `apiVersion: argoproj.io/v1alpha1 -kind: WorkflowTaskSet -metadata: - creationTimestamp: "2021-04-23T21:49:05Z" - generation: 1 - name: hello-world - namespace: default - ownerReferences: - - apiVersion: argoproj.io/v1alpha1 - kind: Workflow - name: hello-world - uid: 0b451726-8ddd-4ba3-8d69-c3b5b43e93a3 - resourceVersion: "11581184" - selfLink: /apis/argoproj.io/v1alpha1/namespaces/default/workflowtasksets/hello-world - uid: b80385b8-8b72-4f13-af6d-f429a2cad443 -spec: - tasks: - http-template-nxvtg-1265710817: - http: - url: http://www.google.com - -status: - nodes: - hello-world: - phase: Succeed - outputs: - parameters: - - name: test - value: "welcome" ` func TestHTTPTemplate(t *testing.T) { - var ts v1alpha1.WorkflowTaskSet - err := yaml.UnmarshalStrict([]byte(taskSet), &ts) - wf := v1alpha1.MustUnmarshalWorkflow(httpwf) - cancel, controller := newController(wf, ts) + wf := wfv1.MustUnmarshalWorkflow(httpwf) + cancel, controller := newController(wf, defaultServiceAccount) defer cancel() - assert.NoError(t, err) t.Run("ExecuteHTTPTemplate", func(t *testing.T) { ctx := context.Background() woc := newWorkflowOperationCtx(wf, controller) woc.operate(ctx) - pods, err := controller.kubeclientset.CoreV1().Pods(woc.wf.Namespace).List(ctx, metav1.ListOptions{}) + pod, err := controller.kubeclientset.CoreV1().Pods(woc.wf.Namespace).Get(ctx, woc.getAgentPodName(), metav1.GetOptions{}) assert.NoError(t, err) - for _, pod := range pods.Items { - assert.Equal(t, pod.Name, "hello-world-1340600742-agent") - } - // tss, err :=controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets(wf.Namespace).List(ctx, metav1.ListOptions{}) + assert.NotNil(t, pod) ts, err := controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets(wf.Namespace).Get(ctx, "hello-world", metav1.GetOptions{}) assert.NoError(t, err) assert.NotNil(t, ts) assert.Len(t, ts.Spec.Tasks, 1) + + // simulate agent pod failure scenario + pod.Status.Phase = v1.PodFailed + pod.Status.Message = "manual termination" + pod, err = controller.kubeclientset.CoreV1().Pods(woc.wf.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}) + assert.Nil(t, err) + assert.Equal(t, v1.PodFailed, pod.Status.Phase) + // sleep 1 second to wait for informer getting pod info + time.Sleep(time.Second) + woc.operate(ctx) + assert.Equal(t, wfv1.WorkflowError, woc.wf.Status.Phase) + assert.Equal(t, `agent pod failed with reason:"manual termination"`, woc.wf.Status.Message) + assert.Len(t, woc.wf.Status.Nodes, 1) + assert.Equal(t, wfv1.NodeError, woc.wf.Status.Nodes["hello-world"].Phase) + assert.Equal(t, `agent pod failed with reason:"manual termination"`, woc.wf.Status.Nodes["hello-world"].Message) + ts, err = controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets(wf.Namespace).Get(ctx, "hello-world", metav1.GetOptions{}) + assert.NoError(t, err) + assert.NotNil(t, ts) + assert.Empty(t, ts.Spec.Tasks) + assert.Empty(t, ts.Status.Nodes) + }) +} + +func TestHTTPTemplateWithoutServiceAccount(t *testing.T) { + wf := wfv1.MustUnmarshalWorkflow(httpwf) + cancel, controller := newController(wf) + defer cancel() + + t.Run("ExecuteHTTPTemplateWithoutServiceAccount", func(t *testing.T) { + ctx := context.Background() + woc := newWorkflowOperationCtx(wf, controller) + woc.operate(ctx) + _, err := controller.kubeclientset.CoreV1().Pods(woc.wf.Namespace).Get(ctx, woc.getAgentPodName(), metav1.GetOptions{}) + assert.Error(t, err, fmt.Sprintf(`pods "%s" not found`, woc.getAgentPodName())) + ts, err := controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets(wf.Namespace).Get(ctx, "hello-world", metav1.GetOptions{}) + assert.NoError(t, err) + assert.NotNil(t, ts) + assert.Empty(t, ts.Spec.Tasks) + assert.Empty(t, ts.Status.Nodes) + assert.Len(t, woc.wf.Status.Nodes, 1) + assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase) + assert.Equal(t, wfv1.NodeError, woc.wf.Status.Nodes["hello-world"].Phase) + assert.Equal(t, `create agent pod failed with reason:"failed to get token volumes: serviceaccounts "default" not found"`, woc.wf.Status.Nodes["hello-world"].Message) }) } diff --git a/workflow/controller/taskset.go b/workflow/controller/taskset.go index 1aaa14752180..2c59cc00e695 100644 --- a/workflow/controller/taskset.go +++ b/workflow/controller/taskset.go @@ -34,7 +34,7 @@ func (woc *wfOperationCtx) mergePatchTaskSet(ctx context.Context, patch interfac func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() (tasksPatch map[string]interface{}, nodesPatch map[string]interface{}) { deletedNode := make(map[string]interface{}) for _, node := range woc.wf.Status.Nodes { - if (node.Type == wfv1.NodeTypeHTTP || node.Type == wfv1.NodeTypePlugin) && node.Fulfilled() { + if taskSetNode(node) && node.Fulfilled() { deletedNode[node.ID] = nil } } @@ -52,6 +52,15 @@ func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() (tasksPatch map[string]in } return } + +func (woc *wfOperationCtx) markTaskSetNodesError(err error) { + for _, node := range woc.wf.Status.Nodes { + if taskSetNode(node) && !node.Fulfilled() { + woc.markNodeError(node.Name, err) + } + } +} + func taskSetNode(n wfv1.NodeStatus) bool { return n.Type == wfv1.NodeTypeHTTP || n.Type == wfv1.NodeTypePlugin } @@ -96,7 +105,7 @@ func (woc *wfOperationCtx) taskSetReconciliation(ctx context.Context) { } if err := woc.reconcileAgentPod(ctx); err != nil { woc.log.WithError(err).Error("error in agent pod reconciliation") - woc.markWorkflowError(ctx, err) + woc.markTaskSetNodesError(fmt.Errorf(`create agent pod failed with reason:"%s"`, err)) return } } diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index e941ff6a8d4f..1d467e594059 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -438,7 +438,10 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin // workflow pod names are deterministic. We can get here if the // controller fails to persist the workflow after creating the pod. woc.log.Infof("Failed pod %s (%s) creation: already exists", nodeName, pod.Name) - return created, nil + // get a reference to the currently existing Pod since the created pod returned before was nil. + if existing, err = woc.controller.kubeclientset.CoreV1().Pods(woc.wf.ObjectMeta.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}); err == nil { + return existing, nil + } } if errorsutil.IsTransientErr(err) { return nil, err