feat: add cluster/member/store ID for Pod and PVC (pingcap#6049)
csuzhangxc authored Jan 23, 2025
1 parent a90bc31 commit c75ea80
Showing 30 changed files with 390 additions and 198 deletions.
3 changes: 3 additions & 0 deletions apis/core/v1alpha1/cluster_types.go
@@ -155,6 +155,9 @@ type ClusterStatus struct {
// +listMapKey=type
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,1,rep,name=conditions"`

// ID is the cluster id.
ID string `json:"id"`

// PD means url of the pd service, it's prepared for internal use
// e.g. https://pd:2379
PD string `json:"pd,omitempty"`
10 changes: 10 additions & 0 deletions apis/core/v1alpha1/common_types.go
@@ -71,6 +71,16 @@ const (
// Instead, we choose to hash the user-specified config,
// and the worst case is that users expect a reboot but it doesn't happen.
LabelKeyConfigHash = LabelKeyPrefix + "config-hash"

// LabelKeyClusterID is the unique identifier of the cluster.
// This label is used for backward compatibility with TiDB Operator v1, so it has a different prefix.
LabelKeyClusterID = "tidb.pingcap.com/cluster-id"
// LabelKeyMemberID is the unique identifier of a PD member.
// This label is used for backward compatibility with TiDB Operator v1, so it has a different prefix.
LabelKeyMemberID = "tidb.pingcap.com/member-id"
// LabelKeyStoreID is the unique identifier of a TiKV or TiFlash store.
// This label is used for backward compatibility with TiDB Operator v1, so it has a different prefix.
LabelKeyStoreID = "tidb.pingcap.com/store-id"
)

const (
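Note (not part of this commit): because these are ordinary Kubernetes labels, related objects can be located with a plain label selector once the controller has stamped them onto Pods and PVCs. A minimal sketch using the controller-runtime client; the package, function name, namespace "db", and the ID values are assumptions for illustration only:

// label_selector_example.go (hypothetical, for illustration only)
package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// listPodsOfStore lists the Pods carrying the new cluster-id and store-id
// labels in one namespace. c is assumed to be an initialized
// controller-runtime client; the label values are placeholders.
func listPodsOfStore(ctx context.Context, c client.Client) (*corev1.PodList, error) {
	var pods corev1.PodList
	err := c.List(ctx, &pods,
		client.InNamespace("db"),
		client.MatchingLabels{
			"tidb.pingcap.com/cluster-id": "7344567890123456789",
			"tidb.pingcap.com/store-id":   "12345",
		},
	)
	return &pods, err
}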
2 changes: 1 addition & 1 deletion cmd/operator/main.go
@@ -209,7 +209,7 @@ func addIndexer(ctx context.Context, mgr ctrl.Manager) error {
}

func setupControllers(mgr ctrl.Manager, c client.Client, pdcm pdm.PDClientManager, vm volumes.Modifier) error {
if err := cluster.Setup(mgr, c); err != nil {
if err := cluster.Setup(mgr, c, pdcm); err != nil {
return fmt.Errorf("unable to create controller Cluster: %w", err)
}
if err := pdgroup.Setup(mgr, c, pdcm); err != nil {
5 changes: 5 additions & 0 deletions manifests/crd/core.pingcap.com_clusters.yaml
@@ -215,6 +215,9 @@ spec:
x-kubernetes-list-map-keys:
- type
x-kubernetes-list-type: map
id:
description: ID is the cluster id.
type: string
observedGeneration:
description: |-
observedGeneration is the most recent generation observed for this Cluster. It corresponds to the
@@ -226,6 +229,8 @@ spec:
PD means url of the pd service, it's prepared for internal use
e.g. https://pd:2379
type: string
required:
- id
type: object
type: object
served: true
15 changes: 9 additions & 6 deletions pkg/controllers/cluster/controller.go
@@ -28,19 +28,22 @@ import (
"github.com/pingcap/tidb-operator/apis/core/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client"
"github.com/pingcap/tidb-operator/pkg/controllers/cluster/tasks"
pdm "github.com/pingcap/tidb-operator/pkg/timanager/pd"
"github.com/pingcap/tidb-operator/pkg/utils/k8s"
"github.com/pingcap/tidb-operator/pkg/utils/task"
)

type Reconciler struct {
Logger logr.Logger
Client client.Client
Logger logr.Logger
Client client.Client
PDClientManager pdm.PDClientManager
}

func Setup(mgr manager.Manager, c client.Client) error {
func Setup(mgr manager.Manager, c client.Client, pdcm pdm.PDClientManager) error {
r := &Reconciler{
Logger: mgr.GetLogger().WithName("Cluster"),
Client: c,
Logger: mgr.GetLogger().WithName("Cluster"),
Client: c,
PDClientManager: pdcm,
}
return ctrl.NewControllerManagedBy(mgr).For(&v1alpha1.Cluster{}).
Watches(&v1alpha1.PDGroup{}, handler.EnqueueRequestsFromMapFunc(enqueueForGroup)).
@@ -85,7 +88,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
runner.AddTasks(
tasks.NewTaskContext(logger, r.Client),
tasks.NewTaskFinalizer(logger, r.Client),
tasks.NewTaskStatus(logger, r.Client),
tasks.NewTaskStatus(logger, r.Client, r.PDClientManager),
)

return runner.Run(rtx)
48 changes: 43 additions & 5 deletions pkg/controllers/cluster/tasks/status.go
@@ -15,28 +15,34 @@
package tasks

import (
"context"
"fmt"
"reflect"
"sort"
"strconv"
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/pingcap/tidb-operator/apis/core/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client"
pdm "github.com/pingcap/tidb-operator/pkg/timanager/pd"
"github.com/pingcap/tidb-operator/pkg/utils/task"
)

type TaskStatus struct {
Logger logr.Logger
Client client.Client
Logger logr.Logger
Client client.Client
PDClientManager pdm.PDClientManager
}

func NewTaskStatus(logger logr.Logger, c client.Client) task.Task[ReconcileContext] {
func NewTaskStatus(logger logr.Logger, c client.Client, pdcm pdm.PDClientManager) task.Task[ReconcileContext] {
return &TaskStatus{
Logger: logger,
Client: c,
Logger: logger,
Client: c,
PDClientManager: pdcm,
}
}

@@ -67,13 +73,19 @@ func (t *TaskStatus) Sync(ctx task.Context[ReconcileContext]) task.Result {
}
needUpdate = t.syncComponentStatus(rtx) || needUpdate
needUpdate = t.syncConditions(rtx) || needUpdate
needUpdate = t.syncClusterID(ctx, rtx) || needUpdate

if needUpdate {
if err := t.Client.Status().Update(ctx, rtx.Cluster); err != nil {
return task.Fail().With(fmt.Sprintf("can't update cluster status: %v", err))
}
}

if rtx.Cluster.Status.ID == "" {
// no watch for this, so we need to retry
//nolint:mnd // only one usage
return task.Retry(5 * time.Second).With("cluster id is not set")
}
return task.Complete().With("updated status")
}

@@ -193,3 +205,29 @@ func (*TaskStatus) syncConditions(rtx *ReconcileContext) bool {
Message: suspendMessage,
}) || changed
}

func (t *TaskStatus) syncClusterID(ctx context.Context, rtx *ReconcileContext) bool {
if rtx.Cluster.Status.ID != "" {
// already synced, this will never change
return false
}

pdClient, ok := t.PDClientManager.Get(pdm.PrimaryKey(rtx.Cluster.Namespace, rtx.Cluster.Name))
if !ok {
t.Logger.Info("pd client is not registered")
return false // wait for next sync
}

cluster, err := pdClient.Underlay().GetCluster(ctx)
if err != nil {
t.Logger.Error(err, "failed to get cluster info")
return false
}

if cluster.Id == 0 {
return false
}

rtx.Cluster.Status.ID = strconv.FormatUint(cluster.Id, 10)
return true
}
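Note (not part of this commit): Status.ID holds the decimal string form of PD's uint64 cluster ID (strconv.FormatUint above), so consumers needing the numeric value must parse it back. A minimal sketch; the package and function name are assumptions:

package example

import "strconv"

// clusterIDFromStatus converts the decimal string stored in Cluster.Status.ID
// back into the uint64 used by PD. An empty string means the ID has not been
// synced yet.
func clusterIDFromStatus(id string) (uint64, bool, error) {
	if id == "" {
		return 0, false, nil
	}
	v, err := strconv.ParseUint(id, 10, 64)
	if err != nil {
		return 0, false, err
	}
	return v, true, nil
}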
52 changes: 51 additions & 1 deletion pkg/controllers/cluster/tasks/status_test.go
@@ -15,15 +15,23 @@
package tasks

import (
"context"
"strconv"
"testing"

"github.com/go-logr/logr"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/pingcap/tidb-operator/apis/core/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client"
"github.com/pingcap/tidb-operator/pkg/pdapi/v1"
"github.com/pingcap/tidb-operator/pkg/timanager"
pdm "github.com/pingcap/tidb-operator/pkg/timanager/pd"
"github.com/pingcap/tidb-operator/pkg/utils/fake"
"github.com/pingcap/tidb-operator/pkg/utils/task"
)
Expand All @@ -36,6 +44,7 @@ func TestStatusUpdater(t *testing.T) {
expected task.Result
components []v1alpha1.ComponentStatus
conditions []metav1.Condition
clusterID uint64
}{
{
desc: "creating cluster",
@@ -46,6 +55,7 @@
pdGroup: fake.FakeObj(
"pd-group",
func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup {
obj.Spec.Cluster.Name = "test"
obj.Spec.Replicas = new(int32)
*obj.Spec.Replicas = 3
return obj
@@ -72,6 +82,7 @@
Status: metav1.ConditionFalse,
},
},
clusterID: 123,
},
}

@@ -84,8 +95,14 @@
ctx.Cluster = c.cluster
ctx.PDGroup = c.pdGroup

m := newFakePDClientManager(tt, getCluster(ctx, &metapb.Cluster{
Id: c.clusterID,
}, nil))
m.Start(ctx)
require.NoError(tt, m.Register(ctx.PDGroup))

fc := client.NewFakeClient(c.cluster)
tk := NewTaskStatus(logr.Discard(), fc)
tk := NewTaskStatus(logr.Discard(), fc, m)
res := tk.Sync(ctx)
assert.Equal(tt, c.expected, res)
assert.Equal(tt, c.cluster.Generation, c.cluster.Status.ObservedGeneration)
@@ -99,6 +116,39 @@
})
}
assert.Equal(tt, c.conditions, conditions)
assert.Equal(tt, strconv.FormatUint(c.clusterID, 10), c.cluster.Status.ID)
})
}
}

func newFakePDClientManager(t *testing.T, acts ...action) pdm.PDClientManager {
return timanager.NewManagerBuilder[*v1alpha1.PDGroup, pdapi.PDClient, pdm.PDClient]().
WithNewUnderlayClientFunc(func(*v1alpha1.PDGroup) (pdapi.PDClient, error) {
return nil, nil
}).
WithNewClientFunc(func(string, pdapi.PDClient, timanager.SharedInformerFactory[pdapi.PDClient]) pdm.PDClient {
return NewFakePDClient(t, acts...)
}).
WithCacheKeysFunc(pdm.CacheKeys).
Build()
}

func NewFakePDClient(t *testing.T, acts ...action) pdm.PDClient {
ctrl := gomock.NewController(t)
pdc := pdm.NewMockPDClient(ctrl)
for _, act := range acts {
act(ctrl, pdc)
}

return pdc
}

type action func(ctrl *gomock.Controller, pdc *pdm.MockPDClient)

func getCluster(ctx context.Context, cluster *metapb.Cluster, err error) action {
return func(ctrl *gomock.Controller, pdc *pdm.MockPDClient) {
underlay := pdapi.NewMockPDClient(ctrl)
pdc.EXPECT().Underlay().Return(underlay).AnyTimes()
underlay.EXPECT().GetCluster(ctx).Return(cluster, err).AnyTimes()
}
}
6 changes: 4 additions & 2 deletions pkg/controllers/pd/tasks/ctx.go
@@ -34,8 +34,9 @@ type ReconcileContext struct {
Initialized bool
Healthy bool

MemberID string
IsLeader bool
ClusterID string
MemberID string
IsLeader bool

// ConfigHash stores the hash of **user-specified** config (i.e.`.Spec.Config`),
// which will be used to determine whether the config has changed.
@@ -71,6 +72,7 @@ func TaskContextInfoFromPD(state *ReconcileContext, cm pdm.PDClientManager) task
return task.Fail().With("cannot get member: %w", err)
}

state.ClusterID = m.ClusterID
state.MemberID = m.ID
state.IsLeader = m.IsLeader

10 changes: 7 additions & 3 deletions pkg/controllers/pd/tasks/pod.go
@@ -44,7 +44,7 @@ const (
func TaskPod(state *ReconcileContext, c client.Client) task.Task {
return task.NameTaskFunc("Pod", func(ctx context.Context) task.Result {
logger := logr.FromContextOrDiscard(ctx)
expected := newPod(state.Cluster(), state.PD(), state.ConfigHash)
expected := newPod(state)
if state.Pod() == nil {
// We have to refresh cache of members to make sure a pd without pod is unhealthy.
// If the healthy info is out of date, the operator may mark this pd up-to-date unexpectedly
@@ -130,7 +130,9 @@ func preDeleteCheck(
return false, nil
}

func newPod(cluster *v1alpha1.Cluster, pd *v1alpha1.PD, configHash string) *corev1.Pod {
func newPod(state *ReconcileContext) *corev1.Pod {
cluster := state.Cluster()
pd := state.PD()
vols := []corev1.Volume{
{
Name: v1alpha1.VolumeNameConfig,
@@ -193,7 +195,9 @@ func newPod(cluster *v1alpha1.Cluster, pd *v1alpha1.PD, configHash string) *corev1.Pod {
Name: pd.PodName(),
Labels: maputil.Merge(pd.Labels, map[string]string{
v1alpha1.LabelKeyInstance: pd.Name,
v1alpha1.LabelKeyConfigHash: configHash,
v1alpha1.LabelKeyConfigHash: state.ConfigHash,
v1alpha1.LabelKeyClusterID: state.ClusterID,
v1alpha1.LabelKeyMemberID: state.MemberID,
}, k8s.LabelsK8sApp(cluster.Name, v1alpha1.LabelValComponentPD)),
Annotations: anno,
OwnerReferences: []metav1.OwnerReference{
2 changes: 1 addition & 1 deletion pkg/controllers/pd/tasks/pod_test.go
@@ -442,7 +442,7 @@ func TestTaskPod(t *testing.T) {
assert.Equal(tt, c.expectedPodIsTerminating, c.state.PodIsTerminating, c.desc)

if c.expectUpdatedPod {
expectedPod := newPod(c.state.Cluster(), c.state.PD(), c.state.ConfigHash)
expectedPod := newPod(c.state)
actual := c.state.Pod().DeepCopy()
actual.Kind = ""
actual.APIVersion = ""
16 changes: 10 additions & 6 deletions pkg/controllers/pd/tasks/pvc.go
@@ -23,15 +23,15 @@ import (

"github.com/pingcap/tidb-operator/apis/core/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client"
"github.com/pingcap/tidb-operator/pkg/controllers/common"
"github.com/pingcap/tidb-operator/pkg/utils/k8s"
maputil "github.com/pingcap/tidb-operator/pkg/utils/map"
"github.com/pingcap/tidb-operator/pkg/utils/task/v3"
"github.com/pingcap/tidb-operator/pkg/volumes"
)

func TaskPVC(state common.PDState, logger logr.Logger, c client.Client, vm volumes.Modifier) task.Task {
func TaskPVC(state *ReconcileContext, logger logr.Logger, c client.Client, vm volumes.Modifier) task.Task {
return task.NameTaskFunc("PVC", func(ctx context.Context) task.Result {
pvcs := newPVCs(state.PD())
pvcs := newPVCs(state)
if wait, err := volumes.SyncPVCs(ctx, c, pvcs, vm, logger); err != nil {
return task.Fail().With("failed to sync pvcs: %v", err)
} else if wait {
Expand All @@ -44,7 +44,9 @@ func TaskPVC(state common.PDState, logger logr.Logger, c client.Client, vm volum
})
}

func newPVCs(pd *v1alpha1.PD) []*corev1.PersistentVolumeClaim {
func newPVCs(state *ReconcileContext) []*corev1.PersistentVolumeClaim {
cluster := state.Cluster()
pd := state.PD()
pvcs := make([]*corev1.PersistentVolumeClaim, 0, len(pd.Spec.Volumes))
for i := range pd.Spec.Volumes {
vol := &pd.Spec.Volumes[i]
@@ -53,8 +55,10 @@ func newPVCs(pd *v1alpha1.PD) []*corev1.PersistentVolumeClaim {
Name: PersistentVolumeClaimName(pd.PodName(), vol.Name),
Namespace: pd.Namespace,
Labels: maputil.Merge(pd.Labels, map[string]string{
v1alpha1.LabelKeyInstance: pd.Name,
}),
v1alpha1.LabelKeyInstance: pd.Name,
v1alpha1.LabelKeyClusterID: state.ClusterID,
v1alpha1.LabelKeyMemberID: state.MemberID,
}, k8s.LabelsK8sApp(cluster.Name, v1alpha1.LabelValComponentPD)),
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(pd, v1alpha1.SchemeGroupVersion.WithKind("PD")),
},