C++ 多线程
发表于
2021-08-12
更新于
2021-08-14
阅读次数:
7
本文字数:
11k
阅读时长 ≈
10 分钟
rush 项目的时候,有些地方可以并行化,可以借助 C++ 的多线程来加速程序的执行。多线程的基本概念在一年前整过了,这里只是来看一下 C++ 的多线程该怎么写,顺便查漏补缺。
在多线程进入 C++ 标准之前,人们使用 C++ 编写多线程的程序,只能依赖操作系统提供的 API。比如在 Linux 环境下就只能使用 pthread 库实现多线程,因此也一直被诟病。但有了 C++11 的 std::thread
以后,可以通过标准库在语言层面编写多线程程序了,直接的好处就是多线程程序的跨平台移植提供了便利。但是在编译的时候需要注意链接平台相关的线程库,如 g++ demo.cpp -lpthread -o test.o
。
| #include <iostream> #include <thread>
void show_info(std::string str) { std::cout << str << std::endl; }
int main() { std::string s{"hello world"}; std::thread t{show_info, s}; t.join(); return 0; }
|
以上述程序为例,来详细的剖析一下多线程期间到底发生了什么:
- 首先引入头文件
thread
,在这个头文件中,C++ 11 提供了创建、管理线程的类和方法;
- 使用
std::thread
创建线程,并通过列表初始化传入函数名作为构造函数的参数。传入的函数会作为子线程的入口函数,也就是说,当子线程准备就绪之后,就会开始执行这个入口函数。由于函数名表示函数的地址,子线程可以快捷的找到函数地址进而执行。
我们知道,每个程序都有一个入口。当程序被装载到内存,处于系统态完成一些初始化的工作之后,控制权就转交给程序入口,并以此为标志进入用户态,这是一个程序的开始。同样地,线程也需要有「开始」的地方。作为线程入口的函数,就是线程函数,也就是例子中的 show_info。线程函数必须在启动线程之前,就准备好,否则线程去执行什么呢?并在线程初始化后立即执行。1
- 当线程函数返回时,线程也就随之终止了,上述程序中使用
join
衔接方法确保主线程在子线程退出之后才退出,因为主线程会阻塞住,直到该子线程退出为止。如果程序员没有显式的说明线程结束该如何处理,那么线程对象在被销毁时调用的析构函数中,会调用 std::terminate()
函数,销毁当前对象。如果程序写多了,应该不至于犯主线程退出子线程还没结束的低级错误。
detach
前面说过线程的 join
会阻塞调用线程,可以使用 detach
来避免,但一定要做好控制:避免主线程退出子线程还没结束的低级错误。一个 cppreference 官网的例子:
| #include <iostream> #include <chrono> #include <thread>
void independentThread() { std::cout << "Starting concurrent thread.\n"; std::this_thread::sleep_for(std::chrono::seconds(2)); std::cout << "Exiting concurrent thread.\n"; }
void threadCaller() { std::cout << "Starting thread caller.\n"; std::thread t(independentThread); t.detach(); std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "Exiting thread caller.\n"; }
int main() { threadCaller(); std::this_thread::sleep_for(std::chrono::seconds(5)); }
// Starting thread caller. // Starting concurrent thread. // Exiting thread caller. // Exiting concurrent thread.
|
可调用类型
在创建线程对象时,传入的参数不仅是可被调用执行的函数,类的对象如果能被调用,也是可以作为线程对象的参数,用于构造函数初始化线程对象。
| #include <iostream> #include <thread>
class Task { private: int cnt; public: explicit Task()=default; explicit Task(int a) : cnt{a} {}; void operator()() { std::cout << this->cnt << std::endl; } };
int main() { std::thread t{Task{1}}; t.join(); return 0; }
|
因为要调用对象,所以重载了 ()
运算符,不然线程不知道去哪个地址执行。此外,构造函数传入的是一个类类型的对象,所以对象会被拷贝到线程的存储空间,而后再开始执行。因此,类必须做好足够的拷贝控制,不然将出现难以调试的 bug,我大概只知道深浅拷贝,等有时间了去看下移动语义。
当然,不重载 ()
运算符,选择类中的函数执行也是可以的,不过需要注意以下两点:
- 必须显式地使用函数指针,作为
std::thread
构造函数的第一个参数;知道执行哪个函数。
- 非静态成员函数的第一个参数,实际上是类实例的指针。所以在创建线程时,需要显式地填入这个参数;知道执行的函数在哪个对象。
| #include <iostream> #include <thread>
class A{ private: int a; public: explicit A()=default; explicit A(int t) : a{t} {}; void show_info() { std::cout << this->a << std::endl; } };
int main() { A a{12}; std::thread t{&A::show_info, &a}; t.join(); return 0; }
|
其他要注意的数据类型
如果子线程函数的参数是引用类型,也需要格外注意。由于子线程的数据是主线程的拷贝,因此子线程函数得到的拷贝实际是「线程存储空间中的拷贝的引用」,并不是主线程中的变量,应该使用 std::ref()
来生成正确的引用绑定,否则会报错。
| #include <iostream> #include <thread>
void show_info(std::string& s) { std::cout << s << std::endl; }
int main() { std::string s{"hello world"}; std::thread t{show_info, std::ref(s)}; t.join(); return 0; }
|
右值引用和移动语义等我后期开坑了。
锁与线程安全
众所周知,写代码的人都学过操作系统,学过操作系统都知道线程同步。线程同步一般有三种机制:互斥量、信号量和条件变量,这三者到底什么已经在这篇博客中详细的描写过了,所以不再多说。不过当时是用 C 语言写的,现在来了解下 C++ 的写法。
mutex
| #include <iostream> #include <thread> #include <mutex> #include <chrono>
int counter = 0; std::mutex mtx;
void increase(int time) { for (int i = 0; i < time; i++) { mtx.lock(); std::this_thread::sleep_for(std::chrono::milliseconds(1)); counter++; mtx.unlock(); } }
int main(int argc, char** argv) { std::thread t1(increase, 100); std::thread t2(increase, 100); t1.join(); t2.join(); std::cout << "counter:" << counter << std::endl; return 0; }
|
- 引入
mutex
头文件,创建 std::mutex
对象 mtx
- 对于
mtx
对象,任意时刻最多允许一个线程对其进行上锁,上锁后操作变量,就不会出错
mtx.try_lock()
是尝试上锁,如果上锁不成功,当前线程不阻塞
- 在用完锁之后一定记得释放锁,否则会发生死锁现象
lock_guard
为了避免 mutex
忘记解锁等情况,可以使用 std::lock_guard
,这个类只有构造函数和析构函数,搭配 mutex
使用,在创建这个对象时传入锁,调用锁的 lock
函数;变量销毁会调用析构函数,此时调用锁的 unlock
函数,这也就是传说中的 RAII 机制 2。
如下述程序 3 ,避免一个线程意外退出没来得及释放锁,导致另一个线程无法获取资源而死锁。
| #include <iostream> #include <thread> #include <mutex> #include <chrono> #include <stdexcept>
int counter = 0; std::mutex mtx;
void increase_proxy(int time, int id) { for (int i = 0; i < time; i++) { std::lock_guard<std::mutex> lk(mtx); if (id == 1) { throw std::runtime_error("throw excption...."); } // 当前线程休眠1毫秒 std::this_thread::sleep_for(std::chrono::milliseconds(1)); counter++; } }
void increase(int time, int id) { try { increase_proxy(time, id); } catch (const std::exception& e){ std::cout << "id:" << id << ", " << e.what() << std::endl; } }
int main(int argc, char** argv) { std::thread t1(increase, 100, 1); std::thread t2(increase, 100, 2); t1.join(); t2.join(); std::cout << "counter:" << counter << std::endl; return 0; }
|
lock_guard 与 adopt_lock
还有一种为了防止死锁的方式是一次性申请所有临界资源的互斥量,只有申请到才能进行之后的操作,而 std::lock
提供了这种实现 4。此外,为了防止没有锁定或提前释放互斥量导致危险,可以使用 lock_guard
并传入 std::adopt_lock
,前者保证当变量销毁时释放互斥量,后者保证线程已经上锁成功时不再调用 lock()
函数。
| #include <mutex> #include <thread>
struct bank_account { explicit bank_account(int balance) : balance(balance) {} int balance; std::mutex m; };
void transfer(bank_account &from, bank_account &to, int amount) { // avoid deadlock in case of self transfer if(&from == &to) return; // lock both mutexes without deadlock std::lock(from.m, to.m); // make sure both already-locked mutexes are unlocked at the end of scope std::lock_guard<std::mutex> lock1(from.m, std::adopt_lock); std::lock_guard<std::mutex> lock2(to.m, std::adopt_lock);
from.balance -= amount; to.balance += amount; }
int main() { bank_account my_account(100); bank_account your_account(50); std::thread t1(transfer, std::ref(my_account), std::ref(your_account), 10); std::thread t2(transfer, std::ref(your_account), std::ref(my_account), 5); t1.join(); t2.join(); return 0; }
|
除了 adopt_lock
之外,还有 try_to_lock
,defer_lock
,他们都有不同的应用场景,还可以配合使用:
| void print_block (int n, char c) { //unique_lock有多组构造函数, 这里std::defer_lock不设置锁状态 std::unique_lock<std::mutex> my_lock (mtx, std::defer_lock); //尝试加锁, 如果加锁成功则执行 //(适合定时执行一个job的场景, 一个线程执行就可以, 可以用更新时间戳辅助) if(my_lock.try_lock()) { for (int i = 0; i < n; ++i) std::cout << c; std::cout << '\n'; } }
|
其他锁的内容实在是太多了,还有时间锁、递归锁、lock_unique
,读写锁的 shared_lock
等等,等哪天用到在整理这些,这里只整理最简单的,详情可以参考 cppreference 5。
如果按照之前 C
语言的写法,条件变量需要注意的是 wait
那边的判断一定是 while
循环。C
语言风格的代码。
当然,如果按照 C++
的写法,我们发现条件变量的 wait
方法有两个参数,第二个参数用于接受一个变量,如果继续等待,那么那个变量的取值是 false
,如果不需等待,那么那个变量返回 true
。
| #include <iostream> #include <mutex> #include <thread> #include <condition_variable>
std::mutex g_mutex; std::condition_variable g_cond; int g_i = 0; bool g_running = false;
void ThreadFunc(int n) { for (int i = 0; i < n; ++i) { { std::lock_guard<std::mutex> lock(g_mutex); ++g_i; std::cout << "plus g_i by func thread " << std::this_thread::get_id() << std::endl; } }
// 等待被唤醒 std::unique_lock<std::mutex> lock(g_mutex); std::cout << "wait for exit" << std::endl;
g_cond.wait(lock, [=] {return g_running;});
std::cout << "func thread exit" << std::endl; }
int main() { int n = 100; std::thread t1(ThreadFunc, n); std::this_thread::sleep_for(std::chrono::seconds(1)); for (int i = 0; i < n; ++i) { { std::lock_guard<std::mutex> lock(g_mutex); ++g_i; std::cout << "plus g_i by main thread " << std::this_thread::get_id() << std::endl; } }
// 唤醒 { std::lock_guard<std::mutex> lock(g_mutex); g_running = true; g_cond.notify_one(); }
t1.join(); std::cout << "g_i = " << g_i << std::endl; }
|
输出如下:
| plus g_i by func thread 140476623930944 plus g_i by func thread 140476623930944 wait for exit // 表示子线程等待唤醒 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 func thread exit // 子线程被唤醒 g_i = 200
|
因为一开始我也不知道该怎么去写信号量,所以打开了万能的搜索引擎,看到了关于 C++ 不支持信号量这样的东西 6。如果想实现信号量,可以通过互斥量和条件变量来实现。而关于信号量和互斥量的区别,在这篇文章中已经写明了。那么来实现一个信号量的类 7:
| #include <iostream> #include <mutex> #include <thread> #include <vector> #include <condition_variable>
class Semaphore { private: std::mutex mutex_; std::condition_variable cv_; int count_;
public: explicit Semaphore(int count = 0) : count_(count) { }
void Signal() { std::unique_lock<std::mutex> lock(mutex_); ++count_; cv_.notify_one(); }
void Wait() { std::unique_lock<std::mutex> lock(mutex_); // 第二个参数,如果返回 false 继续等待, 如果为 true,可以继续申请资源 cv_.wait(lock, [=] { return count_ > 0; }); --count_; } };
std::string FormatTimeNow(const char* format) { auto now = std::chrono::system_clock::now(); std::time_t now_c = std::chrono::system_clock::to_time_t(now); std::tm* now_tm = std::localtime(&now_c);
char buf[20]; std::strftime(buf, sizeof(buf), format, now_tm); return std::string(buf); }
Semaphore g_semaphore(3); // 防止同时抢占输出资源 std::mutex g_io_mutex;
void Worker() { g_semaphore.Wait();
std::thread::id thread_id = std::this_thread::get_id(); std::string now = FormatTimeNow("%H:%M:%S"); { std::lock_guard<std::mutex> lock(g_io_mutex); std::cout << "Thread " << thread_id << ": wait succeeded" << " (" << now << ")" << std::endl; } // Sleep 1 second to simulate data processing. std::this_thread::sleep_for(std::chrono::seconds(1));
g_semaphore.Signal(); }
int main() { std::vector<std::thread> v; for (std::size_t i = 0; i < 3; ++i) { v.emplace_back(&Worker); } for (std::thread& t : v) { t.join(); } return 0; }
|
- 信号量的值为 3,表示能同时申请 3 个资源
- 当一个线程申请资源后,即执行了
wait
操作,count_
取值递减,表示有一个资源被占用
- 当
count_
取值小于 0 时,调用条件变量的 wait
方法,当先线程等待有了资源被唤醒
- 当一个线程释放资源后,执行了
signal
操作,count_
取值递增,表示有一个资源被释放,并执行 notify_one
方法,即唤醒一个等待的线程