一、I/O多路转接之 epoll

1. epoll 接口

(1)epoll_create()

首先 epoll_create() 这个接口就是帮我们创建一个 epoll 模型,这个模型是什么我们后面介绍原理的时候再讲。

其中 epoll_create() 的接口如下:

在这里插入图片描述

其中 epoll_create1() 是新标准,我们不介绍。而 epoll_create() 的参数 size 也已经废弃了,这个参数传什么也无所谓了,只要大于 0 就可以了。它的返回值也是一个文件描述符,成功则返回一个新的文件描述符,失败则返回 -1.

(2)epoll_wait()

epoll 模型创建好之后,我们想往这个 epoll 模型中新增一个要关心的 fd 及其事件;修改一个或者删除一个文件描述符及其事件;就需要用到 epoll_wait() 这个接口。该接口如下:

在这里插入图片描述

epoll_wait() 本质就是获取已经就绪的文件描述符。第一个参数 epfd 就是 epoll_create() 的返回值;第二个和第三个参数就是我们将来定义的一个用户级缓冲区,返回已经就绪的 fd 和 事件;最后一个参数的含义和 polltimeout 一模一样,单位为毫秒。而返回值表示已经就绪的文件描述符的个数。

其中我们看到第二个参数中带有 struct epoll_event 这个类型的结构体,这个结构体是什么呢?我们来看一下:

在这里插入图片描述

如上图,epoll_event 中的 events 表示哪些事件,它的类型是 uint32_t,也就是一个位图,和 poll 中的 events 一样,以位图的形式传递标记位事件;而第二个字段 data 的类型 epoll_data_t 是一个联合体,就是可以选择该联合体字段中的任意一个,通常用来保存的是用户级的数据,有关这个字段我们后面再说。

其中 events 可以是以下几个宏的集合:

  • EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
  • EPOLLOUT : 表示对应的文件描述符可以写;
  • EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
  • EPOLLERR : 表示对应的文件描述符发生错误;
  • EPOLLHUP : 表示对应的文件描述符被挂断;
  • EPOLLET : 将 EPOLL 设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的;
  • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个 socket 的话,需要再次把这个 socket 加入到 EPOLL 队列里。

(3)epoll_ctl()

为了使用这个 epoll,首先我们也需要将 listen 套接字添加到 epoll 模型里,所以就需要用到 epoll_ctl() 接口。该接口如下:

在这里插入图片描述

它的作用主要是我们想向系统里新增一个文件描述符,及其要关心的事件,想要修改一个特定的文件描述符关心的事件。所以 epoll_ctl() 支持 epoll 来进行相关的管理工作。

其中第一个参数就是 epoll_create() 的返回值。第二个参数 op 就是以下三个选项的其中一个,分别代表增加,修改,删除:

在这里插入图片描述

第三个参数和第四个参数代表哪一个文件描述符上的哪些事件。

所以我们需要使用 epoll 的话,需要使用以上三个系统调用,而 selectpoll 都只有一个系统调用。

2. epoll 原理

无论是 selectpoll 都是用数组来管理文件描述符和对应的事件,更重要的是该数组这个数据结构是由用户来维护的!接下来我们解释一下 epoll 的原理,就能明白为什么需要三个系统调用来使用 epoll 了。

操作系统在硬件层面,通过硬件中断的方式知道网卡上有数据了,然后通过网卡驱动上的方法将数据拷贝到网卡驱动上的数据链路层。同时,操作系统为了支持 epoll,它为我们提供了三种机制:

  • 操作系统在内部会帮我们维护一颗红黑树

其中红黑树中的节点包含的最重要的字段是:int fduint32_t event,分别代表内核要关心的文件描述符和要关心的事件。

  • 操作系统会为我们维护一个就绪队列

一旦红黑树中有特定的一个节点,比如某个节点上的文件描述符的某个事件就绪了,就可以把该节点添加到就绪队列中;其中该就绪队列中每个节点中的字段包含 int fduint32_t event,分别代表已经就绪的文件描述符和已经就绪的事件。

  • 操作系统的底层网卡驱动是允许操作系统去注册一些回调机制的

操作系统内部会提供一个回调函数,这个回调函数是用来干什么的呢?首先网卡通过硬件中断的方式将数据搬到了网卡驱动层。当网卡驱动层中的数据链路层有数据就绪了,主动会调用该回调函数。然后该回调方法会做如下几个操作:

  1. 向上交付
  2. 交付给 TCP 的接收队列
  3. 根据文件描述符为键值查找红黑树,确认这个接受队列和哪一个文件描述符是关联的,再判断该 fd 是否关心了 EPOLLIN 或者 EPOLLOUT 读写事件,如果有,由于数据已经就绪,所以接下来第四步
  4. 构建就绪节点,插入到就绪队列中

实际上我们用 epoll 的时候,操作系统就会把该回调函数注册到底层,然后底层数据一旦就绪就会自动回调执行上面的四个方法。所以对于用户来说,只需要在就绪队列中获取就绪节点即可!整套机制都是由操作系统完成的!

我们把这三套机制叫做 epoll 模型,如下图:

请添加图片描述

其中 eventpollepoll 对象;epitem 为红黑树的节点。

所以接口 epoll_create() 创建 epoll 模型本质就是创建红黑树,创建就绪队列以及注册底层的回调机制。所以该 epoll 模型怎么让进程找到呢?其实给 epoll 模型放入到 struct file 对象即可,把它也当作文件!因为在 Linux 中一切皆文件!struct file 中也有指针指向 epoll 模型!所以再把该 struct file 对象添加到进程的文件描述符表中即可!

所以 epoll_create() 实质上就是在操作系统中创建 struct file,其中的指针指向整个 epoll 对象,对应的文件描述符就能挂接到进程的文件描述符表中,最后把该文件描述符返回给用户,所以我们就可以通过该文件描述符找到 struct file 并找到 epoll 模型了。epoll_ctl() 实质上的增加、修改、删除都是在对红黑树进行操作。其中 epoll_wait() 的第二个参数是输出型参数,它会将就绪队列中所有就绪的节点一个一个地放进 struct epoll_event 里。

在这里插入图片描述

3. epoll 的优点

基于 epoll 的原理,我们可以得到 epoll 的优势:

  • 检测就绪的时间复杂度为 O(1),因为只需要看队列是否为空就可以了。而获取就绪的时间复杂度为 O(n),因为需要将就绪队列中的节点一个一个拷贝到应用层。
  • fd 和 event 没有上限,因为该红黑树有多大由操作系统说了算
  • 由于该红黑树是操作系统帮我们维护的,所以不需要在用户层由用户维护一个数组这样的数据结构,来管理所有的文件描述符及其要关心的事件了
  • epoll_wait() 的返回值 n,表示有 n 个 fd 就绪了,那么该接口还会将已经就绪的节点放入到它的输出型参数 events 中,所以就绪事件是连续的,有 n 个!这意味着,上层用户处理已经就绪的事件,就不再需要像以前一样检测有哪些 fd 是非法的,哪些是没有就绪的了;只需要根据返回值 n,遍历 events 即可!

4. epoll 的使用

我们对 epoll 的相关接口进行一下简单的封装成为 Epoller.hpp,如下:

				#pragma once
				#include "NoCopy.hpp"
				#include "log.hpp"
				
				#include <sys/epoll.h>
				 
				#include <cstring>
				#include <cerrno> 
				
				class Epoller : public NoCopy
				{
				    static const int size = 128;
				public:
				    Epoller()
				    {
				        _epfd = epoll_create(size);
				        if(_epfd == -1){
				            lg(Fatal, "epoll_create error: %s", strerror(errno));
				        }
				        else{
				            lg(Info, "epoll_create success: %d", _epfd);
				        }
				    }
				
				    int EpollerWait(struct epoll_event revents[], int num)
				    {
				        int n = epoll_wait(_epfd, revents, num, _timeout);
				        return n;
				    }
				
				    int EpollerCtl(int oper, int sockfd, uint32_t event)
				    {   
				        int n = 0;
				        if(oper == EPOLL_CTL_DEL)
				        {
				            n = epoll_ctl(_epfd, oper, sockfd, nullptr);
				            if(n != 0){
				                lg(Error, "epoll_ctl delete error!");
				            }
				        }
				        else
				        {
				            // EPOLL_CTL_MOD 或者 EPOLL_CTL_ADD
				            // 设置进内核的红黑树中
				            struct epoll_event ev;
				            ev.events = event;
				            ev.data.fd = sockfd;    // 方便我们后面处理的时候知道是哪一个 fd 就绪了
				
				            n = epoll_ctl(_epfd, oper, sockfd, &ev);
				            if(n != 0){
				                lg(Error, "epoll_ctl error!");
				            }
				        }
				        return n;
				    }
				
				    ~Epoller()
				    {
				        if(_epfd >= 0){
				            close(_epfd);
				        }
				    }
				private:
				    int _epfd;
				    int _timeout = 1000;
				};

接下来编写 epollSever.hpp,如下:

				#pragma once
				
				#include <iostream>
				#include <memory>
				#include <sys/epoll.h>
				 
				#include "Socket.hpp"
				#include "log.hpp"
				#include "Epoller.hpp"
				#include "NoCopy.hpp"
				
				uint32_t EVENT_IN = (EPOLLIN);
				uint32_t EVENT_OUT = (EPOLLOUT);
				
				class EpollServer : public NoCopy
				{
				    static const int maxevents = 64;
				public:
				    EpollServer(uint16_t port)
				        :_port(port)
				        ,_listenSocket_ptr(new Sock())
				        ,_epoller_ptr(new Epoller())
				    {}
				
				    void Init()
				    {
				        _listenSocket_ptr->Socket();
				        _listenSocket_ptr->Bind(_port);
				        _listenSocket_ptr->Listen();
				
				        lg(Info, "create listen socket success: %d\n", _listenSocket_ptr->GetFd());
				    }
				
				    void Accepter()
				    {
				        std::string client_ip;
				        uint16_t client_port;
				        int sockfd = _listenSocket_ptr->Accept(&client_ip, &client_port);
				        if(sockfd > 0){
				            // 不能直接读取,而是将它添加到内核的红黑树中,让 epoll 关心即可
				            _epoller_ptr->EpollerCtl(EPOLL_CTL_ADD, sockfd, EVENT_IN);
				            lg(Info, "get a new link, client info@ %s:%d", client_ip.c_str(), client_port);
				        }
				        else{
				            return;
				        }
				    }
				
				    // for test
				    void Recver(int fd)
				    {
				        char buffer[1024];
				        ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
				        if (n > 0)
				        {
				            buffer[n] = 0;
				            std::cout << "get a message: " << buffer << std::endl;
				
				            // write
				            std::string echo_str = "sever echo $ ";
				            echo_str += buffer;
				            write(fd, echo_str.c_str(), echo_str.size());
				        }
				        else if (n == 0)
				        {
				            lg(Info, "client quit, me too, close fd is: %d", fd);
				            // 先在内核红黑树中移除 fd,再关闭 fd
				            _epoller_ptr->EpollerCtl(EPOLL_CTL_DEL, fd, 0);
				            close(fd);
				        }
				        else
				        {
				            lg(Warning, "recv error, fd is: %d", fd);
				            _epoller_ptr->EpollerCtl(EPOLL_CTL_DEL, fd, 0);
				            close(fd);
				        }
				    }
				
				    void Dispatcher(struct epoll_event revs[], int num)
				    {
				        for(int i = 0; i < num; ++i){
				            uint32_t event = revs[i].events;
				            int fd = revs[i].data.fd;
				
				            if(event & EVENT_IN){
				                // 读事件就绪
				                if(fd == _listenSocket_ptr->GetFd()){
				                    // 获取到一个新连接,连接管理器
				                    Accepter();
				                }
				                else{
				                    // 其它 fd 上的普通读取事件就绪
				                    Recver(fd);
				                }
				            }
				            else if(event & EVENT_OUT){
				                // 写事件就绪
				                // ...
				            }
				            else{
				                // ...
				            }
				        }
				    }
				
				    void Start()
				    {
				        // 将 listenSocket 添加到 epoll 中
				        // 也就是将 listenSocket 和它所关心的事件添加到内核 epoll 模型中的红黑树中!
				        _epoller_ptr->EpollerCtl(EPOLL_CTL_ADD, _listenSocket_ptr->GetFd(), EVENT_IN);
				        
				        struct epoll_event revs[maxevents];
				        while(true)
				        {
				            // 其中 n 最大是 maxevents
				            int n = _epoller_ptr->EpollerWait(revs, maxevents);
				            if(n > 0)
				            {
				                // 有事件就绪,分派事件
				                lg(Debug, "event happend, fd is: %d", revs[0].data.fd);
				                Dispatcher(revs, n); 
				            }
				            else if(n == 0)
				            {
				                lg(Info, "time out...");
				            }
				            else 
				            {
				                lg(Error, "epoll wait error");
				            }
				        }
				    }
				
				    ~EpollServer()
				    {
				        _listenSocket_ptr->Close(); 
				    }
				
				private:
				    std::shared_ptr<Sock> _listenSocket_ptr;
				    std::shared_ptr<Epoller> _epoller_ptr;
				    uint16_t _port;
				};

我们上面两个模块都用到了 NoCopy 这个类,也就是禁止拷贝,代码如下:

				#pragma once
				
				class NoCopy
				{
				public:
				    NoCopy(){}
				    NoCopy(const NoCopy&) = delete;
				    const NoCopy& operator=(const NoCopy&) = delete;
				};

5. epoll 的工作模式

(1)水平触发 Level Triggered 工作模式(LT 模式)

epoll 默认所处的工作模式就是 LT 模式。例如我们上面所写的简单的 epoll 服务器,每次有新的连接到来时,如果我们不处理它,epoll 会每次都通知我们有连接到来了。这种一旦有新的连接到来,或者有新的数据到来,上层如果不取走,底层就会一直通知上层,让上层把数据尽快取走,这种模式就叫做 LT 模式。就像示波器中的高电平,一直有效。

(2)边缘触发 Edge Triggered 工作模式(ET 模式)

ET 模式指的是,数据或者连接,从无到有,从有到多,变化的时候,才会通知我们一次。正是因为 ET 模式有这种特点,才会倒逼程序员每次通知都必须把本轮数据全部取走,怎么保证数据全部取走呢?所以就需要循环读取,直到读取出错!但是我们使用 read() 或者 recv() 在缓冲区中读取数据的时候,当缓冲区的数据没有了,因为它们的读取方式默认是阻塞的,所以此时就会阻塞,服务器就会被挂起!所以我们在 ET 模式下,所有的 fd 必须是要设置为非阻塞的!

(3)LT 和 ET 的对比

  • selectpoll 其实也是工作在 LT 模式下;epoll 既可以支持 LT,也可以支持 ET

  • 普遍地我们认为,ET 的工作模式比 LT 的工作模式通知效率更高,因为通知一次就可以倒逼上层把全部数据读取走。同时也看得出来 ET 模式的 IO 效率也更高,这也就意味着,TCP 会向对方通告一个更大的窗口,从而从概率上让对方一次给自己发送更多的数据!

  • 所谓的 LT 模式和 ET 模式,本质就是向就绪队列中放入多个或者一个就绪的事件

  • 但是 ET 模式就一定比 LT 模式的效率高吗?不一定!因为 LT 也可以将所有的 fd 设置为非阻塞,然后循环读取,也就是当通知一次的时候,就把数据全部取走了,就和 ET 一样了!所以谁的效率高不一定,要看具体的实现。

二、Reactor

1. 概念

我们在上面编写的 epoll 服务器的代码中,在其他普通的 fd 读取事件就绪时,也就是在 Recver() 中,读取是有问题的,因为我们不能区分每次读取上来的数据是一个完整的报文。另外还有其它各种问题,所以我们要对上面的代码使用 Reactor 的设计模式作修改。

所谓的 Reactor 是一种设计模式,翻译过来称为反应堆模式。用于处理事件驱动的系统中的并发操作。它提供了一种结构化的方式来处理输入事件,并将其分发给相应的处理程序。Reactor 模式通常用于网络编程中,特别是在服务器端应用程序中。

要进行正确的 IO 处理,就应该有如下的理解:在应用层一定存在大量的连接,每一个连接在应用层都叫做文件描述符。而在读取每一个文件描述符上的数据的时候,可能根本就没有读取完,此时我们就需要把该文件描述符上的数据临时保存起来。所以我们在写服务器的时候,我们要保证每一个文件描述符及其连接及其缓冲区,都是独立的!

2. 实现

(1)Epoller.hpp

Epoller.hpp 是对 epoll 的系统调用的封装,代码如下:

			#pragma once
			#include "NoCopy.hpp"
			#include "log.hpp"
			
			#include <sys/epoll.h>
			   
			#include <cstring>
			#include <cerrno> 
			
			class Epoller : public NoCopy
			{
			    static const int size = 128;
			public:
			    Epoller()
			    {
			        _epfd = epoll_create(size);
			        if(_epfd == -1){
			            lg(Fatal, "epoll_create error: %s", strerror(errno));
			        }
			        else{
			            lg(Info, "epoll_create success: %d", _epfd);
			        }
			    }
			
			    int EpollerWait(struct epoll_event revents[], int num, int timeout)
			    {
			        int n = epoll_wait(_epfd, revents, num, timeout);
			        return n;
			    }
			
			    int EpollerCtl(int oper, int sockfd, uint32_t event)
			    {   
			        int n = 0;
			        if(oper == EPOLL_CTL_DEL)
			        {
			            n = epoll_ctl(_epfd, oper, sockfd, nullptr);
			            if(n != 0){
			                lg(Error, "epoll_ctl delete error! sockfd: %d", sockfd);
			            }
			        }
			        else
			        {
			            // EPOLL_CTL_MOD 或者 EPOLL_CTL_ADD
			            // 设置进内核的红黑树中
			            struct epoll_event ev;
			            ev.events = event;
			            ev.data.fd = sockfd;    // 方便我们后面处理的时候知道是哪一个 fd 就绪了
			
			            n = epoll_ctl(_epfd, oper, sockfd, &ev);
			            if(n != 0){
			                lg(Error, "epoll_ctl error!");
			            }
			        }
			        return n;
			    }
			
			    ~Epoller()
			    {
			        if(_epfd >= 0){
			            close(_epfd);
			        }
			    }
			private:
			    int _epfd;
			    int _timeout = 1000;
			};

(2)TcpServer.hpp

TcpServer.hpp 是处理 IO 的服务器,代码如下:

			#pragma once
			#include <iostream>
			#include <string>
			#include <unordered_map>
			#include <memory>
			#include <functional>
			
			#include <cerrno>  
			
			#include "log.hpp"
			#include "NoCopy.hpp"
			#include "Epoller.hpp"
			#include "Socket.hpp"
			#include "Comm.hpp"
			
			// 设置 ET 模式
			uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
			uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
			const static int g_buffer_size = 128;
			
			class Connection;
			class TcpServer;
			
			using func_t = std::function<void(std::weak_ptr<Connection>)>;
			using except_t = std::function<void(std::weak_ptr<Connection>)>;
			
			// 管理每一个连接
			class Connection 
			{
			public:
			    Connection(int sockfd)
			        :_sockfd(sockfd)
			    {}
			
			    void SetHandler(func_t recv_cb, func_t send_cb, except_t except_cb)
			    {
			        _recv_cb = recv_cb;
			        _send_cb = send_cb;
			        _except_cb = except_cb;
			    }
			
			    ~Connection()
			    {}
			private:
			    int _sockfd;
			
			    // 充当缓冲区
			    std::string _inbuffer; 
			    std::string _outbuffer;
			public:
			    // 回指指针
			    // std::shared_ptr<TcpServer> _tcpServer_ptr;
			    std::weak_ptr<TcpServer> _tcpServer_ptr;
			
			    // 回调方法
			    func_t _recv_cb;
			    func_t _send_cb;
			    except_t _except_cb;
			
			    std::string _ip;
			    uint16_t _port;
			
			    int Sockfd()
			    {
			        return _sockfd;
			    }
			
			    void AppendInBuffer(const std::string& info)
			    {
			        _inbuffer += info;
			    }
			
			    void AppendOutBuffer(const std::string& info)
			    {
			        _outbuffer += info;
			    }
			
			    std::string& Inbuffer()
			    {
			        return _inbuffer;
			    }
			
			    std::string& Outbuffer()
			    {
			        return _outbuffer;
			    }
			
			    void SetWeakPtr(std::weak_ptr<TcpServer> tcpServer_ptr)
			    {
			        _tcpServer_ptr = tcpServer_ptr;
			    }
			};
			
			
			// enable_shared_from_this 可以提供返回当前对象的 this 对应的 shared_ptr
			class TcpServer : public std::enable_shared_from_this<TcpServer>, public NoCopy
			{
			    static const int num = 64;
			public:
			    TcpServer(uint16_t port, func_t OnMessage)
			        :_port(port)
			        ,_quit(true)
			        ,_OnMessage(OnMessage)
			        ,_epoller_ptr(new Epoller())
			        ,_listenSock_ptr(new Sock())
			    {}
			
			    void Init()
			    {
			        _listenSock_ptr->Socket();
			        // 将 fd 设置为非阻塞
			        SetNonBlockOrDie(_listenSock_ptr->GetFd());
			        _listenSock_ptr->Bind(_port);
			        _listenSock_ptr->Listen();
			        lg(Info, "create listen socket success: %d\n", _listenSock_ptr->GetFd());
			
			        AddConnection(_listenSock_ptr->GetFd(), EVENT_IN, \
			            std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
			    }
			
			    void AddConnection(int sockfd, uint32_t event, func_t recv_cb, func_t send_cb, except_t except_cb,\
			        const std::string& ip = "0.0.0.0", uint16_t port = 0)
			    {
			        // 1. 给所有的套接字建立一个 connection 对象
			        std::shared_ptr<Connection> new_connection(new Connection(sockfd));
			
			        // shared_from_this(): 返回当前对象的 shared_ptr
			        new_connection->SetWeakPtr(shared_from_this()); 
			        
			        new_connection->SetHandler(recv_cb, send_cb, except_cb);
			        new_connection->_ip = ip;
			        new_connection->_port = port;
			
			        // 2. 将套接字和 Connection 添加到 unordered_map 中
			        _connections.insert(std::make_pair(sockfd, new_connection));
			
			        // 3. 将 listen 套接字或其它事件添加到 epoll 模型中
			        _epoller_ptr->EpollerCtl(EPOLL_CTL_ADD, sockfd, event);
			
			        lg(Debug, "add a new connection success, sockfd is: %d", sockfd);
			    }
			
			    // listen 套接字的连接管理器,即有事件就绪的时候,就是有连接到来,就需要处理新连接
			    void Accepter(std::weak_ptr<Connection> conn)
			    {
			        auto connection = conn.lock();
			        // 不断检测是否还有新连接,直到读取出错
			        while(true){
			            struct sockaddr_in peer;
			            socklen_t len = sizeof(peer);
			            int sockfd = ::accept(connection->Sockfd(), (sockaddr*)&peer, &len);
			
			            // 获取到新连接设置为非阻塞,然后构建 Connection 对象放入哈希表中和内核红黑树中
			            if(sockfd > 0){
			                uint16_t peer_port = ntohs(peer.sin_port);
			                char ipbuffer[128];
			                inet_ntop(AF_INET, &peer.sin_addr, ipbuffer, sizeof(ipbuffer));
			                lg(Debug, "get a new client, get info-> [%s: %d], sockfd: %d", ipbuffer, peer_port, sockfd);
			
			                SetNonBlockOrDie(sockfd);
			                AddConnection(sockfd, EVENT_IN, \
			                    std::bind(&TcpServer::Recver, this, std::placeholders::_1),\
			                    std::bind(&TcpServer::Sender, this, std::placeholders::_1),\
			                    std::bind(&TcpServer::Excepter, this, std::placeholders::_1),\
			                    ipbuffer, peer_port);
			            }
			            else{
			                if(errno == EWOULDBLOCK){   // 读取完毕
			                    break;
			                }
			                else if(errno == EINTR){ // 信号原因中断
			                    continue;
			                }
			                else{
			                    break;
			                }
			            }
			        }
			    }
			
			    // 普通事件的事件管理器
			    // 对于服务器而言只需要进行IO,不需要关心是否读完和报文的格式
			    void Recver(std::weak_ptr<Connection> conn)
			    {
			        if(conn.expired()) return;
			        auto connection = conn.lock();
			
			        int sockfd = connection->Sockfd();
			        while(true){
			            char buffer[g_buffer_size];
			            memset(buffer, 0, sizeof(buffer));
			            ssize_t n = recv(sockfd, buffer, sizeof(buffer) - 1, 0);   // 非阻塞读取
			            if(n > 0){
			                connection->AppendInBuffer(buffer);
			            }
			            else if(n == 0){
			                lg(Info, "sockfd: %d, client info %s: %d quit...", sockfd, connection->_ip.c_str(), connection->_port);
			                connection->_except_cb(connection);
			            }
			            else{
			                if(errno == EWOULDBLOCK){
			                    break;
			                }
			                else if(errno == EINTR){
			                    continue;
			                }
			                else{
			                    lg(Warning, "sockfd: %d, client info %s: %d recv error...", sockfd, connection->_ip.c_str(), connection->_port);
			                    connection->_except_cb(connection);
			                    return;
			                }
			            }
			        }
			        // 交给上层处理,读取到的数据都在 connection 中
			        // 1. 检测
			        // 2. 如果有完整报文,就处理
			        _OnMessage(connection);
			    }
			
			    void Sender(std::weak_ptr<Connection> conn)
			    {
			        if(conn.expired()) return;
			        auto connection = conn.lock();
			        
			        auto& outbuffer = connection->Outbuffer();
			        while(true){
			            ssize_t n = send(connection->Sockfd(), outbuffer.c_str(), outbuffer.size(), 0);
			            if(n > 0){
			                outbuffer.erase(0, n);
			                if(outbuffer.empty()){
			                    break;
			                }
			            }
			            else if(n == 0){
			                return;
			            }
			            else{
			                if(errno == EWOULDBLOCK){
			                    break;
			                }
			                else if(errno == EINTR){
			                    continue;
			                }
			                else{
			                    lg(Warning, "sockfd: %d, client info %s: %d send error...", connection->Sockfd(), connection->_ip.c_str(), connection->_port);
			                    connection->_except_cb(connection);
			                    return;
			                }
			            }
			        }
			        // 没发完
			        if(!outbuffer.empty()){
			            // 开始对写事件关心
			            EnableEvent(connection->Sockfd(), true, true);
			        }
			        else{
			            // 关闭对写事件关心
			            EnableEvent(connection->Sockfd(), true, false);
			        }
			    }
			
			    void Excepter(std::weak_ptr<Connection> connection)
			    {
			        if(connection.expired()) return;
			        auto conn = connection.lock();
			
			        lg(Debug, "Excepter hander sockfd: %d, client info %s: %d excepter handler", \
			            conn->Sockfd(), conn->_ip.c_str(), conn->_port);
			
			        // 1. 移除对特定 fd 的关心
			        // EnableEvent(connection->Sockfd(), false, false);
			        _epoller_ptr->EpollerCtl(EPOLL_CTL_DEL, conn->Sockfd(), 0);
			        // 2. 关闭异常的 fd
			        lg(Debug, "close %d done...\n", conn->Sockfd());
			        close(conn->Sockfd());
			        // 3. 从 _connections 中移除 fd 和 Connection 的映射关系
			        lg(Debug, "remove %d from _connections...\n", conn->Sockfd());
			        _connections.erase(conn->Sockfd());
			    }
			
			    void EnableEvent(int sockfd, bool readAble, bool writeAble)
			    {
			        uint32_t events = 0;
			        events |= ((readAble ? EPOLLIN : 0) | (writeAble ? EPOLLOUT : 0) | EPOLLET);
			        _epoller_ptr->EpollerCtl(EPOLL_CTL_MOD, sockfd, events);
			    }
			
			    bool IsConnectionExist(int fd)
			    {
			        auto iter = _connections.find(fd);
			        if(iter == _connections.end()){
			            return false;
			        }
			        else{
			            return true;
			        }
			    }
			
			    void Dispatcher(int timeout)
			    {
			        int n = _epoller_ptr->EpollerWait(revs, num, timeout);
			        for(int i = 0; i < n; i++){
			            uint32_t event = revs[i].events;
			            int sockfd = revs[i].data.fd;
			
			            // 一旦事件异常,统一把异常转换为读写问题
			            if(event & EPOLLERR){
			                event |= (EPOLLIN | EPOLLOUT);
			            }
			            if(event & EPOLLHUP){
			                event |= (EPOLLIN | EPOLLOUT);
			            }
			            
			            // 只需要处理读写
			            if((event & EPOLLIN) && IsConnectionExist(sockfd)){
			                if(_connections[sockfd]->_recv_cb){
			                    _connections[sockfd]->_recv_cb(_connections[sockfd]);
			                }
			            }
			            if((event & EPOLLOUT) && IsConnectionExist(sockfd)){
			                if(_connections[sockfd]->_send_cb){
			                    _connections[sockfd]->_send_cb(_connections[sockfd]);
			                }
			            }
			        }
			    }
			
			    void Loop()
			    {
			        _quit = false;
			        
			        // AddConnection();
			        while(!_quit)
			        {
			            // 事件派发
			            // Dispatcher(3000);
			            Dispatcher(-1);
			            PrintConnection();
			        }
			        _quit = true;
			    }
			
			    void PrintConnection()
			    {
			        std::cout << "_connections fd list: ";
			        for(auto& connection : _connections){
			            std::cout << connection.second->Sockfd() << ", ";
			            std::cout << "inbuffer: " << connection.second->Inbuffer() << " ";
			        }
			        std::cout << std::endl;
			    }
			
			    ~TcpServer()
			    {}
			private:
			    std::shared_ptr<Epoller> _epoller_ptr;
			    std::shared_ptr<Sock> _listenSock_ptr;   
			    uint16_t _port;
			    bool _quit;
			
			    struct epoll_event revs[num];
			
			    // fd 到对应连接到映射,_connections 就是当前服务器管理的所有连接
			    std::unordered_map<int, std::shared_ptr<Connection>> _connections;
			
			    // 让上层处理信息
			    func_t _OnMessage;
			};

(3)Calculator.hpp

Calculator.hpp 是上层处理业务的具体处理方法,代码如下:

			#pragma once
			#include <string>
			#include <iostream>
			#include "Protocol.hpp"
			
			enum
			{
			    DIV_ERR = 1,
			    MOD_ERR = 2,
			    OP_ERR = 3
			};
			
			
			// 上层业务
			class Calculator
			{
			public:
			    Calculator()
			    {}
			
			    Response CalculatorHelper(const Request &req)
			    {
			        Response resp(0, 0);
			        switch (req._op)
			        {
			        case '+':
			            resp._result = req._x + req._y;
			            break;
			        case '-':
			            resp._result = req._x - req._y;
			            break;
			        case '*':
			            resp._result = req._x * req._y;
			            break;
			        case '%':
			        {
			            if (req._y == 0)
			                resp._code = MOD_ERR;
			            else
			                resp._result = req._x % req._y;
			        }
			        break;
			        case '/':
			        {
			            if (req._y == 0)
			                resp._code = DIV_ERR;
			            else
			                resp._result = req._x / req._y;
			        }
			        break;
			        default:
			            resp._code = OP_ERR;
			            break;
			        }
			        return resp;
			    }
			
			    // "len"\n"10 + 20"\n
			    std::string Handler(std::string &package)
			    {
			        std::string content;
			        bool ret = Decode(package, &content); // content = "10 + 20"
			        if (!ret)
			            return "";
			
			        Request req;
			        ret = req.Deserialize(content); // x = 10, y = 20, op = '+'
			        if (!ret)
			            return "";
			
			        content = "";
			        Response resp = CalculatorHelper(req); // result = 30, code = 0
			
			        resp.Serialize(&content);              // content = "30 0"
			        content = Encode(content);             // content = "len"\n"30 0\n"
			
			        return content;
			    }
			
			    ~Calculator()
			    {}
			};

(4)main.cpp

下面是主函数的调用:

			#include <iostream>
			#include <memory>
			#include <functional>
			
			#include "TcpServer.hpp"    // 处理IO
			#include "Calculator.hpp"   // 处理业务
			#include "log.hpp"
			
			Calculator calculator;
			
			void DefaultOnMessage(std::weak_ptr<Connection> conn)
			{
			    if(conn.expired()) return;
			    auto connection_ptr = conn.lock();
			
			    std::cout << "Application layerget a message: " << connection_ptr->Inbuffer() << std::endl;
			
			    // 对报文进行处理
			    std::string response_str = calculator.Handler(connection_ptr->Inbuffer());
			    if(response_str.empty()){
			        return;
			    }
			    lg(Debug, "%s", response_str.c_str());
			
			    // response_str 发送出去
			    connection_ptr->AppendOutBuffer(response_str);
			
			    // 因为写事件(发送缓冲区是否有空间,经常是ok的),经常是就绪的
			    // 所以如果我们设置对 EPOLLOUT 关心,那么 EPOLLOUT 几乎每次都是就绪的
			    // 就导致 epollserver 经常返回,浪费 CPU 资源
			    // 所以,对于读取,我们设置为常关心;对于写,我们设置为按需设置
			    // 处理写事件:直接写入,如果写入完成,就结束。
			    // 如果写入完成,但是数据还没有写完,_outbuffer 里还有内容,我们就需要设置对写事件进行关心了,如果写完了,就去掉写事件的关心
			    // connection_ptr->_send_cb(connection_ptr);
			    auto tcpserver = connection_ptr->_tcpServer_ptr.lock();
			    tcpserver->Sender(connection_ptr);
			}
			
			int main()
			{ 
			    std::shared_ptr<TcpServer> tcp_svr(new TcpServer(8888, DefaultOnMessage));
			    tcp_svr->Init();
			    tcp_svr->Loop();
			
			    return 0;
			}

其中有一些头文件例如 Socket.hpp 和 log.hpp 我们以前已经用过,这里就不再放出来了。

(5)CMakeLists.txt

			cmake_minimum_required(VERSION 2.8)
			project(ReactorServer)
			
			add_executable(reactorServer main.cc)
			target_link_libraries(reactorServer jsoncpp)
			
			add_executable(clientCal ClientCal.cc)
			target_link_libraries(clientCal jsoncpp)

3. 总结

Reactor 其实是一个半同步半异步模型,那么 IO 等于等待+数据拷贝,所以 Reactor 的半同步半异步体现在,等待是由 epoll 完成,这是体现同步;异步体现在 Reactor 可以进行回调处理。

Reactor 模式中,有一个事件循环(Event Loop)负责监听和分发事件。当有新的事件到达时,事件循环会将其分发给相应的处理程序进行处理。这种方式可以实现高效的并发处理,避免了线程创建和销毁的开销。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