Skip to content

Commit

Permalink
issue-725: Pass schema to mysqlsh
Browse files Browse the repository at this point in the history
  • Loading branch information
shunki-fujita committed Aug 8, 2024
1 parent a80abf3 commit e3c98e5
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 11 deletions.
8 changes: 5 additions & 3 deletions backup/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ type RestoreManager struct {
keyPrefix string
restorePoint time.Time
workDir string
schema string
}

var ErrBadConnection = errors.New("the connection hasn't reflected the latest user's privileges")

func NewRestoreManager(cfg *rest.Config, bc bucket.Bucket, dir, srcNS, srcName, ns, name, password string, threads int, restorePoint time.Time) (*RestoreManager, error) {
func NewRestoreManager(cfg *rest.Config, bc bucket.Bucket, dir, srcNS, srcName, ns, name, password string, threads int, restorePoint time.Time, schema string) (*RestoreManager, error) {
log := zap.New(zap.WriteTo(os.Stderr), zap.StacktraceLevel(zapcore.DPanicLevel))
scheme := runtime.NewScheme()
if err := clientgoscheme.AddToScheme(scheme); err != nil {
Expand Down Expand Up @@ -74,6 +75,7 @@ func NewRestoreManager(cfg *rest.Config, bc bucket.Bucket, dir, srcNS, srcName,
keyPrefix: prefix,
restorePoint: restorePoint,
workDir: dir,
schema: schema,
}, nil
}

Expand Down Expand Up @@ -254,7 +256,7 @@ func (rm *RestoreManager) loadDump(ctx context.Context, op bkop.Operator, key st
return fmt.Errorf("failed to untar dump file: %w", err)
}

return op.LoadDump(ctx, dumpDir)
return op.LoadDump(ctx, dumpDir, rm.schema)
}

func (rm *RestoreManager) applyBinlog(ctx context.Context, op bkop.Operator, key string) error {
Expand Down Expand Up @@ -317,5 +319,5 @@ func (rm *RestoreManager) applyBinlog(ctx context.Context, op bkop.Operator, key
os.RemoveAll(tmpDir)
}()

return op.LoadBinlog(ctx, binlogDir, tmpDir, rm.restorePoint)
return op.LoadBinlog(ctx, binlogDir, tmpDir, rm.restorePoint, rm.schema)
}
12 changes: 8 additions & 4 deletions cmd/moco-backup/cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

var restoreCmd = &cobra.Command{
Use: "restore BUCKET SOURCE_NAMESPACE SOURCE_NAME NAMESPACE NAME YYYYMMDD-hhmmss",
Use: "restore BUCKET SOURCE_NAMESPACE SOURCE_NAME NAMESPACE NAME YYYYMMDD-hhmmss SCHEMA",
Short: "restore MySQL data from a backup",
Long: `Restore MySQL data from a backup.
Expand All @@ -21,8 +21,10 @@ SOURCE_NAMESPACE: The source MySQLCluster's namespace.
SOURCE_NAME: The source MySQLCluster's name.
NAMESPACE: The target MySQLCluster's namespace.
NAME: The target MySQLCluster's name.
YYYYMMDD-hhmmss: The point-in-time to restore data. e.g. 20210523-150423`,
Args: cobra.ExactArgs(6),
YYYYMMDD-hhmmss: The point-in-time to restore data. e.g. 20210523-150423
SCHEMA: The target schema to restore."`,

Args: cobra.ExactArgs(7),
RunE: func(cmd *cobra.Command, args []string) error {
maxRetry := 3
for i := 0; i < maxRetry; i++ {
Expand Down Expand Up @@ -54,6 +56,7 @@ func runRestore(cmd *cobra.Command, args []string) (e error) {
srcName := args[2]
namespace := args[3]
name := args[4]
schema := args[6]

restorePoint, err := time.Parse(constants.BackupTimeFormat, args[5])
if err != nil {
Expand All @@ -75,7 +78,8 @@ func runRestore(cmd *cobra.Command, args []string) (e error) {
namespace, name,
mysqlPassword,
commonArgs.threads,
restorePoint)
restorePoint,
schema)
if err != nil {
return fmt.Errorf("failed to create a restore manager: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions controllers/mysqlcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1448,6 +1448,7 @@ func (r *MySQLClusterReconciler) reconcileV1RestoreJob(ctx context.Context, req
args = append(args, cluster.Spec.Restore.SourceNamespace, cluster.Spec.Restore.SourceName)
args = append(args, cluster.Namespace, cluster.Name)
args = append(args, cluster.Spec.Restore.RestorePoint.UTC().Format(constants.BackupTimeFormat))
args = append(args, cluster.Spec.Restore.Schema)

resources := corev1ac.ResourceRequirements()
if !noJobResource {
Expand Down
4 changes: 2 additions & 2 deletions pkg/bkop/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ type Operator interface {
PrepareRestore(context.Context) error

// LoadDump loads data dumped by `DumpFull`.
LoadDump(ctx context.Context, dir string) error
LoadDump(ctx context.Context, dir string, schema string) error

// LoadBinLog applies binary logs up to `restorePoint`.
LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time) error
LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time, schema string) error

// FinishRestore sets global variables of the database instance after restoration.
FinishRestore(context.Context) error
Expand Down
10 changes: 8 additions & 2 deletions pkg/bkop/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (o operator) PrepareRestore(ctx context.Context) error {
return nil
}

func (o operator) LoadDump(ctx context.Context, dir string) error {
func (o operator) LoadDump(ctx context.Context, dir string, schema string) error {
args := []string{
fmt.Sprintf("mysql://%s@%s", o.user, net.JoinHostPort(o.host, fmt.Sprint(o.port))),
"-p" + o.password,
Expand All @@ -38,14 +38,17 @@ func (o operator) LoadDump(ctx context.Context, dir string) error {
"--deferTableIndexes=all",
"--updateGtidSet=replace",
}
if schema != "" {
args = append(args, "--includeSchemas="+schema)
}

cmd := exec.CommandContext(ctx, "mysqlsh", args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
return cmd.Run()
}

func (o operator) LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time) error {
func (o operator) LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time, schema string) error {
dirents, err := os.ReadDir(binlogDir)
if err != nil {
return err
Expand Down Expand Up @@ -88,6 +91,9 @@ func (o operator) LoadBinlog(ctx context.Context, binlogDir, tmpDir string, rest
//mysqlbinlog --stop-datetime="2021-05-13 10:45:00" log/binlog.000001 log/binlog.000002
// | mysql --binary-mode -h moco-single-primary.bar.svc -u moco-admin -p
binlogArgs := append([]string{"--stop-datetime=" + restorePoint.Format("2006-01-02 15:04:05")}, binlogFiles...)
if schema != "" {
binlogArgs = append(binlogArgs, "--database="+schema)
}
binlogCmd := exec.CommandContext(ctx, "mysqlbinlog", binlogArgs...)
binlogCmd.Stdout = pw
binlogCmd.Stderr = os.Stderr
Expand Down

0 comments on commit e3c98e5

Please sign in to comment.