一、前言

互斥量是多线程间同时访问某一共享变量时,保证变量可被安全访问的手段。但单靠互斥量无法实现线 程的同步。线程同步是指线程间需要按照预定的先后次序顺序进行的行为。C++11对这种行为也提供了 有力的支持,这就是条件变量。条件变量位于头文件condition_variable下。【官方说明文档】

条件变量使用过程:

  1. 拥有条件变量的线程获取互斥量。
  2. 循环检查某个条件,如果条件不满足则阻塞直到条件满足;如果条件满足则向下执行。
  3. 某个线程满足条件执行完之后调用notify_one或notify_all唤醒一个或者所有等待线程。

二、成员函数

条件变量提供了两类操作:wait和notify。这两类操作构成了多线程同步的基础。

2.1、wait函数

函数原型:

void wait (unique_lock<mutex>& lck);
template <class Predicate>
void wait (unique_lock<mutex>& lck, Predicate pred);

包含两种重载,第一种只包含unique_lock对象,另外一个Predicate 对象(等待条件),这里必须使用 unique_lock,因为wait函数的工作原理:

  • 当前线程调用wait()后将被阻塞并且函数会解锁互斥量,直到另外某个线程调用notify_one或者 notify_all唤醒当前线程;一旦当前线程获得通知(notify),wait()函数也是自动调用lock(),同理不 能使用lock_guard对象。
  • 如果wait没有第二个参数,第一次调用默认条件不成立,直接解锁互斥量并阻塞到本行,直到某一 个线程调用notify_one或notify_all为止,被唤醒后,wait重新尝试获取互斥量,如果得不到,线程 会卡在这里,直到获取到互斥量,然后无条件地继续进行后面的操作。
  • 如果wait包含第二个参数,如果第二个参数不满足,那么wait将解锁互斥量并堵塞到本行,直到某 一个线程调用notify_one或notify_all为止,被唤醒后,wait重新尝试获取互斥量,如果得不到,线 程会卡在这里,直到获取到互斥量,然后继续判断第二个参数,如果表达式为false,wait对互斥 量解锁,然后休眠,如果为true,则进行后面的操作。

2.2、wait_for函数

函数原型:

template <class Clock, class Duration>
    cv_status wait_until (unique_lock<mutex>& lck,const chrono::time_point<Clock,Duration>& abs_time);
template <class Clock, class Duration, class Predicate>
    bool wait_until (unique_lock<mutex>& lck,const chrono::time_point<Clock,Duration>& abs_time,Predicate pred);

和wait不同的是,wait_for可以执行一个时间段,在线程收到唤醒通知或者时间超时之前,该线程都会 处于阻塞状态,如果收到唤醒通知或者时间超时,wait_for返回,剩下操作和wait类似。

2.3、wait_until函数

函数原型:

template <class Clock, class Duration>
    cv_status wait_until (unique_lock<mutex>& lck,const chrono::time_point<Clock,Duration>& abs_time);
template <class Clock, class Duration, class Predicate>
    bool wait_until (unique_lock<mutex>& lck,const chrono::time_point<Clock,Duration>& abs_time,Predicate pred);

与wait_for类似,只是wait_until可以指定一个时间点,在当前线程收到通知或者指定的时间点超时之 前,该线程都会处于阻塞状态。如果超时或者收到唤醒通知,wait_until返回,剩下操作和wait类似 。

2.4、notify_one函数

函数原型:

void notify_one() noexcept;

解锁正在等待当前条件的线程中的一个,如果没有线程在等待,则函数不执行任何操作,如果正在等待的线程多于一个,则唤醒的线程是不确定的。随机唤醒。

2.5、notify_all函数

函数原型:

void notify_all() noexcept;

解锁正在等待当前条件的所有线程,如果没有正在等待的线程,则函数不执行任何操作。

三、使用示例

使用条件变量实现一个同步队列,同步队列作为一个线程安全的数据共享区,经常用于线程之间数据读取。

(sync_queue.h)

#ifndef SYNC_QUEUE_H
#define SYNC_QUEUE_H

#include <list>
#include <mutex>
#include <iostream>
#include <thread>
#include <condition_variable>

template<typename T>

class SyncQueue
{
private:

        std::list<T> _queue;                    //缓冲区
        std::mutex _mutex;                      //互斥量和条件变量结合起来使用
        std::condition_variable_any _notEmpty;  //不为空的条件变量
        std::condition_variable_any _notFull;   //没有满的条件变量
        int _maxSize;                           //同步队列最大的size

        bool IsFull() const
        {
                return _queue.size() ==_maxSize;
        }
        bool IsEmpty() const
        {
                return _queue.empty();
        }
public:
        SyncQueue(int maxSize):_maxSize(maxSize)
        {
        }
        void Put(const T& x)
        {
                std::lock_guard<std::mutex> locker(_mutex);
                while(IsFull())
                {
                        std::cout << "full wait..." << std::endl;
                        _notFull.wait(_mutex);
                }
                _queue.push_back(x);
                _notEmpty.notify_one();
        }
        void Take(T& x)
        {
                std::lock_guard<std::mutex> locker(_mutex);
                while (IsEmpty())
                {
                        std::cout << "empty wait.." << std::endl;
                        _notFull.notify_one();
                        _notEmpty.wait(_mutex);
                }
                x=_queue.front();
                _queue.pop_front();
                _notFull.notify_one();
        }
        bool Empty()
        {
                std::lock_guard<std::mutex> locker(_mutex);
                return _queue.empty();
        }
        bool Full()
        {
                std::lock_guard<std::mutex> locker(_mutex);
                return _queue.size() == _maxSize;
        }
        size_t Size()
        {
                std::lock_guard<std::mutex> locker(_mutex);
                return _queue.size();
        }
        int Count()
        {
                return _queue.size();
        }
};

#endif

(http://main.cc)

#include <iostream>
#include "sync_queue.h"
using namespace std;
SyncQueue<int> syncQueue(5);

void PutDatas()
{
        for (int i = 0; i < 20; ++i)
        {
                syncQueue.Put(888);
        }
        std::cout << "PutDatas finish\n";
}
void TakeDatas()
{
        int x = 0;
        for (int i = 0; i < 20; ++i)
        {
                syncQueue.Take(x);
                std::cout << x << std::endl;
        }
        std::cout << "TakeDatas finish\n";
}
int main(void)
{
        std::thread t1(PutDatas);
        std::thread t2(TakeDatas);
        t1.join();
        t2.join();
        std::cout << "main finish\n";
        return 0;
}

代码中用到了std::lock_guard,它利用RAII机制可以保证安全释放mutex。

std::lock_guard<std::mutex> locker(_mutex);
while(IsFull())
{
      std::cout << "full wait..." << std::endl;
      _notFull.wait(_mutex);
}

可以改为:

std::lock_guard<std::mutex> locker(_mutex);
_notFull.wait(_mutex, [this] {return !IsFull();});

两种写法效果是一样的,但是后者更简洁,条件变量会先检查判断式是否满足条件,如果满足条件则重 新获取mutex,然后结束wait继续往下执行;如果不满足条件则释放mutex,然后将线程置为waiting状 态继续等待。

这里需要注意的是,wait函数中会释放mutex,而lock_guard这时还拥有mutex,它只会在出了作用域 之后才会释放mutex,所以这时它并不会释放,但执行wait时会提前释放mutex。

从语义上看这里使用lock_guard会产生矛盾,但是实际上并不会出问题,因为wait提前释放锁之后会处 于等待状态,在被notify_one或者notify_all唤醒后会先获取mutex,这相当于lock_guard的mutex在 释放之后又获取到了,因此,在出了作用域之后lock_guard自动释放mutex不会有问题。

这里应该用unique_lock,因为unique_lock不像lock_guard一样只能在析构时才释放锁,它可以随时释 放锁,因此在wait时让unique_lock释放锁从语义上更加准确。

使用unique_lock和condition_variable改写为用等待一个判 断式的方法来实现一个简单的队列:

(vim sync_queue2.h)

#ifndef SIMPLE_SYNC_QUEUE_H
#define SIMPLE_SYNC_QUEUE_H
#include <thread>
#include <condition_variable>
#include <mutex>
#include <list>
#include <iostream>
template<typename T>
class SimpleSyncQueue
{
        public:
                SimpleSyncQueue(){}
                void Put(const T& x)
                {
                        std::lock_guard<std::mutex> locker(_mutex);
                        _queue.push_back(x);
                        _notEmpty.notify_one();
                }
                void Take(T& x)
                {
                        std::unique_lock<std::mutex> locker(_mutex);
                        _notEmpty.wait(locker, [this]{return !_queue.empty(); });
                        x = _queue.front();
                        _queue.pop_front();
                }
                bool Empty()
                {
                        std::lock_guard<std::mutex> locker(_mutex);
                        return _queue.empty();
                }
                size_t Size()
                {
                        std::lock_guard<std::mutex> locker(_mutex);
                        return _queue.size();
                }
        private:
                std::list<T> _queue;
                std::mutex _mutex;
                std::condition_variable _notEmpty;
};
#endif // SIMPLE_SYNC_QUEUE_H

(main.cpp)

#include <iostream>
#include "sync_queue2.h"
using namespace std;
SimpleSyncQueue<int> syncQueue;

void PutDatas()
{
        for (int i = 0; i < 20; ++i)
        {
                syncQueue.Put(888);
        }
        std::cout << "PutDatas finish\n";
}
void TakeDatas()
{
        int x = 0;
        for (int i = 0; i < 20; ++i)
        {
                syncQueue.Take(x);
                std::cout << x << std::endl;
        }
        std::cout << "TakeDatas finish\n";
}
int main(void)
{
        std::thread t1(PutDatas);
        std::thread t2(TakeDatas);
        t1.join();
        t2.join();
        std::cout << "main finish\n";
        return 0;
}

总结

条件变量是一个对象,能够在通知恢复之前阻止调用线程。它使用在调用其等待函数之一时锁定线程。线程将保持阻塞状态,直到被另一个调用同一对象上的通知函数的线程唤醒。

后言

本专栏知识点是通过<零声教育>的系统学习,进行梳理总结写下文章,对c/c++linux系统提升感兴趣的读者,可以点击链接查看详细的服务:C/C++服务器开发

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