diff --git a/dbm-services/mysql/db-simulation/app/syntax/parse_relation_db.go b/dbm-services/mysql/db-simulation/app/syntax/parse_relation_db.go index 584edcf31a..d3235e7acd 100644 --- a/dbm-services/mysql/db-simulation/app/syntax/parse_relation_db.go +++ b/dbm-services/mysql/db-simulation/app/syntax/parse_relation_db.go @@ -14,6 +14,7 @@ import ( "bufio" "encoding/json" "errors" + "fmt" "io" "os" "runtime/debug" @@ -29,7 +30,8 @@ import ( const AnalyzeConcurrency = 10 // DoParseRelationDbs parse relation db from sql file -func (tf *TmysqlParseFile) DoParseRelationDbs(version string) (createDbs, relationDbs []string, dumpAll bool, +func (tf *TmysqlParseFile) DoParseRelationDbs(version string) (createDbs, relationDbs, allCommands []string, + dumpAll bool, err error) { logger.Info("doing....") tf.result = make(map[string]*CheckInfo) @@ -39,15 +41,13 @@ func (tf *TmysqlParseFile) DoParseRelationDbs(version string) (createDbs, relati if !tf.IsLocalFile { if err = tf.Init(); err != nil { logger.Error("Do init failed %s", err.Error()) - return nil, nil, false, err + return nil, nil, nil, false, err } if err = tf.Downloadfile(); err != nil { logger.Error("failed to download sql file from the product library %s", err.Error()) - return nil, nil, false, err + return nil, nil, nil, false, err } } - // 最后删除临时目录,不会返回错误 - defer tf.delTempDir() logger.Info("all sqlfiles download ok ~") alreadExecutedSqlfileChan := make(chan string, len(tf.Param.FileNames)) @@ -59,12 +59,13 @@ func (tf *TmysqlParseFile) DoParseRelationDbs(version string) (createDbs, relati }() logger.Info("start to analyze the parsing result") - createDbs, relationDbs, dumpAll, err = tf.doParseInchan(alreadExecutedSqlfileChan, version) + createDbs, relationDbs, allCommands, dumpAll, err = tf.doParseInchan(alreadExecutedSqlfileChan, version) if err != nil { logger.Error("failed to analyze the parsing result:%s", err.Error()) - return nil, nil, false, err + return nil, nil, nil, false, err } - logger.Info("createDbs:%v,relationDbs:%v,dumpAll:%v,err:%v", createDbs, relationDbs, dumpAll, err) + logger.Info("createDbs:%v,relationDbs:%v,allcomands%v,dumpAll:%v,err:%v", createDbs, relationDbs, allCommands, dumpAll, + err) dumpdbs := []string{} for _, d := range relationDbs { if slices.Contains(createDbs, d) { @@ -72,12 +73,12 @@ func (tf *TmysqlParseFile) DoParseRelationDbs(version string) (createDbs, relati } dumpdbs = append(dumpdbs, d) } - return lo.Uniq(createDbs), lo.Uniq(dumpdbs), dumpAll, nil + return lo.Uniq(createDbs), lo.Uniq(dumpdbs), lo.Uniq(allCommands), dumpAll, nil } // doParseInchan RelationDbs do parse relation db func (t *TmysqlParse) doParseInchan(alreadExecutedSqlfileCh chan string, - mysqlVersion string) (createDbs []string, relationDbs []string, dumpAll bool, err error) { + mysqlVersion string) (createDbs []string, relationDbs []string, allCommands []string, dumpAll bool, err error) { var errs []error c := make(chan struct{}, AnalyzeConcurrency) errChan := make(chan error) @@ -90,21 +91,23 @@ func (t *TmysqlParse) doParseInchan(alreadExecutedSqlfileCh chan string, c <- struct{}{} go func(fileName string) { defer wg.Done() - cdbs, dbs, dumpAllDbs, err := t.analyzeRelationDbs(fileName, mysqlVersion) + cdbs, dbs, commands, dumpAllDbs, err := t.analyzeRelationDbs(fileName, mysqlVersion) logger.Info("createDbs:%v,dbs:%v,dumpAllDbs:%v,err:%v", cdbs, dbs, dumpAllDbs, err) if err != nil { + logger.Error("analyzeRelationDbs failed %s", err.Error()) errChan <- err + return } // 如果有dumpall 则直接返回退出,不在继续分析 if dumpAllDbs { dumpAll = true <-c - wg.Done() stopChan <- struct{}{} } t.mu.Lock() relationDbs = append(relationDbs, dbs...) createDbs = append(createDbs, cdbs...) + allCommands = append(allCommands, commands...) t.mu.Unlock() <-c }(sqlfile) @@ -121,7 +124,7 @@ func (t *TmysqlParse) doParseInchan(alreadExecutedSqlfileCh chan string, case err := <-errChan: errs = append(errs, err) case <-stopChan: - return createDbs, relationDbs, dumpAll, errors.Join(errs...) + return createDbs, relationDbs, allCommands, dumpAll, errors.Join(errs...) } } } @@ -130,6 +133,7 @@ func (t *TmysqlParse) doParseInchan(alreadExecutedSqlfileCh chan string, func (t *TmysqlParse) analyzeRelationDbs(inputfileName, mysqlVersion string) ( createDbs []string, relationDbs []string, + allCommandType []string, dumpAll bool, err error) { defer func() { @@ -140,7 +144,7 @@ func (t *TmysqlParse) analyzeRelationDbs(inputfileName, mysqlVersion string) ( f, err := os.Open(t.getAbsoutputfilePath(inputfileName, mysqlVersion)) if err != nil { logger.Error("open file failed %s", err.Error()) - return nil, nil, false, err + return nil, nil, nil, false, err } defer f.Close() reader := bufio.NewReader(f) @@ -151,7 +155,7 @@ func (t *TmysqlParse) analyzeRelationDbs(inputfileName, mysqlVersion string) ( break } logger.Error("read Line Error %s", errx.Error()) - return nil, nil, false, errx + return nil, nil, nil, false, errx } if len(line) == 1 && line[0] == byte('\n') { continue @@ -159,16 +163,19 @@ func (t *TmysqlParse) analyzeRelationDbs(inputfileName, mysqlVersion string) ( var res ParseLineQueryBase if err = json.Unmarshal(line, &res); err != nil { logger.Error("json unmasrshal line:%s failed %s", string(line), err.Error()) - return nil, nil, false, err + return nil, nil, nil, false, err } // 判断是否有语法错误 if res.ErrorCode != 0 { - return nil, nil, false, err + return nil, nil, nil, false, fmt.Errorf("%s", res.ErrorMsg) + } + if lo.IsNotEmpty(res.Command) { + allCommandType = append(allCommandType, res.Command) } if slices.Contains([]string{SQLTypeCreateProcedure, SQLTypeCreateFunction, SQLTypeCreateView, SQLTypeCreateTrigger, SQLTypeInsertSelect, SQLTypeRelaceSelect}, res.Command) { - return nil, nil, true, nil + return nil, nil, nil, true, nil } if lo.IsEmpty(res.DbName) { continue @@ -181,5 +188,74 @@ func (t *TmysqlParse) analyzeRelationDbs(inputfileName, mysqlVersion string) ( relationDbs = append(relationDbs, res.DbName) } - return createDbs, relationDbs, false, nil + return createDbs, relationDbs, allCommandType, false, nil +} + +// ParseSpecialTbls parse special tables +func (tf *TmysqlParseFile) ParseSpecialTbls(mysqlVersion string) (relationTbls []RelationTbl, err error) { + m := make(map[string][]string) + for _, fileName := range tf.Param.FileNames { + mm, err := tf.parseSpecialSQLFile(fileName, mysqlVersion) + if err != nil { + logger.Error("parseAlterSQLFile failed %s", err.Error()) + return nil, err + } + for k, v := range mm { + m[k] = append(m[k], v...) + } + } + for k, v := range m { + relationTbls = append(relationTbls, RelationTbl{ + DbName: k, + Tbls: v, + }) + } + return relationTbls, nil +} + +// RelationTbl dunmp db and table +type RelationTbl struct { + DbName string `json:"db_name"` + Tbls []string `json:"tbls"` +} + +// parseSpecialSQLFile 解析指定库表 +func (t *TmysqlParse) parseSpecialSQLFile(inputfileName, mysqlVersion string) (m map[string][]string, err error) { + f, err := os.Open(t.getAbsoutputfilePath(inputfileName, mysqlVersion)) + if err != nil { + logger.Error("open file failed %s", err.Error()) + return nil, err + } + m = make(map[string][]string) + defer f.Close() + reader := bufio.NewReader(f) + for { + line, errx := reader.ReadBytes(byte('\n')) + if errx != nil { + if errx == io.EOF { + break + } + logger.Error("read Line Error %s", errx.Error()) + return nil, errx + } + if len(line) == 1 && line[0] == byte('\n') { + continue + } + var baseRes ParseIncludeTableBase + if err = json.Unmarshal(line, &baseRes); err != nil { + logger.Error("json unmasrshal line:%s failed %s", string(line), err.Error()) + return nil, err + } + dbName := "" + if baseRes.Command == SQLTypeUseDb { + dbName = baseRes.DbName + } + if lo.IsNotEmpty(baseRes.DbName) { + dbName = baseRes.DbName + } + if lo.IsNotEmpty(baseRes.TableName) { + m[dbName] = append(m[dbName], baseRes.TableName) + } + } + return m, nil } diff --git a/dbm-services/mysql/db-simulation/app/syntax/syntax.go b/dbm-services/mysql/db-simulation/app/syntax/syntax.go index d601990f85..a012923a42 100644 --- a/dbm-services/mysql/db-simulation/app/syntax/syntax.go +++ b/dbm-services/mysql/db-simulation/app/syntax/syntax.go @@ -128,7 +128,7 @@ func (tf *TmysqlParseFile) Do(dbtype string, versions []string) (result map[stri } } // 最后删除临时目录,不会返回错误 - defer tf.delTempDir() + defer tf.DelTempDir() var errs []error for _, version := range versions { @@ -184,7 +184,7 @@ func (tf *TmysqlParseFile) CreateAndUploadDDLTblFile() (err error) { } // 最后删除临时目录,不会返回错误 // 暂时屏蔽 观察过程文件 - defer tf.delTempDir() + defer tf.DelTempDir() if err = tf.Downloadfile(); err != nil { logger.Error("failed to download sql file from the product library %s", err.Error()) @@ -248,7 +248,8 @@ func (t *TmysqlParse) Init() (err error) { return nil } -func (t *TmysqlParse) delTempDir() { +// DelTempDir TODO +func (t *TmysqlParse) DelTempDir() { if err := os.RemoveAll(t.tmpWorkdir); err != nil { logger.Warn("remove tempDir:" + t.tmpWorkdir + ".error info:" + err.Error()) } diff --git a/dbm-services/mysql/db-simulation/app/syntax/tmysqlpase_schema.go b/dbm-services/mysql/db-simulation/app/syntax/tmysqlpase_schema.go index 9733443be2..06aab49b48 100644 --- a/dbm-services/mysql/db-simulation/app/syntax/tmysqlpase_schema.go +++ b/dbm-services/mysql/db-simulation/app/syntax/tmysqlpase_schema.go @@ -49,6 +49,10 @@ const ( SQLTypeInsertSelect = "insert_select" // SQLTypeRelaceSelect replace select sql SQLTypeRelaceSelect = "replace_select" + // SQLTypeDropTable drop table sql + SQLTypeDropTable = "drop_table" + // SQLTypeCreateIndex is creat table sql + SQLTypeCreateIndex = "create_index" ) // NotAllowedDefaulValColMap 不允许默认值的字段 @@ -309,3 +313,11 @@ type UpdateResult struct { HasWhere bool `json:"has_where"` Limit int `json:"limit"` } + +// ParseIncludeTableBase parse include table +type ParseIncludeTableBase struct { + QueryID int `json:"query_id"` + Command string `json:"command"` + DbName string `json:"db_name"` + TableName string `json:"table_name"` +} diff --git a/dbm-services/mysql/db-simulation/handler/syntax_check.go b/dbm-services/mysql/db-simulation/handler/syntax_check.go index aec1b8743f..0416504e1d 100644 --- a/dbm-services/mysql/db-simulation/handler/syntax_check.go +++ b/dbm-services/mysql/db-simulation/handler/syntax_check.go @@ -17,6 +17,7 @@ import ( "time" "github.com/gin-gonic/gin" + "github.com/samber/lo" "github.com/spf13/viper" "dbm-services/common/go-pubpkg/cmutil" @@ -251,11 +252,29 @@ func (s SyntaxHandler) ParseSQLFileRelationDb(r *gin.Context) { FileNames: param.Files, }, } - createDbs, dbs, dumpall, err := p.DoParseRelationDbs("") + createDbs, dbs, allCommands, dumpall, err := p.DoParseRelationDbs("") if err != nil { s.SendResponse(r, err, nil) return } + // 如果所有的命令都是alter table, dump指定库表 + logger.Debug("debug: %v,%d", allCommands, len(allCommands)) + if isAllOperateTable(allCommands) || isAllCreateTable(allCommands) { + relationTbls, err := p.ParseSpecialTbls("") + if err != nil { + s.SendResponse(r, err, nil) + return + } + s.SendResponse(r, nil, gin.H{ + "create_dbs": createDbs, + "dbs": dbs, + "dump_all": false, + "just_dump_special_tbls": true, + "special_tbls": relationTbls, + "timestamp": time.Now().Unix(), + }) + return + } s.SendResponse(r, nil, gin.H{ "create_dbs": createDbs, @@ -265,6 +284,15 @@ func (s SyntaxHandler) ParseSQLFileRelationDb(r *gin.Context) { }) } +func isAllOperateTable(allCommands []string) bool { + return lo.Every([]string{syntax.SQLTypeAlterTable, syntax.SQLTypeUseDb, + syntax.SQLTypeCreateIndex, syntax.SQLTypeDropTable}, allCommands) +} + +func isAllCreateTable(allCommands []string) bool { + return lo.Every([]string{syntax.SQLTypeCreateTable, syntax.SQLTypeUseDb}, allCommands) +} + // ParseSQLRelationDb 语法检查入参SQL string func (s *SyntaxHandler) ParseSQLRelationDb(r *gin.Context) { var param CheckSQLStringParam @@ -298,11 +326,31 @@ func (s *SyntaxHandler) ParseSQLRelationDb(r *gin.Context) { FileNames: []string{fileName}, }, } - createDbs, dbs, dumpall, err := p.DoParseRelationDbs("") + // defer p.DelTempDir() + createDbs, dbs, allCommands, dumpall, err := p.DoParseRelationDbs("") if err != nil { s.SendResponse(r, err, nil) return } + // 如果所有的命令都是alter table, dump指定库表 + logger.Info("make debug: %v,%d", allCommands, len(allCommands)) + if isAllOperateTable(allCommands) || isAllCreateTable(allCommands) { + relationTbls, err := p.ParseSpecialTbls("") + if err != nil { + s.SendResponse(r, err, nil) + return + } + s.SendResponse(r, nil, gin.H{ + "create_dbs": createDbs, + "dbs": dbs, + "dump_all": false, + "just_dump_special_tbls": true, + "special_tbls": relationTbls, + "timestamp": time.Now().Unix(), + }) + return + } + s.SendResponse(r, nil, gin.H{ "create_dbs": createDbs, "dbs": dbs, diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/mysql_upgrade.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/mysql_upgrade.go index 228354254d..ac0f5a8a3d 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/mysql_upgrade.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/mysql_upgrade.go @@ -629,7 +629,7 @@ func (m MysqlUpgradeComp) mysqlUpgrade(conn *native.DbWorker, port int) (err err return nil } // open general_log - if errx := m.openGeneralLog(conn); err != nil { + if errx := m.openGeneralLog(conn); errx != nil { logger.Warn("set global general_log=on failed %s", errx.Error()) } upgradeScript := "" diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/semantic_dump_schema.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/semantic_dump_schema.go index 93c8c17a3d..f67a7080df 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/semantic_dump_schema.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/semantic_dump_schema.go @@ -55,12 +55,19 @@ type DumpSchemaParam struct { ParseNeedDumpDbs []string `json:"parse_need_dump_dbs"` // SQL 语句中解析出来的create database dbs // 需要导出的原因是复现 create database 是否已经存在的错误 - ParseCreateDbs []string `json:"parse_create_dbs"` - ExecuteObjects []ExecuteSQLFileObj `json:"execute_objects"` - + ParseCreateDbs []string `json:"parse_create_dbs"` + ExecuteObjects []ExecuteSQLFileObj `json:"execute_objects"` + JustDumpSpecialTbls bool `json:"just_dump_special_tbls"` + SpecialTbls []SpecialTblInfo `json:"special_tbls"` UploadBkRepoParam } +// SpecialTblInfo TODO +type SpecialTblInfo struct { + DbName string `json:"db_name"` + Tbls []string `json:"tbls"` +} + // UploadBkRepoParam upload to bk repo param type UploadBkRepoParam struct { BackupFileName string `json:"backup_file_name"` @@ -195,7 +202,7 @@ func (c *SemanticDumpSchemaComp) getDumpdbs(alldbs []string, version string) (re if c.Params.DumpAll { logger.Info("param is dump all") reg := regexp.MustCompile(`^bak_cbs`) - newBackupDbreg := regexp.MustCompile(`^stage_truncate`) + newBackupDbreg := regexp.MustCompile(fmt.Sprintf("^%s", cst.StageDbHeader)) for _, db := range dbsExcluesysdbs { if reg.MatchString(db) { continue @@ -223,7 +230,7 @@ func (c *SemanticDumpSchemaComp) getDumpdbs(alldbs []string, version string) (re finaldbs = append(finaldbs, realexcutedbs...) } createSQLExistDbs := lo.Intersect(alldbs, c.Params.ParseCreateDbs) - finaldbs = append(finaldbs, c.Params.ParseNeedDumpDbs...) + finaldbs = append(finaldbs, lo.Intersect(alldbs, c.Params.ParseNeedDumpDbs)...) finaldbs = append(finaldbs, createSQLExistDbs...) } logger.Info("dump dbs:%v", finaldbs) @@ -280,7 +287,38 @@ func (c *SemanticDumpSchemaComp) DumpSchema() (err error) { dumpOption.GtidPurgedOff = true c.useTmysqldump = false } - dumper = &mysqlutil.MySQLDumperTogether{ + switch { + case c.Params.JustDumpSpecialTbls: + return c.DumpSpecialTables(dumpOption) + default: + dumper = &mysqlutil.MySQLDumperTogether{ + MySQLDumper: mysqlutil.MySQLDumper{ + DumpDir: c.Params.BackupDir, + Ip: c.Params.Host, + Port: c.Params.Port, + DbBackupUser: c.GeneralParam.RuntimeAccountParam.AdminUser, + DbBackupPwd: c.GeneralParam.RuntimeAccountParam.AdminPwd, + DbNames: c.dbs, + IgnoreTables: c.ignoreTables, + DumpCmdFile: c.dumpCmd, + Charset: c.charset, + MySQLDumpOption: dumpOption, + }, + UseTMySQLDump: c.useTmysqldump, + OutputfileName: c.Params.BackupFileName, + } + if err := dumper.Dump(); err != nil { + logger.Error("dump failed: %s", err.Error()) + return err + } + return nil + } +} + +// DumpSpecialTables dump zhi special tables +func (c *SemanticDumpSchemaComp) DumpSpecialTables(dumpOption mysqlutil.MySQLDumpOption) (err error) { + dumpOption.Force = true + dumper := &mysqlutil.MySQLDumperAppend{ MySQLDumper: mysqlutil.MySQLDumper{ DumpDir: c.Params.BackupDir, Ip: c.Params.Host, @@ -293,7 +331,13 @@ func (c *SemanticDumpSchemaComp) DumpSchema() (err error) { Charset: c.charset, MySQLDumpOption: dumpOption, }, - UseTMySQLDump: c.useTmysqldump, + DumpMap: lo.SliceToMap(c.Params.SpecialTbls, func(item SpecialTblInfo) (string, []string) { + if len(item.Tbls) > 0 { + return item.DbName, + item.Tbls + } + return item.DbName, []string{} + }), OutputfileName: c.Params.BackupFileName, } if err := dumper.Dump(); err != nil { diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/install_mysql_proxy.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/install_mysql_proxy.go index 5a8e76e959..98428fb2d8 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/install_mysql_proxy.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/install_mysql_proxy.go @@ -114,7 +114,7 @@ func (i *InstallMySQLProxyComp) Init() (err error) { /** * @description: 预检查: * - 检查是否存在安装proxy的路径 - * - 检查是否存在proxy processs + * - 检查是否存在proxy processes * - 检查安装包是否存在,如果存在检查md5是否正确 * - * @return {*} @@ -312,8 +312,7 @@ func (i *InstallMySQLProxyComp) DecompressPkg() (err error) { proxyRealDirName, ) if output, err := osutil.ExecShellCommand(false, extraCmd); err != nil { - err := fmt.Errorf("execute shell[%s] get an error:%w and output:%s", extraCmd, err, output) - return err + return fmt.Errorf("execute shell[%s] get an error:%w and output:%s", extraCmd, err, output) } logger.Info("untar %s successfully", i.Params.Pkg) return nil diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/uninstall_mysql_proxy.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/uninstall_mysql_proxy.go index 2ea22e41ea..b76a476612 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/uninstall_mysql_proxy.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/uninstall_mysql_proxy.go @@ -76,20 +76,20 @@ func (u *UnInstallMySQLProxyComp) Init() (err error) { func (u *UnInstallMySQLProxyComp) PreCheck() (err error) { for _, port := range u.Params.Ports { if !u.Params.Force { - db, err := native.InsObject{ + db, errx := native.InsObject{ Host: u.Params.Host, User: u.runTimeCtx.proxyAdminUser, Pwd: u.proxyAdminPwd, Port: port, }.ConnProxyAdmin() - if err != nil { - logger.Error("连接%d的Admin Port 失败%s", port, err.Error()) - return err + if errx != nil { + logger.Error("连接%d的Admin Port 失败:%s", port, errx.Error()) + return errx } - inuse, err := db.CheckProxyInUse() - if err != nil { - logger.Error("检查Proxy可用性检查失败") - return err + inuse, errx := db.CheckProxyInUse() + if errx != nil { + logger.Error("检查Proxy可用性检查失败:%s", errx.Error()) + return errx } if inuse { return fmt.Errorf("检测到%d存在可用连接", port) diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/upgrade_mysql_proxy.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/upgrade_mysql_proxy.go index f79ad154a5..1ef299de13 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/upgrade_mysql_proxy.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql_proxy/upgrade_mysql_proxy.go @@ -122,7 +122,7 @@ func (p *UpgradeProxyComp) checkAppProcessist() (err error) { if len(activeprocesslist) > 0 { errMsg := fmt.Sprintf("还存在活跃的业务连接,请先确认,具体连接%v", activeprocesslist) logger.Error(errMsg) - return fmt.Errorf(errMsg) + return fmt.Errorf("%s", errMsg) } } return nil diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/core/cst/cst.go b/dbm-services/mysql/db-tools/dbactuator/pkg/core/cst/cst.go index 152c85a3c7..0a7c8561da 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/core/cst/cst.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/core/cst/cst.go @@ -32,6 +32,8 @@ const ( const ( // DefaultCharset default charset DefaultCharset = "default" + // StageDbHeader backup db header + StageDbHeader = "stage_truncate" ) // GetNowTimeLayoutStr 20060102150405 diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/native/dbworker.go b/dbm-services/mysql/db-tools/dbactuator/pkg/native/dbworker.go index 6d4e40557f..610dbe7e0e 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/native/dbworker.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/native/dbworker.go @@ -120,6 +120,7 @@ func (h *DbWorker) ExecMore(sqls []string) (rowsAffectedCount int64, err error) return h.ExecMoreContext(sqls, ctx) } +// ExecMoreContext TODO func (h *DbWorker) ExecMoreContext(sqls []string, ctx context.Context) (rowsAffected int64, err error) { var c int64 db, err := h.Db.Conn(ctx) diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysql_dumper.go b/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysql_dumper.go index 0786b2acda..2604cf13ce 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysql_dumper.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysql_dumper.go @@ -11,6 +11,7 @@ package mysqlutil import ( + "bufio" "errors" "fmt" "os" @@ -18,6 +19,7 @@ import ( "path/filepath" "regexp" "runtime" + "slices" "strings" "sync" @@ -54,6 +56,7 @@ type MySQLDumpOption struct { GtidPurgedOff bool // --set-gtid-purged=OFF Quick bool ExtendedInsert bool + Force bool } type runtimectx struct { @@ -111,7 +114,7 @@ func (m MySQLDumper) Dump() (err error) { <-concurrencyControl }() errFile := path.Join(dump.DumpDir, fmt.Sprintf("%s.err", db)) - dumpCmd := dump.getDumpCmd(outputFile, errFile, "") + dumpCmd := dump.getDumpCmd(outputFile, errFile, "", false) logger.Info("mysqldump cmd:%s", mysqlcomm.RemovePassword(dumpCmd)) output, err := osutil.StandardShellCommand(false, dumpCmd) if err != nil { @@ -198,7 +201,7 @@ func (m *MySQLDumperTogether) Dump() (err error) { logger.Error("errFile:%s", errFileContext) } }() - dumpCmd := m.getDumpCmd(outputFile, errFile, dumpOption) + dumpCmd := m.getDumpCmd(outputFile, errFile, dumpOption, false) logger.Info("mysqldump cmd:%s", mysqlcomm.ClearSensitiveInformation(dumpCmd)) output, err := osutil.StandardShellCommand(false, dumpCmd) if err != nil { @@ -211,6 +214,98 @@ func (m *MySQLDumperTogether) Dump() (err error) { return } +// MySQLDumperAppend 不同库表导出到同一个文件 +type MySQLDumperAppend struct { + MySQLDumper + OutputfileName string + DumpMap map[string][]string +} + +// Dump do dump +func (m *MySQLDumperAppend) Dump() (err error) { + outputFile := path.Join(m.DumpDir, m.OutputfileName) + errFile := path.Join(m.DumpDir, m.OutputfileName+".err") + fd, err := os.OpenFile(outputFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("open file failed %s", err.Error()) + } + _, err = fd.WriteString("-- dump schema for dbm simulation\n") + if err != nil { + logger.Error("write file failed %s", err.Error()) + return err + } + defer func() { + if err != nil { + if cmutil.FileExists(errFile) { + errMsg, errx := osutil.ReadFileString(errFile) + if errx != nil { + logger.Error("read errFile failed %s", errx.Error()) + } + logger.Error("errFile contenxt:%s", errMsg) + } + } + }() + defer fd.Close() + inputdbs := m.DbNames + for db, tables := range m.DumpMap { + var realdbs []string + if lo.IsNotEmpty(db) { + // inputdbs是实际存在的库 + // 如果dumpMap中的库不在inputdbs中,直接跳过 + // 让错误在模拟执行中体现 + if !slices.Contains(inputdbs, db) { + logger.Warn("db %s not in inputdbs %v", db, inputdbs) + continue + } + realdbs = []string{db} + } else { + realdbs = inputdbs + } + for _, realdb := range realdbs { + _, err = fd.WriteString(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`;\n USE `%s`;\n", realdb, realdb)) + if err != nil { + return fmt.Errorf("write file failed %s", err.Error()) + } + m.Tables = lo.Uniq(tables) + m.DbNames = []string{realdb} + dumpCmd := m.getDumpCmd(outputFile, errFile, "", true) + logger.Info("mysqldump cmd:%s", mysqlcomm.ClearSensitiveInformation(dumpCmd)) + output, errx := osutil.StandardShellCommand(false, dumpCmd) + if errx != nil { + if err = dumpIsOk(errFile); err == nil { + continue + } + return fmt.Errorf("execte %s get an error:%s,%w", dumpCmd, output, errx) + } + if err = checkDumpComplete(outputFile); err != nil { + logger.Error("checkDumpComplete failed %s", err.Error()) + return err + } + } + } + return err +} + +func dumpIsOk(errLog string) (err error) { + fd, err := os.Open(errLog) + if err != nil { + return err + } + r := regexp.MustCompile(`Couldn't find table:`) + var lines []string + scanner := bufio.NewScanner(fd) + for scanner.Scan() { + l := scanner.Text() + if !r.MatchString(l) && lo.IsNotEmpty(l) { + lines = append(lines, l) + } + } + if len(lines) > 0 { + return fmt.Errorf("%s", strings.Join(lines, "\n")) + } + return +} + /* mysqldump 参数说明: -B --databases :后面指定的 db 名字空格分隔,例如 --databases db1 db2 >> aaa.sql @@ -247,7 +342,7 @@ DumpSchema 功能概述: >/data/dbbak/$dump_file.$old_db_name 2>/data/dbbak/$dump_file.$old_db_name.$SUBJOB_ID.err; */ // nolint -func (m *MySQLDumper) getDumpCmd(outputFile, errFile, dumpOption string) (dumpCmd string) { +func (m *MySQLDumper) getDumpCmd(outputFile, errFile, dumpOption string, appendOutput bool) (dumpCmd string) { switch { case m.DumpData && m.DumpSchema: @@ -298,6 +393,9 @@ func (m *MySQLDumper) getDumpCmd(outputFile, errFile, dumpOption string) (dumpCm if m.Charset != "" { // charset 可能为空 dumpOption += " --default-character-set=" + m.Charset } + if m.Force { + dumpOption += " -f " + } dumpCmd = fmt.Sprintf( `%s -h%s -P%d -u%s -p%s --skip-opt --create-options --single-transaction --max-allowed-packet=1G -q --no-autocommit %s`, m.DumpCmdFile, @@ -329,8 +427,10 @@ func (m *MySQLDumper) getDumpCmd(outputFile, errFile, dumpOption string) (dumpCm dumpCmd += fmt.Sprintf(" --ignore-table=%s", igTb) } } - mysqlDumpCmd := fmt.Sprintf("%s > %s 2>%s", dumpCmd, outputFile, errFile) + if appendOutput { + mysqlDumpCmd = fmt.Sprintf("%s >> %s 2>>%s", dumpCmd, outputFile, errFile) + } return strings.ReplaceAll(mysqlDumpCmd, "\n", " ") } diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/import_sqlfile_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/import_sqlfile_flow.py index 311723c1e0..7a55ea0d9b 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/import_sqlfile_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/import_sqlfile_flow.py @@ -32,7 +32,11 @@ from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent from backend.flow.utils.mysql.mysql_act_dataclass import DownloadMediaKwargs, ExecActuatorKwargs from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload -from backend.flow.utils.mysql.mysql_commom_query import parse_db_from_sqlfile, query_mysql_variables +from backend.flow.utils.mysql.mysql_commom_query import ( + merge_resp_to_cluster, + parse_db_from_sqlfile, + query_mysql_variables, +) from backend.ticket.constants import TicketType logger = logging.getLogger("flow") @@ -197,10 +201,7 @@ def sql_semantic_check_flow(self): if resp is None: logger.warning("root id:[{}]parse db from sqlfile resp is None,set dump_all to True.".format(self.root_id)) else: - logger.info(f"resp: {resp}") - template_cluster["dump_all"] = resp.get("dump_all") - template_cluster["parse_need_dump_dbs"] = resp.get("dbs") - template_cluster["parse_create_dbs"] = resp.get("create_dbs") + template_cluster.update(merge_resp_to_cluster(resp)) template_cluster["semantic_dump_schema_file_name_suffix"] = self.semantic_dump_schema_file_name_suffix template_cluster["execute_objects"] = self.data["execute_objects"] semantic_check_pipeline.add_act( diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_proxy_upgrade.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_proxy_upgrade.py index a185941d97..c394a23a50 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_proxy_upgrade.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_proxy_upgrade.py @@ -107,7 +107,7 @@ def upgrade_mysql_proxy_flow(self): uid=self.uid, root_id=self.root_id, cluster=cluster_obj, - is_check_client_conn=not self.force_upgrade, + is_check_client_conn=True, is_proxy=True, check_client_conn_inst=proxy_ins, ) @@ -134,7 +134,7 @@ def upgrade_mysql_proxy_flow(self): pkg_id=pkg_id, proxy_version=get_sub_version_by_pkg_name(proxy_pkg.name), proxy_ports=ports, - force_upgrade=False, + force_upgrade=self.force_upgrade, ) ) # 最后一个节点无需再确认 diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/spider/import_sqlfile_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/spider/import_sqlfile_flow.py index fabfe66309..31a364d7df 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/spider/import_sqlfile_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/spider/import_sqlfile_flow.py @@ -33,7 +33,7 @@ from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent from backend.flow.utils.mysql.mysql_act_dataclass import DownloadMediaKwargs, ExecActuatorKwargs from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload -from backend.flow.utils.mysql.mysql_commom_query import parse_db_from_sqlfile +from backend.flow.utils.mysql.mysql_commom_query import merge_resp_to_cluster, parse_db_from_sqlfile from backend.flow.utils.mysql.mysql_version_parse import major_version_parse from backend.flow.utils.spider.spider_bk_config import get_spider_version_and_charset from backend.ticket.constants import TicketType @@ -183,10 +183,7 @@ def sql_semantic_check_flow(self): if resp is None: logger.warning("root id:[{}]parse db from sqlfile resp is None,set dump_all to True.".format(self.root_id)) else: - logger.info(f"resp: {resp}") - cluster["dump_all"] = resp.get("dump_all") - cluster["parse_need_dump_dbs"] = resp.get("dbs") - cluster["parse_create_dbs"] = resp.get("create_dbs") + cluster.update(merge_resp_to_cluster(resp)) cluster["execute_objects"] = self.data["execute_objects"] cluster["semantic_dump_schema_file_name_suffix"] = self.semantic_dump_schema_file_name_suffix semantic_check_pipeline.add_act( diff --git a/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py b/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py index 9f76868506..f71ac29cb5 100644 --- a/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py +++ b/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py @@ -918,6 +918,8 @@ def get_semantic_dump_schema_payload(self, **kwargs): "parse_need_dump_dbs": self.cluster.get("parse_need_dump_dbs", []), "parse_create_dbs": self.cluster.get("parse_create_dbs", []), "execute_objects": self.cluster.get("execute_objects", None), + "just_dump_special_tbls": self.cluster.get("just_dump_special_tbls", False), + "special_tbls": self.cluster.get("special_tbls", []), "backup_file_name": f"{self.cluster['semantic_dump_schema_file_name']}", "backup_file_name_suffix": f"{self.cluster['semantic_dump_schema_file_name_suffix']}", "backup_dir": BK_PKG_INSTALL_PATH, diff --git a/dbm-ui/backend/flow/utils/mysql/mysql_commom_query.py b/dbm-ui/backend/flow/utils/mysql/mysql_commom_query.py index 5b168607f0..ce591f1fc0 100644 --- a/dbm-ui/backend/flow/utils/mysql/mysql_commom_query.py +++ b/dbm-ui/backend/flow/utils/mysql/mysql_commom_query.py @@ -176,6 +176,20 @@ def parse_db_from_sqlfile(path: str, files: List[str]): return None +def merge_resp_to_cluster(resp: dict): + """ + 合并返回的数据到集群 + """ + dump_schema_payload = {} + logger.info(f"resp: {resp}") + dump_schema_payload["dump_all"] = resp.get("dump_all") + dump_schema_payload["parse_need_dump_dbs"] = resp.get("dbs") + dump_schema_payload["parse_create_dbs"] = resp.get("create_dbs") + dump_schema_payload["just_dump_special_tbls"] = resp.get("just_dump_special_tbls") + dump_schema_payload["special_tbls"] = resp.get("special_tbls") + return dump_schema_payload + + def create_tdbctl_user_for_remote(cluster: Cluster, ctl_primary: str, new_ip: str, new_port: int, tdbctl_pass: str): """ 给新的remote实例对中控primary授权