3

volcano 云原生批量计算平台的 controller 控制器设计实现

 1 year ago
source link: https://xiaorui.cc/archives/7417
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

volcano 的原文地址在 xiaorui.cc, 后面对 volcano 的架构及技术实现原理会持续补充.

volcano controller 的实现

  • [controller 控制器实现](#volcano controller 的实现)
    • [QueueController 控制器](#QueueController 控制器)
      • [启动入口 Run](#启动入口 Run)
      • processNextWorkItem
      • handleQueue
      • [state 状态处理](#state 状态处理)
      • [三种 queue 处理方法](#三种 queue 处理方法)
    • [JobController 控制器](#JobController 控制器)

    • [PodGroupController 控制器](#PodGroupController 控制器)

如何启动 volcano controller 控制器

// xiaorui.cc
// Run the controller.
func Run(opt *options.ServerOption) error {
    // 获取启动 controllers 的方法 
    run := startControllers(config, opt)

    // 创建选举客户端
    leaderElectionClient, err := kubeclientset.NewForConfig(rest.AddUserAgent(config, "leader-election"))
    if err != nil {
        return err
    }

    // ...

    // 创建选举锁对象
    rl, err := resourcelock.New(resourcelock.ConfigMapsResourceLock,
        opt.LockObjectNamespace,
        "vc-controller-manager",
        leaderElectionClient.CoreV1(),
        leaderElectionClient.CoordinationV1(),
        resourcelock.ResourceLockConfig{
            Identity:      id,
            EventRecorder: eventRecorder,
        })
    if err != nil {
        return fmt.Errorf("couldn't create resource lock: %v", err)
    }

    // 进行选举,如果拿到锁,则执行控制器逻辑,没拿到则等待。
    // 如果在拿到锁后,发生异常导致续锁失败,导致被抢占,则退出进程。
    leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
        Lock:          rl,
        LeaseDuration: leaseDuration,
        RenewDeadline: renewDeadline,
        RetryPeriod:   retryPeriod,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: run,
            OnStoppedLeading: func() {
                klog.Fatalf("leaderelection lost")
            },
        },
    })
    return fmt.Errorf("lost lease")
}

func startControllers(config *rest.Config, opt *options.ServerOption) func(ctx context.Context) {
    controllerOpt := &framework.ControllerOption{}

    controllerOpt.SchedulerName = opt.SchedulerName
    controllerOpt.WorkerNum = opt.WorkerThreads
    controllerOpt.MaxRequeueNum = opt.MaxRequeueNum

    controllerOpt.KubeClient = kubeclientset.NewForConfigOrDie(config)
    controllerOpt.VolcanoClient = vcclientset.NewForConfigOrDie(config)
    controllerOpt.SharedInformerFactory = informers.NewSharedInformerFactory(controllerOpt.KubeClient, 0)

    return func(ctx context.Context) {
        framework.ForeachController(func(c framework.Controller) {
            if err := c.Initialize(controllerOpt); err != nil {
                return
            }

            go c.Run(ctx.Done())
        })

        <-ctx.Done()
    }
}

// 启动时 queue、job、pg 会注册到这里。
var controllers = map[string]Controller{}

func ForeachController(fn func(controller Controller)) {
    // 遍历执行各个控制器。
    for _, ctrl := range controllers {
        fn(ctrl)
    }
}

QueueController 控制器

启动入口 Run

启动 queue、pg、cmd informer,在等待同步完成后,异步调用 worker 和 commandWorker 协程。

func (c *queuecontroller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer c.queue.ShutDown()
    defer c.commandQueue.ShutDown()

    klog.Infof("Starting queue controller.")
    defer klog.Infof("Shutting down queue controller.")

    go c.queueInformer.Informer().Run(stopCh)
    go c.pgInformer.Informer().Run(stopCh)
    go c.cmdInformer.Informer().Run(stopCh)

    if !cache.WaitForCacheSync(stopCh, c.queueSynced, c.pgSynced, c.cmdSynced) {
        klog.Errorf("unable to sync caches for queue controller.")
        return
    }

    go wait.Until(c.worker, 0, stopCh)
    go wait.Until(c.commandWorker, 0, stopCh)

    <-stopCh
}

processNextWorkItem

监听 queue 队列的数据,该队列的数据由 queueInformer eventHandler 来产生。

func (c *queuecontroller) worker() {
    for c.processNextWorkItem() {
    }
}

func (c *queuecontroller) processNextWorkItem() bool {
    obj, shutdown := c.queue.Get()
    if shutdown {
        return false
    }
    defer c.queue.Done(obj)

    req, ok := obj.(*apis.Request)
    if !ok {
        klog.Errorf("%v is not a valid queue request struct.", obj)
        return true
    }

    // 这里是 handleQueue
    err := c.syncHandler(req)
    c.handleQueueErr(err, obj)

    return true
}

handleQueue

根据 queue 状态返回不同的状态处理方法,然后变更状态。

// xiaorui.cc
func (c *queuecontroller) handleQueue(req *apis.Request) error {
    // 获取 queue 对象
    queue, err := c.queueLister.Get(req.QueueName)
    if err != nil {
        if apierrors.IsNotFound(err) {
            return nil
        }

        return fmt.Errorf("get queue %s failed for %v", req.QueueName, err)
    }

    // 根据 queue 状态返回不同的状态处理方法
    queueState := queuestate.NewState(queue)
    if queueState == nil {
        return fmt.Errorf("queue %s state %s is invalid", queue.Name, queue.Status.State)
    }

    // 处理状态,本质都是更新 queue 对象状态.
    if err := queueState.Execute(req.Action); err != nil {
        return err
    }

    return nil
}

state 状态处理

queue 有各种各样的状态处理方法,这里拿 openState 举例说明.

type openState struct {
    queue *v1beta1.Queue
}

func (os *openState) Execute(action v1alpha1.Action) error {
    switch action {
    case v1alpha1.OpenQueueAction:
        // open 状态,进行 syncQueue 调和.
        return SyncQueue(os.queue, func(status *v1beta1.QueueStatus, podGroupList []string) {
            status.State = v1beta1.QueueStateOpen
        })
    case v1alpha1.CloseQueueAction:
        // close 状态,进行 queue 收尾操作.
        return CloseQueue(os.queue, func(status *v1beta1.QueueStatus, podGroupList []string) {
            if len(podGroupList) == 0 {
                status.State = v1beta1.QueueStateClosed
                return
            }
            status.State = v1beta1.QueueStateClosing
        })
    default:
        // 其他状态,则调用 syncQueue 调和.
        return SyncQueue(os.queue, func(status *v1beta1.QueueStatus, podGroupList []string) {
            specState := os.queue.Status.State
            if len(specState) == 0 || specState == v1beta1.QueueStateOpen {
                status.State = v1beta1.QueueStateOpen
                return
            }

            if specState == v1beta1.QueueStateClosed {
                if len(podGroupList) == 0 {
                    status.State = v1beta1.QueueStateClosed
                    return
                }
                status.State = v1beta1.QueueStateClosing

                return
            }

            status.State = v1beta1.QueueStateUnknown
        })
    }
}

三种 queue 处理方法

syncQueue
  • 获取 queue 对应的 podGroup 集合
  • 根据各个 podGroup 的状态,累计 queue status 指标
  • 更新 queue 对象
openQueue
  • 从 client 获取最新的 queue 对象
  • 更新 queue 的状态
closeQueue
  • 从 client 获取最新的 queue 对象
  • 更新 queue 的状态

JobController 控制器

根据 job 获取和构建 pods 信息,然后按照 action 决定创建和销毁 pods.

启动入口 Run

// xiaorui.cc
func (cc *jobcontroller) Run(stopCh <-chan struct{}) {
    // 启动 informer
    go cc.jobInformer.Informer().Run(stopCh)
    go cc.podInformer.Informer().Run(stopCh)
    go cc.pvcInformer.Informer().Run(stopCh)
    go cc.pgInformer.Informer().Run(stopCh)
    go cc.svcInformer.Informer().Run(stopCh)
    go cc.cmdInformer.Informer().Run(stopCh)
    go cc.pcInformer.Informer().Run(stopCh)
    go cc.queueInformer.Informer().Run(stopCh)

    // 等待 informer 同步完成
    cache.WaitForCacheSync(stopCh, cc.jobSynced, cc.podSynced, cc.pgSynced,
        cc.svcSynced, cc.cmdSynced, cc.pvcSynced, cc.pcSynced, cc.queueSynced)

    // 监听处理 commands 信号. 
    go wait.Until(cc.handleCommands, 0, stopCh)

    // 并发启动 worker
    var i uint32
    for i = 0; i < cc.workers; i++ {
        go func(num uint32) {
            wait.Until(
                func() {
                    cc.worker(num)
                },
                time.Second,
                stopCh)
        }(i)
    }

    // jobcache 清理
    go cc.cache.Run(stopCh)

    // 处理异常的 task
    go wait.Until(cc.processResyncTask, 0, stopCh)

    klog.Infof("JobController is running ...... ")
}

worker.processNextReq

volcano 为了提高处理性能,在 jobcontroller 内部抽象了队列数组,启动多个 worker 协程,每个协程绑定一个消费的 queue,而入队时会根据 fnv_hash(namespace/job) 取摸的方式来获取对应的 queue。

通过多队列多 worker 模式来提高处理性能,而且更重要的是可保证 job 级别的有序处理。

func (cc *jobcontroller) worker(i uint32) {
    klog.Infof("worker %d start ...... ", i)

    for cc.processNextReq(i) {
    }
}

func (cc *jobcontroller) processNextReq(count uint32) bool {
    // 获取 worker 对应的 queue 对象.
    queue := cc.queueList[count]
    obj, shutdown := queue.Get()
    if shutdown {
        klog.Errorf("Fail to pop item from queue")
        return false
    }

    req := obj.(apis.Request)
    defer queue.Done(req)

    // 拼装 ns/job-name
    key := jobcache.JobKeyByReq(&req)

    // 如果入队异常,则重新选择入队.
    if !cc.belongsToThisRoutine(key, count) {
        queueLocal := cc.getWorkerQueue(key)
        queueLocal.Add(req)
        return true
    }

    // 从缓存中获取 jobinfo,cache 的数据是由 informer eventhandler 来操作的.
    jobInfo, err := cc.cache.Get(key)
    if err != nil {
        return true
    }

    // 根据 job 状态获取不同的状态处理方法.
    st := state.NewState(jobInfo)
    if st == nil {
        return true
    }

    // 应用 job 中指定的策略
    action := applyPolicies(jobInfo.Job, &req)
    if err := st.Execute(action); err != nil {
        // ...
    }

    queue.Forget(req)

    return true
}

SyncJob

pkg/controllers/job/job_controller_actions.go

初始化 job 相关配置.

  • 根据 job 对象从 queue informer lister 里获取 queue 对象.
  • initiateJob
    • initJobStatus, init job status
    • createJobIOIfNotExist, 如果 job 没有绑定 pvc,为其分配 pvc
    • createOrUpdatePodGroup, 为 pg 创建 podgroup.
  • 遍历 job.spec.tasks 根据当前 jobinfo 状态构建需要创建和删除的 pods 集合.
    • createJobPod 在构建 pod 对象时会赋值 SchedulerName 自定义调度器,默认为 volcano
    • 判断 jobInfo 里是否有对应的 pod
    • 如果不存在,则进行创建.
    • 如果存在,则在 pods 字段里删掉, 然后删掉一些缩容后多余的 pods.
  • 更新 job 的状态
  • 更新内部的 cache 组件

KillJob

pkg/controllers/job/job_controller_actions.go

  • 遍历 jobinfo 的 pods 集合, 调用 deleteJobPod 删除 pod 对象.
  • 更新 job 状态
  • 删除 job 关联的 podgroup 对象.

State

job state 用来处理不同 job 状态下的动作行为.

NewState 根据 job 状态构建不同的方法.

// NewState gets the state from the volcano job Phase.
func NewState(jobInfo *apis.JobInfo) State {
    job := jobInfo.Job
    switch job.Status.State.Phase {
    case vcbatch.Pending:
        return &pendingState{job: jobInfo}
    case vcbatch.Running:
        return &runningState{job: jobInfo}
    case vcbatch.Restarting:
        return &restartingState{job: jobInfo}
    case vcbatch.Terminated, vcbatch.Completed, vcbatch.Failed:
        return &finishedState{job: jobInfo}
    case vcbatch.Terminating:
        return &terminatingState{job: jobInfo}
    case vcbatch.Aborting:
        return &abortingState{job: jobInfo}
    case vcbatch.Aborted:
        return &abortedState{job: jobInfo}
    case vcbatch.Completing:
        return &completingState{job: jobInfo}
    }

    return &pendingState{job: jobInfo}
}

拿 pending state 待处理方法举例说明,异常状态都走 killjob, 而正常状态则走 syncjob.

其他 state 实现类似,根据不同的 action 走 killjob 或 syncjob。

type pendingState struct {
    job *apis.JobInfo
}

func (ps *pendingState) Execute(action v1alpha1.Action) error {
    switch action {
    case v1alpha1.RestartJobAction:
        // 如果需要重启, 则在 killjob 里判断 restart 次数,满足阈值则进行 pod 删除.
        return KillJob(ps.job, PodRetainPhaseNone, func(status *vcbatch.JobStatus) bool {
            status.RetryCount++
            status.State.Phase = vcbatch.Restarting
            return true
        })

    case v1alpha1.AbortJobAction:
        // 终止,则直接进行 pod 删除. 
        return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool {
            status.State.Phase = vcbatch.Aborting
            return true
        })
    case v1alpha1.CompleteJobAction:
        // 当任务已完成,则进行收尾删除.
        return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool {
            status.State.Phase = vcbatch.Completing
            return true
        })
    case v1alpha1.TerminateJobAction:
        // 任务终止,也需要进行删除。
        return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool {
            status.State.Phase = vcbatch.Terminating
            return true
        })
    default:
        // 其他 action,都走 syncjob,该逻辑主要是 reconcile 调和.
        return SyncJob(ps.job, func(status *vcbatch.JobStatus) bool {
            if ps.job.Job.Spec.MinAvailable <= status.Running+status.Succeeded+status.Failed {
                status.State.Phase = vcbatch.Running
                return true
            }
            return false
        })
    }
}

PodGroupController 控制器

pg 的逻辑比较简单,就是维护 podgroup pod.

启动入口 Run

启动 pod 和 pg informer,等待着两个 informer 数据同步完毕后,则启动一个协程执行 worker.

func (pg *pgcontroller) Run(stopCh <-chan struct{}) {
    go pg.podInformer.Informer().Run(stopCh)
    go pg.pgInformer.Informer().Run(stopCh)

    cache.WaitForCacheSync(stopCh, pg.podSynced, pg.pgSynced)

    go wait.Until(pg.worker, 0, stopCh)

    klog.Infof("PodgroupController is running ...... ")
}

processNextReq

func (pg *pgcontroller) worker() {
    for pg.processNextReq() {
    }
}

func (pg *pgcontroller) processNextReq() bool {
    obj, shutdown := pg.queue.Get()
    if shutdown {
        return false
    }

    req := obj.(podRequest)
    defer pg.queue.Done(req)

    // 从 pod informer lister 获取 pg 关联的 pod 对象.
    pod, err := pg.podLister.Pods(req.podNamespace).Get(req.podName)
    if err != nil {
        return true
    }

    // 如果 pg 不存在, 则进行创建
    if err := pg.createNormalPodPGIfNotExist(pod); err != nil {
        pg.queue.AddRateLimited(req)
        return true
    }

    // If no error, forget it.
    pg.queue.Forget(req)

    return true
}

volcano 社区没以前活跃了,几个月前提交的代码,现在都没有合并。😅


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc
weixin_new.jpg

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK