diff --git a/pkg/runtime/kubernetes/kubeadm.go b/pkg/runtime/kubernetes/kubeadm.go index 5f054c53715..3287d251113 100644 --- a/pkg/runtime/kubernetes/kubeadm.go +++ b/pkg/runtime/kubernetes/kubeadm.go @@ -64,6 +64,10 @@ func (k *KubeadmRuntime) setAPIVersion(apiVersion string) { k.kubeadmConfig.SetAPIVersion(apiVersion) } +func (k *KubeadmRuntime) setInitConfigurationPullPolicy(policy v1.PullPolicy) { + k.kubeadmConfig.InitConfiguration.NodeRegistration.ImagePullPolicy = policy +} + // GetterKubeadmAPIVersion is covert version to kubeadmAPIServerVersion // The support matrix will look something like this now and in the future: // v1.22: v1beta2 read-only, writes only v1beta3 Config. Errors if the user tries to use v1beta1 and older diff --git a/pkg/runtime/kubernetes/master.go b/pkg/runtime/kubernetes/master.go index 70413c8f6aa..a05e2538b88 100644 --- a/pkg/runtime/kubernetes/master.go +++ b/pkg/runtime/kubernetes/master.go @@ -30,7 +30,7 @@ import ( func (k *KubeadmRuntime) InitMaster0() error { logger.Info("start to init master0...") master0 := k.getMaster0IPAndPort() - if err := k.imagePull(master0); err != nil { + if err := k.imagePull(master0, ""); err != nil { return err } cmdInit := k.Command(InitMaster) @@ -44,8 +44,11 @@ func (k *KubeadmRuntime) InitMaster0() error { return k.copyMasterKubeConfig(master0) } -func (k *KubeadmRuntime) imagePull(hostAndPort string) error { - imagePull := fmt.Sprintf("kubeadm config images pull --cri-socket unix://%s --kubernetes-version %s %s", k.cluster.GetImageEndpoint(), k.getKubeVersion(), vlogToStr(k.klogLevel)) +func (k *KubeadmRuntime) imagePull(hostAndPort, version string) error { + if version == "" { + version = k.getKubeVersion() + } + imagePull := fmt.Sprintf("kubeadm config images pull --cri-socket unix://%s --kubernetes-version %s %s", k.cluster.GetImageEndpoint(), version, vlogToStr(k.klogLevel)) err := k.sshCmdAsync(hostAndPort, imagePull) if err != nil { return fmt.Errorf("master pull image failed, error: %s", err.Error()) @@ -123,7 +126,7 @@ func (k *KubeadmRuntime) joinMasters(masters []string) error { } for _, master := range masters { logger.Info("start to join %s as master", master) - if err = k.imagePull(master); err != nil { + if err = k.imagePull(master, ""); err != nil { return err } logger.Debug("start to generate cert for master %s", master) diff --git a/pkg/runtime/kubernetes/upgrade.go b/pkg/runtime/kubernetes/upgrade.go index 663cfb2e00d..80431ba0808 100644 --- a/pkg/runtime/kubernetes/upgrade.go +++ b/pkg/runtime/kubernetes/upgrade.go @@ -17,10 +17,12 @@ package kubernetes import ( "context" "fmt" + "path" "strings" "time" "github.com/Masterminds/semver/v3" + v1 "k8s.io/api/core/v1" metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" @@ -31,8 +33,8 @@ import ( ) const ( - upgradeApplyCmd = "kubeadm upgrade apply --yes %s" - upradeNodeCmd = "kubeadm upgrade node" + upgradeApplyCmd = "kubeadm upgrade apply --config %s --yes" + upradeNodeCmd = "kubeadm upgrade node --skip-phases preflight" //drainNodeCmd = "kubectl drain %s --ignore-daemonsets" cordonNodeCmd = "kubectl cordon %s" uncordonNodeCmd = "kubectl uncordon %s" @@ -42,16 +44,21 @@ const ( installKubeadmCmd = "cp -rf %s/kubeadm /usr/bin" installKubeletCmd = "cp -rf %s/kubelet /usr/bin" installKubectlCmd = "cp -rf %s/kubectl /usr/bin" + + writeKubeadmConfig = `cat > %s << EOF +%s +EOF` ) func (k *KubeadmRuntime) upgradeCluster(version string) error { logger.Info("Change ClusterConfiguration up to newVersion if need.") - if err := k.autoUpdateConfig(version); err != nil { + conversion, err := k.autoUpdateConfig(version) + if err != nil { return err } //upgrade master0 logger.Info("start to upgrade master0") - err := k.upgradeMaster0(version) + err = k.upgradeMaster0(conversion, version) if err != nil { return err } @@ -67,7 +74,7 @@ func (k *KubeadmRuntime) upgradeCluster(version string) error { return k.upgradeOtherNodes(upgradeNodes, version) } -func (k *KubeadmRuntime) upgradeMaster0(version string) error { +func (k *KubeadmRuntime) upgradeMaster0(conversion *types.ConvertedKubeadmConfig, version string) error { master0ip := k.getMaster0IP() sver := semver.MustParse(version) if gte(sver, V1260) { @@ -86,11 +93,29 @@ func (k *KubeadmRuntime) upgradeMaster0(version string) error { if err = k.pingAPIServer(); err != nil { return err } + + // force cri to pull the image + err = k.imagePull(master0ip, version) + if err != nil { + logger.Warn("image pull pre-upgrade failed: %s", err.Error()) + } + + config, err := yaml.MarshalConfigs(&conversion.InitConfiguration, &conversion.ClusterConfiguration) + if err != nil { + logger.Error("kubeadm config marshal failed: %s", err.Error()) + return err + } + + upgradeConfigName := "kubeadm-upgrade.yaml" + upgradeConfigPath := path.Join(k.pathResolver.EtcPath(), upgradeConfigName) + err = k.sshCmdAsync(master0ip, //install kubeadm:{version} at master0 fmt.Sprintf(installKubeadmCmd, kubeBinaryPath), + // write kubeadm config to file + fmt.Sprintf(writeKubeadmConfig, upgradeConfigPath, string(config)), //execute kubeadm upgrade apply {version} at master0 - fmt.Sprintf(upgradeApplyCmd, version), + fmt.Sprintf(upgradeApplyCmd, upgradeConfigPath), //kubectl cordon fmt.Sprintf(cordonNodeCmd, master0Name), //install kubelet:{version},kubectl{version} at master0 @@ -125,6 +150,13 @@ func (k *KubeadmRuntime) upgradeOtherNodes(ips []string, version string) error { if err = k.pingAPIServer(); err != nil { return err } + + // force cri to pull the image + err = k.imagePull(ip, version) + if err != nil { + logger.Error("image pull pre-upgrade failed: %s", err.Error()) + } + logger.Info("upgrade node %s", nodename) err = k.sshCmdAsync(ip, //install kubeadm:{version} at the node @@ -150,19 +182,19 @@ func (k *KubeadmRuntime) upgradeOtherNodes(ips []string, version string) error { return nil } -func (k *KubeadmRuntime) autoUpdateConfig(version string) error { +func (k *KubeadmRuntime) autoUpdateConfig(version string) (*types.ConvertedKubeadmConfig, error) { exp, err := k.getKubeExpansion() if err != nil { - return err + return nil, err } ctx := context.Background() clusterCfg, err := exp.FetchKubeadmConfig(ctx) if err != nil { - return err + return nil, err } kubeletCfg, err := exp.FetchKubeletConfig(ctx) if err != nil { - return err + return nil, err } logger.Debug("get cluster configmap data:\n%s", clusterCfg) logger.Debug("get kubelet configmap data:\n%s", kubeletCfg) @@ -170,7 +202,7 @@ func (k *KubeadmRuntime) autoUpdateConfig(version string) error { defaultKubeadmConfig, err := types.LoadKubeadmConfigs(allConfig, false, decode.CRDFromString) if err != nil { logger.Error("failed to decode cluster kubeadm config: %s", err) - return err + return nil, err } defaultKubeadmConfig.InitConfiguration = kubeadm.InitConfiguration{ TypeMeta: metaV1.TypeMeta{ @@ -183,36 +215,37 @@ func (k *KubeadmRuntime) autoUpdateConfig(version string) error { } kk.setKubeVersion(version) kk.setFeatureGatesConfiguration() + kk.setInitConfigurationPullPolicy(v1.PullNever) conversion, err := kk.kubeadmConfig.ToConvertedKubeadmConfig() if err != nil { - return err + return nil, err } newClusterData, err := yaml.MarshalConfigs(&conversion.ClusterConfiguration) if err != nil { logger.Error("failed to encode ClusterConfiguration: %s", err) - return err + return nil, err } logger.Debug("update cluster config:\n%s", string(newClusterData)) err = exp.UpdateKubeadmConfig(ctx, string(newClusterData)) if err != nil { logger.Error("failed to update kubeadm-config with k8s-client: %s", err) - return err + return nil, err } newKubeletData, err := yaml.MarshalConfigs(&conversion.KubeletConfiguration) if err != nil { logger.Error("failed to encode KubeletConfiguration: %s", err) - return err + return nil, err } logger.Debug("update kubelet config:\n%s", string(newKubeletData)) err = exp.UpdateKubeletConfig(ctx, string(newKubeletData)) if err != nil { logger.Error("failed to update kubelet-config with k8s-client: %s", err) - return err + return nil, err } - return nil + return conversion, nil } func (k *KubeadmRuntime) pingAPIServer() error {