# C++多线程
# bind函数
有时候需要根据需要对函数的参数进行定制,这时就需要使用functional.h
头文件中提供的bind
函数,其作用是给函数设定默认值,并使用placeholder
设置函数中的参数,然后返回一个函数对象。
#include <iostream>
#include <functional>
void substract(int a, int b, int c)
{
std::cout << a - b - c << std::endl;
}
int main(int argc, char **argv)
{
// 方式1
auto f = std::bind(substract,
std::placeholders::_1,
2,
std::placeholders::_2);
f(10, 12); // -4
// 方式2
auto ff = std::bind(substract,
std::placeholders::_2,
2,
std::placeholders::_1);
ff(10, 12); // 0
return 0;
}
在以上示例程序中,
- 方式1,
_1
对应的是函数substract
的参数a
,参数b
的默认值是2,_2
对应的是函数substract
的参数b
- 方式1,
_1
对应的是函数substract
的参数c
,参数b
的默认值是2,_2
对应的是函数substract
的参数c
此外,当对类的成员函数使用bind
函数,进行参数定指时,第一个参数需要传入指向对象的指针,如在类成员函数中传入this
,在类外传入对象指针,以使得bind
后的函数还能访问类的成员。
#include <iostream>
#include <functional>
class A {
public:
int value = 12;
void substract(int a, int b, int c)
{
std::cout << a - b - c - this->value << std::endl;
}
};
int main(int argc, char **argv)
{
A a;
auto f = std::bind(&A::substract,
&a,
std::placeholders::_1,
2,
std::placeholders::_2);
f(10, 12); // -16
auto ff = std::bind(&A::substract,
&a,
std::placeholders::_2,
2,
std::placeholders::_1);
ff(10, 12); // -12
return 0;
}
# std::thread
thread
类表示单个可执行线程,通过多个线程可以使几个函数同时运行。
线程在构造关联的线程对象后,从作为构造函数参数提供的顶级函数开始,立即执行。
顶级函数可以通过共享变量或std::promise
和主线程之间交换返回值,这可能需要使用std::mutex
或std::atomic
来做线程同步。
使用join
函数,可以在主线程结束前阻塞主线程以等待子线程先退出。
但是,使用detach
后会结束主线程和子线程之间的关联关系,使得子线程不再joinble
,不能使用join()
阻塞主线程。
#include <iostream>
#include <functional>
#include <thread>
#include <unistd.h>
class A {
public:
A() {
t1 = std::thread(std::bind(&A::increment, this));
}
int value = 12;
void increment() {
int v = 0;
for(int i=0; i< 1000; i++) {
v += i;
std::cout << v << std::endl;
usleep(1000000);
}
}
~A() {
if(t1.joinable())
t1.join();
}
private:
std::thread t1;
};
int main(int argc, char **argv)
{
A a;
return 0;
}
注意,上述代码,需要在类的析构函数中调用join
函数,否则,程序主线程结束时会杀死子线程,导致程序退出,报如下错误:
terminate called without an active exception
# std::mutex
mutex
具有并发执行代码时的互斥(互斥)的功能,可以显式避免数据竞争。
当一个线程访问时,先给该线程所获取的资源上锁,防止其他线程修改资源,一个线程访问结束时,通过解锁释放资源。
假如没有std::mutex
,
#include <iostream>
#include <functional>
#include <thread>
#include <unistd.h>
class A {
public:
A() {
t1 = std::thread(std::bind(&A::increment, this));
t2 = std::thread(std::bind(&A::increment, this));
}
int value = 12;
void increment() {
for(int i=0; i< 1000; i++) {
value++;
std::cout << value << std::endl;
usleep(10);
}
}
~A() {
if(t1.joinable())
t1.join();
if(t2.joinable())
t2.join();
std::cout << "A.value" << value << std::endl;
}
private:
std::thread t1;
std::thread t2;
};
int main(int argc, char **argv)
{
A a;
return 0;
}
上述代码,期望最后打印的A.value
是2012
,但代码的实际运行中,其输出有可能是
2011
A.value2011
为什么会这样呢?自增操作value++
不是原子操作,而是由多条汇编指令完成的。多个线程对同一个变量进行读写操作就会出现不可预期的操作。
#include <iostream>
#include <functional>
#include <thread>
#include <unistd.h>
#include <mutex>
class A {
public:
A() {
t1 = std::thread(std::bind(&A::increment, this));
t2 = std::thread(std::bind(&A::increment, this));
}
int value = 12;
void increment() {
for(int i=0; i< 1000; i++) {
mtx.lock();
value++;
mtx.unlock();
std::cout << value << std::endl;
usleep(10);
}
}
~A() {
if(t1.joinable())
t1.join();
if(t2.joinable())
t2.join();
std::cout << "A.value" << value << std::endl;
}
private:
std::thread t1;
std::thread t2;
std::mutex mtx;
};
int main(int argc, char **argv)
{
A a;
return 0;
}
如上,使用mutex
利用锁来保护共享变量,访问时上锁,访问结束释放锁。
上述中调用了互斥量的lock
函数,上锁不成功的话线程会被阻塞,其还有另外个函数try_lock
,此线程在上锁不成功时也不阻塞当前线程。
# 死锁
使用mutex
存在死锁问题,考虑,线程1和线程2共用互斥量m
,线程1调用m.lock()
上锁后抛出了异常没有来得及执行m.unlock()
释放锁,这时候线程2将一直处于等待状态,导致死锁。
#include <iostream>
#include <functional>
#include <thread>
#include <unistd.h>
#include <mutex>
class A {
public:
A() {
t1 = std::thread(std::bind(&A::increment, this, 1));
t2 = std::thread(std::bind(&A::increment, this, 2));
}
int value = 12;
void increment(int id) {
try {
for(int i=0; i< 1000; i++) {
std::cout << "[id:" << id << "]waiting...\n";
mtx.lock();
value++;
if(value > 13)
throw std::runtime_error("throw excption....");
mtx.unlock();
std::cout << value << std::endl;
usleep(10);
}
} catch (const std::exception& e){
std::cout << "id:" << id << ", " << e.what() << std::endl;
}
}
~A() {
if(t1.joinable())
t1.join();
if(t2.joinable())
t2.join();
std::cout << "A.value" << value << std::endl;
}
private:
std::thread t1;
std::thread t2;
std::mutex mtx;
};
int main(int argc, char **argv)
{
A a;
return 0;
}
如上代码,将导致线程1一直处于等待状态:
d$ ./main
[id:1]waiting...
13
[id:2]waiting...
id:2, throw excption....
[id:1]waiting...
# std::lock_guard VS std::unique_lock
避免死锁的一种方式是使用std::lock_guard
,std::lock_guard
对象构造时,自动调用mtx.lock()
进行上锁,对象析构时,自动调用mtx.unlock()
释放锁。
#include <main.h>
#include <memory>
#include <iostream>
#include <functional>
#include <thread>
#include <unistd.h>
#include <mutex>
class A {
public:
A() {
t1 = std::thread(std::bind(&A::increment, this, 1));
t2 = std::thread(std::bind(&A::increment, this, 2));
}
int value = 12;
void increment(int id) {
try {
for(int i=0; i< 1000; i++) {
std::cout << "[id:" << id << "]waiting...\n";
std::lock_guard<std::mutex> lock(mtx);
value++;
if(value > 13)
throw std::runtime_error("throw excption....");
std::cout << value << std::endl;
usleep(10);
}
} catch (const std::exception& e){
std::cout << "id:" << id << ", " << e.what() << std::endl;
}
}
~A() {
if(t1.joinable())
t1.join();
if(t2.joinable())
t2.join();
std::cout << "A.value" << value << std::endl;
}
private:
std::thread t1;
std::thread t2;
std::mutex mtx;
};
int main(int argc, char **argv)
{
A a;
return 0;
}
将代码改成如上形式,发现就不会死锁了,抛出异常离开lock_guard
作用域时会调用其析构函数,自动释放锁。
std::unique_lock
与std::lock_guard
功能类似,都支持在构造时自动上锁,在析构时自动解锁,其与std::lock_guard
区别在于,std::unique_lock
支持在构造时推迟上锁,可以选择在需要时手动lock()
,且还能保证在析构时释放锁
std::mutex mtx;
lck = std::unique_lock<std::mutex>(mtx, std::defer_lock);
// 手动上锁
std::lock(lck1);
# std::condition_variable
条件变量的作用是用于多线程之间的线程同步。
线程同步是指线程间需要按照预定的先后顺序进行的行为,比如我想要线程1完成了某个步骤之后,才允许线程2开始工作,这个时候就可以使用条件变量来达到目的。
当 std::condition_variable
对象的某个 wait
函数被调用的时候,它使用 std::unique_lock
(通过 std::mutex
来锁住当前线程。当前线程会一直被阻塞,直到另外一个线程在相同的 std::condition_variable
对象上调用了 notify
函数来唤醒当前线程。
#include <iostream>
#include <functional>
#include <thread>
#include <unistd.h>
#include <mutex>
#include <condition_variable>
class A {
public:
A() {
t1 = std::thread(std::bind(&A::increment, this, 1));
t2 = std::thread(std::bind(&A::add5, this, 2));
}
int value = 12;
void increment(int id) {
for(int i=0; i< 20; i++) {
std::cout << "[id:" << id << "]waiting...\n";
std::lock_guard<std::mutex> lock(mtx);
value++;
std::cout << value << std::endl;
if(i % 10 == 0)
updated.notify_one();
usleep(10);
}
flag = false;
}
void add5(int id) {
while(true)
{
std::unique_lock<std::mutex> lock(mtx);
updated.wait(lock);
std::cout << "[id:" << id << "]waiting...\n";
value += 5;
std::cout << value << std::endl;
}
}
~A() {
if(t1.joinable())
t1.join();
if(t2.joinable())
t2.join();
std::cout << "A.value" << value << std::endl;
}
private:
std::thread t1;
std::thread t2;
bool flag = true;
std::condition_variable updated;
std::mutex mtx;
};
int main(int argc, char **argv)
{
A a;
return 0;
}
以上代码,线程2在while
循环中调用了updated.wait(lock)
函数,会一直处于阻塞状态,直到收到notify
的信号,
[id:1]waiting...
30
[id:1]waiting...
31
[id:1]waiting...
32
[id:1]waiting...
33
[id:1]waiting...
34
[id:1]waiting...
35
[id:1]waiting...
36
[id:1]waiting...
37
[id:2]waiting...
42
std::condition_variable::notify_all
解除当前等待此条件的所有线程的阻塞。std::condition_variable::notify_one
解除当前等待此条件的线程之一的阻塞。如果没有线程在等待,则该函数不执行任何操作。如果有多个线程等待此条件,notify_one
无法指定选择哪个线程。
# std::atomic
对于线程间共享的变量,进行读取和写入操作时,常用的同步方式就是加锁,但是每一次循环都要加锁解锁会导致程序开销很大。
为了提高性能,C++11
提供了原子类型(std::atomic<T>
),它提供了多线程间的原子操作。
可以把原子操作理解成一种不需要用互斥量加锁(无锁)来实现多线程并发编程的方式。
原子类型是封装了一个值的类型,它的访问保证不会导致数据的竞争,并且可以用于在不同的线程之间同步内存访问。
从效率上来说,原子操作要比互斥量的方式效率要高。互斥量的加锁一般是针对一个代码段,而原子操作针对的一般都是一个变量。
原子操作,一般都是指“不可分割的操作”;是一系列不可被 CPU 上下文交换的机器指令,这些指令组合在一起就形成了原子操作。
原子类型是无锁类型,但是无锁不代表无需等待,因为原子类型内部使用了 CAS(Compare and swap)
循环,当大量的冲突发生时,仍然需要等待,总体比使用锁性能要好。
std::atomic
类模板,允许用户使用自定义类型创建一个原子变量(除了标准原子类型之外),需要满足一定的标准才可以使用std::atomic<>
,为了使用std::atomic<UDT>
(UDT是用户定义类型),这个类型必须有拷贝赋值运算符。这就意味着这个类型不能有任何虚函数或虚基类,以及必须使用编译器创建的拷贝赋值操作。
#include <main.h>
#include <memory>
#include <iostream>
#include <functional>
#include <thread>
#include <unistd.h>
#include <mutex>
#include <condition_variable>
#include <atomic>
class A {
public:
A() {
flag.store(true);
t1 = std::thread(std::bind(&A::increment, this, 1));
t2 = std::thread(std::bind(&A::add5, this, 2));
}
int value = 12;
void increment(int id) {
for(int i=0; i< 20; i++) {
std::cout << "[id:" << id << "]waiting...\n";
std::unique_lock<std::mutex> lock(mtx);
value++;
std::cout << value << std::endl;
std::cout << "i=" << i << std::endl;
if(i % 10 == 0)
updated.notify_one();
usleep(10);
}
updated.notify_one(); // 这里需要notify,否则线程2可能一直阻塞
flag.store(false);
std::cout << "1" << (flag.load() ? "Y\n" : "N\n");
}
void add5(int id) {
while(flag.load())
{
std::unique_lock<std::mutex> lock(mtx);
updated.wait(lock);
std::cout << "[id:" << id << "]waiting...\n";
value += 5;
std::cout << value << std::endl;
}
}
~A() {
if(t1.joinable())
t1.join();
if(t2.joinable())
t2.join();
std::cout << "A.value" << value << std::endl;
}
private:
std::thread t1;
std::thread t2;
std::atomic<bool> flag;
std::condition_variable updated;
std::mutex mtx;
};
int main(int argc, char **argv)
{
A a;
return 0;
}
如上代码store()
改变原子变量的值,load()
获取原子变量的值。
# std::promise和std::future
std::promise
类型的对象可以和std::future
类型结合使用,实现在线程之间传递类型为T
(泛型)的值。调用future.get()
函数时,当promise
没有给值之前,future.get
调用所在的线程将一直处于阻塞状态。
void printInt(std::future<int> &fut)
{
std::cout << "printInt func is waiting value...\n";
int x = fut.get();
std::cout << "x: " << x << std::endl;
}
int main(int argc, char **argv)
{
std::promise<int> prom;
std::future<int> fut = prom.get_future();
std::thread t1(printInt, std::ref(fut));
prom.set_value(1);
t1.join();
return 0;
}
如上代码,演示了如何使用std::promise
和std::future
。
promise
可以翻译成诺言,承诺在未来会给future
对象一个值,future
对象在没有获得承诺的值之前,会一直等待。future
对象只有从promise
对象中获取才有意义。没有承诺的未来没有意义,什么都不会有。
promise
对象只能set_value
一次,否则将报错,
void printInt(std::future<int> &fut)
{
std::cout << "printInt func is waiting value...\n";
int x = fut.get();
std::cout << "x: " << x << std::endl;
}
int main(int argc, char **argv)
{
std::promise<int> prom;
std::future<int> fut = prom.get_future();
std::thread t1(printInt, std::ref(fut));
prom.set_value(1);
prom.set_value(2); // exception
t1.join();
return 0;
}
以上代码,将报错如下:
terminate called after throwing an instance of 'std::future_error'
what(): std::future_error: Promise already satisfied
printInt func is waiting value...
Aborted (core dumped)
future
对象也只能获取一次值,
void printInt(std::future<int> &fut)
{
int idx = 0;
while(idx < 2)
{
std::cout << "printInt func is waiting value...\n";
int x = fut.get();
std::cout << "x: " << x << std::endl;
idx += 1;
}
}
int main(int argc, char **argv)
{
std::promise<int> prom;
std::future<int> fut = prom.get_future();
std::thread t1(printInt, std::ref(fut));
prom.set_value(1);
t1.join();
return 0;
}
以上代码将报错如下:
printInt func is waiting value...
x: 1
printInt func is waiting value...
terminate called after throwing an instance of 'std::future_error'
what(): std::future_error: No associated state
Aborted (core dumped)
# std::ref
值得注意的是当使用‵std::bind修改的函数参数或
std::thread绑定的函数参数是引用类型时,必须使用
std::ref来创建一个模拟引用类型的对象,这是因为,在
std::bind和
std::thread`中传递的参数是按值传递,会进行复制,而普通的引用类型不支持复制。
使用std::ref
会返回一个模拟引用的类型std::reference_warper
,这个类型支持复制。
struct Box {
int x;
int y;
};
void printInt(Box &v)
{
int idx = 0;
while(idx < 2)
{
std::cout << "printInt func is waiting value...\n";
v.x = 100;
std::cout << "x: " << v.x << std::endl;
idx += 1;
}
}
int main(int argc, char **argv)
{
Box b;
b.x = 1000;
std::thread t1(printInt, std::ref(b));
t1.join();
std::cout << "b.x: " << b.x << std::endl;
return 0;
}
// printInt func is waiting value...
// x: 100
// printInt func is waiting value...
// x: 100
// b.x: 100
假如没有使用std::ref
在编译的时候就会报错:
/usr/include/c++/9/thread:120:44: error: static assertion failed: std::thread arguments must be invocable after conversion to rvalues
# reference
- 1.https://cplusplus.com/reference/functional/bind/ (opens new window)
- 2.https://zhuanlan.zhihu.com/p/91062516#:~:text=%E5%AE%9A%E4%B9%89%E4%B8%80%E4%B8%AAstd%3A%3Amutex,%E9%98%BB%E5%A1%9E%EF%BC%8C%E7%9B%B4%E5%88%B0%E5%8A%A0%E9%94%81%E6%88%90%E5%8A%9F%E3%80%82 (opens new window)
- 3.https://stackoverflow.com/questions/20516773/stdunique-lockstdmutex-or-stdlock-guardstdmutex (opens new window)
- 4.https://cplusplus.com/reference/condition_variable/condition_variable/notify_one/ (opens new window)
- 5.https://juejin.cn/post/7086226046931959838 (opens new window)
- 6.https://cplusplus.com/reference/future/promise/ (opens new window)
- 7.https://stackoverflow.com/questions/11004273/what-is-stdpromise (opens new window)