diff --git a/apis/core/v1alpha1/cluster_types.go b/apis/core/v1alpha1/cluster_types.go index ee1a1c9e1e..1caa40d258 100644 --- a/apis/core/v1alpha1/cluster_types.go +++ b/apis/core/v1alpha1/cluster_types.go @@ -155,6 +155,9 @@ type ClusterStatus struct { // +listMapKey=type Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,1,rep,name=conditions"` + // ID is the cluster id. + ID string `json:"id"` + // PD means url of the pd service, it's prepared for internal use // e.g. https://pd:2379 PD string `json:"pd,omitempty"` diff --git a/apis/core/v1alpha1/common_types.go b/apis/core/v1alpha1/common_types.go index 2f2144b379..2327817f4c 100644 --- a/apis/core/v1alpha1/common_types.go +++ b/apis/core/v1alpha1/common_types.go @@ -71,6 +71,16 @@ const ( // Instead, we choose to hash the user-specified config, // and the worst case is that users expect a reboot but it doesn't happen. LabelKeyConfigHash = LabelKeyPrefix + "config-hash" + + // LabelKeyClusterID is the unique identifier of the cluster. + // This label is used for backward compatibility with TiDB Operator v1, so it has a different prefix. + LabelKeyClusterID = "tidb.pingcap.com/cluster-id" + // LabelKeyMemberID is the unique identifier of a PD member. + // This label is used for backward compatibility with TiDB Operator v1, so it has a different prefix. + LabelKeyMemberID = "tidb.pingcap.com/member-id" + // LabelKeyStoreID is the unique identifier of a TiKV or TiFlash store. + // This label is used for backward compatibility with TiDB Operator v1, so it has a different prefix. + LabelKeyStoreID = "tidb.pingcap.com/store-id" ) const ( diff --git a/cmd/operator/main.go b/cmd/operator/main.go index c6c4dcfee9..a42f00efef 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -209,7 +209,7 @@ func addIndexer(ctx context.Context, mgr ctrl.Manager) error { } func setupControllers(mgr ctrl.Manager, c client.Client, pdcm pdm.PDClientManager, vm volumes.Modifier) error { - if err := cluster.Setup(mgr, c); err != nil { + if err := cluster.Setup(mgr, c, pdcm); err != nil { return fmt.Errorf("unable to create controller Cluster: %w", err) } if err := pdgroup.Setup(mgr, c, pdcm); err != nil { diff --git a/manifests/crd/core.pingcap.com_clusters.yaml b/manifests/crd/core.pingcap.com_clusters.yaml index 246cb953c2..359c18339a 100644 --- a/manifests/crd/core.pingcap.com_clusters.yaml +++ b/manifests/crd/core.pingcap.com_clusters.yaml @@ -215,6 +215,9 @@ spec: x-kubernetes-list-map-keys: - type x-kubernetes-list-type: map + id: + description: ID is the cluster id. + type: string observedGeneration: description: |- observedGeneration is the most recent generation observed for this Cluster. It corresponds to the @@ -226,6 +229,8 @@ spec: PD means url of the pd service, it's prepared for internal use e.g. 
https://pd:2379 type: string + required: + - id type: object type: object served: true diff --git a/pkg/controllers/cluster/controller.go b/pkg/controllers/cluster/controller.go index f7b18c74ac..72d0ddf56d 100644 --- a/pkg/controllers/cluster/controller.go +++ b/pkg/controllers/cluster/controller.go @@ -28,19 +28,22 @@ import ( "github.com/pingcap/tidb-operator/apis/core/v1alpha1" "github.com/pingcap/tidb-operator/pkg/client" "github.com/pingcap/tidb-operator/pkg/controllers/cluster/tasks" + pdm "github.com/pingcap/tidb-operator/pkg/timanager/pd" "github.com/pingcap/tidb-operator/pkg/utils/k8s" "github.com/pingcap/tidb-operator/pkg/utils/task" ) type Reconciler struct { - Logger logr.Logger - Client client.Client + Logger logr.Logger + Client client.Client + PDClientManager pdm.PDClientManager } -func Setup(mgr manager.Manager, c client.Client) error { +func Setup(mgr manager.Manager, c client.Client, pdcm pdm.PDClientManager) error { r := &Reconciler{ - Logger: mgr.GetLogger().WithName("Cluster"), - Client: c, + Logger: mgr.GetLogger().WithName("Cluster"), + Client: c, + PDClientManager: pdcm, } return ctrl.NewControllerManagedBy(mgr).For(&v1alpha1.Cluster{}). Watches(&v1alpha1.PDGroup{}, handler.EnqueueRequestsFromMapFunc(enqueueForGroup)). @@ -85,7 +88,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu runner.AddTasks( tasks.NewTaskContext(logger, r.Client), tasks.NewTaskFinalizer(logger, r.Client), - tasks.NewTaskStatus(logger, r.Client), + tasks.NewTaskStatus(logger, r.Client, r.PDClientManager), ) return runner.Run(rtx) diff --git a/pkg/controllers/cluster/tasks/status.go b/pkg/controllers/cluster/tasks/status.go index c6dd23d029..b670142ad4 100644 --- a/pkg/controllers/cluster/tasks/status.go +++ b/pkg/controllers/cluster/tasks/status.go @@ -15,9 +15,12 @@ package tasks import ( + "context" "fmt" "reflect" "sort" + "strconv" + "time" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/meta" @@ -25,18 +28,21 @@ import ( "github.com/pingcap/tidb-operator/apis/core/v1alpha1" "github.com/pingcap/tidb-operator/pkg/client" + pdm "github.com/pingcap/tidb-operator/pkg/timanager/pd" "github.com/pingcap/tidb-operator/pkg/utils/task" ) type TaskStatus struct { - Logger logr.Logger - Client client.Client + Logger logr.Logger + Client client.Client + PDClientManager pdm.PDClientManager } -func NewTaskStatus(logger logr.Logger, c client.Client) task.Task[ReconcileContext] { +func NewTaskStatus(logger logr.Logger, c client.Client, pdcm pdm.PDClientManager) task.Task[ReconcileContext] { return &TaskStatus{ - Logger: logger, - Client: c, + Logger: logger, + Client: c, + PDClientManager: pdcm, } } @@ -67,6 +73,7 @@ func (t *TaskStatus) Sync(ctx task.Context[ReconcileContext]) task.Result { } needUpdate = t.syncComponentStatus(rtx) || needUpdate needUpdate = t.syncConditions(rtx) || needUpdate + needUpdate = t.syncClusterID(ctx, rtx) || needUpdate if needUpdate { if err := t.Client.Status().Update(ctx, rtx.Cluster); err != nil { @@ -74,6 +81,11 @@ func (t *TaskStatus) Sync(ctx task.Context[ReconcileContext]) task.Result { } } + if rtx.Cluster.Status.ID == "" { + // no watch for this, so we need to retry + //nolint:mnd // only one usage + return task.Retry(5 * time.Second).With("cluster id is not set") + } return task.Complete().With("updated status") } @@ -193,3 +205,29 @@ func (*TaskStatus) syncConditions(rtx *ReconcileContext) bool { Message: suspendMessage, }) || changed } + +func (t *TaskStatus) syncClusterID(ctx context.Context, rtx *ReconcileContext) bool { 
+ if rtx.Cluster.Status.ID != "" { + // already synced, this will never change + return false + } + + pdClient, ok := t.PDClientManager.Get(pdm.PrimaryKey(rtx.Cluster.Namespace, rtx.Cluster.Name)) + if !ok { + t.Logger.Info("pd client is not registered") + return false // wait for next sync + } + + cluster, err := pdClient.Underlay().GetCluster(ctx) + if err != nil { + t.Logger.Error(err, "failed to get cluster info") + return false + } + + if cluster.Id == 0 { + return false + } + + rtx.Cluster.Status.ID = strconv.FormatUint(cluster.Id, 10) + return true +} diff --git a/pkg/controllers/cluster/tasks/status_test.go b/pkg/controllers/cluster/tasks/status_test.go index 983c526825..d5203dc6e2 100644 --- a/pkg/controllers/cluster/tasks/status_test.go +++ b/pkg/controllers/cluster/tasks/status_test.go @@ -15,15 +15,23 @@ package tasks import ( + "context" + "strconv" "testing" "github.com/go-logr/logr" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "github.com/pingcap/tidb-operator/apis/core/v1alpha1" "github.com/pingcap/tidb-operator/pkg/client" + "github.com/pingcap/tidb-operator/pkg/pdapi/v1" + "github.com/pingcap/tidb-operator/pkg/timanager" + pdm "github.com/pingcap/tidb-operator/pkg/timanager/pd" "github.com/pingcap/tidb-operator/pkg/utils/fake" "github.com/pingcap/tidb-operator/pkg/utils/task" ) @@ -36,6 +44,7 @@ func TestStatusUpdater(t *testing.T) { expected task.Result components []v1alpha1.ComponentStatus conditions []metav1.Condition + clusterID uint64 }{ { desc: "creating cluster", @@ -46,6 +55,7 @@ func TestStatusUpdater(t *testing.T) { pdGroup: fake.FakeObj( "pd-group", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Cluster.Name = "test" obj.Spec.Replicas = new(int32) *obj.Spec.Replicas = 3 return obj }, ), @@ -72,6 +82,7 @@ func TestStatusUpdater(t *testing.T) { Status: metav1.ConditionFalse, }, }, + clusterID: 123, }, } @@ -84,8 +95,14 @@ func TestStatusUpdater(t *testing.T) { ctx.Cluster = c.cluster ctx.PDGroup = c.pdGroup + m := newFakePDClientManager(tt, getCluster(ctx, &metapb.Cluster{ + Id: c.clusterID, + }, nil)) + m.Start(ctx) + require.NoError(tt, m.Register(ctx.PDGroup)) + fc := client.NewFakeClient(c.cluster) - tk := NewTaskStatus(logr.Discard(), fc) + tk := NewTaskStatus(logr.Discard(), fc, m) res := tk.Sync(ctx) assert.Equal(tt, c.expected, res) assert.Equal(tt, c.cluster.Generation, c.cluster.Status.ObservedGeneration) @@ -99,6 +116,39 @@ func TestStatusUpdater(t *testing.T) { }) } assert.Equal(tt, c.conditions, conditions) + assert.Equal(tt, strconv.FormatUint(c.clusterID, 10), c.cluster.Status.ID) }) } } + +func newFakePDClientManager(t *testing.T, acts ...action) pdm.PDClientManager { + return timanager.NewManagerBuilder[*v1alpha1.PDGroup, pdapi.PDClient, pdm.PDClient](). + WithNewUnderlayClientFunc(func(*v1alpha1.PDGroup) (pdapi.PDClient, error) { + return nil, nil + }). + WithNewClientFunc(func(string, pdapi.PDClient, timanager.SharedInformerFactory[pdapi.PDClient]) pdm.PDClient { + return NewFakePDClient(t, acts...) + }). + WithCacheKeysFunc(pdm.CacheKeys).
+ Build() +} + +func NewFakePDClient(t *testing.T, acts ...action) pdm.PDClient { + ctrl := gomock.NewController(t) + pdc := pdm.NewMockPDClient(ctrl) + for _, act := range acts { + act(ctrl, pdc) + } + + return pdc +} + +type action func(ctrl *gomock.Controller, pdc *pdm.MockPDClient) + +func getCluster(ctx context.Context, cluster *metapb.Cluster, err error) action { + return func(ctrl *gomock.Controller, pdc *pdm.MockPDClient) { + underlay := pdapi.NewMockPDClient(ctrl) + pdc.EXPECT().Underlay().Return(underlay).AnyTimes() + underlay.EXPECT().GetCluster(ctx).Return(cluster, err).AnyTimes() + } +} diff --git a/pkg/controllers/pd/tasks/ctx.go b/pkg/controllers/pd/tasks/ctx.go index 015361cd63..bf6431323e 100644 --- a/pkg/controllers/pd/tasks/ctx.go +++ b/pkg/controllers/pd/tasks/ctx.go @@ -34,8 +34,9 @@ type ReconcileContext struct { Initialized bool Healthy bool - MemberID string - IsLeader bool + ClusterID string + MemberID string + IsLeader bool // ConfigHash stores the hash of **user-specified** config (i.e.`.Spec.Config`), // which will be used to determine whether the config has changed. @@ -71,6 +72,7 @@ func TaskContextInfoFromPD(state *ReconcileContext, cm pdm.PDClientManager) task return task.Fail().With("cannot get member: %w", err) } + state.ClusterID = m.ClusterID state.MemberID = m.ID state.IsLeader = m.IsLeader diff --git a/pkg/controllers/pd/tasks/pod.go b/pkg/controllers/pd/tasks/pod.go index 887bc9fc1c..cf93fbe85d 100644 --- a/pkg/controllers/pd/tasks/pod.go +++ b/pkg/controllers/pd/tasks/pod.go @@ -44,7 +44,7 @@ const ( func TaskPod(state *ReconcileContext, c client.Client) task.Task { return task.NameTaskFunc("Pod", func(ctx context.Context) task.Result { logger := logr.FromContextOrDiscard(ctx) - expected := newPod(state.Cluster(), state.PD(), state.ConfigHash) + expected := newPod(state) if state.Pod() == nil { // We have to refresh cache of members to make sure a pd without pod is unhealthy. 
// If the healthy info is out of date, the operator may mark this pd up-to-date unexpectedly @@ -130,7 +130,9 @@ func preDeleteCheck( return false, nil } -func newPod(cluster *v1alpha1.Cluster, pd *v1alpha1.PD, configHash string) *corev1.Pod { +func newPod(state *ReconcileContext) *corev1.Pod { + cluster := state.Cluster() + pd := state.PD() vols := []corev1.Volume{ { Name: v1alpha1.VolumeNameConfig, @@ -193,7 +195,9 @@ func newPod(cluster *v1alpha1.Cluster, pd *v1alpha1.PD, configHash string) *core Name: pd.PodName(), Labels: maputil.Merge(pd.Labels, map[string]string{ v1alpha1.LabelKeyInstance: pd.Name, - v1alpha1.LabelKeyConfigHash: configHash, + v1alpha1.LabelKeyConfigHash: state.ConfigHash, + v1alpha1.LabelKeyClusterID: state.ClusterID, + v1alpha1.LabelKeyMemberID: state.MemberID, }, k8s.LabelsK8sApp(cluster.Name, v1alpha1.LabelValComponentPD)), Annotations: anno, OwnerReferences: []metav1.OwnerReference{ diff --git a/pkg/controllers/pd/tasks/pod_test.go b/pkg/controllers/pd/tasks/pod_test.go index 131de3cc31..b1343b0e0c 100644 --- a/pkg/controllers/pd/tasks/pod_test.go +++ b/pkg/controllers/pd/tasks/pod_test.go @@ -442,7 +442,7 @@ func TestTaskPod(t *testing.T) { assert.Equal(tt, c.expectedPodIsTerminating, c.state.PodIsTerminating, c.desc) if c.expectUpdatedPod { - expectedPod := newPod(c.state.Cluster(), c.state.PD(), c.state.ConfigHash) + expectedPod := newPod(c.state) actual := c.state.Pod().DeepCopy() actual.Kind = "" actual.APIVersion = "" diff --git a/pkg/controllers/pd/tasks/pvc.go b/pkg/controllers/pd/tasks/pvc.go index ad347cd838..05c5a78773 100644 --- a/pkg/controllers/pd/tasks/pvc.go +++ b/pkg/controllers/pd/tasks/pvc.go @@ -23,15 +23,15 @@ import ( "github.com/pingcap/tidb-operator/apis/core/v1alpha1" "github.com/pingcap/tidb-operator/pkg/client" - "github.com/pingcap/tidb-operator/pkg/controllers/common" + "github.com/pingcap/tidb-operator/pkg/utils/k8s" maputil "github.com/pingcap/tidb-operator/pkg/utils/map" "github.com/pingcap/tidb-operator/pkg/utils/task/v3" "github.com/pingcap/tidb-operator/pkg/volumes" ) -func TaskPVC(state common.PDState, logger logr.Logger, c client.Client, vm volumes.Modifier) task.Task { +func TaskPVC(state *ReconcileContext, logger logr.Logger, c client.Client, vm volumes.Modifier) task.Task { return task.NameTaskFunc("PVC", func(ctx context.Context) task.Result { - pvcs := newPVCs(state.PD()) + pvcs := newPVCs(state) if wait, err := volumes.SyncPVCs(ctx, c, pvcs, vm, logger); err != nil { return task.Fail().With("failed to sync pvcs: %v", err) } else if wait { @@ -44,7 +44,9 @@ func TaskPVC(state common.PDState, logger logr.Logger, c client.Client, vm volum }) } -func newPVCs(pd *v1alpha1.PD) []*corev1.PersistentVolumeClaim { +func newPVCs(state *ReconcileContext) []*corev1.PersistentVolumeClaim { + cluster := state.Cluster() + pd := state.PD() pvcs := make([]*corev1.PersistentVolumeClaim, 0, len(pd.Spec.Volumes)) for i := range pd.Spec.Volumes { vol := &pd.Spec.Volumes[i] @@ -53,8 +55,10 @@ func newPVCs(pd *v1alpha1.PD) []*corev1.PersistentVolumeClaim { Name: PersistentVolumeClaimName(pd.PodName(), vol.Name), Namespace: pd.Namespace, Labels: maputil.Merge(pd.Labels, map[string]string{ - v1alpha1.LabelKeyInstance: pd.Name, - }), + v1alpha1.LabelKeyInstance: pd.Name, + v1alpha1.LabelKeyClusterID: state.ClusterID, + v1alpha1.LabelKeyMemberID: state.MemberID, + }, k8s.LabelsK8sApp(cluster.Name, v1alpha1.LabelValComponentPD)), OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(pd, v1alpha1.SchemeGroupVersion.WithKind("PD")), }, 
diff --git a/pkg/controllers/pd/tasks/pvc_test.go b/pkg/controllers/pd/tasks/pvc_test.go index cd863b9589..c724aee626 100644 --- a/pkg/controllers/pd/tasks/pvc_test.go +++ b/pkg/controllers/pd/tasks/pvc_test.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tidb-operator/apis/core/v1alpha1" "github.com/pingcap/tidb-operator/pkg/client" - "github.com/pingcap/tidb-operator/pkg/controllers/common" "github.com/pingcap/tidb-operator/pkg/utils/fake" "github.com/pingcap/tidb-operator/pkg/utils/task/v3" "github.com/pingcap/tidb-operator/pkg/volumes" @@ -38,7 +37,7 @@ import ( func TestTaskPVC(t *testing.T) { cases := []struct { desc string - state common.PDState + state *ReconcileContext pvcs []*corev1.PersistentVolumeClaim unexpectedErr bool @@ -47,40 +46,49 @@ func TestTaskPVC(t *testing.T) { }{ { desc: "no pvc", - state: &state{ - pd: fake.FakeObj[v1alpha1.PD]("aaa-xxx"), + state: &ReconcileContext{ + State: &state{ + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + pd: fake.FakeObj[v1alpha1.PD]("aaa-xxx"), + }, }, expectedStatus: task.SComplete, expectedPVCNum: 0, }, { desc: "create a data vol", - state: &state{ - pd: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.PD) *v1alpha1.PD { - obj.Spec.Volumes = []v1alpha1.Volume{ - { - Name: "data", - Storage: resource.MustParse("10Gi"), - }, - } - return obj - }), + state: &ReconcileContext{ + State: &state{ + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + pd: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.PD) *v1alpha1.PD { + obj.Spec.Volumes = []v1alpha1.Volume{ + { + Name: "data", + Storage: resource.MustParse("10Gi"), + }, + } + return obj + }), + }, }, expectedStatus: task.SComplete, expectedPVCNum: 1, }, { desc: "has a data vol", - state: &state{ - pd: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.PD) *v1alpha1.PD { - obj.Spec.Volumes = []v1alpha1.Volume{ - { - Name: "data", - Storage: resource.MustParse("10Gi"), - }, - } - return obj - }), + state: &ReconcileContext{ + State: &state{ + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + pd: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.PD) *v1alpha1.PD { + obj.Spec.Volumes = []v1alpha1.Volume{ + { + Name: "data", + Storage: resource.MustParse("10Gi"), + }, + } + return obj + }), + }, }, pvcs: []*corev1.PersistentVolumeClaim{ fake.FakeObj("data-aaa-pd-xxx", func(obj *corev1.PersistentVolumeClaim) *corev1.PersistentVolumeClaim { @@ -93,16 +101,19 @@ func TestTaskPVC(t *testing.T) { }, { desc: "has a data vol, but failed to apply", - state: &state{ - pd: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.PD) *v1alpha1.PD { - obj.Spec.Volumes = []v1alpha1.Volume{ - { - Name: "data", - Storage: resource.MustParse("10Gi"), - }, - } - return obj - }), + state: &ReconcileContext{ + State: &state{ + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + pd: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.PD) *v1alpha1.PD { + obj.Spec.Volumes = []v1alpha1.Volume{ + { + Name: "data", + Storage: resource.MustParse("10Gi"), + }, + } + return obj + }), + }, }, pvcs: []*corev1.PersistentVolumeClaim{ fake.FakeObj("data-aaa-pd-xxx", func(obj *corev1.PersistentVolumeClaim) *corev1.PersistentVolumeClaim { @@ -132,7 +143,7 @@ func TestTaskPVC(t *testing.T) { ctrl := gomock.NewController(tt) vm := volumes.NewMockModifier(ctrl) - expectedPVCs := newPVCs(c.state.PD()) + expectedPVCs := newPVCs(c.state) for _, expected := range expectedPVCs { for _, current := range c.pvcs { if current.Name == expected.Name { diff --git a/pkg/controllers/pd/tasks/status.go b/pkg/controllers/pd/tasks/status.go index 8355ed3ef8..472b15840d 100644 --- 
a/pkg/controllers/pd/tasks/status.go +++ b/pkg/controllers/pd/tasks/status.go @@ -76,6 +76,7 @@ func TaskStatus(state *ReconcileContext, c client.Client) task.Task { if !healthy || !state.Initialized { return task.Wait().With("pd may not be initialized or healthy, wait for next event") } + // TODO(csuzhangxc): if we reach here, is "ClusterID" always set? return task.Complete().With("status is synced") }) diff --git a/pkg/controllers/tidb/handler.go b/pkg/controllers/tidb/handler.go index 043d4d8239..6d730915fb 100644 --- a/pkg/controllers/tidb/handler.go +++ b/pkg/controllers/tidb/handler.go @@ -37,6 +37,8 @@ func (r *Reconciler) ClusterEventHandler() handler.TypedEventHandler[client.Obje if newObj.Status.PD != oldObj.Status.PD { r.Logger.Info("pd url is updating", "from", oldObj.Status.PD, "to", newObj.Status.PD) + } else if newObj.Status.ID != oldObj.Status.ID { + r.Logger.Info("cluster id is updating", "from", oldObj.Status.ID, "to", newObj.Status.ID) } else if !reflect.DeepEqual(oldObj.Spec.SuspendAction, newObj.Spec.SuspendAction) { r.Logger.Info("suspend action is updating", "from", oldObj.Spec.SuspendAction, "to", newObj.Spec.SuspendAction) } else if oldObj.Spec.Paused != newObj.Spec.Paused { diff --git a/pkg/controllers/tidb/tasks/pod.go b/pkg/controllers/tidb/tasks/pod.go index 64457633d3..6ff3d51365 100644 --- a/pkg/controllers/tidb/tasks/pod.go +++ b/pkg/controllers/tidb/tasks/pod.go @@ -52,7 +52,7 @@ func TaskPod(state *ReconcileContext, c client.Client) task.Task { return task.NameTaskFunc("Pod", func(ctx context.Context) task.Result { logger := logr.FromContextOrDiscard(ctx) - expected := newPod(state.Cluster(), state.TiDB(), state.GracefulWaitTimeInSeconds, state.ConfigHash) + expected := newPod(state) if state.Pod() == nil { if err := c.Apply(ctx, expected); err != nil { return task.Fail().With("can't create pod of tidb: %w", err) @@ -89,9 +89,9 @@ func TaskPod(state *ReconcileContext, c client.Client) task.Task { }) } -func newPod(cluster *v1alpha1.Cluster, - tidb *v1alpha1.TiDB, gracePeriod int64, configHash string, -) *corev1.Pod { +func newPod(state *ReconcileContext) *corev1.Pod { + cluster := state.Cluster() + tidb := state.TiDB() vols := []corev1.Volume{ { Name: v1alpha1.VolumeNameConfig, @@ -225,7 +225,8 @@ func newPod(cluster *v1alpha1.Cluster, Name: tidb.PodName(), Labels: maputil.Merge(tidb.Labels, map[string]string{ v1alpha1.LabelKeyInstance: tidb.Name, - v1alpha1.LabelKeyConfigHash: configHash, + v1alpha1.LabelKeyConfigHash: state.ConfigHash, + v1alpha1.LabelKeyClusterID: cluster.Status.ID, }, k8s.LabelsK8sApp(cluster.Name, v1alpha1.LabelValComponentTiDB)), Annotations: maputil.Merge(tidb.GetAnnotations(), k8s.AnnoProm(tidb.GetStatusPort(), metricsPath)), OwnerReferences: []metav1.OwnerReference{ @@ -276,7 +277,7 @@ func newPod(cluster *v1alpha1.Cluster, }, }, Volumes: vols, - TerminationGracePeriodSeconds: ptr.To(gracePeriod + preStopSleepSeconds + bufferSeconds), + TerminationGracePeriodSeconds: ptr.To(state.GracefulWaitTimeInSeconds + preStopSleepSeconds + bufferSeconds), }, } diff --git a/pkg/controllers/tidb/tasks/pod_test.go b/pkg/controllers/tidb/tasks/pod_test.go index 8ee029f487..4c63b43943 100644 --- a/pkg/controllers/tidb/tasks/pod_test.go +++ b/pkg/controllers/tidb/tasks/pod_test.go @@ -266,7 +266,7 @@ func TestTaskPod(t *testing.T) { assert.Equal(tt, c.expectedPodIsTerminating, c.state.PodIsTerminating, c.desc) if c.expectUpdatedPod { - expectedPod := newPod(c.state.Cluster(), c.state.TiDB(), c.state.GracefulWaitTimeInSeconds, c.state.ConfigHash) + 
expectedPod := newPod(c.state) actual := c.state.Pod().DeepCopy() actual.Kind = "" actual.APIVersion = "" diff --git a/pkg/controllers/tidb/tasks/pvc.go b/pkg/controllers/tidb/tasks/pvc.go index 5ba0b83a6f..ddf79ef1f9 100644 --- a/pkg/controllers/tidb/tasks/pvc.go +++ b/pkg/controllers/tidb/tasks/pvc.go @@ -23,15 +23,15 @@ import ( "github.com/pingcap/tidb-operator/apis/core/v1alpha1" "github.com/pingcap/tidb-operator/pkg/client" - "github.com/pingcap/tidb-operator/pkg/controllers/common" + "github.com/pingcap/tidb-operator/pkg/utils/k8s" maputil "github.com/pingcap/tidb-operator/pkg/utils/map" "github.com/pingcap/tidb-operator/pkg/utils/task/v3" "github.com/pingcap/tidb-operator/pkg/volumes" ) -func TaskPVC(state common.TiDBState, logger logr.Logger, c client.Client, vm volumes.Modifier) task.Task { +func TaskPVC(state *ReconcileContext, logger logr.Logger, c client.Client, vm volumes.Modifier) task.Task { return task.NameTaskFunc("PVC", func(ctx context.Context) task.Result { - pvcs := newPVCs(state.TiDB()) + pvcs := newPVCs(state) if wait, err := volumes.SyncPVCs(ctx, c, pvcs, vm, logger); err != nil { return task.Fail().With("failed to sync pvcs: %w", err) } else if wait { @@ -42,7 +42,9 @@ func TaskPVC(state common.TiDBState, logger logr.Logger, c client.Client, vm vol }) } -func newPVCs(tidb *v1alpha1.TiDB) []*corev1.PersistentVolumeClaim { +func newPVCs(state *ReconcileContext) []*corev1.PersistentVolumeClaim { + cluster := state.Cluster() + tidb := state.TiDB() pvcs := make([]*corev1.PersistentVolumeClaim, 0, len(tidb.Spec.Volumes)) for i := range tidb.Spec.Volumes { vol := &tidb.Spec.Volumes[i] @@ -51,8 +53,9 @@ func newPVCs(tidb *v1alpha1.TiDB) []*corev1.PersistentVolumeClaim { Name: PersistentVolumeClaimName(tidb.PodName(), vol.Name), Namespace: tidb.Namespace, Labels: maputil.Merge(tidb.Labels, map[string]string{ - v1alpha1.LabelKeyInstance: tidb.Name, - }), + v1alpha1.LabelKeyInstance: tidb.Name, + v1alpha1.LabelKeyClusterID: cluster.Status.ID, + }, k8s.LabelsK8sApp(cluster.Name, v1alpha1.LabelValComponentTiDB)), OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(tidb, v1alpha1.SchemeGroupVersion.WithKind("TiDB")), }, diff --git a/pkg/controllers/tidb/tasks/pvc_test.go b/pkg/controllers/tidb/tasks/pvc_test.go index f1fb423f37..d37baa6856 100644 --- a/pkg/controllers/tidb/tasks/pvc_test.go +++ b/pkg/controllers/tidb/tasks/pvc_test.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tidb-operator/apis/core/v1alpha1" "github.com/pingcap/tidb-operator/pkg/client" - "github.com/pingcap/tidb-operator/pkg/controllers/common" "github.com/pingcap/tidb-operator/pkg/utils/fake" "github.com/pingcap/tidb-operator/pkg/utils/task/v3" "github.com/pingcap/tidb-operator/pkg/volumes" @@ -38,7 +37,7 @@ import ( func TestTaskPVC(t *testing.T) { cases := []struct { desc string - state common.TiDBState + state *ReconcileContext pvcs []*corev1.PersistentVolumeClaim unexpectedErr bool @@ -47,40 +46,49 @@ func TestTaskPVC(t *testing.T) { }{ { desc: "no pvc", - state: &state{ - tidb: fake.FakeObj[v1alpha1.TiDB]("aaa-xxx"), + state: &ReconcileContext{ + State: &state{ + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + tidb: fake.FakeObj[v1alpha1.TiDB]("aaa-xxx"), + }, }, expectedStatus: task.SComplete, expectedPVCNum: 0, }, { desc: "create a data vol", - state: &state{ - tidb: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiDB) *v1alpha1.TiDB { - obj.Spec.Volumes = []v1alpha1.Volume{ - { - Name: "data", - Storage: resource.MustParse("10Gi"), - }, - } - return obj - }), + state: 
&ReconcileContext{ + State: &state{ + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + tidb: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiDB) *v1alpha1.TiDB { + obj.Spec.Volumes = []v1alpha1.Volume{ + { + Name: "data", + Storage: resource.MustParse("10Gi"), + }, + } + return obj + }), + }, }, expectedStatus: task.SComplete, expectedPVCNum: 1, }, { desc: "has a data vol", - state: &state{ - tidb: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiDB) *v1alpha1.TiDB { - obj.Spec.Volumes = []v1alpha1.Volume{ - { - Name: "data", - Storage: resource.MustParse("10Gi"), - }, - } - return obj - }), + state: &ReconcileContext{ + State: &state{ + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + tidb: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiDB) *v1alpha1.TiDB { + obj.Spec.Volumes = []v1alpha1.Volume{ + { + Name: "data", + Storage: resource.MustParse("10Gi"), + }, + } + return obj + }), + }, }, pvcs: []*corev1.PersistentVolumeClaim{ fake.FakeObj("data-aaa-tidb-xxx", func(obj *corev1.PersistentVolumeClaim) *corev1.PersistentVolumeClaim { @@ -93,16 +101,19 @@ func TestTaskPVC(t *testing.T) { }, { desc: "has a data vol, but failed to apply", - state: &state{ - tidb: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiDB) *v1alpha1.TiDB { - obj.Spec.Volumes = []v1alpha1.Volume{ - { - Name: "data", - Storage: resource.MustParse("10Gi"), - }, - } - return obj - }), + state: &ReconcileContext{ + State: &state{ + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + tidb: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiDB) *v1alpha1.TiDB { + obj.Spec.Volumes = []v1alpha1.Volume{ + { + Name: "data", + Storage: resource.MustParse("10Gi"), + }, + } + return obj + }), + }, }, pvcs: []*corev1.PersistentVolumeClaim{ fake.FakeObj("data-aaa-tidb-xxx", func(obj *corev1.PersistentVolumeClaim) *corev1.PersistentVolumeClaim { @@ -132,7 +143,7 @@ func TestTaskPVC(t *testing.T) { ctrl := gomock.NewController(tt) vm := volumes.NewMockModifier(ctrl) - expectedPVCs := newPVCs(c.state.TiDB()) + expectedPVCs := newPVCs(c.state) for _, expected := range expectedPVCs { for _, current := range c.pvcs { if current.Name == expected.Name { diff --git a/pkg/controllers/tiflash/handler.go b/pkg/controllers/tiflash/handler.go index 4df2c483c8..897f5fc9fe 100644 --- a/pkg/controllers/tiflash/handler.go +++ b/pkg/controllers/tiflash/handler.go @@ -41,6 +41,8 @@ func (r *Reconciler) ClusterEventHandler() handler.TypedEventHandler[client.Obje if newObj.Status.PD != oldObj.Status.PD { r.Logger.Info("pd url is updating", "from", oldObj.Status.PD, "to", newObj.Status.PD) + } else if newObj.Status.ID != oldObj.Status.ID { + r.Logger.Info("cluster id is updating", "from", oldObj.Status.ID, "to", newObj.Status.ID) } else if !reflect.DeepEqual(oldObj.Spec.SuspendAction, newObj.Spec.SuspendAction) { r.Logger.Info("suspend action is updating", "from", oldObj.Spec.SuspendAction, "to", newObj.Spec.SuspendAction) } else if oldObj.Spec.Paused != newObj.Spec.Paused { diff --git a/pkg/controllers/tiflash/tasks/pod.go b/pkg/controllers/tiflash/tasks/pod.go index 716416b9a1..5c1ec22da6 100644 --- a/pkg/controllers/tiflash/tasks/pod.go +++ b/pkg/controllers/tiflash/tasks/pod.go @@ -41,7 +41,7 @@ const ( func TaskPod(state *ReconcileContext, c client.Client) task.Task { return task.NameTaskFunc("Pod", func(ctx context.Context) task.Result { logger := logr.FromContextOrDiscard(ctx) - expected := newPod(state.Cluster(), state.TiFlash(), state.ConfigHash) + expected := newPod(state) if state.Pod() == nil { if err := c.Apply(ctx, expected); err != nil { return 
task.Fail().With("can't apply pod of tiflash: %w", err) @@ -77,7 +77,9 @@ func TaskPod(state *ReconcileContext, c client.Client) task.Task { }) } -func newPod(cluster *v1alpha1.Cluster, tiflash *v1alpha1.TiFlash, configHash string) *corev1.Pod { +func newPod(state *ReconcileContext) *corev1.Pod { + cluster := state.Cluster() + tiflash := state.TiFlash() vols := []corev1.Volume{ { Name: v1alpha1.VolumeNameConfig, @@ -144,7 +146,9 @@ func newPod(cluster *v1alpha1.Cluster, tiflash *v1alpha1.TiFlash, configHash str Name: tiflash.PodName(), Labels: maputil.Merge(tiflash.Labels, map[string]string{ v1alpha1.LabelKeyInstance: tiflash.Name, - v1alpha1.LabelKeyConfigHash: configHash, + v1alpha1.LabelKeyConfigHash: state.ConfigHash, + v1alpha1.LabelKeyClusterID: cluster.Status.ID, + v1alpha1.LabelKeyStoreID: state.StoreID, }, k8s.LabelsK8sApp(cluster.Name, v1alpha1.LabelValComponentTiFlash)), Annotations: maputil.Merge(tiflash.GetAnnotations(), k8s.AnnoProm(tiflash.GetMetricsPort(), metricsPath), diff --git a/pkg/controllers/tiflash/tasks/pod_test.go b/pkg/controllers/tiflash/tasks/pod_test.go index 34080cda69..525a4df482 100644 --- a/pkg/controllers/tiflash/tasks/pod_test.go +++ b/pkg/controllers/tiflash/tasks/pod_test.go @@ -266,7 +266,7 @@ func TestTaskPod(t *testing.T) { assert.Equal(tt, c.expectedPodIsTerminating, c.state.PodIsTerminating, c.desc) if c.expectUpdatedPod { - expectedPod := newPod(c.state.Cluster(), c.state.TiFlash(), c.state.ConfigHash) + expectedPod := newPod(c.state) actual := c.state.Pod().DeepCopy() actual.Kind = "" actual.APIVersion = "" diff --git a/pkg/controllers/tiflash/tasks/pvc.go b/pkg/controllers/tiflash/tasks/pvc.go index 4ce4815c11..ef5e366dc9 100644 --- a/pkg/controllers/tiflash/tasks/pvc.go +++ b/pkg/controllers/tiflash/tasks/pvc.go @@ -24,15 +24,15 @@ import ( "github.com/pingcap/tidb-operator/apis/core/v1alpha1" "github.com/pingcap/tidb-operator/pkg/client" - "github.com/pingcap/tidb-operator/pkg/controllers/common" + "github.com/pingcap/tidb-operator/pkg/utils/k8s" maputil "github.com/pingcap/tidb-operator/pkg/utils/map" "github.com/pingcap/tidb-operator/pkg/utils/task/v3" "github.com/pingcap/tidb-operator/pkg/volumes" ) -func TaskPVC(state common.TiFlashState, logger logr.Logger, c client.Client, vm volumes.Modifier) task.Task { +func TaskPVC(state *ReconcileContext, logger logr.Logger, c client.Client, vm volumes.Modifier) task.Task { return task.NameTaskFunc("PVC", func(ctx context.Context) task.Result { - pvcs := newPVCs(state.TiFlash()) + pvcs := newPVCs(state) if wait, err := volumes.SyncPVCs(ctx, c, pvcs, vm, logger); err != nil { return task.Fail().With("failed to sync pvcs: %w", err) } else if wait { @@ -43,7 +43,9 @@ func TaskPVC(state common.TiFlashState, logger logr.Logger, c client.Client, vm }) } -func newPVCs(tiflash *v1alpha1.TiFlash) []*corev1.PersistentVolumeClaim { +func newPVCs(state *ReconcileContext) []*corev1.PersistentVolumeClaim { + cluster := state.Cluster() + tiflash := state.TiFlash() pvcs := make([]*corev1.PersistentVolumeClaim, 0, len(tiflash.Spec.Volumes)) for i := range tiflash.Spec.Volumes { vol := &tiflash.Spec.Volumes[i] @@ -52,8 +54,10 @@ func newPVCs(tiflash *v1alpha1.TiFlash) []*corev1.PersistentVolumeClaim { Name: PersistentVolumeClaimName(tiflash.PodName(), vol.Name), Namespace: tiflash.Namespace, Labels: maputil.Merge(tiflash.Labels, map[string]string{ - v1alpha1.LabelKeyInstance: tiflash.Name, - }), + v1alpha1.LabelKeyInstance: tiflash.Name, + v1alpha1.LabelKeyClusterID: cluster.Status.ID, + v1alpha1.LabelKeyStoreID: 
state.StoreID, + }, k8s.LabelsK8sApp(cluster.Name, v1alpha1.LabelValComponentTiFlash)), OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(tiflash, v1alpha1.SchemeGroupVersion.WithKind("TiFlash")), }, diff --git a/pkg/controllers/tiflash/tasks/pvc_test.go b/pkg/controllers/tiflash/tasks/pvc_test.go index 3a8bf0359c..928f1b2f76 100644 --- a/pkg/controllers/tiflash/tasks/pvc_test.go +++ b/pkg/controllers/tiflash/tasks/pvc_test.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tidb-operator/apis/core/v1alpha1" "github.com/pingcap/tidb-operator/pkg/client" - "github.com/pingcap/tidb-operator/pkg/controllers/common" "github.com/pingcap/tidb-operator/pkg/utils/fake" "github.com/pingcap/tidb-operator/pkg/utils/task/v3" "github.com/pingcap/tidb-operator/pkg/volumes" @@ -38,7 +37,7 @@ import ( func TestTaskPVC(t *testing.T) { cases := []struct { desc string - state common.TiFlashState + state *ReconcileContext pvcs []*corev1.PersistentVolumeClaim unexpectedErr bool @@ -47,40 +46,49 @@ func TestTaskPVC(t *testing.T) { }{ { desc: "no pvc", - state: &state{ - tiflash: fake.FakeObj[v1alpha1.TiFlash]("aaa-xxx"), + state: &ReconcileContext{ + State: &state{ + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + tiflash: fake.FakeObj[v1alpha1.TiFlash]("aaa-xxx"), + }, }, expectedStatus: task.SComplete, expectedPVCNum: 0, }, { desc: "create a data vol", - state: &state{ - tiflash: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiFlash) *v1alpha1.TiFlash { - obj.Spec.Volumes = []v1alpha1.Volume{ - { - Name: "data", - Storage: resource.MustParse("10Gi"), - }, - } - return obj - }), + state: &ReconcileContext{ + State: &state{ + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + tiflash: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiFlash) *v1alpha1.TiFlash { + obj.Spec.Volumes = []v1alpha1.Volume{ + { + Name: "data", + Storage: resource.MustParse("10Gi"), + }, + } + return obj + }), + }, }, expectedStatus: task.SComplete, expectedPVCNum: 1, }, { desc: "has a data vol", - state: &state{ - tiflash: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiFlash) *v1alpha1.TiFlash { - obj.Spec.Volumes = []v1alpha1.Volume{ - { - Name: "data", - Storage: resource.MustParse("10Gi"), - }, - } - return obj - }), + state: &ReconcileContext{ + State: &state{ + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + tiflash: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiFlash) *v1alpha1.TiFlash { + obj.Spec.Volumes = []v1alpha1.Volume{ + { + Name: "data", + Storage: resource.MustParse("10Gi"), + }, + } + return obj + }), + }, }, pvcs: []*corev1.PersistentVolumeClaim{ fake.FakeObj("data-aaa-tiflash-xxx", func(obj *corev1.PersistentVolumeClaim) *corev1.PersistentVolumeClaim { @@ -93,16 +101,19 @@ func TestTaskPVC(t *testing.T) { }, { desc: "has a data vol, but failed to apply", - state: &state{ - tiflash: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiFlash) *v1alpha1.TiFlash { - obj.Spec.Volumes = []v1alpha1.Volume{ - { - Name: "data", - Storage: resource.MustParse("10Gi"), - }, - } - return obj - }), + state: &ReconcileContext{ + State: &state{ + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + tiflash: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiFlash) *v1alpha1.TiFlash { + obj.Spec.Volumes = []v1alpha1.Volume{ + { + Name: "data", + Storage: resource.MustParse("10Gi"), + }, + } + return obj + }), + }, }, pvcs: []*corev1.PersistentVolumeClaim{ fake.FakeObj("data-aaa-tiflash-xxx", func(obj *corev1.PersistentVolumeClaim) *corev1.PersistentVolumeClaim { @@ -132,7 +143,7 @@ func TestTaskPVC(t *testing.T) { ctrl := 
gomock.NewController(tt) vm := volumes.NewMockModifier(ctrl) - expectedPVCs := newPVCs(c.state.TiFlash()) + expectedPVCs := newPVCs(c.state) for _, expected := range expectedPVCs { for _, current := range c.pvcs { if current.Name == expected.Name { diff --git a/pkg/controllers/tikv/handler.go b/pkg/controllers/tikv/handler.go index 6f7e4a2bb9..d230cc708b 100644 --- a/pkg/controllers/tikv/handler.go +++ b/pkg/controllers/tikv/handler.go @@ -41,6 +41,8 @@ func (r *Reconciler) ClusterEventHandler() handler.TypedEventHandler[client.Obje if newObj.Status.PD != oldObj.Status.PD { r.Logger.Info("pd url is updating", "from", oldObj.Status.PD, "to", newObj.Status.PD) + } else if newObj.Status.ID != oldObj.Status.ID { + r.Logger.Info("cluster id is updating", "from", oldObj.Status.ID, "to", newObj.Status.ID) } else if !reflect.DeepEqual(oldObj.Spec.SuspendAction, newObj.Spec.SuspendAction) { r.Logger.Info("suspend action is updating", "from", oldObj.Spec.SuspendAction, "to", newObj.Spec.SuspendAction) } else if oldObj.Spec.Paused != newObj.Spec.Paused { diff --git a/pkg/controllers/tikv/tasks/pod.go b/pkg/controllers/tikv/tasks/pod.go index 2d31eaadb9..62aa1afdc3 100644 --- a/pkg/controllers/tikv/tasks/pod.go +++ b/pkg/controllers/tikv/tasks/pod.go @@ -63,7 +63,7 @@ func TaskSuspendPod(state *ReconcileContext, c client.Client) task.Task { func TaskPod(state *ReconcileContext, c client.Client) task.Task { return task.NameTaskFunc("Pod", func(ctx context.Context) task.Result { logger := logr.FromContextOrDiscard(ctx) - expected := newPod(state.Cluster(), state.TiKV(), state.ConfigHash) + expected := newPod(state) if state.Pod() == nil { if err := c.Apply(ctx, expected); err != nil { return task.Fail().With("can't apply pod of tikv: %w", err) @@ -120,7 +120,9 @@ func TaskPod(state *ReconcileContext, c client.Client) task.Task { }) } -func newPod(cluster *v1alpha1.Cluster, tikv *v1alpha1.TiKV, configHash string) *corev1.Pod { +func newPod(state *ReconcileContext) *corev1.Pod { + cluster := state.Cluster() + tikv := state.TiKV() vols := []corev1.Volume{ { Name: v1alpha1.VolumeNameConfig, @@ -195,7 +197,9 @@ func newPod(cluster *v1alpha1.Cluster, tikv *v1alpha1.TiKV, configHash string) * Name: tikv.PodName(), Labels: maputil.Merge(tikv.Labels, map[string]string{ v1alpha1.LabelKeyInstance: tikv.Name, - v1alpha1.LabelKeyConfigHash: configHash, + v1alpha1.LabelKeyConfigHash: state.ConfigHash, + v1alpha1.LabelKeyClusterID: cluster.Status.ID, + v1alpha1.LabelKeyStoreID: state.StoreID, }, k8s.LabelsK8sApp(cluster.Name, v1alpha1.LabelValComponentTiKV)), Annotations: maputil.Merge(tikv.GetAnnotations(), k8s.AnnoProm(tikv.GetStatusPort(), metricsPath)), OwnerReferences: []metav1.OwnerReference{ diff --git a/pkg/controllers/tikv/tasks/pod_test.go b/pkg/controllers/tikv/tasks/pod_test.go index 1c1dc6433f..9792ca306a 100644 --- a/pkg/controllers/tikv/tasks/pod_test.go +++ b/pkg/controllers/tikv/tasks/pod_test.go @@ -316,7 +316,7 @@ func TestTaskPod(t *testing.T) { assert.Equal(tt, c.expectedPodIsTerminating, c.state.PodIsTerminating, c.desc) if c.expectUpdatedPod { - expectedPod := newPod(c.state.Cluster(), c.state.TiKV(), c.state.ConfigHash) + expectedPod := newPod(c.state) actual := c.state.Pod().DeepCopy() actual.Kind = "" actual.APIVersion = "" diff --git a/pkg/controllers/tikv/tasks/pvc.go b/pkg/controllers/tikv/tasks/pvc.go index 325fcdeed8..90484e3be4 100644 --- a/pkg/controllers/tikv/tasks/pvc.go +++ b/pkg/controllers/tikv/tasks/pvc.go @@ -23,15 +23,15 @@ import ( 
"github.com/pingcap/tidb-operator/apis/core/v1alpha1" "github.com/pingcap/tidb-operator/pkg/client" - "github.com/pingcap/tidb-operator/pkg/controllers/common" + "github.com/pingcap/tidb-operator/pkg/utils/k8s" maputil "github.com/pingcap/tidb-operator/pkg/utils/map" "github.com/pingcap/tidb-operator/pkg/utils/task/v3" "github.com/pingcap/tidb-operator/pkg/volumes" ) -func TaskPVC(state common.TiKVState, logger logr.Logger, c client.Client, vm volumes.Modifier) task.Task { +func TaskPVC(state *ReconcileContext, logger logr.Logger, c client.Client, vm volumes.Modifier) task.Task { return task.NameTaskFunc("PVC", func(ctx context.Context) task.Result { - pvcs := newPVCs(state.TiKV()) + pvcs := newPVCs(state) if wait, err := volumes.SyncPVCs(ctx, c, pvcs, vm, logger); err != nil { return task.Fail().With("failed to sync pvcs: %w", err) } else if wait { @@ -42,7 +42,9 @@ func TaskPVC(state common.TiKVState, logger logr.Logger, c client.Client, vm vol }) } -func newPVCs(tikv *v1alpha1.TiKV) []*corev1.PersistentVolumeClaim { +func newPVCs(state *ReconcileContext) []*corev1.PersistentVolumeClaim { + cluster := state.Cluster() + tikv := state.TiKV() pvcs := make([]*corev1.PersistentVolumeClaim, 0, len(tikv.Spec.Volumes)) for i := range tikv.Spec.Volumes { vol := tikv.Spec.Volumes[i] @@ -51,8 +53,10 @@ func newPVCs(tikv *v1alpha1.TiKV) []*corev1.PersistentVolumeClaim { Name: PersistentVolumeClaimName(tikv.PodName(), vol.Name), Namespace: tikv.Namespace, Labels: maputil.Merge(tikv.Labels, map[string]string{ - v1alpha1.LabelKeyInstance: tikv.Name, - }), + v1alpha1.LabelKeyInstance: tikv.Name, + v1alpha1.LabelKeyClusterID: cluster.Status.ID, + v1alpha1.LabelKeyStoreID: state.StoreID, + }, k8s.LabelsK8sApp(cluster.Name, v1alpha1.LabelValComponentTiKV)), OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(tikv, v1alpha1.SchemeGroupVersion.WithKind("TiKV")), }, diff --git a/pkg/controllers/tikv/tasks/pvc_test.go b/pkg/controllers/tikv/tasks/pvc_test.go index 4d6895697e..b61319a12e 100644 --- a/pkg/controllers/tikv/tasks/pvc_test.go +++ b/pkg/controllers/tikv/tasks/pvc_test.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tidb-operator/apis/core/v1alpha1" "github.com/pingcap/tidb-operator/pkg/client" - "github.com/pingcap/tidb-operator/pkg/controllers/common" "github.com/pingcap/tidb-operator/pkg/utils/fake" "github.com/pingcap/tidb-operator/pkg/utils/task/v3" "github.com/pingcap/tidb-operator/pkg/volumes" @@ -38,7 +37,7 @@ import ( func TestTaskPVC(t *testing.T) { cases := []struct { desc string - state common.TiKVState + state *ReconcileContext pvcs []*corev1.PersistentVolumeClaim unexpectedErr bool @@ -47,40 +46,49 @@ func TestTaskPVC(t *testing.T) { }{ { desc: "no pvc", - state: &state{ - tikv: fake.FakeObj[v1alpha1.TiKV]("aaa-xxx"), + state: &ReconcileContext{ + State: &state{ + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + tikv: fake.FakeObj[v1alpha1.TiKV]("aaa-xxx"), + }, }, expectedStatus: task.SComplete, expectedPVCNum: 0, }, { desc: "create a data vol", - state: &state{ - tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { - obj.Spec.Volumes = []v1alpha1.Volume{ - { - Name: "data", - Storage: resource.MustParse("10Gi"), - }, - } - return obj - }), + state: &ReconcileContext{ + State: &state{ + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { + obj.Spec.Volumes = []v1alpha1.Volume{ + { + Name: "data", + Storage: resource.MustParse("10Gi"), + }, + } + return obj + }), + }, }, 
expectedStatus: task.SComplete, expectedPVCNum: 1, }, { desc: "has a data vol", - state: &state{ - tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { - obj.Spec.Volumes = []v1alpha1.Volume{ - { - Name: "data", - Storage: resource.MustParse("10Gi"), - }, - } - return obj - }), + state: &ReconcileContext{ + State: &state{ + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { + obj.Spec.Volumes = []v1alpha1.Volume{ + { + Name: "data", + Storage: resource.MustParse("10Gi"), + }, + } + return obj + }), + }, }, pvcs: []*corev1.PersistentVolumeClaim{ fake.FakeObj("data-aaa-tikv-xxx", func(obj *corev1.PersistentVolumeClaim) *corev1.PersistentVolumeClaim { @@ -93,16 +101,19 @@ func TestTaskPVC(t *testing.T) { }, { desc: "has a data vol, but failed to apply", - state: &state{ - tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { - obj.Spec.Volumes = []v1alpha1.Volume{ - { - Name: "data", - Storage: resource.MustParse("10Gi"), - }, - } - return obj - }), + state: &ReconcileContext{ + State: &state{ + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { + obj.Spec.Volumes = []v1alpha1.Volume{ + { + Name: "data", + Storage: resource.MustParse("10Gi"), + }, + } + return obj + }), + }, }, pvcs: []*corev1.PersistentVolumeClaim{ fake.FakeObj("data-aaa-tikv-xxx", func(obj *corev1.PersistentVolumeClaim) *corev1.PersistentVolumeClaim { @@ -132,7 +143,7 @@ func TestTaskPVC(t *testing.T) { ctrl := gomock.NewController(tt) vm := volumes.NewMockModifier(ctrl) - expectedPVCs := newPVCs(c.state.TiKV()) + expectedPVCs := newPVCs(c.state) for _, expected := range expectedPVCs { for _, current := range c.pvcs { if current.Name == expected.Name { diff --git a/pkg/timanager/apis/pd/v1/types.go b/pkg/timanager/apis/pd/v1/types.go index e4253a5bfa..0f0dd33583 100644 --- a/pkg/timanager/apis/pd/v1/types.go +++ b/pkg/timanager/apis/pd/v1/types.go @@ -119,6 +119,7 @@ type Member struct { // Invalid means pd svc is unavailable and store info is untrusted Invalid bool `json:"invalid,omitempty"` + ClusterID string `json:"cluster_id,omitempty"` ID string `json:"id"` PeerUrls []string `json:"peer_urls,omitempty"` ClientUrls []string `json:"client_urls,omitempty"` diff --git a/pkg/timanager/pd/member.go b/pkg/timanager/pd/member.go index ffbc3dca99..89e7e291e4 100644 --- a/pkg/timanager/pd/member.go +++ b/pkg/timanager/pd/member.go @@ -91,6 +91,7 @@ func (l *memberLister) List(ctx context.Context) (*pdv1.MemberList, error) { Name: m.Name, Namespace: l.cluster, }, + ClusterID: strconv.FormatUint(info.Header.ClusterId, 10), ID: strconv.FormatUint(m.MemberId, 10), PeerUrls: m.PeerUrls, ClientUrls: m.ClientUrls,