cpp11 库并发

C++11 标准库扩展 — 并发

线程

线程是程序中执行/计算的表示。在 C++11 中,与许多现代计算一样,一个线程可以——而且通常会——与其他线程共享地址空间。在这方面,它与进程不同,进程通常不直接与其他进程共享数据。C++ 过去曾为各种硬件和操作系统提供了许多线程实现,新的是一个标准库线程库。

关于并发、并行和线程,已经有许多厚厚的书籍和数万篇论文,本 FAQ 条目只是触及表面。清晰地思考并发是困难的。如果你想进行并发编程,至少要读一本书。不要仅仅依赖于手册、标准或 FAQ。

通过使用函数或函数对象(包括lambda)构造 std::thread 来启动线程。

    #include <thread>

    void f();

    struct F {
        void operator()();
    };

    int main()
    {
        std::thread t1{f};     // f() executes in separate thread
        std::thread t2{F()};   // F()() executes in separate thread
    }

不幸的是,这不太可能给出任何有用的结果——无论 f()F() 可能做什么。症结在于程序可能在 t1 执行 f() 之前或之后,以及在 t2 执行 F() 之前或之后终止。我们需要等待两个任务完成。

    int main()
    {
        std::thread t1{f};  // f() executes in separate thread
        std::thread t2{F()};    // F()() executes in separate thread

        t1.join();  // wait for t1
        t2.join();  // wait for t2
    }

join() 确保我们不会在线程完成之前终止。“join”意味着等待线程终止。

通常,我们希望将一些参数传递给要执行的任务(这里我们将由线程执行的东西称为“任务”)。例如:

    void f(vector<double>&);

    struct F {
        vector<double>& v;
        F(vector<double>& vv) :v{vv} { }
        void operator()();
    };

    int main()
    {
        std::thread t1{std::bind(f,some_vec)};  // f(some_vec) executes in separate thread
        std::thread t2{F(some_vec)};        // F(some_vec)() executes in separate thread

        t1.join();
        t2.join();
    }

基本上,标准库bind从其参数中创建一个函数对象。

通常,我们还希望从已执行的任务中获取结果。对于普通任务,没有返回值的概念;std::future是为此的正确默认选择。或者,我们可以将一个参数传递给任务,告诉它将结果放在哪里:例如

    void f(vector<double>&, double* res);   // place result in res

    struct F {
        vector<double>& v;
        double* res;
        F(vector<double>& vv, double* p) :v{vv}, res{p} { }
        void operator()();  // place result in res
    };

    int main()
    {
        double res1;
        double res2;

        std::thread t1{std::bind(f,some_vec,&res1)};    // f(some_vec,&res1) executes in separate thread
        std::thread t2{F(some_vec,&res2)};      // F(some_vec,&res2)() executes in separate thread

        t1.join();
        t2.join();

        std::cout << res1 << ' ' << res2 << '\n';
    }

但是错误呢?如果任务抛出异常怎么办?如果任务抛出异常并且没有自行捕获,则会调用 std::terminate()。这通常意味着程序终止。我们通常会努力避免这种情况。std::future可以将异常传输到父/调用线程;这就是喜欢 future 的一个原因。否则,返回某种错误代码。

当一个线程超出作用域时,如果它的任务没有完成,程序就会被 terminate()。这显然应该避免。

没有办法请求线程终止(即请求它尽快并尽可能优雅地退出)或强制线程终止(即杀死它)。我们只剩下以下选项:

  • 设计我们自己的协作“中断机制”(通过共享数据块,调用线程可以设置它,而被调用线程可以检查它,并在设置时快速优雅地退出),
  • 通过使用 thread::native_handle() 访问操作系统的线程概念来“使用原生方式”,
  • 杀死进程 (std::quick_exit()),
  • 杀死程序 (std::terminate())。

这是委员会能达成一致的所有内容。特别是,POSIX 代表强烈反对任何形式的“线程取消”,无论 C++ 的资源模型多么依赖析构函数。对于每个系统和每个可能的应用程序,都没有完美的解决方案。

线程的基本问题是数据竞争;也就是说,在单个地址空间中运行的两个线程可以独立地以导致未定义结果的方式访问对象。如果一个(或两个)写入对象,另一个(或两个)读取对象,它们就会发生“竞争”,看谁先完成其操作。结果不仅未定义;它们通常完全不可预测。因此,C++11 为程序员提供了一些规则/保证以避免数据竞争:

  • C++ 标准库函数不得直接或间接访问当前线程之外可由其他线程访问的对象,除非这些对象直接或间接通过函数参数(包括 this)访问。
  • C++ 标准库函数不得直接或间接修改当前线程之外可由其他线程访问的对象,除非这些对象直接或间接通过函数的非 const 参数(包括 this)访问。
  • C++ 标准库实现被要求在同一序列中的不同元素被并发修改时避免数据竞争。

除非另有说明,否则多个线程对流对象、流缓冲区对象或 C 库流的并发访问可能会导致数据竞争。因此,除非您以某种方式控制对其的访问,否则不要在两个线程之间共享输出流。

你可以

  • 指定时间等待线程
  • 通过互斥控制对某些数据的访问
  • 使用控制对某些数据的访问
  • 使用条件变量等待另一个任务的操作
  • 通过future从线程返回一个值

另请参见

互斥

互斥体是一种用于在多线程系统中控制访问的原始对象。最基本的用法是:

    std::mutex m;
    int sh; // shared data
    // ...
    m.lock();
    // manipulate shared data:
    sh+=1;
    m.unlock();

lock()unlock() 之间的代码区域(通常称为临界区)中,一次只能有一个线程。如果第一个线程正在该区域执行时,第二个线程尝试 m.lock(),那么该第二个线程将被阻塞,直到第一个线程执行 m.unlock()。这很简单。不简单的是以一种不会导致严重问题的方式使用互斥体:如果一个线程“忘记”了 unlock() 怎么办?如果一个线程尝试 lock() 同一个互斥体两次怎么办?如果一个线程等待很长时间才执行 unlock() 怎么办?如果一个线程需要 lock() 两个互斥体才能完成其工作怎么办?完整的答案填满了书。这里(以及在锁部分)只是基本知识。

除了 lock() 之外,mutex 还有一个 try_lock() 操作,可用于尝试进入临界区而没有被阻塞的风险。

    std::mutex m;
    int sh; // shared data
    // ...
    if (m.try_lock()) {
        // manipulate shared data:
        sh+=1;
        m.unlock();
    else {
        // maybe do something else
    }

recursive_mutex 是一个可以被同一个线程多次获取的互斥体。

    std::recursive_mutex m;
    int sh; // shared data
    // ...
    void f(int i)
    {
        // ...
        m.lock();
        // manipulate shared data:
        sh+=1;
        if (--i>0) f(i);
        m.unlock();
        // ...
    }

这里,我们明确地让 f() 调用它自己。通常,代码会更精妙。递归调用将通过 f() 调用 g()g() 调用 h()h() 再调用 f() 的方式间接进行。

如果您需要在接下来的十秒内获取一个互斥体怎么办?为此提供了 timed_mutex 类。它的操作是 try_lock() 的专用版本,带有关联的时间限制:

    std::timed_mutex m;
    int sh; // shared data
    // ...
    if (m.try_lock_for(std::chrono::seconds(10))) {
        // manipulate shared data:
        sh+=1;
        m.unlock();
    }
    else {
        // we didn't get the mutex; do something else
    }

try_lock_for() 接受一个相对时间,即一个duration作为其参数。如果您想等到一个固定的时间点,即一个time_point,您可以使用 try_lock_until()

    std::timed_mutex m;
    int sh; // shared data
    // ...
    if (m.try_lock_until(midnight)) {
        // manipulate shared data:
        sh+=1;
        m.unlock();
    }
    else {
        // we didn't get the mutex; do something else
    }

“午夜”是个微不足道的笑话:对于像互斥体这样低层次的机制,时间尺度更可能是毫秒而不是小时。

当然也有 recursive_timed_mutex

互斥量被视为一种资源(因为它通常用于表示真实资源),并且为了有用,它必须对至少两个线程可见。因此,它不能被复制或移动(你不能简单地复制一个硬件输入寄存器)。

要使 lock()unlock() 匹配可能出奇地困难。想想复杂的控制结构、错误和异常。如果您有选择,请使用来管理您的互斥量;这将为您和您的用户省去很多麻烦。

另请参见

锁是一个可以持有互斥量引用并在锁销毁期间(例如离开块作用域时)unlock() 互斥量的对象。线程可以使用锁以异常安全的方式管理互斥量所有权。换句话说,锁实现了互斥的资源获取即初始化。例如:

    std::mutex m;
    int sh; // shared data
    // ...
    void f()
    {
        // ...
        std::unique_lock lck(m);
        // manipulate shared data: lock will be released even if this code throws an exception
        sh+=1;
    }

锁可以移动(锁的目的是表示对非本地资源的局部所有权),但不能复制(哪一个复制会拥有资源/互斥量?)。

unique_lock 具备实现互斥体几乎所有功能的特性,但更安全,这使得锁的简单概念变得复杂。例如,我们可以使用 unique_lock 进行 try_lock

    std::mutex m;
    int sh; // shared data
    // ...
    void f()
    {
        // ...
        std::unique_lock lck(m,std::defer_lock);    // make a lock, but don't acquire the mutex
        // ...
        if (lck.try_lock()) {
            // manipulate shared data:
            sh+=1;
        }
        else {
            // maybe do something else
        }
    }

类似地,unique_lock 支持 try_lock_for()try_lock_until()。使用锁而不是直接使用互斥量的好处是异常处理和防止忘记 unlock()。在并发编程中,我们需要所有可能的帮助。

如果我们需要两个由两个互斥体表示的资源怎么办?朴素的方法是按顺序获取互斥体:

    std::mutex m1;
    std::mutex m2;
    int sh1;    // shared data
    int sh2
    // ...
    void f()
    {
        // ...
        std::unique_lock lck1(m1);
        std::unique_lock lck2(m2);
        // manipulate shared data:
        sh1+=sh2;
    }

这有一个潜在的致命缺陷,即其他线程可能尝试以相反的顺序获取 m1m2,导致它们各自持有一个锁,然后永远等待第二个锁(这是死锁的一种经典形式)。在系统中存在许多锁的情况下,这是一个真正的危险。因此,标准锁提供了两个函数来(安全地)尝试获取两个或更多锁:

    void f()
    {
        // ...
        std::unique_lock lck1(m1,std::defer_lock);  // make locks but don't yet try to acquire the mutexes
        std::unique_lock lck2(m2,std::defer_lock);
        std::unique_lock lck3(m3,std::defer_lock);
        lock(lck1,lck2,lck3);
        // manipulate shared data
    }

显然,lock() 的实现必须精心设计以避免死锁。本质上,它将执行相当于小心使用 try_lock()s 的操作。如果 lock() 未能获取所有锁,它将抛出异常。实际上,lock() 可以接受任何带有 lock()try_lock()unlock() 成员函数的参数(例如 mutex),所以我们无法具体说明 lock() 可能抛出什么异常;这取决于它的参数。

如果你更喜欢自己使用 try_lock(),有一个与 lock() 等效的帮助函数。

    void f()
    {
        // ...
        std::unique_lock lck1(m1,std::defer_lock);  // make locks but don't yet try to acquire the mutexes
        std::unique_lock lck2(m2,std::defer_lock);
        std::unique_lock lck3(m3,std::defer_lock);
        int x;
        if ((x = try_lock(lck1,lck2,lck3))==-1) {   // welcome to C land
            // manipulate shared data
        }
        else {
            // x holds the index of a mutex we could not acquire
            // e.g. if lck2.try_lock() failed x==1
        }
    }

另请参见

  • 标准:30.4.3 锁 [thread.lock]

条件变量

条件变量提供了同步原语,用于阻塞线程,直到被另一个线程通知某个条件满足或达到系统时间。

另请参见

  • 标准:30.5 条件变量 [thread.condition]

原子操作

待撰写。

与此同时,请参阅

未来和承诺

并发编程可能很困难,特别是如果你试图巧妙地使用线程。如果你必须使用条件变量原子操作(用于无锁编程),那会更困难。C++11 提供了 futurepromise 用于从单独线程生成的任务中返回值,以及 packaged_task 用于帮助启动任务。关于 futurepromise 的重要一点是,它们可以在两个任务之间传输值,而无需显式使用锁;“系统”有效地实现了传输。基本思想很简单:当一个任务想向启动它的线程返回值时,它将值放入 promise。不知何故,实现使该值出现在附加到 promisefuture 中。调用者(通常是任务的启动者)随后可以读取该值。为了进一步简化,请参阅async()

标准提供了三种 future:最简单的 future,以及用于一些复杂情况的 shared_futureatomic_future。这里,我们只介绍 future,因为它最简单,并且能完成我们需要的所有事情。如果我们有一个名为 ffuture,我们可以从它那里 get() 一个类型为 X 的值。

    X v = f.get();  // if necessary wait for the value to get computed

如果值尚未到达,我们的线程将被阻塞直到它到达。如果无法计算该值并且任务抛出了异常,则调用 get() 将重新抛出该异常到调用 get() 的代码。

我们可能不想等待结果,所以我们可以询问 future 是否有结果到达。

    if (f.wait_for(0)) {    // there is a value to get()
        // do something
    }
    else {
        // do something else
    }

然而,future 的主要目的是提供那个简单的 get()

promise 的主要目的是提供一个简单的 set() 来匹配 future 的 get()futurepromise 的名称是历史性的;它们也是双关语的肥沃来源。

如果你有一个 promise,并且需要向 future 发送一个类型为 X 的结果(返回),基本上你可以做两件事:传递一个值和传递一个异常。

    try {
        X res;
        // compute a value for res
        p.set_value(res);
    }
    catch (...) {   // oops: couldn't compute res
        p.set_exception(std::current_exception());
    }

到目前为止一切顺利,但是如何获取一对匹配的 future/promise,一个在我的线程中,另一个在其他线程中呢?嗯,由于 future 和 promise 可以移动(不能复制),所以有多种可能性。最明显的想法是,无论谁想完成一个任务,都可以创建一个线程,并将 promise 交给它,同时保留相应的 future 作为结果的存放地点。使用async()是后一种技术的最极致/优雅的变体。

提供了 packaged_task 类型以简化启动线程执行任务。特别是,它负责设置连接到 promise 的 future,并提供包装代码以将任务的返回值或异常放入 promise。例如:

    double comp(vector<double>& v)
    {
        // package the tasks:
        // (the task here is the standard accumulate() for an array of doubles):
        packaged_task<double(double*,double*,double)> pt0{std::accumulate<double*,double*,double>};
        packaged_task<double(double*,double*,double)> pt1{std::accumulate<double*,double*,double>};

        auto f0 = pt0.get_future(); // get hold of the futures
        auto f1 = pt1.get_future();

        pt0(&v[0],&v[v.size()/2],0);    // start the threads
        pt1(&v[v.size()/2],&v[size()],0);

        return f0.get()+f1.get();   // get the results
    }

另请参见

异步

下面是一个程序员超越混乱的线程加锁并发编程级别的方法的例子:

    template<class T, class V> struct Accum  {  // simple accumulator function object
        T* b;
        T* e;
        V val;
        Accum(T* bb, T* ee, const V& v) : b{bb}, e{ee}, val{vv} {}
        V operator() () { return std::accumulate(b,e,val); }
    };

    double comp(vector<double>& v)
        // spawn many tasks if v is large enough
    {
        if (v.size()<10000) return std::accumulate(v.begin(),v.end(),0.0);

        auto f0 {async(Accum{&v[0],&v[v.size()/4],0.0})};
        auto f1 {async(Accum{&v[v.size()/4],&v[v.size()/2],0.0})};
        auto f2 {async(Accum{&v[v.size()/2],&v[v.size()*3/4],0.0})};
        auto f3 {async(Accum{&v[v.size()*3/4],&v[v.size()],0.0})};

        return f0.get()+f1.get()+f2.get()+f3.get();
    }

这是对并发的一种非常简单的使用(注意“魔术数字”),但请注意没有显式线程、锁、缓冲区等。f 变量的类型由标准库函数 async() 的返回类型决定,它是一个future。如有必要,对 future 调用 get() 会等待线程完成。在这里,async() 的职责是根据需要生成线程,而 future 的职责是适当地 join() 线程。“简单”是 async()/future 设计最重要的方面;future 也可以与线程一起使用,但不要考虑使用 async() 启动执行 I/O、操作互斥体或以其他方式与其他任务交互的任务。async() 背后的思想与range-for语句背后的思想相同:提供一种处理最简单、相当常见的用例的简单方法,并将更复杂的例子留给完全通用的机制。

可以请求 async() 在新线程中启动,在除调用者线程之外的任何线程中启动,或者仅当 async() “认为”这是一个好主意时才在不同线程中启动。后者从用户的角度来看最简单,并且可能最有效(仅适用于简单任务)。

委员会正在积极致力于提供一种更通用的执行器(执行代理)形式,它将 subsume std::async。同时,std::async 是标准中包含的简单设施。

另请参见

放弃进程

待撰写。

与此同时,请参阅