5

编写一个kubernetes controller - Cylon

 2 years ago
source link: https://www.cnblogs.com/Cylon/p/16390946.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Overview#

根据 Kubernetes 文档对 Controller 的描述,Controller 在 Kubernetes 中是负责协调的组件。根据其设计模式可知,Controller 会不断地将你的对象(如 Pod)从当前状态向期望状态进行同步。当然,Controller 也会监听对象的实际状态与期望状态。

Writing Controllers#

package main

import (
	"flag"
	"fmt"
	"os"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
)

// Controller groups the three collaborators used by the control loop in this
// file: a local object cache, the informer that fills it, and a rate-limited
// queue of object keys waiting to be processed.
type Controller struct {
	// lister is the local cache; processWrapper looks objects up in it by key.
	lister     cache.Indexer
	// controller is the informer loop; Run starts it and waits on its HasSynced.
	controller cache.Controller
	// queue holds the keys that processItem pulls and works on.
	queue      workqueue.RateLimitingInterface
}

// NewController returns a Controller that uses the given cache, informer
// controller and work queue. It only stores the arguments; nothing is started
// here.
func NewController(lister cache.Indexer, controller cache.Controller, queue workqueue.RateLimitingInterface) *Controller {
	ctrl := new(Controller)
	ctrl.lister = lister
	ctrl.controller = controller
	ctrl.queue = queue
	return ctrl
}

// processItem takes one key off the work queue and processes it.
//
// It returns false only when the queue has been shut down, which tells the
// calling worker loop to stop; in every other case it returns true so the
// worker keeps polling.
func (c *Controller) processItem() bool {
	item, quit := c.queue.Get()
	if quit {
		return false
	}
	// Done must always be called so the queue knows this item is no longer
	// being processed and may hand it out again if it was re-added meanwhile.
	defer c.queue.Done(item)
	fmt.Println(item)

	key, ok := item.(string)
	if !ok {
		// A non-string entry can never be processed; drop it instead of
		// panicking the worker, and clear any rate-limit history for it.
		c.queue.Forget(item)
		utilruntime.HandleError(fmt.Errorf("expected string key in work queue but got %#v", item))
		return true
	}

	if err := c.processWrapper(key); err != nil {
		c.handleError(key)
		return true
	}

	// Success: reset the rate limiter's failure count for this key so a later
	// failure starts again from the shortest back-off and a fresh retry budget.
	c.queue.Forget(key)
	return true
}

// handleError decides what happens to a key whose processing failed: while
// the key has been requeued fewer than three times it is put back with rate
// limiting; after that it is forgotten and a drop message is logged.
func (c *Controller) handleError(key string) {
	if retries := c.queue.NumRequeues(key); retries >= 3 {
		// Retry budget exhausted: clear the rate-limiter state and give up.
		c.queue.Forget(key)
		klog.Infof("Drop Object %s in queue", key)
		return
	}

	c.queue.AddRateLimited(key)
}

// processWrapper looks the key up in the local cache and prints the name of
// the Pod stored under it.
//
// It returns a non-nil error only when the cache lookup itself fails. A key
// that is no longer in the cache, or a cached object that is not a *v1.Pod,
// is logged and treated as handled, because retrying would not change the
// outcome.
func (c *Controller) processWrapper(key string) error {
	item, exists, err := c.lister.GetByKey(key)
	if err != nil {
		klog.Error(err)
		return err
	}
	if !exists {
		// item is nil on this path, so report the key that was looked up.
		klog.Infof("item %s does not exist in cache", key)
		return nil
	}

	pod, ok := item.(*v1.Pod)
	if !ok {
		klog.Errorf("unexpected object type %T in cache for key %s", item, key)
		return nil
	}
	fmt.Println(pod.GetName())
	return nil
}

// Run starts the informer, waits for its cache to sync, launches threadiness
// worker goroutines and then blocks until stopCh is closed.
//
// If the cache does not sync, the failure is reported through
// utilruntime.HandleError and Run returns without starting any worker. The
// work queue is shut down when Run returns.
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()
	klog.Infof("Starting custom controller")

	go c.controller.Run(stopCh)

	synced := cache.WaitForCacheSync(stopCh, c.controller.HasSynced)
	if !synced {
		utilruntime.HandleError(fmt.Errorf("sync failed."))
		return
	}

	// drain keeps pulling items until processItem reports the queue is closed.
	drain := func() {
		for {
			if !c.processItem() {
				return
			}
		}
	}
	for started := 0; started != threadiness && started < threadiness; started++ {
		// wait.Until re-invokes drain every second until stopCh is closed.
		go wait.Until(drain, time.Second, stopCh)
	}

	<-stopCh
	klog.Info("Stopping custom controller")
}

