7

源码分析 kubernetes kubelet pod 管理的设计实现

 1 year ago
source link: https://xiaorui.cc/archives/7322
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

源码分析 kubernetes kubelet pod 管理的设计实现 – 峰云就她了

专注于Golang、Python、DB、cluster

源码分析 kubernetes kubelet pod 管理的设计实现

实例化 kubelet 服务实例, 并启动 syncLoop 调度核心.

代码位置: pkg/kubelet/kubelet.go

func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
    // 创建且实例化 kubelet 服务
    k, err := createAndInitKubelet(kubeServer,
        kubeDeps,
        hostname,
        hostnameOverridden,
        nodeName,
        nodeIPs)
    if err != nil {
        return fmt.Errorf("failed to create kubelet: %w", err)
    }
    podCfg := kubeDeps.PodConfig

    // 启动 kubelet
    startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
}

// 启动 kubelet
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, ...) {
    go k.Run(podCfg.Updates())
}

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    ctx := context.Background()

    ...

    kl.pleg.Start()

    // kubelet 的核心调度代码
    kl.syncLoop(ctx, updates, kl)
}

updates 这个管道很重要,在初始化 kubelet 对象时, 内部通过 makePodSourceConfig 方法可以监听 apiserver 的配置更新, 把更新的事件扔到这个 updates 管道里. 另外它还监听了文件及http的接口.

调用关系: createAndInitKubelet -> NewMainKubelet -> makePodSourceConfig

func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) {
    // source of all configuration
    cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder, kubeDeps.PodStartupLatencyTracker)

    // define file config source
    if kubeCfg.StaticPodPath != "" {
        klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
        config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.FileSource))
    }

    // define url config source
    if kubeCfg.StaticPodURL != "" {
        klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
        config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.HTTPSource))
    }

    if kubeDeps.KubeClient != nil {
        klog.InfoS("Adding apiserver pod source")
        config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(ctx, kubetypes.ApiserverSource))
    }
    return cfg, nil
}

调度核心 (syncLoop)

syncLoop 是 kubelet 的调度核心, 内部定义了两个定时器, 一个用来同步的 syncTicker 定时器, 一个是 用来清理异常 pods 的 housekeepingTicker 定时器.

循环调度 syncLoopIteration 方法.

func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    klog.InfoS("Starting kubelet main sync loop")

    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()

    housekeepingTicker := time.NewTicker(housekeepingPeriod)
    defer housekeepingTicker.Stop()

    plegCh := kl.pleg.Watch()

    ...
    for {
        kl.syncLoopMonitor.Store(kl.clock.Now())
        if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
        kl.syncLoopMonitor.Store(kl.clock.Now())
    }
}

kubelet 的 pods 同步逻辑都在 syncLoopIteration 这里. syncLoopIteration 同时监听下面的 chan, 根据事件做不同的处理.

  • configCh: 监听 file, http, apiserver 的事件更新
  • syncCh: 定时器管道, 每隔一秒去同步最新保存的 pod 状态
  • houseKeepingCh: housekeeping 事件的管道,做 pod 清理工作
  • plegCh: 该信息源由 kubelet 对象中的 pleg 子模块提供,该模块主要用于周期性地向 container runtime 查询当前所有容器的状态.
  • livenessManager.Updates: 健康检查发现某个 pod 不可用, kubelet 将根据 Pod 的 restartPolicy 自动执行正确的操作
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {

    select {
    case u, open := <-configCh: // 来自 apiserver 的 pod 事件
        if !open {
            return false
        }

        switch u.Op {
        case kubetypes.ADD: // 添加
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE: // 更新
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.RECONCILE: // 协调
            handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE: // 删除
            handler.HandlePodUpdates(u.Pods)
        default:
            klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
        }

        kl.sourcesReady.AddSource(u.Source)

    case e := <-plegCh: // 由 pleg 子模块上报的事件, pleg 会扫描当前所有容器, 当状态发生变更时发出事件
        if isSyncPodWorthy(e) {
            if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                handler.HandlePodSyncs([]*v1.Pod{pod})
            }
        }

        if e.Type == pleg.ContainerDied {
            if containerID, ok := e.Data.(string); ok {
                kl.cleanUpContainersInPod(e.ID, containerID)
            }
        }
    case <-syncCh: // 由定时器触发更新
        podsToSync := kl.getPodsToSync()
        handler.HandlePodSyncs(podsToSync)
    case update := <-kl.livenessManager.Updates(): // 当 liveness 状态发生变更时
        if update.Result == proberesults.Failure {
            handleProbeSync(kl, update, handler, "liveness", "unhealthy")
        }
    case update := <-kl.readinessManager.Updates(): // 当 readiness 状态变更时
        kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)

        handleProbeSync(kl, update, handler, "readiness", status)
    case update := <-kl.startupManager.Updates(): // 当 startup 状态变更时
        kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
        handleProbeSync(kl, update, handler, "startup", status)

    case <-housekeepingCh: // 定时器触发
        handler.HandlePodCleanups(ctx)
    }
    return true
}

