Skip to content

Commit

Permalink
Support record and replay
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Nov 28, 2023
1 parent e2e4602 commit e7da913
Show file tree
Hide file tree
Showing 28 changed files with 1,203 additions and 153 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/emicklei/go-restful/v3 v3.10.2
github.com/fsnotify/fsnotify v1.6.0
github.com/google/cel-go v0.16.0
github.com/google/gnostic-models v0.6.8
github.com/google/go-cmp v0.5.9
github.com/itchyny/gojq v0.12.13
github.com/nxadm/tail v1.4.8
Expand All @@ -31,6 +32,7 @@ require (
k8s.io/client-go v0.28.0
k8s.io/code-generator v0.28.0
k8s.io/cri-api v0.28.0
k8s.io/kube-openapi v0.0.0-20231113174909-778a5567bc1e
k8s.io/kubelet v0.28.0
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
sigs.k8s.io/controller-runtime v0.16.0
Expand Down Expand Up @@ -65,7 +67,6 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.3.1 // indirect
Expand Down Expand Up @@ -119,9 +120,8 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.28.0 // indirect
k8s.io/gengo v0.0.0-20220902162205-c0856e24416d // indirect
k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/kustomize/cmd/config v0.11.3 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -386,13 +386,13 @@ k8s.io/code-generator v0.28.0 h1:msdkRVJNVFgdiIJ8REl/d3cZsMB9HByFcWMmn13NyuE=
k8s.io/code-generator v0.28.0/go.mod h1:ueeSJZJ61NHBa0ccWLey6mwawum25vX61nRZ6WOzN9A=
k8s.io/cri-api v0.28.0 h1:TVidtHNi425IaKF50oDD5hRvQuK7wB4NQAfTVOcr9QA=
k8s.io/cri-api v0.28.0/go.mod h1:xXygwvSOGcT/2KXg8sMYTHns2xFem3949kCQn5IS1k4=
k8s.io/gengo v0.0.0-20220902162205-c0856e24416d h1:U9tB195lKdzwqicbJvyJeOXV7Klv+wNAWENRnXEGi08=
k8s.io/gengo v0.0.0-20220902162205-c0856e24416d/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 h1:pWEwq4Asjm4vjW7vcsmijwBhOr1/shsbSYiWXmNGlks=
k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg=
k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 h1:LyMgNKD2P8Wn1iAwQU5OhxCKlKJy0sHc+PcDwFB24dQ=
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9/go.mod h1:wZK2AVp1uHCp4VamDVgBP2COHZjqD1T68Rf0CM3YjSM=
k8s.io/kube-openapi v0.0.0-20231113174909-778a5567bc1e h1:snPmy96t93RredGRjKfMFt+gvxuVAncqSAyBveJtr4Q=
k8s.io/kube-openapi v0.0.0-20231113174909-778a5567bc1e/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA=
k8s.io/kubelet v0.28.0 h1:H/3JAkLIungVF+WLpqrxhgJ4gzwsbN8VA8LOTYsEX3U=
k8s.io/kubelet v0.28.0/go.mod h1:i8jUg4ltbRusT3ExOhSAeqETuHdoHTZcTT2cPr9RTgc=
k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI=
Expand Down
4 changes: 4 additions & 0 deletions pkg/kwokctl/cmd/snapshot/export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type flagpole struct {
ImpersonateGroups []string
PageSize int64
PageBufferSize int32
Record bool
}

// NewCommand returns a new cobra.Command for cluster exporting.
Expand All @@ -60,6 +61,7 @@ func NewCommand(ctx context.Context) *cobra.Command {
cmd.Flags().StringSliceVar(&flags.ImpersonateGroups, "as-group", nil, "Group to impersonate for the operation, this flag can be repeated to specify multiple groups.")
cmd.Flags().Int64Var(&flags.PageSize, "page-size", 500, "Define the page size")
cmd.Flags().Int32Var(&flags.PageBufferSize, "page-buffer-size", 10, "Define the number of pages to buffer")
cmd.Flags().BoolVar(&flags.Record, "record", false, "Record the change of the cluster")
return cmd
}

Expand Down Expand Up @@ -101,6 +103,8 @@ func runE(ctx context.Context, flags *flagpole) error {

snapshotSaveConfig := snapshot.SaveConfig{
PagerConfig: pagerConfig,
Record: flags.Record,
Relative: true,
}

err = snapshot.Save(ctx, clientset, f, flags.Filters, snapshotSaveConfig)
Expand Down
8 changes: 7 additions & 1 deletion pkg/kwokctl/cmd/snapshot/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type flagpole struct {
Path string
Format string
Filters []string
Replay bool
}

// NewCommand returns a new cobra.Command to save the cluster as a snapshot.
Expand All @@ -56,6 +57,7 @@ func NewCommand(ctx context.Context) *cobra.Command {
cmd.Flags().StringVar(&flags.Path, "path", "", "Path to the snapshot")
cmd.Flags().StringVar(&flags.Format, "format", "etcd", "Format of the snapshot file (etcd, k8s)")
cmd.Flags().StringSliceVar(&flags.Filters, "filter", snapshot.Resources, "Filter the resources to restore, only support for k8s format")
cmd.Flags().BoolVar(&flags.Replay, "replay", false, "Replay the change of the cluster")
return cmd
}

Expand Down Expand Up @@ -88,7 +90,11 @@ func runE(ctx context.Context, flags *flagpole) error {
return err
}
case "k8s":
err = rt.SnapshotRestoreWithYAML(ctx, flags.Path, flags.Filters)
snapshotLoadConfig := snapshot.LoadConfig{
Replay: flags.Replay,
Relative: true,
}
err = rt.SnapshotRestoreWithYAML(ctx, flags.Path, flags.Filters, snapshotLoadConfig)
if err != nil {
return err
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/kwokctl/cmd/snapshot/save/save.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type flagpole struct {
Path string
Format string
Filters []string
Record bool
}

// NewCommand returns a new cobra.Command for cluster snapshotting.
Expand All @@ -56,6 +57,7 @@ func NewCommand(ctx context.Context) *cobra.Command {
cmd.Flags().StringVar(&flags.Path, "path", "", "Path to the snapshot")
cmd.Flags().StringVar(&flags.Format, "format", "etcd", "Format of the snapshot file (etcd, k8s)")
cmd.Flags().StringSliceVar(&flags.Filters, "filter", snapshot.Resources, "Filter the resources to save, only support for k8s format")
cmd.Flags().BoolVar(&flags.Record, "record", false, "Record the change of the cluster")
return cmd
}

Expand Down Expand Up @@ -88,7 +90,11 @@ func runE(ctx context.Context, flags *flagpole) error {
return err
}
case "k8s":
err = rt.SnapshotSaveWithYAML(ctx, flags.Path, flags.Filters)
snapshotSaveConfig := snapshot.SaveConfig{
Record: flags.Record,
Relative: true,
}
err = rt.SnapshotSaveWithYAML(ctx, flags.Path, flags.Filters, snapshotSaveConfig)
if err != nil {
return err
}
Expand Down
52 changes: 30 additions & 22 deletions pkg/kwokctl/runtime/binary/cluster_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"sigs.k8s.io/kwok/pkg/consts"
"sigs.k8s.io/kwok/pkg/kwokctl/runtime"
"sigs.k8s.io/kwok/pkg/kwokctl/snapshot"
"sigs.k8s.io/kwok/pkg/log"
"sigs.k8s.io/kwok/pkg/utils/wait"
)
Expand Down Expand Up @@ -74,40 +75,47 @@ func (c *Cluster) SnapshotRestore(ctx context.Context, path string) error {
}

// SnapshotSaveWithYAML save the snapshot of cluster
func (c *Cluster) SnapshotSaveWithYAML(ctx context.Context, path string, filters []string) error {
err := c.Cluster.SnapshotSaveWithYAML(ctx, path, filters)
func (c *Cluster) SnapshotSaveWithYAML(ctx context.Context, path string, filters []string, saveConfig snapshot.SaveConfig) error {
err := c.Cluster.SnapshotSaveWithYAML(ctx, path, filters, saveConfig)
if err != nil {
return err
}
return nil
}

// SnapshotRestoreWithYAML restore the snapshot of cluster
func (c *Cluster) SnapshotRestoreWithYAML(ctx context.Context, path string, filters []string) error {
func (c *Cluster) SnapshotRestoreWithYAML(ctx context.Context, path string, filters []string, loadConfig snapshot.LoadConfig) error {
logger := log.FromContext(ctx)
err := wait.Poll(ctx, func(ctx context.Context) (bool, error) {
err := c.StopComponent(ctx, consts.ComponentKubeControllerManager)
if err != nil {
return false, err
}
component, err := c.GetComponent(ctx, consts.ComponentKubeControllerManager)
if err != nil {
return false, err
}
ready := c.isRunning(ctx, component)
return !ready, nil
})
if err != nil {
logger.Error("Failed to stop kube-controller-manager", err)
components := []string{
consts.ComponentKubeScheduler,
consts.ComponentKubeControllerManager,
consts.ComponentKwokController,
}
defer func() {
err = c.StartComponent(ctx, consts.ComponentKubeControllerManager)
for _, component := range components {
err := wait.Poll(ctx, func(ctx context.Context) (bool, error) {
err := c.StopComponent(ctx, component)
if err != nil {
return false, err
}
component, err := c.GetComponent(ctx, component)
if err != nil {
return false, err
}
ready := c.isRunning(ctx, component)
return !ready, nil
})
if err != nil {
logger.Error("Failed to start kube-controller-manager", err)
logger.Error("Failed to stop %q", err, component)
}
}()
defer func() {
err = c.StartComponent(ctx, component)
if err != nil {
logger.Error("Failed to start %q", err, component)
}
}()
}

err = c.Cluster.SnapshotRestoreWithYAML(ctx, path, filters)
err := c.Cluster.SnapshotRestoreWithYAML(ctx, path, filters, loadConfig)
if err != nil {
return err
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/kwokctl/runtime/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,15 @@ func (c *Cluster) GetComponent(ctx context.Context, name string) (internalversio
return component, nil
}

// ListComponents returns the components
func (c *Cluster) ListComponents(ctx context.Context) ([]internalversion.Component, error) {
config, err := c.Config(ctx)
if err != nil {
return nil, err
}
return config.Components, nil
}

// Kubectl runs kubectl.
func (c *Cluster) Kubectl(ctx context.Context, args ...string) error {
kubectlPath, err := c.kubectlPath(ctx)
Expand Down Expand Up @@ -561,7 +570,7 @@ func (c *Cluster) InitCRDs(ctx context.Context) error {
logger := log.FromContext(ctx)
ctx = log.NewContext(ctx, logger.With("crds", strings.Join(crds, ",")))

return snapshot.Load(ctx, clientset, buf, nil)
return snapshot.Load(ctx, clientset, buf, nil, snapshot.LoadConfig{})
}

var crdDefines = map[string][]byte{
Expand Down
11 changes: 4 additions & 7 deletions pkg/kwokctl/runtime/cluster_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

// SnapshotSaveWithYAML save the snapshot of cluster
func (c *Cluster) SnapshotSaveWithYAML(ctx context.Context, path string, filters []string) error {
func (c *Cluster) SnapshotSaveWithYAML(ctx context.Context, path string, filters []string, saveConfig snapshot.SaveConfig) error {
if c.IsDryRun() {
dryrun.PrintMessage("kubectl get %s -o yaml >%s", strings.Join(filters, ","), path)
return nil
Expand All @@ -45,14 +45,11 @@ func (c *Cluster) SnapshotSaveWithYAML(ctx context.Context, path string, filters
_ = f.Close()
}()

// In most cases, the user should have full privileges on the clusters created by kwokctl,
// so no need to expose impersonation args to "snapshot save" command.
snapshotSaveConfig := snapshot.SaveConfig{}
return snapshot.Save(ctx, clientset, f, filters, snapshotSaveConfig)
return snapshot.Save(ctx, clientset, f, filters, saveConfig)
}

// SnapshotRestoreWithYAML restore the snapshot of cluster
func (c *Cluster) SnapshotRestoreWithYAML(ctx context.Context, path string, filters []string) error {
func (c *Cluster) SnapshotRestoreWithYAML(ctx context.Context, path string, filters []string, loadConfig snapshot.LoadConfig) error {
if c.IsDryRun() {
dryrun.PrintMessage("kubectl create -f %s", path)
return nil
Expand All @@ -71,5 +68,5 @@ func (c *Cluster) SnapshotRestoreWithYAML(ctx context.Context, path string, filt
_ = f.Close()
}()

return snapshot.Load(ctx, clientset, f, filters)
return snapshot.Load(ctx, clientset, f, filters, loadConfig)
}
30 changes: 19 additions & 11 deletions pkg/kwokctl/runtime/compose/cluster_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

"sigs.k8s.io/kwok/pkg/consts"
"sigs.k8s.io/kwok/pkg/kwokctl/snapshot"
"sigs.k8s.io/kwok/pkg/log"
)

Expand Down Expand Up @@ -127,29 +128,36 @@ func (c *Cluster) SnapshotRestore(ctx context.Context, path string) error {
}

// SnapshotSaveWithYAML save the snapshot of cluster
func (c *Cluster) SnapshotSaveWithYAML(ctx context.Context, path string, filters []string) error {
err := c.Cluster.SnapshotSaveWithYAML(ctx, path, filters)
func (c *Cluster) SnapshotSaveWithYAML(ctx context.Context, path string, filters []string, saveConfig snapshot.SaveConfig) error {
err := c.Cluster.SnapshotSaveWithYAML(ctx, path, filters, saveConfig)
if err != nil {
return err
}
return nil
}

// SnapshotRestoreWithYAML restore the snapshot of cluster
func (c *Cluster) SnapshotRestoreWithYAML(ctx context.Context, path string, filters []string) error {
func (c *Cluster) SnapshotRestoreWithYAML(ctx context.Context, path string, filters []string, loadConfig snapshot.LoadConfig) error {
logger := log.FromContext(ctx)
err := c.StopComponent(ctx, consts.ComponentKubeControllerManager)
if err != nil {
logger.Error("Failed to stop kube-controller-manager", err)
components := []string{
consts.ComponentKubeScheduler,
consts.ComponentKubeControllerManager,
consts.ComponentKwokController,
}
defer func() {
err = c.StartComponent(ctx, consts.ComponentKubeControllerManager)
for _, component := range components {
err := c.StopComponent(ctx, component)
if err != nil {
logger.Error("Failed to start kube-controller-manager", err)
logger.Error("Failed to stop %q", err, component)
}
}()
defer func() {
err = c.StartComponent(ctx, component)
if err != nil {
logger.Error("Failed to start %q", err, component)
}
}()
}

err = c.Cluster.SnapshotRestoreWithYAML(ctx, path, filters)
err := c.Cluster.SnapshotRestoreWithYAML(ctx, path, filters, loadConfig)
if err != nil {
return err
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/kwokctl/runtime/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"sigs.k8s.io/kwok/pkg/apis/internalversion"
"sigs.k8s.io/kwok/pkg/kwokctl/snapshot"
)

// Runtime is the interface for a runtime.
Expand Down Expand Up @@ -65,6 +66,9 @@ type Runtime interface {
// GetComponent return the component if it exists
GetComponent(ctx context.Context, name string) (internalversion.Component, error)

// ListComponents list the components of cluster
ListComponents(ctx context.Context) ([]internalversion.Component, error)

// Ready check the cluster is ready
Ready(ctx context.Context) (bool, error)

Expand Down Expand Up @@ -114,10 +118,10 @@ type Runtime interface {
SnapshotRestore(ctx context.Context, path string) error

// SnapshotSaveWithYAML save the snapshot of cluster
SnapshotSaveWithYAML(ctx context.Context, path string, filters []string) error
SnapshotSaveWithYAML(ctx context.Context, path string, filters []string, saveConfig snapshot.SaveConfig) error

// SnapshotRestoreWithYAML restore the snapshot of cluster
SnapshotRestoreWithYAML(ctx context.Context, path string, filters []string) error
SnapshotRestoreWithYAML(ctx context.Context, path string, filters []string, loadConfig snapshot.LoadConfig) error

// GetWorkdirPath get the workdir path of cluster
GetWorkdirPath(name string) string
Expand Down
Loading

0 comments on commit e7da913

Please sign in to comment.