Commit ffa94f8

tests: refactor realcluster testing (#9086)
close #8821

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
rleungx and ti-chi-bot[bot] authored Feb 24, 2025
1 parent 8428133 commit ffa94f8
Showing 8 changed files with 223 additions and 149 deletions.
22 changes: 14 additions & 8 deletions tests/integrations/realcluster/Makefile
@@ -31,10 +31,14 @@ tidy:
check: tiup test

tiup:
-	# if tiup binary not exist, download it
-	if ! which tiup > /dev/null 2>&1; then \
-		curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh; \
-	fi
+	@echo "==> Checking tiup installation"
+	@if ! which tiup > /dev/null 2>&1; then \
+		echo "Installing tiup..."; \
+		curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh; \
+	else \
+		echo "tiup already installed"; \
+	fi
+	@echo "tiup version: $$(tiup --version)"

deploy: kill_cluster deploy_only

@@ -58,14 +62,16 @@ kill_cluster:
fi

test:
-	CGO_ENABLED=1 go test ./... -v -tags deadlock -race -cover || (\
-	echo "follow is pd-0 log\n" ; \
+	CGO_ENABLED=1 go test ./... -v -tags deadlock -race -cover -timeout 20m || (\
+	echo "The following is pd-0 log" ; \
	cat ~/.tiup/data/pd_real_cluster_test/pd-0/pd.log ; \
-	echo "follow is pd-1 log\n" ; \
+	echo "The following is pd-1 log" ; \
	cat ~/.tiup/data/pd_real_cluster_test/pd-1/pd.log ; \
-	echo "follow is pd-2 log\n" ; \
+	echo "The following is pd-2 log" ; \
	cat ~/.tiup/data/pd_real_cluster_test/pd-2/pd.log ; \
exit 1)

install-tools:
cd $(ROOT_PATH) && $(MAKE) install-tools

.PHONY: tidy static check tiup deploy deploy_only kill_cluster test install-tools
272 changes: 172 additions & 100 deletions tests/integrations/realcluster/cluster.go
@@ -17,173 +17,245 @@ package realcluster
import (
	"fmt"
	"os"
-	"os/exec"
	"path/filepath"
+	"strconv"
	"strings"
-	"testing"
+	"syscall"
	"time"

	"github.com/stretchr/testify/require"
	"github.com/stretchr/testify/suite"
	"go.uber.org/zap"

	"github.com/pingcap/log"
+
+	"github.com/tikv/pd/pkg/utils/logutil"
)

type clusterSuite struct {
	suite.Suite

-	clusterCnt int
-	suiteName  string
-	ms         bool
+	suiteName string
+	mode      string
+	cluster   *cluster
}

-var (
-	playgroundLogDir = "/tmp/real_cluster/playground"
-	tiupBin          = os.Getenv("HOME") + "/.tiup/bin/tiup"
-)

// SetupSuite will run before the tests in the suite are run.
func (s *clusterSuite) SetupSuite() {
-	t := s.T()
+	re := s.Require()

	// Clean the data dir. It is the default data dir of TiUP.
-	dataDir := filepath.Join(os.Getenv("HOME"), ".tiup", "data", "pd_real_cluster_test_"+s.suiteName+"_*")
+	dataDir := s.dataDir()
	matches, err := filepath.Glob(dataDir)
-	require.NoError(t, err)
+	re.NoError(err)

	for _, match := range matches {
-		require.NoError(t, runCommand("rm", "-rf", match))
+		_, err := runCommandWithOutput(fmt.Sprintf("rm -rf %s", match))
+		re.NoError(err)
	}
-	s.startCluster(t)
-	t.Cleanup(func() {
-		s.stopCluster(t)
-	})
+
+	s.cluster = newCluster(re, s.tag(), dataDir, s.mode)
+	s.cluster.start()
}

// TearDownSuite will run after all the tests in the suite have been run.
func (s *clusterSuite) TearDownSuite() {
-	// Even if the cluster deployment fails, we still need to destroy the cluster.
-	// If the cluster does not fail to deploy, the cluster will be destroyed in
-	// the cleanup function. And these code will not work.
-	s.clusterCnt++
-	s.stopCluster(s.T())
+	s.cluster.stop()
}

-func (s *clusterSuite) startCluster(t *testing.T) {
-	log.Info("start to deploy a cluster", zap.Bool("ms", s.ms))
-
-	tag := s.tag()
-	deployTiupPlayground(t, tag, s.ms)
-	waitTiupReady(t, tag)
-	s.clusterCnt++
-}
+func (s *clusterSuite) tag() string {
+	if s.mode == "ms" {
+		return fmt.Sprintf("pd_real_cluster_test_ms_%s_%d", s.suiteName, time.Now().Unix())
+	}
+	return fmt.Sprintf("pd_real_cluster_test_%s_%d", s.suiteName, time.Now().Unix())
+}
+
+func (s *clusterSuite) dataDir() string {
+	if s.mode == "ms" {
+		return filepath.Join(os.Getenv("HOME"), ".tiup", "data", fmt.Sprintf("pd_real_cluster_test_ms_%s_%d", s.suiteName, time.Now().Unix()))
+	}
+	return filepath.Join(os.Getenv("HOME"), ".tiup", "data", fmt.Sprintf("pd_real_cluster_test_%s_%d", s.suiteName, time.Now().Unix()))
+}
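For orientation, a sketch of how a concrete suite might plug into this refactor; the suite name `scheduler_test` and the test function below are hypothetical, not part of this commit:

```go
package realcluster

import (
	"testing"

	"github.com/stretchr/testify/suite"
)

// Hypothetical wiring sketch: the embedded clusterSuite runs SetupSuite and
// TearDownSuite, so a concrete suite only picks suiteName and mode.
// suiteName feeds tag() and dataDir(); mode "ms" deploys TSO/scheduling too.
func TestSchedulerSuite(t *testing.T) {
	suite.Run(t, &clusterSuite{
		suiteName: "scheduler_test", // hypothetical name
		mode:      "ms",             // or "" for a cluster without microservices
	})
}
```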

-func (s *clusterSuite) stopCluster(t *testing.T) {
-	s.clusterCnt--
-
-	log.Info("start to destroy a cluster", zap.String("tag", s.tag()), zap.Bool("ms", s.ms))
-	destroy(t, s.tag())
-	time.Sleep(5 * time.Second)
-}
+var (
+	playgroundLogDir = "/tmp/real_cluster/playground"
+	tiupBin          = os.Getenv("HOME") + "/.tiup/bin/tiup"
+)
+
+const (
+	defaultTiKVCount    = 3
+	defaultTiDBCount    = 1
+	defaultPDCount      = 3
+	defaultTiFlashCount = 1
+)

-func (s *clusterSuite) tag() string {
-	if s.ms {
-		return fmt.Sprintf("pd_real_cluster_test_ms_%s_%d", s.suiteName, s.clusterCnt)
-	}
-	return fmt.Sprintf("pd_real_cluster_test_%s_%d", s.suiteName, s.clusterCnt)
-}
+func isProcessRunning(pid int) bool {
+	process, err := os.FindProcess(pid)
+	if err != nil {
+		return false
+	}
+	err = process.Signal(syscall.Signal(0))
+	return err == nil
+}
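The `Signal(0)` probe in `isProcessRunning` relies on the Unix convention that signal number 0 performs the existence and permission checks without delivering a signal. A self-contained illustration of the same technique:

```go
package main

import (
	"fmt"
	"os"
	"syscall"
)

func main() {
	pid := os.Getpid() // probe our own pid so the check succeeds
	// On Unix, os.FindProcess never fails; it just wraps the pid.
	process, _ := os.FindProcess(pid)
	// Signal 0 delivers nothing: it only reports whether the pid exists
	// and whether we are allowed to signal it.
	if err := process.Signal(syscall.Signal(0)); err == nil {
		fmt.Printf("pid %d is running\n", pid)
	} else {
		fmt.Printf("pid %d is gone or inaccessible: %v\n", pid, err)
	}
}
```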

+type cluster struct {
+	re      *require.Assertions
+	tag     string
+	datadir string
+	mode    string
+	pids    []int
+}
+
+func newCluster(re *require.Assertions, tag, datadir, mode string) *cluster {
+	return &cluster{re: re, datadir: datadir, tag: tag, mode: mode}
+}

-func (s *clusterSuite) restart() {
-	tag := s.tag()
-	log.Info("start to restart", zap.String("tag", tag))
-	s.stopCluster(s.T())
-	s.startCluster(s.T())
-	log.Info("TiUP restart success")
-}
+func (c *cluster) start() {
+	log.Info("start to deploy a cluster", zap.String("mode", c.mode))
+	c.deploy()
+	c.waitReady()
+}

-func destroy(t *testing.T, tag string) {
-	cmdStr := fmt.Sprintf("ps -ef | grep %s | awk '{print $2}'", tag)
-	cmd := exec.Command("sh", "-c", cmdStr)
-	bytes, err := cmd.Output()
-	require.NoError(t, err)
-	pids := string(bytes)
-	pidArr := strings.Split(pids, "\n")
-	for _, pid := range pidArr {
-		// nolint:errcheck
-		runCommand("sh", "-c", "kill -9 "+pid)
-	}
-	log.Info("destroy success", zap.String("tag", tag))
-}
+func (c *cluster) restart() {
+	log.Info("start to restart", zap.String("tag", c.tag))
+	c.stop()
+	c.start()
+	log.Info("restart success")
+}
+
+func (c *cluster) stop() {
+	if err := c.collectPids(); err != nil {
+		log.Warn("failed to collect pids", zap.Error(err))
+		return
+	}
+
+	for _, pid := range c.pids {
+		// First try SIGTERM
+		_ = syscall.Kill(pid, syscall.SIGTERM)
+	}
+
+	// Wait and force kill if necessary
+	time.Sleep(3 * time.Second)
+	for _, pid := range c.pids {
+		if isProcessRunning(pid) {
+			_ = syscall.Kill(pid, syscall.SIGKILL)
+		}
+	}
+	log.Info("cluster stopped", zap.String("tag", c.tag))
+}

-func deployTiupPlayground(t *testing.T, tag string, ms bool) {
+func (c *cluster) deploy() {
+	re := c.re
	curPath, err := os.Getwd()
-	require.NoError(t, err)
-	require.NoError(t, os.Chdir("../../.."))
+	re.NoError(err)
+	re.NoError(os.Chdir("../../.."))

if !fileExists("third_bin") || !fileExists("third_bin/tikv-server") || !fileExists("third_bin/tidb-server") || !fileExists("third_bin/tiflash") {
log.Info("downloading binaries...")
log.Info("this may take a few minutes, you can also download them manually and put them in the bin directory.")
require.NoError(t, runCommand("sh",
"./tests/integrations/realcluster/download_integration_test_binaries.sh"))
_, err := runCommandWithOutput("./tests/integrations/realcluster/download_integration_test_binaries.sh")
re.NoError(err)
}
if !fileExists("bin") || !fileExists("bin/pd-server") {
log.Info("complie pd binaries...")
require.NoError(t, runCommand("make", "pd-server"))
log.Info("compile pd binaries...")
_, err := runCommandWithOutput("make pd-server")
re.NoError(err)
}

if !fileExists(playgroundLogDir) {
-		require.NoError(t, os.MkdirAll(playgroundLogDir, 0755))
+		re.NoError(os.MkdirAll(playgroundLogDir, 0755))
}

-	// nolint:errcheck
	go func() {
-		if ms {
-			runCommand("sh", "-c",
-				tiupBin+` playground nightly --pd.mode ms --kv 3 --tiflash 1 --db 1 --pd 3 --tso 1 --scheduling 1 \
-				--without-monitor --tag `+tag+` \
-				--pd.binpath ./bin/pd-server \
-				--kv.binpath ./third_bin/tikv-server \
-				--db.binpath ./third_bin/tidb-server \
-				--tiflash.binpath ./third_bin/tiflash \
-				--tso.binpath ./bin/pd-server \
-				--scheduling.binpath ./bin/pd-server \
-				--pd.config ./tests/integrations/realcluster/pd.toml \
-				> `+filepath.Join(playgroundLogDir, tag+".log")+` 2>&1 & `)
-		} else {
-			runCommand("sh", "-c",
-				tiupBin+` playground nightly --kv 3 --tiflash 1 --db 1 --pd 3 \
-				--without-monitor --tag `+tag+` \
-				--pd.binpath ./bin/pd-server \
-				--kv.binpath ./third_bin/tikv-server \
-				--db.binpath ./third_bin/tidb-server \
-				--tiflash.binpath ./third_bin/tiflash \
-				--pd.config ./tests/integrations/realcluster/pd.toml \
-				> `+filepath.Join(playgroundLogDir, tag+".log")+` 2>&1 & `)
-		}
+		defer logutil.LogPanic()
+		playgroundOpts := []string{
+			fmt.Sprintf("--kv %d", defaultTiKVCount),
+			fmt.Sprintf("--tiflash %d", defaultTiFlashCount),
+			fmt.Sprintf("--db %d", defaultTiDBCount),
+			fmt.Sprintf("--pd %d", defaultPDCount),
+			"--without-monitor",
+			fmt.Sprintf("--tag %s", c.tag),
+		}
+
+		if c.mode == "ms" {
+			playgroundOpts = append(playgroundOpts,
+				"--pd.mode ms",
+				"--tso 1",
+				"--scheduling 1",
+			)
+		}
+
+		cmd := fmt.Sprintf(`%s playground nightly %s %s > %s 2>&1 & `,
+			tiupBin,
+			strings.Join(playgroundOpts, " "),
+			buildBinPathsOpts(c.mode == "ms"),
+			filepath.Join(playgroundLogDir, c.tag+".log"),
+		)
+		_, err := runCommandWithOutput(cmd)
+		re.NoError(err)
}()

// Avoid to change the dir before execute `tiup playground`.
time.Sleep(10 * time.Second)
-	require.NoError(t, os.Chdir(curPath))
+	re.NoError(os.Chdir(curPath))
}

+// collectPids will collect the pids of the processes.
+func (c *cluster) collectPids() error {
+	output, err := runCommandWithOutput(fmt.Sprintf("pgrep -f %s", c.tag))
+	if err != nil {
+		return fmt.Errorf("failed to collect pids: %v", err)
+	}
+
+	for _, pidStr := range strings.Split(strings.TrimSpace(output), "\n") {
+		if pid, err := strconv.Atoi(pidStr); err == nil {
+			c.pids = append(c.pids, pid)
+		}
+	}
+	return nil
+}
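`runCommandWithOutput` is defined in another file of this PR (this page shows only two of the eight changed files). From its call sites here, one shell command line in and combined output plus an error out, a plausible sketch is:

```go
package realcluster

import "os/exec"

// Plausible sketch only; the real helper lives in an unshown file of this PR.
// It runs one shell command line and returns its combined stdout/stderr.
func runCommandWithOutput(cmdStr string) (string, error) {
	output, err := exec.Command("sh", "-c", cmdStr).CombinedOutput()
	return string(output), err
}
```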

-func waitTiupReady(t *testing.T, tag string) {
-	const (
-		interval = 5
-		maxTimes = 20
-	)
-	log.Info("start to wait TiUP ready", zap.String("tag", tag))
-	for i := range maxTimes {
-		err := runCommand(tiupBin, "playground", "display", "--tag", tag)
-		if err == nil {
-			log.Info("TiUP is ready", zap.String("tag", tag))
-			return
-		}
-		log.Info("TiUP is not ready, will retry", zap.Int("retry times", i),
-			zap.String("tag", tag), zap.Error(err))
-		time.Sleep(time.Duration(interval) * time.Second)
-	}
-	require.FailNowf(t, "TiUP is not ready after retry: %s", tag)
-}
+func (c *cluster) waitReady() {
+	re := c.re
+	log.Info("start to wait TiUP ready", zap.String("tag", c.tag))
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-time.After(100 * time.Second):
+			re.FailNowf("TiUP is not ready after timeout, tag: %s", c.tag)
+		case <-ticker.C:
+			log.Info("check TiUP ready", zap.String("tag", c.tag))
+			cmd := fmt.Sprintf(`%s playground display --tag %s`, tiupBin, c.tag)
+			output, err := runCommandWithOutput(cmd)
+			if err == nil {
+				log.Info("TiUP is ready", zap.String("tag", c.tag))
+				return
+			}
+			log.Info(output)
+			log.Info("TiUP is not ready, will retry", zap.String("tag", c.tag), zap.Error(err))
+		}
+	}
+}
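One subtlety in the new `waitReady`: `time.After(100 * time.Second)` inside the `select` arms a fresh timer on every iteration, so each 5-second tick resets the deadline and the timeout case can never fire while ticks keep arriving. A sketch of the conventional fix, arming the deadline once before the loop (same 5s/100s budget assumed, helper name is mine):

```go
package realcluster

import (
	"fmt"
	"time"
)

// waitReadyWithDeadline is a sketch, not part of this commit: the deadline
// channel is created once, so ticker firings cannot keep postponing it.
func waitReadyWithDeadline(check func() error) error {
	deadline := time.After(100 * time.Second) // armed once, outside the loop
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-deadline:
			return fmt.Errorf("not ready within 100s")
		case <-ticker.C:
			if err := check(); err == nil {
				return nil
			}
		}
	}
}
```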
+func buildBinPathsOpts(ms bool) string {
+	opts := []string{
+		"--pd.binpath ./bin/pd-server",
+		"--kv.binpath ./third_bin/tikv-server",
+		"--db.binpath ./third_bin/tidb-server",
+		"--tiflash.binpath ./third_bin/tiflash",
+	}
+
+	if ms {
+		opts = append(opts,
+			"--tso.binpath ./bin/pd-server",
+			"--scheduling.binpath ./bin/pd-server",
+		)
+	}
+
+	return strings.Join(opts, " ")
+}
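To make the flag assembly concrete, a small test-style sketch (test name mine, not from this commit) of what `buildBinPathsOpts` should produce in each mode:

```go
package realcluster

import (
	"strings"
	"testing"
)

// Hypothetical check: ms mode must add the TSO and scheduling binpaths
// on top of the four base components.
func TestBuildBinPathsOptsSketch(t *testing.T) {
	base := buildBinPathsOpts(false)
	for _, flag := range []string{"--pd.binpath", "--kv.binpath", "--db.binpath", "--tiflash.binpath"} {
		if !strings.Contains(base, flag) {
			t.Fatalf("missing %s in %q", flag, base)
		}
	}

	ms := buildBinPathsOpts(true)
	if !strings.Contains(ms, "--tso.binpath ./bin/pd-server") ||
		!strings.Contains(ms, "--scheduling.binpath ./bin/pd-server") {
		t.Fatalf("ms mode should add tso/scheduling binpaths, got %q", ms)
	}
}
```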