Skip to content

Commit

Permalink
Shorten the default node lease
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Dec 5, 2023
1 parent 92e0320 commit 40eb177
Show file tree
Hide file tree
Showing 36 changed files with 453 additions and 48 deletions.
2 changes: 1 addition & 1 deletion pkg/apis/config/v1alpha1/kwokctl_configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ type KwokctlConfigurationOptions struct {
NodeStatusUpdateFrequencyMilliseconds int64 `json:"nodeStatusUpdateFrequencyMilliseconds,omitempty"`

// NodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease.
// +default=1200
// +default=40
NodeLeaseDurationSeconds uint `json:"nodeLeaseDurationSeconds,omitempty"`

// BindAddress is the address to bind to.
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/config/v1alpha1/zz_generated.defaults.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions pkg/kwok/controllers/node_lease_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,24 @@ func (c *NodeLeaseController) sync(ctx context.Context, nodeName string) {
logger.Info("Creating lease")
latestLease, err := c.ensureLease(ctx, nodeName)
if err != nil {
if apierrors.IsAlreadyExists(err) {
logger.Error("failed to create lease, lease already exists", err)

_, err = c.syncLease(ctx, nodeName)
if err != nil {
logger.Error("failed to sync lease", err)
return
}
if c.onNodeManagedFunc != nil {
if c.Held(nodeName) {
c.onNodeManagedFunc(nodeName)
} else {
logger.Warn("Lease not held")
}
}
return
}

if !apierrors.IsNotFound(err) || !c.latestLease.IsEmpty() {
logger.Error("failed to create lease", err)
return
Expand All @@ -243,6 +261,15 @@ func (c *NodeLeaseController) sync(ctx context.Context, nodeName string) {
}
}

// syncLease reads the lease named leaseName from the corev1.NamespaceNodeLease
// namespace through the typed client and stores it in c.latestLease under the
// same name.
//
// It returns the fetched lease, or a nil lease together with the error from
// the Get call; on error c.latestLease is left untouched.
func (c *NodeLeaseController) syncLease(ctx context.Context, leaseName string) (*coordinationv1.Lease, error) {
	leases := c.typedClient.CoordinationV1().Leases(corev1.NamespaceNodeLease)

	current, getErr := leases.Get(ctx, leaseName, metav1.GetOptions{})
	if getErr != nil {
		return nil, getErr
	}

	// Remember the server-side copy so later lookups see the existing lease.
	c.latestLease.Store(leaseName, current)
	return current, nil
}

// ensureLease creates a lease if it does not exist
func (c *NodeLeaseController) ensureLease(ctx context.Context, leaseName string) (*coordinationv1.Lease, error) {
lease := &coordinationv1.Lease{
Expand Down
1 change: 1 addition & 0 deletions pkg/kwokctl/cmd/create/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func NewCommand(ctx context.Context) *cobra.Command {
cmd.Flags().StringVar(&flags.Kubeconfig, "kubeconfig", flags.Kubeconfig, "The path to the kubeconfig file will be added to the newly created cluster and set to current-context")
cmd.Flags().BoolVar(&flags.Options.DisableQPSLimits, "disable-qps-limits", flags.Options.DisableQPSLimits, "Disable QPS limits for components")
cmd.Flags().StringSliceVar(&flags.Options.EnableCRDs, "enable-crds", flags.Options.EnableCRDs, "List of CRDs to enable")
cmd.Flags().UintVar(&flags.Options.NodeLeaseDurationSeconds, "node-lease-duration-seconds", flags.Options.NodeLeaseDurationSeconds, "Duration of node lease in seconds")

return cmd
}
Expand Down
39 changes: 32 additions & 7 deletions pkg/kwokctl/runtime/binary/cluster_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,44 @@ func (c *Cluster) SnapshotSave(ctx context.Context, path string) error {
func (c *Cluster) SnapshotRestore(ctx context.Context, path string) error {
logger := log.FromContext(ctx)

err := c.StopComponent(ctx, consts.ComponentEtcd)
if err != nil {
logger.Error("Failed to stop etcd", err)
// Restart etcd and kube-apiserver
components := []string{
consts.ComponentEtcd,
consts.ComponentKubeApiserver,
}
defer func() {
err = c.StartComponent(ctx, consts.ComponentEtcd)
for _, component := range components {
err := c.StopComponent(ctx, component)
if err != nil {
logger.Error("Failed to start etcd", err)
logger.Error("Failed to stop", err, "component", component)
}
}
defer func() {
for _, component := range components {
err := c.StartComponent(ctx, component)
if err != nil {
logger.Error("Failed to start", err, "component", component)
}
}

components := []string{
consts.ComponentKwokController,
consts.ComponentKubeControllerManager,
consts.ComponentKubeScheduler,
}
for _, component := range components {
err := c.StopComponent(ctx, component)
if err != nil {
logger.Error("Failed to stop", err, "component", component)
}
err = c.StartComponent(ctx, component)
if err != nil {
logger.Error("Failed to start", err, "component", component)
}
}
}()

etcdDataTmp := c.GetWorkdirPath("etcd-data")
err = c.RemoveAll(etcdDataTmp)
err := c.RemoveAll(etcdDataTmp)
if err != nil {
return err
}
Expand Down
79 changes: 65 additions & 14 deletions pkg/kwokctl/runtime/compose/cluster_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,39 @@ func (c *Cluster) SnapshotRestore(ctx context.Context, path string) error {

etcdContainerName := c.Name() + "-etcd"
if conf.Runtime != consts.RuntimeTypeNerdctl {
// Restart etcd container
err = c.StopComponent(ctx, consts.ComponentEtcd)
if err != nil {
logger.Error("Failed to stop etcd", err)
// Restart etcd and kube-apiserver
components := []string{
consts.ComponentEtcd,
consts.ComponentKubeApiserver,
}
defer func() {
err = c.StartComponent(ctx, consts.ComponentEtcd)
for _, component := range components {
err := c.StopComponent(ctx, component)
if err != nil {
logger.Error("Failed to start etcd", err)
logger.Error("Failed to stop", err, "component", component)
}
}
defer func() {
for _, component := range components {
err := c.StartComponent(ctx, component)
if err != nil {
logger.Error("Failed to start", err, "component", component)
}
}

components := []string{
consts.ComponentKwokController,
consts.ComponentKubeControllerManager,
consts.ComponentKubeScheduler,
}
for _, component := range components {
err := c.StopComponent(ctx, component)
if err != nil {
logger.Error("Failed to stop", err, "component", component)
}
err = c.StartComponent(ctx, component)
if err != nil {
logger.Error("Failed to start", err, "component", component)
}
}
}()

Expand Down Expand Up @@ -110,15 +134,42 @@ func (c *Cluster) SnapshotRestore(ctx context.Context, path string) error {
return err
}

// Restart etcd container
err = c.StopComponent(ctx, consts.ComponentEtcd)
if err != nil {
logger.Error("Failed to stop etcd", err)
// Restart etcd and kube-apiserver
components := []string{
consts.ComponentEtcd,
}
defer func() {
err = c.StartComponent(ctx, consts.ComponentEtcd)
for _, component := range components {
err := c.StopComponent(ctx, component)
if err != nil {
logger.Error("Failed to start etcd", err)
logger.Error("Failed to stop", err, "component", component)
}
}
defer func() {
components := []string{
consts.ComponentEtcd,
consts.ComponentKubeApiserver,
}
for _, component := range components {
err := c.StartComponent(ctx, component)
if err != nil {
logger.Error("Failed to start", err, "component", component)
}
}

components = []string{
consts.ComponentKwokController,
consts.ComponentKubeControllerManager,
consts.ComponentKubeScheduler,
}
for _, component := range components {
err := c.StopComponent(ctx, component)
if err != nil {
logger.Error("Failed to stop", err, "component", component)
}
err = c.StartComponent(ctx, component)
if err != nil {
logger.Error("Failed to start", err, "component", component)
}
}
}()
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/kwokctl/runtime/kind/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,11 @@ func (c *Cluster) Stop(ctx context.Context) error {
return nil
}

var importantComponents = map[string]struct{}{
// startImportantComponents is the set of component names that StartComponent
// treats as important: for names in this set it skips the inspectComponent
// existence check and returns nil once the start step has succeeded.
// Only etcd is listed here.
// NOTE(review): only part of StartComponent is visible in this hunk — confirm
// the exact early-return behavior against the full function.
var startImportantComponents = map[string]struct{}{
consts.ComponentEtcd: {},
}

// stopImportantComponents is the set of component names that StopComponent
// treats as important: for names in this set it skips the inspectComponent
// existence check and returns nil right after the stop step, because once
// etcd and kube-apiserver are stopped the cluster goes down.
// NOTE(review): only part of StopComponent is visible in this hunk — confirm
// against the full function.
var stopImportantComponents = map[string]struct{}{
consts.ComponentEtcd: {},
consts.ComponentKubeApiserver: {},
}
Expand All @@ -961,7 +965,7 @@ var importantComponents = map[string]struct{}{
func (c *Cluster) StartComponent(ctx context.Context, name string) error {
logger := log.FromContext(ctx)
logger = logger.With("component", name)
if _, important := importantComponents[name]; !important {
if _, important := startImportantComponents[name]; !important {
if !c.IsDryRun() {
if _, exist, err := c.inspectComponent(ctx, name); err != nil {
return err
Expand All @@ -977,6 +981,9 @@ func (c *Cluster) StartComponent(ctx context.Context, name string) error {
if err != nil {
return err
}
if _, important := startImportantComponents[name]; important {
return nil
}
if c.IsDryRun() {
return nil
}
Expand All @@ -987,7 +994,7 @@ func (c *Cluster) StartComponent(ctx context.Context, name string) error {
func (c *Cluster) StopComponent(ctx context.Context, name string) error {
logger := log.FromContext(ctx)
logger = logger.With("component", name)
if _, important := importantComponents[name]; !important {
if _, important := stopImportantComponents[name]; !important {
if !c.IsDryRun() {
if _, exist, err := c.inspectComponent(ctx, name); err != nil {
return err
Expand All @@ -1004,7 +1011,7 @@ func (c *Cluster) StopComponent(ctx context.Context, name string) error {
return err
}
// Once etcd and kube-apiserver are stopped, the cluster will go down
if _, important := importantComponents[name]; important {
if _, important := stopImportantComponents[name]; important {
return nil
}
if c.IsDryRun() {
Expand Down
33 changes: 25 additions & 8 deletions pkg/kwokctl/runtime/kind/cluster_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,38 @@ func (c *Cluster) SnapshotSave(ctx context.Context, path string) error {
// SnapshotRestore restores the snapshot of the cluster
func (c *Cluster) SnapshotRestore(ctx context.Context, path string) error {
logger := log.FromContext(ctx)
err := c.StopComponent(ctx, consts.ComponentEtcd)
if err != nil {
logger.Error("Failed to stop etcd", err)
clusterName := c.getClusterName()

components := []string{
consts.ComponentEtcd,
}
for _, component := range components {
err := c.StopComponent(ctx, component)
if err != nil {
logger.Error("Failed to stop", err, "component", component)
}
}
defer func() {
err = c.StartComponent(ctx, consts.ComponentEtcd)
for _, component := range components {
err := c.StartComponent(ctx, component)
if err != nil {
logger.Error("Failed to start", err, "component", component)
}
}

err := c.Stop(ctx)
if err != nil {
logger.Error("Failed to start etcd", err)
logger.Error("Failed to stop", err)
}
err = c.Start(ctx)
if err != nil {
logger.Error("Failed to start", err)
}
}()

// Restore snapshot to host temporary directory
etcdDataTmp := c.GetWorkdirPath(consts.ComponentEtcd)
err = c.Etcdctl(ctx, "snapshot", "restore", path, "--data-dir", etcdDataTmp)
err := c.Etcdctl(ctx, "snapshot", "restore", path, "--data-dir", etcdDataTmp)
if err != nil {
return err
}
Expand All @@ -81,9 +99,8 @@ func (c *Cluster) SnapshotRestore(ctx context.Context, path string) error {
}
}()

kindName := c.getClusterName()
// Copy to kind container from host temporary directory
err = c.Exec(ctx, c.runtime, "cp", etcdDataTmp, kindName+":/var/lib/")
err = c.Exec(ctx, c.runtime, "cp", etcdDataTmp, clusterName+":/var/lib/")
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions site/content/en/docs/generated/kwokctl_create_cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ kwokctl create cluster [flags]
--kwok-controller-image string Image of kwok-controller, only for docker/podman/nerdctl/kind/kind-podman runtime
'${KWOK_IMAGE_PREFIX}/kwok:${KWOK_VERSION}'
(default "registry.k8s.io/kwok/kwok:v0.4.0")
--node-lease-duration-seconds uint Duration of node lease in seconds (default 40)
--prometheus-binary string Binary of Prometheus, only for binary runtime (default "https://github.com/prometheus/prometheus/releases/download/v2.44.0/prometheus-2.44.0.linux-amd64.tar.gz#prometheus")
--prometheus-image string Image of Prometheus, only for docker/podman/nerdctl/kind/kind-podman runtime
'${KWOK_PROMETHEUS_IMAGE_PREFIX}/prometheus:${KWOK_PROMETHEUS_VERSION}'
Expand Down
12 changes: 12 additions & 0 deletions test/e2e/kwokctl/binary/kubectl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,15 @@ func TestExec(t *testing.T) {
Feature()
testEnv.Test(t, f0)
}

// TestRestart builds the feature returned by e2e.CaseRestart for kwokctlPath
// and clusterName, then runs it in testEnv.
func TestRestart(t *testing.T) {
	restartCase := e2e.CaseRestart(kwokctlPath, clusterName)
	testEnv.Test(t, restartCase.Feature())
}

// TestSnapshot builds the feature returned by e2e.CaseSnapshot for
// kwokctlPath, clusterName and pwd, then runs it in testEnv.
func TestSnapshot(t *testing.T) {
	snapshotCase := e2e.CaseSnapshot(kwokctlPath, clusterName, pwd)
	testEnv.Test(t, snapshotCase.Feature())
}
12 changes: 12 additions & 0 deletions test/e2e/kwokctl/docker/kubectl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,15 @@ func TestExec(t *testing.T) {
Feature()
testEnv.Test(t, f0)
}

// TestRestart builds the feature returned by e2e.CaseRestart for kwokctlPath
// and clusterName, then runs it in testEnv.
func TestRestart(t *testing.T) {
	restartCase := e2e.CaseRestart(kwokctlPath, clusterName)
	testEnv.Test(t, restartCase.Feature())
}

// TestSnapshot builds the feature returned by e2e.CaseSnapshot for
// kwokctlPath, clusterName and pwd, then runs it in testEnv.
func TestSnapshot(t *testing.T) {
	snapshotCase := e2e.CaseSnapshot(kwokctlPath, clusterName, pwd)
	testEnv.Test(t, snapshotCase.Feature())
}
12 changes: 12 additions & 0 deletions test/e2e/kwokctl/kind-podman/kubectl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,15 @@ func TestExec(t *testing.T) {
Feature()
testEnv.Test(t, f0)
}

// TestRestart builds the feature returned by e2e.CaseRestart for kwokctlPath
// and clusterName, then runs it in testEnv.
func TestRestart(t *testing.T) {
	restartCase := e2e.CaseRestart(kwokctlPath, clusterName)
	testEnv.Test(t, restartCase.Feature())
}

// TestSnapshot builds the feature returned by e2e.CaseSnapshot for
// kwokctlPath, clusterName and pwd, then runs it in testEnv.
func TestSnapshot(t *testing.T) {
	snapshotCase := e2e.CaseSnapshot(kwokctlPath, clusterName, pwd)
	testEnv.Test(t, snapshotCase.Feature())
}
12 changes: 12 additions & 0 deletions test/e2e/kwokctl/kind/kubectl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,15 @@ func TestExec(t *testing.T) {
Feature()
testEnv.Test(t, f0)
}

// TestRestart builds the feature returned by e2e.CaseRestart for kwokctlPath
// and clusterName, then runs it in testEnv.
func TestRestart(t *testing.T) {
	restartCase := e2e.CaseRestart(kwokctlPath, clusterName)
	testEnv.Test(t, restartCase.Feature())
}

// TestSnapshot builds the feature returned by e2e.CaseSnapshot for
// kwokctlPath, clusterName and pwd, then runs it in testEnv.
func TestSnapshot(t *testing.T) {
	snapshotCase := e2e.CaseSnapshot(kwokctlPath, clusterName, pwd)
	testEnv.Test(t, snapshotCase.Feature())
}
Loading

0 comments on commit 40eb177

Please sign in to comment.