Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
shunki-fujita committed Aug 6, 2024
1 parent 11043ea commit bb931f5
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 30 deletions.
50 changes: 26 additions & 24 deletions backup/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,23 @@ import (
)

type RestoreManager struct {
log logr.Logger
client client.Client
scheme *runtime.Scheme
namespace string
name string
password string
threads int
bucket bucket.Bucket
keyPrefix string
restorePoint time.Time
workDir string
log logr.Logger
client client.Client
scheme *runtime.Scheme
namespace string
name string
password string
threads int
bucket bucket.Bucket
keyPrefix string
restorePoint time.Time
workDir string
includeSchemas []string
}

var ErrBadConnection = errors.New("the connection hasn't reflected the latest user's privileges")

func NewRestoreManager(cfg *rest.Config, bc bucket.Bucket, dir, srcNS, srcName, ns, name, password string, threads int, restorePoint time.Time) (*RestoreManager, error) {
func NewRestoreManager(cfg *rest.Config, bc bucket.Bucket, dir, srcNS, srcName, ns, name, password string, threads int, restorePoint time.Time, includeSchemas []string) (*RestoreManager, error) {
log := zap.New(zap.WriteTo(os.Stderr), zap.StacktraceLevel(zapcore.DPanicLevel))
scheme := runtime.NewScheme()
if err := clientgoscheme.AddToScheme(scheme); err != nil {
Expand All @@ -63,17 +64,18 @@ func NewRestoreManager(cfg *rest.Config, bc bucket.Bucket, dir, srcNS, srcName,

prefix := calcPrefix(srcNS, srcName)
return &RestoreManager{
log: log,
client: k8sClient,
scheme: scheme,
namespace: ns,
name: name,
password: password,
threads: threads,
bucket: bc,
keyPrefix: prefix,
restorePoint: restorePoint,
workDir: dir,
log: log,
client: k8sClient,
scheme: scheme,
namespace: ns,
name: name,
password: password,
threads: threads,
bucket: bc,
keyPrefix: prefix,
restorePoint: restorePoint,
workDir: dir,
includeSchemas: includeSchemas,
}, nil
}

Expand Down Expand Up @@ -254,7 +256,7 @@ func (rm *RestoreManager) loadDump(ctx context.Context, op bkop.Operator, key st
return fmt.Errorf("failed to untar dump file: %w", err)
}

return op.LoadDump(ctx, dumpDir)
return op.LoadDump(ctx, dumpDir, rm.includeSchemas)
}

func (rm *RestoreManager) applyBinlog(ctx context.Context, op bkop.Operator, key string) error {
Expand Down
12 changes: 8 additions & 4 deletions cmd/moco-backup/cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

var restoreCmd = &cobra.Command{
Use: "restore BUCKET SOURCE_NAMESPACE SOURCE_NAME NAMESPACE NAME YYYYMMDD-hhmmss",
Use: "restore BUCKET SOURCE_NAMESPACE SOURCE_NAME NAMESPACE NAME YYYYMMDD-hhmmss SCHEMAS",
Short: "restore MySQL data from a backup",
Long: `Restore MySQL data from a backup.
Expand All @@ -21,8 +21,10 @@ SOURCE_NAMESPACE: The source MySQLCluster's namespace.
SOURCE_NAME: The source MySQLCluster's name.
NAMESPACE: The target MySQLCluster's namespace.
NAME: The target MySQLCluster's name.
YYYYMMDD-hhmmss: The point-in-time to restore data. e.g. 20210523-150423`,
Args: cobra.ExactArgs(6),
YYYYMMDD-hhmmss: The point-in-time to restore data. e.g. 20210523-150423
SCHEMAS: The list of schemas to restore. e.g. "db1 db2 db3"`,

Args: cobra.ExactArgs(7),
RunE: func(cmd *cobra.Command, args []string) error {
maxRetry := 3
for i := 0; i < maxRetry; i++ {
Expand Down Expand Up @@ -54,6 +56,7 @@ func runRestore(cmd *cobra.Command, args []string) (e error) {
srcName := args[2]
namespace := args[3]
name := args[4]
includeSchemas := strings.Split(args[6], " ")

restorePoint, err := time.Parse(constants.BackupTimeFormat, args[5])
if err != nil {
Expand All @@ -75,7 +78,8 @@ func runRestore(cmd *cobra.Command, args []string) (e error) {
namespace, name,
mysqlPassword,
commonArgs.threads,
restorePoint)
restorePoint,
includeSchemas)
if err != nil {
return fmt.Errorf("failed to create a restore manager: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions controllers/mysqlcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1448,6 +1448,7 @@ func (r *MySQLClusterReconciler) reconcileV1RestoreJob(ctx context.Context, req
args = append(args, cluster.Spec.Restore.SourceNamespace, cluster.Spec.Restore.SourceName)
args = append(args, cluster.Namespace, cluster.Name)
args = append(args, cluster.Spec.Restore.RestorePoint.UTC().Format(constants.BackupTimeFormat))
args = append(args, strings.Join(cluster.Spec.Restore.IncludeSchemas, " "))

resources := corev1ac.ResourceRequirements()
if !noJobResource {
Expand Down
2 changes: 1 addition & 1 deletion pkg/bkop/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Operator interface {
PrepareRestore(context.Context) error

// LoadDump loads data dumped by `DumpFull`.
LoadDump(ctx context.Context, dir string) error
LoadDump(ctx context.Context, dir string, schemas []string) error

// LoadBinLog applies binary logs up to `restorePoint`.
LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time) error
Expand Down
5 changes: 4 additions & 1 deletion pkg/bkop/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (o operator) PrepareRestore(ctx context.Context) error {
return nil
}

func (o operator) LoadDump(ctx context.Context, dir string) error {
func (o operator) LoadDump(ctx context.Context, dir string, schemas []string) error {
args := []string{
fmt.Sprintf("mysql://%s@%s", o.user, net.JoinHostPort(o.host, fmt.Sprint(o.port))),
"-p" + o.password,
Expand All @@ -38,6 +38,9 @@ func (o operator) LoadDump(ctx context.Context, dir string) error {
"--deferTableIndexes=all",
"--updateGtidSet=replace",
}
if len(schemas) > 0 {
args = append(args, "--includeSchemas="+strings.Join(schemas, ","))
}

cmd := exec.CommandContext(ctx, "mysqlsh", args...)
cmd.Stdout = os.Stdout
Expand Down

0 comments on commit bb931f5

Please sign in to comment.