增加 pod 流程 (HandlePodAdditions)

202212181730298.png

HandlePodAdditions 是创建 Pod 的核心代码. 首先对传入的 pods 进行排序, 保证先提交创建请求的 pod 被先创建, 最后调用 dispatchWork 来创建 pod.

另外静态 pod 是走 handleMirrorPod 流程.

func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    start := kl.clock.Now()

    // 按照创建事件的先后对传入的 pods 进行排序, 保证是 fifo 的模型. 
    sort.Sort(sliceutils.PodsByCreationTime(pods))
    for _, pod := range pods {
        existingPods := kl.podManager.GetPods()

        // 把 pod 添加到 podManager 里
        kl.podManager.AddPod(pod)

        // 判断是否是静态 pod
        if kubetypes.IsMirrorPod(pod) {
            kl.handleMirrorPod(pod, start)
            continue
        }

        // 在 dispatchWork 里去做 pod 操作, 这里操作为创建 pod
        kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
    }
}

另外, 主调度核心里对 pod 进行增删改操作, 其实最后都会跳到 dispatchWork 方法上.

该方法里主要定义了类型 kubetypes.SyncPodType, 然后调用 podWrokers.UpdatePod 异步操作 pod.

代码位置: pkg/kubelet/pod_workers.go

func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    // Run the sync in an async worker.
    kl.podWorkers.UpdatePod(UpdatePodOptions{
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        StartTime:  start,
    })
}

由于 kubelet 创建 pod 容器路径太深, 索性忽略下面的路径,直接跳到 syncPod 方法中.

podWorkers.UpdatePod -> podWorkers.managePodLoop -> podWorkers.syncPodFn -> kubelet.syncPod

为 pod 做准备工作及创建 pod ( syncPod )

kubelet.syncPod 主要用来实现 pod 资源的创建, 内部会做好 pod 的准备工作, 流程如下:

  1. 更新 pod 状态到 statusManager
  2. 检查网络插件是否就绪
  3. 把 pod 注册到 secretManager 和 configMapManager 管理器里
  4. 创建更新 cgroups 策略
  5. 为静态pod创建一个 mirror pod
  6. 为 pod 实例化数据目录
  7. 配置 volume 挂载
  8. 为 pod 拉取 secrets 配置
  9. 为 pod 添加探针检测
  10. 调用容器的运行时 SyncPod 完成容器重建

代码位置: pkg/kubelet/kubelet.go : syncPod()

func (kl *Kubelet) syncPod(_ context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
    // 更新 pod 状态到 statusManager
    kl.statusManager.SetPodStatus(pod, apiPodStatus)

    // 检查网络插件是否就绪
    if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
        return false, fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
    }

    // 把 pod 注册到 secretManager 和 configMapManager 管理器里
    if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
        if kl.secretManager != nil {
            kl.secretManager.RegisterPod(pod)
        }
        if kl.configMapManager != nil {
            kl.configMapManager.RegisterPod(pod)
        }
    }

    pcm := kl.containerManager.NewPodContainerManager()
    if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
        // 创建更新 cgroups 策略
        if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
            if !pcm.Exists(pod) {
                if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
                }
            }
        }
    }

    // 为静态 pod 创建一个 mirror pod, 
    if kubetypes.IsStaticPod(pod) {
        deleted := false
        // 如果不为空, 则需要清理
        if mirrorPod != nil {
            if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
                podFullName := kubecontainer.GetPodFullName(pod)
                deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
            }
        }
        // 如果为空, 则需要为静态pod创建 mirror pod
        if mirrorPod == nil || deleted {
            kl.podManager.CreateMirrorPod(pod)
        }
    }

    // 为 pod 实例化数据目录
    if err := kl.makePodDataDirs(pod); err != nil {
        return false, err
    }

    // 配置 volume 挂载
    if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
        // 同步等待 volumes 完成
        if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
            return false, err
        }
    }

    // 为 pod 拉取 secrets 配置
    pullSecrets := kl.getPullSecretsForPod(pod)

    // 为 pod 添加探针检测
    kl.probeManager.AddPod(pod)

    // 调用容器的运行时 SyncPod 完成容器重建
    result := kl.containerRuntime.SyncPod(ctx, pod, podStatus, pullSecrets, kl.backOff)
    kl.reasonCache.Update(pod.UID, result)
    if err := result.Error(); err != nil {
        return false, nil
    }

    return false, nil
}

创建启动 pod 内的容器 (SyncPod)

SyncPod 会依次创建启动 pod 内的容器, 流程如下:

  1. 创建 sandbox 容器, sandbox 其实就是 pause 容器.
  2. 创建临时容器
  3. 创建 init 容器
  4. 创建业务容器
  5. 完成容器创建

代码位置: pkg/kubelet/kuberuntime/kuberuntime_manager.go

func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    // 计算 sandbox 和 容器发生的变动
    podContainerChanges := m.computePodActions(pod, podStatus)
    // 如果 sandbox 发生变动, 则干
    if podContainerChanges.KillPod {
        // 干掉 pod
        killResult := m.killPodWithSyncResult(ctx, pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
        result.AddPodSyncResult(killResult)

        if podContainerChanges.CreateSandbox {
            // 并清理 init 容器
            m.purgeInitContainers(ctx, pod, podStatus)
        }
    } else {
        // 干掉不能运行的容器
        for containerID, containerInfo := range podContainerChanges.ContainersToKill {
            if err := m.killContainer(ctx, pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil); err != nil {
                return
            }
        }
    }

    var podIPs []string
    if podStatus != nil {
        podIPs = podStatus.IPs
    }

    // 为 pod 创建 sandbox
    podSandboxID := podContainerChanges.SandboxID
    if podContainerChanges.CreateSandbox {
        // 创建 pod sandbox
        podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
        if err != nil {
            ...
            return
        }

        // 如果 pod 网络是 host 模式,容器也相同;其他情况下,容器会使用 None 网络模式,让 kubelet 的网络插件自己进行网络配置
        if !kubecontainer.IsHostNetworkPod(pod) {
            podIPs = m.determinePodSandboxIPs(pod.Namespace, pod.Name, resp.GetStatus())
            klog.V(4).InfoS("Determined the ip for pod after sandbox changed", "IPs", podIPs, "pod", klog.KObj(pod))
        }
    }

    podIP := ""
    if len(podIPs) != 0 {
        podIP = podIPs[0]
    }

    // 获取 pod sandbox 的配置, 里面有 metadata,clusterDNS,容器的端口映射等.
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
    if err != nil {
        configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
        return
    }

    // 定义一个匿名的启动各种类型容器的方法
    start := func(ctx context.Context, typeName, metricLabel string, spec *startSpec) error {
        ...

        // 启动容器
        if msg, err := m.startContainer(ctx, podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
            ...
            return err
        }

        return nil
    }

    // 启动临时容器, 临时容器不会自动重启, 通常配合 kubectl debug 调试使用.
    for _, idx := range podContainerChanges.EphemeralContainersToStart {
        start(ctx, "ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
    }

    // 启动 init 容器
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        // Start the next init container.
        if err := start(ctx, "init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
            return
        }

        // Successfully started the container; clear the entry in the failure
        klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
    }

    // 启动业务容器
    for _, idx := range podContainerChanges.ContainersToStart {
        // 调用上方定义的 start 匿名函数.
        start(ctx, "container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
    }

    return
}

真正去创建启动容器 (startContainer)

startContainer 是真正创建容器的方法, 流程如下:

  1. 拉取容器镜像, 不存在则直接拉取.
  2. 配置新容器的重启次数为 0, 重启被干掉后重建的容器, 重启次数会累加.
  3. 为新容器实例化日志目录
  4. 配置容器的日志目录
  5. 生成容器配置
  6. 日志目录做软连接
  7. 执行 post start hook, 出错则需要干掉启动的容器.

代码位置: pkg/kubelet/kuberuntime/kuberuntime_container.go

func (m *kubeGenericRuntimeManager) startContainer(ctx context.Context, podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
    container := spec.container

    // 拉取容器镜像, 不存在拉取
    imageRef, msg, err := m.imagePuller.EnsureImageExists(ctx, pod, container, pullSecrets, podSandboxConfig)
    if err != nil {
        return msg, err
    }

    // 配置新容器的重启次数为 0, 重启被干掉后重建的容器, 重启次数会累加.
    restartCount := 0
    containerStatus := podStatus.FindContainerStatusByName(container.Name)
    if containerStatus != nil {
        restartCount = containerStatus.RestartCount + 1
    } else {
        // 新容器则需要创建容器的日志目录
        logDir := BuildContainerLogsDirectory(pod.Namespace, pod.Name, pod.UID, container.Name)
    }

    // 根据传递进来的参数生成容器配置
    containerConfig, cleanupAction, err := m.generateContainerConfig(ctx, container, pod, restartCount, podIP, imageRef, podIPs, target)

    // PreCreateContainer, 尝试是否做cpu亲和及内存 numa, 现在只有 linux 平台做了具体实现.
    err = m.internalLifecycle.PreCreateContainer(pod, container, containerConfig)
    if err != nil {
        return s.Message(), ErrPreCreateHook
    }

    // 正式创建容器
    containerID, err := m.runtimeService.CreateContainer(ctx, podSandboxID, containerConfig, podSandboxConfig)
    if err != nil {
        return s.Message(), ErrCreateContainer
    }

    // 预先启动容器
    err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
    if err != nil {
        return s.Message(), ErrPreStartHook
    }

    // 启动上面生成的容器
    err = m.runtimeService.StartContainer(ctx, containerID)
    if err != nil {
        return s.Message(), kubecontainer.ErrRunContainer
    }

    // 为容器的日志目录做软连接
    if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
        if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
            klog.ErrorS(err, "Failed to create legacy symbolic link", "path", legacySymlink,
                "containerID", containerID, "containerLogPath", containerLog)
        }
    }

    // 执行 post start hook
    if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
        kubeContainerID := kubecontainer.ContainerID{
            Type: m.runtimeName,
            ID:   containerID,
        }
        // 如果在容器启动后, 执行 post start hook 失败, 则干掉容器
        msg, handlerErr := m.runner.Run(ctx, kubeContainerID, pod, container, container.Lifecycle.PostStart)
        if handlerErr != nil {
            if err := m.killContainer(ctx, pod, kubeContainerID, container.Name, "FailedPostStartHook", reasonFailedPostStartHook, nil); err != nil {
            }
            return msg, ErrPostStartHook
        }
    }

    return "", nil
}

删除 pod 流程 (HandlePodRemoves)

删除 pod 的流程, 首先在 podManager 清理 pod, 然后调用 podWorkers.UpdatePod 方法来更新 Pod, 只是 updateType 为 kubetypes.SyncPodKill.

func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
    for _, pod := range pods {
        kl.podManager.DeletePod(pod)
        if kubetypes.IsMirrorPod(pod) {
            kl.handleMirrorPod(pod, start)
            continue
        }

        kl.deletePod(pod)
    }
}

func (kl *Kubelet) deletePod(pod *v1.Pod) error {
    if !kl.sourcesReady.AllReady() {
        return fmt.Errorf("skipping delete because sources aren't ready yet")
    }
    kl.podWorkers.UpdatePod(UpdatePodOptions{
        Pod:        pod,
        UpdateType: kubetypes.SyncPodKill,
    })
    return nil
}

大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc
weixin_new.jpg

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK