0

C++ 多线程

 2 years ago
source link: https://yuanjie-ai.github.io/2021/08/12/cpp-thread-safe/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

C++ 多线程

发表于

2021-08-12 更新于 2021-08-14

阅读次数: 7 本文字数: 11k 阅读时长 ≈ 10 分钟

rush 项目的时候,有些地方可以并行化,可以借助 C++ 的多线程来加速程序的执行。多线程的基本概念在一年前整过了,这里只是来看一下 C++ 的多线程该怎么写,顺便查漏补缺。

在多线程进入 C++ 标准之前,人们使用 C++ 编写多线程的程序,只能依赖操作系统提供的 API。比如在 Linux 环境下就只能使用 pthread 库实现多线程,因此也一直被诟病。但有了 C++11 的 std::thread 以后,可以通过标准库在语言层面编写多线程程序了,直接的好处就是多线程程序的跨平台移植提供了便利。但是在编译的时候需要注意链接平台相关的线程库,如 g++ demo.cpp -lpthread -o test.o

#include <iostream>
#include <thread>

void show_info(std::string str) {
std::cout << str << std::endl;
}

int main() {
std::string s{"hello world"};
std::thread t{show_info, s};
t.join();
return 0;
}

以上述程序为例,来详细的剖析一下多线程期间到底发生了什么:

  1. 首先引入头文件 thread,在这个头文件中,C++ 11 提供了创建、管理线程的类和方法;
  2. 使用 std::thread 创建线程,并通过列表初始化传入函数名作为构造函数的参数。传入的函数会作为子线程的入口函数,也就是说,当子线程准备就绪之后,就会开始执行这个入口函数。由于函数名表示函数的地址,子线程可以快捷的找到函数地址进而执行。

    我们知道,每个程序都有一个入口。当程序被装载到内存,处于系统态完成一些初始化的工作之后,控制权就转交给程序入口,并以此为标志进入用户态,这是一个程序的开始。同样地,线程也需要有「开始」的地方。作为线程入口的函数,就是线程函数,也就是例子中的 show_info。线程函数必须在启动线程之前,就准备好,否则线程去执行什么呢?并在线程初始化后立即执行。1

  3. 当线程函数返回时,线程也就随之终止了,上述程序中使用 join 衔接方法确保主线程在子线程退出之后才退出,因为主线程会阻塞住,直到该子线程退出为止。如果程序员没有显式的说明线程结束该如何处理,那么线程对象在被销毁时调用的析构函数中,会调用 std::terminate() 函数,销毁当前对象。如果程序写多了,应该不至于犯主线程退出子线程还没结束的低级错误。

detach

前面说过线程的 join 会阻塞调用线程,可以使用 detach 来避免,但一定要做好控制:避免主线程退出子线程还没结束的低级错误。一个 cppreference 官网的例子:

#include <iostream>
#include <chrono>
#include <thread>

void independentThread()
{
std::cout << "Starting concurrent thread.\n";
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "Exiting concurrent thread.\n";
}

void threadCaller()
{
std::cout << "Starting thread caller.\n";
std::thread t(independentThread);
t.detach();
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Exiting thread caller.\n";
}

int main()
{
threadCaller();
std::this_thread::sleep_for(std::chrono::seconds(5));
}

// Starting thread caller.
// Starting concurrent thread.
// Exiting thread caller.
// Exiting concurrent thread.

可调用类型

在创建线程对象时,传入的参数不仅是可被调用执行的函数,类的对象如果能被调用,也是可以作为线程对象的参数,用于构造函数初始化线程对象。

#include <iostream>
#include <thread>

class Task {
private:
int cnt;
public:
explicit Task()=default;
explicit Task(int a) : cnt{a} {};
void operator()() {
std::cout << this->cnt << std::endl;
}
};

int main() {
std::thread t{Task{1}};
t.join();
return 0;
}

因为要调用对象,所以重载了 () 运算符,不然线程不知道去哪个地址执行。此外,构造函数传入的是一个类类型的对象,所以对象会被拷贝到线程的存储空间,而后再开始执行。因此,类必须做好足够的拷贝控制,不然将出现难以调试的 bug,我大概只知道深浅拷贝,等有时间了去看下移动语义

当然,不重载 () 运算符,选择类中的函数执行也是可以的,不过需要注意以下两点:

  • 必须显式地使用函数指针,作为 std::thread 构造函数的第一个参数;知道执行哪个函数。
  • 非静态成员函数的第一个参数,实际上是类实例的指针。所以在创建线程时,需要显式地填入这个参数;知道执行的函数在哪个对象。
#include <iostream>
#include <thread>

class A{
private:
int a;
public:
explicit A()=default;
explicit A(int t) : a{t} {};
void show_info() {
std::cout << this->a << std::endl;
}
};

int main() {
A a{12};
std::thread t{&A::show_info, &a};
t.join();
return 0;
}

其他要注意的数据类型

如果子线程函数的参数是引用类型,也需要格外注意。由于子线程的数据是主线程的拷贝,因此子线程函数得到的拷贝实际是「线程存储空间中的拷贝的引用」,并不是主线程中的变量,应该使用 std::ref() 来生成正确的引用绑定,否则会报错。

#include <iostream>
#include <thread>

void show_info(std::string& s) {
std::cout << s << std::endl;
}

int main() {
std::string s{"hello world"};
std::thread t{show_info, std::ref(s)};
t.join();
return 0;
}

右值引用和移动语义等我后期开坑了。

锁与线程安全

众所周知,写代码的人都学过操作系统,学过操作系统都知道线程同步。线程同步一般有三种机制:互斥量、信号量和条件变量,这三者到底什么已经在这篇博客中详细的描写过了,所以不再多说。不过当时是用 C 语言写的,现在来了解下 C++ 的写法。

mutex

#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>

int counter = 0;
std::mutex mtx;

void increase(int time) {
for (int i = 0; i < time; i++) {
mtx.lock();
std::this_thread::sleep_for(std::chrono::milliseconds(1));
counter++;
mtx.unlock();
}
}

int main(int argc, char** argv) {
std::thread t1(increase, 100);
std::thread t2(increase, 100);
t1.join();
t2.join();
std::cout << "counter:" << counter << std::endl;
return 0;
}
  1. 引入 mutex 头文件,创建 std::mutex 对象 mtx
  2. 对于 mtx 对象,任意时刻最多允许一个线程对其进行上锁,上锁后操作变量,就不会出错
  3. mtx.try_lock() 是尝试上锁,如果上锁不成功,当前线程不阻塞
  4. 在用完锁之后一定记得释放锁,否则会发生死锁现象

lock_guard

为了避免 mutex 忘记解锁等情况,可以使用 std::lock_guard这个类只有构造函数和析构函数,搭配 mutex 使用,在创建这个对象时传入锁,调用锁的 lock 函数;变量销毁会调用析构函数,此时调用锁的 unlock 函数,这也就是传说中的 RAII 机制 2

如下述程序 3 ,避免一个线程意外退出没来得及释放锁,导致另一个线程无法获取资源而死锁。

#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
#include <stdexcept>

int counter = 0;
std::mutex mtx;

void increase_proxy(int time, int id) {
for (int i = 0; i < time; i++) {
std::lock_guard<std::mutex> lk(mtx);
if (id == 1) {
throw std::runtime_error("throw excption....");
}
// 当前线程休眠1毫秒
std::this_thread::sleep_for(std::chrono::milliseconds(1));
counter++;
}
}

void increase(int time, int id) {
try {
increase_proxy(time, id);
}
catch (const std::exception& e){
std::cout << "id:" << id << ", " << e.what() << std::endl;
}
}

int main(int argc, char** argv) {
std::thread t1(increase, 100, 1);
std::thread t2(increase, 100, 2);
t1.join();
t2.join();
std::cout << "counter:" << counter << std::endl;
return 0;
}

lock_guard 与 adopt_lock

还有一种为了防止死锁的方式是一次性申请所有临界资源的互斥量,只有申请到才能进行之后的操作,而 std::lock 提供了这种实现 4。此外,为了防止没有锁定或提前释放互斥量导致危险,可以使用 lock_guard 并传入 std::adopt_lock,前者保证当变量销毁时释放互斥量,后者保证线程已经上锁成功时不再调用 lock() 函数。

#include <mutex>
#include <thread>

struct bank_account {
explicit bank_account(int balance) : balance(balance) {}
int balance;
std::mutex m;
};

void transfer(bank_account &from, bank_account &to, int amount) {
// avoid deadlock in case of self transfer
if(&from == &to)
return;
// lock both mutexes without deadlock
std::lock(from.m, to.m);
// make sure both already-locked mutexes are unlocked at the end of scope
std::lock_guard<std::mutex> lock1(from.m, std::adopt_lock);
std::lock_guard<std::mutex> lock2(to.m, std::adopt_lock);

from.balance -= amount;
to.balance += amount;
}

int main() {
bank_account my_account(100);
bank_account your_account(50);
std::thread t1(transfer, std::ref(my_account), std::ref(your_account), 10);
std::thread t2(transfer, std::ref(your_account), std::ref(my_account), 5);
t1.join();
t2.join();
return 0;
}

除了 adopt_lock 之外,还有 try_to_lockdefer_lock,他们都有不同的应用场景,还可以配合使用:

void print_block (int n, char c) {
//unique_lock有多组构造函数, 这里std::defer_lock不设置锁状态
std::unique_lock<std::mutex> my_lock (mtx, std::defer_lock);
//尝试加锁, 如果加锁成功则执行
//(适合定时执行一个job的场景, 一个线程执行就可以, 可以用更新时间戳辅助)
if(my_lock.try_lock()) {
for (int i = 0; i < n; ++i)
std::cout << c;
std::cout << '\n';
}
}

其他锁的内容实在是太多了,还有时间锁、递归锁、lock_unique,读写锁的 shared_lock 等等,等哪天用到在整理这些,这里只整理最简单的,详情可以参考 cppreference 5

如果按照之前 C 语言的写法,条件变量需要注意的是 wait 那边的判断一定是 while 循环。C 语言风格的代码

当然,如果按照 C++ 的写法,我们发现条件变量的 wait 方法有两个参数,第二个参数用于接受一个变量,如果继续等待,那么那个变量的取值是 false,如果不需等待,那么那个变量返回 true

#include <iostream>
#include <mutex>
#include <thread>
#include <condition_variable>

std::mutex g_mutex;
std::condition_variable g_cond;
int g_i = 0;
bool g_running = false;

void ThreadFunc(int n) {
for (int i = 0; i < n; ++i) {
{
std::lock_guard<std::mutex> lock(g_mutex);
++g_i;
std::cout << "plus g_i by func thread "
<< std::this_thread::get_id() << std::endl;
}
}

// 等待被唤醒
std::unique_lock<std::mutex> lock(g_mutex);
std::cout << "wait for exit" << std::endl;

g_cond.wait(lock, [=] {return g_running;});

std::cout << "func thread exit" << std::endl;
}

int main() {
int n = 100;
std::thread t1(ThreadFunc, n);
std::this_thread::sleep_for(std::chrono::seconds(1));
for (int i = 0; i < n; ++i) {
{
std::lock_guard<std::mutex> lock(g_mutex);
++g_i;
std::cout << "plus g_i by main thread "
<< std::this_thread::get_id() << std::endl;
}
}

// 唤醒
{
std::lock_guard<std::mutex> lock(g_mutex);
g_running = true;
g_cond.notify_one();
}

t1.join();
std::cout << "g_i = " << g_i << std::endl;
}

输出如下:

plus g_i by func thread 140476623930944
plus g_i by func thread 140476623930944
wait for exit // 表示子线程等待唤醒
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
func thread exit // 子线程被唤醒
g_i = 200

因为一开始我也不知道该怎么去写信号量,所以打开了万能的搜索引擎,看到了关于 C++ 不支持信号量这样的东西 6。如果想实现信号量,可以通过互斥量和条件变量来实现。而关于信号量和互斥量的区别,在这篇文章中已经写明了。那么来实现一个信号量的类 7

#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <condition_variable>

class Semaphore {
private:
std::mutex mutex_;
std::condition_variable cv_;
int count_;

public:
explicit Semaphore(int count = 0) : count_(count) {
}

void Signal() {
std::unique_lock<std::mutex> lock(mutex_);
++count_;
cv_.notify_one();
}

void Wait() {
std::unique_lock<std::mutex> lock(mutex_);
// 第二个参数,如果返回 false 继续等待, 如果为 true,可以继续申请资源
cv_.wait(lock, [=] { return count_ > 0; });
--count_;
}
};

std::string FormatTimeNow(const char* format) {
auto now = std::chrono::system_clock::now();
std::time_t now_c = std::chrono::system_clock::to_time_t(now);
std::tm* now_tm = std::localtime(&now_c);

char buf[20];
std::strftime(buf, sizeof(buf), format, now_tm);
return std::string(buf);
}

Semaphore g_semaphore(3);
// 防止同时抢占输出资源
std::mutex g_io_mutex;

void Worker() {
g_semaphore.Wait();

std::thread::id thread_id = std::this_thread::get_id();
std::string now = FormatTimeNow("%H:%M:%S");
{
std::lock_guard<std::mutex> lock(g_io_mutex);
std::cout << "Thread " << thread_id << ": wait succeeded"
<< " (" << now << ")" << std::endl;
}
// Sleep 1 second to simulate data processing.
std::this_thread::sleep_for(std::chrono::seconds(1));

g_semaphore.Signal();
}

int main() {
std::vector<std::thread> v;
for (std::size_t i = 0; i < 3; ++i) {
v.emplace_back(&Worker);
}
for (std::thread& t : v) {
t.join();
}
return 0;
}
  • 信号量的值为 3,表示能同时申请 3 个资源
  • 当一个线程申请资源后,即执行了 wait 操作,count_ 取值递减,表示有一个资源被占用
  • count_ 取值小于 0 时,调用条件变量的 wait 方法,当先线程等待有了资源被唤醒
  • 当一个线程释放资源后,执行了 signal 操作,count_ 取值递增,表示有一个资源被释放,并执行 notify_one 方法,即唤醒一个等待的线程

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK