0

client-go实战之八:更新资源时的冲突错误处理 - 程序员欣宸

 11 months ago
source link: https://www.cnblogs.com/bolingcavalry/p/17724756.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos

  • 本文是《client-go实战》系列的第七篇,来了解一个常见的错误:版本冲突,以及client-go官方推荐的处理方式
  • 本篇由以下部分组成
  1. 什么是版本冲突(from kubernetes官方)
  2. 编码,复现版本冲突
  3. 版本冲突的解决思路(from kubernetes官方)
  4. 版本冲突的实际解决手段(from client-go官方)
  5. 编码,演示如何解决版本冲突
  6. 自定义入参,对抗更高的并发

什么是版本冲突(from kubernetes官方)

  1. 首先,在逻辑上来说,提交冲突是肯定存在的,多人同时获取到同一个资源的信息(例如同一个pod),然后各自在本地修改后提交,就有可能出现A的提交把B的提交覆盖的情况,这一个点就不展开了,数据库的乐观锁和悲观锁都可以用来处理并发冲突
  2. kubernetes应对提交冲突的方式是资源版本号,属于乐观锁类型(Kubernetes leverages the concept of resource versions to achieve optimistic concurrency)
  3. 基于版本实现并发控制是常见套路,放在kubernetes也是一样,基本原理如下图所示,按照序号看一遍即可理解:左右两人从后台拿到的资源都是1.0版本,然而右侧提交的1.1的时候,服务器上已经被左侧更新到1.1了,于是服务器不接受右侧提交
    在这里插入图片描述

编码,复现版本冲突

  • 接下来,咱们将上述冲突用代码复现出来,具体的功能如下
  1. 创建一个deployment资源,该资源带有一个label,名为biz-version,值为101
  2. 启动5个协程,每个协程都做同样的事情:读取deployment,得到label的值后,加一,再提交保存
  3. 正常情况下,label的值被累加了5次,那么最终的值应该等于101+5=106
  4. 等5个协程都执行完毕后,再读读取一次deployment,看label值是都等于106
$ tree client-go-tutorials
client-go-tutorials
├── action
│   ├── action.go
│   ├── conflict.go
│   └── list_pod.go
├── client-go-tutorials
├── go.mod
├── go.sum
└── main.go
  • 接下来的代码都写在conflict.go中
  • 首先是新增两个常量
const (
	// deployment的名称
	DP_NAME string = "demo-deployment"
	// 用于更新的标签的名字
	LABEL_CUSTOMIZE string = "biz-version"
)
  • 然后是辅助方法,返回32位整型的指针,后面会用到
func int32Ptr(i int32) *int32 { return &i }
  • 创建deployment的方法,要注意的是增加了一个label,名为LABEL_CUSTOMIZE,其值为101
// 创建deployment
func create(clientset *kubernetes.Clientset) error {
	deploymentsClient := clientset.AppsV1().Deployments(apiv1.NamespaceDefault)

	deployment := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:   DP_NAME,
			Labels: map[string]string{LABEL_CUSTOMIZE: "101"},
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: int32Ptr(1),
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"app": "demo",
				},
			},
			Template: apiv1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						"app": "demo",
					},
				},
				Spec: apiv1.PodSpec{
					Containers: []apiv1.Container{
						{
							Name:  "web",
							Image: "nginx:1.12",
							Ports: []apiv1.ContainerPort{
								{
									Name:          "http",
									Protocol:      apiv1.ProtocolTCP,
									ContainerPort: 80,
								},
							},
						},
					},
				},
			},
		},
	}

	// Create Deployment
	fmt.Println("Creating deployment...")
	result, err := deploymentsClient.Create(context.TODO(), deployment, metav1.CreateOptions{})
	if err != nil {
		return err
	}

	fmt.Printf("Created deployment %q.\n", result.GetObjectMeta().GetName())

	return nil
}
  • 按照名称删除deployment的方法,实战的最后会调用,将deployment清理掉
// 按照名称删除
func delete(clientset *kubernetes.Clientset, name string) error {
	deletePolicy := metav1.DeletePropagationBackground

	err := clientset.AppsV1().Deployments(apiv1.NamespaceDefault).Delete(context.TODO(), name, metav1.DeleteOptions{PropagationPolicy: &deletePolicy})

	if err != nil {
		return err
	}

	return nil
}
  • 再封装一个get方法,用于所有更新操作完成后,获取最新的deployment,检查其label值是否符合预期
// 按照名称查找deployment
func get(clientset *kubernetes.Clientset, name string) (*v1.Deployment, error) {
	deployment, err := clientset.AppsV1().Deployments(apiv1.NamespaceDefault).Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		return nil, err
	}

	return deployment, nil
}
  • 接下来是最重要的更新方法,这里用的是常见的先查询再更新的方式,查询deployment,取得标签值之后加一再提交保存
