【Linux系统编程】第四十六弹---线程同步与生产消费模型深度解析
Linux线程同步:同步概念与竞态条件,条件变量;生产消费模型:为何要使用生产消费模型,生产者消费者模型优点,编写生产消费模型;测试生产消费模型~~~
✨个人主页: 熬夜学编程的小林
💗系列专栏: 【C语言详解】 【数据结构详解】【C++详解】【Linux系统编程】
目录
1、Linux线程同步
在上一弹的上锁抢票代码中我们可以看到,会有很长一段时间使用的是同一个线程,这样的方式没有错,但是不合理,怎么解决这个问题呢?
先通过一个实际情况分析此问题,再解决该问题。
假设学校有一个VIP自习室,一次只允许一个人进来,进入自习室需要用到门口的一把锁。
- 有一个uu今天想去里面自习,就早早5点起床去了VIP自习室,但是他又想,竟然来了就多学习一会,此时外面也有人想进来自习,但是没有钥匙只能在外面等
- 此时这个uu已经学了一上午了,很饿了,想去吃饭,走到门口,刚放回钥匙,又后悔了,如果现在还钥匙了,后面就不能进自习室了,因此这个uu又拿了钥匙进入了自习室(因为uu离钥匙比较近,因此还是他先拿到钥匙)
结论:其他人长时间无法进入自习室 --- 无法获取临界资源 -- 导致饥饿问题!!!
因此我们可以修改规则,让进入自习室更公平!
每一个同学归还钥匙后:
1、不能立马申请
2、第二次申请,必须排队(换句话说,其他人也得排队)
1.1、同步概念与竞态条件
同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解
1.2、条件变量
- 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
- 例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
1.2.1、认识条件变量接口
初始化
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 全局或者静态只需初始化
参数:
cond:要初始化的条件变量
attr:NULL
销毁
int pthread_cond_destroy(pthread_cond_t *cond)
等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量
唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond); // 唤醒所以线程
int pthread_cond_signal(pthread_cond_t *cond); // 唤醒一个线程
1.2.2、举例子认识条件变量
假设有两个人,一个盘子,一个人放苹果到盘子里,另一个人从盘子里取苹果(前提是有苹果,因此需要先检查是否有苹果),但是互相都不知道什么时候放和取苹果,因此只能一次次的去尝试,是够放好,是否被取,但是这样会导致一个问题,如果一个人不放,那么另一个会一直去检查盘子里有没有苹果,这样就太浪费(线程)资源了,我们可以改进一下策略!!!
优化
我们可以再加一个铃铛,当取苹果的时候,如果盘子里面还没有苹果,那么就可以在铃铛处等待,等另一个人放了苹果了,就来铃铛处通知,这样两个人就能高效利用资源了!!
铃铛就是我们讲解的条件变量:
1.需要一个线程队列
2.需要有通知机制
- 全部叫醒
- 叫醒一个
1.2.3、测试代码
新线程等待函数
const int num = 5;
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;
void* Wait(void* args)
{
std::string name = static_cast<const char*>(args);
while(true)
{
pthread_mutex_lock(&gmutex);
pthread_cond_wait(&gcond,&gmutex);
usleep(10000);
std::cout << "I am " << name << std::endl;
pthread_mutex_unlock(&gmutex);
}
}
主函数
int main()
{
// 1.创建保存线程tid的数组
pthread_t threads[num];
for(int i=0;i<num;i++)
{
char* name = new char[1024];
snprintf(name,1024,"thread-%d",i + 1);
pthread_create(threads + i,nullptr,Wait,(void*)name);
usleep(1000);
}
sleep(1);
// 2.唤醒其他线程
while(true)
{
// pthread_cond_signal(&gcond); // 唤醒一个线程
pthread_cond_broadcast(&gcond); // 唤醒所有线程
std::cout << "唤醒一个线程..." << std::endl;
sleep(2);
}
// 3.终止线程
for(int i=0;i<num;i++)
{
pthread_join(threads[i],nullptr);
}
return 0;
}
运行结果
2、生产消费模型
2.1、为何要使用生产消费模型
- 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯。
- 所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区(一段内存空间),平衡了生产者和消费者的处理能力。
- 这个阻塞队列就是用来给生产者和消费者解耦的。
2.2、生产者消费者模型优点
- 解耦
- 支持并发
- 支持忙闲不均
思考切入点:"321"原则
- 1、一个交易场所(特定数据结构形式存在的一段内存空间)
- 2、两种角色(生产角色 消费角色)生产线程,消费线程
- 3、三种关系(生产和生产[互斥] 消费和消费[互斥] 生产和消费[同步和互斥])
实现生产消费模型,本质就是通过代码实现321原则,用锁和条件变量(或者其他方式)来实现三种关系!!!
2.3、编写生产消费模型
BlockingQueue
- 在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
2.3.1、BlockQueue类基本结构
此处的类设计成模板形式,让结构更加灵活!!!
template<typename T>
class BlockQueue
{
private:
bool IsFull();
bool IsEmpty();
public:
BlockQueue(int cap = defaultcap);
// 消费者出队列
void Pop(T* out);
// 生产者入队列
void Equeue(const T& in);
~BlockQueue();
private:
std::queue<T> _block_queue; // 临界资源
int _max_cap;
pthread_mutex_t _mutex;
pthread_cond_t _p_cond; // 生产着条件变量
pthread_cond_t _c_cond; // 消费者条件变量
};
2.3.2、构造析构函数
构造函数用于初始化最大容量和初始化锁以及条件变量,析构函数用于释放锁和条件变量!
// 构造
BlockQueue(int cap = defaultcap) :_max_cap(cap)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_p_cond,nullptr);
pthread_cond_init(&_c_cond,nullptr);
}
// 析构
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
2.3.3、判空判满函数
判断是否为空即判断队列是否为空即可,判断是否未满即判断队列成员个数是否与最大容量相等!!
// 判满
bool IsFull()
{
return _block_queue.size() == _max_cap;
}
// 判空
bool IsEmpty()
{
return _block_queue.empty();
}
2.3.4、生产者入队
入队是将数据插入到队尾中,可能出现数据不一致问题,因此需要加锁和条件变量,如果满了则需要等待,不为满则需要插入数据,并唤醒消费者!!!
// 生产者入队列
void Equeue(const T& in)
{
pthread_mutex_lock(&_mutex); // 上锁
while(IsFull())
{
// 满了,生产着不能生产,必须等待
// 可是在临界区里面!pthread_cond_wait
// 被调用的时候,除了让自己排队等待,还会自己释放传入的锁
// 函数返回的时候,不就还在临界区了?
// 返回时:必须参与锁的竞争,重新加上锁才能返回
pthread_cond_wait(&_p_cond,&_mutex);
}
// 1.没有满 || 2.被唤醒了
_block_queue.push(in); // 生产到阻塞队列
pthread_mutex_unlock(&_mutex); // 解锁
pthread_cond_signal(&_c_cond); // 唤醒消费者,解锁前解锁后均可
}
2.3.5、消费者出队
出队即删除队头数据,并获取队头的数据,为空则需要等待,不为空则可以删除队头数据,并唤醒生产者!!!
// 消费者出队列
void Pop(T* out)
{
pthread_mutex_lock(&_mutex);
// 为空,消费者不能消费,必须等待
while(IsEmpty())
{
// 添加尚未满足,但是线程被异常唤醒的情况,叫做伪唤醒!
pthread_cond_wait(&_c_cond,&_mutex);
}
// 1.没有空 || 2.被唤醒
*out = _block_queue.front(); // 输出型参数
_block_queue.pop();
pthread_mutex_unlock(&_mutex);
// 唤醒生产着生产
pthread_cond_signal(&_p_cond);
}
2.4、测试生产消费模型
2.4.1、内置类型
Consumer
void* Consumer(void* args)
{
BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
while(true)
{
// 1.获取数据
int t;
bq->Pop(&t);
// 2.处理数据
std::cout << "Consumer->" << t << std::endl;
}
}
Productor
void* Productor(void* args)
{
srand(time(nullptr) ^ getpid());
BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
while(true)
{
// 1.构建数据/任务
int x = rand() % 10 + 1; // [1,10]
sleep(1); // 1秒生产一个数据
// 2.生产数据
bq->Equeue(x);
std::cout << "Productor->" << x << std::endl;
}
}
主函数
int main()
{
BlockQueue<int>* bq = new BlockQueue<int>();
// 单生产 单消费
pthread_t c,p;
// 创建线程
pthread_create(&c,nullptr,Consumer,bq);
pthread_create(&p,nullptr,Productor,bq);
// 终止线程
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
运行结果
2.4.2、类类型
Task类
设计一个加法的Task类,内部封装仿函数,测试函数!!!
class Task
{
public:
Task()
{}
// 带参构造
Task(int x, int y) : _x(x), _y(y)
{}
// 仿函数,直接使用()访问Excute函数
void operator()()
{
Excute();
}
void Excute()
{
_result = _x + _y;
}
std::string debug()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";
return msg;
}
std::string result()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);
return msg;
}
private:
int _x;
int _y;
int _result;
};
Consumer
void *Consumer(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while (true)
{
// 1.获取数据
Task t;
bq->Pop(&t);
// 2.处理数据
// t.Excute();
t(); // 使用仿函数
std::cout << "Consumer->" << t.result() << std::endl;
}
}
Productor
void *Productor(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while (true)
{
// 1.构建数据/任务
int x = rand() % 10 + 1; // [1,10]
usleep(1000); // 尽量保证随机数不同
int y = rand() % 10 + 1;
Task t(x,y);
// 2.生产数据
bq->Equeue(t);
std::cout << "Productor->" << t.debug() << std::endl;
sleep(1);
}
}
主函数
int main()
{
BlockQueue<Task> *bq = new BlockQueue<Task>();
// 单生产 单消费
pthread_t c, p;
// 创建线程
pthread_create(&c, nullptr, Consumer, bq);
pthread_create(&p, nullptr, Productor, bq);
// 终止线程
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
运行结果
2.4.3、函数类型
函数与声明与实现
// typedef std::function<void()> task_t;
using task_t = std::function<void()>; // 包装器
void Download()
{
std::cout << "我是一个下载的任务" << std::endl;
}
Consumer
void *Consumer(void *args)
{
BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);
while (true)
{
// 1.获取数据
task_t t;
bq->Pop(&t);
// 2.处理数据
t(); // 使用仿函数
}
}
Productor
void *Productor(void *args)
{
BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);
while (true)
{
// 1.生产数据
bq->Equeue(Download);
std::cout << "Productor-> Download" << std::endl;
sleep(1);
}
}
主函数
int main()
{
BlockQueue<task_t> *bq = new BlockQueue<task_t>();
// 单生产 单消费
pthread_t c, p;
// 创建线程
pthread_create(&c, nullptr, Consumer, bq);
pthread_create(&p, nullptr, Productor, bq);
// 终止线程
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
运行结果
2.4.4、多生产多消费
int main()
{
BlockQueue<task_t> *bq = new BlockQueue<task_t>();
// 多生产 多消费
pthread_t c1,c2,p1,p2,p3;
// 创建线程
pthread_create(&c1, nullptr, Consumer, bq);
pthread_create(&c2, nullptr, Consumer, bq);
pthread_create(&p1, nullptr, Productor, bq);
pthread_create(&p2, nullptr, Productor, bq);
pthread_create(&p3, nullptr, Productor, bq);
// 终止线程
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
return 0;
}
运行结果
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)