前言: 本节内容讲述linux下的线程的信号量, 我们在之前进程间通信那里学习过一部分信号量, 但是那个是systemV版本的信号量,是以进程间通信的视角谈的。 但是本篇内容会以线程的视角谈一谈信号量。 

        ps:本篇内容建议学习了生产者消费者模型的友友们进行观看哦。

目录

信号量

信号量接口

初始化信号量

环形队列的生产消费模型

创建文件

RingQueue.h

main.cpp

多生产和多消费的循环队列

RingQueue

main.cpp


 

信号量

         我们之前在快速写一个生产者消费者模型的时候, 我们的交易场所设置的一个队列。 当时我们的queue当作整体使用的, 但是queue只有一份, 所以我们加锁保证了这一份资源的安全性 但是, 我们也要认识到, 共享资源可以被看到多份。 就比如我们今天有一个全局的数组, 有三个线程。 假如数组一共有300个元素,这个数组分为左中右三份。 中间的线程只能访问数组的中间的一份。 左边线程访问左边一份, 右边线程访问右边一份。 因为我们线程各自访问不同的区域, 所以多线程可以同时访问这个数组吗?答案是可以的。

        但是如果是四个线程呢? 我们只有三份资源, 但是要给四个线程使用。 所以呢, 我们就只能放进来三个线程来访问这三个资源。 那么我们的共享资源为了保证并发度, 那么就保持了分成了几份资源, 那么就允许多少个线程进来并发执行。 所以, 为了保护我们的临界资源, 就引入了信号量。 而理解信号量的切入点就是:共享资源也可以被看成多份。

        信号量, 也叫做信号灯, 这把计数器用来描述临界资源中资源数目是多少。 就比如电影院, 里面有很多很多的座位。 假如有100个座位。 我们想要抢占其中的一个座位, 就要提前买一下票!所以, 买票的本质就是对电影院座位资源的预定机制。 那么我们可不可以使用一个变量替换掉这个计数器呢? 答案是不可以, 因为普通变量的++或者--其实不是原子的 多线程并发访问时就会出现问题!!!所以, 我们就要使用一个支持pv操作的原子的计数器——信号量.

        问题是, 信号量的本质就是一把计数器, 而计数器的本质是什么呢? 本质就是临界资源的数量。 所以, 一旦我们用p操作, 我们p操作之后,还用判断资源是就绪的吗? 答案是不需要, 因为只要申请成功了, 就一定有你的。 申请不成功的, 就要去信号量下面去等待了, 所以p操作当中, p操作只要成功, 那么就不需要判断资源就没就绪!所以, 这把计数器的本质是什么? 我们说是用来描述资源数目的, 把资源是否就绪放在了临界区之外申请信号量时, 其实就间接的已经在做判断了!!!

  •         我们之前使用的互斥锁, 其实就可以理解为一个二元信号量, 这个信号量只能为零为一。 一个线程拿到锁后, 只要不释放, 其他的线程无法再次拿到锁!!!

信号量接口

初始化信号量

        第一个参数是信号量对象的地址。 第二个参数代表表示的是线程共享还是进程共享。默认为零,是线程共享。 非零表示进程共享。 第三个参数就是设定的信号量的初始值。

        destroy表示的是清空信号量。         

        sem_wait功能是等待信号量, 将信号量的值减一。 也就是P操作

        sem_post的功能是发布信号量,表示资源使用完毕,该归还资源了, 将信号量加一。也就是V操作。

环形队列的生产消费模型

        我们学习环形队列的生产消费模型的目的是为了理解我们的信号量,以及熟悉一下我们信号量的使用。 其实这里的环形队列就类似于我们的循环队列。 上面的head就是放一个数据就向前走一格子,放一个数据就向前走一个格子。 但是和循环队列不同的是当我们放满的时候两个指针还是指向同一个位置, 因为我们有信号量计数器, 所以不利用指针的指向位置判断空和满!

        我们这里以单生产和单消费为例(因为多生产多消费要复杂, 先实现单生产单消费的代码再来考虑多生产和多消费):

 

        生产和消费只要没有访问同一个位置, 我们就能让他们同时是生产和消费。 也就是说, 我们生产和消费必须遵守三个原则:

  •         1、指向同一个位置的时候, 不能同时访问。
  •         2、消费者不能超过生产者。
  •         3、生产者不能超过消费者一个圈。

那么, 当生产者和消费者在这个追逐的过程中, 什么时候才会指向同一个位置呢? 其实就是生产者生产满了的时候。——》

  •         不空和不满的时候, 指向的一定是不同的位置, 两者可以同时访问!!
  •         为空的时候:只能生产者访问!!
  •         为满的时候:只能消费者访问!!
  •         生产者关注什么资源呢?——还有多少剩余空间!
  •         消费者关注什么资源呢?——还有多少剩余数据!

        在定义信号量的时候,我们可以定义两把信号量, 一个叫做SpaceSem,一个叫做DataSem。 最开始的时候, SpaceSem = MAX DataSem = 0;然后对于生产者来说, 要P(SpaceSem), 就代表生产者生产数据让SpaceSem减一,同时V(DataSem)。 对于消费者来说就P(DataSem), 就代表消费者消耗掉一个数据。

        但是, 当我们的队列为空的时候, 一定是生产者先执行, 然后消费者再来消费。 当生产者生产到SpaceSem为零的时候, 生产者就不能再消费了。下面开始实现代码:

创建文件

        先创建好三个文件, 一个makefile, 一个RingQueue.h用来实现环形队列这个交易场所。然后main.cpp用来实现生产者消费者线程以及生产动作和消费动作

RingQueue.h

        首先我们思考一下我们定义的这个生产消费模型里面要有什么——首先一定要有一个环形队列, 这里我们用vector来模拟。 然后我们要规定环形队列的大小, 所以要有一个变量来规定这个大小, 我们这里定义一个cap_变量。 然后要有一个消费者下标, 一个生产者下标来表示生产者和消费者生产或消费的位置。 最后还要定义两把计数器(信号量)让生产者消费者来预定临界资源。所以代码如下:

#include<iostream>
using namespace std;
#include<vector>
#include<pthread.h>
#include<semaphore.h>


const static int defaultcap = 5; //用来初始化cap_


template<class T>
class RingQueue
{
public:

private:
    vector<T> ringqueue_;
    int cap_;   //队列的容量大小

    int c_step_;    //消费者下标
    int p_step_;    //生产者下标

    //注意, 信号量不能保证生产者之间的互斥, 也不能保证消费者之间的互斥。  
    sem_t cdata_sem_;   //消费者关注的数据资源
    sem_t pspace_sem_;  //生产者关注的空间资源



};

        环形生产消费模型里面有什么方法呢? 首先, 一定要有构造和析构。 然后构造就是对vector, 计数器来进行初始化。 析构同理。 所以代码如下:

#include<iostream>
using namespace std;
#include<vector>
#include<pthread.h>
#include<semaphore.h>


const static int defaultcap = 5;


template<class T>
class RingQueue
{
public:
    RingQueue(int cap = defaultcap)
        :cap_(cap)
        , ringqueue_(cap)
        , c_step_(0)
        , p_step_(0)
    {
        sem_init(&cdata_sem_, 0, 0); 
        sem_init(&pspace_sem_, 0, cap);
    }

    ~RingQueue()
    {
        sem_destroy(&cdata_sem_);
        sem_destroy(&pspace_sem_);
    }
private:
    vector<T> ringqueue_;
    int cap_;   //队列的容量大小

    int c_step_;    //消费者下标
    int p_step_;    //生产者下标

    //注意, 信号量不能保证生产者之间的互斥, 也不能保证消费者之间的互斥。  
    sem_t cdata_sem_;   //消费者关注的数据资源
    sem_t pspace_sem_;  //生产者关注的空间资源

};

         然后还有两个方法就是pop和push, push用来向交易场所中加入数据, pop用来从交易场所中拿数据:


    void Push(const T& in)
    {
        //生产数据先要申请信号量空资源
        P(&pspace_sem_);

        ringqueue_[p_step_] = in;
        //维持环形特征
        p_step_++;
        p_step_ %= cap_;

        V(&cdata_sem_);
    }

    //在为空和为满的时候就表现出了局部性的互斥特征!!为空的时候,生产者先运行;为满的时候, 消费者先运行——》这不就是生产和消费具有一定的顺序性, 这是局部性的同步。
    //如果不为空并且不为满,那么我们对应的step两个下标的值一定是不一样的。 两个就叫做并发运行!!!和之前将讲解的不太一样, 
    void Pop(T* out)  //利用指针将数据从队列里面拿出来
    {
        P(&cdata_sem_);


        *out = ringqueue_[c_step_];
        c_step_++;
        c_step_ %= cap_;

        V(&pspace_sem_);
    }

        这里我们需要思考一下, 首先在为空和为满的时候, 是不是这个时候只能消费者消费, 或者生产者进行生产? 而这,不就是表现出了局部性的互斥特征!!然后为空的时候,生产者先运行;为满的时候, 消费者先运行——》这不就是生产和消费具有一定的顺序性, 这是局部性的同步。        

        然后如果不为空并且不为满,那么我们对应的step两个下标的值一定是不一样的。 两个就叫做并发运行!!!        

main.cpp

        主函数比较简单, 分为三个板块。 一个板块是主函数, 用来创建线程, 以及交易场所。 然后第二三个板块用来定义生产者和消费者线程需要执行的代码。 代码如下:
 

#include"RingQueue.h"

#include<unistd.h>
#include<ctime>
#include<iostream>
using namespace std;



void* Productor(void* args)
{
    RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);
    
    while (true)
    {
        //获取数据
        int data = rand() % 10 + 1;

        //生产数据
        rq->Push(data);
        cout << "Productor data done, data is: " << data << endl;
    }

    return nullptr;
}

void* Consumer(void* args)
{
    RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);

    while (true)
    {
        sleep(1);//这里我们的消费者是一秒消费一次。 要知道, 我们的线程跑的是很快的, 所以
//运行后其实生产者一瞬间就能把交易场所打满, 然后就是一秒消费一次, 生产一次!!
        //消费数据
        int data = 0;
        rq->Pop(&data); 

        //处理数据
        cout << "Comsumer data done, data is: " << data << endl; 


    }
    return nullptr;
}


int main()


    //常见循环队列
    RingQueue<int>* rq = new RingQueue<int>();


    //创建线程
    pthread_t c, p;
    
    //运行线程
    pthread_create(&c, nullptr, Productor, rq);
    pthread_create(&p, nullptr, Consumer, rq);

    //等待线程
    pthread_join(c, nullptr);
    pthread_join(p, nullptr);


    //销毁循环队列
    delete rq;

    return 0;
}

运行结果:

多生产和多消费的循环队列

        多生产和多消费要维护它们的互斥关系, 消费者和消费者之间也要维护它们的互斥关系。 如何变成支持多生产和多消费的呢? 答案是加锁!现在我们已经有了生产者和消费者之间的互斥关系。 那么我们只需要再利用锁将生产者之间, 以及消费者之间建立起互斥关系, 就能满足三种关系相互互斥, 就能满足资源的安全。 

RingQueue

#include<iostream>
using namespace std;
#include<vector>
#include<pthread.h>
#include<semaphore.h>


const static int defaultcap = 5;


template<class T>
class RingQueue
{
private:
    void P(sem_t* sem)
    {
        sem_wait(sem);
    }

    void V(sem_t* sem)
    {
        sem_post(sem);
    }

//第一个改动是封装了锁的加锁和解锁

    void Lock(pthread_mutex_t& mutex)
    {
        pthread_mutex_lock(&mutex);
    }

    void Unlock(pthread_mutex_t& mutex)
    {
        pthread_mutex_unlock(&mutex);
    }

public:
    RingQueue(int cap = defaultcap)
        :cap_(cap)
        , ringqueue_(cap)
        , c_step_(0)
        , p_step_(0)
    {
        sem_init(&cdata_sem_, 0, 0); 
        sem_init(&pspace_sem_, 0, cap);
    }

    void Push(const T& in)
    {
        //生产数据先要申请信号量空资源
        P(&pspace_sem_);
        Lock(c_mutex_); //加锁在申请信号量之后比较好

        ringqueue_[p_step_] = in;
        //维持环形特征
        p_step_++;
        p_step_ %= cap_;

        Unlock(c_mutex_);
        V(&cdata_sem_);
    }

    //在为空和为满的时候就表现出了局部性的互斥特征!!为空的时候,生产者先运行;为满的时候, 消费者先运行——》这不就是生产和消费具有一定的顺序性, 这是局部性的同步。
    //如果不为空并且不为满,那么我们对应的step两个下标的值一定是不一样的。 两个就叫做并发运行!!!和之前将讲解的不太一样, 
    void Pop(T* out)  //利用指针将数据从队列里面拿出来
    {
        P(&cdata_sem_);
        Lock(p_mutex_);

        *out = ringqueue_[c_step_];
        c_step_++;
        c_step_ %= cap_;

        Unlock(p_mutex_);
        V(&pspace_sem_);
    }


    ~RingQueue()
    {
        sem_destroy(&cdata_sem_);
        sem_destroy(&pspace_sem_);
    }
private:
    vector<T> ringqueue_;
    int cap_;   //队列的容量大小

    int c_step_;    //消费者下标
    int p_step_;    //生产者下标

    //注意, 信号量不能保证生产者之间的互斥, 也不能保证消费者之间的互斥。  
    sem_t cdata_sem_;   //消费者关注的数据资源
    sem_t pspace_sem_;  //生产者关注的空间资源


    //定义两把锁
    pthread_mutex_t c_mutex_;
    pthread_mutex_t p_mutex_;

};

上面有三个地方改动, 一个是定义了两把锁变量。 第二个就是封装了加锁和解锁的方法。 第三个就是将push和pop里面申请信号量之后的代码加锁变成了临界区。 这里有一个要思考的点就是为什么加锁要在申请信号量之后? 我们可以这样想, 如果我们加锁在信号量之前, 那么我们的所有的生产者之间在执行push的时候就只能串行执行push。 但是如果我们加锁在信号量之后, 我们的生产者之间就能只串行申请信号量或者串行后面的代码。 就可以令申请信号量和执行后面的代码并行起来!!!所以效率就会提高!!!

main.cpp

主函数改动不大, 就是利用了for循环创建线程:

#include"RingQueue.h"

#include<unistd.h>
#include<ctime>
#include<iostream>
using namespace std;

void* Productor(void* args)
{
    RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);
    
    while (true)
    {
        //获取数据
        int data = rand() % 10 + 1;

        //生产数据
        rq->Push(data);
        cout << "Productor data done, data is: " << data << endl;
    }

    return nullptr;
}

void* Consumer(void* args)
{
    RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);

    while (true)
    {
        sleep(1);
        //消费数据
        int data = 0;
        rq->Pop(&data); 

        //处理数据
        cout << "Comsumer data done, data is: " << data << endl; 
    }
    return nullptr;
}

int main()
{
    //多生产和多生产之间要维护它们的互斥关系, 消费者和消费者之间也要维护它们的互斥关系。 如何变成支持多生产和多消费呢?
    //答案是加锁, 现在我们已经有了生产者和消费者之间的互斥关系。 那么我们只需要再利用锁将生产者之间,以及消费者之间建立起
    //互斥关系, 就能满足三种关系相互互斥, 就能满足资源的安全

    //常见循环队列
    RingQueue<int>* rq = new RingQueue<int>();

    //创建线程
    pthread_t c[3], p[5];
    
    //运行线程
    for (int i = 0; i < 3; i++)
    {
        pthread_create(c + i, nullptr, Productor, rq);
        sleep(1);
    }

    for (int i = 0; i < 5; i++)
    {
        pthread_create(p + i, nullptr, Consumer, rq);
        sleep(1);
    }
    
    for (int i = 0; i < 3; i++)
    {
        //等待线程
        pthread_join(c[i], nullptr);
        sleep(1);
    }

    for (int i = 0; i < 5; i++)
    {
        pthread_join(p[i], nullptr);
        sleep(1);
    }

    //销毁循环队列
    delete rq;

    return 0;
}

然后看一下运行结果(注意, 只看运行结果是看不出来的, 所以这里我们使用了一下监视脚本)

我们可以看右边就会看到我们的线程每一秒就会多一个!!这就是我们的多生产和多消费的情况。 

  ——————以上就是本节全部内容哦, 如果对友友们有帮助的话可以关注博主, 方便学习更多知识哦!!!   

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