From 2a9af0cc92ff810b620176ee3dfd376f62091176 Mon Sep 17 00:00:00 2001 From: Bo Liu Date: Fri, 24 Jan 2025 17:26:41 +0800 Subject: [PATCH] feat(updater): defer scale in after rolling update is done (#6052) --- apis/core/v1alpha1/common_types.go | 19 ++++- pkg/client/alias.go | 4 + pkg/controllers/pdgroup/tasks/updater_test.go | 6 +- .../tidbgroup/tasks/updater_test.go | 6 +- .../tiflashgroup/tasks/updater_test.go | 6 +- .../tikvgroup/tasks/updater_test.go | 6 +- pkg/updater/actor.go | 52 +++++++++++- pkg/updater/builder.go | 10 ++- pkg/updater/builder_test.go | 4 +- pkg/updater/executor.go | 14 +++- pkg/updater/executor_test.go | 43 ++++++++-- tests/e2e/pd/pd.go | 22 ++--- tests/e2e/utils/waiter/pod.go | 84 +++++++++++++------ 13 files changed, 212 insertions(+), 64 deletions(-) diff --git a/apis/core/v1alpha1/common_types.go b/apis/core/v1alpha1/common_types.go index 2327817f4c..6f4c45b369 100644 --- a/apis/core/v1alpha1/common_types.go +++ b/apis/core/v1alpha1/common_types.go @@ -35,9 +35,15 @@ const ( const ( // Finalizer is the finalizer used by all resources managed by TiDB Operator. Finalizer = "core.pingcap.com/finalizer" +) +const ( + KeyPrefix = "pingcap.com/" +) + +const ( // LabelKeyPrefix defines key prefix of well known labels - LabelKeyPrefix = "pingcap.com/" + LabelKeyPrefix = KeyPrefix // LabelKeyManagedBy means resources are managed by tidb operator LabelKeyManagedBy = LabelKeyPrefix + "managed-by" @@ -83,6 +89,17 @@ const ( LabelKeyStoreID = "tidb.pingcap.com/store-id" ) +const ( + // AnnoKeyPrefix defines key prefix of well known annotations + AnnoKeyPrefix = KeyPrefix + + // all bool anno will use this val as default + AnnoValTrue = "true" + + // means the instance is marked as deleted and will be deleted later + AnnoKeyDeferDelete = AnnoKeyPrefix + "defer-delete" +) + const ( // NamePrefix for "names" in k8s resources // Users may overlay some fields in managed resource such as pods. 
Names with this diff --git a/pkg/client/alias.go b/pkg/client/alias.go index 9754410579..b7c26e3e04 100644 --- a/pkg/client/alias.go +++ b/pkg/client/alias.go @@ -44,3 +44,7 @@ var ObjectKeyFromObject = client.ObjectKeyFromObject var IgnoreNotFound = client.IgnoreNotFound type GracePeriodSeconds = client.GracePeriodSeconds + +type MergeFromOption = client.MergeFromOption + +var RawPatch = client.RawPatch diff --git a/pkg/controllers/pdgroup/tasks/updater_test.go b/pkg/controllers/pdgroup/tasks/updater_test.go index 172604667e..c962f72024 100644 --- a/pkg/controllers/pdgroup/tasks/updater_test.go +++ b/pkg/controllers/pdgroup/tasks/updater_test.go @@ -55,7 +55,7 @@ func TestTaskUpdater(t *testing.T) { }, }, - expectedStatus: task.SComplete, + expectedStatus: task.SWait, expectedPDNum: 1, }, { @@ -103,7 +103,7 @@ func TestTaskUpdater(t *testing.T) { }, }, - expectedStatus: task.SComplete, + expectedStatus: task.SWait, expectedPDNum: 2, }, { @@ -236,7 +236,7 @@ func TestTaskUpdater(t *testing.T) { }, }, - expectedStatus: task.SComplete, + expectedStatus: task.SWait, expectedPDNum: 3, }, } diff --git a/pkg/controllers/tidbgroup/tasks/updater_test.go b/pkg/controllers/tidbgroup/tasks/updater_test.go index 20c8540049..2afb0f626e 100644 --- a/pkg/controllers/tidbgroup/tasks/updater_test.go +++ b/pkg/controllers/tidbgroup/tasks/updater_test.go @@ -55,7 +55,7 @@ func TestTaskUpdater(t *testing.T) { }, }, - expectedStatus: task.SComplete, + expectedStatus: task.SWait, expectedTiDBNum: 1, }, { @@ -103,7 +103,7 @@ func TestTaskUpdater(t *testing.T) { }, }, - expectedStatus: task.SComplete, + expectedStatus: task.SWait, expectedTiDBNum: 2, }, { @@ -236,7 +236,7 @@ func TestTaskUpdater(t *testing.T) { }, }, - expectedStatus: task.SComplete, + expectedStatus: task.SWait, expectedTiDBNum: 3, }, } diff --git a/pkg/controllers/tiflashgroup/tasks/updater_test.go b/pkg/controllers/tiflashgroup/tasks/updater_test.go index cd0ea023df..3293e462ba 100644 --- 
a/pkg/controllers/tiflashgroup/tasks/updater_test.go +++ b/pkg/controllers/tiflashgroup/tasks/updater_test.go @@ -55,7 +55,7 @@ func TestTaskUpdater(t *testing.T) { }, }, - expectedStatus: task.SComplete, + expectedStatus: task.SWait, expectedTiFlashNum: 1, }, { @@ -103,7 +103,7 @@ func TestTaskUpdater(t *testing.T) { }, }, - expectedStatus: task.SComplete, + expectedStatus: task.SWait, expectedTiFlashNum: 2, }, { @@ -236,7 +236,7 @@ func TestTaskUpdater(t *testing.T) { }, }, - expectedStatus: task.SComplete, + expectedStatus: task.SWait, expectedTiFlashNum: 3, }, } diff --git a/pkg/controllers/tikvgroup/tasks/updater_test.go b/pkg/controllers/tikvgroup/tasks/updater_test.go index c2b8a23198..3367ed1fbc 100644 --- a/pkg/controllers/tikvgroup/tasks/updater_test.go +++ b/pkg/controllers/tikvgroup/tasks/updater_test.go @@ -55,7 +55,7 @@ func TestTaskUpdater(t *testing.T) { }, }, - expectedStatus: task.SComplete, + expectedStatus: task.SWait, expectedTiKVNum: 1, }, { @@ -103,7 +103,7 @@ func TestTaskUpdater(t *testing.T) { }, }, - expectedStatus: task.SComplete, + expectedStatus: task.SWait, expectedTiKVNum: 2, }, { @@ -236,7 +236,7 @@ func TestTaskUpdater(t *testing.T) { }, }, - expectedStatus: task.SComplete, + expectedStatus: task.SWait, expectedTiKVNum: 3, }, } diff --git a/pkg/updater/actor.go b/pkg/updater/actor.go index c04a128dac..50c7b0de15 100644 --- a/pkg/updater/actor.go +++ b/pkg/updater/actor.go @@ -16,8 +16,12 @@ package updater import ( "context" + "encoding/json" "fmt" + "k8s.io/apimachinery/pkg/types" + + "github.com/pingcap/tidb-operator/apis/core/v1alpha1" "github.com/pingcap/tidb-operator/pkg/client" "github.com/pingcap/tidb-operator/pkg/runtime" ) @@ -41,6 +45,8 @@ type actor[T runtime.Tuple[O, R], O client.Object, R runtime.Instance] struct { update State[R] outdated State[R] + // deleted set records all instances that are marked by defer delete annotation + deleted State[R] addHooks []AddHook[R] updateHooks []UpdateHook[R] @@ -112,7 +118,7 @@ 
func (act *actor[T, O, R]) ScaleInOutdated(ctx context.Context) (bool, error) { obj := act.outdated.Del(name) isUnavailable := !obj.IsHealthy() || !obj.IsUpToDate() - if err := act.c.Delete(ctx, act.converter.To(obj)); err != nil { + if err := act.deferDelete(ctx, obj); err != nil { return false, err } @@ -123,6 +129,40 @@ func (act *actor[T, O, R]) ScaleInOutdated(ctx context.Context) (bool, error) { return isUnavailable, nil } +type Patch struct { + Metadata Metadata `json:"metadata"` +} + +type Metadata struct { + ResourceVersion string `json:"resourceVersion"` + Annotations map[string]string `json:"annotations"` +} + +func (act *actor[T, O, R]) deferDelete(ctx context.Context, obj R) error { + o := act.converter.To(obj) + p := Patch{ + Metadata: Metadata{ + ResourceVersion: o.GetResourceVersion(), + Annotations: map[string]string{ + v1alpha1.AnnoKeyDeferDelete: v1alpha1.AnnoValTrue, + }, + }, + } + + data, err := json.Marshal(&p) + if err != nil { + return fmt.Errorf("invalid patch: %w", err) + } + + if err := act.c.Patch(ctx, o, client.RawPatch(types.MergePatchType, data)); err != nil { + return fmt.Errorf("cannot mark obj %s/%s as defer delete: %w", obj.GetNamespace(), obj.GetName(), err) + } + + act.deleted.Add(obj) + + return nil +} + func (act *actor[T, O, R]) Update(ctx context.Context) error { name, err := act.chooseToUpdate(act.outdated.List()) if err != nil { @@ -143,3 +183,13 @@ func (act *actor[T, O, R]) Update(ctx context.Context) error { return nil } + +func (act *actor[T, O, R]) Cleanup(ctx context.Context) error { + for _, item := range act.deleted.List() { + if err := act.c.Delete(ctx, act.converter.To(item)); err != nil { + return err + } + } + + return nil +} diff --git a/pkg/updater/builder.go b/pkg/updater/builder.go index f5aa74c0d7..d5282a2ef4 100644 --- a/pkg/updater/builder.go +++ b/pkg/updater/builder.go @@ -15,6 +15,7 @@ package updater import ( + "github.com/pingcap/tidb-operator/apis/core/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client" "github.com/pingcap/tidb-operator/pkg/runtime" ) @@ -55,7 +56,7 @@ type builder[T runtime.Tuple[O, R], O client.Object, R runtime.Instance] struct } func (b *builder[T, O, R]) Build() Executor { - update, outdated := split(b.instances, b.rev) + update, outdated, deleted := split(b.instances, b.rev) updatePolicies := b.updatePreferPolicies updatePolicies = append(updatePolicies, PreferUnavailable[R]()) @@ -65,6 +66,7 @@ func (b *builder[T, O, R]) Build() Executor { update: NewState(update), outdated: NewState(outdated), + deleted: NewState(deleted), addHooks: b.addHooks, updateHooks: append(b.updateHooks, KeepName[R](), KeepTopology[R]()), @@ -141,7 +143,7 @@ func (b *builder[T, O, R]) WithUpdatePreferPolicy(ps ...PreferPolicy[R]) Builder return b } -func split[R runtime.Instance](all []R, rev string) (update, outdated []R) { +func split[R runtime.Instance](all []R, rev string) (update, outdated, deleted []R) { for _, instance := range all { // if instance is deleting, just ignore it // TODO(liubo02): make sure it's ok for PD @@ -150,12 +152,14 @@ func split[R runtime.Instance](all []R, rev string) (update, outdated []R) { } if instance.GetUpdateRevision() == rev { update = append(update, instance) + } else if _, ok := instance.GetAnnotations()[v1alpha1.AnnoKeyDeferDelete]; ok { + deleted = append(deleted, instance) } else { outdated = append(outdated, instance) } } - return update, outdated + return update, outdated, deleted } func countUnavailable[R runtime.Instance](all []R) int { diff --git a/pkg/updater/builder_test.go b/pkg/updater/builder_test.go index 9cedfb03be..69f0833c11 100644 --- a/pkg/updater/builder_test.go +++ b/pkg/updater/builder_test.go @@ -50,12 +50,12 @@ func TestBuilder(t *testing.T) { { desc: "scale out", desired: 3, - expectedWait: false, + expectedWait: true, }, { desc: "scale in", desired: 1, - expectedWait: false, + expectedWait: true, }, } diff --git a/pkg/updater/executor.go 
b/pkg/updater/executor.go index a4c0b993bb..f6b54de3c7 100644 --- a/pkg/updater/executor.go +++ b/pkg/updater/executor.go @@ -23,6 +23,9 @@ type Actor interface { Update(ctx context.Context) error ScaleInUpdate(ctx context.Context) (unavailable bool, _ error) ScaleInOutdated(ctx context.Context) (unavailable bool, _ error) + + // delete all instances marked as defer deletion + Cleanup(ctx context.Context) error } // TODO: return instance list after Do @@ -150,7 +153,7 @@ func (ex *executor) Do(ctx context.Context) (bool, error) { checkAvail = true } } else { - // ex.update + ex.outdated > ex.desired + min(ex.maxSurge, ex.outdated) and ex.update >= ex.desired + // ex.update + ex.outdated > ex.desired + min(ex.maxSurge, ex.outdated) and ex.update <= ex.desired // => ex.outdated > min(ex.maxSurge, ex.outdated) // => ex.outdated > 0 unavailable, err := ex.act.ScaleInOutdated(ctx) @@ -173,5 +176,14 @@ func (ex *executor) Do(ctx context.Context) (bool, error) { } } + if ex.unavailableUpdate > 0 { + // wait until update are all available + return true, nil + } + + if err := ex.act.Cleanup(ctx); err != nil { + return false, err + } + return false, nil } diff --git a/pkg/updater/executor_test.go b/pkg/updater/executor_test.go index 9a7bcb9833..ad70dcdb82 100644 --- a/pkg/updater/executor_test.go +++ b/pkg/updater/executor_test.go @@ -29,6 +29,7 @@ const ( actionUpdate actionScaleInUpdate actionScaleInOutdated + actionCleanup ) type FakeActor struct { @@ -74,6 +75,11 @@ func (a *FakeActor) Update(_ context.Context) error { return nil } +func (a *FakeActor) Cleanup(_ context.Context) error { + a.Actions = append(a.Actions, actionCleanup) + return nil +} + func TestExecutor(t *testing.T) { cases := []struct { desc string @@ -91,13 +97,15 @@ func TestExecutor(t *testing.T) { expectedWait bool }{ { - desc: "do nothing", - update: 3, - outdated: 0, - desired: 3, - maxSurge: 1, - maxUnavailable: 1, - expectedActions: nil, + desc: "do nothing", + update: 3, + outdated: 0, + 
desired: 3, + maxSurge: 1, + maxUnavailable: 1, + expectedActions: []action{ + actionCleanup, + }, }, { desc: "scale out from 0 with 0 maxSurge", @@ -111,6 +119,7 @@ func TestExecutor(t *testing.T) { actionScaleOut, actionScaleOut, }, + expectedWait: true, }, { desc: "scale out from 0 with 1 maxSurge", @@ -124,6 +133,7 @@ func TestExecutor(t *testing.T) { actionScaleOut, actionScaleOut, }, + expectedWait: true, }, { desc: "scale in to 0", @@ -136,6 +146,7 @@ func TestExecutor(t *testing.T) { actionScaleInUpdate, actionScaleInUpdate, actionScaleInUpdate, + actionCleanup, }, }, { @@ -172,6 +183,7 @@ func TestExecutor(t *testing.T) { expectedActions: []action{ actionUpdate, }, + expectedWait: true, }, { desc: "rolling update with 1 maxSurge and 0 maxUnavailable(0)", @@ -218,6 +230,7 @@ func TestExecutor(t *testing.T) { maxUnavailable: 0, expectedActions: []action{ actionScaleInOutdated, + actionCleanup, }, }, { @@ -259,6 +272,7 @@ func TestExecutor(t *testing.T) { expectedActions: []action{ actionScaleInOutdated, }, + expectedWait: true, }, { desc: "rolling update with 1 maxSurge and 1 maxUnavailable(1)", @@ -271,6 +285,7 @@ func TestExecutor(t *testing.T) { actionUpdate, actionScaleInOutdated, }, + expectedWait: true, }, { desc: "rolling update with 2 maxSurge and 0 maxUnavailable(0)", @@ -307,6 +322,7 @@ func TestExecutor(t *testing.T) { maxUnavailable: 0, expectedActions: []action{ actionScaleInOutdated, + actionCleanup, }, }, { @@ -356,6 +372,7 @@ func TestExecutor(t *testing.T) { expectedActions: []action{ actionUpdate, }, + expectedWait: true, }, { desc: "scale out and rolling update at same time with 1 maxSurge and 0 maxUnavailable(0)", @@ -404,6 +421,7 @@ func TestExecutor(t *testing.T) { maxUnavailable: 0, expectedActions: []action{ actionScaleInOutdated, + actionCleanup, }, }, { @@ -442,6 +460,7 @@ func TestExecutor(t *testing.T) { maxUnavailable: 1, expectedActions: []action{ actionScaleInOutdated, + actionCleanup, }, }, { @@ -480,6 +499,7 @@ func 
TestExecutor(t *testing.T) { expectedActions: []action{ actionUpdate, }, + expectedWait: true, }, { desc: "scale in and rolling update at same time with 1 maxSurge and 0 maxUnavailable(0)", @@ -527,6 +547,7 @@ func TestExecutor(t *testing.T) { maxUnavailable: 0, expectedActions: []action{ actionScaleInOutdated, + actionCleanup, }, }, { @@ -554,6 +575,7 @@ func TestExecutor(t *testing.T) { actionUpdate, actionScaleInOutdated, }, + expectedWait: true, }, { desc: "rolling update with all are unavailable(0)", @@ -568,6 +590,7 @@ func TestExecutor(t *testing.T) { actionUpdate, actionUpdate, }, + expectedWait: true, }, { desc: "scale in when all are unavailable(0)", @@ -581,6 +604,7 @@ func TestExecutor(t *testing.T) { actionScaleInUpdate, actionScaleInUpdate, }, + expectedWait: true, }, { desc: "complex case(0)", @@ -639,6 +663,7 @@ func TestExecutor(t *testing.T) { actionUpdate, actionUpdate, }, + expectedWait: true, }, { desc: "complex case(2-1)", @@ -678,6 +703,7 @@ func TestExecutor(t *testing.T) { expectedActions: []action{ actionUpdate, }, + expectedWait: true, }, { desc: "complex case(3-1)", @@ -703,6 +729,7 @@ func TestExecutor(t *testing.T) { expectedActions: []action{ actionUpdate, }, + expectedWait: true, }, { desc: "complex case(3-3)", @@ -728,6 +755,7 @@ func TestExecutor(t *testing.T) { expectedActions: []action{ actionUpdate, }, + expectedWait: true, }, { desc: "complex case(3-5)", @@ -741,6 +769,7 @@ func TestExecutor(t *testing.T) { expectedActions: []action{ actionUpdate, }, + expectedWait: true, }, } diff --git a/tests/e2e/pd/pd.go b/tests/e2e/pd/pd.go index 35733bbe83..3f91729e60 100644 --- a/tests/e2e/pd/pd.go +++ b/tests/e2e/pd/pd.go @@ -59,8 +59,8 @@ var _ = ginkgo.Describe("PD", label.PD, func() { }) }) - ginkgo.Context("Scale", label.P0, label.Scale, func() { - ginkgo.It("support scale PD form 3 to 5", func(ctx context.Context) { + ginkgo.Context("Scale and Update", label.P0, func() { + ginkgo.It("support scale PD form 3 to 5", label.Scale, 
func(ctx context.Context) { pdg := data.NewPDGroup( f.Namespace.Name, data.WithReplicas[*runtime.PDGroup](3), ) @@ -78,7 +78,7 @@ var _ = ginkgo.Describe("PD", label.PD, func() { f.WaitForPDGroupReady(ctx, pdg) }) - ginkgo.It("support scale PD form 5 to 3", func(ctx context.Context) { + ginkgo.It("support scale PD from 5 to 3", label.Scale, func(ctx context.Context) { pdg := data.NewPDGroup( f.Namespace.Name, //nolint:mnd // easy for test @@ -96,10 +96,8 @@ var _ = ginkgo.Describe("PD", label.PD, func() { f.Must(f.Client.Patch(ctx, pdg, patch)) f.WaitForPDGroupReady(ctx, pdg) }) - }) - ginkgo.Context("Update", label.P0, label.Update, func() { - ginkgo.It("support rolling update PD by change config file with 3 replicas", func(ctx context.Context) { + ginkgo.It("support rolling update PD by change config file", label.Update, func(ctx context.Context) { pdg := data.NewPDGroup( f.Namespace.Name, data.WithReplicas[*runtime.PDGroup](3), @@ -117,7 +115,7 @@ var _ = ginkgo.Describe("PD", label.PD, func() { go func() { defer close(ch) defer ginkgo.GinkgoRecover() - f.Must(waiter.WaitPodsRollingUpdateOnce(nctx, f.Client, runtime.FromPDGroup(pdg), waiter.LongTaskTimeout)) + f.Must(waiter.WaitPodsRollingUpdateOnce(nctx, f.Client, runtime.FromPDGroup(pdg), 0, waiter.LongTaskTimeout)) }() changeTime := time.Now() @@ -129,10 +127,11 @@ var _ = ginkgo.Describe("PD", label.PD, func() { <-ch }) - ginkgo.It("support update PD by change config file with 1 replica", func(ctx context.Context) { + ginkgo.It("support scale PD from 5 to 3 and rolling update at same time", label.Scale, label.Update, func(ctx context.Context) { pdg := data.NewPDGroup( f.Namespace.Name, - data.WithReplicas[*runtime.PDGroup](1), + //nolint:mnd // easy for test + data.WithReplicas[*runtime.PDGroup](5), ) ginkgo.By("Create PDGroup") @@ -140,6 +139,7 @@ var _ = ginkgo.Describe("PD", label.PD, func() { f.WaitForPDGroupReady(ctx, pdg) patch := client.MergeFrom(pdg.DeepCopy()) + pdg.Spec.Replicas = ptr.To[int32](3)
pdg.Spec.Template.Spec.Config = `log.level = 'warn'` nctx, cancel := context.WithCancel(ctx) @@ -147,11 +147,11 @@ var _ = ginkgo.Describe("PD", label.PD, func() { go func() { defer close(ch) defer ginkgo.GinkgoRecover() - f.Must(waiter.WaitPodsRollingUpdateOnce(nctx, f.Client, runtime.FromPDGroup(pdg), waiter.LongTaskTimeout)) + f.Must(waiter.WaitPodsRollingUpdateOnce(nctx, f.Client, runtime.FromPDGroup(pdg), -2, waiter.LongTaskTimeout)) }() changeTime := time.Now() - ginkgo.By("Change config of the PDGroup") + ginkgo.By("Change config and replicas of the PDGroup") f.Must(f.Client.Patch(ctx, pdg, patch)) f.Must(waiter.WaitForPodsRecreated(ctx, f.Client, runtime.FromPDGroup(pdg), changeTime, waiter.LongTaskTimeout)) f.WaitForPDGroupReady(ctx, pdg) diff --git a/tests/e2e/utils/waiter/pod.go b/tests/e2e/utils/waiter/pod.go index 7a6a85d7dd..52486b6bd4 100644 --- a/tests/e2e/utils/waiter/pod.go +++ b/tests/e2e/utils/waiter/pod.go @@ -46,11 +46,66 @@ func WaitPodsRollingUpdateOnce[G runtime.Group]( ctx context.Context, c client.Client, g G, + // scale means scale out/in before/after rolling update + // k means scale out k instances and -k means scale in k instances + // 0 means only rolling update + scale int, timeout time.Duration, ) error { - podMap := map[string]podInfo{} ctx, cancel := watchtools.ContextWithOptionalTimeout(ctx, timeout) defer cancel() + + podMap, err := generatePodInfoMapByWatch(ctx, c, g) + if err != nil { + return err + } + + infos := []podInfo{} + for _, v := range podMap { + infos = append(infos, v) + } + sortPodInfos(infos) + detail := strings.Builder{} + for _, info := range infos { + if info.deletionTime.IsZero() { + detail.WriteString(fmt.Sprintf("%v(%v) created at %s\n", info.name, info.uid, info.creationTime)) + } else { + detail.WriteString(fmt.Sprintf("%v(%v) created at %s, deleted at %s\n", info.name, info.uid, info.creationTime, info.deletionTime)) + } + } + + if scale > 0 { + for i := range scale { + if 
!infos[i].deletionTime.IsZero() { + return fmt.Errorf("expect scale out %v pods before rolling update, detail:\n%v", scale, detail.String()) + } + } + + infos = infos[scale:] + } + if scale < 0 { + for i := range -scale { + if infos[len(infos)-1-i].deletionTime.IsZero() { + return fmt.Errorf("expect scale in %v pods after rolling update, detail:\n%v", scale, detail.String()) + } + } + infos = infos[:len(infos)+scale] + } + + if len(infos) != 2*int(g.Replicas()) { + return fmt.Errorf("expect %v pods info, now only %v, detail:\n%v", 2*g.Replicas(), len(infos), detail.String()) + } + for i := range g.Replicas() { + if infos[2*i].name != infos[2*i+1].name { + return fmt.Errorf("pod may be restarted at same time, detail:\n%v", detail.String()) + } + } + + return nil +} + +func generatePodInfoMapByWatch[G runtime.Group](ctx context.Context, c client.Client, g G) (map[string]podInfo, error) { + podMap := map[string]podInfo{} lw := newListWatch(ctx, c, g) _, err := watchtools.UntilWithSync(ctx, lw, &corev1.Pod{}, nil, func(event watch.Event) (bool, error) { pod, ok := event.Object.(*corev1.Pod) @@ -77,33 +132,10 @@ func WaitPodsRollingUpdateOnce[G runtime.Group]( }) if !wait.Interrupted(err) { - return fmt.Errorf("watch stopped unexpected: %w", err) + return nil, fmt.Errorf("watch stopped unexpected: %w", err) } - infos := []podInfo{} - for _, v := range podMap { - infos = append(infos, v) - } - sortPodInfos(infos) - detail := strings.Builder{} - for _, info := range infos { - if info.deletionTime.IsZero() { - detail.WriteString(fmt.Sprintf("%v(%v) created at %s\n", info.name, info.uid, info.creationTime)) - } else { - detail.WriteString(fmt.Sprintf("%v(%v) created at %s, deleted at %s\n", info.name, info.uid, info.creationTime, info.deletionTime)) - } - } - - if len(infos) != 2*int(g.Replicas()) { - return fmt.Errorf("expect %v pods info, now only %v, detail:\n%v", 2*g.Replicas(), len(infos), detail.String()) - } - for i := range g.Replicas() { - if infos[2*i].name != 
infos[2*i+1].name { - return fmt.Errorf("pod may be restarted at same time, detail:\n%v", detail.String()) - } - } - - return nil + return podMap, nil } func newListWatch[G runtime.Group](ctx context.Context, c client.Client, g G) cache.ListerWatcher {