4

Apollo Cyber RT 组件

 2 years ago
source link: https://dingfen.github.io/apollo/2020/10/25/CyberComponent.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Apollo Cyber RT 组件Permalink

前言Permalink

今天,我来给大家介绍一下 Apollo Cyber RT 中组件(Component)的相关知识。老规矩,在这之前,先回顾一下之前的内容。根据课题组的安排,我这段时间一直在研究 Apollo 系统,它是百度开发的自动驾驶开源框架,具有高性能和高灵活性的特点,我主要介绍 Apollo 5.5 版本。其中的 Apollo Cyber RT 是 Apollo 团队在注意到 ROS 无法满足自动驾驶系统的鲁棒性和性能要求后,专门为自动驾驶场景设计的开源、高性能运行时框架。在之前的博客中,我介绍了 Cyber RT 中定时器的相关知识,主要介绍了定时器的算法、实现以及定时器组件如何使用定时器。由此引发了我对组件实现的兴趣,那么这篇博客就详细地介绍一下组件吧。

组件 ComponentPermalink

根据百度 Apollo 团队提供的 Cyber RT 文档1,组件(Component)是 Cyber RT 用于构建应用模块的基本类。每个特定的应用模块都可以继承 Component 类并定义自己的函数 Init()Proc() ,之后,该模块就可以被装载入 Cyber RT 框架中。

一般来说,用户有两种选择来使用 Cyber RT 框架:

  • 基于二进制。应用被分别编译成二进制文件,并使用自己创建的 ReaderWriter 来与其他 Cyber RT 模块进行通信
  • 基于组件。应用被编译成一个共享库(Shared library),通过继承 Component 类并写好相应的 dag 文件,Cyber RT 框架会自动装载并运行该应用

不难看出,使用基于组件的方案有明显的优点:

  • 组件可以被不同的进程装载,部署非常灵活
  • 当需要改变接受信道的名字(或者其他属性),可以直接更改 dag 文件,不需要重新编译
  • 组件支持接收多种类型的数据
  • 组件支持提供多种混合策略

用户自定义组件Permalink

要创建并启动一个算法组件2,需要通过以下 4 个步骤:

  • 初如化组件的文件结构
  • 实现组件类
  • 设置配置文件

官方文档介绍的很详细,在这里我就不啰嗦了。

组件类Permalink

在开始前,为更方便大家的理解,建议阅读 Cyber RT 的术语解释Cyber RT Terms ,因为我会反复提到其中的某些术语。

从代码上看,组件基类 ComponentBase 是组件类 Component 和时间组件类 TimerComponent 的基类。仔细看下图(淡蓝色为背景表示它是 privateprotected 的),有几点发现:

Component.png

  • 一个 Component 类只含有一个 Node ,但可以有多个 Reader
  • Init()Proc() 这两个用户自己定义的函数,都是不可以被直接调用的
  • 用户只能使用 Initialize()Process() 函数来调用自己写的 Init()Proc()

根据代码,Component 类最多可以处理 4 个消息信道( channels of messages),这些信道——即 Reader 对象,最后都会被放入到 ComponentBase::readers_ 变量中,没错,这些所谓的信道在代码实现中就是 Reader ,Apollo 团队并没有设计出 Channel 这样的类🐶。

先从简单的 Process() 函数抓起吧。Process() 非常好理解,就是先判断一下有没有关闭该 Component 类,再调用 Proc() 函数。

template <typename M0, typename M1>
bool Component<M0, M1, NullType, NullType>::Process(
    const std::shared_ptr<M0>& msg0, const std::shared_ptr<M1>& msg1) {
  if (is_shutdown_.load())
    return true;
  return Proc(msg0, msg1);
}

我最关心的就是 Component 类的初始化过程,即 Initialize() 函数,一旦搞清楚了这一点,那么我们就可以更好地理解其他 Cyber RT 部分在整个系统中的作用。经过对代码的详细了解后,我总结出了以下过程:

  1. 创建 Node 节点
  2. 读取配置文件
  3. 调用用户定义的函数 Init()
  4. 创建信道对象,或者说读者 Reader<M>>
    1. 根据配置文件的相关内容,填充信道的相关配置信息 reader_cfg
    2. 创建消息收到时,就会触发的激活函数 invoke func
    3. 如果 is_reality_mode 为真 ,那么直接根据配置信息 reader_cfg 创建信道,不加入激活函数;若不是,需要再加入invoke func 创建信道
  5. is_reality_mode 为真,那么就需要创建数据访问类 DataVisitor 、 协程工厂、调度器,并创建调度任务。

看完我的语言描述后,我觉得是时候上代码给你们看一下它的真面目了。为确保代码简单而又不失一般性,我选择了一个含有两个信道Component 类初始化函数。这部分代码非常重要,后文我们会反复使用。

template <typename M0, typename M1>		// 消息的类型 表示 这里有两个信道
bool Component<M0, M1, NullType, NullType>::Initialize(const ComponentConfig& config) {
  // 1. 创建 Node 节点
  node_.reset(new Node(config.name()));
  // 2. 读取配置文件
  LoadConfigFiles(config);
  if (config.readers_size() < 2)
    	/* 错误处理  */
  if (!Init())
        /* 错误处理  */
  bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
  //  信道的配置信息
  ReaderConfig reader_cfg;
  reader_cfg.channel_name = config.readers(1).channel();
  reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());
  reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();
  // 第 1 个 读信道已经创建
  auto reader1 = node_->template CreateReader<M1>(reader_cfg);
    
  reader_cfg.channel_name = config.readers(0).channel();
  reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());
  reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();
  // 第 0 个读信道要根据是否 is_reality_mode   来决定使用何种函数创建
  std::shared_ptr<Reader<M0>> reader0 = nullptr;
  if (cyber_likely(is_reality_mode)) {
    reader0 = node_->template CreateReader<M0>(reader_cfg);
  } else {
    std::weak_ptr<Component<M0, M1>> self =
        std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());

    auto blocker1 = blocker::BlockerManager::Instance()->GetBlocker<M1>(
        config.readers(1).channel());

    auto func = [self, blocker1](const std::shared_ptr<M0>& msg0) {
      auto ptr = self.lock();
      if (ptr) {
        if (!blocker1->IsPublishedEmpty()) {
          auto msg1 = blocker1->GetLatestPublishedPtr();
          ptr->Process(msg0, msg1);
        }
      } else
           /*   错误处理   */
    };
    reader0 = node_->template CreateReader<M0>(reader_cfg, func);
  }
  if (reader0 == nullptr || reader1 == nullptr)
        /*  错误处理  */
  // 信道创建完毕,全部存入到 readers_ 数组中
  readers_.push_back(std::move(reader0));
  readers_.push_back(std::move(reader1));
    
  if (cyber_unlikely(!is_reality_mode)) {
    return true;
  }
  // 创建 scheduler
  auto sched = scheduler::Instance();
  std::weak_ptr<Component<M0, M1>> self =
      std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
  auto func = [self](const std::shared_ptr<M0>& msg0, const std::shared_ptr<M1>& msg1) {
    auto ptr = self.lock();
    if (ptr) {
      ptr->Process(msg0, msg1);
    } else 
         /*  错误处理  */
  };
  // 创建 DataVisitor 和 RoutineFactory   最后创建任务
  std::vector<data::VisitorConfig> config_list;
  for (auto& reader : readers_)
    config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
  auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list);
  croutine::RoutineFactory factory = croutine::CreateRoutineFactory<M0, M1>(func, dv);
  return sched->CreateTask(factory, node_->Name());
}

不得不承认,初始化的过程复杂度超乎我的想象。看完代码,你应该明白之前的只言片语只是对这一复杂过程的笼统概括😅。接下来,我还需要对上述过程中提到的术语做一些解释。如果你不想看这些繁杂的细节,可以直接跳过

Node 节点Permalink

Initialize() 函数一开始,Component 类就创建了 Node 类对象。那么什么是 Node 类?根据官方文档给出的解释3,Node 节点类是 Cyber RT 的基本组成部分;每个 Component 对象都包含一个节点,可通过节点进行通信。通过定义节点中的 read/write 和/或 service/client,模块可以具有不同类型的通信模式。Node 对象负责创建 ReaderWriterServiceClient 信道对象来帮该组件获取或传输信息。

这么说好像有点抽象?那我们再来看一下上文的类继承图以及下面的代码块。从类成员的角度看,Node 对象有

  • std::string node_name_ 它的名字
  • std::string name_space_命名空间
  • map<string, ReaderBase> 类型的 readers_ ,它其实就是一个表格,负责保存信道名字 channel_name 与对应的 Reader 读者对象
  • 一个 NodeChannelImpl 指针。 NodeChannelImpl 类是 Node 用来创建与信道相关的 ReaderWriter 对象的类,在真实模式下(下文会介绍),创建的对象是 ReaderWriter,而在模拟模式下,创建的是 IntraReaderIntraWriter 对象,创建后向通信拓扑注册当前节点
  • 一个 NodeServiceImpl 指针。和NodeChannelImpl类似,只不过它创建的是 ServiceClient 对象,创建后也会注册 service
  • 上面提到的两个指针,都指向创建之前提到的四种信道对象的创建器
class Node {
 public:
    template <typename M0, typename M1, typename M2, typename M3>
    friend class Component;
    friend class TimerComponent;
 private:
    explicit Node(const std::string& node_name,
                const std::string& name_space = "");

    std::string node_name_;
    std::string name_space_;
    
    std::map<std::string, std::shared_ptr<ReaderBase>> readers_;
    std::unique_ptr<NodeChannelImpl> node_channel_impl_ = nullptr;
    std::unique_ptr<NodeServiceImpl> node_service_impl_ = nullptr;
};

回到 Initialize() 函数,Node 类创建完后,其主要任务就是创建读者信道:

reader0 = node_->template CreateReader<M0>(reader_cfg, func);

信道与配置信息Permalink

接下来的两步,读取配置文件和调用 Init() 函数都非常直白,我们直接来看读者信息的配置,或者说,看它配置了哪些读者信息。

ReaderConfig reader_cfg;
reader_cfg.channel_name = config.readers(1).channel();
reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());
reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();

一共三个变量被赋值(事实上 ReaderConfig 也只有这三个变量)

  • string channel_name 信道的名字。要求信道的名字不能重复
  • QosProfile qos_profile qos 属性,通信的服务质量
  • uint32_t pending_queue_size 信道缓冲区的长度,如果溢出了,会丢弃较早的消息

在真实模式(下文会提到)中,会调用 node_->template CreateReader<M1>(reader_cfg) 创建一个 Reader 对象,不加入激活函数。而若要仔细检视如何创建 Reader 对象,需要看一下 Node Channel Impl::CreateReader 。这里简单说一下,主要步骤有:

  1. 设定 RoleAttribute 的相关信息,包括信道名字,qos_file ,host_id,node_id 等等
  2. 将新的属性(Attr),激活函数和缓冲区大小作为参数,构造出 Reader<MessageT> 的对象

当然,这里面的故事还没有结束,如果有时间的话,可以进一步研究一下 Cyber RT 的通信模式。

真实模式与模拟模式Permalink

接下来,解决一下 is_reality_mode 的问题。通过 GlobalData::Instance()->IsRealityMode() 猜测,它是一个全局数据,再进一步调查发现,它只有两个值:

enum RunMode {
  MODE_REALITY = 0;
  MODE_SIMULATION = 1;
}

那么,什么时候它是 reality,什么时候是 simulation 呢?一般来说,simulation 模式多用于测试,多出现在测试文件中。其默认值为 reality 模式,但如果在测试文件(_test.cc)中,调用了如下函数,就会切换为模拟模式(simulation 模式)。而 reality 模式,即真实模式,根据我的理解,可能就是在系统真实运作、控制自动驾驶系统时的运行模式。在真实模式下,初始化工作非常的明确:为每个信道创建一个 Reader,然后创建回调函数用于调用 Process(),最后创建出对应的协程,让 Scheduler 来运行管理。

void GlobalData::EnableSimulationMode() {
  run_mode_ = RunMode::MODE_SIMULATION;
}

模拟模式与真实模式的最大差别就是,数据来源不是真实传感器实时获取的数据了。那么,模拟模式的数据从哪获得呢?在代码中,模拟模式的信道由 IntraReaderIntraWriter 类实现,这些类获取的数据也不是从协程中获得,而是通过 Blocker 类获得模拟数据(或历史数据)。为方便说明,把 Component::Initialize() 函数的模拟模式部分截取过来👇:

if (is_reality_mode) {
    // ...
} else {
	std::weak_ptr<Component<M0, M1>> self =
        std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
    auto blocker1 = blocker::BlockerManager::Instance()->GetBlocker<M1>(
        config.readers(1).channel());
    // 特殊的回调函数
    auto func = [self, blocker1](const std::shared_ptr<M0>& msg0) {
      auto ptr = self.lock();
      if (ptr) {
        if (!blocker1->IsPublishedEmpty()) {
          auto msg1 = blocker1->GetLatestPublishedPtr();
          ptr->Process(msg0, msg1);
        }
      } else {
        AERROR << "Component object has been destroyed.";
      }
    };
    reader0 = node_->template CreateReader<M0>(reader_cfg, func);
}

NodeChannelImpl::CreateReader() {
    // ....
    if (!is_reality_mode_) {
    reader_ptr =
        std::make_shared<blocker::IntraReader<MessageT>>(new_attr, reader_func);
  } else {
        // ...
    }
}

可以看到,在模拟模式下,如果有 n 个信道(这里 n = 2),初始化程序会先给后 n-1 个信道创建 IntraReader 对象(日后我们再讨论这些东西),然后对于 Reader<M0> 信道,它会创建一个特殊的回调函数,该回调函数的基本情况如下:

  • 在信道 0 接收到消息时触发
  • 触发时,函数会从其他 n-1 个信道的 IntraReaderBlocker::published_msg_queue_ 队列中各拿出 1 个消息,并把这些消息一起交给 Process() 函数执行
  • 该函数在创建 IntraReader 对象时,就被当成参数传入,并在IntraReader::init() 中被注册到该 Blocker 对象内的回调函数表中(日后讨论+1)

接下来看看初始化函数的最后一部分,我们现在只考虑真实模式,因为此部分代码只有在真实模式下进行。首先,获取了调度器单例对象,并建立的 func ,而 func 内容也很简单,就是在线程安全的前提下直接调用 Porcess()Process() 会调用用户自己定义的函数 Proc(),进而处理组件接收到的所有消息。

auto func = [self](const std::shared_ptr<M0>& msg0,
                     const std::shared_ptr<M1>& msg1) {
    auto ptr = self.lock();
    if (ptr) {
      ptr->Process(msg0, msg1);
    } else {
      AERROR << "Component object has been destroyed.";
    }
  };

func 会被当做参数传给 CRoutine::CreateRoutineFactory ,看这名字就知道,该函数用于创建协程工厂(工厂模式),此外该函数还涉及了消息融合,数据访问和数据分发等等,我们先略过不说。复杂的代码理解不了,还是看一个简单点的代码吧:

template <typename Function>
RoutineFactory CreateRoutineFactory(Function&& f) {
  RoutineFactory factory;
  factory.create_routine = [f = std::forward<Function&&>(f)]() { return f; };
  return factory;
}

很简单的代码,只是设定了一下 factory.create_routine。在返回 factory 后,调用了 Scheduler::CreateTask。此后,又继续调用一个函数,注意:CreateTask 的第一个参数 func 就是在 Component::Initialize() 中创建的原函数。

bool Scheduler::CreateTask(const RoutineFactory& factory, const std::string& name) {
  return CreateTask(factory.create_routine(), name, factory.GetDataVisitor());
}

bool Scheduler::CreateTask(std::function<void()>&& func,
                           const std::string& name,
                           std::shared_ptr<DataVisitorBase> visitor) {
  if (cyber_unlikely(stop_.load())) {
      /*  错误处理  */
  }
  auto task_id = GlobalData::RegisterTaskName(name);
  auto cr = std::make_shared<CRoutine>(func);		// 看这里!!!!
  cr->set_id(task_id);
  cr->set_name(name);
  if (!DispatchTask(cr))
    return false;
  if (visitor != nullptr) {
    visitor->RegisterNotifyCallback([this, task_id]() {
      if (cyber_unlikely(stop_.load()))
        return;
      this->NotifyProcessor(task_id);
    });
  }
  return true;
}

哇哈,终于抓住你了,看来 Component::Initialize() 中建立的 func ,最终会被 Cyber RT 用来创建一个协程。然后放入到 Scheduler 中,并根据我之前介绍的 Cyber RT 调度策略运行。在真实模式下,组件创建了一个特殊的回调函数,该函数对从信道接受来的消息进行处理。该函数最终会被封装为一个协程,并在 Scheduler 类的安排下执行。

定时组件类Permalink

定时组件类与组件类有所不同,它比组件类多了定时器,这部分内容我在 Cyber RT 定时器中已经提过,为了这篇博客的完整性,我再次强调一下。

Process() 函数和组件类一样,平平无奇。重点看一下 Initialize() 函数

  1. 创建 Node 类对象
  2. 读取配置文件
  3. 创建定时器对象,并开始计时
bool TimerComponent::Initialize(const TimerComponentConfig& config) {
  if (!config.has_name() || !config.has_interval()) {
		/* 错误处理 */
  }
  node_.reset(new Node(config.name()));
  LoadConfigFiles(config);
  if (!Init())
    return false;
  std::shared_ptr<TimerComponent> self =
      std::dynamic_pointer_cast<TimerComponent>(shared_from_this());
  auto func = [self]() { self->Proc(); };
  timer_.reset(new Timer(config.interval(), func, false));
  timer_->Start();
  return true;
}

注意到,与组件类不同,定时组件类在 Initialize() 中没有创建任何的 Reader 读者,也没有搞出调度器、协程工厂、创建任务等一系列复杂的操作。你可能有这样的疑问:既然定时组件类并没有创建出任务和协程,那么定时组件类的处理函数需要如何被调用呢?Cyber RT 定时器中其实已经有解答了🤪。简单来说,所有的定时器会把任务全部交给时间轮处理,过一段时间后,当时间轮发现需要执行某些定时任务时,就会把它们全部取出,然后调用 cyber::Async 异步地执行。

总结Permalink

今天我重点研究了组件类,分析了组件类的继承图关系以及它们的成员变量,并着重探究了组件类的初始化过程,进而对:信道与读者类、节点类与 NodeChannelImpl 类、调度器、数据访问类、协程、协程工厂函数等等有了一个大致的了解。

在组件类中,最重要的两个可调用函数就是 Process()Initialize() ,然而用户不可以对它们直接进行更改,必须通过重载 Proc()Init() 才能操控组件。对于如何创建一个组件类,官方的解释非常详细2。在对 Initialize() 函数的进一步的研究中,我发现在真实模式下,初始化工作主要有:为每个信道创建一个 Reader 对象,然后创建回调函数用于调用 Process(),最后创建出对应的协程,让 Scheduler 来运行管理,而在模拟模式下,数据主要是从 Blocker 类中取得的历史数据,而非感知器获得的真实数据。

我承认这篇博客很多地方没有解释很清楚,这一方面是因为篇幅限制,另一方面是因为我还未对 Cyber RT 理解透彻(尤其是通信部分),相信在课题组其他成员的帮助下,了解 Apollo 系统的真面目已经不远了。

参考Permalink

[1] Cyber RT Documents

[2] 如何使用 Cyber RT 来创建一个新的组件

[3] Cyber RT Terms


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK