【Linux】进程间通信(匿名管道)
💬 hello!各位铁子们大家好哇。今日更新了Linux进程间通信的内容。
🌈个人主页:秦jh__https://blog.csdn.net/qinjh_?spm=1010.2135.3001.5343
🔥 系列专栏:https://blog.csdn.net/qinjh_/category_12625432.html
目录
前言
💬 hello! 各位铁子们大家好哇。
今日更新了Linux进程间通信的内容
🎉 欢迎大家关注🔍点赞👍收藏⭐️留言📝
进程间通信目的
- 数据传输:一个进程需要将它的数据发送给另一个进程
- 资源共享:多个进程之间共享同样的资源。
- 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止 时要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另 一个进程的所有陷入和异常,并能够及时知道它的状态改变。
进程间通信发展
- 管道
- System V进程间通信
- POSIX进程间通信
进程间通信的前提:先让不同的进程,看到同一份(操作系统)资源(”一段内存“)。
进程间通信一定是某一个进程先需要通信,让OS创建一个共享资源。此时OS必须提供很多系统调用。
OS创建的共享资源的不同,系统调用接口也就不同,所以进程间通信会有不同的种类。
进程间通信分类
管道
- 匿名管道pipe
- 命名管道
System V IPC
- System V 消息队列
- System V 共享内存
- System V 信号量
POSIX IPC
- 消息队列
- 共享内存
- 信号量
- 互斥量
- 条件变量
- 读写锁
管道
匿名管道
一个进程将同一个文件打开两次,一次以写方式打开,另一次以读方式打开。此时会创建两个struct file,而文件的属性会共用,不会额外创建。
如果此时又创建了子进程,子进程会继承父进程的文件描述符表,指向同一个文件。我们把上面父子进程都看到的文件,叫管道文件。
管道只允许单向通信。
管道里的内容不需要刷新到磁盘。
未来要用父进程写,子进程读的话,在fork之后,各自关闭掉不用的文件描述符即可。 不用的描述符建议关闭,因为未来可能会误用,或者导致文件描述符泄露。
功能:创建匿名管道
参数:
pipefd[2]:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端。它是输出型参数。
返回值:成功返回0,失败返回错误代码
使用管道通信的demo
上图是创建管道,pipe的使用的例子。
下面是测试的完整代码:
#include <iostream>
#include <string>
#include <cerrno> // errno.h
#include <cstring> // string.h
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
const int size = 1024;
std::string getOtherMessage()
{
static int cnt = 0;
std::string messageid = std::to_string(cnt);
cnt++;
pid_t self_id = getpid();
std::string stringpid = std::to_string(self_id);
std::string message = "messageid: ";
message += messageid;
message += " my pid is : ";
message += stringpid;
return message;
}
// 子进程进行写入
void SubProcessWrite(int wfd)
{
int pipesize = 0;
std::string message = "father, I am your son prcess!";
char c = 'A';
while (true)
{
std::cerr << "+++++++++++++++++++++++++++++++++" << std::endl;
std::string info = message + getOtherMessage(); // 这条消息,就是我们子进程发给父进程的消息
write(wfd, info.c_str(), info.size()); // 写入管道的时候,没有写入\0,
std::cerr << info << std::endl;
// sleep(1); // 子进程写慢一点
// write(wfd, &c, 1);
// std::cout << "pipesize: " << ++pipesize << " write charator is : "<< c++ << std::endl;
// // if(c == 'G') break;
// sleep(1);
}
std::cout << "child quit ..." << std::endl;
}
// 父进程进行读取
void FatherProcessRead(int rfd)
{
char inbuffer[size]; // c99 , gnu g99
while (true)
{
sleep(2);
std::cout << "-------------------------------------------" << std::endl;
// sleep(500);
ssize_t n = read(rfd, inbuffer, sizeof(inbuffer) - 1);
if (n > 0)
{
inbuffer[n] = 0; // == '\0'
std::cout << inbuffer << std::endl;
}
else if (n == 0)
{
// 如果read的返回值是0,表示写端直接关闭了,我们读到了文件的结尾
std::cout << "client quit, father get return val: " << n << " father quit too!" << std::endl;
break;
}
else if(n < 0) //读取失败
{
std::cerr << "read error" << std::endl;
break;
}
// sleep(1);
// break;
}
}
int main()
{
// 1. 创建管道
int pipefd[2];
int n = pipe(pipefd); // 输出型参数,rfd, wfd
if (n != 0)
{
std::cerr << "errno: " << errno << ": "
<< "errstring : " << strerror(errno) << std::endl;
return 1;
}
std::cout << "pipefd[0]: " << pipefd[0] << ", pipefd[1]: " << pipefd[1] << std::endl;
sleep(1);
// 2. 创建子进程
pid_t id = fork();
if (id == 0)
{
std::cout << "子进程关闭不需要的fd了, 准备发消息了" << std::endl;
sleep(1);
// 子进程 --- write
// 3. 关闭不需要的fd
close(pipefd[0]);
// if(fork() > 0) exit(0);
SubProcessWrite(pipefd[1]);
close(pipefd[1]);
exit(0);
}
std::cout << "父进程关闭不需要的fd了, 准备收消息了" << std::endl;
sleep(1);
// 父进程 --- read
// 3. 关闭不需要的fd
close(pipefd[1]);
FatherProcessRead(pipefd[0]);
std::cout << "5s, father close rfd" << std::endl;
sleep(5);
close(pipefd[0]);
int status = 0;
pid_t rid = waitpid(id, &status, 0);
if (rid > 0)
{
std::cout << "wait child process done, exit sig: " << (status&0x7f) << std::endl;
std::cout << "wait child process done, exit code(ign): " << ((status>>8)&0xFF) << std::endl;
}
return 0;
}
管道的四种情况:
- 如果管道内部是空的,并且写端fd没有关闭,此时读取条件不具备,读进程会被阻塞,读进程会等待,直到写端写入数据。
- 如果管道被写满,并且读端fd不读且没有关闭,此时写进程会被阻塞,写端会等待,直到数据被读取。
- 如果管道一直在读并且写端关闭了wfd,读端read返回值会读到0,表示读到文件结尾。
- 如果读端rfd直接关闭,写端wfd一直在写入,那么写端进程会被OS直接用13号信号关掉,相当于进程出现了异常。
管道的特征:
- 匿名管道:只用来进行具有血缘关系的进程之间,进行通信,常用于父子进程之间通信
- 管道文件的生命周期是随进程的
- 管道内部,自带进程之间同步的机制(多执行流执行代码的时候,具有明显的顺序性)
- 管道文件在通信的时候,是面向字节流的。(写的次数和读取的次数不是一一匹配的)
- 管道的通信模式,是一种特殊的半双工模式,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道
- 当要写入的数据量不大于PIPE_BUF时,linux将保证写入的原子性。
- 当要写入的数据量大于PIPE_BUF时,linux将不再保证写入的原子性。
原子的意思就是这次的写入操作不会被中断。写的时候,不会写一半就被读走。在读方看来,要么不写,要么写完了。
当shell执行用管道连接起来的多条命令时,shell内部会把他们各自变成一个进程,他们是同时启动的。他们的父进程都是bash,他们是兄弟关系。所以命令行上的 | 就是匿名管道。
进程池实现
ProcessPool.cc
#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"
// void work(int rfd)
// {
// while (true)
// {
// int command = 0;
// int n = read(rfd, &command, sizeof(command));
// if (n == sizeof(int))
// {
// std::cout << "pid is : " << getpid() << " handler task" << std::endl;
// ExcuteTask(command);
// }
// else if (n == 0)
// {
// std::cout << "sub process : " << getpid() << " quit" << std::endl;
// break;
// }
// }
// }
// master
class Channel
{
public:
Channel(int wfd, pid_t id, const std::string &name)
: _wfd(wfd), _subprocessid(id), _name(name)
{
}
int GetWfd() { return _wfd; }
pid_t GetProcessId() { return _subprocessid; }
std::string GetName() { return _name; }
void CloseChannel()
{
close(_wfd);
}
void Wait()
{
pid_t rid = waitpid(_subprocessid, nullptr, 0);
if (rid > 0)
{
std::cout << "wait " << rid << " success" << std::endl;
}
}
~Channel()
{
}
private:
int _wfd;
pid_t _subprocessid;
std::string _name;
};
// 形参类型和命名规范
// const &: 输入
// & : 输入输出型参数
// * : 输出型参数
// task_t task: 回调函数
void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
{
for (int i = 0; i < num; i++)
{
// 1. 创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
exit(1);
// 2. 创建子进程
pid_t id = fork();
if (id == 0)
{
if (!channels->empty())
{
// 第二次之后,开始创建的管道要关闭继承下来的写端
for(auto &channel : *channels) channel.CloseChannel();
}
// child - read
close(pipefd[1]);
dup2(pipefd[0], 0); // 将管道的读端,重定向到标准输入
task();
close(pipefd[0]);
exit(0);
}
// 3.构建一个channel名称
std::string channel_name = "Channel-" + std::to_string(i);
// 父进程
close(pipefd[0]);
// a. 子进程的pid b. 父进程关心的管道的w端
channels->push_back(Channel(pipefd[1], id, channel_name));
}
}
// 0 1 2 3 4 channelnum
int NextChannel(int channelnum)
{
static int next = 0;
int channel = next;
next++;
next %= channelnum;
return channel;
}
void SendTaskCommand(Channel &channel, int taskcommand)
{
write(channel.GetWfd(), &taskcommand, sizeof(taskcommand));
}
void ctrlProcessOnce(std::vector<Channel> &channels)
{
sleep(1);
// a. 选择一个任务
int taskcommand = SelectTask();
// b. 选择一个信道和进程
int channel_index = NextChannel(channels.size());
// c. 发送任务
SendTaskCommand(channels[channel_index], taskcommand);
std::cout << std::endl;
std::cout << "taskcommand: " << taskcommand << " channel: "
<< channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}
void ctrlProcess(std::vector<Channel> &channels, int times = -1)
{
if (times > 0)
{
while (times--)
{
ctrlProcessOnce(channels);
}
}
else
{
while (true)
{
ctrlProcessOnce(channels);
}
}
}
void CleanUpChannel(std::vector<Channel> &channels)
{
// int num = channels.size() -1;
// while(num >= 0)
// {
// channels[num].CloseChannel();
// channels[num--].Wait();
// }
for (auto &channel : channels)
{
channel.CloseChannel();
channel.Wait();
}
// // 注意
// for (auto &channel : channels)
// {
// channel.Wait();
// }
}
// ./processpool 5
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;
return 1;
}
int num = std::stoi(argv[1]);
LoadTask();
std::vector<Channel> channels;
// 1. 创建信道和子进程
CreateChannelAndSub(num, &channels, work1);
// 2. 通过channel控制子进程
ctrlProcess(channels, 5);
// 3. 回收管道和子进程. a. 关闭所有的写端 b. 回收子进程
CleanUpChannel(channels);
// sleep(100);
return 0;
}
如上图,左边是父进程,右边是子进程。创建子进程的时候,从第二个子进程开始,创建的时候会继承父进程之前的文件描述符,也就会连接到进程1的写端。随着子进程的增加,越来越多描述符指向先前的管道的写端 ,就会导致要关闭管道时,写端没关完,读端读不到,就会造成阻塞,进程就退出不了。所以要在创建第二个及以后的进程的时候,把继承的写端关掉,如下图:
Task.hpp
#pragma once
#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
#define TaskNum 3
typedef void (*task_t)(); // task_t 函数指针类型
void Print()
{
std::cout << "I am print task" << std::endl;
}
void DownLoad()
{
std::cout << "I am a download task" << std::endl;
}
void Flush()
{
std::cout << "I am a flush task" << std::endl;
}
task_t tasks[TaskNum];
void LoadTask()
{
srand(time(nullptr) ^ getpid() ^ 17777);
tasks[0] = Print;
tasks[1] = DownLoad;
tasks[2] = Flush;
}
void ExcuteTask(int number)
{
if (number < 0 || number > 2)
return;
tasks[number]();
}
int SelectTask()
{
return rand() % TaskNum;
}
void work()
{
while (true)
{
int command = 0;
int n = read(0, &command, sizeof(command));
if (n == sizeof(int))
{
std::cout << "pid is : " << getpid() << " handler task" << std::endl;
ExcuteTask(command);
}
else if (n == 0)
{
std::cout << "sub process : " << getpid() << " quit" << std::endl;
break;
}
}
}
void work1()
{
while (true)
{
int command = 0;
int n = read(0, &command, sizeof(command));
if (n == sizeof(int))
{
std::cout << "pid is : " << getpid() << " handler task" << std::endl;
ExcuteTask(command);
}
else if (n == 0)
{
std::cout << "sub process : " << getpid() << " quit" << std::endl;
break;
}
}
}
void work2()
{
while (true)
{
int command = 0;
int n = read(0, &command, sizeof(command));
if (n == sizeof(int))
{
std::cout << "pid is : " << getpid() << " handler task" << std::endl;
ExcuteTask(command);
}
else if (n == 0)
{
std::cout << "sub process : " << getpid() << " quit" << std::endl;
break;
}
}
}
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)