1

C++—线程同步

 3 years ago
source link: https://segmentfault.com/a/1190000040628584
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

C++ 标准库提供了如下线程同步机制:

  • 互斥量(支持超时加锁、递归加锁)
  • 读写锁(共享互斥量,也支持超时加锁)
  • 互斥量包装器(基于 RAII 的思想)
  • 信号量(二元信号量、计数信号量)
  • 栅栏(支持重用)

1. 互斥量

#include <mutex>
  • mutex:提供基础的互斥功能。

    std::mutex mtx;
    
    mtx.lock();                      // locks the mutex, blocks if the mutex is not available
    bool ok = mtx.try_lock();        // tries to lock the mutex, returns if the mutex is not available 
    mtx.unlock();                    // unlocks the mutex 
  • timed_mutex:在 mutex 的基础上增加了超时加锁的功能。

    #include <chrono>
    
    using namespace std::chrono_literals;
    
    std::timed_mutex mtx;
    
    // 以相对时间的形式指定超时时间
    if (mtx.try_lock_for(100ms))
    {
      // 已加锁
    }
    
    // 以绝对时间的形式指定超时时间
    auto now = std::chrono::steady_clock::now();
    if (mtx.try_lock_until(now + 10s))
    {
      // 已加锁
    }
  • recursive_mutex:在 mutex 的基础上增加了递归加锁的功能(此时,lock() 函数可以被同一线程在不释放锁的情况下多次调用)。

    std::recursive_mutex mtx;
    
    void fun1() {
      mtx.lock();
      // ...
      mtx.unlock();
    }
    
    void fun2() {
      mtx.lock();
      // ...
      fun1(); // recursive lock becomes useful here
      mtx.unlock();
    };
  • recursive_timed_mutex:在 timed_mutex 的基础上增加了递归加锁的功能。

2. 读写锁

#include <shared_mutex>
  • shared_mutex

    std::shared_mutex mtx;
    
    // 写者(互斥)
    mtx.lock();
    bool ok = mtx.try_lock();
    mtx.unlock();
    
    // 读者(共享)
    mtx.lock_shared();
    bool ok = mtx.try_lock_shared();
    mtx.unlock_shared();
  • shared_timed_mutex:在 shared_mutex 的基础上增加了超时加锁的功能。

    #include <chrono>
    
    using namespace std::chrono_literals;
    
    std::shared_timed_mutex mtx;
    
    /*********************************** 写者 ***********************************/
    
    // 以相对时间的形式指定超时时间
    if (mtx.try_lock_for(100ms))
    {
      // 已加锁
    }
    
    // 以绝对时间的形式指定超时时间
    auto now = std::chrono::steady_clock::now();
    if (mtx.try_lock_until(now + 10s))
    {
      // 已加锁
    }
    
    
    /*********************************** 读者 ***********************************/
    
    // 以相对时间的形式指定超时时间
    if (mtx.try_lock_shared_for(100ms))
    {
      // 已加锁
    }
    
    // 以绝对时间的形式指定超时时间
    auto now = std::chrono::steady_clock::now();
    if (mtx.try_lock_shared_until(now + 10s))
    {
      // 已加锁
    }

3. 互斥量包装器

  • lock_guard:使用了 RAII 的机制,构造时加锁,析构时解锁。

    #include <mutex>
    
    std::mutex mtx;
    
    void f()
    {
      const std::lock_guard<std::mutex> lock(mtx);
      // ...
      // mtx is automatically released when lock goes out of scope
    }
  • scoped_lock:类似于 lock_guard,但可以管理多个互斥量(可以防止死锁)。

    #include <mutex>
    
    std::mutex mtx1, mtx2;
    
    void f()
    {
      const std::scoped_lock<std::mutex, std::mutex> lock(mtx1, mtx2);
      // ...
      // mtx is automatically released when lock goes out of scope
    }
  • unique_lock:类似于 lock_guard,但支持延迟加锁、超时加锁、递归加锁。

    #include <mutex>
    #include <chrono>
    
    using namespace std::chrono_literals;
    
    std::timed_mutex mtx;
    
    // 构造时加锁
    std::unique_lock<std::timed_mutex> lock(mtx);
    
    // 延迟加锁:先不加锁
    std::unique_lock<std::timed_mutex> lock(mtx, std::defer_lock);
    
    lock.lock();
    bool ok = lock.try_lock();
    lock.unlock();
    
    bool ok = lock.try_lock_for(100ms);
    
    auto now = std::chrono::steady_clock::now();
    bool ok = mtx.try_lock_until(now + 10s);
  • shared_lock:用于管理读写锁中的读者模式(写者模式使用 unique_lock 即可),支持延迟加锁、超时加锁。

    #include <mutex>
    #include <chrono>
    
    using namespace std::chrono_literals;
    
    std::shared_mutex mtx;
    
    // 构造时加锁
    std::shared_lock<std::shared_mutexx> lock(mtx);
    
    // 延迟加锁:先不加锁
    std::shared_lock<std::shared_mutex> lock(mtx, std::defer_lock);
    
    lock.lock();
    bool ok = lock.try_lock();
    lock.unlock();
    
    bool ok = lock.try_lock_for(100ms);
    
    auto now = std::chrono::steady_clock::now();
    bool ok = mtx.try_lock_until(now + 10s);

4. 条件变量

#include <condition_variable>
  • condition_variable:等待时只能使用 std::unique_lock<std::mutex>

    std::mutex mtx;
    std::condition_variable cv;
    bool ready = false;
    
    void worker_thread()
    {
      /*
      等待,直到 ready 为 true,等价于
      while (!ready)
      {
          cv.wait(lock);
      }
      */
      std::unique_lock<std::mutex> lock(mtx);
      cv.wait(lock, []{return ready;});
     
      // after the wait, we own the lock.
      // ...
     
      // 在唤醒之前解锁,以免被唤醒的线程仍阻塞于互斥量
      lock.unlock();
      cv.notify_one();
    }
    #include <chrono>
    
    using namespace std::chrono_literals;
    
    std::mutex mtx;
    std::condition_variable cv;
    int i;
    
    std::unique_lock<std::mutex> lock(mtx);
    
    // 超时等待:相对时间
    if(cv.wait_for(lock, 100ms, []{return i == 1;}))
    {
      // 条件满足 ...
    }
    
    // 超时等待:绝对时间
    auto now = std::chrono::system_clock::now();
    if(cv.wait_until(lock, now + 100ms, [](){return i == 1;}))
    {
      // 条件满足 ...
    }
    
    // 唤醒所有等待线程
    cv.notify_all();
  • condition_variable_any:与 condition_variable 类似,但可以结合其他锁使用。

5. 信号量

#include <semaphore>
  • binary_semaphore:二元信号量,其实就是计数信号量模板的特化(计数为 1)。

    std::binary_semaphore sem;
    
    sem.acquire();        // decrements the internal counter or blocks until it can
    sem.release();        // increments the internal counter and unblocks acquirers
    
    book ok = sem.try_acquire();    // tries to decrement the internal counter without blocking
    
    // 超时 acquire:相对时间
    bool ok = sem.try_acquire_for(100ms);
    
    // 超时 acquire:绝对时间
    auto now = std::chrono::system_clock::now();
    bool ok = sem.try_acquire_until(now + 100ms);

    注:ok 为 true 表示 acquire 成功。

  • counting_semaphore:计数信号量,支持的操作同上。

    std::counting_semaphore sem(4);
  • latch:其内部维护着一个计数器,当计数不为 0 时,所有参与者(线程)都将阻塞在等待操作处,计数为 0 时,解除阻塞。计数器不可重置或增加,故它是一次性的,不可重用。

    #include <latch>
    
    std::latch work_done(4);
    
    work_done.count_down();             // decrements the counter in a non-blocking manner
    work_done.wait();                   // blocks until the counter reaches zero
    bool ok = work_done.try_wait();     // tests if the internal counter equals zero
    work_done.arrive_and_wait();        // decrements the counter and blocks until it reaches zero
  • barrier:类似于 latch,它会阻塞线程直到所有参与者线程都到达一个同步点,但它是可重用的。
    一个 barrier 的生命周期包含多个阶段,每个阶段都定义了一个同步点。一个 barrier 阶段包含:

    • 期望计数(设创建时指定的计数为 n),当期望计数不为 0 时,参与者将阻塞于等待操作处;
    • 当期望计数为 0 时,会执行创建 barrier 时指定的阶段完成步骤,然后解除阻塞所有阻塞于同步点的参与者线程。
    • 当阶段完成步骤执行完成后,会重置期望计数为 n - 调用arrive_and_drop()的次数,然后开始下一个阶段。
    #include <barrier>
    
    auto on_completion = []() noexcept { 
      // locking not needed here
      // ...
    };
    
    std::barrier sync_point(4, on_completion);
    
    sync_point.arrive();            // arrives at barrier and decrements the expected count 
    sync_point.wait();              // blocks at the phase synchronization point until its phase completion step is run
    sync_point.arrive_and_wait();    // arrives at barrier and decrements the expected count by one, then blocks until current phase completes
    sync_point.arrive_and_drop();    // decrements both the initial expected count for subsequent phases and the expected count for current phase by one

7. 调用一次

确保某个操作只被执行一次(成功执行才算),即使是多线程环境下也确保只执行一次。

#include <iostream>
#include <thread>
#include <mutex>

std::once_flag flag;

void may_throw_function(bool do_throw)
{
    if (do_throw)
    {
        std::cout << "throw: call_once will retry\n"; // this may appear more than once
        throw std::exception();
    }
    std::cout << "Didn't throw, call_once will not attempt again\n"; // guaranteed once
}

void do_once(bool do_throw)
{
    try
    {
        std::call_once(flag, may_throw_function, do_throw);
    }
    catch (...)
    {}
}

int main()
{
    std::thread t1(do_once, true);
    std::thread t2(do_once, true);
    std::thread t3(do_once, false);
    std::thread t4(do_once, true);
    t1.join();
    t2.join();
    t3.join();
    t4.join();
}
throw: call_once will retry
throw: call_once will retry
Didn't throw, call_once will not attempt again

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK