From 43de8f5631ac678569d17fbc81b89e8125d4906f Mon Sep 17 00:00:00 2001 From: Hoshea Date: Mon, 23 Dec 2024 18:07:56 +0800 Subject: [PATCH] Fix(e2e): run a mysql client in kind to test graceful shutdown case (#176) (#6005) --- cmd/testing-workload/main.go | 134 ++++++++++++++++++++++++++ hack/lib/build.sh | 2 +- hack/lib/e2e.sh | 2 +- image/Dockerfile | 14 ++- pkg/configs/tidb/config.go | 2 +- pkg/controllers/tidb/tasks/cm_test.go | 2 +- tests/e2e/cluster/cluster.go | 125 ++++++++++++++---------- tests/e2e/utils/tidb/tidb.go | 37 +------ 8 files changed, 227 insertions(+), 91 deletions(-) create mode 100644 cmd/testing-workload/main.go diff --git a/cmd/testing-workload/main.go b/cmd/testing-workload/main.go new file mode 100644 index 0000000000..d83edc1286 --- /dev/null +++ b/cmd/testing-workload/main.go @@ -0,0 +1,134 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "database/sql" + "errors" + "flag" + "fmt" + "sync" + "sync/atomic" + "time" + + _ "github.com/go-sql-driver/mysql" +) + +var ( + host string + durationInMinutes int + maxConnections int + sleepIntervalSec int + longTxnSleepSec int +) + +//nolint:mnd // default values +func main() { + flag.StringVar(&host, "host", "", "host") + flag.IntVar(&durationInMinutes, "duration", 10, "duration in minutes") + flag.IntVar(&maxConnections, "max-connections", 30, "max connections") + flag.IntVar(&sleepIntervalSec, "sleep-interval", 1, "sleep interval in seconds") + flag.IntVar(&longTxnSleepSec, "long-txn-sleep", 10, "how many seconds to sleep to simulate a long transaction") + flag.Parse() + + db, err := sql.Open("mysql", fmt.Sprintf("root:@(%s:4000)/test?charset=utf8mb4", host)) + if err != nil { + panic(err) + } + if err = db.Ping(); err != nil { + panic(err) + } + defer db.Close() + db.SetConnMaxLifetime(time.Minute) + db.SetMaxIdleConns(maxConnections) + db.SetMaxOpenConns(maxConnections) + + table := "test.e2e_test" + str := fmt.Sprintf("create table if not exists %s(id int primary key auto_increment, v int);", table) + _, err = db.Exec(str) + if err != nil { + panic(err) + } + + var totalCount, failCount atomic.Uint64 + var wg sync.WaitGroup + clientCtx, cancel := context.WithTimeout(context.Background(), time.Duration(durationInMinutes)*time.Minute) + defer cancel() + + for i := 1; i <= maxConnections; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + for { + select { + case <-clientCtx.Done(): + return + default: + err := executeSimpleTransaction(db, id, table) + totalCount.Add(1) + if err != nil { + fmt.Printf("[%d-%s] failed to execute simple transaction(long: %v): %v\n", id, time.Now().String(), id%3 == 0, err) + failCount.Add(1) + } + time.Sleep(time.Duration(sleepIntervalSec) * time.Second) + } + } + }(i) + } + wg.Wait() + fmt.Printf("total count: %d, fail count: %d\n", totalCount.Load(), failCount.Load()) + if failCount.Load() > 0 { + panic("there are failed transactions") + } +} + +// ExecuteSimpleTransaction performs a transaction to insert or update the given id in the specified table. +func executeSimpleTransaction(db *sql.DB, id int, table string) error { + tx, err := db.Begin() + if err != nil { + return fmt.Errorf("failed to begin txn: %w", err) + } + defer func() { + if r := recover(); r != nil { + _ = tx.Rollback() + } + }() + + // Prepare SQL statement to replace or insert a record + //nolint:gosec // only for testing + str := fmt.Sprintf("replace into %s(id, v) values(?, ?);", table) + if _, err = tx.Exec(str, id, id); err != nil { + _ = tx.Rollback() + return fmt.Errorf("failed to exec statement: %w", err) + } + + // Simulate a different operation by updating the value + if _, err = tx.Exec(fmt.Sprintf("update %s set v = ? where id = ?;", table), id*2, id); err != nil { + _ = tx.Rollback() + return fmt.Errorf("failed to exec update statement: %w", err) + } + + // Simulate a long transaction by sleeping for 10 seconds + if id%3 == 0 { + time.Sleep(time.Duration(longTxnSleepSec) * time.Second) + } + + // Commit the transaction + if err = tx.Commit(); err != nil && !errors.Is(err, sql.ErrTxDone) { + return fmt.Errorf("failed to commit txn: %w", err) + } + return nil +} diff --git a/hack/lib/build.sh b/hack/lib/build.sh index 5494847866..e879d9427a 100644 --- a/hack/lib/build.sh +++ b/hack/lib/build.sh @@ -46,7 +46,7 @@ function build::all() { shift done if [[ ${#targets[@]} -eq 0 ]]; then - targets=("operator" "prestop-checker") + targets=("operator" "prestop-checker" "testing-workload") fi local platforms diff --git a/hack/lib/e2e.sh b/hack/lib/e2e.sh index c799ed1b8e..29573ccba5 100755 --- a/hack/lib/e2e.sh +++ b/hack/lib/e2e.sh @@ -129,7 +129,7 @@ function e2e::prepare() { e2e::install_rbac # build the operator image and load it into the kind cluster - image::build prestop-checker operator --push + image::build prestop-checker operator testing-workload --push e2e::uninstall_operator e2e::install_operator diff --git a/image/Dockerfile b/image/Dockerfile index ba7cd69074..f13f7abccc 100644 --- a/image/Dockerfile +++ b/image/Dockerfile @@ -37,7 +37,6 @@ USER 65532:65532 ENTRYPOINT ["/operator"] - FROM --platform=$TARGETPLATFORM ghcr.io/pingcap-qe/bases/pingcap-base:v1.9.2 AS prestop-checker ARG TARGETPLATFORM @@ -50,3 +49,16 @@ COPY --from=builder ./_output/$TARGETPLATFORM/bin/prestop-checker prestop-checke USER 65532:65532 ENTRYPOINT ["/prestop-checker"] + +FROM --platform=$TARGETPLATFORM ghcr.io/pingcap-qe/bases/pingcap-base:v1.9.2 AS testing-workload + +ARG TARGETPLATFORM + +WORKDIR / + +COPY --from=builder ./_output/$TARGETPLATFORM/bin/testing-workload testing-workload + +# nonroot user of distroless +USER 65532:65532 + +ENTRYPOINT ["/testing-workload"] \ No newline at end of file diff --git a/pkg/configs/tidb/config.go b/pkg/configs/tidb/config.go index 4cf8f6f718..ccd2e6cb30 100644 --- a/pkg/configs/tidb/config.go +++ b/pkg/configs/tidb/config.go @@ -28,7 +28,7 @@ const ( // defaultGracefulWaitBeforeShutdownInSeconds is the default value of the tidb config `graceful-wait-before-shutdown`, // which is set by the operator if not set by the user, for graceful shutdown. // Note that the default value is zero in tidb-server. - defaultGracefulWaitBeforeShutdownInSeconds = 30 + defaultGracefulWaitBeforeShutdownInSeconds = 60 ) // Config is a subset config of tidb diff --git a/pkg/controllers/tidb/tasks/cm_test.go b/pkg/controllers/tidb/tasks/cm_test.go index 98955ed7a2..54ba369d84 100644 --- a/pkg/controllers/tidb/tasks/cm_test.go +++ b/pkg/controllers/tidb/tasks/cm_test.go @@ -125,7 +125,7 @@ func TestConfigMap(t *testing.T) { }, Data: map[string]string{ v1alpha1.ConfigFileName: `advertise-address = 'test-tidb-tidb.subdomain.default.svc' -graceful-wait-before-shutdown = 30 +graceful-wait-before-shutdown = 60 host = '::' path = 'test-pd.default:2379' store = 'tikv' diff --git a/tests/e2e/cluster/cluster.go b/tests/e2e/cluster/cluster.go index 9a9b32291e..8172808cdd 100644 --- a/tests/e2e/cluster/cluster.go +++ b/tests/e2e/cluster/cluster.go @@ -15,23 +15,25 @@ package cluster import ( + "bytes" "context" "database/sql" "fmt" + "io" "slices" "strings" "sync" - "sync/atomic" "time" + "github.com/Masterminds/semver/v3" + _ "github.com/go-sql-driver/mysql" + //nolint: stylecheck // too many changes, refactor later . "github.com/onsi/ginkgo/v2" //nolint: stylecheck // too many changes, refactor later . "github.com/onsi/gomega" - - "github.com/Masterminds/semver/v3" - _ "github.com/go-sql-driver/mysql" appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -1642,63 +1644,71 @@ location-labels = ["region", "zone", "host"]` Expect(k8sClient.Create(ctx, dbg)).To(Succeed()) By("Waiting for the cluster to be ready") + // TODO: extract it to a common utils + svcName := dbg.Name + "-tidb" + var clusterIP string Eventually(func(g Gomega) { _, ready := utiltidb.IsClusterReady(k8sClient, tc.Name, tc.Namespace) g.Expect(ready).To(BeTrue()) - g.Expect(utiltidb.AreAllInstancesReady(k8sClient, pdg, []*v1alpha1.TiKVGroup{kvg}, []*v1alpha1.TiDBGroup{dbg}, nil)).To(Succeed()) - g.Expect(utiltidb.IsTiDBConnectable(ctx, k8sClient, fw, - tc.Namespace, tc.Name, dbg.Name, "root", "", "")).To(Succeed()) + svc, err := clientSet.CoreV1().Services(tc.Namespace).Get(ctx, svcName, metav1.GetOptions{}) + g.Expect(err).To(BeNil()) + clusterIP = svc.Spec.ClusterIP }).WithTimeout(createClusterTimeout).WithPolling(createClusterPolling).Should(Succeed()) - By("Connect to the TiDB cluster to run transactions") - // TODO: extract it to a common utils - svcName := dbg.Name + "-tidb" - dsn, cancel, err := utiltidb.PortForwardAndGetTiDBDSN(fw, tc.Namespace, svcName, "root", "", "test", "charset=utf8mb4") - Expect(err).To(BeNil()) - defer cancel() - db, err := sql.Open("mysql", dsn) - Expect(err).To(BeNil()) - defer db.Close() - maxConn := 30 - db.SetMaxIdleConns(maxConn) - db.SetMaxOpenConns(maxConn) - - table := "test.e2e_test" - str := fmt.Sprintf("create table if not exists %s(id int primary key auto_increment, v int);", table) - _, err = db.Exec(str) + By("Create a Job to connect to the TiDB cluster to run transactions") + jobName := "testing-workload-job" + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: jobName, + Namespace: tc.Namespace, + }, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": jobName, + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "testing-workload", + Image: "pingcap/testing-workload:latest", + Args: []string{ + "--host", clusterIP, + "--duration", "8", + "--max-connections", "30", + }, + ImagePullPolicy: corev1.PullIfNotPresent, + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + BackoffLimit: ptr.To[int32](0), + }, + } + _, err := clientSet.BatchV1().Jobs(tc.Namespace).Create(ctx, job, metav1.CreateOptions{}) Expect(err).To(BeNil()) - var totalCount, failCount atomic.Uint64 - var wg sync.WaitGroup - clientCtx, cancel2 := context.WithCancel(ctx) - defer cancel2() - for i := 0; i < maxConn; i++ { - id := i - wg.Add(1) - go func(db *sql.DB) { - defer wg.Done() - for { - select { - case <-clientCtx.Done(): - return - default: - err := utiltidb.ExecuteSimpleTransaction(db, id, table) - totalCount.Add(1) - if err != nil { - failCount.Add(1) - } - time.Sleep(50 * time.Millisecond) //nolint:mnd // easy to understand - } - } - }(db) - } + By("Ensure the job pod is running") + var jobPodName string + Eventually(func(g Gomega) { + pods, err := clientSet.CoreV1().Pods(tc.Namespace).List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("app=%s", jobName), + }) + g.Expect(err).To(BeNil()) + g.Expect(len(pods.Items)).To(Equal(1)) + g.Expect(pods.Items[0].Status.Phase).To(Equal(corev1.PodRunning)) + jobPodName = pods.Items[0].Name + }).WithTimeout(time.Minute).WithPolling(createClusterPolling).Should(Succeed()) By("Rolling restart TiDB") var dbgGet v1alpha1.TiDBGroup Expect(k8sClient.Get(ctx, client.ObjectKey{Namespace: tc.Namespace, Name: dbg.Name}, &dbgGet)).To(Succeed()) - dbgGet.Spec.Template.Spec.Config = logLevelConfig + dbgGet.Spec.Template.Spec.Config = "log.level = 'warn'" Expect(k8sClient.Update(ctx, &dbgGet)).To(Succeed()) Eventually(func(g Gomega) { @@ -1706,13 +1716,22 @@ location-labels = ["region", "zone", "host"]` g.Expect(ready).To(BeTrue()) g.Expect(utiltidb.AreAllInstancesReady(k8sClient, pdg, []*v1alpha1.TiKVGroup{kvg}, []*v1alpha1.TiDBGroup{dbg}, nil)).To(Succeed()) + jobGet, err := clientSet.BatchV1().Jobs(tc.Namespace).Get(ctx, jobName, metav1.GetOptions{}) + g.Expect(err).To(BeNil()) + if jobGet.Status.Failed > 0 { + // print the logs if the job failed + req := clientSet.CoreV1().Pods(tc.Namespace).GetLogs(jobPodName, &corev1.PodLogOptions{}) + podLogs, err := req.Stream(ctx) + g.Expect(err).To(BeNil()) + defer podLogs.Close() - g.Expect(totalCount.Load()).To(BeNumerically(">", 0)) + buf := new(bytes.Buffer) + _, _ = io.Copy(buf, podLogs) + GinkgoWriter.Println(buf.String()) + Fail("job failed") + } + g.Expect(jobGet.Status.Succeeded).To(BeNumerically("==", 1)) }).WithTimeout(createClusterTimeout).WithPolling(createClusterPolling).Should(Succeed()) - GinkgoWriter.Printf("total count: %d, fail count: %d\n", totalCount.Load(), failCount.Load()) - Expect(failCount.Load()).To(BeZero()) - cancel2() - wg.Wait() }) }) diff --git a/tests/e2e/utils/tidb/tidb.go b/tests/e2e/utils/tidb/tidb.go index 23f71d4bfd..c6d3a77171 100644 --- a/tests/e2e/utils/tidb/tidb.go +++ b/tests/e2e/utils/tidb/tidb.go @@ -21,7 +21,6 @@ import ( "database/sql" "fmt" "strings" - "time" "github.com/go-sql-driver/mysql" corev1 "k8s.io/api/core/v1" @@ -36,6 +35,10 @@ import ( var dummyCancel = func() {} +func GetTiDBDSN(host, user, password, database, params string, port int) string { + return fmt.Sprintf("%s:%s@(%s:%d)/%s?%s", user, password, host, port, database, params) +} + // PortForwardAndGetTiDBDSN create a port forward for TiDB and return its DSN. func PortForwardAndGetTiDBDSN(fw k8s.PortForwarder, ns, svcName, user, password, database, params string) (string, context.CancelFunc, error) { @@ -358,35 +361,3 @@ func AreAllTiFlashHealthy(cli client.Client, flashg *v1alpha1.TiFlashGroup) erro return nil } - -// ExecuteSimpleTransaction performs a transaction to insert or update the given id in the specified table. -func ExecuteSimpleTransaction(db *sql.DB, id int, table string) error { - tx, err := db.Begin() - if err != nil { - return fmt.Errorf("failed to begin txn: %w", err) - } - - // Prepare SQL statement to replace or insert a record - //nolint:gosec // only replace table name in test - str := fmt.Sprintf("replace into %s(id, v) values(?, ?);", table) - if _, err = tx.Exec(str, id, id); err != nil { - return fmt.Errorf("failed to exec statement: %w", err) - } - - // Simulate a different operation by updating the value - if _, err = tx.Exec(fmt.Sprintf("update %s set v = ? where id = ?;", table), id*2, id); err != nil { - return fmt.Errorf("failed to exec update statement: %w", err) - } - - // Simulate a long transaction by sleeping for 5 seconds for even ids - if id%3 == 0 { - //nolint:mnd // just for testing - time.Sleep(10 * time.Second) - } - - // Commit the transaction - if err = tx.Commit(); err != nil { - return fmt.Errorf("failed to commit txn: %w", err) - } - return nil -}