3

飞哥讲代码22:C++线程安全队列

 3 years ago
source link: http://lanlingzi.cn/post/technical/2021/0217_code/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

飞哥讲代码22:C++线程安全队列

时间:

2021-02-17

  |   分类: 技术  

  |  

阅读: 5835 字 ~12分钟

本文虽是C++代码讲解,但JDK也有对应的两种实现,学习Java的同学也可阅读一并了解一下。

在多线程的并发模型中,无论是CSP还是Actor模式,都需要借助一个通道来在多个线程间传递消息来通讯。队列在计算机中是非常重要的一种数据结构,队列典型的特征是先进先出(FIFO),符合流水线业务流程。在进程间通信、网络通信之间经常采用队列做缓存,缓解数据处理压力。

节前定位某一C++开发的部件的性能问题,涉及到阻塞队列唤醒延迟问题。队列是采用ACE提供ACE_Message_Queue,使用场景是单生产/单消费。

ACE_Message_Queue的模型是仿照System V streams提供的队列设施设计的。消息块ACE_Message_Block是消息队列中的固定的对象结构。ACE大量采用了设计模式,代码一层套一层的,这也使得代码变得复杂不容易看懂。ACE_Message_Queue为了支持在多线程或单线程不同场景使用,采用了基于traits策略,通过模板参数来指定是需否要支持多线程。

在Java语言中,JDK中有ArrayBlockingQueue/LinkedBlockingQueue(有锁,有界)与ConcurrentLinkedQueue/LinkedTransferQueue(无锁,无界),开源高性能的Disruptor框架实现了无锁队列。

为了判断ACE提供的队列是否存在调度性能问题(纯入队到出队需要40~50us延迟,并且出现抖动历害),需要进行对比测试,但由于此部件编译环境是suse11,gcc的版本是4.X,不支持c++11,也未采用Boost库(它提供无锁队列)。那我们就来参考一些开源实现来实现自己的多线程安全的队列,顺带学习一下队列背后的机制。

2 有锁队列

队列的底层结构通常是一个链表,C++程序通常对内存极其敏感,动态地创建删除链表节点会申请与释放内存,长时间运行会导致内存空洞问题。肯定不能采用普通链表了,同时考虑队列长度一定要有上限设置,否则过大会出现内存问题。即两点要求:

  • 固定内存分配

上述两点要求很快联想到循环队列,循环队列基于数组实现,借用一张图如下:

circle_qeueue

不同的线程之间通讯,需要完成如下逻辑:

  • 队列满时,消息生产者阻塞,直到队列有空闲时释放并加入新消息
  • 队列空时,消息消费者阻塞,直到队列有消息时释放并取出消息

考虑到不能采用C++11(它提供了更为友好的多线程支持),那只能采用较为底层的pthread库,涉及到两个知识点:

  • 互斥锁(pthread_mutex_t):用于对竞争资源加锁保护
  • 条件变量(pthread_cond_t): 用于等待阻塞与通知唤醒

精减的代码如下,基本等同实现了一个简化版本的ACE_Message_Queue(它的实现也是基于pthread,为了跨平台还支持其它,不是本文重点):

#include <pthread.h>
#include <vector>

typedef unsigned long ulong;
template <typename T>
class BlockingQueue {
private:
    pthread_mutex_t mutex_;
    pthread_cond_t not_full_;
    pthread_cond_t not_empty_;
    volatile ulong   head_;
    volatile ulong   tail_;
    ulong capacity_;
    std::vector<T> queue_;
public:
    explicit BlockingQueue(ulong capacity): 
        capacity_(capacity), 
        queue_(capacity + 1), 
        head_(0), 
        tail_(0) {
        pthread_mutex_init(&mutex_, NULL) ;
        pthread_cond_init(&not_full_, NULL) ;
        pthread_cond_init(&not_empty_, NULL) ;
    }

    ~BlockingQueue() {
        queue_.clear();
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&not_full_);
        pthread_cond_destroy(&not_empty_);
    }

    ulong size() const {
        // 注tail_/head_采用volatile修饰,此方法并没有加锁保护
        return (tail_ - head_ + capacity_) % capacity_;
    }

    void push(const T& e) {
        pthread_mutex_lock(&mutex_);
        while (is_full()) { // [1] 为什么要使用while
            pthread_cond_wait(&not_full_, &mutex_); // [2]为什么要传入互斥锁
        }

        queue_[tail++] = e;
        tail_ %= (capacity_ + 1);
        pthread_cond_signal(&not_empty_); // [3] 采用signal还是broadcast
        pthread_mutex_unlock(&mutex_);
    }

    void pop(T& t) {
        pthread_mutex_lock(&mutex_);
        while (is_empty()) { 
            pthread_cond_wait(&not_empty_, &mutex_); 
        }

        T res = queue_[head_++];
        head_ %= (capacity_ + 1);
        pthread_cond_signal(&not_full_);
        pthread_mutex_unlock(&mutex_);
        t = res;
    }

private:
    inline bool is_empty() {
        return tail_ == head_;
    }

    inline bool is_full() {
        return (head_ + capacity_ - tail_) % (capacity_ + 1) == 0;
    }
};

代码只实现最基本的能力:

  • 采用模板,参数T可参实例化为ACE_Message_Block*
  • capacity_: 指定队列的容量,即上限
  • push: 消息入队列,当队列满阻塞
  • pop: 消息出队列,当队列空阻塞

代码中三处注释说明一下:

  • [1] 为什么要使用while?因为pthread_cond_wait存在虚假唤醒(Spurious Wakeup),spurious wakeup 指的是一次 signal() 调用唤醒两个或以上 wait/waiting 的线程,或者没有调用 signal() 却有线程从 wait() 返回。 pthread_cond_wait()底层是futex系统调用。在linux中,任何慢速的阻塞的系统调用当接收到信号的时候,就会返回-1,并且设置errno为EINTR。在系统调用返回前,用户程序注册的信号处理函数会被调用处理。 所以当虚假唤醒时,采用while来检查是否再次满足条件,从而避免导致问题。
  • [2] 为什么要传入互斥锁?调用pthread_cond_wait()阻塞自己,但是它持有的锁怎么办呢,如果他不归还操作系统,那么其他线程将会无法访问公有资源。pthread_cond_wait()内部实现机制会自动释放互斥锁,释放时机是线程从调用pthread_cond_wait到操作系统把他放在线程等待队列之后。
  • [3] 采用signal还是broadcast? pthread_cond_signal()激活一个等待该条件的线程,存在多个等待线程时按入队顺序激活其中一个;而pthread_cond_broadcast()则激活所有等待线程。

学习Java的同学,可以参考JDK中ArrayBlockingQueue的源码实现,它同样采用了上述机制。

3 无锁队列

无锁并不是不支持多线程,而是在需要确保操作是线程安全的地方使用CAS(Compare And Swap/Set)等原子操作。CAS是一个CPU级别的指令(X86下对应的是CMPXCHG汇编指令)。工作方式类似于乐观锁,参见Wikipedia的Compare And Swap

C++的Boost也提供了无锁队列实现(单生产者/单消费者),Boost是一个庞大的库,我们先不引入,还是手写一个。无锁队列可以说历史悠久了,无锁队列实现主要有两篇论文可参考:

同样我需要考虑不能动态申请与释放内存,采用循环队列数组实现。C++的CAS也有像JavaJDK提供相应的高阶封装,C++11才提供。GCC编译器支持多个原子操作(GCC Atomic Builtins),我们需要使用两个函数:

  • __sync_bool_compare_and_swap: CAS,用于对原子数据的修改
  • __sync_synchronize: 内存屏障,用于对原子数据的存放与读取

熟悉CAS的同学可能知道,CAS存在ABA的问题(参见WikipediaABA_problem)。Wikipedia上给了一个方法,使用double-CAS,借助另一个辅助计数器,当ABA发生时,虽然值一样,但是计数器不一样了。采用循环数组可以避免ABA问题。

代码如下,各节点以及队列同时记录tail与head,代码改自craflin/LockFreeQueue

#include <memory>
#include <vector>
#include <unistd.h>

typedef unsigned long ulong;
template <typename T> 
class LockFreeQueue {
private:
  struct Node
  {
    T data;
    ulong tail; // [1] 用于判断是否队列是否空,或满
    ulong head;
  };

  std::vector<Node> queue_;
  ulong capacity_;
  ulong capacity_mask_;
  ulong tail_;
  ulong head_;
public:
  explicit LockFreeQueue(ulong capacity): queue_ (capacity) {
    capacity_mask_ = capacity - 1;
    // capacity是2^N,方便 idx & capacity_mask_
    for(unsigned long i = 1; i <= sizeof(void*) * 4; i <<= 1) {
      capacity_mask_ |= capacity_mask_ >> i; 
    }

    capacity_ = capacity_mask_ + 1;  
    queue_.resize(capacity_);
    for(unsigned long i = 0; i < capacity_; ++i) {
      queue_[i].tail = i; // [2] 用于标识是否此下标被占用
      queue_[i].head = -1;
    }

    tail_ = 0;
    head_ = 0;
  }

  ~LockFreeQueue() {
    queue_.clear();
  }
  
  ulong size() const {
    ulong head = load(head_);
    return tail_ - head;
  }
  
  bool push_without_wait(const T& data) {
    Node* node;
    ulong next, tail = tail_;
    while(tail != next) { 
      // [3] 定位尾节点,并判断队列是否已满
      node = &queue_[tail & capacity_mask_];
      if(load(node->tail) != tail) { // 双指针,一个是入队时tail_+ 1, 一个是出队时node->tail = head + capacity_
        return false;
      }

      // [4] 更新tail_ +=1,操作之前的值next与tail相同,可以插入,否则说明多线程修改了,需要循环再次判断
      if((next = compare_and_swap(tail_, tail, tail + 1)) == tail) {
        break;
      }
    }

    // [5] 插入,节点的head指向原tail,即数组的下标,标识有值
    node->data = data;
    store(node->head, tail); 
    return true;
  }

  bool pop_without_wait(T& result) {
    Node* node;
    ulong next, head = head_;
    while(head != next) {
      // [6] 定位首节点,并判断是否为空
      node = &queue_[head & capacity_mask_];
      if(load(node->head) != head) {
        return false;
      }

      // [7] head_ += 1
      if((next = compare_and_swap(head_, head, head + 1)) == head) {
          break;
      }
    }

    result = node->data;
    store(node->tail, (head + capacity_);
    return true;
  }

  void pop(T& t) {
      // 队列空则休眠
      while(!pop_without_wait(t)) {
          usleep(5);
      }
  }

private:
    inline ulong load(const ulong volatile& var) {
        __sync_synchronize(); 
        return var;
    }

    inline void store(ulong volatile& var, ulong value) {
        __sync_synchronize(); 
        var = value;
    }

    inline ulong compare_and_swap(ulong volatile& var, ulong oldVal, ulong newVal) {
        // 返回值是操作之前的值
        return __sync_val_compare_and_swap(&var, oldVal, newVal);
    }
};

代码逻辑结合了数组与链表两种实现思想方式:

  • [1] 数组的元素是一个结构体Node,data存放数据,head/tail指针记录循环数据数组的首尾下标
  • [2] 节点初始化,tail标实为数组下标,表示此节点未被占用
  • [3] 入队:定位尾节点,tail节点的tail值若为tail_值,说明没有被占用,若占用,说明队列已满
  • [4] 循环队列的tail_ + 1,可能存在多线程操作,通过CAS确保只有一个线程在获以得的系统调度时间片上原子操作
  • [5] 通过CAS之后,可以真正插入值,则节点的head指向队列的tail,标识有值

上面的注释还是难以理解,先忽略数组方式,假定有一个堆栈,节点如下定义(参见设计不使用互斥锁的并发数据结构):

struct Node { 
    T data; 
    Node* next;
    Node(const T& d) : data(d), next(0) { } 
} Node;

插入操作:

push(const T& data) { 
    Node *n = new Node(data); 
    while (1) { 
        n->next = top;
        if (__sync_bool_compare_and_swap(&top, n->next, n)) { // CAS
            break;
        }
    }
}

若是多线程,逻辑如下,采用CAS,关键是需要采用循环不断判断(Re-Try-Loop):

  • 可能有两个或更多线程同时试图把数据压入堆栈, 假设线程 A 试图把 20 压入堆栈,线程 B 试图压入 30
  • 而线程 A 先获得了时间片,在 n->next = top 指令结束之后,调度程序暂停了线程 A
  • 现在线程 B 获得了时间片, 能够完成 CAS,把 30 压入堆栈后结束
  • 接着线程 A 恢复执行,显然对于这个线程 *top 和 n->next 不匹配,因为线程 B 修改了 top 位置的内容
  • 代码回到循环的开头,指向正确的 top 指针(线程 B 修改后的)调用 CAS,把 20 压入堆栈后结束

学习Java的同学,可以参考JDK中ConcurrentLinkedQueue源码,它是基于论文2实现,你可以对着它的源码进一步了解一下其机制。

4 等待策略

我们通常会认为,无锁队列会比较有锁队列的性能高,但无锁队列有一个缺点,当队列为空或满时,调用者怎么处理?这涉及到线程的等待,通常的等待策略是采用nanosleep/usleep休眠。但它们与pthread_cond_wait面临的问题是一样的,需要系统调度唤醒。像Linux这种非实时操作系统,只要存在唤醒就会出现调度上的延迟。

事实上,我们在单生产/单消费场景下,对有锁队列与无锁队列进行性能测试:

  • 带usleep的无锁队列 与 有锁队列是在平均唤醒延迟上是差不多的,这取决于系统的调度算法
  • 不带usleep的无锁队列,会导致空闲时CPU空转,因为不存在唤醒,也就不存在唤醒延迟问题。平均延迟的确下降了,但CPU占用也上升了

无锁队列并不是为了性能而生(要看使用的场景),它能解决有锁队列可能代码问题导致出现死锁或崩溃(假设会崩溃)的问题,因为锁是一定需要释放才能被其它的线程使用。无锁队列也可能带来一定的性能问题,因为它的实现需要Re-Try-Loop,若这个Loop中的CAS长期处理失败状态(生产或消费的线程过多),会加重无谓的CPU占用。

注:实际对比验证并不是前面参照开源实现的无锁队列,而是使用另一成熟老产品已实现的无锁队列,代码中部分还存在汇编代码优化(公司资产不便贴出代码)。

除了sleep函数,linux下还提供sched_yield函数,它的作用是让当前线程放弃CPU使用权,同时把这个线程移动到它静态优先级的调度队列末尾,并且从调度队列中找一个合适的线程来运行。如果当前线程的线程优先级列表是最高的,那么该函数立刻返回,当前线程将继续运行。合理地使用sched_yield将有助于减少竞争来提高性能。

参考Java的Disruptor中实现几种等待策略(高性能队列——Disruptor)

  • BlockingWaitStrategy: 加锁,吞吐量和延迟并不重要的场景
  • BusySpinWaitStrategy: 自旋,通过不断重试,减少切换线程导致的系统调用,而降低延迟。
  • PhasedBackoffWaitStrategy: 自旋 + yield + 自定义策略,吞吐量和延迟并不重要的场景
  • SleepingWaitStrategy: 自旋 + yield + sleep, 性能和CPU资源之间有很好的折中。延迟不均匀
  • TimeoutBlockingWaitStrategy: 加锁 + 超时限制, 吞吐量和延迟并不重要的场景
  • YieldingWaitStrategy: 自旋 + yield + 自旋, 性能和CPU资源之间有很好的折中。延迟比较均匀

5 系统调度

前面所提到pthread_cond_wait与usleep,它都是阻塞调用,会进入系统队列等待再次被调度。线程从创建、运行到结束总是处于下面四个状态:

  • 准备:等待可用的CPU资源,其他条件一切准备好。当线程被pthread_create创建时或者阻塞状态结束后就处于准备状态
  • 运行:线程已经获得CPU的使用权,并且正在运行,在多核心的机器中同时存在多个线程正在运行。
  • 阻塞:指一个线程在执行过程中暂停,以等待某个条件的触发。
  • 终止:线程已经从回调函数中返回,或者调用pthread_exit返回,或者被强制终止。

linux内核的三种调度策略:

  • SCHED_OTHER:分时调度策略,系统默认
  • SCHED_FIFO:实时调度策略,先到先服务
  • SCHED_RR:实时调度策略,时间片轮转

在测试中,我们调整了进程的调度策略(SCHED_OTHER -> SCHED_RR),发现有采用有锁队列也在达到我们预期想要的结果(平均延迟下降了,延迟也平稳了)。 注:pthread_create创建线程时可以指定线程调度的策略与优先级,也可以采用chrt命令来更改调度策略与优先级,不过都需要root权限。

这里点一下SCHED_RR,操作系统对待相同优先级别的线程是平等,操作系统以轮叫循环的方式分配时间片给最高优先级的线程。当我们把进程的内的线程调度都设置为SCHED_RR,则分给他们的时间片更小,也更均匀,所以延迟也平稳了。

我们能不能把应用进程设置为实时调度策略?设计实时进程的时候要谨慎,这类进程变得至高无上,也可能轻易托跨整个系统:

  • 任何一个CPU密集型循环在一个实时进程中会继续无止境地运行下去
  • 实时进程会好用系统上一切资源,可能出现让系统其他进程等不到处理时间
  • 如果不会出现死循环,存在调用阻塞,且延迟敏感的程序则可以考虑实时调度

网上找了三篇系统调度文章:

队列是多线程编程的基础设施,各种语言都有现成的实现。文章中的代码应该不是最优的,网上也有很多文章介绍无锁队列的,也有针对不同场景(SPSC/MPMC,Single/Multi Producer/Consumer)的极致优化,并不是无锁就比有锁性能好,还得看场景。这次为了把遇到的性能问题进一步验证对比缩小范围,面向搜索编程与学习,打开了解队列的实现机制。从模仿编写代码中我们可以初步了解到C++中的pthread互斥锁的用法、gcc的atomic内建能力,以及操作系统的调度机制。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK