6

源码分析 kubernetes cronjob controller 控制器的实现原理

 1 year ago
source link: https://xiaorui.cc/archives/7337
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

源码分析 kubernetes cronjob controller 控制器的实现原理 – 峰云就她了

专注于Golang、Kubernetes、Nosql、Istio

源码分析 kubernetes cronjob controller 控制器的实现原理

我们可以利用 CronJob 执行基于 crontab 调度的 Job 任务. 创建的 Job 资源是立即执行, 而使用 cronjob 后, 可以周期性的延迟创建 job 任务.

一个例子

每隔一分钟访问一下网站.

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: hello
spec:
  schedule: "*/1 * * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: hello
            image: busybox
            args:
            - /bin/sh
            - -c
            - curl xiaorui.cc
          restartPolicy: OnFailure

并发性规则

.spec.concurrencyPolicy 也是可选的。它声明了 CronJob 创建的任务执行时发生重叠如何处理。 spec 仅能声明下列规则中的一种:

  • Allow: CronJob 允许并发任务执行.
  • Forbid: 如果新任务执行时, 老 job 还未完事, 则直接忽略.
  • Replace: 如果新任务执行时, 老 job 还未完事, 则清理旧 job, 创建新 job 任务.

实例化入口

实例化 cronjob controller 控制器, 传递进去 job 和 cronjob 的 informer 对象, 注册 eventHandler.

在 jobInformer 里注册的 eventHandler 逻辑简单, 从 job 拿到 cronjob 对象然后格式化 key, 再扔到 queue 里, cronjob 也做了同样的逻辑.

func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer batchv1informers.CronJobInformer, kubeClient clientset.Interface) (*ControllerV2, error) {
    jm := &ControllerV2{
        queue:       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cronjob"),
        jobControl:     realJobControl{KubeClient: kubeClient},
        cronJobControl: &realCJControl{KubeClient: kubeClient},
        jobLister:     jobInformer.Lister(),
        cronJobLister: cronJobsInformer.Lister(),
    }

    // 在 job informer 注册 eventHandler
    jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    jm.addJob,
        UpdateFunc: jm.updateJob,
        DeleteFunc: jm.deleteJob,
    })

    // 在 cronjob informer 注册 eventHandler
    cronJobsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            jm.enqueueController(obj)
        },
        UpdateFunc: jm.updateCronJob,
        DeleteFunc: func(obj interface{}) {
            jm.enqueueController(obj)
        },
    })

    return jm, nil
}

启动多个 worker 协程, 每个 worker 先从 queue 获取任务, 然后执行 sync 方法, 如果同步异常则扔到队列里重试. 如果无异常, 则根据下次执行时间把任务放到延迟队列里, 等待下次调度.

func (jm *ControllerV2) Run(ctx context.Context, workers int) {
    // 等待 job 和 cronjob 同步到本地
    if !cache.WaitForNamedCacheSync("cronjob", ctx.Done(), jm.jobListerSynced, jm.cronJobListerSynced) {
        return
    }

    // 启动多个 worker
    for i := 0; i < workers; i++ {
        go wait.UntilWithContext(ctx, jm.worker, time.Second)
    }

    // 阻塞, 直到 ctx 被取消
    <-ctx.Done()
}

func (jm *ControllerV2) worker(ctx context.Context) {
    for jm.processNextWorkItem(ctx) {
    }
}

func (jm *ControllerV2) processNextWorkItem(ctx context.Context) bool {
    // 从队列后去 cronjob 的 key
    key, quit := jm.queue.Get()
    if quit {
        return false
    }
    defer jm.queue.Done(key)

    // 执行 cronjob 的核心同步方法
    requeueAfter, err := jm.sync(ctx, key.(string))
    switch {
    case err != nil:
        // 失败, 把任务放到队列里进行重试
        jm.queue.AddRateLimited(key)

    case requeueAfter != nil:
        // 从队列中剔除
        jm.queue.Forget(key)

        // 通过上面的 sync 计算出下次执行的时间, 然后把任务放到 queue 的延迟队列里, 延迟时间为 requeueAfter.
        jm.queue.AddAfter(key, *requeueAfter)
    }
    return true
}

核心 sync 代码

sync() 是 cronjob controller 里核心处理代码的入口, 而 syncCronJob() 实现了 cronjob 的主要逻辑.

源码逻辑流程如下:

  1. 从 informer lister 获取 cronjob 对象;
  2. 获取 cronjob 关联的 jobs 对象集合;
  3. 同步 cronjob 的状态, 按照不同的 ConcurrencyPolicy 策略, 选择不同的动作, 计算下次执行的时间;
  4. 清理已经完成的 jobs;
  5. 更新 cronjob 的状态.
func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Duration, error) {
    // 从 key 中拆分 ns 和 name 字段
    ns, name, err := cache.SplitMetaNamespaceKey(cronJobKey)
    if err != nil {
        return nil, err
    }

    // 从 informer lister 获取 cronjob 对象
    cronJob, err := jm.cronJobLister.CronJobs(ns).Get(name)
    switch {
    case errors.IsNotFound(err):
        return nil, nil
    case err != nil:
        // for other transient apiserver error requeue with exponential backoff
        return nil, err
    }

    // 获取 cronjob 关联的 jobs 对象集合
    jobsToBeReconciled, err := jm.getJobsToBeReconciled(cronJob)
    if err != nil {
        return nil, err
    }

    // 获取 cronjob 对象, 下次执行的时间, 是否更新状态等
    cronJobCopy, requeueAfter, updateStatus, err := jm.syncCronJob(ctx, cronJob, jobsToBeReconciled)
    if err != nil {
        if updateStatus {
            // 更新 cronjob 的状态
            if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil {
                return nil, err
            }
        }
        return nil, err
    }

    // 清理已经完成的 jobs.
    if jm.cleanupFinishedJobs(ctx, cronJobCopy, jobsToBeReconciled) {
        updateStatus = true
    }

    // 更新 cronjob 的状态
    if updateStatus {
        if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil {
            return nil, err
        }
    }

    // 如果拿到了下次执行的 duration, 则返回该 duration
    if requeueAfter != nil {
        return requeueAfter, nil
    }
    return nil, nil
}

syncCronJob 同步定时任务

代码流程如下:

  • 遍历 cronjob active 集合, 如果对应的 job 不再存在, 则从 active list 中删除 job 引用. 避免 cronjob 可能永远处于活动模式 ;
  • 如果删除时间不为空, 说明该对象已被删除, 后面无需处理了 ;
  • 该 cronjob 已暂停则直接退出 ;
  • 获取开源 cron 库的调度解释器 ;
  • 根据 crontab spec 表达式计算下次执行的时间;
  • 如果配置了 ForbidConcurrent 策略, 且当前已经有 job 还在运行, 则直接跳出 ;
  • 如果配置了 ReplaceConcurrent 策略, 则需要清理以前还在运行的 Job ;
  • 获取 cronjob 对应的 job 模板, 然后创建 job 对象资源 ;
  • 在 cronjob 对象里关联 job 对象, 记录 LastScheduleTime 时间和状态等 ;
  • 获取下次执行的 duration.
  • return
