28

谈谈一写多读的高效并发控制

 4 years ago
source link: https://mp.weixin.qq.com/s/0bmqZPVRIup24EK_6jBJwA
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

问题

问题很简单:实现多线程环境下,一写多读、读多写少的高效并发控制。

无锁双 buffer + 切换等待

“双 buffer”大概是这样工作的:

  1. 事先申请好两个 buffer,一个是只读的(read buffer),一个是读写的(write buffer)。

  2. 读线程只从 read buffer 读取。

  3. 写线程修改 write buffer,然后切换 read buffer 和 write buffer。

伪代码实现如下:

const int kBufCnt = 2;
atomic<int> curr_idx = 0;
T buffers[kBufferCnt];

// Reader 调用
const T* Get() {
  return buffers[curr_idx];
}

// Writer 调用
T* Mutable() {
  return buffers[(curr_idx + 1) % kBufferCnt];
}

// Writer 更新完,调用 Switch 切换 buffer
void Switch() {  // writer使用
   curr_idx = (curr_idx + 1) % kBufferCnt;
}

// Reader: 很多个线程会执行 Read()
void Read() {
  const T* buf = Get();
  // Read something from *buf...
}

// Writer: 只有一个线程会执行 Write()
void Write() {
  T* buf = Mutable();
  // Write something to *buf...
  Switch();
}

这段代码逻辑是有线程安全问题的:reader 获取指针之后,不知道什么时候才会读完。而 writer 也可能随时会 Switch。比如:

  1. 线程 r 获取指针之后,开始读取。

  2. 线程 w 写入后调用 Switch。

  3. 线程 w 再次写入... => 这个时候会写入到线程 r 正在读取的 buffer,出现读写冲突。

改善方案是:writer 在进行 Switch 之后等待一段时间。

void Write() {
  T* buf = Mutable();
  // Write something to *buf...
  Switch();
   // Sleep for a while...
}
  1. 线程 r 获取指针之后,开始读取。

  2. 线程 w 写入后调用 Switch。

  3. 线程 w sleep 一段时间... => 线程 r 保证在这段时间内能完成读取。

  4. 线程 w 再次写入。

但是 sleep 多久能保证 reader 一定读完?常见的做法是 sleep 一个“远超过” reader 正常操作耗时的时间——但这是和具体应用场景相关的,无法确定一个通用的 sleep 时间。而且这里面有一些不确定性,比如:

  1. 如果是有 GC 的语言,reader 可能因为 GC 被卡住“很久”。

  2. Reader 需要读取 buffer 的逻辑可能很复杂,耗时很长。

综上,“双 buffer + 切换后等待一段时间”无法从根本上解决读写冲突的问题。

DoublyBufferedData

brpc 实现了一个一写多读且并发安全的双 buffer —— DoublyBufferedData [1] 。理论上,双 buffer 的读写都必须加锁才能保证绝对并发安全。DoublyBufferedData 的加锁思路比较特别:

读拿一把 thread-local 锁,写需要拿到所有的 thread-local 锁。

这里“写需要拿到所有的 thread-local 锁”的说法其实不太严谨,其实际逻辑是:writer 在进行 Switch 之后,需要一个个获取所有的 thread-local 锁(每个锁获取成功之后立马释放)”。伪代码如下:

// Reader: 很多个线程会执行 Read()
void Read() {
   LockGuard(local_lock);
  const T* buf = Get();
  // Read something from *buf...
}

// Writer: 只有一个线程会执行 Write()
void Write() {
  T* buf = Mutable();
  // Write something to *buf...
  Switch();
  for local_lock of readers {
    LockGuard(local_lock)
  }
}

如果 writer 成功获得 reader 的 local_lock,说明 reader 肯定没在进行读取操作。

直观上,这种加锁方式和直接使用读写锁差别不大:读请求不相互阻塞,读写请求会相互阻塞。相比读写锁,这种加锁方式的好处是,writer 每次只会阻塞一个 reader 线程。

当然,实际效果如何,具体场景测试了才知道。另外,如果 reader 的临界区比较长,writer 可能会被阻塞很久。

shared_ptr

一种通用的替代“双 buffer”的方式是:

  1. 读的时候返回一个 shared_ptr(指向的只读 buffer)。

  2. 写的时候生成一个新的对象,替换原来的。

伪代码:

std::shared_ptr<T> buffer;

// Reader 调用
std::shared_ptr<T> Get() {
  return buffer;
}

// Writer 调用
void Update() {
    buffer.reset(new T);
}

// Reader: 很多个线程会执行 Read()
void Read() {
  std::shared_ptr<T> buf = Get();
  // Read something from *buf...
}

// Writer: 只有一个线程会执行 Write()
void Write() {
  // Write something to...
  Update();
}

这里有几个问题要解决:

  1. std::shared_ptr [2] 的构造函数、 operator= 不是原子的。

  2. 每次 Get 都生成一个 std::shared_ptr<T>对象。

  3. 每次 Update 都要生成一个全新的对象 T。

问题 1

  1. C++20 可以用 std::atomic<std::shared_ptr<T>> 替代 std::shared_ptr<T>。

  2. C++11 可以用  std::atomic_* [3] 函数族对 std::shared_ptr<T> 进行原子操作。

  3. C++11 之前可以用 boost 库的等价实现代替。

问题 2

构造一个 std::shared_ptr   对象的主要开销是:

  1. 两个指针:一个指向实际数据,一个指向控制块。

  2. 引用计数加 1。

个人认为,这样的开销在绝大部分情况下都是可以接受的。

问题 3

在读多写少、更新频率很低的场景下,个人认为这部分的性能开销是可以忽略不计。

使用 shared_ptr 的好处是,writer 和 reader 可能冲突的临界区很小——只有复制或替换 shared_ptr 的时候,与 writer、reader 的具体逻辑无关。

小结

本文介绍了三种一写多读、读多写少的并发控制方式。

  1. 无锁双 buffer + 切换等待。

  2. brpc 的 DoublyBufferedData。

  3. shared_ptr。

“无锁双 buffer + 切换等待”性能是最好的,但是理论上存在线程安全问题。根据具体应用场景调整好 sleep 的时间,可以保证基本不会出现读写冲突的情况。

DoublyBufferedData 是线程安全的,但是每次读都要加一次锁,虽然绝大部分情况下都不会有争用。

shared_ptr 的方式的主要缺点是每次都要构造一个 shared_ptr,但是 shared_ptr 的性能开销是比较可控的。

参考资料

[1]

DoublyBufferedData: https://github.com/apache/incubator-brpc/blob/master/docs/cn/lalb.md#doublybuffereddata

[2]

std::shared_ptr : https://en.cppreference.com/w/cpp/memory/shared_ptr/shared_ptr

[3]

std::atomic_*: https://en.cppreference.com/w/cpp/memory/shared_ptr/atomic

mYFrMzr.jpg!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK