5

c++标准库:并发(三) —— 锁 mutex 和 lock

 2 years ago
source link: https://www.zoucz.com/blog/2021/06/09/2ae59bd0-c87d-11eb-9fe7-534bbf9f369d/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

c++标准库:并发(三) —— 锁 mutex 和 lock

作者: 邹成卓 2021-06-09 01:15:48 分类: c++

标签: c++

评论数:

接上篇 c++标准库:c++标准库:并发(二) —— 低层接口 thread 和 promise 我要说话

在(一)和(二)中,说了一些方法可以在c++中做到并发执行任务,然而并发执行任务时某些情况可能会导致未知错误: 我要说话

  • 未同步化的数据:多个并发同时读,不知道哪个先来
  • 写到一半的数据:另一个线程读取到写了一半的数据
  • 重排的语句:语句执行顺序被编译器重新安排,在多线程中造成的异常。

这三个场景 【很关键】 ,是此篇与后面几篇为什么要做线程同步的基础问题来源。 我要说话

其中第三点有个说明,是因为编译器在保证单线程中执行的结果不受影响的前提下,允许对语句的执行顺序进行重排优化。例如
我要说话

data = 42;
readyFlag = true;

我要说话

在单线程中执行时,真实执行的顺序可能是
我要说话

readyFlag = true;
data = 42;

我要说话

所以当另一个线程通过
我要说话

while(!readyFlag) {}
foo(data);

我要说话

来访问data的值时,可能拿到的是 data=42 被执行之前的值。因为赋值顺序的改变不影响他们在各自单线程中的结果,编译器认为执行语句可以重排。 我要说话

标准库提供了一些api,来保证任务的执行顺序是可控的,或者是原子性的,来规避上面的问题。 我要说话

mutex互斥锁

首先定义一个 std::mutex 互斥锁对象,和两个函数。其中一个函数直接执行,另一个函数在执行开始前加互斥锁,执行结束后释放互斥锁。
我要说话

std::mutex mutex;
void run(const std::string msg) {
int i = 5;
while (i>0)
{
i--;
std::cout << "thread id:" << std::this_thread::get_id() << " msg:" << msg << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 1000));
}
}

void runWithLock(const std::string msg){
mutex.lock();
run(msg);
mutex.unlock();
}

我要说话

测试不用mutex和用mutex的函数在独立线程中并发执行,t1和t2交替输出,t3和t4顺序输出。我要说话

void TestMutex(){

std::thread t1(run, "test mutex1");
std::thread t2(run, "test mutex2");
t1.join();
t2.join();
std::cout << "no mutex job done!" << std::endl;
std::thread t3(runWithLock, "test mutex3");
std::thread t4(runWithLock, "test mutex4");
t3.join();
t4.join();
std::cout << " mutex job done!" << std::endl;
}

std::lock_guard自动管理生命周期

上面的例子中可以看到,加锁的代码块执行完毕后需要手动释放互斥锁,当代码在执行过程中抛出异常后,可能锁一直都不会被释放。
为了避免编码上的失误,可以使用 std::lock_guard 来自动管理锁的生命周期,在析构时自动unlock。 我要说话

void runWithLockGuard(const std::string msg){
//lock_guard可以在局部作用于内自动管理对mutex的lock,结束后自动释放
run(msg);
{
std::lock_guard<std::mutex> lg(mutex);
run(msg);
}
}
void TestLockGuard(){
std::thread t1(runWithLockGuard, "test mutex1");
std::thread t2(runWithLockGuard, "test mutex2");
t1.join();
t2.join();
std::cout << " lock_guard job done!" << std::endl;
}

std::recursive_mutex 递归锁 — 防止同一线程死锁

下面的例子中,获取了一次锁,然后在获取锁后尝试第二次获取锁,就会造成死锁的情况。
可以使用std::recursive_mutex,允许同一个线程内对锁多次lock。我要说话

