diff --git a/backup/restore.go b/backup/restore.go index 441ff299e..ca49f168d 100644 --- a/backup/restore.go +++ b/backup/restore.go @@ -42,11 +42,12 @@ type RestoreManager struct { keyPrefix string restorePoint time.Time workDir string + schema string } var ErrBadConnection = errors.New("the connection hasn't reflected the latest user's privileges") -func NewRestoreManager(cfg *rest.Config, bc bucket.Bucket, dir, srcNS, srcName, ns, name, password string, threads int, restorePoint time.Time) (*RestoreManager, error) { +func NewRestoreManager(cfg *rest.Config, bc bucket.Bucket, dir, srcNS, srcName, ns, name, password string, threads int, restorePoint time.Time, schema string) (*RestoreManager, error) { log := zap.New(zap.WriteTo(os.Stderr), zap.StacktraceLevel(zapcore.DPanicLevel)) scheme := runtime.NewScheme() if err := clientgoscheme.AddToScheme(scheme); err != nil { @@ -74,6 +75,7 @@ func NewRestoreManager(cfg *rest.Config, bc bucket.Bucket, dir, srcNS, srcName, keyPrefix: prefix, restorePoint: restorePoint, workDir: dir, + schema: schema, }, nil } @@ -254,7 +256,7 @@ func (rm *RestoreManager) loadDump(ctx context.Context, op bkop.Operator, key st return fmt.Errorf("failed to untar dump file: %w", err) } - return op.LoadDump(ctx, dumpDir) + return op.LoadDump(ctx, dumpDir, rm.schema) } func (rm *RestoreManager) applyBinlog(ctx context.Context, op bkop.Operator, key string) error { @@ -317,5 +319,5 @@ func (rm *RestoreManager) applyBinlog(ctx context.Context, op bkop.Operator, key os.RemoveAll(tmpDir) }() - return op.LoadBinlog(ctx, binlogDir, tmpDir, rm.restorePoint) + return op.LoadBinlog(ctx, binlogDir, tmpDir, rm.restorePoint, rm.schema) } diff --git a/cmd/moco-backup/cmd/restore.go b/cmd/moco-backup/cmd/restore.go index d70558704..2ea16da20 100644 --- a/cmd/moco-backup/cmd/restore.go +++ b/cmd/moco-backup/cmd/restore.go @@ -12,7 +12,7 @@ import ( ) var restoreCmd = &cobra.Command{ - Use: "restore BUCKET SOURCE_NAMESPACE SOURCE_NAME NAMESPACE NAME YYYYMMDD-hhmmss", + Use: "restore BUCKET SOURCE_NAMESPACE SOURCE_NAME NAMESPACE NAME YYYYMMDD-hhmmss SCHEMA", Short: "restore MySQL data from a backup", Long: `Restore MySQL data from a backup. @@ -21,8 +21,10 @@ SOURCE_NAMESPACE: The source MySQLCluster's namespace. SOURCE_NAME: The source MySQLCluster's name. NAMESPACE: The target MySQLCluster's namespace. NAME: The target MySQLCluster's name. -YYYYMMDD-hhmmss: The point-in-time to restore data. e.g. 20210523-150423`, - Args: cobra.ExactArgs(6), +YYYYMMDD-hhmmss: The point-in-time to restore data. e.g. 20210523-150423 +SCHEMA: The target schema to restore."`, + + Args: cobra.ExactArgs(7), RunE: func(cmd *cobra.Command, args []string) error { maxRetry := 3 for i := 0; i < maxRetry; i++ { @@ -54,6 +56,7 @@ func runRestore(cmd *cobra.Command, args []string) (e error) { srcName := args[2] namespace := args[3] name := args[4] + schema := args[6] restorePoint, err := time.Parse(constants.BackupTimeFormat, args[5]) if err != nil { @@ -75,7 +78,8 @@ func runRestore(cmd *cobra.Command, args []string) (e error) { namespace, name, mysqlPassword, commonArgs.threads, - restorePoint) + restorePoint, + schema) if err != nil { return fmt.Errorf("failed to create a restore manager: %w", err) } diff --git a/controllers/mysqlcluster_controller.go b/controllers/mysqlcluster_controller.go index 07a5cbba5..5f29d5510 100644 --- a/controllers/mysqlcluster_controller.go +++ b/controllers/mysqlcluster_controller.go @@ -1448,6 +1448,7 @@ func (r *MySQLClusterReconciler) reconcileV1RestoreJob(ctx context.Context, req args = append(args, cluster.Spec.Restore.SourceNamespace, cluster.Spec.Restore.SourceName) args = append(args, cluster.Namespace, cluster.Name) args = append(args, cluster.Spec.Restore.RestorePoint.UTC().Format(constants.BackupTimeFormat)) + args = append(args, cluster.Spec.Restore.Schema) resources := corev1ac.ResourceRequirements() if !noJobResource { diff --git a/pkg/bkop/operator.go b/pkg/bkop/operator.go index bc7e8d99d..6e71fc69c 100644 --- a/pkg/bkop/operator.go +++ b/pkg/bkop/operator.go @@ -37,10 +37,10 @@ type Operator interface { PrepareRestore(context.Context) error // LoadDump loads data dumped by `DumpFull`. - LoadDump(ctx context.Context, dir string) error + LoadDump(ctx context.Context, dir string, schema string) error // LoadBinLog applies binary logs up to `restorePoint`. - LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time) error + LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time, schema string) error // FinishRestore sets global variables of the database instance after restoration. FinishRestore(context.Context) error diff --git a/pkg/bkop/restore.go b/pkg/bkop/restore.go index 6d011616a..f9907f298 100644 --- a/pkg/bkop/restore.go +++ b/pkg/bkop/restore.go @@ -21,7 +21,7 @@ func (o operator) PrepareRestore(ctx context.Context) error { return nil } -func (o operator) LoadDump(ctx context.Context, dir string) error { +func (o operator) LoadDump(ctx context.Context, dir string, schema string) error { args := []string{ fmt.Sprintf("mysql://%s@%s", o.user, net.JoinHostPort(o.host, fmt.Sprint(o.port))), "-p" + o.password, @@ -38,6 +38,9 @@ func (o operator) LoadDump(ctx context.Context, dir string) error { "--deferTableIndexes=all", "--updateGtidSet=replace", } + if schema != "" { + args = append(args, "--includeSchemas="+schema) + } cmd := exec.CommandContext(ctx, "mysqlsh", args...) cmd.Stdout = os.Stdout @@ -45,7 +48,7 @@ func (o operator) LoadDump(ctx context.Context, dir string) error { return cmd.Run() } -func (o operator) LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time) error { +func (o operator) LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time, schema string) error { dirents, err := os.ReadDir(binlogDir) if err != nil { return err @@ -88,6 +91,9 @@ func (o operator) LoadBinlog(ctx context.Context, binlogDir, tmpDir string, rest //mysqlbinlog --stop-datetime="2021-05-13 10:45:00" log/binlog.000001 log/binlog.000002 // | mysql --binary-mode -h moco-single-primary.bar.svc -u moco-admin -p binlogArgs := append([]string{"--stop-datetime=" + restorePoint.Format("2006-01-02 15:04:05")}, binlogFiles...) + if schema != "" { + binlogArgs = append(binlogArgs, "--database="+schema) + } binlogCmd := exec.CommandContext(ctx, "mysqlbinlog", binlogArgs...) binlogCmd.Stdout = pw binlogCmd.Stderr = os.Stderr