From dd4b6d1e50b0c2bf056e634d731ac980fc8746da Mon Sep 17 00:00:00 2001
From: "mingzhou.swx"
Date: Wed, 1 Mar 2023 17:33:05 +0800
Subject: [PATCH] solve some problems caused by informer latency for
 workloadspread

Informer latency could cause WorkloadSpread to drop versioned subset
statuses for versions whose pods are not observed yet, to miss events
when a pod's version or a Deployment's revision changes, and to fail
owner matching when a just-created ReplicaSet is not in the cache yet.
Keep non-empty statuses for unobserved versions, enqueue on these
changes, match ReplicaSet events through their owner Deployment, and
fall back to name matching when the ReplicaSet cannot be found.

Signed-off-by: mingzhou.swx
---
 .../workloadspread_controller.go          | 29 +++++++++--
 .../workloadspread_event_handler.go       | 48 +++++++++++++------
 .../workloadspread_event_handler_test.go  | 12 ++---
 pkg/util/workloadspread/workloadspread.go | 12 ++++-
 test/e2e/apps/workloadspread.go           | 13 ++++-
 5 files changed, 88 insertions(+), 26 deletions(-)

diff --git a/pkg/controller/workloadspread/workloadspread_controller.go b/pkg/controller/workloadspread/workloadspread_controller.go
index dc825889d3..298aa51fd6 100644
--- a/pkg/controller/workloadspread/workloadspread_controller.go
+++ b/pkg/controller/workloadspread/workloadspread_controller.go
@@ -522,22 +522,45 @@ func (r *ReconcileWorkloadSpread) patchFavoriteSubsetMetadataToPod(pod *corev1.P
 func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadStatus(ws *appsv1alpha1.WorkloadSpread,
 	versionedPodMap map[string]map[string][]*corev1.Pod, subsetPodMap map[string][]*corev1.Pod,
 	workloadReplicas int32) (*appsv1alpha1.WorkloadSpreadStatus, map[string][]*corev1.Pod) {
-	// set the generation in the returned status
 	status := appsv1alpha1.WorkloadSpreadStatus{}
+	// set the generation in the returned status
 	status.ObservedGeneration = ws.Generation
-	//status.ObservedWorkloadReplicas = workloadReplicas
+	// status.ObservedWorkloadReplicas = workloadReplicas
 	status.VersionedSubsetStatuses = make(map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus, len(versionedPodMap))
 
+	// overall subset statuses
 	var scheduleFailedPodMap map[string][]*corev1.Pod
 	status.SubsetStatuses, scheduleFailedPodMap = r.calculateWorkloadSpreadSubsetStatuses(ws, subsetPodMap, workloadReplicas)
 
+	// versioned subset statuses calculated by observed pods
 	for version, podMap := range versionedPodMap {
 		status.VersionedSubsetStatuses[version], _ = r.calculateWorkloadSpreadSubsetStatuses(ws, podMap, workloadReplicas)
 	}
 
+	// keep the statuses of versions whose pods cannot be observed yet due to informer latency.
+	for version := range ws.Status.VersionedSubsetStatuses {
+		if _, exist := versionedPodMap[version]; exist {
+			continue
+		}
+		versionSubsetStatuses, _ := r.calculateWorkloadSpreadSubsetStatuses(ws, nil, workloadReplicas)
+		if !isEmptySubsetStatuses(versionSubsetStatuses) {
+			status.VersionedSubsetStatuses[version] = versionSubsetStatuses
+		}
+	}
+
 	return &status, scheduleFailedPodMap
 }
 
+func isEmptySubsetStatuses(statuses []appsv1alpha1.WorkloadSpreadSubsetStatus) bool {
+	replicas, creating, deleting := 0, 0, 0
+	for _, subset := range statuses {
+		replicas += int(subset.Replicas)
+		creating += len(subset.CreatingPods)
+		deleting += len(subset.DeletingPods)
+	}
+	return replicas+creating+deleting == 0
+}
+
 func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadSubsetStatuses(ws *appsv1alpha1.WorkloadSpread, podMap map[string][]*corev1.Pod,
 	workloadReplicas int32) ([]appsv1alpha1.WorkloadSpreadSubsetStatus, map[string][]*corev1.Pod) {
 	subsetStatuses := make([]appsv1alpha1.WorkloadSpreadSubsetStatus, len(ws.Spec.Subsets))
@@ -708,7 +731,7 @@ func (r *ReconcileWorkloadSpread) UpdateWorkloadSpreadStatus(ws *appsv1alpha1.Wo
 	status *appsv1alpha1.WorkloadSpreadStatus) error {
 	if status.ObservedGeneration == ws.Status.ObservedGeneration &&
 		// status.ObservedWorkloadReplicas == ws.Status.ObservedWorkloadReplicas &&
-		apiequality.Semantic.DeepEqual(status.SubsetStatuses, ws.Status.SubsetStatuses) {
+		apiequality.Semantic.DeepEqual(status, ws.Status) {
 		return nil
 	}
 
diff --git a/pkg/controller/workloadspread/workloadspread_event_handler.go b/pkg/controller/workloadspread/workloadspread_event_handler.go
index 8a3b6e115b..8a85f44872 100644
--- a/pkg/controller/workloadspread/workloadspread_event_handler.go
+++ b/pkg/controller/workloadspread/workloadspread_event_handler.go
@@ -19,10 +19,12 @@ package workloadspread
 import (
 	"context"
 	"encoding/json"
+	"reflect"
 
 	appsv1 "k8s.io/api/apps/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
@@ -42,9 +44,10 @@ import (
 type EventAction string
 
 const (
-	CreateEventAction EventAction = "Create"
-	UpdateEventAction EventAction = "Update"
-	DeleteEventAction EventAction = "Delete"
+	CreateEventAction            EventAction = "Create"
+	UpdateEventAction            EventAction = "Update"
+	DeleteEventAction            EventAction = "Delete"
+	DeploymentRevisionAnnotation             = "deployment.kubernetes.io/revision"
 )
 
 var _ handler.EventHandler = &podEventHandler{}
@@ -59,7 +62,7 @@ func (p *podEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimiting
 	oldPod := evt.ObjectOld.(*corev1.Pod)
 	newPod := evt.ObjectNew.(*corev1.Pod)
 
-	if kubecontroller.IsPodActive(oldPod) && !kubecontroller.IsPodActive(newPod) {
+	if kubecontroller.IsPodActive(oldPod) && !kubecontroller.IsPodActive(newPod) || wsutil.GetPodVersion(oldPod) != wsutil.GetPodVersion(newPod) {
 		p.handlePod(q, newPod, UpdateEventAction)
 	}
 }
@@ -110,8 +113,11 @@ func (w workloadEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimi
 		otherChanges = newObject.Status.UpdateRevision != oldObject.Status.CurrentRevision
 		gvk = controllerKruiseKindCS
 	case *appsv1.Deployment:
-		oldReplicas = *evt.ObjectOld.(*appsv1.Deployment).Spec.Replicas
-		newReplicas = *evt.ObjectNew.(*appsv1.Deployment).Spec.Replicas
+		oldObject := evt.ObjectOld.(*appsv1.Deployment)
+		newObject := evt.ObjectNew.(*appsv1.Deployment)
+		oldReplicas = *oldObject.Spec.Replicas
+		newReplicas = *newObject.Spec.Replicas
+		otherChanges = newObject.Annotations[DeploymentRevisionAnnotation] != oldObject.Annotations[DeploymentRevisionAnnotation]
 		gvk = controllerKindDep
 	case *appsv1.ReplicaSet:
 		oldReplicas = *evt.ObjectOld.(*appsv1.ReplicaSet).Spec.Replicas
@@ -139,7 +145,8 @@ func (w workloadEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimi
 			Namespace: evt.ObjectNew.GetNamespace(),
 			Name:      evt.ObjectNew.GetName(),
 		}
-		ws, err := w.getWorkloadSpreadForWorkload(workloadNsn, gvk)
+		owner := metav1.GetControllerOfNoCopy(evt.ObjectNew)
+		ws, err := w.getWorkloadSpreadForWorkload(workloadNsn, gvk, owner)
 		if err != nil {
 			klog.Errorf("unable to get WorkloadSpread related with %s (%s/%s), err: %v",
 				gvk.Kind, workloadNsn.Namespace, workloadNsn.Name, err)
@@ -185,7 +192,8 @@ func (w *workloadEventHandler) handleWorkload(q workqueue.RateLimitingInterface,
 		Namespace: obj.GetNamespace(),
 		Name:      obj.GetName(),
 	}
-	ws, err := w.getWorkloadSpreadForWorkload(workloadNsn, gvk)
+	owner := metav1.GetControllerOfNoCopy(obj)
+	ws, err := w.getWorkloadSpreadForWorkload(workloadNsn, gvk, owner)
 	if err != nil {
 		klog.Errorf("unable to get WorkloadSpread related with %s (%s/%s), err: %v",
 			gvk.Kind, workloadNsn.Namespace, workloadNsn.Name, err)
@@ -201,7 +209,7 @@ func (w *workloadEventHandler) handleWorkload(q workqueue.RateLimitingInterface,
 
 func (w *workloadEventHandler) getWorkloadSpreadForWorkload(
 	workloadNamespaceName types.NamespacedName,
-	gvk schema.GroupVersionKind) (*appsv1alpha1.WorkloadSpread, error) {
+	gvk schema.GroupVersionKind, ownerRef *metav1.OwnerReference) (*appsv1alpha1.WorkloadSpread, error) {
 	wsList := &appsv1alpha1.WorkloadSpreadList{}
 	listOptions := &client.ListOptions{Namespace: workloadNamespaceName.Namespace}
 	if err := w.List(context.TODO(), wsList, listOptions); err != nil {
@@ -209,6 +217,17 @@ func (w *workloadEventHandler) getWorkloadSpreadForWorkload(
 		return nil, err
 	}
 
+	// In case of a ReplicaSet owned by a Deployment, we should also consider whether
+	// the owning Deployment is referenced by a WorkloadSpread.
+	var ownerKey *types.NamespacedName
+	var ownerGvk schema.GroupVersionKind
+	if ownerRef != nil && reflect.DeepEqual(gvk, controllerKindRS) {
+		ownerGvk = schema.FromAPIVersionAndKind(ownerRef.APIVersion, ownerRef.Kind)
+		if reflect.DeepEqual(ownerGvk, controllerKindDep) {
+			ownerKey = &types.NamespacedName{Namespace: workloadNamespaceName.Namespace, Name: ownerRef.Name}
+		}
+	}
+
 	for _, ws := range wsList.Items {
 		if ws.DeletionTimestamp != nil {
 			continue
@@ -219,13 +238,12 @@ func (w *workloadEventHandler) getWorkloadSpreadForWorkload(
 			continue
 		}
 
-		targetGV, err := schema.ParseGroupVersion(targetRef.APIVersion)
-		if err != nil {
-			klog.Errorf("failed to parse targetRef's group version: %s", targetRef.APIVersion)
-			continue
+		// Ignore the API version and match on GroupKind only.
+		targetGk := schema.FromAPIVersionAndKind(targetRef.APIVersion, targetRef.Kind).GroupKind()
+		if reflect.DeepEqual(targetGk, gvk.GroupKind()) && targetRef.Name == workloadNamespaceName.Name {
+			return &ws, nil
 		}
-
-		if targetRef.Kind == gvk.Kind && targetGV.Group == gvk.Group && targetRef.Name == workloadNamespaceName.Name {
+		if ownerKey != nil && reflect.DeepEqual(targetGk, ownerGvk.GroupKind()) && targetRef.Name == ownerKey.Name {
 			return &ws, nil
 		}
 	}
diff --git a/pkg/controller/workloadspread/workloadspread_event_handler_test.go b/pkg/controller/workloadspread/workloadspread_event_handler_test.go
index 66433ff651..7f722de760 100644
--- a/pkg/controller/workloadspread/workloadspread_event_handler_test.go
+++ b/pkg/controller/workloadspread/workloadspread_event_handler_test.go
@@ -380,7 +380,7 @@ func TestGetWorkloadSpreadForCloneSet(t *testing.T) {
 				Name:      cs.getCloneSet().Name,
 			}
 			handler := workloadEventHandler{Reader: fakeClient}
-			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKruiseKindCS)
+			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKruiseKindCS, nil)
 
 			expectTopology := cs.expectWorkloadSpread()
 			if expectTopology == nil {
@@ -506,7 +506,7 @@ func TestGetWorkloadSpreadForDeployment(t *testing.T) {
 				Name:      cs.getDeployment().Name,
 			}
 			handler := workloadEventHandler{Reader: fakeClient}
-			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindDep)
+			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindDep, nil)
 
 			expectTopology := cs.expectWorkloadSpread()
 			if expectTopology == nil {
@@ -608,7 +608,7 @@ func TestGetWorkloadSpreadForJob(t *testing.T) {
 				Name:      cs.getJob().Name,
 			}
 			handler := workloadEventHandler{Reader: fakeClient}
-			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindJob)
+			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindJob, nil)
 
 			expectTopology := cs.expectWorkloadSpread()
 			if expectTopology == nil {
@@ -734,7 +734,7 @@ func TestGetWorkloadSpreadForReplicaSet(t *testing.T) {
 				Name:      cs.getReplicaset().Name,
 			}
 			handler := workloadEventHandler{Reader: fakeClient}
-			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindRS)
+			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindRS, nil)
 
 			expectTopology := cs.expectWorkloadSpread()
 			if expectTopology == nil {
@@ -860,7 +860,7 @@ func TestGetWorkloadSpreadForStatefulSet(t *testing.T) {
 				Name:      cs.getStatefulSet().Name,
 			}
 			handler := workloadEventHandler{Reader: fakeClient}
-			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindSts)
+			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindSts, nil)
 
 			expectTopology := cs.expectWorkloadSpread()
 			if expectTopology == nil {
@@ -986,7 +986,7 @@ func TestGetWorkloadSpreadForAdvancedStatefulSet(t *testing.T) {
 				Name:      cs.getStatefulSet().Name,
 			}
 			handler := workloadEventHandler{Reader: fakeClient}
-			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKruiseKindSts)
+			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKruiseKindSts, nil)
 
 			expectTopology := cs.expectWorkloadSpread()
 			if expectTopology == nil {
diff --git a/pkg/util/workloadspread/workloadspread.go b/pkg/util/workloadspread/workloadspread.go
index c7eb444a1a..9580906c9e 100644
--- a/pkg/util/workloadspread/workloadspread.go
+++ b/pkg/util/workloadspread/workloadspread.go
@@ -23,6 +23,7 @@ import (
 	"math"
 	"regexp"
 	"strconv"
+	"strings"
 	"time"
 
 	appsv1 "k8s.io/api/apps/v1"
@@ -462,7 +463,7 @@ func (h *Handler) updateSubsetForPod(ws *appsv1alpha1.WorkloadSpread,
 	var err error
 	version := GetPodVersion(pod)
 	subsetStatuses := ws.Status.VersionedSubsetStatuses[version]
-	if subsetStatuses == nil {
+	if len(subsetStatuses) == 0 {
 		subsetStatuses, err = h.initializedSubsetStatuses(ws)
 		if err != nil {
 			return false, nil, "", err
@@ -677,6 +678,9 @@ func (h *Handler) isReferenceEqual(target *appsv1alpha1.TargetReference, owner *
 		rs := &appsv1.ReplicaSet{}
 		err = h.Get(context.TODO(), client.ObjectKey{Namespace: namespace, Name: owner.Name}, rs)
 		if err != nil {
+			if errors.IsNotFound(err) { // the ReplicaSet may not be observed yet due to informer latency.
+				return replicaSetNameIsMatched(owner.Name, target.Name)
+			}
 			return false
 		}
 		if rs.UID != owner.UID {
@@ -736,6 +740,12 @@ func getSubsetCondition(ws *appsv1alpha1.WorkloadSpread, subsetName string, cond
 	return nil
 }
 
+// replicaSetNameIsMatched returns whether a ReplicaSet name (example: server-daily-xznclkds)
+// matches the name of its owning Deployment (example: server-daily).
+func replicaSetNameIsMatched(rsName, dName string) bool {
+	return rsName[:strings.LastIndex(rsName, "-")] == dName
+}
+
 func GetPodVersion(pod *corev1.Pod) string {
 	if version, exists := pod.Labels[appsv1.DefaultDeploymentUniqueLabelKey]; exists {
 		return version
diff --git a/test/e2e/apps/workloadspread.go b/test/e2e/apps/workloadspread.go
index b61ead8956..ec8a7d571a 100644
--- a/test/e2e/apps/workloadspread.go
+++ b/test/e2e/apps/workloadspread.go
@@ -58,6 +58,15 @@ var _ = SIGDescribe("workloadspread", func() {
 	var ns string
 	var tester *framework.WorkloadSpreadTester
 
+	IsKubernetesVersionLessThan122 := func() bool {
+		if v, err := c.Discovery().ServerVersion(); err != nil {
+			framework.Logf("Failed to discover server version: %v", err)
+		} else if minor, err := strconv.Atoi(v.Minor); err != nil || minor < 22 {
+			return true
+		}
+		return false
+	}
+
 	ginkgo.BeforeEach(func() {
 		ns = f.Namespace.Name
 		c = f.ClientSet
@@ -1712,6 +1721,9 @@ var _ = SIGDescribe("workloadspread", func() {
 
 		//test k8s cluster version >= 1.21
 		ginkgo.It("elastic deploy for deployment, zone-a=2, zone-b=nil", func() {
+			if IsKubernetesVersionLessThan122() {
+				ginkgo.Skip("skip this e2e case, it can only run on K8s >= 1.22")
+			}
 			deployment := tester.NewBaseDeployment(ns)
 			// create workloadSpread
 			targetRef := appsv1alpha1.TargetReference{
@@ -2018,6 +2030,5 @@ var _ = SIGDescribe("workloadspread", func() {
 		//
 		//	ginkgo.By("workloadSpread for job, done")
 		//})
-
 	})
 })
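Illustrative note, not part of the patch: a minimal table-driven sketch of how the
replicaSetNameIsMatched fallback is expected to behave, assuming the usual
<deployment-name>-<pod-template-hash> naming of ReplicaSets. The test and the names
in it are hypothetical examples, not code from this change.

package workloadspread

import "testing"

// Sketch of the name-matching fallback used when the ReplicaSet of a Deployment
// is not yet visible in the informer cache.
func TestReplicaSetNameIsMatched(t *testing.T) {
	cases := []struct {
		rsName, dName string
		want          bool
	}{
		{"server-daily-xznclkds", "server-daily", true}, // hash suffix stripped, names match
		{"server-daily-xznclkds", "server", false},      // name prefix differs
		{"server-canary-abc123", "server-daily", false}, // unrelated workload
	}
	for _, c := range cases {
		if got := replicaSetNameIsMatched(c.rsName, c.dName); got != c.want {
			t.Errorf("replicaSetNameIsMatched(%q, %q) = %v, want %v", c.rsName, c.dName, got, c.want)
		}
	}
}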