From bb931f5c4bfa3d7bd17ce6bb6f42c5feba554def Mon Sep 17 00:00:00 2001 From: shunki-fujita Date: Tue, 6 Aug 2024 07:33:01 +0000 Subject: [PATCH] wip --- backup/restore.go | 50 +++++++++++++------------- cmd/moco-backup/cmd/restore.go | 12 ++++--- controllers/mysqlcluster_controller.go | 1 + pkg/bkop/operator.go | 2 +- pkg/bkop/restore.go | 5 ++- 5 files changed, 40 insertions(+), 30 deletions(-) diff --git a/backup/restore.go b/backup/restore.go index 441ff299e..6995d79ae 100644 --- a/backup/restore.go +++ b/backup/restore.go @@ -31,22 +31,23 @@ import ( ) type RestoreManager struct { - log logr.Logger - client client.Client - scheme *runtime.Scheme - namespace string - name string - password string - threads int - bucket bucket.Bucket - keyPrefix string - restorePoint time.Time - workDir string + log logr.Logger + client client.Client + scheme *runtime.Scheme + namespace string + name string + password string + threads int + bucket bucket.Bucket + keyPrefix string + restorePoint time.Time + workDir string + includeSchemas []string } var ErrBadConnection = errors.New("the connection hasn't reflected the latest user's privileges") -func NewRestoreManager(cfg *rest.Config, bc bucket.Bucket, dir, srcNS, srcName, ns, name, password string, threads int, restorePoint time.Time) (*RestoreManager, error) { +func NewRestoreManager(cfg *rest.Config, bc bucket.Bucket, dir, srcNS, srcName, ns, name, password string, threads int, restorePoint time.Time, includeSchemas []string) (*RestoreManager, error) { log := zap.New(zap.WriteTo(os.Stderr), zap.StacktraceLevel(zapcore.DPanicLevel)) scheme := runtime.NewScheme() if err := clientgoscheme.AddToScheme(scheme); err != nil { @@ -63,17 +64,18 @@ func NewRestoreManager(cfg *rest.Config, bc bucket.Bucket, dir, srcNS, srcName, prefix := calcPrefix(srcNS, srcName) return &RestoreManager{ - log: log, - client: k8sClient, - scheme: scheme, - namespace: ns, - name: name, - password: password, - threads: threads, - bucket: bc, - keyPrefix: prefix, - restorePoint: restorePoint, - workDir: dir, + log: log, + client: k8sClient, + scheme: scheme, + namespace: ns, + name: name, + password: password, + threads: threads, + bucket: bc, + keyPrefix: prefix, + restorePoint: restorePoint, + workDir: dir, + includeSchemas: includeSchemas, }, nil } @@ -254,7 +256,7 @@ func (rm *RestoreManager) loadDump(ctx context.Context, op bkop.Operator, key st return fmt.Errorf("failed to untar dump file: %w", err) } - return op.LoadDump(ctx, dumpDir) + return op.LoadDump(ctx, dumpDir, rm.includeSchemas) } func (rm *RestoreManager) applyBinlog(ctx context.Context, op bkop.Operator, key string) error { diff --git a/cmd/moco-backup/cmd/restore.go b/cmd/moco-backup/cmd/restore.go index d70558704..84493e3ce 100644 --- a/cmd/moco-backup/cmd/restore.go +++ b/cmd/moco-backup/cmd/restore.go @@ -12,7 +12,7 @@ import ( ) var restoreCmd = &cobra.Command{ - Use: "restore BUCKET SOURCE_NAMESPACE SOURCE_NAME NAMESPACE NAME YYYYMMDD-hhmmss", + Use: "restore BUCKET SOURCE_NAMESPACE SOURCE_NAME NAMESPACE NAME YYYYMMDD-hhmmss SCHEMAS", Short: "restore MySQL data from a backup", Long: `Restore MySQL data from a backup. @@ -21,8 +21,10 @@ SOURCE_NAMESPACE: The source MySQLCluster's namespace. SOURCE_NAME: The source MySQLCluster's name. NAMESPACE: The target MySQLCluster's namespace. NAME: The target MySQLCluster's name. -YYYYMMDD-hhmmss: The point-in-time to restore data. e.g. 20210523-150423`, - Args: cobra.ExactArgs(6), +YYYYMMDD-hhmmss: The point-in-time to restore data. e.g. 20210523-150423 +SCHEMAS: The list of schemas to restore. e.g. "db1 db2 db3"`, + + Args: cobra.ExactArgs(7), RunE: func(cmd *cobra.Command, args []string) error { maxRetry := 3 for i := 0; i < maxRetry; i++ { @@ -54,6 +56,7 @@ func runRestore(cmd *cobra.Command, args []string) (e error) { srcName := args[2] namespace := args[3] name := args[4] + includeSchemas := strings.Split(args[6], " ") restorePoint, err := time.Parse(constants.BackupTimeFormat, args[5]) if err != nil { @@ -75,7 +78,8 @@ func runRestore(cmd *cobra.Command, args []string) (e error) { namespace, name, mysqlPassword, commonArgs.threads, - restorePoint) + restorePoint, + includeSchemas) if err != nil { return fmt.Errorf("failed to create a restore manager: %w", err) } diff --git a/controllers/mysqlcluster_controller.go b/controllers/mysqlcluster_controller.go index 07a5cbba5..3961a37b8 100644 --- a/controllers/mysqlcluster_controller.go +++ b/controllers/mysqlcluster_controller.go @@ -1448,6 +1448,7 @@ func (r *MySQLClusterReconciler) reconcileV1RestoreJob(ctx context.Context, req args = append(args, cluster.Spec.Restore.SourceNamespace, cluster.Spec.Restore.SourceName) args = append(args, cluster.Namespace, cluster.Name) args = append(args, cluster.Spec.Restore.RestorePoint.UTC().Format(constants.BackupTimeFormat)) + args = append(args, strings.Join(cluster.Spec.Restore.IncludeSchemas, " ")) resources := corev1ac.ResourceRequirements() if !noJobResource { diff --git a/pkg/bkop/operator.go b/pkg/bkop/operator.go index bc7e8d99d..3d962d2bf 100644 --- a/pkg/bkop/operator.go +++ b/pkg/bkop/operator.go @@ -37,7 +37,7 @@ type Operator interface { PrepareRestore(context.Context) error // LoadDump loads data dumped by `DumpFull`. - LoadDump(ctx context.Context, dir string) error + LoadDump(ctx context.Context, dir string, schemas []string) error // LoadBinLog applies binary logs up to `restorePoint`. LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time) error diff --git a/pkg/bkop/restore.go b/pkg/bkop/restore.go index 6d011616a..094cc7542 100644 --- a/pkg/bkop/restore.go +++ b/pkg/bkop/restore.go @@ -21,7 +21,7 @@ func (o operator) PrepareRestore(ctx context.Context) error { return nil } -func (o operator) LoadDump(ctx context.Context, dir string) error { +func (o operator) LoadDump(ctx context.Context, dir string, schemas []string) error { args := []string{ fmt.Sprintf("mysql://%s@%s", o.user, net.JoinHostPort(o.host, fmt.Sprint(o.port))), "-p" + o.password, @@ -38,6 +38,9 @@ func (o operator) LoadDump(ctx context.Context, dir string) error { "--deferTableIndexes=all", "--updateGtidSet=replace", } + if len(schemas) > 0 { + args = append(args, "--includeSchemas="+strings.Join(schemas, ",")) + } cmd := exec.CommandContext(ctx, "mysqlsh", args...) cmd.Stdout = os.Stdout