15

Apollo Cyber RT 调度系统

 2 years ago
source link: https://dingfen.github.io/apollo/2020/10/17/Cyber-RT.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Apollo Cyber RT 调度系统解析Permalink

前言Permalink

上篇博客中,我简要地给大家介绍了 Apollo 系统,以及它的代码文件结构,并说明了一下 Cyber RT 在 Apollo 系统中的地位(是的,我调整了一下博客的内容,使之变得更加均衡合理)。Cyber RT 在系统的任务调度方面有重要的作用,又和实时系统要求密切相关。因此,我打算将调度系统作为一个切入点,在本篇博客中,我将会给大家介绍一下 Cyber RT 的调度系统。

Cyber中的调度Permalink

自动驾驶系统的有三大流程:感知、决策、执行。例如,车上的传感器感知到障碍物,判断其类型、运动轨迹等,再做出决策,最后再到刹车、油门和方向的控制,这会经过一系列模块的计算。这些模块在计算过程中会产生数据依赖,如果用箭头将模块间的数据依赖表示出来,就会形成图的拓扑结构。

在 Apollo 项目中,通常使用 DAG file 来描述计算图的拓扑结构。由于自动驾驶系统牵扯到很多的步骤,有很复杂的流程,相应的,这些计算图也十分庞大。因此,如何调度整个计算图使系统能满足各种时间约束,达到系统的实时性和确定性,是个巨大的挑战。

接下来,我们就详细地剖析一下 Cyber RT 的调度系统💪!

conf 配置文件Permalink

Cyber 调度的配置文件在 cyber/conf 文件夹中,配置文件详细说明了线程名、线程的 CPU 亲和性、调度策略,对于协程,还有分组情况、协程的优先级等等。

方便起见,我从官方文档中举例,文档对每项设置的说明都非常具体:

scheduler_conf {
    // 1. 设置调度器策略
    policy: "classic"
    // 2. 设置cpu set
    process_level_cpuset: "0-7,16-23" # all threads in the process are on the cpuset
    // 3. 设置线程的cpuset,调度策略和优先级
    threads: [
        {
            name: "async_log"
            cpuset: "1"
            policy: "SCHED_OTHER"   # policy: SCHED_OTHER,SCHED_RR,SCHED_FIFO
            prio: 0
        }, {
            name: "shm"
            cpuset: "2"
            policy: "SCHED_FIFO"
            prio: 10
        }
    ]
    classic_conf {
        // 4. 设置分组,线程组的cpuset,cpu亲和性,调度策略和优先级
        // 设置调度器创建"processor"对象的个数,以及协程的优先级。  
        groups: [
            {
                name: "group1"
                processor_num: 16
                affinity: "range"
                cpuset: "0-7,16-23"
                processor_policy: "SCHED_OTHER"
         // policy: 
         // SCHED_OTHER  默认策略 分时调度策略
         // SCHED_RR	 实时调度策略 时间片轮转
         // SCHED_FIFO   队列 先到先服务策略
                processor_prio: 0
                tasks: [
                    {
                        name: "E"
                        prio: 0
                    }
                ]
            },{
                name: "group2"
                processor_num: 16
                affinity: "1to1"
                cpuset: "8-15,24-31"
                processor_policy: "SCHED_OTHER"
                processor_prio: 0
                tasks: [
                    {
                        name: "A"
                        prio: 0
                    },{
                        name: "B"
                        prio: 1
                    },{
                        name: "C"
                        prio: 2
                    },{
                        name: "D"
                        prio: 3
                    }
                ]
            }
        ]
    }
}

那么上面的 conf 文档描述了怎么样的调度策略呢?

还是参考官方文档,对于配置文件中已经编排好的任务,其拓扑结构就依据优先级确定了。我们根据上面的 conf 文档,可以简单画出任务的优先级拓扑情况,如下图,A、B、C、D 任务在第一个 group 中执行,E在第二个 group 中执行,对于没有出现在配置中的任务,比如F默认会放到第一个 group 中执行(下文会提到哦)。 而且配置中我们对于任务进行了优先级设置,A、B、C、D 的任务优先级依次增大,正好对应下图的拓扑依赖关系,在链路中越靠后的任务优先级越高。其实,数据也是这样在任务拓扑图中传递,数据走到最后,执行的任务优先级越高,这是为保证整个流程可以快速走完,不被其他流程的任务打断。

topo_sched.png

调度策略Permalink

Apollo 提供了两种调度策略,一种是 classic 策略,在代码中,用 SchedulerClassic 类实现;另一种是 Choreography 策略,代码中用 SchedulerChoreography 类实现。对于这两个策略,我们先给一个大致的描述,好让大家理解之间的差异,稍后在代码分析中,会详细解释这些实现 :smile:

  • classic 策略
    • 较为通用的调度策略
    • 如果对当前自动驾驶车辆上的 DAG 结构不清楚,建议使用此策略
    • 相关协程任务以组为单位与线程作绑定
    • SchedulerClassic 采用了协程池的概念,协程不会绑定到具体的 Processor,而是放在全局的优先级队列中。Processor 运行时,每次从最高优先级的任务开始调度执行。
  • choreography 策略
    • 需要对车上的任务、结构足够熟悉
    • 根据任务的执行依赖关系、任务的执行时长、任务 CPU 消耗情况、消息频率等,对某些任务进行编排
    • SchedulerChoreography 类采用了本地队列和全局队列相结合的方式。他将主链路(choreography开头的配置)进行编排;而对非主链路的任务放到线程池中使用 classic 策略执行。

当使用 choreography 策略时,具体该怎么办?Well,根据任务优先级、执行时长、频率与调度之间的关系,任务编排有如下几个依据(经验):

  • 在同一个路径上的任务尽量编排在同一个 Processor 中,如果 Processor 负载过高,可考虑将部分任务拆分到其他 Processor
  • 为防止优先级倒挂,同一个路径上的任务从开始到结束,优先级应逐级升高
  • 不同路径上的任务尽量混排
  • 高频且短耗时任务尽量编排在同一个 Processor

另:据 Dig-into-Apollo中的说法,该调度策略与 Go 语言中的 GPM 模型相似。

Scheduler、Processor & ContextPermalink

我从参考文献1中找到一张不错的类关系图,可以很好地帮我说明一下这些类的关系:

class.png

SchedulerPermalink

Scheduler 类使用单例模式Instance() 方法被第一次调用时会加载 conf 文件,此时会根据配置文件中指定的类型创建 SchedulerClassic 或者 SchedulerChoreography 对象。

这就涉及到 Scheduler 类的构造函数,在这里,我不想展示具体代码的任何细节,因为这只会把原本就复杂的事物越搞越乱(如果你真的想仔细了解,还是亲自下场看一下代码吧),我经过总结,认为其步骤:

  1. 读取 conf 配置文件,如果读取配置文件失败,会设置默认值
  2. 将内部线程信息保存到查询表中,并设置线程的亲和性和优先级等属性
  3. 将所有的任务按照配置文件的要求分为 group ,设置进程级别的cpuset
  4. 根据配置文件要求创建线程,并包装为 Processor(执行器),绑定上下文。

这边我要多提一句,对于 choreography 策略,在创建线程时会比较特殊,它依照配置文件的要求,把所有线程分为两个部分(就如你在本篇博客最末尾看到的那样),其中一部分线程与 ClassicContext 绑定,另一部分与 Choreography 绑定,这意味着 choreography 策略负责编排一部分任务:

void SchedulerChoreography::CreateProcessor() {
  for (uint32_t i = 0; i < proc_num_; i++) {
    auto proc = std::make_shared<Processor>();
    auto ctx = std::make_shared<ChoreographyContext>();
    proc->BindContext(ctx);
    SetSchedAffinity(proc->Thread(), choreography_cpuset_,
                     choreography_affinity_, i);
    SetSchedPolicy(proc->Thread(), choreography_processor_policy_,
                   choreography_processor_prio_, proc->Tid());
    pctxs_.emplace_back(ctx);
    processors_.emplace_back(proc);
  }

  for (uint32_t i = 0; i < task_pool_size_; i++) {
    auto proc = std::make_shared<Processor>();
    auto ctx = std::make_shared<ClassicContext>();
    proc->BindContext(ctx);
    SetSchedAffinity(proc->Thread(), pool_cpuset_, pool_affinity_, i);
    SetSchedPolicy(proc->Thread(), pool_processor_policy_, pool_processor_prio_,
                   proc->Tid());
    pctxs_.emplace_back(ctx);
    processors_.emplace_back(proc);
  }
}

除了构造函数之外,Scheduler 还负责分发、移除任务,也可以唤醒(Notify)某个任务。

Update at 11.00 29th Oct in 2020.


一开始写这篇博客时,并没有把Scheduler 类的创建、分发、唤醒、移除任务讲清楚,那么今天我来把这个坑补上。

首先是创建任务,在 Cyber RT 组件中,我说过 Component::Initialize() 中创建的处理消息函数,会被首先用于创建协程工厂,然后 Scheduler 会使用 CreateTask() 函数创建协程。

bool Scheduler::CreateTask(std::function<void()>&& func,
                           const std::string& name,
                           std::shared_ptr<DataVisitorBase> visitor) {
  if (cyber_unlikely(stop_.load())) {
		/*  错误处理  */
  }
  // 登记任务名字,获得id  
  // GlobalData 中有一个 id 与 name 一一对应的表格
  // 比起直译map这个单词 我更喜欢用表 因为这更容易让人理解
  auto task_id = GlobalData::RegisterTaskName(name);
  // 创建协程
  auto cr = std::make_shared<CRoutine>(func);
  cr->set_id(task_id);
  cr->set_name(name);
  // 分配任务
  if (!DispatchTask(cr))
    return false;
// 数据访问者 登记回调函数
  if (visitor != nullptr) {
    visitor->RegisterNotifyCallback([this, task_id]() {
      if (cyber_unlikely(stop_.load()))
        return;
      // 唤醒 Processor  让它开始处理任务
      this->NotifyProcessor(task_id);
    });
  }
  return true;
}

在上面的代码中,可以看到协程创建完毕后,还会被分配任务,那么分配任务具体做什么呢?在 SchedulerClassic 类中,DispatchTask() 首先把协程 id 和其指针一一对应起来,放入到 id_cr_ 表中(下图),然后根据配置文件设置这个协程的优先级和协程组,如果配置文件中没有提到该协程,默认放入组 0 中,最后将该协程根据其优先级,放入到队列(下图就有)中,最后唤醒所在组的上下文;对于 SchedulerChoreography 类,还需要多一步来判断协程对应的 Processor 类。那么相似的道理,移除任务就是先将协程停止,从表中、队列中移除。

classic.png

最后 Scheduler::NotifyProcessor() 会唤醒 Processor ,它首先检查传入 id 对应的协程的状态,然后就调用了上下文的 Notify() 函数,让控制线程的条件变量唤醒一个线程(Processor 类)。那么等待一个线程呢?也是控制这个条件变量就行了😂,使用 Wait_for 函数就可以满足要求。

void ClassicContext::Notify(const std::string& group_name) {
  (&mtx_wq_[group_name])->Mutex().lock();
  notify_grp_[group_name]++;
  (&mtx_wq_[group_name])->Mutex().unlock();
  cv_wq_[group_name].Cv().notify_one();
}

void ClassicContext::Wait() {
  std::unique_lock<std::mutex> lk(mtx_wrapper_->Mutex());
  cw_->Cv().wait_for(lk, std::chrono::milliseconds(1000),
                     [&]() { return notify_grp_[current_grp] > 0; });
  if (notify_grp_[current_grp] > 0) 
    notify_grp_[current_grp]--;
}

ProcessorPermalink

前面说到,调度器中会创建线程,并包装为 ProcessorScheduler 根据 conf 文件初始化线程,并创建若干个 Processor。注意,我再此强调 Processor 并不是物理上的处理器,本质就是一个线程而已。为了加强印象,也为了后续方便理解,我在这里给出 Processor 的定义代码:

class Processor {
    public:
    	//...
    private:
      std::shared_ptr<ProcessorContext> context_;	// 上下文
      std::condition_variable cv_ctx_;				// 条件变量
      std::once_flag thread_flag_;					// 线程 flag
      std::mutex mtx_ctx_;							// 互斥锁
      std::thread thread_;							// 线程
      std::atomic<pid_t> tid_{-1};					// 线程id
      std::atomic<bool> running_{false};			// 是否运行

好了,Processor 最重要的部分还是它如何运行。重要到什么程度?额嗯,我甚至可以花大篇幅把这部分代码完整地列出来。

void Processor::Run() {
  // 1. 获取线程的PID,系统内唯一
  tid_.store(static_cast<int>(syscall(SYS_gettid)));
  snap_shot_->processor_id.store(tid_);

  while (cyber_likely(running_.load())) {
    if (cyber_likely(context_ != nullptr)) {
      // 2. 获取优先级最高并且准备就绪的协程
      auto croutine = context_->NextRoutine();
      if (croutine) {
        snap_shot_->execute_start_time.store(cyber::Time::Now().ToNanosecond());
        snap_shot_->routine_name = croutine->name();
        // 3. 执行协程任务,完成后释放协程
        croutine->Resume();
        croutine->Release();
      } else {
        snap_shot_->execute_start_time.store(0);
        // 4. 如果协程组中没有空闲的协程,则等待
        context_->Wait();
      }
    } else {
      // 5. 如果上下文为空,则线程阻塞10毫秒
      std::unique_lock<std::mutex> lk(mtx_ctx_);
      cv_ctx_.wait_for(lk, std::chrono::milliseconds(10));
    }
  }
}

其实 Processor::Run() 的逻辑很简单,就是不断地调用 ProcessorContext::NextRoutine() 函数,取得下一个协程(任务)。如果没取到,就调用 ProcessorContext::Wait() 等待。如果取到了,就调用 CRoutine::Resume() ,让任务继续运行。

看起来关键是 NextRoutine() 是如何挑选下一个任务的。参考上图,ProcessorContext 有两个派生类 ClassicContextChoreographyContext 。它们的实现因调度策略不同而不同。前者是按优先级从高到低从所在 group 对应的任务队列中取任务,取到后,需要判断其状态是否为 READY;后者也是按优先级从高到低的顺序,会将主链路上的任务与非主链路的任务分开列队,并且会为主链路上的任务提供指定的 Processor 。具体的内容很快我们就会在下一小节看到😀。

ContextPermalink

ProcessorContext 类是一个抽象基类,它的实现非常简单,你甚至不用怀疑你的第一直觉,没错,NextRoutine()Wait() 函数就是它最重要的部分。

class ProcessorContext {
 public:
  virtual void Shutdown();
  virtual std::shared_ptr<CRoutine> NextRoutine() = 0;
  virtual void Wait() = 0;

 protected:
  std::atomic<bool> stop_{false};
};

我们重点关注 ClassicContextChoreographyContext 。首先,看一下最关键的 ProcessorContext::NextRoutine() 的代码:

//  classic 调度策略的 NextRoutine()
std::shared_ptr<CRoutine> ClassicContext::NextRoutine() {
  if (cyber_unlikely(stop_.load()))
    return nullptr;
   // 1. 从优先级最高的队列开始遍历
  for (int i = MAX_PRIO - 1; i >= 0; --i) {
   // 2. 获取当前优先级队列的锁
    ReadLockGuard<AtomicRWLock> lk(lq_->at(i));
    for (auto& cr : multi_pri_rq_->at(i)) {
      if (!cr->Acquire())
        continue;
    // 3. 返回状态就绪的协程
      if (cr->UpdateState() == RoutineState::READY)
        return cr;
      cr->Release();
    }
  }
  return nullptr;
}
// Choreography 的调度策略的 NextRoutine() 
std::shared_ptr<CRoutine> ChoreographyContext::NextRoutine() {
  if (cyber_unlikely(stop_.load()))
    return nullptr;
  ReadLockGuard<AtomicRWLock> lock(rq_lk_);
  for (auto it : cr_queue_) {
    auto cr = it.second;
    if (!cr->Acquire())
      continue;
    if (cr->UpdateState() == RoutineState::READY)
      return cr;
    cr->Release();
  }
  return nullptr;
}

我们仔细比较一下上文的两个函数,emm……乍一看似乎没什么区别,确实,但细节往往隐藏了海量信息。注意到,ClassicContext 使用的队列是 multi_pri_rq_ChoreographyContext 使用的队列是 cr_queue_ ,两者类型如下:

using CROUTINE_QUEUE = std::vector<std::shared_ptr<CRoutine>>;
using MULTI_PRIO_QUEUE = std::array<CROUTINE_QUEUE, 20>;
using CR_GROUP = std::unordered_map<std::string, MULTI_PRIO_QUEUE>;

MULTI_PRIO_QUEUE *multi_pri_rq_;	// 这是 ClassicContext 内的一个私有成员
std::multimap<uint32_t, std::shared_ptr<CRoutine>, std::greater<uint32_t>> cr_queue_;
alignas(CACHELINE_SIZE) static CR_GROUP  cr_group_;

看清楚了吧,事实上 ClassicContext 使用了多个优先级队列,而ChoreographyContext 用的“队列”只是了一个表(对于 multimapmap ,我更喜欢用表格称呼它们)。如果你想进一步了解 multi_pri_rq_,那么可以仔细看一下下面这张图。再结合上面的代码,可以得出两点结论:

  • CR_GROUP 相当于组名与优先级队列的表格,是一个全局变量,有序地存放了系统中所有的协程。所以说,在 Classic 策略中,“协程是放在全局的优先级队列中处理的”。
  • 每一个协程组与一个 ClassicContext 对象对应,一个 ClassicContext 也与一个 Processor 绑定,即与一个线程绑定。这就是我前文所说的“相关协程以组为单位与线程做绑定”的直接证据。

scheduler_data.png

而反观 Choreography 策略,ChoreographyContext 只管理那些主链路上的协程(任务),不会对协程进行分组。而对于非主链路上的任务,Choreography 策略会把它们扔给 ClassicContext 上下文处理。

说完这个,我们再看看 Processor 的线程阻塞和唤醒是怎么操作的:没错,通过上下文的 Wait()Notify() 函数 :man_shrugging:

void ClassicContext::Wait() {
  // 1. 获取锁
  std::unique_lock<std::mutex> lk(mtx_wrapper_->Mutex());
  // 2. 等待条件大于0
  cw_->Cv().wait_for(lk, std::chrono::milliseconds(1000),
                     [&]() { return notify_grp_[current_grp] > 0; });
  // 3. 对应协程组的唤醒条件减1
  if (notify_grp_[current_grp] > 0) {
    notify_grp_[current_grp]--;
  }
}
void ClassicContext::Notify(const std::string& group_name) {
  // 1. 加锁
  (&mtx_wq_[group_name])->Mutex().lock();
  // 2. 协程唤醒条件加1
  notify_grp_[group_name]++;
  (&mtx_wq_[group_name])->Mutex().unlock();
  // 3. 唤醒线程
  cv_wq_[group_name].Cv().notify_one();
}

看来唤醒和等待都是通过上下文的条件变量 cw_ 实现的,对于 Choreography 策略来说,实现也是类似的。

整体结构Permalink

现在,我们跳出这些恼人的代码,俯视整个调度结构。幸运的是,参考文献 [1] 已经提供了两张非常好的图片:我在这里也不厌其烦地重复说一下吧😓

  • 调度系统中,Scheduler 类统管了所有资源,很显然,它必然是单例。
  • 每个 Processor 封装了一个 std::thread ,并于一个 ProcessorContext 对象绑定。
  • 切换协程(任务)由上下文完成,过程几乎都是优先级从高到底遍历,选中已经就绪的协程开始运行。
  • 在一个完整的动作流程中,任务的优先级几乎都是从低到高的。
  • Choreography 调度策略,主要是针对主链路上的任务进行编排,这些任务会被分配到程序员指定的 Processor 上,且执行先后关系明确,需要对系统有足够深入的了解。

classic.png

choreograph.png

参考文献Permalink

[1] 自动驾驶平台Apollo 5.5阅读手记:Cyber RT中的任务调度

[2] 百度 Apollo Cyber Docs

[3] Dig-into-Apollo

[4] Golang 中的协程

[5] 自动驾驶平台Apollo 3.5阅读手记:Cyber RT中的协程


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK