7

飞哥讲代码21:C++TLS在Envoy中应用

 3 years ago
source link: http://lanlingzi.cn/post/technical/2021/0124_code/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

飞哥讲代码21:C++TLS在Envoy中应用

时间:

2021-01-24

  |   分类: 技术  

  |  

阅读: 4126 字 ~9分钟

本文虽分析的是C++源码,但是对Evnoy的设计思想分析,并不影响其它语言开发者阅读。

Envoy是Service Mesh框架Istio推荐的SideCar,基于C++开发(大量使用了Google开源C++项目absl),具有高性能的特点,被广大微服务框架爱好者所熟悉。它的高性能一方面也源自它的优秀线程模型,我们可以通过这篇 Envoy为什么能战胜Ngnix——线程模型分析篇 可以进一步了解它的设计思路。这是对Envoy架构师的 博文 翻译,原文内容较深入较长,总结如下:

  • 采用单进程多线程的线程模型,其中一个主线程控制一些零散的协作任务
  • 若干worker线程负责连接监听,以及连接请求消息的过滤、转发
  • 一旦监听器接受了连接,连接的后续生命周期都绑定到单个工作线程
  • 使用非阻塞的网络调用,配置的Worker数与CPU核数(线程线)一致,即可完成大部分工作负载

上述做法也是最为常见的多线程设计模式,似乎没有什么特别之处。但写过C++多线程的同学就会知道其中的痛苦,虽然主线程与Worker线程职责分离了,但是xDS会随时动态更新监听器/路由/集群等信息,Worker线程又需要实时查询这些信息以作出变更。这涉及到数据的线程安全,若采用传统的多线程加锁获取共享数据,必然会影响到连接处理请求的性能。

出于高性能的诉求,Envoy采用无锁编程(针对上述数据访问)方案。主线程把需要同步的数据通过Slot推送给Worker线程,由Worker线程在沉寂期完成数据更新,这样需要访问此类数据时就不需要加锁。这就是Envoy中线程模型中最为巧妙的地方。先借用原博文中的一张图片描述一下:

envoy_thread_slot

我们以Envoy 1.13.8的源码来看一下如何做到无锁编程,Slot数据推送是结合C++的TLS(thread local stroage)特性使用的。核心代码实现相当的优雅和高内聚,我们可以在 这里这里 找到他们。

  • 前者主要定义了Slot与SlotAllocator接口
  • 后者主要实现Slot对象的管理与其数据推送
    • SlotAllocator接口实者InstanceImp:对Slot对象的管理,以及支持集中管理各线程TLS上的Slot数据
    • Slot接口实现者SlotImpl:提供set/get函数来设置与获取数据,屏蔽对象上的数据的存储与同步细节

实现逻辑核心思路如下:

  • 主线程维护一个Slot对象列表,可以通过index直接访问Slot对象
  • 主线程分配置Slot槽位,每个Slot上可以存储不同类型的数据,如xDS数据
  • 主线程对Slot对象设置数据时,则把数据发送到所有worker线程作为一个正常的事件循环处理,让Worker线程也把数据设置到它的TLS上,从来在TLS上具有与主线程相同的Slot对象
  • Worker线程处理其它事件时,可以从对应的Slot上获取它的所能获取的数据

2 Dispatcher

为了弄清楚主线程与Worker线程的交互,先要了解一下Enovy中的Dispatcher机制,我们可以从 源码分析——Envoy Dispatcher 了解到:

  • 每一个worker线程运行一个非阻塞的EventLoop
  • Dispatcher是一接口,它本质上实现是一个EventLoop,其承担了任务队列、网络事件处理、定时器、信号处理等核心功能
  • Envoy复用了Libevent中的event_base,Envoy在Libevent的基础上进行了二次封装并抽象出一些事件类,如FileEvent、SignalEvent、Timer等
  • Dispatcher的内部有一个任务队列,会有一个工作线程专门处理任务队列中的任务
  • Worker外可以通过Dispatcher的post函数将任务投递到worker的任务队列中,交给Dispatcher的线程去处理

Dispatcher是一个典型生产-消费设计模式。

3 Slot

SlotImpl实现了Slot接口,是InstanceImpl的内部类。它负责维护数据,我们先来看一下其set函数实现:

void InstanceImpl::SlotImpl::set(InitializeCb cb) { // [4] 参数InitializeCb
  ASSERT(Thread::MainThread::isMainThread()); // [1] 是否主线程调用
  ASSERT(!parent_.shutdown_);

  // [2] dispatcher.post推送事件
  for (Event::Dispatcher& dispatcher : parent_.registered_threads_) {
    // See the header file comments for still_alive_guard_ for why we capture index_.
    dispatcher.post(wrapCallback(
        [index = index_, cb, &dispatcher]() -> void { setThreadLocal(index, cb(dispatcher)); }));
  }

  // [3] Handle main thread.
  setThreadLocal(index_, cb(*parent_.main_thread_dispatcher_));
}

从上述代码中可以看出:

  • [1] Slot对象存放数据只能是主线程调用
  • [2] 给每个注册的工作线程推送数据,调用dispatcher.post压入一个lambda的回调函数,在此回调函数会调用setThreadLocal函数来把数据放在Worker线程的TLS中,其中index_是Slot对象槽位号
  • [3] 再调用主线程的setThreadLocal来把数据放在主线程的TLS中,因为主线程也可以在收到其它事件来获了相应的槽位数据
  • [4] InitializeCb是一个回调函数指针,它传递dispatcher参数,返回要存储的对象指针。主线程与工作线程实际存储的数据都是通过调用InitializeCb函数来拿到真实要存放的数据ThreadLocalObjectSharedPtr,对于同一个InitializeCb函数,会在不同的工作线程中调用多次,相当于对槽位数据进行拷贝

InitializeCb参见:using InitializeCb = std::function<ThreadLocalObjectSharedPtr(Event::Dispatcher& dispatcher)>

再来看一下setThreadLocal函数的实现:

// [1] 把对象加入到TLS对象data_列表中
void InstanceImpl::setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object) {
  if (thread_local_data_.data_.size() <= index) {
    thread_local_data_.data_.resize(index + 1);
  }

  thread_local_data_.data_[index] = object;
}

// [2]前面的thread_local_data_类型如下
struct ThreadLocalData {
    Event::Dispatcher* dispatcher_{};
    std::vector<ThreadLocalObjectSharedPtr> data_;
};

// [3]thread_local_data_
thread_local InstanceImpl::ThreadLocalData InstanceImpl::thread_local_data_;

从上述代码中可以看出:

  • [1] thread_local_data_.data_ 支持存储多个对象,数组大小不够,则扩充,每个index对应Slot的槽位号
  • [2] thread_local_data_.data_ 的类型是一个struct,包括dispatcher指针(每个注册的线程),以及存储的对象列表(不同的Slot存储不同的数据)
  • [3] thread_local_data_ 是InstanceImpl的静态成员,它是一个TLS对象。thread_local是C++11新引入,thread_local关键字修饰的变量具有线程周期(thread duration),这些变量(或者说对象)在线程开始的时候被生成(allocated),在线程结束的时候被销毁(deallocated),thread_local的变量绑定的对象即TLS对象

再来看一下get函数实现:

bool InstanceImpl::SlotImpl::currentThreadRegisteredWorker(uint32_t index) {
  return thread_local_data_.data_.size() > index;
}

ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::getWorker(uint32_t index) {
  ASSERT(currentThreadRegisteredWorker(index)); // [1] 判断是否当前工作线程的Slot是否存在
  return thread_local_data_.data_[index]; // [2] 根据槽位号取对应的数据
}

ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() { return getWorker(index_); }

从上述代码中可以看出:

  • get函数直接调用getWorker函数
  • [1] 判断Slot的槽位在当前线程的TLS是否存在
  • [2] 根据槽位号index_从当前线程的TLS上取对应的数据

Slot接口还提供两个函数来直接传递数据给所有注册的线程:

  • 相同点:机制与set函数实现类似,采用dispatcher.post把数据分发给各注册线程处理,通过回调UpdateCb函数指针来产生数据
  • 不同点:不会把UpdateCb函数指针产生数据更新到TLS;TLS中存储的数据适合于多次需要读取的场景,runOnAllThreads函数适合一次数据传递,不需要长期存储
    void runOnAllThreads(const UpdateCb& cb) override; // 更新,UpdateCb产生数据
    void runOnAllThreads(const UpdateCb& cb, const Event::PostCb& complete_cb) override; // 所有线程完成回调complete_cb
  • Slot.set函数是根据slot的槽位号index_设置数据到TLS上,通过dispatcher.post让各自的注册的dispatcher调用InitializeCb来产生数据,并设置到各自的线程TLS上,即把数据同步到各自线程的TLS中存储
  • Slot.get函数是根据slot的槽位号index_从TLS上获取数据,而不是从一个共享的地方获取数据,实现了无锁读取数据

4 SlotAllocator

InstanceImpl实现SlotAllocator接口,先来看一下allocateSlot函数实现:

SlotPtr InstanceImpl::allocateSlot() {
  ASSERT(Thread::MainThread::isMainThread()); // [1] 判断是否主线程调用
  ASSERT(!shutdown_);

  if (free_slot_indexes_.empty()) { // [2] 空闲的Slot槽位号不够
    SlotPtr slot = std::make_unique<SlotImpl>(*this, slots_.size());
    slots_.push_back(slot.get());
    return slot;
  }
  const uint32_t idx = free_slot_indexes_.front(); // [3] 从空闲队首取一个
  free_slot_indexes_.pop_front();
  ASSERT(idx < slots_.size());
  SlotPtr slot = std::make_unique<SlotImpl>(*this, idx);
  slots_[idx] = slot.get();
  return slot;
}

主要代码逻辑:

  • [1] 分配Slot槽位,只能是主线程,分配Slot与给Slot设置数据都在主线程上,所以不需要加锁
  • [2] InstanceImpl维护一个空闲槽位号列表,当空闲不够,则分配一个新的
  • [3] 有空闲槽位,则创建Slot对象,并获取Slot在主线程TLS上存储的对象

再来看一下registerThread函数实现:

void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_thread) {
  ASSERT(Thread::MainThread::isMainThread()); // [1] 判断是否主线程调用
  ASSERT(!shutdown_);

  if (main_thread) {  // [2] 当是注册主线程时
    main_thread_dispatcher_ = &dispatcher;
    thread_local_data_.dispatcher_ = &dispatcher;
  } else {  // [3] 当是注册工作线程时
    ASSERT(!containsReference(registered_threads_, dispatcher));
    registered_threads_.push_back(dispatcher);
    dispatcher.post([&dispatcher] { thread_local_data_.dispatcher_ = &dispatcher; });
  }
}

主要代码逻辑:

  • [1] 注册工作线程只能在主线程中完成,即主线程创建工作线程时,由工作线程的WorkerImpl实现者在其构造函数中调用registerThread
  • [2] 主线程也需要注册,因为allocateSlot函数也会调用slot.get(),用于获取主线程产生的数据
  • [3] 工作线程不允许重复注册,并通过dispatcher.post,把工作线程的dispatcher设置到其线程的TLS对象thread_local_data_上

此外,InstanceImpl还提供removeSlot函数,会删除Slot对象,在SlotImpl析构函数中调用removeSlot函数,释放一个空闲槽位。

  • 主线程创建工作线程,在工作线程的构造方法中,调用InstanceImpl.registerThread注册工作线程的dispatcher给InstanceImpl
  • 主线程创建其它对象(xDS/runtime/statflush/admin等),在这些对象的构造方法中,调用InstanceImpl.allocateSlot来产生槽位存储相应的数据
  • 主线程接收取xDS等更新消息时,调用对应的对象持有的slot对象的set函数,把数据同步给所有注册的工作线程

Envoy通过InstanceImpl对象集中管理多个工作线程需要使用的数据,通过对不同类型数据分配Slot对象来存储,所有线程采用TLS方式来持有相同的Slot对象,Slot对象屏蔽了对象的存储与同步。当Slot在设置数据的时候,会同时分发到所有线程执行数据设置逻辑,从而所有线程的Slot对象数据异步更新。天下没有完美的方案,其本质是采用复制来减少对共享数据加锁访问,达到数据最终一致性。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK