Skip to content

Commit

Permalink
feat(updater): defer scale in after rolling update is done (#6052)
Browse files Browse the repository at this point in the history
liubog2008 authored Jan 24, 2025
1 parent 3d2a1b5 commit 2a9af0c
Showing 13 changed files with 212 additions and 64 deletions.
19 changes: 18 additions & 1 deletion apis/core/v1alpha1/common_types.go
Original file line number Diff line number Diff line change
@@ -35,9 +35,15 @@ const (
const (
// Finalizer is the finalizer used by all resources managed by TiDB Operator.
Finalizer = "core.pingcap.com/finalizer"
)

const (
	// KeyPrefix is the common domain prefix shared by all well-known
	// label and annotation keys managed by TiDB Operator.
	KeyPrefix = "pingcap.com/"
)

const (
// LabelKeyPrefix defines key prefix of well known labels
LabelKeyPrefix = "pingcap.com/"
LabelKeyPrefix = KeyPrefix

// LabelKeyManagedBy means resources are managed by tidb operator
LabelKeyManagedBy = LabelKeyPrefix + "managed-by"
@@ -83,6 +89,17 @@ const (
LabelKeyStoreID = "tidb.pingcap.com/store-id"
)

const (
	// AnnoKeyPrefix defines the key prefix of well-known annotations.
	AnnoKeyPrefix = KeyPrefix

	// AnnoValTrue is the default value used by all boolean annotations.
	AnnoValTrue = "true"

	// AnnoKeyDeferDelete marks an instance as deleted; the instance is
	// not removed immediately but will be deleted later (e.g. after a
	// rolling update has finished).
	AnnoKeyDeferDelete = AnnoKeyPrefix + "defer-delete"
)

const (
// NamePrefix for "names" in k8s resources
// Users may overlay some fields in managed resource such as pods. Names with this
4 changes: 4 additions & 0 deletions pkg/client/alias.go
Original file line number Diff line number Diff line change
@@ -44,3 +44,7 @@ var ObjectKeyFromObject = client.ObjectKeyFromObject
var IgnoreNotFound = client.IgnoreNotFound

type GracePeriodSeconds = client.GracePeriodSeconds

type MergeFromOption = client.MergeFromOption

var RawPatch = client.RawPatch
6 changes: 3 additions & 3 deletions pkg/controllers/pdgroup/tasks/updater_test.go
Original file line number Diff line number Diff line change
@@ -55,7 +55,7 @@ func TestTaskUpdater(t *testing.T) {
},
},

expectedStatus: task.SComplete,
expectedStatus: task.SWait,
expectedPDNum: 1,
},
{
@@ -103,7 +103,7 @@ func TestTaskUpdater(t *testing.T) {
},
},

expectedStatus: task.SComplete,
expectedStatus: task.SWait,
expectedPDNum: 2,
},
{
@@ -236,7 +236,7 @@ func TestTaskUpdater(t *testing.T) {
},
},

expectedStatus: task.SComplete,
expectedStatus: task.SWait,
expectedPDNum: 3,
},
}
6 changes: 3 additions & 3 deletions pkg/controllers/tidbgroup/tasks/updater_test.go
Original file line number Diff line number Diff line change
@@ -55,7 +55,7 @@ func TestTaskUpdater(t *testing.T) {
},
},

expectedStatus: task.SComplete,
expectedStatus: task.SWait,
expectedTiDBNum: 1,
},
{
@@ -103,7 +103,7 @@ func TestTaskUpdater(t *testing.T) {
},
},

expectedStatus: task.SComplete,
expectedStatus: task.SWait,
expectedTiDBNum: 2,
},
{
@@ -236,7 +236,7 @@ func TestTaskUpdater(t *testing.T) {
},
},

expectedStatus: task.SComplete,
expectedStatus: task.SWait,
expectedTiDBNum: 3,
},
}
6 changes: 3 additions & 3 deletions pkg/controllers/tiflashgroup/tasks/updater_test.go
Original file line number Diff line number Diff line change
@@ -55,7 +55,7 @@ func TestTaskUpdater(t *testing.T) {
},
},

expectedStatus: task.SComplete,
expectedStatus: task.SWait,
expectedTiFlashNum: 1,
},
{
@@ -103,7 +103,7 @@ func TestTaskUpdater(t *testing.T) {
},
},