// 查询指定名称的deployment对象,得到其名为biz-version的label,加一后保存
func updateByGetAndUpdate(clientset *kubernetes.Clientset, name string) error {

	deployment, err := clientset.AppsV1().Deployments(apiv1.NamespaceDefault).Get(context.TODO(), name, metav1.GetOptions{})

	if err != nil {
		return err
	}

	// 取出当前值
	currentVal, ok := deployment.Labels[LABEL_CUSTOMIZE]

	if !ok {
		return errors.New("未取得自定义标签")
	}

	// 将字符串类型转为int型
	val, err := strconv.Atoi(currentVal)

	if err != nil {
		fmt.Println("取得了无效的标签,重新赋初值")
		currentVal = "101"
	}

	// 将int型的label加一,再转为字符串
	deployment.Labels[LABEL_CUSTOMIZE] = strconv.Itoa(val + 1)

	_, err = clientset.AppsV1().Deployments(apiv1.NamespaceDefault).Update(context.TODO(), deployment, metav1.UpdateOptions{})
	return err
}
  • 最后,是主流程代码,为了能在现有工程框架下运行,这里新增一个struct,并实现了action接口的DoAction方法,这个DoAction方法中就是主流程
type Confilct struct{}

func (conflict Confilct) DoAction(clientset *kubernetes.Clientset) error {

	fmt.Println("开始创建deployment")

	// 开始创建deployment
	err := create(clientset)

	if err != nil {
		return err
	}

	// 如果不延时,就会导致下面的更新过早,会报错
	<-time.NewTimer(1 * time.Second).C

	// 一旦创建成功,就一定到删除再返回
	defer delete(clientset, DP_NAME)

	testNum := 5

	waitGroup := sync.WaitGroup{}
	waitGroup.Add(testNum)

	fmt.Println("在协程中并发更新自定义标签")

	startTime := time.Now().UnixMilli()

	for i := 0; i < testNum; i++ {

		go func(clientsetA *kubernetes.Clientset, index int) {
			// 避免进程卡死
			defer waitGroup.Done()

			err := updateByGetAndUpdate(clientsetA, DP_NAME)

			// var retryParam = wait.Backoff{
			// 	Steps:    5,
			// 	Duration: 10 * time.Millisecond,
			// 	Factor:   1.0,
			// 	Jitter:   0.1,
			// }

			// err := retry.RetryOnConflict(retryParam, func() error {
			// 	return updateByGetAndUpdate(clientset, DP_NAME)
			// })

			if err != nil {
				fmt.Printf("err: %v\n", err)
			}

		}(clientset, i)
	}

	// 等待协程完成全部操作
	waitGroup.Wait()

	// 再查一下,自定义标签的最终值
	deployment, err := get(clientset, DP_NAME)

	if err != nil {
		fmt.Printf("查询deployment发生异常: %v\n", err)
		return err
	}

	fmt.Printf("自定义标签的最终值为: %v,耗时%v毫秒\n", deployment.Labels[LABEL_CUSTOMIZE], time.Now().UnixMilli()-startTime)

	return nil
}
  • 最后还要修改main.go,增加一个action的处理,新增的内容如下
    在这里插入图片描述
  • 这里给出完整main.go
package main

import (
	"client-go-tutorials/action"
	"flag"
	"fmt"
	"path/filepath"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)

func main() {
	var kubeconfig *string
	var actionFlag *string

	// 试图取到当前账号的家目录
	if home := homedir.HomeDir(); home != "" {
		// 如果能取到,就把家目录下的.kube/config作为默认配置文件
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		// 如果取不到,就没有默认配置文件,必须通过kubeconfig参数来指定
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}

	actionFlag = flag.String("action", "list-pod", "指定实际操作功能")

	flag.Parse()

	fmt.Println("解析命令完毕,开始加载配置文件")

	// 加载配置文件
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err.Error())
	}

	// 用clientset类来执行后续的查询操作
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	fmt.Printf("加载配置文件完毕,即将执行业务 [%v]\n", *actionFlag)

	var actionInterface action.Action

	// 注意,如果有新的功能类实现,就在这里添加对应的处理
	switch *actionFlag {
	case "list-pod":
		listPod := action.ListPod{}
		actionInterface = &listPod
	case "conflict":
		conflict := action.Confilct{}
		actionInterface = &conflict
	}

	err = actionInterface.DoAction(clientset)
	if err != nil {
		fmt.Printf("err: %v\n", err)
	} else {
		fmt.Println("执行完成")
	}
}
  • 最后,如果您用的是vscode,可以修改launch.json,调整输入参数
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Launch Package",
            "type": "go",
            "request": "launch",
            "mode": "auto",
            "program": "${workspaceFolder}",
            "args": ["-action=conflict"]
        }
    ]
}
  • 回顾上面的代码,您会发现是5个协程并行执行先查询再修改提交的逻辑,理论上会出现前面提到的冲突问题,5个协程并发更新,会出现并发冲突,因此最终标签的值是小于101+5=106的,咱们来运行代码试试

  • 果然,经过更新后,lable的最终值等于102,也就是说过5个协程同时提交,只成功了一个

    在这里插入图片描述
  • 至此,咱们通过代码证明了资源版本冲突问题确实存在,接下来就要想办法解决此问题了

