diff --git a/go.mod b/go.mod
index d78ff0f9ff..08075b22c8 100644
--- a/go.mod
+++ b/go.mod
@@ -12,6 +12,7 @@ require (
 	github.com/emicklei/go-restful/v3 v3.10.2
 	github.com/fsnotify/fsnotify v1.6.0
 	github.com/google/cel-go v0.16.0
+	github.com/google/gnostic-models v0.6.8
 	github.com/google/go-cmp v0.5.9
 	github.com/itchyny/gojq v0.12.13
 	github.com/nxadm/tail v1.4.8
@@ -31,6 +32,7 @@ require (
 	k8s.io/client-go v0.28.0
 	k8s.io/code-generator v0.28.0
 	k8s.io/cri-api v0.28.0
+	k8s.io/kube-openapi v0.0.0-20231113174909-778a5567bc1e
 	k8s.io/kubelet v0.28.0
 	k8s.io/utils v0.0.0-20230726121419-3b25d923346b
 	sigs.k8s.io/controller-runtime v0.16.0
@@ -65,7 +67,6 @@ require (
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
 	github.com/golang/protobuf v1.5.3 // indirect
-	github.com/google/gnostic-models v0.6.8 // indirect
 	github.com/google/gofuzz v1.2.0 // indirect
 	github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
 	github.com/google/uuid v1.3.1 // indirect
@@ -119,9 +120,8 @@ require (
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 	k8s.io/apiextensions-apiserver v0.28.0 // indirect
-	k8s.io/gengo v0.0.0-20220902162205-c0856e24416d // indirect
+	k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 // indirect
 	k8s.io/klog/v2 v2.100.1 // indirect
-	k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
 	sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
 	sigs.k8s.io/kustomize/cmd/config v0.11.3 // indirect
 	sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
diff --git a/go.sum b/go.sum
index 4ba59e9ee1..173826c4d6 100644
--- a/go.sum
+++ b/go.sum
@@ -386,13 +386,13 @@ k8s.io/code-generator v0.28.0 h1:msdkRVJNVFgdiIJ8REl/d3cZsMB9HByFcWMmn13NyuE=
 k8s.io/code-generator v0.28.0/go.mod h1:ueeSJZJ61NHBa0ccWLey6mwawum25vX61nRZ6WOzN9A=
 k8s.io/cri-api v0.28.0 h1:TVidtHNi425IaKF50oDD5hRvQuK7wB4NQAfTVOcr9QA=
 k8s.io/cri-api v0.28.0/go.mod h1:xXygwvSOGcT/2KXg8sMYTHns2xFem3949kCQn5IS1k4=
-k8s.io/gengo v0.0.0-20220902162205-c0856e24416d h1:U9tB195lKdzwqicbJvyJeOXV7Klv+wNAWENRnXEGi08=
-k8s.io/gengo v0.0.0-20220902162205-c0856e24416d/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
+k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 h1:pWEwq4Asjm4vjW7vcsmijwBhOr1/shsbSYiWXmNGlks=
+k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
 k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
 k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg=
 k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
-k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 h1:LyMgNKD2P8Wn1iAwQU5OhxCKlKJy0sHc+PcDwFB24dQ=
-k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9/go.mod h1:wZK2AVp1uHCp4VamDVgBP2COHZjqD1T68Rf0CM3YjSM=
+k8s.io/kube-openapi v0.0.0-20231113174909-778a5567bc1e h1:snPmy96t93RredGRjKfMFt+gvxuVAncqSAyBveJtr4Q=
+k8s.io/kube-openapi v0.0.0-20231113174909-778a5567bc1e/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA=
 k8s.io/kubelet v0.28.0 h1:H/3JAkLIungVF+WLpqrxhgJ4gzwsbN8VA8LOTYsEX3U=
 k8s.io/kubelet v0.28.0/go.mod h1:i8jUg4ltbRusT3ExOhSAeqETuHdoHTZcTT2cPr9RTgc=
 k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI=
diff --git a/pkg/kwokctl/cmd/snapshot/export/export.go b/pkg/kwokctl/cmd/snapshot/export/export.go
index df96437d83..1c6966ed14 100644
--- a/pkg/kwokctl/cmd/snapshot/export/export.go
+++ b/pkg/kwokctl/cmd/snapshot/export/export.go
@@ -39,6 +39,7 @@ type flagpole struct {
 	ImpersonateGroups []string
 	PageSize          int64
 	PageBufferSize    int32
+	Record            bool
 }
 
 // NewCommand returns a new cobra.Command for cluster exporting.
@@ -60,6 +61,7 @@ func NewCommand(ctx context.Context) *cobra.Command {
 	cmd.Flags().StringSliceVar(&flags.ImpersonateGroups, "as-group", nil, "Group to impersonate for the operation, this flag can be repeated to specify multiple groups.")
 	cmd.Flags().Int64Var(&flags.PageSize, "page-size", 500, "Define the page size")
 	cmd.Flags().Int32Var(&flags.PageBufferSize, "page-buffer-size", 10, "Define the number of pages to buffer")
+	cmd.Flags().BoolVar(&flags.Record, "record", false, "Record the change of the cluster")
 	return cmd
 }
 
@@ -101,6 +103,8 @@ func runE(ctx context.Context, flags *flagpole) error {
 
 	snapshotSaveConfig := snapshot.SaveConfig{
 		PagerConfig: pagerConfig,
+		Record:      flags.Record,
+		Relative:    true,
 	}
 	err = snapshot.Save(ctx, clientset, f, flags.Filters, snapshotSaveConfig)
diff --git a/pkg/kwokctl/cmd/snapshot/restore/restore.go b/pkg/kwokctl/cmd/snapshot/restore/restore.go
index cb82f932dd..67b8457c85 100644
--- a/pkg/kwokctl/cmd/snapshot/restore/restore.go
+++ b/pkg/kwokctl/cmd/snapshot/restore/restore.go
@@ -38,6 +38,7 @@ type flagpole struct {
 	Path    string
 	Format  string
 	Filters []string
+	Replay  bool
 }
 
 // NewCommand returns a new cobra.Command to save the cluster as a snapshot.
@@ -56,6 +57,7 @@ func NewCommand(ctx context.Context) *cobra.Command {
 	cmd.Flags().StringVar(&flags.Path, "path", "", "Path to the snapshot")
 	cmd.Flags().StringVar(&flags.Format, "format", "etcd", "Format of the snapshot file (etcd, k8s)")
 	cmd.Flags().StringSliceVar(&flags.Filters, "filter", snapshot.Resources, "Filter the resources to restore, only support for k8s format")
+	cmd.Flags().BoolVar(&flags.Replay, "replay", false, "Replay the change of the cluster")
 	return cmd
 }
 
@@ -88,7 +90,11 @@ func runE(ctx context.Context, flags *flagpole) error {
 			return err
 		}
 	case "k8s":
-		err = rt.SnapshotRestoreWithYAML(ctx, flags.Path, flags.Filters)
+		snapshotLoadConfig := snapshot.LoadConfig{
+			Replay:   flags.Replay,
+			Relative: true,
+		}
+		err = rt.SnapshotRestoreWithYAML(ctx, flags.Path, flags.Filters, snapshotLoadConfig)
 		if err != nil {
 			return err
 		}
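Before the runtime plumbing below, it helps to see the two option structs these flags feed. A minimal sketch of driving the snapshot package directly, assuming an already-constructed `clientset` and an open snapshot file `f` (all names are ones introduced in this diff):

```go
// --record: after the initial dump, keep watching the cluster and append
// ResourcePatch documents to the stream until ctx is cancelled.
saveConfig := snapshot.SaveConfig{
	Record:   true,
	Relative: true, // store timestamps as $(time-offset-nanosecond N)
}
if err := snapshot.Save(ctx, clientset, f, snapshot.Resources, saveConfig); err != nil {
	return err
}

// --replay: re-apply the recorded changes with their original pacing.
loadConfig := snapshot.LoadConfig{
	Replay:   true,
	Relative: true, // re-anchor the offsets to the load start time
}
if err := snapshot.Load(ctx, clientset, f, snapshot.Resources, loadConfig); err != nil {
	return err
}
```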
diff --git a/pkg/kwokctl/cmd/snapshot/save/save.go b/pkg/kwokctl/cmd/snapshot/save/save.go
index 8363983308..fc1f3f0fd6 100644
--- a/pkg/kwokctl/cmd/snapshot/save/save.go
+++ b/pkg/kwokctl/cmd/snapshot/save/save.go
@@ -38,6 +38,7 @@ type flagpole struct {
 	Path    string
 	Format  string
 	Filters []string
+	Record  bool
 }
 
 // NewCommand returns a new cobra.Command for cluster snapshotting.
@@ -56,6 +57,7 @@ func NewCommand(ctx context.Context) *cobra.Command {
 	cmd.Flags().StringVar(&flags.Path, "path", "", "Path to the snapshot")
 	cmd.Flags().StringVar(&flags.Format, "format", "etcd", "Format of the snapshot file (etcd, k8s)")
 	cmd.Flags().StringSliceVar(&flags.Filters, "filter", snapshot.Resources, "Filter the resources to save, only support for k8s format")
+	cmd.Flags().BoolVar(&flags.Record, "record", false, "Record the change of the cluster")
 	return cmd
 }
 
@@ -88,7 +90,11 @@ func runE(ctx context.Context, flags *flagpole) error {
 			return err
 		}
 	case "k8s":
-		err = rt.SnapshotSaveWithYAML(ctx, flags.Path, flags.Filters)
+		snapshotSaveConfig := snapshot.SaveConfig{
+			Record:   flags.Record,
+			Relative: true,
+		}
+		err = rt.SnapshotSaveWithYAML(ctx, flags.Path, flags.Filters, snapshotSaveConfig)
 		if err != nil {
 			return err
 		}
diff --git a/pkg/kwokctl/runtime/binary/cluster_snapshot.go b/pkg/kwokctl/runtime/binary/cluster_snapshot.go
index 6b6540db2c..36c0c413d3 100644
--- a/pkg/kwokctl/runtime/binary/cluster_snapshot.go
+++ b/pkg/kwokctl/runtime/binary/cluster_snapshot.go
@@ -21,6 +21,7 @@ import (
 
 	"sigs.k8s.io/kwok/pkg/consts"
 	"sigs.k8s.io/kwok/pkg/kwokctl/runtime"
+	"sigs.k8s.io/kwok/pkg/kwokctl/snapshot"
 	"sigs.k8s.io/kwok/pkg/log"
 	"sigs.k8s.io/kwok/pkg/utils/wait"
 )
@@ -74,8 +75,8 @@ func (c *Cluster) SnapshotRestore(ctx context.Context, path string) error {
 }
 
 // SnapshotSaveWithYAML save the snapshot of cluster
-func (c *Cluster) SnapshotSaveWithYAML(ctx context.Context, path string, filters []string) error {
-	err := c.Cluster.SnapshotSaveWithYAML(ctx, path, filters)
+func (c *Cluster) SnapshotSaveWithYAML(ctx context.Context, path string, filters []string, saveConfig snapshot.SaveConfig) error {
+	err := c.Cluster.SnapshotSaveWithYAML(ctx, path, filters, saveConfig)
 	if err != nil {
 		return err
 	}
@@ -83,31 +84,38 @@ func (c *Cluster) SnapshotSaveWithYAML(ctx context.Context, path string, filters
 }
 
 // SnapshotRestoreWithYAML restore the snapshot of cluster
-func (c *Cluster) SnapshotRestoreWithYAML(ctx context.Context, path string, filters []string) error {
+func (c *Cluster) SnapshotRestoreWithYAML(ctx context.Context, path string, filters []string, loadConfig snapshot.LoadConfig) error {
 	logger := log.FromContext(ctx)
-	err := wait.Poll(ctx, func(ctx context.Context) (bool, error) {
-		err := c.StopComponent(ctx, consts.ComponentKubeControllerManager)
-		if err != nil {
-			return false, err
-		}
-		component, err := c.GetComponent(ctx, consts.ComponentKubeControllerManager)
-		if err != nil {
-			return false, err
-		}
-		ready := c.isRunning(ctx, component)
-		return !ready, nil
-	})
-	if err != nil {
-		logger.Error("Failed to stop kube-controller-manager", err)
-	}
-	defer func() {
-		err = c.StartComponent(ctx, consts.ComponentKubeControllerManager)
-		if err != nil {
-			logger.Error("Failed to start kube-controller-manager", err)
-		}
-	}()
+	components := []string{
+		consts.ComponentKubeScheduler,
+		consts.ComponentKubeControllerManager,
+		consts.ComponentKwokController,
+	}
+	for _, component := range components {
+		err := wait.Poll(ctx, func(ctx context.Context) (bool, error) {
+			err := c.StopComponent(ctx, component)
+			if err != nil {
+				return false, err
+			}
+			component, err := c.GetComponent(ctx, component)
+			if err != nil {
+				return false, err
+			}
+			ready := c.isRunning(ctx, component)
+			return !ready, nil
+		})
+		if err != nil {
+			logger.Error("Failed to stop component", err, "component", component)
+		}
+		// Pass component as an argument: the deferred restarts run when the
+		// restore returns, after the loop variable has moved on.
+		defer func(component string) {
+			err := c.StartComponent(ctx, component)
+			if err != nil {
+				logger.Error("Failed to start component", err, "component", component)
+			}
+		}(component)
+	}
 
-	err = c.Cluster.SnapshotRestoreWithYAML(ctx, path, filters)
+	err := c.Cluster.SnapshotRestoreWithYAML(ctx, path, filters, loadConfig)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/kwokctl/runtime/cluster.go b/pkg/kwokctl/runtime/cluster.go
index 7e1d3b5984..da42bd478d 100644
--- a/pkg/kwokctl/runtime/cluster.go
+++ b/pkg/kwokctl/runtime/cluster.go
@@ -390,6 +390,15 @@ func (c *Cluster) GetComponent(ctx context.Context, name string) (internalversio
 	return component, nil
 }
 
+// ListComponents returns the components of the cluster
+func (c *Cluster) ListComponents(ctx context.Context) ([]internalversion.Component, error) {
+	config, err := c.Config(ctx)
+	if err != nil {
+		return nil, err
+	}
+	return config.Components, nil
+}
+
 // Kubectl runs kubectl.
 func (c *Cluster) Kubectl(ctx context.Context, args ...string) error {
 	kubectlPath, err := c.kubectlPath(ctx)
@@ -561,7 +570,7 @@ func (c *Cluster) InitCRDs(ctx context.Context) error {
 	logger := log.FromContext(ctx)
 	ctx = log.NewContext(ctx, logger.With("crds", strings.Join(crds, ",")))
 
-	return snapshot.Load(ctx, clientset, buf, nil)
+	return snapshot.Load(ctx, clientset, buf, nil, snapshot.LoadConfig{})
 }
 
 var crdDefines = map[string][]byte{
diff --git a/pkg/kwokctl/runtime/cluster_snapshot.go b/pkg/kwokctl/runtime/cluster_snapshot.go
index d111626685..b0c86ddc0d 100644
--- a/pkg/kwokctl/runtime/cluster_snapshot.go
+++ b/pkg/kwokctl/runtime/cluster_snapshot.go
@@ -26,7 +26,7 @@ import (
 )
 
 // SnapshotSaveWithYAML save the snapshot of cluster
-func (c *Cluster) SnapshotSaveWithYAML(ctx context.Context, path string, filters []string) error {
+func (c *Cluster) SnapshotSaveWithYAML(ctx context.Context, path string, filters []string, saveConfig snapshot.SaveConfig) error {
 	if c.IsDryRun() {
 		dryrun.PrintMessage("kubectl get %s -o yaml >%s", strings.Join(filters, ","), path)
 		return nil
@@ -45,14 +45,11 @@ func (c *Cluster) SnapshotSaveWithYAML(ctx context.Context, path string, filters
 		_ = f.Close()
 	}()
 
-	// In most cases, the user should have full privileges on the clusters created by kwokctl,
-	// so no need to expose impersonation args to "snapshot save" command.
-	snapshotSaveConfig := snapshot.SaveConfig{}
-	return snapshot.Save(ctx, clientset, f, filters, snapshotSaveConfig)
+	return snapshot.Save(ctx, clientset, f, filters, saveConfig)
 }
 
 // SnapshotRestoreWithYAML restore the snapshot of cluster
-func (c *Cluster) SnapshotRestoreWithYAML(ctx context.Context, path string, filters []string) error {
+func (c *Cluster) SnapshotRestoreWithYAML(ctx context.Context, path string, filters []string, loadConfig snapshot.LoadConfig) error {
 	if c.IsDryRun() {
 		dryrun.PrintMessage("kubectl create -f %s", path)
 		return nil
@@ -71,5 +68,5 @@ func (c *Cluster) SnapshotRestoreWithYAML(ctx context.Context, path string, filt
 		_ = f.Close()
 	}()
 
-	return snapshot.Load(ctx, clientset, f, filters)
+	return snapshot.Load(ctx, clientset, f, filters, loadConfig)
 }
diff --git a/pkg/kwokctl/runtime/compose/cluster_snapshot.go b/pkg/kwokctl/runtime/compose/cluster_snapshot.go
index 308fbc7ff9..d6863287bf 100644
--- a/pkg/kwokctl/runtime/compose/cluster_snapshot.go
+++ b/pkg/kwokctl/runtime/compose/cluster_snapshot.go
@@ -20,6 +20,7 @@ import (
 	"context"
 
 	"sigs.k8s.io/kwok/pkg/consts"
+	"sigs.k8s.io/kwok/pkg/kwokctl/snapshot"
 	"sigs.k8s.io/kwok/pkg/log"
 )
 
@@ -127,8 +128,8 @@ func (c *Cluster) SnapshotRestore(ctx context.Context, path string) error {
 }
 
 // SnapshotSaveWithYAML save the snapshot of cluster
-func (c *Cluster) SnapshotSaveWithYAML(ctx context.Context, path string, filters []string) error {
-	err := c.Cluster.SnapshotSaveWithYAML(ctx, path, filters)
+func (c *Cluster) SnapshotSaveWithYAML(ctx context.Context, path string, filters []string, saveConfig snapshot.SaveConfig) error {
+	err := c.Cluster.SnapshotSaveWithYAML(ctx, path, filters, saveConfig)
 	if err != nil {
 		return err
 	}
@@ -136,20 +137,27 @@ func (c *Cluster) SnapshotSaveWithYAML(ctx context.Context, path string, filters
 }
 
 // SnapshotRestoreWithYAML restore the snapshot of cluster
-func (c *Cluster) SnapshotRestoreWithYAML(ctx context.Context, path string, filters []string) error {
+func (c *Cluster) SnapshotRestoreWithYAML(ctx context.Context, path string, filters []string, loadConfig snapshot.LoadConfig) error {
 	logger := log.FromContext(ctx)
-	err := c.StopComponent(ctx, consts.ComponentKubeControllerManager)
-	if err != nil {
-		logger.Error("Failed to stop kube-controller-manager", err)
-	}
-	defer func() {
-		err = c.StartComponent(ctx, consts.ComponentKubeControllerManager)
-		if err != nil {
-			logger.Error("Failed to start kube-controller-manager", err)
-		}
-	}()
+	components := []string{
+		consts.ComponentKubeScheduler,
+		consts.ComponentKubeControllerManager,
+		consts.ComponentKwokController,
+	}
+	for _, component := range components {
+		err := c.StopComponent(ctx, component)
+		if err != nil {
+			logger.Error("Failed to stop component", err, "component", component)
+		}
+		defer func(component string) {
+			err := c.StartComponent(ctx, component)
+			if err != nil {
+				logger.Error("Failed to start component", err, "component", component)
+			}
+		}(component)
+	}
 
-	err = c.Cluster.SnapshotRestoreWithYAML(ctx, path, filters)
+	err := c.Cluster.SnapshotRestoreWithYAML(ctx, path, filters, loadConfig)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/kwokctl/runtime/config.go b/pkg/kwokctl/runtime/config.go
index 3c3b72d0d9..acbc3fdc1e 100644
--- a/pkg/kwokctl/runtime/config.go
+++ b/pkg/kwokctl/runtime/config.go
@@ -22,6 +22,7 @@ import (
 	"time"
 
 	"sigs.k8s.io/kwok/pkg/apis/internalversion"
+	"sigs.k8s.io/kwok/pkg/kwokctl/snapshot"
 )
 
 // Runtime is the interface for a runtime.
@@ -65,6 +66,9 @@ type Runtime interface {
 	// GetComponent return the component if it exists
 	GetComponent(ctx context.Context, name string) (internalversion.Component, error)
 
+	// ListComponents lists the components of the cluster
+	ListComponents(ctx context.Context) ([]internalversion.Component, error)
+
 	// Ready check the cluster is ready
 	Ready(ctx context.Context) (bool, error)
 
@@ -114,10 +118,10 @@ type Runtime interface {
 	SnapshotRestore(ctx context.Context, path string) error
 
 	// SnapshotSaveWithYAML save the snapshot of cluster
-	SnapshotSaveWithYAML(ctx context.Context, path string, filters []string) error
+	SnapshotSaveWithYAML(ctx context.Context, path string, filters []string, saveConfig snapshot.SaveConfig) error
 
 	// SnapshotRestoreWithYAML restore the snapshot of cluster
-	SnapshotRestoreWithYAML(ctx context.Context, path string, filters []string) error
+	SnapshotRestoreWithYAML(ctx context.Context, path string, filters []string, loadConfig snapshot.LoadConfig) error
 
 	// GetWorkdirPath get the workdir path of cluster
 	GetWorkdirPath(name string) string
diff --git a/pkg/kwokctl/runtime/kind/cluster_snapshot.go b/pkg/kwokctl/runtime/kind/cluster_snapshot.go
index b8a93cbb88..a99df248a4 100644
--- a/pkg/kwokctl/runtime/kind/cluster_snapshot.go
+++ b/pkg/kwokctl/runtime/kind/cluster_snapshot.go
@@ -20,6 +20,7 @@ import (
 	"context"
 
 	"sigs.k8s.io/kwok/pkg/consts"
+	"sigs.k8s.io/kwok/pkg/kwokctl/snapshot"
 	"sigs.k8s.io/kwok/pkg/log"
 	"sigs.k8s.io/kwok/pkg/utils/wait"
 )
@@ -92,8 +93,8 @@ func (c *Cluster) SnapshotRestore(ctx context.Context, path string) error {
 }
 
 // SnapshotSaveWithYAML save the snapshot of cluster
-func (c *Cluster) SnapshotSaveWithYAML(ctx context.Context, path string, filters []string) error {
-	err := c.Cluster.SnapshotSaveWithYAML(ctx, path, filters)
+func (c *Cluster) SnapshotSaveWithYAML(ctx context.Context, path string, filters []string, saveConfig snapshot.SaveConfig) error {
+	err := c.Cluster.SnapshotSaveWithYAML(ctx, path, filters, saveConfig)
 	if err != nil {
 		return err
 	}
@@ -101,23 +102,31 @@ func (c *Cluster) SnapshotSaveWithYAML(ctx context.Context, path string, filters
 }
 
 // SnapshotRestoreWithYAML restore the snapshot of cluster
-func (c *Cluster) SnapshotRestoreWithYAML(ctx context.Context, path string, filters []string) error {
+func (c *Cluster) SnapshotRestoreWithYAML(ctx context.Context, path string, filters []string, loadConfig snapshot.LoadConfig) error {
 	logger := log.FromContext(ctx)
-	err := wait.Poll(ctx, func(ctx context.Context) (bool, error) {
-		err := c.StopComponent(ctx, consts.ComponentKubeControllerManager)
-		return err == nil, err
-	})
-	if err != nil {
-		logger.Error("Failed to stop kube-controller-manager", err)
-	}
-	defer func() {
-		err = c.StartComponent(ctx, consts.ComponentKubeControllerManager)
-		if err != nil {
-			logger.Error("Failed to start kube-controller-manager", err)
-		}
-	}()
+
+	components := []string{
+		consts.ComponentKubeScheduler,
+		consts.ComponentKubeControllerManager,
+		consts.ComponentKwokController,
+	}
+	for _, component := range components {
+		err := wait.Poll(ctx, func(ctx context.Context) (bool, error) {
+			err := c.StopComponent(ctx, component)
+			return err == nil, err
+		})
+		if err != nil {
+			logger.Error("Failed to stop component", err, "component", component)
+		}
+		defer func(component string) {
+			err := c.StartComponent(ctx, component)
+			if err != nil {
+				logger.Error("Failed to start component", err, "component", component)
+			}
+		}(component)
+	}
 
-	err = c.Cluster.SnapshotRestoreWithYAML(ctx, path, filters)
+	err := c.Cluster.SnapshotRestoreWithYAML(ctx, path, filters, loadConfig)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/kwokctl/scale/scale.go b/pkg/kwokctl/scale/scale.go
index 339cf51fb0..fb6636128b 100644
--- a/pkg/kwokctl/scale/scale.go
+++ b/pkg/kwokctl/scale/scale.go
@@ -282,7 +282,7 @@ func Scale(ctx context.Context, clientset client.Clientset, conf Config) error {
 	}, wantCreate)
 	ctx = log.NewContext(ctx, logger)
 
-	err = snapshot.Load(ctx, clientset, gen, nil)
+	err = snapshot.Load(ctx, clientset, gen, nil, snapshot.LoadConfig{})
 	if err != nil {
 		return err
 	}
diff --git a/pkg/kwokctl/snapshot/load.go b/pkg/kwokctl/snapshot/load.go
index 6b6952dca2..1195d79f10 100644
--- a/pkg/kwokctl/snapshot/load.go
+++ b/pkg/kwokctl/snapshot/load.go
@@ -39,11 +39,19 @@ import (
+	"sigs.k8s.io/kwok/pkg/utils/format"
 	"sigs.k8s.io/kwok/pkg/utils/slices"
 	"sigs.k8s.io/kwok/pkg/utils/wait"
 	"sigs.k8s.io/kwok/pkg/utils/yaml"
 )
 
+// LoadConfig is the configuration for loading resources from a snapshot
+type LoadConfig struct {
+	NoLoad   bool
+	Replay   bool
+	Relative bool
+}
+
 // Load loads the resources to cluster from the reader
-func Load(ctx context.Context, clientset client.Clientset, r io.Reader, filters []string) error {
-	l, err := newLoader(clientset, filters == nil)
+func Load(ctx context.Context, clientset client.Clientset, r io.Reader, filters []string, loadConfig LoadConfig) error {
+	l, err := newLoader(clientset, filters == nil, loadConfig)
 	if err != nil {
 		return err
 	}
@@ -73,9 +81,11 @@ type loader struct {
 
 	restMapper    meta.RESTMapper
 	dynamicClient dynamic.Interface
+
+	loadConfig LoadConfig
 }
 
-func newLoader(clientset client.Clientset, noFilter bool) (*loader, error) {
+func newLoader(clientset client.Clientset, noFilter bool, loadConfig LoadConfig) (*loader, error) {
 	restMapper, err := clientset.ToRESTMapper()
 	if err != nil {
 		return nil, fmt.Errorf("failed to create rest mapper: %w", err)
@@ -90,6 +100,7 @@ func newLoader(clientset client.Clientset, noFilter bool) (*loader, error) {
 		pending:       make(map[uniqueKey][]*unstructured.Unstructured),
 		restMapper:    restMapper,
 		dynamicClient: dynClient,
+		loadConfig:    loadConfig,
 	}
 	if !noFilter {
 		l.filterMap = make(map[schema.GroupKind]struct{})
@@ -115,29 +126,178 @@ func (l *loader) addResource(ctx context.Context, resources []string) {
 func (l *loader) Load(ctx context.Context, r io.Reader) error {
 	logger := log.FromContext(ctx)
 
-	start := time.Now()
+	startTime := time.Now()
+
+	if l.loadConfig.Relative {
+		r = newReadHook(r, func(b []byte) []byte {
+			return revertTimeFromRelative(startTime, b)
+		})
+	}
+
 	decoder := yaml.NewDecoder(r)
 
+	load := true
+
+	var dur time.Duration
 	err := decoder.DecodeToUnstructured(func(obj *unstructured.Unstructured) error {
 		if err := ctx.Err(); err != nil {
 			return err
 		}
-		if !l.filter(obj) {
-			logger.Warn("Skipped",
-				"resource", "filtered",
-				"kind", obj.GetKind(),
-				"name", log.KObj(obj),
-			)
-			return nil
+
+		if load {
+			if obj.GetKind() != resourcePatchType.Kind || obj.GetAPIVersion() != resourcePatchType.APIVersion {
+				if !l.filter(obj) {
+					logger.Warn("Skipped",
+						"resource", "filtered",
+						"kind", obj.GetKind(),
+						"name", log.KObj(obj),
+					)
+					return nil
+				}
+
+				if l.loadConfig.NoLoad {
+					return nil
+				}
+				l.load(ctx, obj)
+				return nil
+			}
+
+			load = false
+			if !l.loadConfig.NoLoad {
+				err := l.finishLoad(ctx, startTime)
+				if err != nil {
+					return err
+				}
+			}
+
+			if !l.loadConfig.Replay {
+				return io.EOF
+			}
+			logger.Info("Start replay")
+		} else if obj.GetKind() != resourcePatchType.Kind || obj.GetAPIVersion() != resourcePatchType.APIVersion {
+			return fmt.Errorf("unknown object type on replay: %s/%s", obj.GetAPIVersion(), obj.GetKind())
+		}
+
+		rpatch, err := yaml.Convert[resourcePatch](obj)
+		if err != nil {
+			return err
 		}
-		l.load(ctx, obj)
+
+		time.Sleep(rpatch.DurationNanosecond - dur)
+		dur = rpatch.DurationNanosecond
+
+		gvr := schema.GroupVersionResource{
+			Group:    rpatch.Target.Type.Group,
+			Version:  rpatch.Target.Type.Version,
+			Resource: rpatch.Target.Type.Resource,
+		}
+
+		nri := l.dynamicClient.Resource(gvr)
+		var ri dynamic.ResourceInterface = nri
+
+		if ns := rpatch.Target.Namespace; ns != "" {
+			ri = nri.Namespace(ns)
+		}
+
+		switch {
+		case rpatch.Delete:
+			err := ri.Delete(ctx, rpatch.Target.Name, metav1.DeleteOptions{GracePeriodSeconds: format.Ptr[int64](0)})
+			if err != nil {
+				logger.Error("Failed to delete resource", err,
+					"gvr", gvr,
+					"name", log.KObj(obj),
+					"target", rpatch.Target,
+				)
+			}
+		case rpatch.Create:
+			obj := &unstructured.Unstructured{}
+			err := obj.UnmarshalJSON(rpatch.Patch)
+			if err != nil {
+				logger.Error("Failed to unmarshal resource", err,
+					"gvr", gvr,
+					"name", log.KObj(obj),
+				)
+				return nil
+			}
+
+			l.updateOwnerReferences(obj)
+			obj.SetUID("")
+			_, err = ri.Create(ctx, obj, metav1.CreateOptions{
+				FieldValidation: "Ignore",
+			})
+			if err != nil {
+				logger.Error("Failed to create resource", err,
+					"gvr", gvr,
+					"name", log.KObj(obj),
+					"target", rpatch.Target,
+					"obj", obj,
+				)
+			}
+		case len(rpatch.Patch) != 0:
+			if gvr.Resource == "pods" && gvr.Version == "v1" {
+				nodeName, ok, err := unstructured.NestedString(obj.Object, "patch", "spec", "nodeName")
+				// For scheduling, we need to use a binding for the pod.
+				if err == nil && ok && nodeName != "" {
+					bind := &unstructured.Unstructured{
+						Object: map[string]any{
+							"apiVersion": "v1",
+							"kind":       "Binding",
+							"metadata": map[string]any{
+								"name":      rpatch.Target.Name,
+								"namespace": rpatch.Target.Namespace,
+							},
+							"target": map[string]any{
+								"apiVersion": "v1",
+								"kind":       "Node",
+								"name":       nodeName,
+							},
+						},
+					}
+					_, err = ri.Create(ctx, bind, metav1.CreateOptions{
+						FieldValidation: "Ignore",
+					}, "binding")
+					if err != nil {
+						logger.Error("Failed to create binding", err,
+							"gvr", gvr,
+							"name", log.KObj(obj),
+							"target", rpatch.Target,
+						)
+					}
+					return nil
+				}
+			}
+
+			var subresource string
+			_, hasStatus, _ := unstructured.NestedFieldNoCopy(obj.Object, "patch", "status")
+			if hasStatus {
+				subresource = "status"
+			}
+			_, err = ri.Patch(ctx, rpatch.Target.Name, types.StrategicMergePatchType, rpatch.Patch, metav1.PatchOptions{
+				FieldValidation: "Ignore",
+			}, subresource)
+			if err != nil {
+				logger.Error("Failed to patch resource", err,
+					"gvr", gvr,
+					"name", log.KObj(obj),
+					"target", rpatch.Target,
+				)
+			}
+		}
 		return nil
 	})
 	if err != nil {
 		return fmt.Errorf("failed to decode objects: %w", err)
 	}
 
+	if load {
+		return l.finishLoad(ctx, startTime)
+	}
+	return nil
+}
+
+func (l *loader) finishLoad(ctx context.Context, startTime time.Time) error {
+	logger := log.FromContext(ctx)
+
 	// Print the skipped resources
 	pending := []*unstructured.Unstructured{}
 	exist := map[types.UID]struct{}{}
@@ -172,7 +332,7 @@ func (l *loader) Load(ctx context.Context, r io.Reader) error {
 		)
 	}
 
-	if l.successCounter == 0 {
+	if !l.loadConfig.Replay && l.successCounter == 0 {
 		return ErrNotHandled
 	}
 
@@ -181,12 +341,12 @@ func (l *loader) Load(ctx context.Context, r io.Reader) error {
 			"counter", l.successCounter+l.failedCounter,
 			"successCounter", l.successCounter,
 			"failedCounter", l.failedCounter,
-			"elapsed", time.Since(start),
+			"elapsed", time.Since(startTime),
 		)
 	} else {
 		logger.Info("Load resources",
 			"counter", l.successCounter,
-			"elapsed", time.Since(start),
+			"elapsed", time.Since(startTime),
 		)
 	}
 	return nil
diff --git a/pkg/kwokctl/snapshot/patch.go b/pkg/kwokctl/snapshot/patch.go
new file mode 100644
index 0000000000..7d0588f0ea
--- /dev/null
+++ b/pkg/kwokctl/snapshot/patch.go
@@ -0,0 +1,215 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package snapshot
+
+import (
+	"bufio"
+	"bytes"
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"time"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/mergepatch"
+	"k8s.io/apimachinery/pkg/util/strategicpatch"
+	"k8s.io/apimachinery/pkg/watch"
+
+	"sigs.k8s.io/kwok/pkg/log"
+)
+
+var resourcePatchType = metav1.TypeMeta{
+	Kind:       "ResourcePatch",
+	APIVersion: "kwok.x-k8s.io/internal",
+}
+
+type resourcePatch struct {
+	metav1.TypeMeta    `json:",inline"`
+	Target             resourcePatchTarget `json:"target"`
+	DurationNanosecond time.Duration       `json:"durationNanosecond"`
+	Create             bool                `json:"create,omitempty"`
+	Delete             bool                `json:"delete,omitempty"`
+	Patch              json.RawMessage     `json:"patch,omitempty"`
+}
+
+type resourcePatchTarget struct {
+	Type      groupVersionResource `json:"type"`
+	Name      string               `json:"name"`
+	Namespace string               `json:"namespace,omitempty"`
+}
+
+type groupVersionResource struct {
+	Group    string `json:"group,omitempty"`
+	Version  string `json:"version"`
+	Resource string `json:"resource"`
+}
+
+type trackData struct {
+	Data            map[log.ObjectRef]json.RawMessage
+	ResourceVersion string
+}
+
+func genResourcePatch(ctx context.Context, event watch.Event, patchMeta strategicpatch.LookupPatchMeta, gvr schema.GroupVersionResource, shotTime time.Time, ref map[log.ObjectRef]json.RawMessage) (*resourcePatch, error) {
+	typ := groupVersionResource{
+		Group:    gvr.Group,
+		Version:  gvr.Version,
+		Resource: gvr.Resource,
+	}
+	switch o := event.Object.(type) {
+	default:
+		return nil, fmt.Errorf("unknown object type: %T", o)
+	case *metav1.Status:
+		if errors.Is(ctx.Err(), context.Canceled) {
+			return nil, context.Canceled
+		}
+		return nil, fmt.Errorf("error status: %s: %s", o.Reason, o.Message)
+	case metav1.Object:
+		key := log.KObj(o)
+
+		switch event.Type {
+		case watch.Added, watch.Modified:
+			clearUnstructured(o)
+
+			modified, err := json.Marshal(o)
+			if err != nil {
+				return nil, err
+			}
+
+			original, ok := ref[key]
+			if !ok {
+				ref[key] = modified
+				rp := resourcePatch{
+					TypeMeta: resourcePatchType,
+					Target: resourcePatchTarget{
+						Type:      typ,
+						Name:      o.GetName(),
+						Namespace: o.GetNamespace(),
+					},
+					DurationNanosecond: time.Since(shotTime),
+					Create:             true,
+					Patch:              modified,
+				}
+				return &rp, nil
+			}
+			ref[key] = modified
+
+			originalMap := map[string]interface{}{}
+			if len(original) > 0 {
+				if err := json.Unmarshal(original, &originalMap); err != nil {
+					return nil, mergepatch.ErrBadJSONDoc
+				}
+			}
+
+			modifiedMap := map[string]interface{}{}
+			if len(modified) > 0 {
+				if err := json.Unmarshal(modified, &modifiedMap); err != nil {
+					return nil, mergepatch.ErrBadJSONDoc
+				}
+			}
+
+			patchMap, err := strategicpatch.CreateTwoWayMergeMapPatchUsingLookupPatchMeta(originalMap, modifiedMap, patchMeta)
+			if err != nil {
+				return nil, err
+			}
+
+			patch, err := json.Marshal(patchMap)
+			if err != nil {
+				return nil, err
+			}
+
+			rp := resourcePatch{
+				TypeMeta: resourcePatchType,
+				Target: resourcePatchTarget{
+					Type:      typ,
+					Name:      o.GetName(),
+					Namespace: o.GetNamespace(),
+				},
+				DurationNanosecond: time.Since(shotTime),
+				Patch:              patch,
+			}
+			return &rp, nil
+		case watch.Deleted:
+			delete(ref, key)
+			rp := resourcePatch{
+				TypeMeta: resourcePatchType,
+				Target: resourcePatchTarget{
+					Type:      typ,
+					Name:      o.GetName(),
+					Namespace: o.GetNamespace(),
+				},
+				DurationNanosecond: time.Since(shotTime),
+				Delete:             true,
+			}
+			return &rp, nil
+		default:
+			return nil, fmt.Errorf("unknown event type: %s", event.Type)
+		}
+	}
+}
+
+func newWriteHook(w io.Writer, hook func([]byte) []byte) io.Writer {
+	return &writeHook{
+		w:    w,
+		hook: hook,
+	}
+}
+
+type writeHook struct {
+	hook func([]byte) []byte
+	w    io.Writer
+}
+
+func (w *writeHook) Write(data []byte) (int, error) {
+	d := w.hook(data)
+	n, err := w.w.Write(d)
+	if err != nil {
+		return n, err
+	}
+	if n != len(d) {
+		return n, io.ErrShortWrite
+	}
+	return len(data), nil
+}
+
+type readHook struct {
+	hook func([]byte) []byte
+	r    *bufio.Reader
+	buf  bytes.Buffer
+}
+
+func newReadHook(r io.Reader, hook func([]byte) []byte) io.Reader {
+	return &readHook{
+		r:    bufio.NewReader(r),
+		hook: hook,
+	}
+}
+
+func (r *readHook) Read(data []byte) (int, error) {
+	if r.buf.Len() != 0 {
+		return r.buf.Read(data)
+	}
+	line, err := r.r.ReadSlice('\n')
+	if err != nil {
+		return 0, err
+	}
+	d := r.hook(line)
+	r.buf.Write(d)
+	return r.buf.Read(data)
+}
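For orientation, this is roughly the shape of a single recorded document, written as the Go value it round-trips through. The type and field names come from patch.go above; the pod name, offset, and patch body are invented for illustration:

```go
rp := resourcePatch{
	// kind: ResourcePatch, apiVersion: kwok.x-k8s.io/internal
	TypeMeta: resourcePatchType,
	Target: resourcePatchTarget{
		Type:      groupVersionResource{Version: "v1", Resource: "pods"},
		Name:      "nginx-0", // hypothetical pod
		Namespace: "default",
	},
	// offset from the start of recording; replay sleeps until this point
	DurationNanosecond: 1500 * time.Millisecond,
	// a strategic merge patch, as produced for a watch.Modified event
	Patch: json.RawMessage(`{"status":{"phase":"Running"}}`),
}
```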
diff --git a/pkg/kwokctl/snapshot/runtime.go b/pkg/kwokctl/snapshot/runtime.go
index 3e3a10a2b6..09049eca7b 100644
--- a/pkg/kwokctl/snapshot/runtime.go
+++ b/pkg/kwokctl/snapshot/runtime.go
@@ -30,6 +30,7 @@ import (
 // list all resources can use: kubectl api-resources -o name
 var Resources = []string{
 	"namespace",
+	"leases.coordination.k8s.io",
 	"node",
 	"serviceaccount",
 	"configmap",
diff --git a/pkg/kwokctl/snapshot/save.go b/pkg/kwokctl/snapshot/save.go
index 17138a03c0..85abb2e84f 100644
--- a/pkg/kwokctl/snapshot/save.go
+++ b/pkg/kwokctl/snapshot/save.go
@@ -18,21 +18,28 @@ package snapshot
 
 import (
 	"context"
+	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
+	"sync"
 	"time"
 
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/net"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/pager"
 	"k8s.io/client-go/util/retry"
 
 	"sigs.k8s.io/kwok/pkg/log"
 	"sigs.k8s.io/kwok/pkg/utils/client"
+	"sigs.k8s.io/kwok/pkg/utils/patch"
 	"sigs.k8s.io/kwok/pkg/utils/yaml"
 )
 
 // PagerConfig is the configuration of the list pager.
@@ -46,6 +53,9 @@
 // and the PagerConfig.
 type SaveConfig struct {
 	PagerConfig *PagerConfig
+	Record      bool
+	NoSave      bool
+	Relative    bool
 }
 
 // Save saves the snapshot of cluster
@@ -61,78 +71,195 @@ func Save(ctx context.Context, clientset client.Clientset, w io.Writer, resource
 
 	logger := log.FromContext(ctx)
 
-	gvrs := make([]schema.GroupVersionResource, 0, len(resources))
+	rms := make([]*meta.RESTMapping, 0, len(resources))
 	for _, resource := range resources {
 		mapping, err := client.MappingFor(restMapper, resource)
 		if err != nil {
 			logger.Warn("Failed to get mapping for resource", "resource", resource, "err", err)
 			continue
 		}
-		gvrs = append(gvrs, mapping.Resource)
+		rms = append(rms, mapping)
+	}
+
+	var tracks map[*meta.RESTMapping]*trackData
+	if saveConfig.Record {
+		tracks = make(map[*meta.RESTMapping]*trackData)
+		for _, rm := range rms {
+			tracks[rm] = &trackData{
+				Data: map[log.ObjectRef]json.RawMessage{},
+			}
+		}
 	}
 
-	encoder := yaml.NewEncoder(w)
-	totalCounter := 0
 	start := time.Now()
-	for _, gvr := range gvrs {
-		nri := dynamicClient.Resource(gvr)
-		logger := logger.With("resource", gvr.Resource)
-
-		start := time.Now()
-		page := 0
-		listPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
-			var list runtime.Object
-			var err error
-			page++
-			logger := logger.With("page", page, "limit", opts.Limit)
-			logger.Debug("Listing resource")
-			err = retry.OnError(retry.DefaultBackoff, retriable, func() error {
-				list, err = nri.List(ctx, opts)
-				if err != nil {
-					logger.Error("failed to list resource", err)
-				}
-				return err
-			})
-			return list, err
+
+	if saveConfig.Relative {
+		w = newWriteHook(w, func(b []byte) []byte {
+			return replaceTimeToRelative(start, b)
 		})
+	}
 
-		pagerConfig := saveConfig.PagerConfig
+	encoder := yaml.NewEncoder(w)
+
+	if !saveConfig.NoSave {
+		totalCounter := 0
+		for _, rm := range rms {
+			gvr := rm.Resource
+			nri := dynamicClient.Resource(gvr)
+			logger := logger.With("resource", gvr.Resource)
+
+			start := time.Now()
+			page := 0
+
+			latestResourceVersion := ""
+			listPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
+				var list runtime.Object
+				var err error
+				page++
+				logger := logger.With("page", page, "limit", opts.Limit)
+				logger.Debug("Listing resource")
+				err = retry.OnError(retry.DefaultBackoff, retriable, func() error {
+					l, err := nri.List(ctx, opts)
+					if err != nil {
+						logger.Error("failed to list resource", err)
+					} else {
+						list = l
+						latestResourceVersion = l.GetResourceVersion()
+					}
+					return err
+				})
+				return list, err
+			})
+
+			pagerConfig := saveConfig.PagerConfig
 
-		if pagerConfig != nil {
-			if pagerConfig.PageSize > 0 {
-				listPager.PageSize = pagerConfig.PageSize
+			if pagerConfig != nil {
+				if pagerConfig.PageSize > 0 {
+					listPager.PageSize = pagerConfig.PageSize
+				}
+				if pagerConfig.PageBufferSize > 0 {
+					listPager.PageBufferSize = pagerConfig.PageBufferSize
+				}
 			}
-			if pagerConfig.PageBufferSize > 0 {
-				listPager.PageBufferSize = pagerConfig.PageBufferSize
+
+			track := tracks[rm]
+			count := 0
+			if err := listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
+				if o, ok := obj.(metav1.Object); ok {
+					clearUnstructured(o)
+					if track != nil {
+						track.Data[log.KObj(o)], _ = json.Marshal(o)
+					}
+				}
+				count++
+				return encoder.Encode(obj)
+			}); err != nil {
+				return fmt.Errorf("failed to list resource %q: %w", gvr.Resource, err)
 			}
-		}
 
-		count := 0
-		if err := listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
-			if o, ok := obj.(metav1.Object); ok {
-				clearUnstructured(o)
+			if track != nil {
+				track.ResourceVersion = latestResourceVersion
 			}
-			count++
-			return encoder.Encode(obj)
-		}); err != nil {
-			return fmt.Errorf("failed to list resource %q: %w", gvr.Resource, err)
+			logger.Debug("Listed resource",
+				"counter", count,
+				"elapsed", time.Since(start),
+			)
+			totalCounter += count
 		}
-		logger.Debug("Listed resource",
-			"counter", count,
+		if tracks == nil {
+			if totalCounter == 0 {
+				return ErrNotHandled
+			}
+		}
+		logger.Info("Saved resources",
+			"counter", totalCounter,
 			"elapsed", time.Since(start),
 		)
-		totalCounter += count
 	}
 
-	if totalCounter == 0 {
-		return ErrNotHandled
-	}
+	if tracks != nil {
+		logger.Info("Start recording resources")
+
+		wg := sync.WaitGroup{}
+		shotTime := time.Now()
+
+		restConfig, err := clientset.ToRESTConfig()
+		if err != nil {
+			return fmt.Errorf("failed to get rest config: %w", err)
+		}
+
+		restConfig.GroupVersion = &schema.GroupVersion{}
+		restClient, err := rest.RESTClientFor(restConfig)
+		if err != nil {
+			return fmt.Errorf("failed to create rest client: %w", err)
+		}
+
+		patchMetaSchema := patch.NewPatchMetaFromOpenAPI3(restClient)
+
+		que := make(chan *resourcePatch)
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case rp := <-que:
+					if err := encoder.Encode(rp); err != nil {
+						logger.Warn("Failed to encode resource patch", "err", err)
+					}
+				}
+			}
+		}()
+
+		for rm, track := range tracks {
+			gvr := rm.Resource
+			patchMeta, err := patchMetaSchema.Lookup(gvr)
+			if err != nil {
+				return fmt.Errorf("failed to lookup patch meta: %w", err)
+			}
+			nri := dynamicClient.Resource(gvr)
+			logger := logger.With("resource", gvr.Resource)
+
+			w, err := nri.Watch(ctx, metav1.ListOptions{
+				ResourceVersion: track.ResourceVersion,
+			})
+			if err != nil {
+				return fmt.Errorf("failed to watch resource %q: %w", gvr.Resource, err)
+			}
 
-	logger.Info("Saved resources",
-		"counter", totalCounter,
-		"elapsed", time.Since(start),
-	)
+			data := track.Data
+			wg.Add(1)
+
+			go func(w watch.Interface) {
+				defer wg.Done()
+				ch := w.ResultChan()
+				for {
+					select {
+					case <-ctx.Done():
+						return
+					case event := <-ch:
+						rp, err := genResourcePatch(ctx, event, patchMeta, gvr, shotTime, data)
+						if err != nil {
+							if errors.Is(err, context.Canceled) {
+								return
+							}
+							logger.Warn("Failed to generate resource patch", "err", err)
+							continue
+						}
+						que <- rp
+					}
+				}
+			}(w)
+		}
+
+		logger.Info("Press Ctrl+C to stop recording resources")
+		wg.Wait()
+
+		logger.Info("Stopped recording resources")
+	}
 
 	return nil
 }
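The Relative option is implemented purely as stream filters around the snapshot file. A small sketch of the round trip inside the snapshot package (both helpers are part of this diff; the buffer and sample timestamp are illustrative):

```go
base := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)

// On save: every RFC3339 timestamp flowing through the writer becomes an offset.
var buf bytes.Buffer
w := newWriteHook(&buf, func(b []byte) []byte {
	return replaceTimeToRelative(base, b)
})
_, _ = w.Write([]byte("creationTimestamp: 2021-01-01T00:00:01Z\n"))
// buf now holds: creationTimestamp: $(time-offset-nanosecond 1000000000)

// On load: the reader re-anchors the offsets, one line at a time.
r := newReadHook(&buf, func(b []byte) []byte {
	return revertTimeFromRelative(base, b)
})
restored, _ := io.ReadAll(r)
_ = restored // creationTimestamp: 2021-01-01T00:00:01.000000Z
```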
diff --git a/pkg/kwokctl/snapshot/time.go b/pkg/kwokctl/snapshot/time.go
new file mode 100644
index 0000000000..6f46cddd37
--- /dev/null
+++ b/pkg/kwokctl/snapshot/time.go
@@ -0,0 +1,58 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package snapshot
+
+import (
+	"fmt"
+	"regexp"
+	"strconv"
+	"time"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+var (
+	formatRFC3339Nano  = time.RFC3339Nano    // "2006-01-02T15:04:05.999999999Z07:00"
+	formatRFC3339Micro = metav1.RFC3339Micro // "2006-01-02T15:04:05.000000Z07:00"
+
+	regReplaceTimeFormat = regexp.MustCompile(`\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:\d{2})`)
+	regRevertTimeOffset  = regexp.MustCompile(`\$\(time-offset-nanosecond -?\d+\)`)
+)
+
+func replaceTimeToRelative(baseTime time.Time, data []byte) []byte {
+	return regReplaceTimeFormat.ReplaceAllFunc(data, func(s []byte) []byte {
+		t, err := time.Parse(formatRFC3339Nano, string(s))
+		if err != nil {
+			return s
+		}
+
+		sub := t.Sub(baseTime)
+		return []byte(fmt.Sprintf("$(time-offset-nanosecond %d)", int64(sub)))
+	})
+}
+
+func revertTimeFromRelative(baseTime time.Time, data []byte) []byte {
+	return regRevertTimeOffset.ReplaceAllFunc(data, func(s []byte) []byte {
+		i, err := strconv.ParseInt(string(s[25:len(s)-1]), 0, 0)
+		if err != nil {
+			return s
+		}
+
+		t := baseTime.Add(time.Duration(i)).UTC()
+		return []byte(t.Format(formatRFC3339Micro))
+	})
+}
diff --git a/pkg/kwokctl/snapshot/time_test.go b/pkg/kwokctl/snapshot/time_test.go
new file mode 100644
index 0000000000..e4ef86cc15
--- /dev/null
+++ b/pkg/kwokctl/snapshot/time_test.go
@@ -0,0 +1,103 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package snapshot
+
+import (
+	"reflect"
+	"testing"
+	"time"
+)
+
+func Test_replaceTimeToRelative(t *testing.T) {
+	type args struct {
+		baseTime time.Time
+		data     string
+	}
+	tests := []struct {
+		name string
+		args args
+		want string
+	}{
+		{
+			name: "replace time to relative",
+			args: args{
+				baseTime: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
+				data:     "any xxx 2021-01-01T00:00:01Z any xxx",
+			},
+			want: "any xxx $(time-offset-nanosecond 1000000000) any xxx",
+		},
+		{
+			name: "replace time to relative with nanosecond",
+			args: args{
+				baseTime: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
+				data:     "any xxx 2021-01-01T00:00:01.1Z any xxx",
+			},
+			want: "any xxx $(time-offset-nanosecond 1100000000) any xxx",
+		},
+		{
+			name: "replace time to relative with zero fraction",
+			args: args{
+				baseTime: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
+				data:     "any xxx 2021-01-01T00:00:01.00000Z any xxx",
+			},
+			want: "any xxx $(time-offset-nanosecond 1000000000) any xxx",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := string(replaceTimeToRelative(tt.args.baseTime, []byte(tt.args.data))); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("replaceTimeToRelative() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_revertTimeFromRelative(t *testing.T) {
+	type args struct {
+		baseTime time.Time
+		data     string
+	}
+	tests := []struct {
+		name string
+		args args
+		want string
+	}{
+		{
+			name: "revert time from relative",
+			args: args{
+				baseTime: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
+				data:     "any xxx $(time-offset-nanosecond 1000000000) any xxx",
+			},
+			want: "any xxx 2021-01-01T00:00:01.000000Z any xxx",
+		},
+		{
+			name: "revert time from relative with nanosecond",
+			args: args{
+				baseTime: time.Date(2021, 1, 1, 0, 0, 0, 1, time.UTC),
+				data:     "any xxx $(time-offset-nanosecond 1100000000) any xxx",
+			},
+			want: "any xxx 2021-01-01T00:00:01.100000Z any xxx",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := string(revertTimeFromRelative(tt.args.baseTime, []byte(tt.args.data))); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("revertTimeFromRelative() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
diff --git a/pkg/utils/client/clientset.go b/pkg/utils/client/clientset.go
index fc3e9414f1..c9589509c7 100644
--- a/pkg/utils/client/clientset.go
+++ b/pkg/utils/client/clientset.go
@@ -21,6 +21,7 @@ import (
 
 	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructuredscheme"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/client-go/discovery"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
@@ -39,7 +40,7 @@ type Clientset interface {
 	ToRawKubeConfigLoader() clientcmd.ClientConfig
 	ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error)
 	ToRESTMapper() (meta.RESTMapper, error)
-	ToRESTClient() (*rest.RESTClient, error)
+	ToRESTClient(gv schema.GroupVersion) (*rest.RESTClient, error)
 	ToTypedClient() (kubernetes.Interface, error)
 	ToTypedKwokClient() (versioned.Interface, error)
 	ToDynamicClient() (dynamic.Interface, error)
@@ -52,7 +53,6 @@ type clientset struct {
 	restConfig      *rest.Config
 	discoveryClient discovery.CachedDiscoveryInterface
 	restMapper      meta.RESTMapper
-	restClient      *rest.RESTClient
 	clientConfig    clientcmd.ClientConfig
 	typedClient     *kubernetes.Clientset
 	kwokClient      *versioned.Clientset
@@ -141,19 +141,19 @@ func (g *clientset) ToRESTMapper() (meta.RESTMapper, error) {
 }
 
 // ToRESTClient returns a REST client.
-func (g *clientset) ToRESTClient() (*rest.RESTClient, error) {
-	if g.restClient == nil {
-		restConfig, err := g.ToRESTConfig()
-		if err != nil {
-			return nil, err
-		}
-		restClient, err := rest.RESTClientFor(restConfig)
-		if err != nil {
-			return nil, fmt.Errorf("could not get Kubernetes REST client: %w", err)
-		}
-		g.restClient = restClient
+func (g *clientset) ToRESTClient(gv schema.GroupVersion) (*rest.RESTClient, error) {
+	restConfig, err := g.ToRESTConfig()
+	if err != nil {
+		return nil, err
+	}
+	config := *restConfig
+	config.GroupVersion = &gv
+
+	restClient, err := rest.RESTClientFor(&config)
+	if err != nil {
+		return nil, fmt.Errorf("could not get Kubernetes REST client: %w", err)
 	}
-	return g.restClient, nil
+	return restClient, nil
 }
 
 // ToRawKubeConfigLoader returns a raw kubeconfig loader.
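The reworked ToRESTClient drops the cached client so each caller picks its own group/version. A hedged sketch of the new call shape; the empty GroupVersion mirrors the config save.go builds by hand to reach the aggregated OpenAPI v3 endpoint:

```go
// Version-scoped client, e.g. for core/v1 requests.
coreClient, err := clientset.ToRESTClient(schema.GroupVersion{Version: "v1"})
if err != nil {
	return err
}

// Unscoped client for the /openapi/v3 discovery endpoints.
openapiClient, err := clientset.ToRESTClient(schema.GroupVersion{})
if err != nil {
	return err
}
_, _ = coreClient, openapiClient
```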
diff --git a/pkg/utils/patch/openapi.go b/pkg/utils/patch/openapi.go
new file mode 100644
index 0000000000..db6233530f
--- /dev/null
+++ b/pkg/utils/patch/openapi.go
@@ -0,0 +1,271 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package patch
+
+import (
+	"fmt"
+	"net/http"
+	"strings"
+
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/mergepatch"
+	"k8s.io/apimachinery/pkg/util/strategicpatch"
+	"k8s.io/client-go/openapi"
+	"k8s.io/client-go/openapi3"
+	"k8s.io/client-go/rest"
+	"k8s.io/kube-openapi/pkg/spec3"
+	"k8s.io/kube-openapi/pkg/validation/spec"
+
+	"sigs.k8s.io/kwok/pkg/utils/maps"
+	"sigs.k8s.io/kwok/pkg/utils/slices"
+)
+
+const (
+	patchStrategyOpenapiextensionKey = "x-kubernetes-patch-strategy"
+	patchMergeKeyOpenapiextensionKey = "x-kubernetes-patch-merge-key"
+)
+
+// PatchMetaFromOpenAPI3 looks up strategic merge patch metadata from the OpenAPI v3 schema.
+type PatchMetaFromOpenAPI3 struct {
+	root      openapi3.Root
+	specCache map[schema.GroupVersionResource]*patchMeta
+}
+
+// NewPatchMetaFromOpenAPI3 creates a new PatchMetaFromOpenAPI3.
+func NewPatchMetaFromOpenAPI3(s rest.Interface) *PatchMetaFromOpenAPI3 {
+	openapiClient := openapi.NewClient(s)
+	openapi3Root := openapi3.NewRoot(openapiClient)
+	return &PatchMetaFromOpenAPI3{
+		root:      openapi3Root,
+		specCache: map[schema.GroupVersionResource]*patchMeta{},
+	}
+}
+
+// Lookup returns the patch metadata for the given group-version-resource.
+func (p *PatchMetaFromOpenAPI3) Lookup(gvr schema.GroupVersionResource) (strategicpatch.LookupPatchMeta, error) {
+	lookmeta := p.specCache[gvr]
+	if lookmeta != nil {
+		return lookmeta, nil
+	}
+
+	gv := gvr.GroupVersion()
+	spec, err := p.root.GVSpec(gv)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get openapi spec: %w", err)
+	}
+
+	// Match the suffix like "/nodes/{name}" and exclude the watch path like "/watch/nodes/{name}"
+	findResourceWithPath := "/" + gvr.Resource + "/{name}"
+	key, ok := slices.Find(maps.Keys(spec.Paths.Paths), func(s string) bool {
+		return strings.HasSuffix(s, findResourceWithPath) &&
+			!strings.Contains(s[:len(s)-len(findResourceWithPath)], "/watch/")
+	})
+	if !ok {
+		return nil, fmt.Errorf("failed to find resource: %s", gvr.Resource)
+	}
+
+	path := spec.Paths.Paths[key]
+	if path.Get == nil {
+		return nil, fmt.Errorf("failed to find get method: %s", key)
+	}
+
+	if path.Get.Responses.StatusCodeResponses == nil {
+		return nil, fmt.Errorf("failed to find response: %s", key)
+	}
+
+	response := path.Get.Responses.StatusCodeResponses[http.StatusOK]
+	if response == nil {
+		return nil, fmt.Errorf("failed to find response 200: %s", key)
+	}
+
+	if response.Content == nil {
+		return nil, fmt.Errorf("failed to find content: %s", key)
+	}
+
+	if response.Content["application/json"] == nil {
+		return nil, fmt.Errorf("failed to find application/json: %s", key)
+	}
+
+	schema := response.Content["application/json"].Schema
+	if schema == nil {
+		return nil, fmt.Errorf("failed to find schema: %s", key)
+	}
+
+	lookmeta = &patchMeta{
+		openapi: spec,
+		schema:  schema,
+	}
+
+	err = lookmeta.forward(5)
+	if err != nil {
+		return nil, fmt.Errorf("failed to forward schema: %w", err)
+	}
+
+	p.specCache[gvr] = lookmeta
+	return lookmeta, nil
+}
+
+type patchMeta struct {
+	openapi *spec3.OpenAPI
+	schema  *spec.Schema
+	name    string
+}
+
+func (p *patchMeta) forwardWithArray(ttl int) error {
+	if p.schema.Items == nil {
+		return fmt.Errorf("schema items not found")
+	}
+
+	p.schema = p.schema.Items.Schema
+	return p.forward(ttl)
+}
+
+func (p *patchMeta) forward(ttl int) error {
+	if len(p.schema.Type) != 0 {
+		return nil
+	}
+
+	if ttl <= 0 {
+		return fmt.Errorf("forwarding schema too deep")
+	}
+
+	if p.schema.Ref.HasFragmentOnly {
+		token := p.schema.Ref.GetPointer().DecodedTokens()
+		p.schema = p.openapi.Components.Schemas[token[len(token)-1]]
+	} else if len(p.schema.AllOf) != 0 {
+		p.schema = &p.schema.AllOf[0]
+	} else if len(p.schema.OneOf) != 0 {
+		p.schema = &p.schema.OneOf[0]
+	} else if len(p.schema.AnyOf) != 0 {
+		p.schema = &p.schema.AnyOf[0]
+	} else {
+		return fmt.Errorf("failed to forward schema")
+	}
+
+	return p.forward(ttl - 1)
+}
+
+// LookupPatchMetadataForStruct gets the subschema and the patch metadata (e.g. patch strategy and merge key) for a struct field.
+func (p *patchMeta) LookupPatchMetadataForStruct(key string) (strategicpatch.LookupPatchMeta, strategicpatch.PatchMeta, error) {
+	if p.schema == nil {
+		return nil, strategicpatch.PatchMeta{}, fmt.Errorf("schema not found")
+	}
+
+	if p.schema.Properties == nil {
+		return nil, strategicpatch.PatchMeta{}, fmt.Errorf("schema properties not found")
+	}
+
+	prop, ok := p.schema.Properties[key]
+	if !ok {
+		return nil, strategicpatch.PatchMeta{}, fmt.Errorf("schema property not found: %s", key)
+	}
+
+	meta, err := parsePatchMetadata(prop.Extensions)
+	if err != nil {
+		return nil, strategicpatch.PatchMeta{}, err
+	}
+
+	lookmeta := &patchMeta{
+		openapi: p.openapi,
+		schema:  &prop,
+		name:    key,
+	}
+
+	err = lookmeta.forward(5)
+	if err != nil {
+		return nil, strategicpatch.PatchMeta{}, fmt.Errorf("failed to forward schema: %w", err)
+	}
+
+	return lookmeta, meta, nil
+}
+
+// LookupPatchMetadataForSlice gets the subschema and the patch metadata for a slice field.
+func (p *patchMeta) LookupPatchMetadataForSlice(key string) (strategicpatch.LookupPatchMeta, strategicpatch.PatchMeta, error) {
+	if p.schema == nil {
+		return nil, strategicpatch.PatchMeta{}, fmt.Errorf("schema not found")
+	}
+
+	if p.schema.Properties == nil {
+		return nil, strategicpatch.PatchMeta{}, fmt.Errorf("schema properties not found")
+	}
+
+	prop, ok := p.schema.Properties[key]
+	if !ok {
+		return nil, strategicpatch.PatchMeta{}, fmt.Errorf("schema property not found: %s", key)
+	}
+	if !prop.Type.Contains("array") {
+		return nil, strategicpatch.PatchMeta{}, fmt.Errorf("schema property is not array: %s", key)
+	}
+
+	meta, err := parsePatchMetadata(prop.Extensions)
+	if err != nil {
+		return nil, strategicpatch.PatchMeta{}, err
+	}
+
+	lookmeta := &patchMeta{
+		openapi: p.openapi,
+		schema:  &prop,
+		name:    key,
+	}
+
+	err = lookmeta.forwardWithArray(5)
+	if err != nil {
+		return nil, strategicpatch.PatchMeta{}, fmt.Errorf("failed to forward schema: %w", err)
+	}
+
+	return lookmeta, meta, nil
+}
+
+// Name returns the type name of the field
+func (p *patchMeta) Name() string {
+	return p.name
+}
+
+func parsePatchMetadata(extensions map[string]interface{}) (strategicpatch.PatchMeta, error) {
+	ps, foundPS := extensions[patchStrategyOpenapiextensionKey]
+	var patchStrategies []string
+	var mergeKey, patchStrategy string
+	var ok bool
+	if foundPS {
+		patchStrategy, ok = ps.(string)
+		if ok {
+			patchStrategies = strings.Split(patchStrategy, ",")
+		} else {
+			return strategicpatch.PatchMeta{}, mergepatch.ErrBadArgType(patchStrategy, ps)
+		}
+	}
+	mk, foundMK := extensions[patchMergeKeyOpenapiextensionKey]
+	if foundMK {
+		mergeKey, ok = mk.(string)
+		if !ok {
+			return strategicpatch.PatchMeta{}, mergepatch.ErrBadArgType(mergeKey, mk)
+		}
+	}
+	var meta strategicpatch.PatchMeta
+	if len(patchStrategies) != 0 {
+		// Avoid duplicate values being ignored, e.g. heartbeat on condition
+		patchStrategies = slices.Filter(patchStrategies, func(s string) bool {
+			return s != "retainKeys"
+		})
+
+		meta.SetPatchStrategies(patchStrategies)
+	}
+	if mergeKey != "" {
+		meta.SetPatchMergeKey(mergeKey)
+	}
+	return meta, nil
+}
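How the recorder consumes this lookup, reduced to its essentials. The calls mirror save.go and genResourcePatch in this diff; `restClient`, `originalMap`, and `modifiedMap` stand in for the REST client and the JSON-decoded before/after objects:

```go
pm := patch.NewPatchMetaFromOpenAPI3(restClient)
lookup, err := pm.Lookup(schema.GroupVersionResource{Version: "v1", Resource: "pods"})
if err != nil {
	return err
}

// The lookup drives the same two-way strategic merge used while recording.
patchMap, err := strategicpatch.CreateTwoWayMergeMapPatchUsingLookupPatchMeta(originalMap, modifiedMap, lookup)
if err != nil {
	return err
}
_ = patchMap
```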
diff --git a/pkg/utils/yaml/encoder.go b/pkg/utils/yaml/encoder.go
index 1879372ea8..89ad804e14 100644
--- a/pkg/utils/yaml/encoder.go
+++ b/pkg/utils/yaml/encoder.go
@@ -19,8 +19,6 @@ package yaml
 import (
 	"io"
 	"sync/atomic"
-
-	"k8s.io/apimachinery/pkg/runtime"
 )
 
 // Encoder is a YAML encoder.
@@ -39,7 +37,7 @@ func NewEncoder(w io.Writer) *Encoder {
 var separator = []byte("---\n")
 
 // Encode prints the object as YAML.
-func (p *Encoder) Encode(obj runtime.Object) error {
+func (p *Encoder) Encode(obj any) error {
 	count := atomic.AddInt64(&p.printCount, 1)
 	if count > 1 {
 		if _, err := p.w.Write(separator); err != nil {
diff --git a/site/content/en/docs/generated/kwokctl_snapshot_export.md b/site/content/en/docs/generated/kwokctl_snapshot_export.md
index fc100a5a1e..303376ff5a 100644
--- a/site/content/en/docs/generated/kwokctl_snapshot_export.md
+++ b/site/content/en/docs/generated/kwokctl_snapshot_export.md
@@ -17,6 +17,7 @@ kwokctl snapshot export [flags]
       --page-buffer-size int32   Define the number of pages to buffer (default 10)
       --page-size int            Define the page size (default 500)
       --path string              Path to the snapshot
+      --record                   Record the change of the cluster
 ```
 
 ### Options inherited from parent commands
diff --git a/site/content/en/docs/generated/kwokctl_snapshot_record.md b/site/content/en/docs/generated/kwokctl_snapshot_record.md
new file mode 100644
index 0000000000..f3d2732d3b
--- /dev/null
+++ b/site/content/en/docs/generated/kwokctl_snapshot_record.md
@@ -0,0 +1,34 @@
+## kwokctl snapshot record
+
+[experimental] Record the snapshot of the cluster
+
+```
+kwokctl snapshot record [flags]
+```
+
+### Options
+
+```
+      --as string                Username to impersonate for the operation. User could be a regular user or a service account in a namespace.
+      --as-group strings         Group to impersonate for the operation, this flag can be repeated to specify multiple groups.
+      --filter strings           Filter the resources to export (default [namespace,node,serviceaccount,configmap,secret,limitrange,runtimeclass.node.k8s.io,priorityclass.scheduling.k8s.io,clusterrolebindings.rbac.authorization.k8s.io,clusterroles.rbac.authorization.k8s.io,rolebindings.rbac.authorization.k8s.io,roles.rbac.authorization.k8s.io,daemonset.apps,deployment.apps,replicaset.apps,statefulset.apps,cronjob.batch,job.batch,persistentvolumeclaim,persistentvolume,pod,service,endpoints])
+  -h, --help                     help for record
+      --kubeconfig string        Path to the kubeconfig file to use
+      --page-buffer-size int32   Define the number of pages to buffer (default 10)
+      --page-size int            Define the page size (default 500)
+      --path string              Path to the snapshot
+```
+
+### Options inherited from parent commands
+
+```
+  -c, --config strings   config path (default [~/.kwok/kwok.yaml])
+      --dry-run          Print the command that would be executed, but do not execute it
+      --name string      cluster name (default "kwok")
+  -v, --v log-level      number for the log level verbosity (DEBUG, INFO, WARN, ERROR) or (-4, 0, 4, 8) (default INFO)
+```
+
+### SEE ALSO
+
+* [kwokctl snapshot](kwokctl_snapshot.md)	 - Snapshot [save, restore, export] one of cluster
+
diff --git a/site/content/en/docs/generated/kwokctl_snapshot_replay.md b/site/content/en/docs/generated/kwokctl_snapshot_replay.md
new file mode 100644
index 0000000000..f94858522a
--- /dev/null
+++ b/site/content/en/docs/generated/kwokctl_snapshot_replay.md
@@ -0,0 +1,29 @@
+## kwokctl snapshot replay
+
+[experimental] Replay the snapshot of the cluster
+
+```
+kwokctl snapshot replay [flags]
+```
+
+### Options
+
+```
+      --filter strings   Filter the resources to restore, only support for k8s format (default [namespace,node,serviceaccount,configmap,secret,limitrange,runtimeclass.node.k8s.io,priorityclass.scheduling.k8s.io,clusterrolebindings.rbac.authorization.k8s.io,clusterroles.rbac.authorization.k8s.io,rolebindings.rbac.authorization.k8s.io,roles.rbac.authorization.k8s.io,daemonset.apps,deployment.apps,replicaset.apps,statefulset.apps,cronjob.batch,job.batch,persistentvolumeclaim,persistentvolume,pod,service,endpoints])
+  -h, --help             help for replay
+      --path string      Path to the snapshot
+```
+
+### Options inherited from parent commands
+
+```
+  -c, --config strings   config path (default [~/.kwok/kwok.yaml])
+      --dry-run          Print the command that would be executed, but do not execute it
+      --name string      cluster name (default "kwok")
+  -v, --v log-level      number for the log level verbosity (DEBUG, INFO, WARN, ERROR) or (-4, 0, 4, 8) (default INFO)
+```
+
+### SEE ALSO
+
+* [kwokctl snapshot](kwokctl_snapshot.md)	 - Snapshot [save, restore, export] one of cluster
+
diff --git a/site/content/en/docs/generated/kwokctl_snapshot_restore.md b/site/content/en/docs/generated/kwokctl_snapshot_restore.md
index d49bafdf44..8ee8e7e453 100644
--- a/site/content/en/docs/generated/kwokctl_snapshot_restore.md
+++ b/site/content/en/docs/generated/kwokctl_snapshot_restore.md
@@ -13,6 +13,7 @@ kwokctl snapshot restore [flags]
       --format string   Format of the snapshot file (etcd, k8s) (default "etcd")
   -h, --help            help for restore
       --path string     Path to the snapshot
+      --replay          Replay the change of the cluster
 ```
 
 ### Options inherited from parent commands
diff --git a/site/content/en/docs/generated/kwokctl_snapshot_save.md b/site/content/en/docs/generated/kwokctl_snapshot_save.md
index 318a742e71..88d7fcb552 100644
--- a/site/content/en/docs/generated/kwokctl_snapshot_save.md
+++ b/site/content/en/docs/generated/kwokctl_snapshot_save.md
@@ -13,6 +13,7 @@ kwokctl snapshot save [flags]
       --format string   Format of the snapshot file (etcd, k8s) (default "etcd")
   -h, --help            help for save
       --path string     Path to the snapshot
+      --record          Record the change of the cluster
 ```
 
 ### Options inherited from parent commands
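Finally, the LoadConfig knobs compose: NoLoad skips re-creating the initial dump so that only the recorded changes are applied, which is presumably how the `kwokctl snapshot replay` subcommand documented above is wired. A sketch under that assumption, with `f` an open recorded snapshot:

```go
// Replay the recorded ResourcePatch stream against an already-populated cluster.
loadConfig := snapshot.LoadConfig{
	NoLoad:   true, // skip the initial object dump
	Replay:   true, // apply ResourcePatch documents with recorded pacing
	Relative: true, // convert $(time-offset-nanosecond N) back to wall-clock time
}
if err := snapshot.Load(ctx, clientset, f, snapshot.Resources, loadConfig); err != nil {
	return err
}
```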