
cnat-operator

cnat-operator is the classic example from the book Programming Kubernetes. The book was written in 2019, and because the Operator SDK has iterated quickly since then, the companion code repository https://github.com/programming-kubernetes/cnat is largely unusable today. This example is therefore based on the latest version of the Operator SDK, with substantial modifications and adjustments.

Prerequisites

  • Install Docker Desktop and enable its built-in Kubernetes cluster
  • Register an account on hub.docker.com; the locally built images must be pushed to a public repository
  • Install the Operator SDK CLI: brew install operator-sdk
  • Install Go: brew install go

Recommended dependency versions for this example:

  • Docker Desktop: >= 4.0.0
  • Kubernetes: >= 1.21.4
  • Operator-SDK: >= 1.11.0
  • Go: >= 1.17

jxlwqq is the author's ID. Every personal ID that appears in the commands and code must be replaced with your own, including:

  • --domain=
  • --repo=
  • //+kubebuilder:rbac:groups=
  • IMAGE_TAG_BASE ?=

Create the project

Use the Operator SDK CLI to create a project named cnat-operator.

mkdir -p $HOME/projects/cnat-operator
cd $HOME/projects/cnat-operator
go env -w GOPROXY=https://goproxy.cn,direct

operator-sdk init \
--domain=programming-kubernetes.info \
--repo=github.com/jxlwqq/cnat-operator \
--skip-go-version-check

Create the API and controller

Use the Operator SDK CLI to create a custom resource definition (CRD) API and its controller.

Run the following command to create an API with group cnat, version v1alpha1, and kind At:

operator-sdk create api \
--resource=true \
--controller=true \
--group=cnat \
--version=v1alpha1 \
--kind=At

Define the API for the At custom resource (CR).

Modify the Go type definitions in api/v1alpha1/at_types.go so that they have the following spec and status:

const (
	PhasePending = "PENDING"
	PhaseRunning = "RUNNING"
	PhaseDone    = "DONE"
)

type AtSpec struct {
	// Schedule is the UTC point in time (layout 2006-01-02T15:04:05Z) at which to run the command.
	Schedule string `json:"schedule,omitempty"`
	// Command is the command to execute, split on spaces, in a busybox container.
	Command string `json:"command,omitempty"`
}

type AtStatus struct {
	// Phase is the current phase of the At resource: PENDING, RUNNING, or DONE.
	Phase string `json:"phase,omitempty"`
}
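
The wrapper types around AtSpec and AtStatus are scaffolded by operator-sdk create api and do not need to be changed. For context, this is a rough sketch of what the rest of the generated api/v1alpha1/at_types.go looks like (generated code, shown here only for orientation):

package v1alpha1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

// At is the Schema for the ats API
type At struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   AtSpec   `json:"spec,omitempty"`
	Status AtStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// AtList contains a list of At
type AtList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []At `json:"items"`
}

func init() {
	// SchemeBuilder is defined in the generated groupversion_info.go
	SchemeBuilder.Register(&At{}, &AtList{})
}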

Update the generated code for the resource types:

make generate
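
make generate invokes controller-gen to (re)generate the deepcopy helpers that make At satisfy runtime.Object. They live in api/v1alpha1/zz_generated.deepcopy.go and should never be edited by hand; a simplified sketch of what gets generated for these types:

// In package v1alpha1; runtime is "k8s.io/apimachinery/pkg/runtime".

// DeepCopyInto copies the receiver into out (all fields here are plain strings).
func (in *At) DeepCopyInto(out *At) {
	*out = *in
	out.TypeMeta = in.TypeMeta
	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
	out.Spec = in.Spec
	out.Status = in.Status
}

// DeepCopy returns a new At that is a deep copy of the receiver.
func (in *At) DeepCopy() *At {
	if in == nil {
		return nil
	}
	out := new(At)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyObject implements runtime.Object.
func (in *At) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	}
	return nil
}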

Run the following command to generate and update the CRD manifests:

make manifests

Implement the controller

In this example, replace the generated controller file controllers/at_controller.go with the following example implementation:

/*
Copyright 2021.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
	"context"
	"fmt"
	cnatv1alpha1 "github.com/jxlwqq/cnat-operator/api/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"strings"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// AtReconciler reconciles a At object
type AtReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups=cnat.programming-kubernetes.info,resources=ats,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=cnat.programming-kubernetes.info,resources=ats/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=cnat.programming-kubernetes.info,resources=ats/finalizers,verbs=update
//+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the At object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile
func (r *AtReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := ctrllog.FromContext(ctx)
	reqLogger := log.WithValues("namespace", req.Namespace, "at", req.Name)
	reqLogger.Info("=== Reconciling At")
	// Fetch the At instance
	instance := &cnatv1alpha1.At{}
	err := r.Get(context.TODO(), req.NamespacedName, instance)
	if err != nil {
		if errors.IsNotFound(err) {
			// Request object not found, could have been deleted after reconcile request - return and don't requeue:
			return ctrl.Result{}, nil
		}
		// Error reading the object - requeue the request:
		return ctrl.Result{}, err
	}

	// If no phase set, default to pending (the initial phase):
	if instance.Status.Phase == "" {
		instance.Status.Phase = cnatv1alpha1.PhasePending
	}

	// Now let's make the main case distinction: implementing
	// the state diagram PENDING -> RUNNING -> DONE
	switch instance.Status.Phase {
	case cnatv1alpha1.PhasePending:
		reqLogger.Info("Phase: PENDING")
		// As long as we haven't executed the command yet, we need to check if it's time already to act:
		reqLogger.Info("Checking schedule", "Target", instance.Spec.Schedule)
		// Check if it's already time to execute the command with a tolerance of 2 seconds:
		d, err := timeUntilSchedule(instance.Spec.Schedule)
		if err != nil {
			reqLogger.Error(err, "Schedule parsing failure")
			// Error reading the schedule. Wait until it is fixed.
			return ctrl.Result{}, err
		}
		reqLogger.Info("Schedule parsing done", "diff", fmt.Sprintf("%v", d))
		if d > 0 {
			// Not yet time to execute the command, wait until the scheduled time
			return reconcile.Result{RequeueAfter: d}, nil
		}
		reqLogger.Info("It's time!", "Ready to execute", instance.Spec.Command)
		instance.Status.Phase = cnatv1alpha1.PhaseRunning
	case cnatv1alpha1.PhaseRunning:
		reqLogger.Info("Phase: RUNNING")
		pod := newPodForCR(instance)
		// Set At instance as the owner and controller
		if err := controllerutil.SetControllerReference(instance, pod, r.Scheme); err != nil {
			// requeue with error
			return ctrl.Result{}, err
		}
		found := &corev1.Pod{}
		err = r.Get(context.TODO(), types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, found)
		// Try to see if the pod already exists and if not
		// (which we expect) then create a one-shot pod as per spec:
		if err != nil && errors.IsNotFound(err) {
			err = r.Create(context.TODO(), pod)
			if err != nil {
				// requeue with error
				return ctrl.Result{}, err
			}
			reqLogger.Info("Pod launched", "name", pod.Name)
		} else if err != nil {
			// requeue with error
			return ctrl.Result{}, err
		} else if found.Status.Phase == corev1.PodFailed || found.Status.Phase == corev1.PodSucceeded {
			reqLogger.Info("Container terminated", "reason", found.Status.Reason, "message", found.Status.Message)
			instance.Status.Phase = cnatv1alpha1.PhaseDone
		} else {
			// don't requeue because it will happen automatically when the pod status changes
			return ctrl.Result{}, nil
		}
	case cnatv1alpha1.PhaseDone:
		reqLogger.Info("Phase: DONE")
		return ctrl.Result{}, nil
	default:
		reqLogger.Info("NOP")
		return ctrl.Result{}, nil
	}

	// Update the At instance, setting the status to the respective phase:
	err = r.Status().Update(context.TODO(), instance)
	if err != nil {
		return ctrl.Result{}, err
	}

	// Don't requeue. We will be reconciled again when either the pod or the CR changes.

	return ctrl.Result{}, nil
}

// newPodForCR returns a busybox pod with the same name/namespace as the cr
func newPodForCR(cr *cnatv1alpha1.At) *corev1.Pod {
	labels := map[string]string{
		"app": cr.Name,
	}
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      cr.Name + "-pod",
			Namespace: cr.Namespace,
			Labels:    labels,
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{
				{
					Name:    "busybox",
					Image:   "busybox",
					Command: strings.Split(cr.Spec.Command, " "),
				},
			},
			RestartPolicy: corev1.RestartPolicyOnFailure,
		},
	}
}

// timeUntilSchedule parses the schedule string and returns the time until the schedule.
// When it is overdue, the duration is negative.
func timeUntilSchedule(schedule string) (time.Duration, error) {
	now := time.Now().UTC()
	layout := "2006-01-02T15:04:05Z"
	s, err := time.Parse(layout, schedule)
	if err != nil {
		return time.Duration(0), err
	}
	return s.Sub(now), nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *AtReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&cnatv1alpha1.At{}).
		Owns(&corev1.Pod{}).
		Complete(r)
}
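
The controller code is complete at this point. Not part of the book's example, but a quick way to sanity-check the schedule parsing is a small test for timeUntilSchedule, for example in controllers/at_controller_test.go (a hypothetical file name):

package controllers

import (
	"testing"
	"time"
)

func TestTimeUntilSchedule(t *testing.T) {
	// A schedule one hour in the future should yield a positive duration.
	future := time.Now().UTC().Add(time.Hour).Format("2006-01-02T15:04:05Z")
	if d, err := timeUntilSchedule(future); err != nil || d <= 0 {
		t.Fatalf("expected positive duration, got %v (err=%v)", d, err)
	}

	// A schedule in the past should yield a negative duration.
	if d, err := timeUntilSchedule("2000-01-01T00:00:00Z"); err != nil || d >= 0 {
		t.Fatalf("expected negative duration, got %v (err=%v)", d, err)
	}

	// An unparsable schedule should return an error.
	if _, err := timeUntilSchedule("not-a-timestamp"); err == nil {
		t.Fatal("expected a parse error for an invalid schedule")
	}
}

Run it with go test ./controllers/... before moving on.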

Run the following command again to regenerate the manifests (this also updates the RBAC rules from the kubebuilder markers in the controller):

make manifests

Run the Operator

Bundle the Operator and deploy it to the cluster with the Operator Lifecycle Manager (OLM).

Modify IMAGE_TAG_BASE and IMG in the Makefile:

IMAGE_TAG_BASE ?= docker.io/jxlwqq/cnat-operator
IMG ?= $(IMAGE_TAG_BASE):latest

Build the image:

make docker-build

Push the image to the image registry:

make docker-push

Once the push succeeds, the image is available at: https://hub.docker.com/r/jxlwqq/cnat-operator

Run the make bundle command to create the Operator bundle manifests, entering the required information (name, author, and so on) when prompted:

make bundle

Build the bundle image:

make bundle-build

Push the bundle image:

make bundle-push

Once the push succeeds, the bundle image is available at: https://hub.docker.com/r/jxlwqq/cnat-operator-bundle

Deploy the Operator with the Operator Lifecycle Manager:

# Switch to the local cluster
kubectl config use-context docker-desktop
# Install OLM
operator-sdk olm install
# Use the OLM integration in the Operator SDK to run the Operator in the cluster
operator-sdk run bundle docker.io/jxlwqq/cnat-operator-bundle:v0.0.1

Create a custom resource

Use the following command to get a UTC timestamp:

TZ=UTC date +%Y-%m-%dT%H:%M:%SZ
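
This produces a timestamp in the exact layout that timeUntilSchedule parses (2006-01-02T15:04:05Z). The same value can also be produced from Go, for example a schedule two minutes in the future (a standalone snippet, not part of the operator code):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Print a UTC timestamp two minutes from now, in the layout the
	// controller's timeUntilSchedule function expects.
	fmt.Println(time.Now().UTC().Add(2 * time.Minute).Format("2006-01-02T15:04:05Z"))
}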

Edit the sample At CR manifest at config/samples/cnat_v1alpha1_at.yaml so that it contains the following spec:

apiVersion: cnat.programming-kubernetes.info/v1alpha1
kind: At
metadata:
  name: at-sample
spec:
  schedule: "2021-09-09T07:05:59Z"
  command: "echo YAY"

Create the CR:

kubectl apply -f config/samples/cnat_v1alpha1_at.yaml

Check the Pods (kubectl get pods); the output should look like:

NAME            READY   STATUS      RESTARTS   AGE
at-sample-pod   0/1     Completed   0          14s

Check the log:

kubectl logs at-sample-pod

The output is: YAY

Clean up

operator-sdk cleanup cnat-operator
operator-sdk olm uninstall

More

For more classic examples, see: https://github.com/jxlwqq/kubernetes-examples