版本冲突的解决思路(from kubernetes官方)

  • 来看看kubernetes的官方对于处理此问题是如何建议的,下面是官方原话
In the case of a conflict, the correct client action at this point is to GET the resource again, apply the changes afresh, and try submitting again
  • 很明显,在更新因为版本冲突而失败的时候,官方建议重新获取最新版本的资源,然后再次修改并提交
  • 听起来很像CAS
  • 在前面复现失败的场景,如果是5个协程并发提交,总有一个会失败多次,那岂不是要反复重试,把代码变得更复杂?
  • 还好,client-go帮我们解决了这个问题,按照kubernetes官方的指导方向,将重试逻辑进行了封装,让使用者可以很方便的实现完成失败重试

版本冲突的实际解决手段(from client-go官方)

  • client-go提供的是方法,下面是该方法的源码
func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
	return OnError(backoff, errors.IsConflict, fn)
}
  • 从上述方法有两个入参,backoff用于控制重试相关的细节,如重试次数、间隔时间等,fn则是常规的先查询再更新的自定义方法,由调用方根据自己的业务自行实现,总之,只要fn返回错误,并且该错误是可以通过重试来解决的,RetryOnConflict方法就会按照backoff的配置进行等待和重试
  • 可见经过client-go的封装,对应普通开发者来说已经无需关注重试的实现了,只要调用RetryOnConflict即可确保版本冲突问题会被解决
  • 接下来咱们改造前面有问题的代码,看看能否解决并发冲突的问题

编码,演示如何解决版本冲突

  • 改成client-go提供的自动重试代码,整体改动很小,如下图所示,原来是直接调用updateByGetAndUpdate方法,现在注释掉,改为调用RetryOnConflict,并且将updateByGetAndUpdate作为入参使用
    在这里插入图片描述
  • 再次运行代码,如下图,这次五个协程都更新成功了,不过耗时也更长,毕竟是靠着重试来实现最终提交成功的
    在这里插入图片描述

自定义入参,对抗更高的并发

  • 前面的验证过程中,并发数被设置为5,现在加大一些试试,改成10,如下图红色箭头位置
    在这里插入图片描述
  • 执行结果如下图所示,10个并发请求,只成功了5个,其余5个就算重试也还是失败了
    在这里插入图片描述
  • 出现这样的问题,原因很明显:下面是咱们调用方法时的入参,每个并发请求最多重试5次,显然即便是重试5次,也只能确保每一次有个协程提交成功,所以5次过后没有重试机会,导致只成功了5个
var retryParam = wait.Backoff{
	Steps:    5,
	Duration: 10 * time.Millisecond,
	Factor:   1.0,
	Jitter:   0.1,
}
  • 找到了原因就好处理了,把上面的Steps参数调大,改为10,再试试
    在这里插入图片描述
  • 如下图,这一次结果符合预期,不过耗时更长了
    在这里插入图片描述
  • 最后留下一个问题:Steps参数到底该设置成多少呢?这个当然没有固定值了,5是client-go官方推荐的值,结果在并发为10的时候依然不够用,所以具体该设置成多少还是要依照您的实际情况来决定,需要大于最大的瞬间并发数,才能保证所有并发冲突都能通过重试解决,当然了,实际场景中,大量并发同时修改同一个资源对象的情况并不多见,所以大多数时候可以直接使用client-go官方的推荐值
  • 至此,kubernetes资源更新时的版本冲突问题,经过实战咱们都已经了解了,并且掌握了解决方法,基本的增删改查算是没问题了,接下来的文章,咱们要聚焦的是client-go另一个极其重要的能力:List&Watch
  • 敬请期待,欣宸原创必不会辜负您
名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) [email protected]:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
  • 这个git项目中有多个文件夹,本篇的源码在tutorials/client-go-tutorials文件夹下,如下图红框所示:
    在这里插入图片描述

欢迎关注博客园:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK