fix(updater): fix topology policy (#6054)
* fix(updater): fix topology policy

Signed-off-by: liubo02 <[email protected]>

* fix lint

Signed-off-by: liubo02 <[email protected]>

* fix e2e

Signed-off-by: liubo02 <[email protected]>

---------

Signed-off-by: liubo02 <[email protected]>
liubog2008 authored Jan 30, 2025
1 parent 7d238e5 commit 469992a
Showing 12 changed files with 241 additions and 19 deletions.
5 changes: 5 additions & 0 deletions .golangci.yml
@@ -44,6 +44,11 @@ linters-settings:
- '3'
ignored-functions:
- strings.SplitN
ignored-files:
- 'tests/e2e/cluster/.+\.go$'
- 'tests/e2e/pd/.+\.go$'
- 'tests/e2e/tidb/.+\.go$'
- 'tests/e2e/tikv/.+\.go$'
govet:
enable:
- nilness
9 changes: 9 additions & 0 deletions hack/lib/kind.sh
@@ -56,6 +56,15 @@ networking:
ipFamily: dual
nodes:
- role: control-plane
- role: worker
labels:
zone: zone-a
- role: worker
labels:
zone: zone-b
- role: worker
labels:
zone: zone-c
EOF


2 changes: 1 addition & 1 deletion pkg/updater/policy/topology.go
@@ -40,7 +40,7 @@ func NewTopologyPolicy[R runtime.Instance](ts []v1alpha1.ScheduleTopology, rs ..
scheduler: s,
}
for _, r := range rs {
p.Add(r)
p.scheduler.Add(r.GetName(), r.GetTopology())
}
return p, nil
}
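
The one-line change above is the whole fix: when the policy is constructed from instances that already exist, their current topology only needs to be registered with the underlying scheduler. The policy's own Add path is, presumably, the hook for newly created instances, where it assigns a topology rather than recording one, so calling it during construction would skew the bookkeeping. A minimal, self-contained sketch of that distinction follows; the types, names, and behavior are simplifications for illustration, not the operator's real API.

```go
package main

import (
	"fmt"
	"math"
)

// Hypothetical, stripped-down stand-in for the scheduler the policy wraps.
type scheduler struct {
	counts map[string]int // zone -> number of instances already placed there
}

// Add records that an existing instance occupies the given zone.
func (s *scheduler) Add(name, zone string) {
	_ = name // the real scheduler also remembers which instance sits where
	s.counts[zone]++
}

// pick chooses the least-used zone, roughly what a policy's Add would do
// for a brand-new instance.
func (s *scheduler) pick() string {
	best, bestN := "", math.MaxInt
	for zone, n := range s.counts {
		if n < bestN {
			best, bestN = zone, n
		}
	}
	return best
}

func main() {
	s := &scheduler{counts: map[string]int{"zone-a": 0, "zone-b": 0, "zone-c": 0}}
	// Seeding with instances that already exist: register their current zones
	// directly (what the fixed constructor does) instead of re-picking zones.
	s.Add("tikv-0", "zone-a")
	s.Add("tikv-1", "zone-b")
	// Only a genuinely new instance should go through the "choose a slot" path.
	fmt.Println("next new instance goes to:", s.pick()) // zone-c
}
```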
17 changes: 5 additions & 12 deletions tests/e2e/cluster/cluster.go
@@ -537,7 +537,6 @@ var _ = Describe("TiDB Cluster", func() {
Eventually(func(g Gomega) {
podList, err = clientSet.CoreV1().Pods(tc.Namespace).List(ctx, metav1.ListOptions{})
g.Expect(err).To(BeNil())
//nolint:mnd // easy to understand
g.Expect(len(podList.Items)).To(Equal(4))
}).WithTimeout(time.Minute).WithPolling(createClusterPolling).Should(Succeed())

@@ -676,7 +675,6 @@ var _ = Describe("TiDB Cluster", func() {
var kvgGet v1alpha1.TiKVGroup
Expect(k8sClient.Get(ctx, client.ObjectKey{Namespace: tc.Namespace, Name: kvg.Name}, &kvgGet)).To(Succeed())
Expect(len(kvgGet.Spec.Template.Spec.Volumes)).To(Equal(1))
//nolint:mnd // easy to understand
kvgGet.Spec.Template.Spec.Volumes[0].Storage = data.StorageSizeGi2quantity(5)
Expect(k8sClient.Update(ctx, &kvgGet)).To(Succeed())

@@ -692,7 +690,6 @@ var _ = Describe("TiDB Cluster", func() {
pvcList, err := clientSet.CoreV1().PersistentVolumeClaims(tc.Namespace).List(ctx, listOpts)
Expect(err).To(BeNil())
Expect(len(pvcList.Items)).To(Equal(1))
//nolint:mnd // easy to understand
Expect(pvcList.Items[0].Status.Capacity.Storage()).To(Equal(data.StorageSizeGi2quantity(5)))
}).WithTimeout(createClusterTimeout).WithPolling(createClusterPolling).Should(Succeed())
})
@@ -908,7 +905,6 @@ var _ = Describe("TiDB Cluster", func() {
GinkgoWriter.Printf("%v(%v) created at %s, deleted at %s\n", info.name, info.uid, info.creationTime, info.deletionTime)
}
}
//nolint:mnd // easy to understand
Expect(len(infos)).To(Equal(6))
Expect(infos[0].name).To(Equal(infos[1].name))
Expect(infos[2].name).To(Equal(infos[3].name))
@@ -1127,7 +1123,6 @@ var _ = Describe("TiDB Cluster", func() {
GinkgoWriter.Printf("%v(%v) created at %s, deleted at %s\n", info.name, info.uid, info.creationTime, info.deletionTime)
}
}
//nolint:mnd // easy to understand
Expect(len(infos)).To(Equal(6))
Expect(infos[0].name).To(Equal(infos[1].name))
Expect(infos[2].name).To(Equal(infos[3].name))
@@ -1307,8 +1302,7 @@ var _ = Describe("TiDB Cluster", func() {
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
// TODO(liubo02): extract to a namer pkg
SecretName: groupName + "-" + componentName + "-cluster-secret",
//nolint:mnd // easy to understand
SecretName: groupName + "-" + componentName + "-cluster-secret",
DefaultMode: ptr.To(int32(420)),
},
},
@@ -1327,8 +1321,7 @@ var _ = Describe("TiDB Cluster", func() {
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
// TODO(liubo02): extract to a namer pkg
SecretName: dbg.Name + "-tidb-server-secret",
//nolint:mnd // easy to understand
SecretName: dbg.Name + "-tidb-server-secret",
DefaultMode: ptr.To(int32(420)),
},
},
@@ -1446,7 +1439,7 @@ location-labels = ["region", "zone", "host"]`
g.Expect(v).To(ContainSubstring(`zone=`))
g.Expect(v).To(ContainSubstring(`host=`))
}
}).WithTimeout(time.Minute).WithPolling(5 * time.Second).Should(Succeed()) //nolint:mnd // easy to understand
}).WithTimeout(time.Minute).WithPolling(5 * time.Second).Should(Succeed())
})

It("should enable readiness probe for PD, TiKV and TiFlash", func() {
@@ -1489,8 +1482,8 @@ location-labels = ["region", "zone", "host"]`
Expect(pod.Spec.Containers[0].ReadinessProbe.TimeoutSeconds).To(Equal(int32(1)))
Expect(pod.Spec.Containers[0].ReadinessProbe.FailureThreshold).To(Equal(int32(3)))
Expect(pod.Spec.Containers[0].ReadinessProbe.SuccessThreshold).To(Equal(int32(1)))
Expect(pod.Spec.Containers[0].ReadinessProbe.InitialDelaySeconds).To(Equal(int32(10))) //nolint:mnd // default value in Operator
Expect(pod.Spec.Containers[0].ReadinessProbe.PeriodSeconds).To(Equal(int32(10))) //nolint:mnd // easy to understand
Expect(pod.Spec.Containers[0].ReadinessProbe.InitialDelaySeconds).To(Equal(int32(10)))
Expect(pod.Spec.Containers[0].ReadinessProbe.PeriodSeconds).To(Equal(int32(10)))
Expect(pod.Spec.Containers[0].ReadinessProbe.TCPSocket).NotTo(BeNil())
}
}
27 changes: 27 additions & 0 deletions tests/e2e/data/tikv.go
@@ -53,3 +53,30 @@ func NewTiKVGroup(ns string, patches ...GroupPatch[*runtime.TiKVGroup]) *v1alpha

return runtime.ToTiKVGroup(kvg)
}

func WithEvenlySpreadPolicy() GroupPatch[*runtime.TiKVGroup] {
return func(obj *runtime.TiKVGroup) {
obj.Spec.SchedulePolicies = append(obj.Spec.SchedulePolicies, v1alpha1.SchedulePolicy{
Type: v1alpha1.SchedulePolicyTypeEvenlySpread,
EvenlySpread: &v1alpha1.SchedulePolicyEvenlySpread{
Topologies: []v1alpha1.ScheduleTopology{
{
Topology: v1alpha1.Topology{
"zone": "zone-a",
},
},
{
Topology: v1alpha1.Topology{
"zone": "zone-b",
},
},
{
Topology: v1alpha1.Topology{
"zone": "zone-c",
},
},
},
},
})
}
}
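
As a usage note, the new patch is meant to compose with the existing GroupPatch helpers; the sketch below is based on the NewTiKVGroup signature at the top of this file (the namespace is made up, and the six replicas match the e2e test added later in this commit):

```go
package data_test

import (
	"github.com/pingcap/tidb-operator/apis/core/v1alpha1"
	"github.com/pingcap/tidb-operator/pkg/runtime"
	"github.com/pingcap/tidb-operator/tests/e2e/data"
)

// exampleTiKVGroup builds a TiKVGroup spec with six replicas that the
// evenly-spread policy should place two per zone across zone-a/b/c.
func exampleTiKVGroup() *v1alpha1.TiKVGroup {
	return data.NewTiKVGroup(
		"e2e-example",
		data.WithReplicas[*runtime.TiKVGroup](6),
		data.WithEvenlySpreadPolicy(),
	)
}
```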
1 change: 1 addition & 0 deletions tests/e2e/e2e_test.go
@@ -26,6 +26,7 @@ import (
_ "github.com/pingcap/tidb-operator/tests/e2e/cluster"
_ "github.com/pingcap/tidb-operator/tests/e2e/pd"
_ "github.com/pingcap/tidb-operator/tests/e2e/tidb"
_ "github.com/pingcap/tidb-operator/tests/e2e/tikv"
)

func TestE2E(t *testing.T) {
55 changes: 55 additions & 0 deletions tests/e2e/framework/tikv.go
@@ -16,11 +16,15 @@ package framework

import (
"context"
"math"
"strings"

"github.com/onsi/ginkgo/v2"

"github.com/pingcap/tidb-operator/apis/core/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client"
"github.com/pingcap/tidb-operator/pkg/runtime"
"github.com/pingcap/tidb-operator/pkg/utils/topology"
"github.com/pingcap/tidb-operator/tests/e2e/utils/waiter"
)

@@ -30,3 +34,54 @@ func (f *Framework) WaitForTiKVGroupReady(ctx context.Context, kvg *v1alpha1.TiK
f.Must(waiter.WaitForTiKVsHealthy(ctx, f.Client, kvg, waiter.LongTaskTimeout))
f.Must(waiter.WaitForPodsReady(ctx, f.Client, runtime.FromTiKVGroup(kvg), waiter.LongTaskTimeout))
}

func (f *Framework) MustEvenlySpreadTiKV(ctx context.Context, kvg *v1alpha1.TiKVGroup) {
list := v1alpha1.TiKVList{}
f.Must(f.Client.List(ctx, &list, client.InNamespace(kvg.GetNamespace()), client.MatchingLabels{
v1alpha1.LabelKeyCluster: kvg.Spec.Cluster.Name,
v1alpha1.LabelKeyGroup: kvg.GetName(),
v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentTiKV,
}))

encoder := topology.NewEncoder()
topo := map[string]int{}

detail := strings.Builder{}
for i := range list.Items {
item := &list.Items[i]

key := encoder.Encode(item.Spec.Topology)
val, ok := topo[key]
if !ok {
val = 0
}
val += 1
topo[key] = val

detail.WriteString(item.Name)
detail.WriteString(":\n")
for k, v := range item.Spec.Topology {
detail.WriteString(" ")
detail.WriteString(k)
detail.WriteString(":")
detail.WriteString(v)
detail.WriteString(":\n")
}
}

minimum, maximum := math.MaxInt, 0
for _, val := range topo {
if val < minimum {
minimum = val
}
if val > maximum {
maximum = val
}
}

if maximum-minimum > 1 {
ginkgo.AddReportEntry("TopologyInfo", detail.String())
}

f.True(maximum-minimum <= 1)
}
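
The final assertion treats "evenly spread" as a max-min difference of at most one across the observed topologies, so six TiKVs over three zones must land 2/2/2, while a 3/2/1 split fails and dumps the per-instance detail as a report entry. A tiny standalone illustration of the same invariant (not part of the commit):

```go
package main

import (
	"fmt"
	"math"
)

// evenlySpread mirrors the check above: a per-zone distribution counts as
// even when the largest and smallest buckets differ by at most one.
func evenlySpread(counts map[string]int) bool {
	minimum, maximum := math.MaxInt, 0
	for _, n := range counts {
		if n < minimum {
			minimum = n
		}
		if n > maximum {
			maximum = n
		}
	}
	return maximum-minimum <= 1
}

func main() {
	fmt.Println(evenlySpread(map[string]int{"zone-a": 2, "zone-b": 2, "zone-c": 2})) // true
	fmt.Println(evenlySpread(map[string]int{"zone-a": 3, "zone-b": 2, "zone-c": 1})) // false
}
```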
3 changes: 3 additions & 0 deletions tests/e2e/label/well_known.go
@@ -33,4 +33,7 @@ var (
Update = ginkgo.Label("op:Update")
Scale = ginkgo.Label("op:Scale")
Suspend = ginkgo.Label("op:Suspend")

// Env
MultipleAZ = ginkgo.Label("env:MultipleAZ")
)
4 changes: 1 addition & 3 deletions tests/e2e/pd/pd.go
@@ -71,7 +71,7 @@ var _ = ginkgo.Describe("PD", label.PD, func() {
f.WaitForPDGroupReady(ctx, pdg)

patch := client.MergeFrom(pdg.DeepCopy())
pdg.Spec.Replicas = ptr.To[int32](5) //nolint:mnd // easy for test
pdg.Spec.Replicas = ptr.To[int32](5)

ginkgo.By("Change replica of the PDGroup")
f.Must(f.Client.Patch(ctx, pdg, patch))
@@ -81,7 +81,6 @@ var _ = ginkgo.Describe("PD", label.PD, func() {
ginkgo.It("support scale PD form 5 to 3", label.Scale, func(ctx context.Context) {
pdg := data.NewPDGroup(
f.Namespace.Name,
//nolint:mnd // easy for test
data.WithReplicas[*runtime.PDGroup](5),
)

@@ -130,7 +129,6 @@ var _ = ginkgo.Describe("PD", label.PD, func() {
ginkgo.It("support scale PD form 5 to 3 and rolling update at same time", label.Scale, label.Update, func(ctx context.Context) {
pdg := data.NewPDGroup(
f.Namespace.Name,
//nolint:mnd // easy for test
data.WithReplicas[*runtime.PDGroup](5),
)

15 changes: 15 additions & 0 deletions tests/e2e/tikv/tikv.go
@@ -0,0 +1,15 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv
89 changes: 89 additions & 0 deletions tests/e2e/tikv/topology.go
@@ -0,0 +1,89 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv

import (
"context"
"time"

"github.com/onsi/ginkgo/v2"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/pingcap/tidb-operator/pkg/runtime"
"github.com/pingcap/tidb-operator/tests/e2e/data"
"github.com/pingcap/tidb-operator/tests/e2e/framework"
"github.com/pingcap/tidb-operator/tests/e2e/label"
"github.com/pingcap/tidb-operator/tests/e2e/utils/waiter"
)

var _ = ginkgo.Describe("Topology", label.TiKV, label.MultipleAZ, label.P0, func() {
f := framework.New()
f.Setup()
f.SetupCluster()

ginkgo.It("Create tikv evenly spread in multiple azs", func(ctx context.Context) {
ginkgo.By("Creating cluster")
pdg := f.MustCreatePD(ctx)
kvg := f.MustCreateTiKV(ctx,
data.WithReplicas[*runtime.TiKVGroup](6),
data.WithEvenlySpreadPolicy(),
)

f.WaitForPDGroupReady(ctx, pdg)
f.WaitForTiKVGroupReady(ctx, kvg)

f.MustEvenlySpreadTiKV(ctx, kvg)
})

ginkgo.It("support scale from 3 to 6 and rolling update at same time", label.Scale, label.Update, func(ctx context.Context) {
ginkgo.By("Creating cluster")
pdg := f.MustCreatePD(ctx)
kvg := f.MustCreateTiKV(ctx,
data.WithReplicas[*runtime.TiKVGroup](3),
data.WithEvenlySpreadPolicy(),
)

f.WaitForPDGroupReady(ctx, pdg)
f.WaitForTiKVGroupReady(ctx, kvg)

f.MustEvenlySpreadTiKV(ctx, kvg)

patch := client.MergeFrom(kvg.DeepCopy())
kvg.Spec.Replicas = ptr.To[int32](6)
kvg.Spec.Template.Spec.Config = `log.level = 'warn'`

nctx, cancel := context.WithCancel(ctx)
ch := make(chan struct{})
go func() {
defer close(ch)
defer ginkgo.GinkgoRecover()
f.Must(waiter.WaitPodsRollingUpdateOnce(nctx, f.Client, runtime.FromTiKVGroup(kvg), 3, waiter.LongTaskTimeout))
}()

maxTime, err := waiter.MaxPodsCreateTimestamp(ctx, f.Client, runtime.FromTiKVGroup(kvg))
f.Must(err)
changeTime := maxTime.Add(time.Second)

ginkgo.By("Change config and replicas of the TiKVGroup")
f.Must(f.Client.Patch(ctx, kvg, patch))
f.Must(waiter.WaitForPodsRecreated(ctx, f.Client, runtime.FromTiKVGroup(kvg), changeTime, waiter.LongTaskTimeout))
f.WaitForTiKVGroupReady(ctx, kvg)
cancel()
<-ch

f.MustEvenlySpreadTiKV(ctx, kvg)
})
})