4

Kubernetes-Operator:扩展Kubernetes API Resource与Custom Controller (上)

 2 years ago
source link: https://chinalhr.github.io/post/kubernetes-crd-operator-1/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

本文介绍了如何对Kubernetes 核心组件、扩展机制与API Resource设计概念,以及如何使用定制资源(Custom Resource)与定制控制器(Custom Controller)实现对Kubernetes API Resource的扩展

Kubernetes架构设计

kubernetes核心组件

Kubernetes系统架构整体采用的是C/S 的架构,其中Master节点作为 Server,各Worker 节点作为 Client。

  • Master Node:
    • kube-apiserver:提供了资源操作的唯一入口,提供REST API接口,并提供认证、授权、访问控制、API 注册和发现等机制;常见的与kube-apiserver进行交互可以通过kubectl 、client-go
    • kube-scheduler:负责kubernetes内资源的调度,按照调度策略将Pod调度到相应的Node上
    • kube-controller-manager:控制器管理器,提供了一些内置的Controller,自动化地管理集群状态,包括了资源状态,节点状态等;核心功能是确保集群始终处于预期状态,控制Kubernetes中的资源它们向 spec 配置的期望状态进行收敛
    • etcd:持久化集群状态、元数据、集群资源对象
    • kube-proxy:负责 Kubernetes 中 Service 的服务发现和负载均衡功能实现
  • Worker Node:
    • kubelet:负责管理Worker Node上Pod资源的生命周期,kubelet相当于一个代理执行器,接收到kube-apiserver的请求后执行Pod的管理逻辑,定期监控资源的使用状态上报kube-apiserver,以及如下接口的管理
      • CRI(Container Runtime Interface):容器运行时接口,提供计算资源
      • CNI(Container Network Interface):容器网络接口,提供网络资源
      • CSI(Container Storage Interface):容器存储接口,提供存储资源
    • Container Runtime: 负责镜像管理及 Pod 和容器运行时接口实现(CRI)

image

kubernetes扩展性

参考:https://kubernetes.io/zh/docs/concepts/extend-kubernetes/

Kubernetes的架构设计是高度可配置、可扩展的,这里主要关注Kubernetes自身的扩展性,即扩充 Kubernetes 的能力并深度集成软件组件。

扩展模式:

  • 控制器模式:编写Kubernetes 的客户端程序的一种特定模式,控制器通常读取一个对象的 spec 字段,可能做出一些处理,然后更新对象的 status 字段。
  • Webhook模式:Kubernetes作为客户端调用远程服务的模式。
  • Binary Plugin模式:Kubernetes作为客户端执行一个二进制插件程序。

image

扩展kubernetes的核心组件:

  • kube-controller-manager与API Resource扩展:通过使用crd与custom controller、operator framework实现,本文主要讲解这方面的扩展实现。
  • kube-apiserver扩展:通过使用API Aggregation layer在不修改 Kubernetes 核心代码的同时扩展 Kubernetes api-server,即将第三方服务注册成Kubernetes api-server提供服务。
  • kube-scheduler扩展:通过Kubernetes Scheduling Framework扩展调度机制。

扩展Kubernetes API Resource相关概念

资源(Resource): Resource是 Kubernetes API中的一个端点,用于存储某个类别的API对象的一个集合;如YAML中的kind:Pod、CronJob…

定制资源(Custom Resource): 自定义 API 资源,Custom Resource是对 Kubernetes API 的扩展,定制资源所代表的是对特定Kubernetes安装的一种定制。

定制控制器(Custom Controller): Custom Resource本身只能用来存取结构化的数据,需要将Custom Resource与Custom Controller结合,Custom Controller负责监控Customer Resource的变化(创建、删除…)并执行具体的动作。

CRD(CustomResourceDefinitions): Custom Resource的定义,Kubernetes CustomResourceDefinition API资源允许自定义Custom Resource。 定义CRD对象的操作会使用你所设定的名字和模式定义(Schema)创建一个新的Custom Resource,Kubernetes API 会为Custom Resource提供存储和访问服务。

简单的说,可以通过CRD定制自定义API资源即Custom Resource,动态注册到Kubernetes集群中;注册完成后用户可以通过 kubectl 来创建访问自定义API资源对象,类似于操作 Pod 一样。CRD 仅仅只是做资源的定义,需要配合控制器即Custom Controller 去监听Custom Resource的事件触发执行对应的处理逻辑。

Kubernetes 的 Resource 设计概念

通过上文的介绍不难看出,Kubernetes是一个以资源为中心容器编排平台,核心组件kube-api-server通过REST API暴漏资源操作接口、kube-controller-manager控制管理资源的状态、kube-scheduler进行资源的调度。

Group/Version/Resource

kubernetes在资源的概念上进行了分组与版本化,,一个 API对象在Etcd 里的完整资源路径,是由:Group(API 组)、Version(API 版本)和 Resource(API 资源类型)三个部分组成,如下图所示。 image

资源间的关系如下:

  • Kubernetes支持多个 Group(资源组)
  • 每个Group支持多个Version(资源版本)
  • 每个Version支持多种Resource(资源),部分资源还拥有自己的子资源
  • Kind 与 Resource 属于同一级概念,Kind 用于描述 Resource 的种类,一般情况下Kind与Resource是一一对应的关系,例如pods Resource 对应于 Pod Kind

定位资源的形式如下:

<GROUP>/<VERSION>/<RESOURCE>[/<SUBSOURCE>]

# 以Deployment为例子
apps/v1/deployments/status

资源对象(资源描述)即Resource Object描述如下:

<GROUP>/<VERSION>, Kind=<RESOURCE_NAME>

# 以Deployment为例子
apps/v1, Kind=Deployment

Group

  • 资源组的划分依据是资源的功能,Kubernetes支持不同资源组中拥有不同资源版本,方便组内资源迭代升级
  • 对于 Kubernetes 里的核心 API 对象(如:Pod…)是无组名Group(即:Group 是“”),在 /api 这个层级下;对于Kubernetes里的非核心 API 对象(如:CronJob…)是有组名Group,在 /apis 这个层级下
有组名 Group 资源: .../apis/<GROUP>/<VERSION>/<RESOURCE>
无组名 Group 资源: .../api/<VERSION>/<RESOURCE>

Version

Kubernetes 的资源版本 Version 采用语义化的版本号

  • Alpha 阶段:内部测试版本,Alpha 版本中的功能默认情况下会被禁用,常见命名方式如 v1alpha1。
  • Beta 阶段:相对稳定版本,经过了官方和社区的测试,Beta 阶段下的功能默认是开启的,常见命名方式如 v2beta1。
  • Stable 阶段:正式发布版本,命名方式如 v1、v2 。

Resouce

Resource 是 Kubernetes 中的核心概念

  • Resource 实例化后称为一个 Resource Object。
  • Kubernetes 中所有的 Resource Object 都称为 Entity。
  • 可以通过 Kubenetes API Server 去操作 Resource Object。

Kubernetes 目前的 Entity 分为两大类:

  • Persistent Entity:持久化实体,Resource Object 创建后会持久存在,如 Deployment / Service。
  • Ephemeral Entity: 短暂实体,Resource Object 创建后不稳定,如出现故障/调度失败后不再重建,如Pod。

资源操作方法

Kubernetes资源YAML文件提交给kube-apiserver后,会被转换为Resouce Object,序列化后持久化到Etcd中;对资源的操作方法主要有如下8种:

  • create:Resource Object 创建

  • delete:Resource Object 删除

  • deletecollection:多个 Resource Objects 删除

  • patch:Resource Object 局部字段更新

  • update:Resource Object 整体更新

  • get:Resource Object 获取

  • list:多个 Resource Objects 获取

  • watch:Resource Objects 监控

如何创建CRD

参考文档:

https://kubernetes.io/zh/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  # 名字必需与下面的 spec 字段匹配,并且格式为 '<名称的复数形式>.<组名>'
  name: crontabs.stable.example.com
spec:
  # 组名称,用于 REST API: /apis/<组>/<版本>
  group: stable.example.com
  # 列举此 CustomResourceDefinition 所支持的版本
  versions:
    - name: v1
      # 每个版本都可以通过 served 标志来独立启用或禁止
      served: true
      # 其中一个且只有一个版本必需被标记为存储版本
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                cronSpec:
                  type: string
                image:
                  type: string
                replicas:
                  type: integer
  # 可以是 Namespaced 或 Cluster
  scope: Namespaced
  names:
    # 名称的复数形式,用于 URL:/apis/<组>/<版本>/<名称的复数形式>
    plural: crontabs
    # 名称的单数形式,作为命令行使用时和显示时的别名
    singular: crontab
    # kind 通常是单数形式的帕斯卡编码(PascalCased)形式。你的资源清单会使用这一形式。
    kind: CronTab
    # shortNames 允许你在命令行使用较短的字符串来匹配资源
    shortNames:
    - ct

