Skip to content

Commit

Permalink
feat(mysql): dbbackup支持rocksdb备份数据导入 TencentBlueKing#6991
Browse files Browse the repository at this point in the history
  • Loading branch information
uriwang authored and zhangzhw8 committed Oct 22, 2024
1 parent 6c5f87b commit 9fa0667
Show file tree
Hide file tree
Showing 9 changed files with 425 additions and 61 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ pre-*-bkcodeai

bkcodeai.json
package-lock.json

*.swp
### PreCI ###
.codecc
.codecc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func loadData(cnf *config.BackupConfig, backupType string) error {

metaInfo, err := backupexe.ParseJsonFile(indexPath)
if err != nil {
logger.Log.Errorf("can not parse index file:%s, errmsg:%s", indexPath, err)
return err
}
if backupType != "" && metaInfo.BackupType != backupType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const (
)

const (
StorageEnginRocksdb = "rocksdb"
StorageEngineRocksdb = "rocksdb"
)

// backup role: dbm-services/mysql/db-tools/dbactuator/pkg/core/cst/mysql.go
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func BuildDumper(cnf *config.BackupConfig, storageEngine string) (dumper Dumper,
return nil, err
}

if cst.StorageEnginRocksdb == storageEngine {
if cst.StorageEngineRocksdb == storageEngine {
dumper = &PhysicalRocksdbDumper{
cfg: cnf,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,33 @@ import (
"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/util"
)

// PhysicalRocksdbDumper physical rocksdb dumper
type PhysicalRocksdbDumper struct {
cfg *config.BackupConfig
backupLogfile string
dbbackupHome string
checkpointDir string
mysqlVersion string
isOfficial bool
rocksdbCmd string
storageEngine string
mysqlRole string
masterHost string
masterPort int
backupStartTime time.Time
backupEndTime time.Time
}

// buildArgs construct the instruction parameters for data recovery.
func (p *PhysicalRocksdbDumper) buildArgs() []string {

targetPath := filepath.Join(p.cfg.Public.BackupDir, p.cfg.Public.TargetName())

args := []string{
fmt.Sprintf("--host=%s", p.cfg.Public.MysqlHost),
fmt.Sprintf("--port=%d", p.cfg.Public.MysqlPort),
fmt.Sprintf("--user=%s", p.cfg.Public.MysqlUser),
fmt.Sprintf("--password=%s", p.cfg.Public.MysqlPasswd),
fmt.Sprintf("--host=%s", p.cfg.Public.MysqlHost),
fmt.Sprintf("--port=%d", p.cfg.Public.MysqlPort),
fmt.Sprintf("--checkpoint_dir=%s", p.checkpointDir),
fmt.Sprintf("--backup_dir=%s", targetPath),
"--stream=disabled",
Expand All @@ -56,6 +62,7 @@ func (p *PhysicalRocksdbDumper) buildArgs() []string {
return args
}

// initConfig init config
func (p *PhysicalRocksdbDumper) initConfig(mysqlVersion string) error {
if p.cfg == nil {
return errors.New("rocksdb physical dumper config missed")
Expand All @@ -68,55 +75,80 @@ func (p *PhysicalRocksdbDumper) initConfig(mysqlVersion string) error {
}

p.dbbackupHome = filepath.Dir(cmdPath)
db, err := mysqlconn.InitConn(&p.cfg.Public)

// connect to the mysql and obtain the base information
db, err := mysqlconn.InitConn(&p.cfg.Public)
if err != nil {
logger.Log.Errorf("can not connect to the mysql, host:%s, port:%d, errmsg:%s",
p.cfg.Public.MysqlHost, p.cfg.Public.MysqlPort, err)
return err
}

defer func() {
_ = db.Close()
}()

p.mysqlVersion, p.isOfficial = util.VersionParser(mysqlVersion)
p.storageEngine, err = mysqlconn.GetStorageEngine(db)

if err != nil {
logger.Log.Errorf("can not get the storage engine from the mysql, host:%s, port:%d, errmsg:%s",
p.cfg.Public.MysqlHost, p.cfg.Public.MysqlPort, err)
return err
}

// keep the storage engine name is lower
p.storageEngine = strings.ToLower(p.storageEngine)
p.mysqlRole = strings.ToLower(p.cfg.Public.MysqlRole)

defer func() {
_ = db.Close()
}()
// if the current node is slave, obtain the master ip and port
if p.mysqlRole == cst.RoleSlave || p.mysqlRole == cst.RoleRepeater {
p.masterHost, p.masterPort, err = mysqlconn.ShowMysqlSlaveStatus(db)
if err != nil {
logger.Log.Errorf("can not get the master host and port from the mysql, host:%s, port:%d, errmsg:%s",
p.cfg.Public.MysqlHost, p.cfg.Public.MysqlPort, err)
return err
}
}

p.checkpointDir = fmt.Sprintf("%s/MyRocks_checkpoint", p.cfg.Public.BackupDir)
p.rocksdbCmd = "/bin/" + cst.ToolMyrocksHotbackup
// set the base config
p.checkpointDir = filepath.Join(p.cfg.Public.BackupDir, "MyRocks_checkpoint")
p.rocksdbCmd = filepath.Join("bin", cst.ToolMyrocksHotbackup)
BackupTool = cst.ToolMyrocksHotbackup
return nil
}

// Execute Perform data recovery operations.
func (p *PhysicalRocksdbDumper) Execute(enableTimeOut bool) error {
p.backupStartTime = time.Now()
defer func() {
p.backupEndTime = time.Now()
}()

if p.storageEngine != cst.StorageEnginRocksdb {
err := fmt.Errorf("%s engine not support", p.storageEngine)
// the storage engine must be rocksdb
if p.storageEngine != cst.StorageEngineRocksdb {
err := fmt.Errorf("unsupported engine:%s, host:%s, port:%d", p.storageEngine,
p.cfg.Public.MysqlHost, p.cfg.Public.MysqlPort)
logger.Log.Error(err)
return err
}

// pre-created checkpoint dir
_, err := os.Stat(p.checkpointDir)
if os.IsNotExist(err) {
logger.Log.Infof("the checkpoint does not exist, will create it. checkpoint:%s", p.checkpointDir)
err = os.MkdirAll(p.checkpointDir, 0755)
}

if err != nil {
logger.Log.Errorf("failed to create checkpoint(%s), err-msg:%s", p.checkpointDir, err)
logger.Log.Errorf("can not create the checkpoint:%s, errmsg:%s", p.checkpointDir, err)
return err
}

binPath := filepath.Join(p.dbbackupHome, p.rocksdbCmd)
args := p.buildArgs()

// perform the dump operation
var cmd *exec.Cmd
backupCmd := fmt.Sprintf(`%s %s`, binPath, strings.Join(args, " "))

Expand All @@ -134,60 +166,50 @@ func (p *PhysicalRocksdbDumper) Execute(enableTimeOut bool) error {
cmd = exec.Command("sh", "-c", backupCmd)
}

backuplogFilename := fmt.Sprintf("%s_backup_%d_%d.log", p.storageEngine, p.cfg.Public.MysqlPort, int(time.Now().Weekday()))
rocksdbBackuplogFilename := filepath.Join(p.dbbackupHome, "logs", backuplogFilename)
// create a dumper log file to store the log of the dumper command
p.backupLogfile = fmt.Sprintf("dumper_%s_%s_%d_%d.log", p.storageEngine,
cst.ToolMyrocksHotbackup, p.cfg.Public.MysqlPort, int(time.Now().Weekday()))

p.backupLogfile = filepath.Join(p.dbbackupHome, "logs", p.backupLogfile)

outFile, err := os.Create(rocksdbBackuplogFilename)
// pre-created dump log file
outFile, err := os.Create(p.backupLogfile)

if err != nil {
logger.Log.Error("create log file failed: ", err)
logger.Log.Errorf("can not create the dumper log file, file name:%s, errmsg:%s", p.backupLogfile, err)
return err
}

defer func() {
_ = outFile.Close()
}()

// redirect standard output and error messages to a file
cmd.Stdout = outFile
cmd.Stderr = outFile
logger.Log.Info("rocksdb backup command: ", cmd.String())

// perform the dump command
err = cmd.Run()
if err != nil {
logger.Log.Error("run rocksdb physical backup failed: ", err)
logger.Log.Errorf("can not run the rocksdb physical dumper command:%s, engine:%s, errmsg:%s",
backupCmd, p.storageEngine, err)
return err
}

logger.Log.Infof("dump rocksdb success, command:%s", cmd.String())
return nil
}

// PrepareBackupMetaInfo generate the metadata of database backup
func (p *PhysicalRocksdbDumper) PrepareBackupMetaInfo(cnf *config.BackupConfig) (*dbareport.IndexContent, error) {
db, err := mysqlconn.InitConn(&cnf.Public)
if err != nil {
return nil, errors.WithMessage(err, "IndexContent")
}

defer func() {
_ = db.Close()
}()

storageEngine, err := mysqlconn.GetStorageEngine(db)
if err != nil {
return nil, err
}

storageEngine = strings.ToLower(storageEngine)

if storageEngine != "rocksdb" {
logger.Log.Errorf("unknown storage engine(%s)", storageEngine)
return nil, nil
}

// parse the binglog position
xtrabackupBinlogInfoFileName := filepath.Join(cnf.Public.BackupDir, cnf.Public.TargetName(), "xtrabackup_binlog_info")
xtrabackupSlaveInfoFileName := filepath.Join(cnf.Public.BackupDir, cnf.Public.TargetName(), "xtrabackup_slave_info")

tmpFileName := filepath.Join(cnf.Public.BackupDir, cnf.Public.TargetName(), "tmp_dbbackup_go.txt")

// obtain the qpress command path
exepath, err := os.Executable()
if err != nil {
return nil, err
Expand All @@ -200,30 +222,38 @@ func (p *PhysicalRocksdbDumper) PrepareBackupMetaInfo(cnf *config.BackupConfig)
BinlogInfo: dbareport.BinlogStatusInfo{},
}

if masterStatus, err := parseXtraBinlogInfo(qpressPath, xtrabackupBinlogInfoFileName, tmpFileName); err != nil {
// parse the binlog
masterStatus, err := parseXtraBinlogInfo(qpressPath, xtrabackupBinlogInfoFileName, tmpFileName)
if err != nil {
logger.Log.Errorf("do not parse xtrabackup binlog file, file name:%s, errmsg:%s",
xtrabackupBinlogInfoFileName, err)
return nil, err
} else {
metaInfo.BinlogInfo.ShowMasterStatus = masterStatus
metaInfo.BinlogInfo.ShowMasterStatus.MasterHost = cnf.Public.MysqlHost
metaInfo.BinlogInfo.ShowMasterStatus.MasterPort = cnf.Public.MysqlPort
}

if mysqlRole := strings.ToLower(cnf.Public.MysqlRole); mysqlRole == cst.RoleSlave || mysqlRole == cst.RoleRepeater {
if slaveStatus, err := parseXtraSlaveInfo(qpressPath, xtrabackupSlaveInfoFileName, tmpFileName); err != nil {
// save the master node status
metaInfo.BinlogInfo.ShowMasterStatus = masterStatus
metaInfo.BinlogInfo.ShowMasterStatus.MasterHost = cnf.Public.MysqlHost
metaInfo.BinlogInfo.ShowMasterStatus.MasterPort = cnf.Public.MysqlPort

// parse the information of the master node
if p.mysqlRole == cst.RoleSlave || p.mysqlRole == cst.RoleRepeater {
slaveStatus, err := parseXtraSlaveInfo(qpressPath, xtrabackupSlaveInfoFileName, tmpFileName)

if err != nil {
logger.Log.Errorf("do not parse xtrabackup slave information, xtrabackup file:%s, errmsg:%s",
xtrabackupSlaveInfoFileName, err)
return nil, err
} else {
metaInfo.BinlogInfo.ShowSlaveStatus = slaveStatus
masterHost, masterPort, err := mysqlconn.ShowMysqlSlaveStatus(db)
if err != nil {
return nil, err
}
metaInfo.BinlogInfo.ShowSlaveStatus.MasterHost = masterHost
metaInfo.BinlogInfo.ShowSlaveStatus.MasterPort = masterPort
}

metaInfo.BinlogInfo.ShowSlaveStatus = slaveStatus
metaInfo.BinlogInfo.ShowSlaveStatus.MasterHost = p.masterHost
metaInfo.BinlogInfo.ShowSlaveStatus.MasterPort = p.masterPort
}

// teh mark indicating whether the update is a full backup or not
metaInfo.JudgeIsFullBackup(&cnf.Public)
if err = os.Remove(tmpFileName); err != nil {
logger.Log.Errorf("do not delete the tmp file, file name:%s, errmsg:%s", tmpFileName, err)
return &metaInfo, err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package backupexe
import (
"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/config"
"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/dbareport"
"strings"
)

// ExecuteLoad execute load backup command
Expand All @@ -11,7 +12,8 @@ func ExecuteLoad(cnf *config.BackupConfig, indexFileContent *dbareport.IndexCont
return envErr
}

loader, err := BuildLoader(cnf, indexFileContent.BackupType, indexFileContent.BackupTool)
backupStorageEngine := strings.ToLower(indexFileContent.StorageEngine)
loader, err := BuildLoader(cnf, indexFileContent.BackupType, indexFileContent.BackupTool, backupStorageEngine)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func BackupGrant(cfg *config.Public) error {

rows, err := db.Query("select user, host from mysql.user where user not in ('ADMIN','yw','dba_bak_all_sel')")
if err != nil {
logger.Log.Error("can't send query to Mysql server %v\n", err)
logger.Log.Errorf("can't send query to Mysql server %v\n", err)
return err
}
defer rows.Close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ type Loader interface {
}

// BuildLoader TODO
func BuildLoader(cnf *config.BackupConfig, backupType string, backupTool string) (loader Loader, err error) {
func BuildLoader(cnf *config.BackupConfig, backupType string, backupTool string, storageEngine string) (loader Loader, err error) {

if strings.ToLower(backupType) == cst.BackupLogical {
if backupTool == cst.ToolMysqldump {
// mysqldump 共用 LogicalLoad 参数
Expand Down Expand Up @@ -46,8 +47,15 @@ func BuildLoader(cnf *config.BackupConfig, backupType string, backupTool string)
if err := validate.GoValidateStruct(cnf.PhysicalLoad, false, false); err != nil {
return nil, err
}
loader = &PhysicalLoader{
cnf: cnf,

if cst.StorageEngineRocksdb == storageEngine {
loader = &PhysicalRocksdbLoader{
cfg: cnf,
}
} else {
loader = &PhysicalLoader{
cnf: cnf,
}
}
} else {
logger.Log.Error(fmt.Sprintf("Unknown BackupType: %s", backupType))
Expand Down
Loading

0 comments on commit 9fa0667

Please sign in to comment.