expectedStatus: task.SComplete,
expectedStatus: task.SWait,
expectedTiFlashNum: 2,
},
{
@@ -236,7 +236,7 @@ func TestTaskUpdater(t *testing.T) {
},
},

expectedStatus: task.SComplete,
expectedStatus: task.SWait,
expectedTiFlashNum: 3,
},
}
6 changes: 3 additions & 3 deletions pkg/controllers/tikvgroup/tasks/updater_test.go
Original file line number Diff line number Diff line change
@@ -55,7 +55,7 @@ func TestTaskUpdater(t *testing.T) {
},
},

expectedStatus: task.SComplete,
expectedStatus: task.SWait,
expectedTiKVNum: 1,
},
{
@@ -103,7 +103,7 @@ func TestTaskUpdater(t *testing.T) {
},
},

expectedStatus: task.SComplete,
expectedStatus: task.SWait,
expectedTiKVNum: 2,
},
{
@@ -236,7 +236,7 @@ func TestTaskUpdater(t *testing.T) {
},
},

expectedStatus: task.SComplete,
expectedStatus: task.SWait,
expectedTiKVNum: 3,
},
}
52 changes: 51 additions & 1 deletion pkg/updater/actor.go
Original file line number Diff line number Diff line change
@@ -16,8 +16,12 @@ package updater