如上CRD所示, 指定group为stable.example.com,version为v1,CustomResource为CronTab,scope为Namespaced(CronTab属于Namespace的对象),然后需要设置CustomResource的对象描述,包括:Spec、Status …;

通过以下命令创建CRD后,会创建一个新的 namespace 级别的 RESTful API 就会被创建:/apis/stable.example.com/v1/namespaces/*/crontabs/...,用以创建和管理CustomResource CronTab。在创建CRD时,Kubernetes 会对我们提交的声明文件进行校验(基于 OpenAPI v3 schem 进行规范)。如果想要更加复杂的校验,需要通过 Kubernetes 的 admission webhook 进行实现。

kubectl apply -f crd.yaml

创建完CRD后,可以通过如下方式定义一个CronTab资源对象

apiVersion: "stable.example.com/v1"
kind: CronTab
metadata:
  name: my-cron-object
spec:
  cronSpec: "* * * * */5"
  image: my-image

创建完CronTab资源对象后,就可以通过kubectl来管理自定义资源对象

$ kubectl get crontab
NAME          AGE
my-cron-tab   93s
$ kubectl get crontab -o yaml
...

如何创建Custom Controller(sample-controller示例)

kubernetes官方Custom Controller示例:https://github.com/kubernetes/sample-controller

示例演示了:

  • 如何使用 CustomResourceDefinition注册类型的新自定义资源(自定义资源类型)Foo

  • 如何创建/获取/列出新资源类型的实例Foo

  • 如何基于控制器处理资源Foo创建/更新/删除事件

示例控制器基于client-go 库 进行Controller开发;该项目的tools/cache目录下包含了开发Custom Controller 使用的各种工具、机制。

client-go client对象

  • RESTClient:client-go 中最基础的客户端,其它 client 都基于 RESTClient 实现,RESTClient 实现了 RESTful 风格的 API 请求封装,可以实现对任意 Kubernetes 资源(包括内置资源及 CRDs)的 RESTful 风格交互,如 Post() / Delete() / Put() / Get(),同时支持 Json 和 protobuf;
  • ClientSet:与 Kubernetes 内置资源对象交互最常用的 Client,强调,只能处理 Kubernetes 内置资源,不包括 CRD 自定义资源,使用时需要指定 Group、指定 Version,然后根据 Resource 获取。ClientSet 的操作代码是通过 client-gen 代码生成器自动生成的;
  • DynamicClient:DynamicClient 能处理包括 CRD 自定义资源在内的任意 kubernetes 资源。如果一个 Controller 中需要控制所有的 API,可以使用Dynamic Client,DynamicClient 只支持JSON;
  • DiscoveryClient:用于发现 kube-apiserver 支持的 Group / Version / Resource 信息;

client-go 组件工作流程与以及与Custom Controller的交互点

image

client-go组件:

  • Reflector:通过 Kubernetes API 监控 Kubernetes 的资源类型,通过List/Watch 机制, 获取&监听资源对象实例的变化,添加 object 对象到 FIFO 队列,后续Informer 会从队列中进行数据获取。
  • Informer:controller 机制的基础,控制循环(Control Loop)处理,从队列中取出数据,添加到 Indexer 进行数据缓存,提供对象监听事件回调处理的 handler 接口,通过 给Informer 添加 ResourceEventHandler 实例的回调函数,通过实现OnAdd(obj interface{})、 OnUpdate(oldObj, newObj interface{}) 和 OnDelete(obj interface{}) 方法处理资源的创建、更新和删除操作。
  • Indexer:提供 object 对象的索引,缓存对象信息,indexer是线程安全的存储。

Custom Controller组件:

  • Informer与Indexer的reference,通过client-go 提供的NewIndexerInformer函数进行创建。
  • Resource Event Handlers:Informer在要将object 对象传递给Custom Controller 时将调用的回调函数,Resource Event Handlers 被回调后会将Object Key写入到Work queue中。
  • Process Item:从Work queue中取出Object key(事件通知) 进行后续处理。

simaple-controller核心逻辑

└── sample-controller
    ├── artifacts 						# yaml示例,如crd.yaml、example-foo.yaml
    │   └── examples
    ├── code-of-conduct.md
    ├── CONTRIBUTING.md
    ├── controller.go					# custom controller实现,核心逻辑
    ├── controller_test.go
    ├── docs
    │   ├── controller-client-go.md
    │   └── images
    ├── go.mod
    ├── go.sum
    ├── hack							# code generation 工具类
    │   ├── boilerplate.go.txt
    │   ├── custom-boilerplate.go.txt
    │   ├── tools.go
    │   ├── update-codegen.sh
    │   └── verify-codegen.sh
    ├── LICENSE
    ├── main.go							# 启动函数,参数配置与初始化逻辑
    ├── OWNERS
    ├── pkg								# 资源定义文件与自动生成的代码
    │   ├── apis
    │   ├── generated
    │   └── signals
    ├── README.md
    └── SECURITY_CONTACTS

此项目利用k8s.io/code-generator中的生成器 来生成typed client、informers、listers 和deep-copy functions。自动生成了如下文件与目录

pkg/apis/samplecontroller/v1alpha1/zz_generated.deepcopy.go

pkg/generated/

  • 核心逻辑-main.go(pseudocode)
func main() {
	...
	// 创建clientset,kubeClient(操作除自定义资源组外的其他资源)、exampleClient(操作自定义资源组)
	cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
	if err != nil {
		klog.Fatalf("Error building kubeconfig: %s", err.Error())
	}

	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("error building kubernetes clientset: %s", err.Error())
	}

	exampleClient, err := clientset.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("Error building example clientset: %s", err.Error())
	}
	// 创建Informer
	kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
	exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

	// 创建 controller,传入 clientset 和 informer
	controller := NewController(kubeClient, exampleClient,
		kubeInformerFactory.Apps().V1().Deployments(),
		exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

	// 运行 Informer,Start方法非阻塞,运行在单独的 goroutine 中
	kubeInformerFactory.Start(stopCh)
	exampleInformerFactory.Start(stopCh)
	
	//运行Custom Controller
	if err = controller.Run(2, stopCh); err != nil {
		klog.Fatalf("Error running controller: %s", err.Error())
	}
}
  • 核心逻辑controller.go (pseudocode)
/*
*** main.go
*/
// 创建 clientset
kubeClient, err := kubernetes.NewForConfig(cfg)		// k8s clientset, "k8s.io/client-go/kubernetes"
exampleClient, err := clientset.NewForConfig(cfg)	// sample clientset, "k8s.io/sample-controller/pkg/generated/clientset/versioned"

// 创建 Informer
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)		// k8s informer, "k8s.io/client-go/informers"
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)		// sample informer, "k8s.io/sample-controller/pkg/generated/informers/externalversions"

// 创建 controller,传入 clientset 和 informer
controller := NewController(kubeClient, exampleClient,
		kubeInformerFactory.Apps().V1().Deployments(),
		exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

// 运行 Informer,Start 方法为非阻塞,会运行在单独的 goroutine 中
kubeInformerFactory.Start(stopCh)
exampleInformerFactory.Start(stopCh)

// 运行 controller
controller.Run(2, stopCh)

/*
*** controller.go 
*/
NewController() *Controller {}
	// 将 CRD 资源类型定义加入到 Kubernetes 的 Scheme 中,以便 Events 可以记录 CRD 的事件
	utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))

	//创建 Event Broadcaster
	eventBroadcaster := record.NewBroadcaster()
	// ... ...

// 监听 CRD 类型Foo变化并注册 ResourceEventHandler方法,当Foo的实例变化时获取Foo资源并将其转换为 namespace/name字符(Key),然后将其放入工作队列中。
	fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controller.enqueueFoo,
		UpdateFunc: func(old, new interface{}) {
			controller.enqueueFoo(new)
		},
	})

	// 监听Deployment变化并注册ResourceEventHandler方法,
	// 当它的 ownerReferences 为 Foo 类型实例时,将该Foo资源加入 work queue
	deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controller.handleObject,
		UpdateFunc: func(old, new interface{}) {
			newDepl := new.(*appsv1.Deployment)
			oldDepl := old.(*appsv1.Deployment)
			if newDepl.ResourceVersion == oldDepl.ResourceVersion {
				return
			}
			controller.handleObject(new)
		},
		DeleteFunc: controller.handleObject,
	})

func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {}
	// 在启动 worker 前等待缓存同步
	if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}
	// 运行两个 worker 来处理资源
	for i := 0; i < workers; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}
	// 无限循环,不断的调用 processNextWorkItem 处理下一个对象
	func (c *Controller) runWorker() {
		for c.processNextWorkItem() {
		}
	}
	// 从workqueue中获取下一个对象并进行处理,通过调用 syncHandler
	func (c *Controller) processNextWorkItem() bool {
        //从workqueue获取obj
		obj, shutdown := c.workqueue.Get()
		if shutdown {
			return false
		}
		err := func(obj interface{}) error {
			// 调用 workqueue.Done(obj) 方法告诉 workqueue 当前项已经处理完毕,
			// 如果我们不想让当前项重新入队,一定要调用 workqueue.Forget(obj)。
			// 当我们没有调用Forget时,当前项会重新入队 workqueue 并在一段时间后重新被获取。
			defer c.workqueue.Done(obj)
			var key string
			var ok bool
			// 格式校验,我们期望的是 key 'namespace/name' 格式的 string
			if key, ok = obj.(string); !ok {
				// 无效的项调用Forget方法,避免重新入队。
				c.workqueue.Forget(obj)
				utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
				return nil
			}
            //运行 syncHandler,传递Foo资源的namespace/name字符串
			if err := c.syncHandler(key); err != nil {
				// 放回workqueue避免偶发的异常
				c.workqueue.AddRateLimited(key)
				return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
			}
            
			// 如果没有异常,Forget当前项,同步成功
			c.workqueue.Forget(obj)
			klog.Infof("Successfully synced '%s'", key)
			return nil
		}(obj)
        
		if err != nil {
			utilruntime.HandleError(err)
			return true
		}

		return true
	}
	// 将实际状态与期望的状态进行比较,然后尝试将两者收敛,然后它用资源的当前状态更新Foo资源的Status块。
	func (c *Controller) syncHandler(key string) error {
		// 通过 workqueue 中的 key 解析出 namespace 和 name
		namespace, name, err := cache.SplitMetaNamespaceKey(key)
		// 调用 lister 接口通过 namespace 和 name 获取 Foo 实例
		foo, err := c.foosLister.Foos(namespace).Get(name)
		deploymentName := foo.Spec.DeploymentName
		// 获取 Foo 实例中定义的 deploymentname
		deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
		//如果没有发现对应的 deployment,创建一个新的deployment。并还在资源上设置适当的 OwnerReferences,以便 handleObject 可以发现“拥有”它的 Foo 资源。
		if errors.IsNotFound(err) {
			deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo))
		}
		// deployment OwnerReferences 不是 Foo 实例,warning并返回错误
		if !metav1.IsControlledBy(deployment, foo) {
			msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
			c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
			return fmt.Errorf(msg)
		}
		// deployment 中 的配置和 Foo 实例中 Spec 的配置不一致,即更新 deployment
		if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
			deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo))
		}
		// 更新 Foo 实例状态
		err = c.updateFooStatus(foo, deployment)
		c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
	}
# assumes you have a working kubeconfig, not required if operating in-cluster
go build -o sample-controller .
./sample-controller -kubeconfig=$HOME/.kube/config

# create a CustomResourceDefinition
kubectl create -f artifacts/examples/crd-status-subresource.yaml

# create a custom resource of type Foo
kubectl create -f artifacts/examples/example-foo.yaml

# check deployments created through the custom resource
kubectl get deployments

Kubernetes Controller 间通讯方式

Kubernetes 三大核心组建之一的kube-controller-manager,是运行Controller组建进程的控制平面组件,包含了如下Controller集合

image

Kubernetes中不同的Controller间也会进行通讯,以Deployment Controller为例子:

  1. 用户通过 Kubectl 创建 Deployment,APIServer接收到请求后会对该请求进行权限、准入校验,鉴权通过后将 Deployment 的资源信息存储到 Etcd中。
  2. Deployment Controller 基于List/Watch机制,收到Deployment资源的Add事件并处理,为该 Deployment 创建 Replicaset。
  3. APIServer 接收到 Replicaset创建请求后,Replicaset的Add事件将被发布,随后ReplicaSet Controller接收到该事件,进行对应的处理逻辑:创建 Pod。

image

可以看到,Kubernetes Controller基于事件订阅-分发的工作方式,进行Controller间的通信、协调操作;也得其开放的工作机制,让我们可以自由的定制、开发自己的Custom Controller。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK