# C++多线程

# bind函数

有时候需要根据需要对函数的参数进行定制,这时就需要使用functional.h头文件中提供的bind函数,其作用是给函数设定默认值,并使用placeholder设置函数中的参数,然后返回一个函数对象。

#include <iostream>
#include <functional>

void substract(int a, int b, int c)
{
    std::cout << a - b - c << std::endl;
}

int main(int argc, char **argv)
{
    // 方式1
    auto f = std::bind(substract,
                      std::placeholders::_1,
                      2,
                      std::placeholders::_2);
    f(10, 12); // -4
    // 方式2
    auto ff = std::bind(substract,
                      std::placeholders::_2,
                      2,
                      std::placeholders::_1);
    ff(10, 12); // 0
    return 0;
}

在以上示例程序中,

  • 方式1, _1对应的是函数substract的参数a,参数b的默认值是2,_2对应的是函数substract的参数b
  • 方式1, _1对应的是函数substract的参数c,参数b的默认值是2,_2对应的是函数substract的参数c

此外,当对类的成员函数使用bind函数,进行参数定指时,第一个参数需要传入指向对象的指针,如在类成员函数中传入this,在类外传入对象指针,以使得bind后的函数还能访问类的成员。

#include <iostream>
#include <functional>

class A {
    public:
        int value = 12;
        void substract(int a, int b, int c)
        {
            std::cout << a - b - c - this->value << std::endl;
        }
};

int main(int argc, char **argv)
{
    A a;
    auto f = std::bind(&A::substract,
                      &a,
                      std::placeholders::_1,
                      2,
                      std::placeholders::_2);
    f(10, 12); // -16
    
    auto ff = std::bind(&A::substract,
                      &a,
                      std::placeholders::_2,
                      2,
                      std::placeholders::_1);
    ff(10, 12); // -12
    return 0;
}

# std::thread

thread类表示单个可执行线程,通过多个线程可以使几个函数同时运行。

线程在构造关联的线程对象后,从作为构造函数参数提供的顶级函数开始,立即执行。

顶级函数可以通过共享变量或std::promise和主线程之间交换返回值,这可能需要使用std::mutexstd::atomic来做线程同步。

使用join函数,可以在主线程结束前阻塞主线程以等待子线程先退出。

但是,使用detach后会结束主线程和子线程之间的关联关系,使得子线程不再joinble,不能使用join()阻塞主线程。

#include <iostream>
#include <functional>
#include <thread>
#include <unistd.h>
class A {
    public:
        A() {
            t1 = std::thread(std::bind(&A::increment, this));
        }
        int value = 12;
        void increment() {
            int v = 0;
            for(int i=0; i< 1000; i++) {
                v += i;
                std::cout << v << std::endl;
                usleep(1000000);
            }
        }
        ~A() {
            if(t1.joinable())
                t1.join();
        }
    private:
        std::thread t1;

};

int main(int argc, char **argv)
{
    A a;
    return 0;
}

注意,上述代码,需要在类的析构函数中调用join函数,否则,程序主线程结束时会杀死子线程,导致程序退出,报如下错误:

terminate called without an active exception

# std::mutex

mutex具有并发执行代码时的互斥(互斥)的功能,可以显式避免数据竞争。

当一个线程访问时,先给该线程所获取的资源上锁,防止其他线程修改资源,一个线程访问结束时,通过解锁释放资源。

假如没有std::mutex,

#include <iostream>
#include <functional>
#include <thread>
#include <unistd.h>
class A {
    public:
        A() {
            t1 = std::thread(std::bind(&A::increment, this));
            t2 = std::thread(std::bind(&A::increment, this));
        }
        int value = 12;
        void increment() {
            for(int i=0; i< 1000; i++) {
                value++;
                std::cout << value << std::endl;
                usleep(10);
            }
        }
        ~A() {
            if(t1.joinable())
                t1.join();
            if(t2.joinable())
                t2.join();

            std::cout << "A.value" << value << std::endl;
        }
    private:
        std::thread t1;
        std::thread t2;

};

int main(int argc, char **argv)
{
    A a;
    return 0;
}

上述代码,期望最后打印的A.value2012,但代码的实际运行中,其输出有可能是

2011
A.value2011

为什么会这样呢?自增操作value++不是原子操作,而是由多条汇编指令完成的。多个线程对同一个变量进行读写操作就会出现不可预期的操作。

#include <iostream>
#include <functional>
#include <thread>
#include <unistd.h>
#include <mutex>
class A {
    public:
        A() {
            t1 = std::thread(std::bind(&A::increment, this));
            t2 = std::thread(std::bind(&A::increment, this));
        }
        int value = 12;
        void increment() {
            for(int i=0; i< 1000; i++) {
                mtx.lock();
                value++;
                mtx.unlock();
                std::cout << value << std::endl;
                usleep(10);
            }
        }
        ~A() {
            if(t1.joinable())
                t1.join();
            if(t2.joinable())
                t2.join();

            std::cout << "A.value" << value << std::endl;
        }
    private:
        std::thread t1;
        std::thread t2;
        std::mutex mtx;

};

int main(int argc, char **argv)
{
    A a;
    return 0;
}

如上,使用mutex利用锁来保护共享变量,访问时上锁,访问结束释放锁。

上述中调用了互斥量的lock函数,上锁不成功的话线程会被阻塞,其还有另外个函数try_lock,此线程在上锁不成功时也不阻塞当前线程。

# 死锁

使用mutex存在死锁问题,考虑,线程1和线程2共用互斥量m,线程1调用m.lock()上锁后抛出了异常没有来得及执行m.unlock()释放锁,这时候线程2将一直处于等待状态,导致死锁。

#include <iostream>
#include <functional>
#include <thread>
#include <unistd.h>
#include <mutex>
class A {
    public:
        A() {
            t1 = std::thread(std::bind(&A::increment, this, 1));
            t2 = std::thread(std::bind(&A::increment, this, 2));
        }
        int value = 12;
        void increment(int id) {
            try {
                for(int i=0; i< 1000; i++) {
                    std::cout << "[id:" << id << "]waiting...\n";
                    mtx.lock();
                    value++;
                    if(value > 13)
                        throw std::runtime_error("throw excption....");
                    mtx.unlock();
                    std::cout << value << std::endl;
                    usleep(10);
                }
            } catch (const std::exception& e){
                 std::cout << "id:" << id << ", " << e.what() << std::endl;
            }
        }
        ~A() {
            if(t1.joinable())
                t1.join();
            if(t2.joinable())
                t2.join();

            std::cout << "A.value" << value << std::endl;
        }
    private:
        std::thread t1;
        std::thread t2;
        std::mutex mtx;

};

int main(int argc, char **argv)
{
    A a;
    return 0;
}

如上代码,将导致线程1一直处于等待状态:

d$ ./main 
[id:1]waiting...
13
[id:2]waiting...
id:2, throw excption....
[id:1]waiting...

# std::lock_guard VS std::unique_lock

避免死锁的一种方式是使用std::lock_guard,std::lock_guard对象构造时,自动调用mtx.lock()进行上锁,对象析构时,自动调用mtx.unlock()释放锁。

#include <main.h>
#include <memory>
#include <iostream>
#include <functional>
#include <thread>
#include <unistd.h>
#include <mutex>
class A {
    public:
        A() {
            t1 = std::thread(std::bind(&A::increment, this, 1));
            t2 = std::thread(std::bind(&A::increment, this, 2));
        }
        int value = 12;
        void increment(int id) {
            try {
                for(int i=0; i< 1000; i++) {
                    std::cout << "[id:" << id << "]waiting...\n";
                    std::lock_guard<std::mutex> lock(mtx);
                    value++;
                    if(value > 13)
                        throw std::runtime_error("throw excption....");
                    std::cout << value << std::endl;
                    usleep(10);
                }
            } catch (const std::exception& e){
                 std::cout << "id:" << id << ", " << e.what() << std::endl;
            }
        }
        ~A() {
            if(t1.joinable())
                t1.join();
            if(t2.joinable())
                t2.join();

            std::cout << "A.value" << value << std::endl;
        }
    private:
        std::thread t1;
        std::thread t2;
        std::mutex mtx;

};

int main(int argc, char **argv)
{
    A a;
    return 0;
}

将代码改成如上形式,发现就不会死锁了,抛出异常离开lock_guard作用域时会调用其析构函数,自动释放锁。

std::unique_lockstd::lock_guard功能类似,都支持在构造时自动上锁,在析构时自动解锁,其与std::lock_guard区别在于,std::unique_lock支持在构造时推迟上锁,可以选择在需要时手动lock(),且还能保证在析构时释放锁

std::mutex mtx;
lck = std::unique_lock<std::mutex>(mtx, std::defer_lock);

// 手动上锁
std::lock(lck1);

# std::condition_variable

条件变量的作用是用于多线程之间的线程同步。

线程同步是指线程间需要按照预定的先后顺序进行的行为,比如我想要线程1完成了某个步骤之后,才允许线程2开始工作,这个时候就可以使用条件变量来达到目的。

std::condition_variable 对象的某个 wait 函数被调用的时候,它使用 std::unique_lock(通过 std::mutex 来锁住当前线程。当前线程会一直被阻塞,直到另外一个线程在相同的 std::condition_variable 对象上调用了 notify 函数来唤醒当前线程。

#include <iostream>
#include <functional>
#include <thread>
#include <unistd.h>
#include <mutex>
#include <condition_variable>

class A {
    public:
        A() {
            t1 = std::thread(std::bind(&A::increment, this, 1));
            t2 = std::thread(std::bind(&A::add5, this, 2));
        }
        int value = 12;
        void increment(int id) {
            for(int i=0; i< 20; i++) {
                std::cout << "[id:" << id << "]waiting...\n";
                std::lock_guard<std::mutex> lock(mtx);
                value++;
                std::cout << value << std::endl;
                if(i % 10 == 0)
                    updated.notify_one();
                usleep(10);
            }
            flag = false;
        }
        void add5(int id) {
            while(true)
            {
                std::unique_lock<std::mutex> lock(mtx);
                updated.wait(lock);
                std::cout << "[id:" << id << "]waiting...\n";
                value += 5;
                std::cout << value << std::endl;
            }
        }

        ~A() {
            if(t1.joinable())
                t1.join();
            if(t2.joinable())
                t2.join();

            std::cout << "A.value" << value << std::endl;
        }
    private:
        std::thread t1;
        std::thread t2;
        bool flag = true;
        std::condition_variable updated;
        std::mutex mtx;

};

int main(int argc, char **argv)
{
    A a;
    return 0;
}

以上代码,线程2在while循环中调用了updated.wait(lock)函数,会一直处于阻塞状态,直到收到notify的信号,

[id:1]waiting...
30
[id:1]waiting...
31
[id:1]waiting...
32
[id:1]waiting...
33
[id:1]waiting...
34
[id:1]waiting...
35
[id:1]waiting...
36
[id:1]waiting...
37
[id:2]waiting...
42
  • std::condition_variable::notify_all解除当前等待此条件的所有线程的阻塞。
  • std::condition_variable::notify_one解除当前等待此条件的线程之一的阻塞。如果没有线程在等待,则该函数不执行任何操作。如果有多个线程等待此条件,notify_one无法指定选择哪个线程。

# std::atomic

对于线程间共享的变量,进行读取和写入操作时,常用的同步方式就是加锁,但是每一次循环都要加锁解锁会导致程序开销很大。

为了提高性能,C++11提供了原子类型(std::atomic<T>),它提供了多线程间的原子操作。

可以把原子操作理解成一种不需要用互斥量加锁(无锁)来实现多线程并发编程的方式。

原子类型是封装了一个值的类型,它的访问保证不会导致数据的竞争,并且可以用于在不同的线程之间同步内存访问。

从效率上来说,原子操作要比互斥量的方式效率要高。互斥量的加锁一般是针对一个代码段,而原子操作针对的一般都是一个变量。

原子操作,一般都是指“不可分割的操作”;是一系列不可被 CPU 上下文交换的机器指令,这些指令组合在一起就形成了原子操作。

原子类型是无锁类型,但是无锁不代表无需等待,因为原子类型内部使用了 CAS(Compare and swap) 循环,当大量的冲突发生时,仍然需要等待,总体比使用锁性能要好。

std::atomic类模板,允许用户使用自定义类型创建一个原子变量(除了标准原子类型之外),需要满足一定的标准才可以使用std::atomic<>,为了使用std::atomic<UDT>(UDT是用户定义类型),这个类型必须有拷贝赋值运算符。这就意味着这个类型不能有任何虚函数或虚基类,以及必须使用编译器创建的拷贝赋值操作。

#include <main.h>
#include <memory>
#include <iostream>
#include <functional>
#include <thread>
#include <unistd.h>
#include <mutex>
#include <condition_variable>
#include <atomic>

class A {
    public:
        A() {
            flag.store(true);
            t1 = std::thread(std::bind(&A::increment, this, 1));
            t2 = std::thread(std::bind(&A::add5, this, 2));
        }
        int value = 12;
        void increment(int id) {
            for(int i=0; i< 20; i++) {
                std::cout << "[id:" << id << "]waiting...\n";
                std::unique_lock<std::mutex> lock(mtx);
                value++;
                std::cout << value << std::endl;
                std::cout << "i=" << i << std::endl;
                if(i % 10 == 0)
                    updated.notify_one();
                usleep(10);
            }
            updated.notify_one(); // 这里需要notify,否则线程2可能一直阻塞
            flag.store(false); 
            std::cout << "1" << (flag.load() ? "Y\n" : "N\n"); 
        }
        void add5(int id) {
            while(flag.load())
            {
                std::unique_lock<std::mutex> lock(mtx);
                updated.wait(lock);
                std::cout << "[id:" << id << "]waiting...\n";
                value += 5;
                std::cout << value << std::endl;
            }
        }

        ~A() {
            if(t1.joinable())
                t1.join();
            if(t2.joinable())
                t2.join();

            std::cout << "A.value" << value << std::endl;
        }
    private:
        std::thread t1;
        std::thread t2;
        std::atomic<bool> flag;
        std::condition_variable updated;
        std::mutex mtx;

};

int main(int argc, char **argv)
{
    A a;
    return 0;
}

如上代码store()改变原子变量的值,load()获取原子变量的值。

# std::promise和std::future

std::promise类型的对象可以和std::future类型结合使用,实现在线程之间传递类型为T(泛型)的值。调用future.get()函数时,当promise没有给值之前,future.get调用所在的线程将一直处于阻塞状态。

void printInt(std::future<int> &fut)
{
    std::cout << "printInt func is waiting value...\n";
    int x = fut.get();
    std::cout << "x: " << x << std::endl;
}

int main(int argc, char **argv)
{
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();
    std::thread t1(printInt, std::ref(fut));

    prom.set_value(1);
    t1.join();
    return 0;
}

如上代码,演示了如何使用std::promisestd::future

promise可以翻译成诺言,承诺在未来会给future对象一个值,future对象在没有获得承诺的值之前,会一直等待。future对象只有从promise对象中获取才有意义。没有承诺的未来没有意义,什么都不会有。

promise对象只能set_value一次,否则将报错,

void printInt(std::future<int> &fut)
{
    std::cout << "printInt func is waiting value...\n";
    int x = fut.get();
    std::cout << "x: " << x << std::endl;
}

int main(int argc, char **argv)
{
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();
    std::thread t1(printInt, std::ref(fut));

    prom.set_value(1);
    prom.set_value(2); // exception
    t1.join();
    return 0;
}

以上代码,将报错如下:

terminate called after throwing an instance of 'std::future_error'
  what():  std::future_error: Promise already satisfied
printInt func is waiting value...
Aborted (core dumped)

future对象也只能获取一次值

void printInt(std::future<int> &fut)
{
    int idx = 0;
    while(idx < 2)
    {
        std::cout << "printInt func is waiting value...\n";
        int x = fut.get();
        std::cout << "x: " << x << std::endl;
        idx += 1;
    }
}

int main(int argc, char **argv)
{
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();
    std::thread t1(printInt, std::ref(fut));
    prom.set_value(1);
    t1.join();
    return 0;
}

以上代码将报错如下:

printInt func is waiting value...
x: 1
printInt func is waiting value...
terminate called after throwing an instance of 'std::future_error'
  what():  std::future_error: No associated state
Aborted (core dumped)

# std::ref

值得注意的是当使用‵std::bind修改的函数参数或std::thread绑定的函数参数是引用类型时,必须使用std::ref来创建一个模拟引用类型的对象,这是因为,在std::bindstd::thread`中传递的参数是按值传递,会进行复制,而普通的引用类型不支持复制。

使用std::ref会返回一个模拟引用的类型std::reference_warper,这个类型支持复制。

struct Box {
    int x;
    int y;
};

void printInt(Box &v)
{
    int idx = 0;
    while(idx < 2)
    {
        std::cout << "printInt func is waiting value...\n";
        v.x = 100;
        std::cout << "x: " << v.x << std::endl;
        idx += 1;
    }
}

int main(int argc, char **argv)
{
    Box b;
    b.x = 1000;
    std::thread t1(printInt, std::ref(b));
    t1.join();

    std::cout << "b.x: " << b.x << std::endl;
    return 0;
}
// printInt func is waiting value...
// x: 100
// printInt func is waiting value...
// x: 100
// b.x: 100

假如没有使用std::ref在编译的时候就会报错:

/usr/include/c++/9/thread:120:44: error: static assertion failed: std::thread arguments must be invocable after conversion to rvalues

# reference