Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load only the named schema from the dump files #726

Merged
merged 4 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/v1beta2/mysqlcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,10 @@ type RestoreSpec struct {

// Specifies parameters for restore Pod.
JobConfig `json:"jobConfig"`

// Schema is the name of the schema to restore.
// If empty, all schemas are restored.
Schema string `json:"schema,omitempty"`
shunki-fujita marked this conversation as resolved.
Show resolved Hide resolved
}

// MySQLClusterStatus defines the observed state of MySQLCluster
Expand Down
1 change: 1 addition & 0 deletions api/v1beta2/mysqlcluster_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ var _ = Describe("MySQLCluster Webhook", func() {
EndpointURL: "https://foo.bar.svc:9000",
},
},
Schema: "db1",
}
err := k8sClient.Create(ctx, r)
Expect(err).NotTo(HaveOccurred())
Expand Down
4 changes: 2 additions & 2 deletions backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ func (o *getUUIDSetMockOp) PrepareRestore(_ context.Context) error {
panic("not implemented")
}

func (o *getUUIDSetMockOp) LoadDump(ctx context.Context, dir string) error {
func (o *getUUIDSetMockOp) LoadDump(ctx context.Context, dir string, schema string) error {
panic("not implemented")
}

func (o *getUUIDSetMockOp) LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time) error {
func (o *getUUIDSetMockOp) LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time, schema string) error {
panic("not implemented")
}

Expand Down
6 changes: 3 additions & 3 deletions backup/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ var _ = Describe("Backup/Restore", func() {
Expect(bs.WorkDirUsage).To(BeNumerically(">", 0))
Expect(bs.Warnings).To(BeEmpty())

rm, err := NewRestoreManager(cfg, bc, workDir2, "test", "single", "restore", "target", "", 3, bs.Time.Time)
rm, err := NewRestoreManager(cfg, bc, workDir2, "test", "single", "restore", "target", "", 3, bs.Time.Time, "")
Expect(err).NotTo(HaveOccurred())

ctx2, cancel := context.WithTimeout(ctx, 3*time.Second)
Expand Down Expand Up @@ -240,7 +240,7 @@ var _ = Describe("Backup/Restore", func() {
Expect(bs.WorkDirUsage).To(BeNumerically(">", 0))
Expect(bs.Warnings).To(BeEmpty())

rm, err := NewRestoreManager(cfg, bc, workDir2, "test", "single", "restore", "target", "", 3, restorePoint)
rm, err := NewRestoreManager(cfg, bc, workDir2, "test", "single", "restore", "target", "", 3, restorePoint, "")
Expect(err).NotTo(HaveOccurred())

err = rm.Restore(ctx)
Expand Down Expand Up @@ -292,7 +292,7 @@ var _ = Describe("Backup/Restore", func() {
Expect(err).NotTo(HaveOccurred())
Expect(bc.contents).To(HaveLen(3))

rm, err := NewRestoreManager(cfg, bc, workDir2, "test", "single", "restore", "target", "", 3, bt)
rm, err := NewRestoreManager(cfg, bc, workDir2, "test", "single", "restore", "target", "", 3, bt, "")
Expect(err).NotTo(HaveOccurred())

err = rm.Restore(ctx)
Expand Down
4 changes: 2 additions & 2 deletions backup/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,15 @@ func (o *mockOperator) PrepareRestore(_ context.Context) error {
return nil
}

func (o *mockOperator) LoadDump(ctx context.Context, dir string) error {
func (o *mockOperator) LoadDump(ctx context.Context, dir string, schema string) error {
if !o.prepared {
return errors.New("not prepared")
}
_, err := os.Stat(filepath.Join(dir, "@.json"))
return err
}

func (o *mockOperator) LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time) error {
func (o *mockOperator) LoadBinlog(ctx context.Context, binlogDir, tmpDir string, restorePoint time.Time, schema string) error {
if !o.prepared {
return errors.New("not prepared")
}
Expand Down
8 changes: 5 additions & 3 deletions backup/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ type RestoreManager struct {
keyPrefix string
restorePoint time.Time
workDir string
schema string
}

var ErrBadConnection = errors.New("the connection hasn't reflected the latest user's privileges")

func NewRestoreManager(cfg *rest.Config, bc bucket.Bucket, dir, srcNS, srcName, ns, name, password string, threads int, restorePoint time.Time) (*RestoreManager, error) {
func NewRestoreManager(cfg *rest.Config, bc bucket.Bucket, dir, srcNS, srcName, ns, name, password string, threads int, restorePoint time.Time, schema string) (*RestoreManager, error) {
log := zap.New(zap.WriteTo(os.Stderr), zap.StacktraceLevel(zapcore.DPanicLevel))
scheme := runtime.NewScheme()
if err := clientgoscheme.AddToScheme(scheme); err != nil {
Expand Down Expand Up @@ -74,6 +75,7 @@ func NewRestoreManager(cfg *rest.Config, bc bucket.Bucket, dir, srcNS, srcName,
keyPrefix: prefix,
restorePoint: restorePoint,
workDir: dir,
schema: schema,
}, nil
}

Expand Down Expand Up @@ -254,7 +256,7 @@ func (rm *RestoreManager) loadDump(ctx context.Context, op bkop.Operator, key st
return fmt.Errorf("failed to untar dump file: %w", err)
}

return op.LoadDump(ctx, dumpDir)
return op.LoadDump(ctx, dumpDir, rm.schema)
}

func (rm *RestoreManager) applyBinlog(ctx context.Context, op bkop.Operator, key string) error {
Expand Down Expand Up @@ -317,5 +319,5 @@ func (rm *RestoreManager) applyBinlog(ctx context.Context, op bkop.Operator, key
os.RemoveAll(tmpDir)
}()

return op.LoadBinlog(ctx, binlogDir, tmpDir, rm.restorePoint)
return op.LoadBinlog(ctx, binlogDir, tmpDir, rm.restorePoint, rm.schema)
}
3 changes: 3 additions & 0 deletions charts/moco/templates/generated/crds/moco_crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7880,6 +7880,9 @@ spec:
description: RestorePoint is the target date and time to restor
format: date-time
type: string
schema:
description: Schema is the name of the schema to restore.
type: string
sourceName:
description: SourceName is the name of the source `MySQLCluster
minLength: 1
Expand Down
12 changes: 8 additions & 4 deletions cmd/moco-backup/cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

var restoreCmd = &cobra.Command{
Use: "restore BUCKET SOURCE_NAMESPACE SOURCE_NAME NAMESPACE NAME YYYYMMDD-hhmmss",
Use: "restore BUCKET SOURCE_NAMESPACE SOURCE_NAME NAMESPACE NAME YYYYMMDD-hhmmss SCHEMA",
Short: "restore MySQL data from a backup",
Long: `Restore MySQL data from a backup.

Expand All @@ -21,8 +21,10 @@ SOURCE_NAMESPACE: The source MySQLCluster's namespace.
SOURCE_NAME: The source MySQLCluster's name.
NAMESPACE: The target MySQLCluster's namespace.
NAME: The target MySQLCluster's name.
YYYYMMDD-hhmmss: The point-in-time to restore data. e.g. 20210523-150423`,
Args: cobra.ExactArgs(6),
YYYYMMDD-hhmmss: The point-in-time to restore data. e.g. 20210523-150423
SCHEMA: The target schema to restore. If SCHEMA is empty, all schemas are restored.`,

Args: cobra.ExactArgs(7),
RunE: func(cmd *cobra.Command, args []string) error {
maxRetry := 3
for i := 0; i < maxRetry; i++ {
Expand Down Expand Up @@ -54,6 +56,7 @@ func runRestore(cmd *cobra.Command, args []string) (e error) {
srcName := args[2]
namespace := args[3]
name := args[4]
schema := args[6]

restorePoint, err := time.Parse(constants.BackupTimeFormat, args[5])
if err != nil {
Expand All @@ -75,7 +78,8 @@ func runRestore(cmd *cobra.Command, args []string) (e error) {
namespace, name,
mysqlPassword,
commonArgs.threads,
restorePoint)
restorePoint,
schema)
if err != nil {
return fmt.Errorf("failed to create a restore manager: %w", err)
}
Expand Down
3 changes: 3 additions & 0 deletions config/crd/bases/moco.cybozu.com_mysqlclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6268,6 +6268,9 @@ spec:
description: RestorePoint is the target date and time to restor
format: date-time
type: string
schema:
description: Schema is the name of the schema to restore.
type: string
sourceName:
description: SourceName is the name of the source `MySQLCluster
minLength: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6268,6 +6268,9 @@ spec:
description: RestorePoint is the target date and time to restor
format: date-time
type: string
schema:
description: Schema is the name of the schema to restore.
type: string
sourceName:
description: SourceName is the name of the source `MySQLCluster
minLength: 1
Expand Down
1 change: 1 addition & 0 deletions controllers/mysqlcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1448,6 +1448,7 @@ func (r *MySQLClusterReconciler) reconcileV1RestoreJob(ctx context.Context, req
args = append(args, cluster.Spec.Restore.SourceNamespace, cluster.Spec.Restore.SourceName)
args = append(args, cluster.Namespace, cluster.Name)
args = append(args, cluster.Spec.Restore.RestorePoint.UTC().Format(constants.BackupTimeFormat))
args = append(args, cluster.Spec.Restore.Schema)

resources := corev1ac.ResourceRequirements()
if !noJobResource {
Expand Down
1 change: 1 addition & 0 deletions controllers/mysqlcluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1560,6 +1560,7 @@ var _ = Describe("MySQLCluster reconciler", func() {
"test",
"test",
now.UTC().Format(constants.BackupTimeFormat),
"",
}))
Expect(c.EnvFrom).To(HaveLen(1))
Expect(c.Env).To(HaveLen(2))
Expand Down
1 change: 1 addition & 0 deletions docs/crd_mysqlcluster_v1beta2.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ RestoreSpec represents a set of parameters for Point-in-Time Recovery.
| sourceNamespace | SourceNamespace is the namespace of the source `MySQLCluster`. | string | true |
| restorePoint | RestorePoint is the target date and time to restore data. The format is RFC3339. e.g. \"2006-01-02T15:04:05Z\" | [metav1.Time](https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Time) | true |
| jobConfig | Specifies parameters for restore Pod. | [JobConfig](#jobconfig) | true |
| schema | Schema is the name of the schema to restore. If empty, all schemas are restored. | string | false |

[Back to Custom Resources](#custom-resources)

Expand Down
80 changes: 67 additions & 13 deletions e2e/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ var makeBucketYAML string
//go:embed testdata/backup.yaml
var backupYAML string

//go:embed testdata/restore.yaml
var restoreYAML string
//go:embed testdata/restore1.yaml
var restore1YAML string

//go:embed testdata/restore2.yaml
var restore2YAML string

var _ = Context("backup", func() {
if doUpgrade {
Expand Down Expand Up @@ -58,11 +61,17 @@ var _ = Context("backup", func() {
}).Should(Succeed())

kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-writable", "source", "--",
"-e", "CREATE DATABASE test")
"-e", "CREATE DATABASE test1")
kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-writable", "source", "--",
"-D", "test1", "-e", "CREATE TABLE t (id INT NOT NULL AUTO_INCREMENT, data VARCHAR(32) NOT NULL, PRIMARY KEY (id), KEY key1 (data), KEY key2 (data, id)) ENGINE=InnoDB")
kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-writable", "source", "--",
"-D", "test1", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('aaa')")
kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-writable", "source", "--",
"-e", "CREATE DATABASE test2")
kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-writable", "source", "--",
"-D", "test", "-e", "CREATE TABLE t (id INT NOT NULL AUTO_INCREMENT, data VARCHAR(32) NOT NULL, PRIMARY KEY (id), KEY key1 (data), KEY key2 (data, id)) ENGINE=InnoDB")
"-D", "test2", "-e", "CREATE TABLE t (id INT NOT NULL AUTO_INCREMENT, data VARCHAR(32) NOT NULL, PRIMARY KEY (id), KEY key1 (data), KEY key2 (data, id)) ENGINE=InnoDB")
kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-writable", "source", "--",
"-D", "test", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('aaa')")
"-D", "test2", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('aaa')")
})

It("should take a full dump", func() {
Expand All @@ -81,14 +90,20 @@ var _ = Context("backup", func() {

It("should take an incremental backup", func() {
kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-writable", "source", "--",
"-D", "test", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('bbb')")
"-D", "test1", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('bbb')")
kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-writable", "source", "--",
"-D", "test2", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('bbb')")
time.Sleep(1100 * time.Millisecond)
restorePoint = time.Now().UTC()
time.Sleep(1100 * time.Millisecond)
kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-admin", "source", "--",
"-D", "test", "--init_command=SET autocommit=1", "-e", "FLUSH LOCAL BINARY LOGS")
"-D", "test1", "--init_command=SET autocommit=1", "-e", "FLUSH LOCAL BINARY LOGS")
kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-writable", "source", "--",
"-D", "test1", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('ccc')")
kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-admin", "source", "--",
"-D", "test2", "--init_command=SET autocommit=1", "-e", "FLUSH LOCAL BINARY LOGS")
kubectlSafe(nil, "moco", "-n", "backup", "mysql", "-u", "moco-writable", "source", "--",
"-D", "test", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('ccc')")
"-D", "test2", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('ccc')")
time.Sleep(100 * time.Millisecond)

kubectlSafe(nil, "-n", "backup", "create", "job", "--from=cronjob/moco-backup-source", "backup-2")
Expand All @@ -111,7 +126,41 @@ var _ = Context("backup", func() {
It("should destroy the source then restore the backup data", func() {
kubectlSafe(nil, "-n", "backup", "delete", "mysqlclusters", "source")

tmpl, err := template.New("").Parse(restoreYAML)
tmpl, err := template.New("").Parse(restore1YAML)
Expect(err).NotTo(HaveOccurred())
buf := new(bytes.Buffer)
err = tmpl.Execute(buf, struct {
MySQLVersion string
RestorePoint string
}{
mysqlVersion,
restorePoint.Format(time.RFC3339),
})
Expect(err).NotTo(HaveOccurred())

kubectlSafe(buf.Bytes(), "apply", "-f", "-")
Eventually(func(g Gomega) {
cluster, err := getCluster("backup", "target1")
g.Expect(err).NotTo(HaveOccurred())
condHealthy, err := getClusterCondition(cluster, mocov1beta2.ConditionHealthy)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(condHealthy.Status).To(Equal(metav1.ConditionTrue), "target1 is not healthy")
}).Should(Succeed())

out := kubectlSafe(nil, "moco", "-n", "backup", "mysql", "target1", "--",
"-N", "-D", "test1", "-e", "SELECT COUNT(*) FROM t")
count, err := strconv.Atoi(strings.TrimSpace(string(out)))
Expect(err).NotTo(HaveOccurred())
Expect(count).To(Equal(2))

out = kubectlSafe(nil, "moco", "-n", "backup", "mysql", "target1", "--",
"-N", "-e", "SHOW DATABASES LIKE 'test%'")
databases := strings.Fields(string(out))
Expect(databases).Should(ConsistOf("test1", "test2"))
})

It("should restore only test2 schema", func() {
tmpl, err := template.New("").Parse(restore2YAML)
Expect(err).NotTo(HaveOccurred())
buf := new(bytes.Buffer)
err = tmpl.Execute(buf, struct {
Expand All @@ -125,18 +174,23 @@ var _ = Context("backup", func() {

kubectlSafe(buf.Bytes(), "apply", "-f", "-")
Eventually(func(g Gomega) {
cluster, err := getCluster("backup", "target")
cluster, err := getCluster("backup", "target2")
g.Expect(err).NotTo(HaveOccurred())
condHealthy, err := getClusterCondition(cluster, mocov1beta2.ConditionHealthy)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(condHealthy.Status).To(Equal(metav1.ConditionTrue), "target is not healthy")
g.Expect(condHealthy.Status).To(Equal(metav1.ConditionTrue), "target2 is not healthy")
}).Should(Succeed())

out := kubectlSafe(nil, "moco", "-n", "backup", "mysql", "target", "--",
"-N", "-D", "test", "-e", "SELECT COUNT(*) FROM t")
out := kubectlSafe(nil, "moco", "-n", "backup", "mysql", "target2", "--",
"-N", "-D", "test2", "-e", "SELECT COUNT(*) FROM t")
count, err := strconv.Atoi(strings.TrimSpace(string(out)))
Expect(err).NotTo(HaveOccurred())
Expect(count).To(Equal(2))

out = kubectlSafe(nil, "moco", "-n", "backup", "mysql", "target2", "--",
"-N", "-e", "SHOW DATABASES LIKE 'test%'")
databases := strings.Fields(string(out))
Expect(databases).Should(ConsistOf("test2"))
})

It("should delete clusters", func() {
Expand Down
2 changes: 1 addition & 1 deletion e2e/testdata/restore.yaml → e2e/testdata/restore1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: moco.cybozu.com/v1beta2
kind: MySQLCluster
metadata:
namespace: backup
name: target
name: target1
spec:
mysqlConfigMapName: mycnf
replicas: 1
Expand Down
41 changes: 41 additions & 0 deletions e2e/testdata/restore2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
apiVersion: moco.cybozu.com/v1beta2
kind: MySQLCluster
metadata:
namespace: backup
name: target2
spec:
mysqlConfigMapName: mycnf
replicas: 1
restore:
sourceName: source
sourceNamespace: backup
restorePoint: "{{ .RestorePoint }}"
schema: "test2"
jobConfig:
serviceAccountName: backup-owner
env:
- name: AWS_ACCESS_KEY_ID
value: minioadmin
- name: AWS_SECRET_ACCESS_KEY
value: minioadmin
- name: AWS_REGION
value: us-east-1
bucketConfig:
bucketName: moco
endpointURL: http://minio.default.svc:9000
usePathStyle: true
workVolume:
emptyDir: {}
podTemplate:
spec:
containers:
- name: mysqld
image: ghcr.io/cybozu-go/moco/mysql:{{ .MySQLVersion }}
volumeClaimTemplates:
- metadata:
name: mysql-data
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 1Gi
Loading
Loading