std::recursive_mutex r_mutex;
void runWithRecursiveMutex(){
std::lock_guard<std::recursive_mutex> lg(r_mutex);
std::cout << "runWithRecursiveMutex" << std::endl;
}

//recursive_mutex 递归锁,解决递归死锁问题
void TestRecursiveMutex(){

// 这里存在对mutex的递归lock,当runWithLockGuard运行到需要获取锁的地方时,会形成死锁
// std::lock_guard<std::mutex> lg(mutex);
// std::cout << "test no recursive mutex" << std::endl;
// runWithLockGuard("test mutex");


//rescursive_mutex,允许 !同一线程! 对rescursive_mutex多次lock,在最后一次的unlcok处释放
// 跨线程还是会死锁的
std::lock_guard<std::recursive_mutex> lg(r_mutex);
std::cout << "test recursive mutex" << std::endl;
runWithRecursiveMutex();
// 这里即使用rescursive_mutex,也会形成死锁,因为跨线程了
// std::thread t1(runWithRecursiveMutex);
// t1.join();
std::cout << "job done!" << std::endl;
}

try_lock / try_lock_for / try_lock_until 控制获取锁失败的情况

可以使用 try_lock / try_lock_for 来处理获取锁失败 / 一段时间内获取锁失败的情况。 我要说话

  • try_lock 成功了,还想使用 lock_guard 来管理生命周期的话,需要给 lock_guard 传入一个 std::adopt_lock,告诉 lock_guard 不要重复获取锁了。
  • try_lock 可能存在 “假性失败” 的情况,可以考虑用重试来处理。
//try_lock 和 lock_for,控制获取锁失败的情况
void TestTryLockAndLockFor(){
//加锁失败放弃执行
{
mutex.lock();
if(mutex.try_lock() == true){
run("try_lock success");
} else {
std::cout << "try_lock fail" << std::endl;
}
mutex.unlock();
}

//加锁成功,用std::adopt_lock使用lock_guard
{
if(mutex.try_lock() == true){
//这里因为在try_lock的时候已经lock了,所以用lock_guard要传入一个adopt_lock
std::lock_guard<std::mutex>(mutex, std::adopt_lock);
run("try_lock success");
//std::cout << "try_lock success" << std::endl;
} else {
std::cout << "try_lock fail" << std::endl;
}
}

// try_lock_for/try_lock_until等待一定时间,需要使用 timed_mutex或者 rescursive_timed_mutex
// 这里要注意的是 gcc4.8下运行此代码, t_mutex.try_lock_for总是会立即返回失败,不符合预期中的等待30s
// 据说高版本gcc上才能比较好的支持c++11的特性 https://bbs.csdn.net/topics/397728258
{
std::thread t(runWithTimedLock, "lock t_mutex");
//sleep 1s,确保lock线程先起来,好让waitfor真的进入等待状态
std::this_thread::sleep_for(std::chrono::seconds(1));
std::thread t2([]{
if(t_mutex.try_lock_for(std::chrono::seconds(30)) == true){
std::lock_guard<std::timed_mutex> lg(t_mutex, std::adopt_lock);
run("try_lock_for success");
} else {
std::cout << "try_lock_for fail" << std::endl;
}
});
t.join();
t2.join();
}
}

同时锁定多个

有时候需要在一个代码块里边获取多个锁后执行逻辑 我要说话

std::mutex m1, m2;
// 同时锁定多个mutex
void TestMultiMutex(){
//上面的lock、try_lock,都能应用于多个mutex,只有都成功时才一起lock所有的mutex。其中try_lock在都lock成功时返回-1,若不成功,则哪个不成功就返回其索引
std::lock(m1, m2);
std::lock_guard<std::mutex>(m1, std::adopt_lock);
std::lock_guard<std::mutex>(m2, std::adopt_lock);
//Do Something
//使用try_lock的话,会返回第一个获取锁失败的index,如果全部成功,会返回-1
int failIndex = std::try_lock(m1, m2);
if(failIndex < 0){
std::lock_guard<std::mutex>(m1, std::adopt_lock);
std::lock_guard<std::mutex>(m2, std::adopt_lock);
} else {
// failIndex 就是第一个失败的mutex的索引
// 这里不需要手动释放锁定成功的mutex,因为一旦有锁定失败的,之前锁定成功的也会自动释放
}
}

std::unique_lock

std::unique_lock作用与std::lock_guard类似,也会在生命周期结束时自动unlock。
区别是可控性更强: 我要说话

  • 生命周期期间不一定lock住了一个mutex,可以用owns_lock(),或者bool()来查询当前是否锁住了
  • 多了一个构造函数:可以与try_to_lock配合使用, std::unique_lock< std::mutex> ul(mutex, std::try_to_lock),尝试获取lock,但是不阻塞
  • 多了一个构造函数:可以传入一个时间,类似try_lock_for,尝试在一段时间内锁定(待验证,也许gcc4.8不行)std::unique_lock< std::timed_mutex> ul(t_mutex, std::chrono::seconds(1))
  • 多了一个构造函数:可以传入defer_lock,初始化但不锁住,后续手动锁,std::unique_lock< std::mutex> ul(mutex, std::defer_lock), 后续ul.lock()
  • 生命周期结束时,若是获得锁的状态,则自动释放锁;若没有获得,则啥都不做
  • 提供release来释放,或者转移lock,此函数返回一个指向关联的mutex,并且unlock它;与之相对,ul.mutex()返回mutex的指针,但是不unlock
std::mutex readyFlagMutex;
bool readyFlag;
void prepareThread(){
//做某种准备工作,花费5秒钟
std::this_thread::sleep_for(std::chrono::seconds(5));
//加锁然后将readyFlag设置为true,代表已经准备好了
std::lock_guard<std::mutex> lg(readyFlagMutex);
readyFlag = true;
//设置完毕后,lock_guard生命周期结束,自动释放锁
}

void waitForPrepareThread(){
{
//在此作用域内,声明一个unique_lock,阻塞并尝试获取锁
std::unique_lock<std::mutex> ul(readyFlagMutex);
//死循环等待readyFlag变为true
while (!readyFlag)
{
//如果还没有变为true,就sleep 500ms,把cpu让出来让程序去做准备工作或者其它事情
//这里注意在等待之前,一定要把锁释放掉,不然prepare thread可能因为此处占着锁而没办法修改readyFlag,造成死锁
ul.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << " wait for prepare... " << std::endl;
//sleep完成后,再次加锁判断,直到readyFlag变为true
ul.lock();
}
//注意:当readyFlag变为true时,ul到这里还是拥有锁的状态,随着它生命周期结束,锁会被自动释放
}
std::cout << " prepare job done, run something! " << std::endl;
};

void TestUniqueLock(){
std::thread t1(prepareThread);
std::thread t2(waitForPrepareThread);
t1.join();
t2.join();
}

上面实现了一个利用unique_lock拥有一个不一定处于锁住状态的互斥锁的特性,实现了一个线程等待另一个线程的操作。
但是这种实现还是听不优雅的,主要在于这个等待sleep的时间,步子小了太娘炮,步子大了扯着蛋。
后面会提到的基于事件驱动的 condition_variable 才是专业做这种事情的。我要说话

std::once_flag 只执行一次

多线程中,用下面的方式来判断数据初始化,可能因并发读而引起未知异常
我要说话

bool initialized = false;
if(!initialized){
initialized();
initialized = true;
}

我要说话

可以用来 std::once_flag 控制多线程环境下的数据初始化,避免出现数据并发读冲突问题。
我要说话

std::once_flag oc;
std::call_once(oc, callable);

我要说话

本文链接:https://www.zoucz.com/blog/2021/06/09/2ae59bd0-c87d-11eb-9fe7-534bbf9f369d/我要说话

☞ 参与评论我要说话


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK