Skip to content

Commit

Permalink
feat(mysql): mysql清档重命名增加flush tables #8718
Browse files Browse the repository at this point in the history
  • Loading branch information
xfwduke committed Dec 23, 2024
1 parent 90d6b2b commit c2d0275
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ import (
"bufio"
"bytes"
"context"
"fmt"
"os"
"os/exec"
"strings"

"dbm-services/common/go-pubpkg/logger"
"dbm-services/mysql/db-tools/dbactuator/pkg/components"
"dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/common"
"dbm-services/mysql/db-tools/dbactuator/pkg/components/rename_dbs/pkg"
"fmt"
"os"
"os/exec"
"strings"

_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ func isTableTransClean(conn *sqlx.Conn, from, to string) (bool, error) {
return false, err
}

defer func() {
for _, t := range tables {
flushTable(conn, from, to, t)
}
}()

for _, table := range tables {
yes, err := IsTableExistsIn(conn, table, to)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,32 @@ func TransDBTables(conn *sqlx.Conn, from, to string, tables []string) ([]string,
return res, nil
}

func flushTable(conn *sqlx.Conn, from, to, tableName string) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

_, err := conn.ExecContext(
ctx,
fmt.Sprintf("FLUSH TABLE `%s`.`%s`", from, tableName),
)
if err != nil {
logger.Error("flushing table `%s`.`%s` failed: %s", from, tableName, err.Error())
}

_, err = conn.ExecContext(
ctx,
fmt.Sprintf("FLUSH TABLE `%s`.`%s`", to, tableName),
)
if err != nil {
logger.Error("flushing table `%s`.`%s` failed: %s", to, tableName, err.Error())
}
}

func transDBTable(conn *sqlx.Conn, from, to, tableName string) ([]string, error) {
defer func() {
flushTable(conn, from, to, tableName)
}()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ func (c *OnMySQLComponent) instanceRecreateSourceTables(port int) error {
}

// 这里改成从 stage 库拿表列表, 在重试的时候才能幂等
//for _, table := range c.dbTablesMap[db] {
for _, table := range stageTables {
err := c.instanceRecreateSourceTable(port, db, stageDBName, table)
if err != nil {
Expand Down Expand Up @@ -349,6 +348,10 @@ func (c *OnMySQLComponent) instanceRecreateSourceTable(port int, dbName, stageDB
// return err
//}
//logger.Info("source table found in stage db on instance %d, try to truncate", port)
defer func() {
c.flushTable(dbName, tableName)
c.flushTable(stageDBName, tableName)
}()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand All @@ -370,6 +373,19 @@ func (c *OnMySQLComponent) instanceRecreateSourceTable(port int, dbName, stageDB
return nil
}

func (c *OnMySQLComponent) flushTable(dbName, tableName string) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

_, err := c.dbConn.ExecContext(
ctx,
fmt.Sprintf("FLUSH TABLE `%s`.`%s`", dbName, tableName),
)
if err != nil {
logger.Error("flush table %s.%s failed: %s", dbName, tableName, err.Error())
}
}

func (c *OnMySQLComponent) GenerateDropStageSQL() error {
var dropSQLs []string
for dbName := range c.dbTablesMap {
Expand Down

0 comments on commit c2d0275

Please sign in to comment.