import (
"context"
"encoding/json"
"fmt"

"k8s.io/apimachinery/pkg/types"

"github.com/pingcap/tidb-operator/apis/core/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client"
"github.com/pingcap/tidb-operator/pkg/runtime"
)
@@ -41,6 +45,8 @@ type actor[T runtime.Tuple[O, R], O client.Object, R runtime.Instance] struct {

update State[R]
outdated State[R]
// deleted set records all instances that are marked by defer delete annotation
deleted State[R]

addHooks []AddHook[R]
updateHooks []UpdateHook[R]
@@ -112,7 +118,7 @@ func (act *actor[T, O, R]) ScaleInOutdated(ctx context.Context) (bool, error) {
obj := act.outdated.Del(name)
isUnavailable := !obj.IsHealthy() || !obj.IsUpToDate()

if err := act.c.Delete(ctx, act.converter.To(obj)); err != nil {
if err := act.deferDelete(ctx, obj); err != nil {
return false, err
}

@@ -123,6 +129,40 @@ func (act *actor[T, O, R]) ScaleInOutdated(ctx context.Context) (bool, error) {
return isUnavailable, nil
}

// Patch is the JSON merge-patch body used to annotate an object.
type Patch struct {
	Metadata Metadata `json:"metadata"`
}

// Metadata carries the metadata fields written by the merge patch.
type Metadata struct {
	// ResourceVersion is included so the patch is rejected on conflict
	// if the object was modified concurrently.
	ResourceVersion string            `json:"resourceVersion"`
	Annotations     map[string]string `json:"annotations"`
}

// deferDelete marks obj with the defer-delete annotation instead of
// deleting it right away. The instance is recorded in act.deleted so
// that Cleanup can remove it once the rolling update is done.
func (act *actor[T, O, R]) deferDelete(ctx context.Context, obj R) error {
	o := act.converter.To(obj)
	p := Patch{
		Metadata: Metadata{
			ResourceVersion: o.GetResourceVersion(),
			Annotations: map[string]string{
				v1alpha1.AnnoKeyDeferDelete: v1alpha1.AnnoValTrue,
			},
		},
	}

	data, err := json.Marshal(&p)
	if err != nil {
		// NOTE: fixed typo in the error message ("invaid" -> "invalid").
		return fmt.Errorf("invalid patch: %w", err)
	}

	if err := act.c.Patch(ctx, o, client.RawPatch(types.MergePatchType, data)); err != nil {
		return fmt.Errorf("cannot mark obj %s/%s as defer delete: %w", obj.GetNamespace(), obj.GetName(), err)
	}

	act.deleted.Add(obj)

	return nil
}

func (act *actor[T, O, R]) Update(ctx context.Context) error {
name, err := act.chooseToUpdate(act.outdated.List())
if err != nil {
@@ -143,3 +183,13 @@ func (act *actor[T, O, R]) Update(ctx context.Context) error {

return nil
}

// Cleanup deletes every instance that was previously marked for
// deferred deletion, finishing the scale-in once the update is done.
func (act *actor[T, O, R]) Cleanup(ctx context.Context) error {
	for _, inst := range act.deleted.List() {
		obj := act.converter.To(inst)
		if err := act.c.Delete(ctx, obj); err != nil {
			return err
		}
	}

	return nil
}
10 changes: 7 additions & 3 deletions pkg/updater/builder.go
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
package updater

import (
"github.com/pingcap/tidb-operator/apis/core/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client"
"github.com/pingcap/tidb-operator/pkg/runtime"
)
@@ -55,7 +56,7 @@ type builder[T runtime.Tuple[O, R], O client.Object, R runtime.Instance] struct
}

func (b *builder[T, O, R]) Build() Executor {
update, outdated := split(b.instances, b.rev)
update, outdated, deleted := split(b.instances, b.rev)

updatePolicies := b.updatePreferPolicies
updatePolicies = append(updatePolicies, PreferUnavailable[R]())
@@ -65,6 +66,7 @@ func (b *builder[T, O, R]) Build() Executor {

update: NewState(update),
outdated: NewState(outdated),
deleted: NewState(deleted),

addHooks: b.addHooks,
updateHooks: append(b.updateHooks, KeepName[R](), KeepTopology[R]()),
@@ -141,7 +143,7 @@ func (b *builder[T, O, R]) WithUpdatePreferPolicy(ps ...PreferPolicy[R]) Builder
return b
}

func split[R runtime.Instance](all []R, rev string) (update, outdated []R) {
func split[R runtime.Instance](all []R, rev string) (update, outdated, deleted []R) {
for _, instance := range all {
// if instance is deleting, just ignore it
// TODO(liubo02): make sure it's ok for PD
@@ -150,12 +152,14 @@ func split[R runtime.Instance](all []R, rev string) (update, outdated []R) {
}
if instance.GetUpdateRevision() == rev {
update = append(update, instance)
} else if _, ok := instance.GetAnnotations()[v1alpha1.AnnoKeyDeferDelete]; ok {
deleted = append(deleted, instance)
} else {
outdated = append(outdated, instance)
}
}

return update, outdated
return update, outdated, deleted
}

func countUnavailable[R runtime.Instance](all []R) int {
4 changes: 2 additions & 2 deletions pkg/updater/builder_test.go
Original file line number Diff line number Diff line change
@@ -50,12 +50,12 @@ func TestBuilder(t *testing.T) {
{
desc: "scale out",
desired: 3,
expectedWait: false,
expectedWait: true,
},
{
desc: "scale in",
desired: 1,
expectedWait: false,
expectedWait: true,
},
}

14 changes: 13 additions & 1 deletion pkg/updater/executor.go
Original file line number Diff line number Diff line change
@@ -23,6 +23,9 @@ type Actor interface {
Update(ctx context.Context) error
ScaleInUpdate(ctx context.Context) (unavailable bool, _ error)
ScaleInOutdated(ctx context.Context) (unavailable bool, _ error)

// delete all instances marked as defer deletion
Cleanup(ctx context.Context) error
}

// TODO: return instance list after Do
@@ -150,7 +153,7 @@ func (ex *executor) Do(ctx context.Context) (bool, error) {
checkAvail = true
}
} else {
// ex.update + ex.outdated > ex.desired + min(ex.maxSurge, ex.outdated) and ex.update >= ex.desired
// ex.update + ex.outdated > ex.desired + min(ex.maxSurge, ex.outdated) and ex.update <= ex.desired
// => ex.outdated > min(ex.maxSurge, ex.outdated)
// => ex.outdated > 0
unavailable, err := ex.act.ScaleInOutdated(ctx)
@@ -173,5 +176,14 @@ func (ex *executor) Do(ctx context.Context) (bool, error) {
}
}

if ex.unavailableUpdate > 0 {
// wait until update are all available
return true, nil
}

if err := ex.act.Cleanup(ctx); err != nil {
return false, err
}

return false, nil
}
43 changes: 36 additions & 7 deletions pkg/updater/executor_test.go
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ const (
actionUpdate
actionScaleInUpdate
actionScaleInOutdated
actionCleanup
)

type FakeActor struct {
@@ -74,6 +75,11 @@ func (a *FakeActor) Update(_ context.Context) error {
return nil
}

// Cleanup records the cleanup action so tests can assert it was invoked.
func (a *FakeActor) Cleanup(_ context.Context) error {
	a.Actions = append(a.Actions, actionCleanup)
	return nil
}

func TestExecutor(t *testing.T) {
cases := []struct {
desc string
@@ -91,13 +97,15 @@ func TestExecutor(t *testing.T) {
expectedWait bool
}{
{
desc: "do nothing",
update: 3,
outdated: 0,
desired: 3,
maxSurge: 1,
maxUnavailable: 1,
expectedActions: nil,
desc: "do nothing",
update: 3,
outdated: 0,
desired: 3,
maxSurge: 1,
maxUnavailable: 1,
expectedActions: []action{
actionCleanup,
},
},
{
desc: "scale out from 0 with 0 maxSurge",
@@ -111,6 +119,7 @@ func TestExecutor(t *testing.T) {
actionScaleOut,
actionScaleOut,
},
expectedWait: true,
},
{
desc: "scale out from 0 with 1 maxSurge",
@@ -124,6 +133,7 @@ func TestExecutor(t *testing.T) {
actionScaleOut,
actionScaleOut,
},
expectedWait: true,
},
{
desc: "scale in to 0",
@@ -136,6 +146,7 @@ func TestExecutor(t *testing.T) {
actionScaleInUpdate,
actionScaleInUpdate,
actionScaleInUpdate,
actionCleanup,
},
},
{
@@ -172,6 +183,7 @@ func TestExecutor(t *testing.T) {
expectedActions: []action{
actionUpdate,
},
expectedWait: true,
},
{
desc: "rolling update with 1 maxSurge and 0 maxUnavailable(0)",
@@ -218,6 +230,7 @@ func TestExecutor(t *testing.T) {
maxUnavailable: 0,
expectedActions: []action{
actionScaleInOutdated,
actionCleanup,
},
},
{
@@ -259,6 +272,7 @@ func TestExecutor(t *testing.T) {
expectedActions: []action{
actionScaleInOutdated,
},
expectedWait: true,
},
{
desc: "rolling update with 1 maxSurge and 1 maxUnavailable(1)",
@@ -271,6 +285,7 @@ func TestExecutor(t *testing.T) {
actionUpdate,
actionScaleInOutdated,
},
expectedWait: true,
},
{
desc: "rolling update with 2 maxSurge and 0 maxUnavailable(0)",
@@ -307,6 +322,7 @@ func TestExecutor(t *testing.T) {
maxUnavailable: 0,
expectedActions: []action{
actionScaleInOutdated,
actionCleanup,
},
},
{
@@ -356,6 +372,7 @@ func TestExecutor(t *testing.T) {
expectedActions: []action{
actionUpdate,
},
expectedWait: true,
},
{
desc: "scale out and rolling update at same time with 1 maxSurge and 0 maxUnavailable(0)",
@@ -404,6 +421,7 @@ func TestExecutor(t *testing.T) {
maxUnavailable: 0,
expectedActions: []action{
actionScaleInOutdated,
actionCleanup,
},
},
{
@@ -442,6 +460,7 @@ func TestExecutor(t *testing.T) {
maxUnavailable: 1,
expectedActions: []action{
actionScaleInOutdated,
actionCleanup,
},
},
{
@@ -480,6 +499,7 @@ func TestExecutor(t *testing.T) {
expectedActions: []action{
actionUpdate,
},
expectedWait: true,
},
{
desc: "scale in and rolling update at same time with 1 maxSurge and 0 maxUnavailable(0)",
@@ -527,6 +547,7 @@ func TestExecutor(t *testing.T) {
maxUnavailable: 0,
expectedActions: []action{
actionScaleInOutdated,
actionCleanup,
},
},
{
@@ -554,6 +575,7 @@ func TestExecutor(t *testing.T) {
actionUpdate,
actionScaleInOutdated,
},
expectedWait: true,
},
{
desc: "rolling update with all are unavailable(0)",
@@ -568,6 +590,7 @@ func TestExecutor(t *testing.T) {
actionUpdate,
actionUpdate,
},
expectedWait: true,
},
{
desc: "scale in when all are unavailable(0)",
@@ -581,6 +604,7 @@ func TestExecutor(t *testing.T) {
actionScaleInUpdate,
actionScaleInUpdate,
},
expectedWait: true,
},
{
desc: "complex case(0)",
@@ -639,6 +663,7 @@ func TestExecutor(t *testing.T) {
actionUpdate,
actionUpdate,
},
expectedWait: true,
},
{
desc: "complex case(2-1)",
@@ -678,6 +703,7 @@ func TestExecutor(t *testing.T) {
expectedActions: []action{
actionUpdate,
},
expectedWait: true,
},
{
desc: "complex case(3-1)",
@@ -703,6 +729,7 @@ func TestExecutor(t *testing.T) {
expectedActions: []action{
actionUpdate,
},
expectedWait: true,
},
{
desc: "complex case(3-3)",
@@ -728,6 +755,7 @@ func TestExecutor(t *testing.T) {
expectedActions: []action{
actionUpdate,
},
expectedWait: true,
},
{
desc: "complex case(3-5)",
@@ -741,6 +769,7 @@ func TestExecutor(t *testing.T) {
expectedActions: []action{
actionUpdate,
},
expectedWait: true,
},
}

22 changes: 11 additions & 11 deletions tests/e2e/pd/pd.go
Original file line number Diff line number Diff line change
@@ -59,8 +59,8 @@ var _ = ginkgo.Describe("PD", label.PD, func() {
})
})

ginkgo.Context("Scale", label.P0, label.Scale, func() {
ginkgo.It("support scale PD form 3 to 5", func(ctx context.Context) {
ginkgo.Context("Scale and Update", label.P0, func() {
ginkgo.It("support scale PD form 3 to 5", label.Scale, func(ctx context.Context) {
pdg := data.NewPDGroup(
f.Namespace.Name,
data.WithReplicas[*runtime.PDGroup](3),
@@ -78,7 +78,7 @@ var _ = ginkgo.Describe("PD", label.PD, func() {
f.WaitForPDGroupReady(ctx, pdg)
})

ginkgo.It("support scale PD form 5 to 3", func(ctx context.Context) {
ginkgo.It("support scale PD form 5 to 3", label.Scale, func(ctx context.Context) {
pdg := data.NewPDGroup(
f.Namespace.Name,
//nolint:mnd // easy for test
@@ -96,10 +96,8 @@ var _ = ginkgo.Describe("PD", label.PD, func() {
f.Must(f.Client.Patch(ctx, pdg, patch))
f.WaitForPDGroupReady(ctx, pdg)
})
})

ginkgo.Context("Update", label.P0, label.Update, func() {
ginkgo.It("support rolling update PD by change config file with 3 replicas", func(ctx context.Context) {
ginkgo.It("support rolling update PD by change config file", label.Update, func(ctx context.Context) {
pdg := data.NewPDGroup(
f.Namespace.Name,
data.WithReplicas[*runtime.PDGroup](3),
@@ -117,7 +115,7 @@ var _ = ginkgo.Describe("PD", label.PD, func() {
go func() {
defer close(ch)
defer ginkgo.GinkgoRecover()
f.Must(waiter.WaitPodsRollingUpdateOnce(nctx, f.Client, runtime.FromPDGroup(pdg), waiter.LongTaskTimeout))
f.Must(waiter.WaitPodsRollingUpdateOnce(nctx, f.Client, runtime.FromPDGroup(pdg), 0, waiter.LongTaskTimeout))
}()

changeTime := time.Now()
@@ -129,29 +127,31 @@ var _ = ginkgo.Describe("PD", label.PD, func() {
<-ch
})

ginkgo.It("support update PD by change config file with 1 replica", func(ctx context.Context) {
ginkgo.It("support scale PD form 5 to 3 and rolling update at same time", label.Scale, label.Update, func(ctx context.Context) {
pdg := data.NewPDGroup(
f.Namespace.Name,
data.WithReplicas[*runtime.PDGroup](1),
//nolint:mnd // easy for test
data.WithReplicas[*runtime.PDGroup](5),
)

ginkgo.By("Create PDGroup")
f.Must(f.Client.Create(ctx, pdg))
f.WaitForPDGroupReady(ctx, pdg)

patch := client.MergeFrom(pdg.DeepCopy())
pdg.Spec.Replicas = ptr.To[int32](3)
pdg.Spec.Template.Spec.Config = `log.level = 'warn'`

nctx, cancel := context.WithCancel(ctx)
ch := make(chan struct{})
go func() {
defer close(ch)
defer ginkgo.GinkgoRecover()
f.Must(waiter.WaitPodsRollingUpdateOnce(nctx, f.Client, runtime.FromPDGroup(pdg), waiter.LongTaskTimeout))
f.Must(waiter.WaitPodsRollingUpdateOnce(nctx, f.Client, runtime.FromPDGroup(pdg), -2, waiter.LongTaskTimeout))
}()

changeTime := time.Now()
ginkgo.By("Change config of the PDGroup")
ginkgo.By("Change config and replicas of the PDGroup")
f.Must(f.Client.Patch(ctx, pdg, patch))
f.Must(waiter.WaitForPodsRecreated(ctx, f.Client, runtime.FromPDGroup(pdg), changeTime, waiter.LongTaskTimeout))
f.WaitForPDGroupReady(ctx, pdg)
84 changes: 58 additions & 26 deletions tests/e2e/utils/waiter/pod.go
Original file line number Diff line number Diff line change
@@ -46,11 +46,66 @@ func WaitPodsRollingUpdateOnce[G runtime.Group](
ctx context.Context,
c client.Client,
g G,
// scale means scale out/in before/after rolling update
// k means scale out k instances and -k means scale in k instances
// 0 means only rolling update
scale int,
timeout time.Duration,
) error {
podMap := map[string]podInfo{}
ctx, cancel := watchtools.ContextWithOptionalTimeout(ctx, timeout)
defer cancel()

podMap, err := generatePodInfoMapByWatch(ctx, c, g)
if err != nil {
return err
}

infos := []podInfo{}
for _, v := range podMap {
infos = append(infos, v)
}
sortPodInfos(infos)
detail := strings.Builder{}
for _, info := range infos {
if info.deletionTime.IsZero() {
detail.WriteString(fmt.Sprintf("%v(%v) created at %s\n", info.name, info.uid, info.creationTime))
} else {
detail.WriteString(fmt.Sprintf("%v(%v) created at %s, deleted at %s\n", info.name, info.uid, info.creationTime, info.deletionTime))
}
}

if scale > 0 {
for i := range scale {
if !infos[i].deletionTime.IsZero() {
return fmt.Errorf("expect scale out %v pods before rolling update, detail:\n%v", scale, detail.String())
}
}

infos = infos[scale:]
}
if scale < 0 {
for i := range -scale {
if infos[len(infos)-1-i].deletionTime.IsZero() {
return fmt.Errorf("expect scale in %v pods after rolling update, detail:\n%v", scale, detail.String())
}
}
infos = infos[:len(infos)+scale]
}

if len(infos) != 2*int(g.Replicas()) {
return fmt.Errorf("expect %v pods info, now only %v, detail:\n%v", 2*g.Replicas(), len(infos), detail.String())
}
for i := range g.Replicas() {
if infos[2*i].name != infos[2*i+1].name {
return fmt.Errorf("pod may be restarted at same time, detail:\n%v", detail.String())
}
}

return nil
}

func generatePodInfoMapByWatch[G runtime.Group](ctx context.Context, c client.Client, g G) (map[string]podInfo, error) {
podMap := map[string]podInfo{}
lw := newListWatch(ctx, c, g)
_, err := watchtools.UntilWithSync(ctx, lw, &corev1.Pod{}, nil, func(event watch.Event) (bool, error) {
pod, ok := event.Object.(*corev1.Pod)
@@ -77,33 +132,10 @@ func WaitPodsRollingUpdateOnce[G runtime.Group](
})

if !wait.Interrupted(err) {
return fmt.Errorf("watch stopped unexpected: %w", err)
return nil, fmt.Errorf("watch stopped unexpected: %w", err)
}

infos := []podInfo{}
for _, v := range podMap {
infos = append(infos, v)
}
sortPodInfos(infos)
detail := strings.Builder{}
for _, info := range infos {
if info.deletionTime.IsZero() {
detail.WriteString(fmt.Sprintf("%v(%v) created at %s\n", info.name, info.uid, info.creationTime))
} else {
detail.WriteString(fmt.Sprintf("%v(%v) created at %s, deleted at %s\n", info.name, info.uid, info.creationTime, info.deletionTime))
}
}

if len(infos) != 2*int(g.Replicas()) {
return fmt.Errorf("expect %v pods info, now only %v, detail:\n%v", 2*g.Replicas(), len(infos), detail.String())
}
for i := range g.Replicas() {
if infos[2*i].name != infos[2*i+1].name {
return fmt.Errorf("pod may be restarted at same time, detail:\n%v", detail.String())
}
}

return nil
return podMap, nil
}

func newListWatch[G runtime.Group](ctx context.Context, c client.Client, g G) cache.ListerWatcher {

0 comments on commit 2a9af0c

Please sign in to comment.