func main() {
	var (
		k8sconfig  *string //使用kubeconfig配置文件进行集群权限认证
		restConfig *rest.Config
		err        error
	)
	if home := homedir.HomeDir(); home != "" {
		k8sconfig = flag.String("kubeconfig", fmt.Sprintf("%s/.kube/config", home), "kubernetes auth config")
	}
	k8sconfig = k8sconfig
	flag.Parse()
	if _, err := os.Stat(*k8sconfig); err != nil {
		panic(err)
	}

	if restConfig, err = rest.InClusterConfig(); err != nil {
		// 这里是从masterUrl 或者 kubeconfig传入集群的信息,两者选一
		restConfig, err = clientcmd.BuildConfigFromFlags("", *k8sconfig)
		if err != nil {
			panic(err)
		}
	}
	restset, err := kubernetes.NewForConfig(restConfig)
	lister := cache.NewListWatchFromClient(restset.CoreV1().RESTClient(), "pods", "default", fields.Everything())
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	indexer, controller := cache.NewIndexerInformer(lister, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			fmt.Println("add ", obj.(*v1.Pod).GetName())
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}

		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			fmt.Println("update", newObj.(*v1.Pod).GetName())
			if newObj.(*v1.Pod).Status.Conditions[0].Status == "True" {
				fmt.Println("update: the Initialized Status", newObj.(*v1.Pod).Status.Conditions[0].Status)
			} else {
				fmt.Println("update: the Initialized Status ", newObj.(*v1.Pod).Status.Conditions[0].Status)
				fmt.Println("update: the Initialized Reason ", newObj.(*v1.Pod).Status.Conditions[0].Reason)
			}

			if len(newObj.(*v1.Pod).Status.Conditions) > 1 {
				if newObj.(*v1.Pod).Status.Conditions[1].Status == "True" {
					fmt.Println("update: the Ready Status", newObj.(*v1.Pod).Status.Conditions[1].Status)
				} else {
					fmt.Println("update: the Ready Status ", newObj.(*v1.Pod).Status.Conditions[1].Status)
					fmt.Println("update: the Ready Reason ", newObj.(*v1.Pod).Status.Conditions[1].Reason)
				}

				if newObj.(*v1.Pod).Status.Conditions[2].Status == "True" {
					fmt.Println("update: the PodCondition Status", newObj.(*v1.Pod).Status.Conditions[2].Status)
				} else {
					fmt.Println("update: the PodCondition Status ", newObj.(*v1.Pod).Status.Conditions[2].Status)
					fmt.Println("update: the PodCondition Reason ", newObj.(*v1.Pod).Status.Conditions[2].Reason)
				}

				if newObj.(*v1.Pod).Status.Conditions[3].Status == "True" {
					fmt.Println("update: the PodScheduled Status", newObj.(*v1.Pod).Status.Conditions[3].Status)
				} else {
					fmt.Println("update: the PodScheduled Status ", newObj.(*v1.Pod).Status.Conditions[3].Status)
					fmt.Println("update: the PodScheduled Reason ", newObj.(*v1.Pod).Status.Conditions[3].Reason)
				}
			}

		},
		DeleteFunc: func(obj interface{}) {
			fmt.Println("delete ", obj.(*v1.Pod).GetName(), "Status ", obj.(*v1.Pod).Status.Phase)
			// 上面是事件函数的处理,下面是对workqueue的操作
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
	}, cache.Indexers{})

	c := NewController(indexer, controller, queue)
	stopCh := make(chan struct{})
	stopCh1 := make(chan struct{})
	c.Run(1, stopCh)
	defer close(stopCh)
	<-stopCh1
}

通过日志可以看出,Pod 创建后的步骤大概为 4 步:

  • Initialized:初始化好后状态为Pending
  • PodScheduled:然后调度
  • PodCondition
  • Ready
add  netbox
default/netbox
netbox
update netbox status Pending to Pending
update: the Initialized Status True
update netbox status Pending to Pending
update: the Initialized Status True
update: the Ready Status  False
update: the Ready Reason  ContainersNotReady
update: the PodCondition Status  False
update: the PodCondition Reason  ContainersNotReady
update: the PodScheduled Status True


update netbox status Pending to Running
update: the Initialized Status True
update: the Ready Status True
update: the PodCondition Status True
update: the PodScheduled Status True

大致上与 kubectl describe pod 看到的内容也相似

default-scheduler  Successfully assigned default/netbox to master-machine
  Normal  Pulling    85s   kubelet            Pulling image "cylonchau/netbox"
  Normal  Pulled     30s   kubelet            Successfully pulled image "cylonchau/netbox"
  Normal  Created    30s   kubelet            Created container netbox
  Normal  Started    30s   kubelet            Started container netbox

Reference

controllers.md


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK