1. 任务简介

生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个著名的进程同步问题的经典案例。它描述的是有一组生产者进程在生产产品,并将这些产品提供给一组消费者进程去消费。为使生产者进程和消费者进程能够并发执行,在这两者之间设置里一个具有 n n n个缓冲区的缓冲池,生产者进程将他所生产的的产品放入一个缓冲区中;消费者进程可从一个缓冲区中取走产品并进行消费。尽管所有的生产者进程和消费者进程都是以异步方式运行的,但亡们之间必须保特同步,即不允许消费者进程到一个空缓冲区中去取产品;也不允许生产者进程向一个已装满产品且产品尚未被取走的缓冲区投放产品。

本项目要求利用Linux多进程实现生产者消费者问题。

2. 思路分析

我们分析题目中的同步和互斥关系:

2.1 同步关系

  • 当缓冲区有空位时,生产者进程才可以生产
  • 当缓冲区有产品是,消费者进程才可以消费

2.2 互斥关系

  • 生产者进程与消费者进程对缓冲区的访问是互斥的

在这里插入图片描述

2.3 整体思路

总体思路如下:

  • 设置一个生产者进程,负责生产产品
  • 设置一个消费这进程,负责消费产品
  • 生产者与消费者进程间的通讯通过共享内存实现
  • 设置一个互斥信号量,实现对共享内存的互斥访问
  • 设置两个信号量,用于标记资源的数目,实现进程间的两个同步关系

具体流程如下图所示:

在这里插入图片描述

3. 代码实现

3.1 头文件

首先,我们包含实现问题所需的头文件:

#include <time.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/ipc.h>

3.2 预定义和数据结构

  • 我们采用共同协商关键字SEMKEYSHMKEY的方法使得不同进程间可以取得同一个信号量和共享内存
  • 定义了一个结构体Buffer来作为缓冲池存储产品
#define SEMKEY 123
#define SHMKEY 456
#define BUFNUM 10
#define SEMNUM 3

#if defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)
/*   union   semun   is   defined   by   including   <sys/sem.h>   */ 
#else 
/*   according   to   X/OPEN   we   have   to   define   it   ourselves   */ 
union semun
{
	int val;
	struct semid_ds *buf;
	unsigned short *array;
};
#endif

struct Buffer
{
    int start, end;
    char buffer[BUFNUM];
};

3.3 初始化函数

  • 我们利用协商好的SEMKEY生成一个信号量集,其中第一个信号量为empty,表示缓冲池为空的个数;第二个信号量为full,表示缓冲池中的产品个数;第三个信号量为mutex,控制对缓冲池的读取权限。
  • 利用协商好的SHMKEY生成一个共享内存集
  • 我们利用*returnSemId*returnShmId**returnShm三个指针来返回初始化的参数

具体实现如下:

void Initialize(int *returnSemId, int *returnShmId, struct Buffer **returnShm)
{
    int semId = -1, shmId = -1, values[SEMNUM] = {BUFNUM, 0, 1};

    /*  semSet[0]: empty, initial value: n
        semSet[1]: full, initial value 0
        semSet[2]: mutex, initial value 1   */

    semId = semget(SEMKEY, SEMNUM, IPC_CREAT | 0666);
    if(semId == -1)
    {
        printf("semaphore creation failed!\n");
        exit(EXIT_FAILURE);
    }

    int i = 0;
    union semun semUn;
    for( i = 0; i < SEMNUM; i ++)
    {
        semUn.val = values[i];
        if(semctl(semId, i, SETVAL, semUn) < 0)
        {
            printf("semaphore %d initialization failed!\n", i);
            exit(EXIT_FAILURE);
        }
    }

    shmId = shmget(SHMKEY, sizeof(struct Buffer), IPC_CREAT | 0666);
    if(shmId == -1)
    {
        printf("share memory creation failed!\n");
        exit(EXIT_FAILURE);
    }

    void *temp = NULL;
    struct Buffer *shm = NULL;
    temp = shmat(shmId, 0, 0);
    if(temp == (void *) -1)
    {
        printf("share memory attachment failed!\n");
        exit(EXIT_FAILURE);        
    }
    shm = (struct Buffer *) temp;

    shm -> start = 0;
    shm -> end = 0;
    for(i = 0; i < BUFNUM; i++)
    {
        shm -> buffer[i] = ' ';
    }

    *returnSemId = semId;
    *returnShmId = shmId;
    *returnShm = shm;
}

3.4 PV操作

给定信号量集的semId以及待操作的信号量下标semNum,其P操作和V如下所示:

void SemWait(int semId, int semNum)
{
    struct sembuf semBuf;
    semBuf.sem_num = semNum;
    semBuf.sem_op = -1;
    semBuf.sem_flg = SEM_UNDO;
    if(semop(semId, &semBuf, 1) == -1)
    {
        printf("semaphore P operation failed!\n");
        exit(EXIT_FAILURE);
    }
}

void SemSignal(int semId, int semNum)
{
    struct sembuf semBuf;
    semBuf.sem_num = semNum;
    semBuf.sem_op = 1;
    semBuf.sem_flg = SEM_UNDO;
    if(semop(semId, &semBuf, 1) == -1)
    {
        printf("semaphore V operation failed!\n");
        exit(EXIT_FAILURE);
    }
}

3.5 生产者进程

生产者首先申请一个空闲缓冲区资源,再申请临界缓冲区访问。当产生一个产品后,发送一个信号,使得已有缓冲区资源数量加一,同时唤醒阻塞的消费者进程,具体代码如下:

void Producer(int semId, struct Buffer *shm)
{
    do{
        // wait empty region
        SemWait(semId, 0);
        // wait mutex
        SemWait(semId, 2);

        Add(shm);

        // signal mutex
        SemSignal(semId, 2);
        // singal full region
        SemSignal(semId, 1);

        sleep(random() % 2);

    }while(1);
}

执行Add操作时,随机产生一个大写英文字母模拟产品,放入缓冲区,同时调整队尾指针end,具体代码如下:

void Add(struct Buffer *shm)
{
    char product = 'A' + rand() % 26;
    printf("producer %d: added product %c into buffer:\t", getpid(), product);
    shm -> buffer [shm -> end] = product;
    shm -> end = (shm -> end + 1) % BUFNUM;
    printf("|%s|\n", shm -> buffer);
}

3.6 消费者进程

消费者首先申请一个已有缓冲区资源,再申请临界缓冲区访问。当消费一个产品后,发送一个信号,使得空闲缓冲区资源数量加一,同时唤醒阻塞的生产者进程,具体代码如下:

void Producer(int semId, struct Buffer *shm)
{
    do{
        // wait empty region
        SemWait(semId, 0);
        // wait mutex
        SemWait(semId, 2);

        Add(shm);

        // signal mutex
        SemSignal(semId, 2);
        // singal full region
        SemSignal(semId, 1);

        sleep(random() % 2);
        
    }while(1);
}

执行Remove操作时,将当前缓冲区资源清空,同时调整队首指针start,具体代码如下:

void Remove(struct Buffer *shm)
{
    char product = shm -> buffer [shm -> start];
    printf("consumer %d: removed product %c from buffer:\t", getpid(), product);
    shm -> buffer [shm -> start] = ' ';
    shm -> start = (shm -> start + 1) % BUFNUM;
    printf("|%s|\n", shm -> buffer);
}

3.7 主函数

从控制台通过-n命令读入产生生产者和消费者进程的数目,首先初始化变量,之后通过fork()产生等量的生产者和消费者进程。

注意:此处主进程在产生完其他子进程之后不能够直接退出,否则子进程会修改父进程为systemd,试的我们无法通过控制台的ctrl + c命令结束程序。

int main(int argc, char *argv[])
{
    int semId = -1, shmId = -1, i=0;
    int processNum = atoi(argv[2]);
    if(processNum <= 0) processNum = 1;
    struct Buffer *shm = NULL;

    Initialize(&semId, &shmId, &shm);
    for(i = 0; i < 2 * processNum; i ++)
    {
        pid_t pid = fork();
        if(pid < 0)
        {
            printf("fork failed!\n");
            exit(EXIT_FAILURE);
        }
        else if(pid == 0)
        {
            sleep(1);
            if(i % 2 == 0)
            {
                printf("producer process %d created\n", getpid());
                Producer(semId, shm);            
            }
            else
            {
                printf("consumer process %d created\n", getpid());
                Consumer(semId, shm);
            }
            return 0;
        }
    }
    getchar();
    Destroy(semId, shmId, shm);
    return 0;
}

3.8 实验代码

完整实验代码如下:

#include <time.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/ipc.h>


#define SEMKEY 123
#define SHMKEY 456
#define BUFNUM 10
#define SEMNUM 3

#if defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)
/*   union   semun   is   defined   by   including   <sys/sem.h>   */ 
#else 
/*   according   to   X/OPEN   we   have   to   define   it   ourselves   */ 
union semun
{
	int val;
	struct semid_ds *buf;
	unsigned short *array;
};
#endif

struct Buffer
{
    int start, end;
    char buffer[BUFNUM];
};

void Initialize(int *returnSemId, int *returnShmId, struct Buffer **returnShm)
{
    int semId = -1, shmId = -1, values[SEMNUM] = {BUFNUM, 0, 1};

    /*  semSet[0]: empty, initial value: n
        semSet[1]: full, initial value 0
        semSet[2]: mutex, initial value 1   */

    semId = semget(SEMKEY, SEMNUM, IPC_CREAT | 0666);
    if(semId == -1)
    {
        printf("semaphore creation failed!\n");
        exit(EXIT_FAILURE);
    }

    int i = 0;
    union semun semUn;
    for(i = 0; i < SEMNUM; i ++)
    {
        semUn.val = values[i];
        if(semctl(semId, i, SETVAL, semUn) < 0)
        {
            printf("semaphore %d initialization failed!\n", i);
            exit(EXIT_FAILURE);
        }
    }

    shmId = shmget(SHMKEY, sizeof(struct Buffer), IPC_CREAT | 0666);
    if(shmId == -1)
    {
        printf("share memory creation failed!\n");
        exit(EXIT_FAILURE);
    }

    void *temp = NULL;
    struct Buffer *shm = NULL;
    temp = shmat(shmId, 0, 0);
    if(temp == (void *) -1)
    {
        printf("share memory attachment failed!\n");
        exit(EXIT_FAILURE);        
    }
    shm = (struct Buffer *) temp;

    shm -> start = 0;
    shm -> end = 0;
    for(i = 0; i < BUFNUM; i++)
    {
        shm -> buffer[i] = ' ';
    }

    *returnSemId = semId;
    *returnShmId = shmId;
    *returnShm = shm;
}

void Add(struct Buffer *shm)
{
    char product = 'A' + rand() % 26;
    printf("producer %d: added product %c into buffer:\t", getpid(), product);
    shm -> buffer [shm -> end] = product;
    shm -> end = (shm -> end + 1) % BUFNUM;
    printf("|%s|\n", shm -> buffer);
}

void Remove(struct Buffer *shm)
{
    char product = shm -> buffer [shm -> start];
    printf("consumer %d: removed product %c from buffer:\t", getpid(), product);
    shm -> buffer [shm -> start] = ' ';
    shm -> start = (shm -> start + 1) % BUFNUM;
    printf("|%s|\n", shm -> buffer);
}

void ShmDestroy(int semId, struct Buffer * shm)
{
    if(shmdt(shm) < 0)
    {
        printf("share memory detachment failed!\n");
        exit(EXIT_FAILURE);
    } 
    if(shmctl(semId, IPC_RMID, 0) < 0)
    {
        printf("share memory destruction failed!\n");
        exit(EXIT_FAILURE);        
    }
}

void SemWait(int semId, int semNum)
{
    struct sembuf semBuf;
    semBuf.sem_num = semNum;
    semBuf.sem_op = -1;
    semBuf.sem_flg = SEM_UNDO;
    if(semop(semId, &semBuf, 1) == -1)
    {
        printf("semaphore P operation failed!\n");
        exit(EXIT_FAILURE);
    }
}

void SemSignal(int semId, int semNum)
{
    struct sembuf semBuf;
    semBuf.sem_num = semNum;
    semBuf.sem_op = 1;
    semBuf.sem_flg = SEM_UNDO;
    if(semop(semId, &semBuf, 1) == -1)
    {
        printf("semaphore V operation failed!\n");
        exit(EXIT_FAILURE);
    }
}

void SemDestroy(int semId)
{
    union semun semUn;
    if(semctl(semId, 0, IPC_RMID, semUn) < 0)
    {
        printf("semaphore destruction failed!\n");
        exit(EXIT_FAILURE);
    }
}

void Destroy(int semId, int shmId, struct Buffer *shm)
{
    SemDestroy(semId);
    ShmDestroy(shmId, shm);
    printf("destruction finished! exit\n");
}

void Producer(int semId, struct Buffer *shm)
{
    do{
        // wait empty region
        SemWait(semId, 0);
        // wait mutex
        SemWait(semId, 2);

        Add(shm);

        // signal mutex
        SemSignal(semId, 2);
        // singal full region
        SemSignal(semId, 1);

        sleep(random() % 2);

    }while(1);
}

void Consumer(int semId, struct Buffer *shm)
{
    do{
        // wait full region
        SemWait(semId, 1);
        // wait mutex
        SemWait(semId, 2);

        Remove(shm);

        // signal mutex
        SemSignal(semId, 2);
        // singal empty region
        SemSignal(semId, 0);

        sleep(random() % 2);

    }while(1);
}

int main(int argc, char *argv[])
{
    int semId = -1, shmId = -1, i=0;
    int processNum = atoi(argv[2]);
    if(processNum <= 0) processNum = 1;
    struct Buffer *shm = NULL;

    Initialize(&semId, &shmId, &shm);
    for(i = 0; i < 2 * processNum; i ++)
    {
        pid_t pid = fork();
        if(pid < 0)
        {
            printf("fork failed!\n");
            exit(EXIT_FAILURE);
        }
        else if(pid == 0)
        {
            sleep(1);
            if(i % 2 == 0)
            {
                printf("producer process %d created\n", getpid());
                Producer(semId, shm);            
            }
            else
            {
                printf("consumer process %d created\n", getpid());
                Consumer(semId, shm);
            }
            return 0;
        }
    }
    getchar();
    Destroy(semId, shmId, shm);
    return 0;
}

4. 实验结果

我们通过gcc编译器编译源程序producer_consumer.c,生成目标文件producer_consumer

在这里插入图片描述

4.1 单个生产者消费者

我们从控制台输入命令$ ./producer_consumer -n 1,来模拟一个生产者和一个消费者的情况:

在这里插入图片描述

我们可以清楚的看到一个生产者一个消费者进程存在时的状况。

4.2 多个生产者消费者

同理,我们从控制台输入命令$ ./producer_consumer -n 5,来模拟多个生产者和多个消费者的情况,实验结果节选片段如下:

在这里插入图片描述

我们可以很轻易的通过上图所示的缓冲池可视化结果,验证我们程序的正确性,至此实验部分介绍完毕。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