func (jm *ControllerV2) syncCronJob(
    ctx context.Context,
    cronJob *batchv1.CronJob,
    jobs []*batchv1.Job) (*batchv1.CronJob, *time.Duration, bool, error) {

    cronJob = cronJob.DeepCopy()
    now := jm.now()
    updateStatus := false
    timeZoneEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CronJobTimeZone)

    // 创建一个集合映射 job.uid.
    childrenJobs := make(map[types.UID]bool)
    for _, j := range jobs {
        childrenJobs[j.ObjectMeta.UID] = true

        // job 是否在 cronJob active 里
        found := inActiveList(*cronJob, j.ObjectMeta.UID)

        // 如果 job uid 跟 cronjob uid 不相同, 且任务没完成 
        if !found && !IsJobFinished(j) {
            // 获取 cronjob 对象
            cjCopy, err := jm.cronJobControl.GetCronJob(ctx, cronJob.Namespace, cronJob.Name)
            if err != nil {
                return nil, nil, updateStatus, err
            }

            // 再次判断是否相等, 如果相等则使用新拿到的 cronjob 对象
            if inActiveList(*cjCopy, j.ObjectMeta.UID) {
                cronJob = cjCopy
                continue
            }

        } else if found && IsJobFinished(j) {
            // 如果相同, 且 job 已完成
            _, status := getFinishedStatus(j)

            // 从 list 中删除该 job
            deleteFromActiveList(cronJob, j.ObjectMeta.UID)

            // 往 event 里输出该状态
            jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
            updateStatus = true

        } else if IsJobFinished(j) {
            // 如果该 job 已完成, 则更新时间.
            if cronJob.Status.LastSuccessfulTime == nil {
                cronJob.Status.LastSuccessfulTime = j.Status.CompletionTime
                updateStatus = true
            }
        }
    }

    // 遍历 cronjob active 集合, 如果对应的 job 不再存在, 则从 active list 中删除 job 引用. 避免 cronjob 可能永远处于活动模式.
    for _, j := range cronJob.Status.Active {
        _, found := childrenJobs[j.UID]
        if found {
            continue
        }

        _, err := jm.jobControl.GetJob(j.Namespace, j.Name)
        switch {
        case errors.IsNotFound(err):
            jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)

            // 在 active 里有, 但 job 里不存在, 则需要在 cronJob active 里删除, 为了一致性.
            deleteFromActiveList(cronJob, j.UID)
            updateStatus = true
        case err != nil:
            return cronJob, nil, updateStatus, err
        }
    }

    // 如果删除时间不为空, 说明该对象已被删除, 后面无需处理了.
    if cronJob.DeletionTimestamp != nil {
        return cronJob, nil, updateStatus, nil
    }

    // timezone 异常则退出, 输出一条未知时区的错误事件
    if timeZoneEnabled && cronJob.Spec.TimeZone != nil {
        if _, err := time.LoadLocation(*cronJob.Spec.TimeZone); err != nil {
            timeZone := pointer.StringDeref(cronJob.Spec.TimeZone, "")
            jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnknownTimeZone", "invalid timeZone: %q: %s", timeZone, err)
            return cronJob, nil, updateStatus, nil
        }
    }

    // 该 cronjob 已暂停则直接退出.
    if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
        return cronJob, nil, updateStatus, nil
    }

    // 获取开源 cron 库的调度解释器
    sched, err := cron.ParseStandard(formatSchedule(timeZoneEnabled, cronJob, jm.recorder))
    if err != nil {
        return cronJob, nil, updateStatus, nil
    }

    // 根据 crontab spec 表达式计算下次执行的时间
    scheduledTime, err := getNextScheduleTime(*cronJob, now, sched, jm.recorder)
    if err != nil {
        // 异常则发送 cron spec 解析失败的事件
        jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "InvalidSchedule", "invalid schedule: %s : %s", cronJob.Spec.Schedule, err)
        return cronJob, nil, updateStatus, nil
    }

    // 如果事件为空, 则尝试使用 nextScheduledTimeDuration 来计算下次时间.
    if scheduledTime == nil {
        t := nextScheduledTimeDuration(*cronJob, sched, now)
        return cronJob, t, updateStatus, nil
    }

    tooLate := false
    // 如果配置 StartingDeadlineSeconds, 则开启 toolate 延后
    if cronJob.Spec.StartingDeadlineSeconds != nil {
        tooLate = scheduledTime.Add(time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds)).Before(now)
    }
    if tooLate {
        t := nextScheduledTimeDuration(*cronJob, sched, now)
        return cronJob, t, updateStatus, nil
    }

    // 如果配置了 ForbidConcurrent 策略, 且当前已经有 job 还在运行, 则直接跳出 
    if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cronJob.Status.Active) > 0 {
        t := nextScheduledTimeDuration(*cronJob, sched, now)
        return cronJob, t, updateStatus, nil
    }

    // 如果配置了 ReplaceConcurrent 策略, 则需要清理以前还在运行的 Job.
    if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
        for _, j := range cronJob.Status.Active {
            // 获取当前的 job 对象
            job, err := jm.jobControl.GetJob(j.Namespace, j.Name)
            if err != nil {
                return cronJob, nil, updateStatus, err
            }
            // 删除该 job 对象
            if !deleteJob(cronJob, job, jm.jobControl, jm.recorder) {
                return cronJob, nil, updateStatus, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name)
            }
            updateStatus = true
        }
    }

    // 获取 cronjob 对应的 job 模板
    jobReq, err := getJobFromTemplate2(cronJob, *scheduledTime)
    if err != nil {
        return cronJob, nil, updateStatus, err
    }

    // 创建 job 任务
    jobResp, err := jm.jobControl.CreateJob(cronJob.Namespace, jobReq)

    // 在 cronjob 对象里关联 job 对象, 记录 LastScheduleTime 时间和状态等
    jobRef, err := getRef(jobResp)
    cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
    cronJob.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}
    updateStatus = true

    // 获取下次调度执行的 duration
    t := nextScheduledTimeDuration(*cronJob, sched, now)
    return cronJob, t, updateStatus, nil
}

计算下次调度的时间 nextScheduledTimeDuration

nextScheduledTimeDuration 用来获取下次调度的时间, 代码的逻辑相对有些绕, 核心目的就是为了拿到合理的调度时间.

首先下次的调度不能在 now 之前, 如果在 now 之前则尝试使用下下次的时间. 另外从调度时间点计算 duration 时长时, 加入了 100ms 的抖动, 该抖动应该是为了应对拿到负数 duration.

var (
    nextScheduleDelta = 100 * time.Millisecond
)

func nextScheduledTimeDuration(cj batchv1.CronJob, sched cron.Schedule, now time.Time) *time.Duration {
    // 默认使用创建时间, 通常第一次的时候, 没有上次的调度时间.
    earliestTime := cj.ObjectMeta.CreationTimestamp.Time
    if cj.Status.LastScheduleTime != nil {
        earliestTime = cj.Status.LastScheduleTime.Time
    }

    // 计算最近的时间点
    mostRecentTime, _, err := getMostRecentScheduleTime(earliestTime, now, sched)
    if err != nil {
        mostRecentTime = &now
    } else if mostRecentTime == nil {
        // 为空, 则使用 earliestTime 时间
        mostRecentTime = &earliestTime
    }

    // 依据 mostRecentTime 时间点和 crontab spec 计算出下次 cron 调度时间点, 增加 100ms 的波动, 减去当前时间为 duration 时长
    t := sched.Next(*mostRecentTime).Add(nextScheduleDelta).Sub(now)

    return &t
}

func getMostRecentScheduleTime(earliestTime time.Time, now time.Time, schedule cron.Schedule) (*time.Time, int64, error) {
    t1 := schedule.Next(earliestTime)
    t2 := schedule.Next(t1)

    // 如果 now 在 t1 前面, 直接退出, 可以直接使用 earliestTime 作为时间点计算.
    if now.Before(t1) {
        return nil, 0, nil
    }

    // 如果 now 在 t1 后面, 说明当前的 next 时间不能用了, 需要使用 next next 的时间点, 再来判断.

    // 如果 now 在 next.next 的前面, 可以退出, 使用 t1 的时间.
    if now.Before(t2) {
        return &t1, 1, nil
    }

    // 说实话没看懂啥意思... 
    timeBetweenTwoSchedules := int64(t2.Sub(t1).Round(time.Second).Seconds())
    if timeBetweenTwoSchedules < 1 {
        return nil, 0, fmt.Errorf("time difference between two schedules less than 1 second")
    }
    timeElapsed := int64(now.Sub(t1).Seconds())
    numberOfMissedSchedules := (timeElapsed / timeBetweenTwoSchedules) + 1
    t := time.Unix(t1.Unix()+((numberOfMissedSchedules-1)*timeBetweenTwoSchedules), 0).UTC()
    return &t, numberOfMissedSchedules, nil
}

cronjob 如何解决时间临界点的问题 ?

出现的原因:

当 time.sub 的时候, 由于时间精度问题导致拿到的 diff 时间会变小, 放到 workqueue delay heap 中等待调度, 当再次被调度时, 有可能在 scheduledTime 之前就被调度起来了. 那么再次求 next 时, 很可能跟上次 scheduledTime 一样.

几年前在开发基于 crontab 定时器时, 遇到过该问题. https://github.com/rfyiamcool/cronlib

解决方法:

cronjob 内部会判断当前计算出来的 scheduledTime 是否跟 LastScheduledTime 一致, 如一致则使用下下次的时间. 每次 time.Sub 时会多加 nextScheduleDelta 100ms, 这个极大的减少了上面 case 概率的发生.

清理已完成的 job 对象 cleanupFinishedJobs

清理已经完成状态的 job 对象, removeOldestJobs 最后还是调用 deleteJob 来清理 job.

func (jm *ControllerV2) cleanupFinishedJobs(ctx context.Context, cj *batchv1.CronJob, js []*batchv1.Job) bool {
    if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil {
        return false
    }

    updateStatus := false
    failedJobs := []*batchv1.Job{}
    successfulJobs := []*batchv1.Job{}

    for _, job := range js {
        isFinished, finishedStatus := jm.getFinishedStatus(job)
        if isFinished && finishedStatus == batchv1.JobComplete {
            successfulJobs = append(successfulJobs, job)
        } else if isFinished && finishedStatus == batchv1.JobFailed {
            failedJobs = append(failedJobs, job)
        }
    }

    if cj.Spec.SuccessfulJobsHistoryLimit != nil &&
        jm.removeOldestJobs(cj,
            successfulJobs,
            *cj.Spec.SuccessfulJobsHistoryLimit) {
        updateStatus = true
    }

    if cj.Spec.FailedJobsHistoryLimit != nil &&
        jm.removeOldestJobs(cj,
            failedJobs,
            *cj.Spec.FailedJobsHistoryLimit) {
        updateStatus = true
    }

    return updateStatus
}

大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc
weixin_new.jpg

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK