前言

多路转接(也称为多路复用)是一种用于管理多个I/O通道的技术,它能实现同时监听和处理多个I/O事件,而不是为每个I/O通道创建单独的线程或进程。其中,select是实现多路转接的一种常用方法。

select()

select函数是系统提供的一个多路转接接口,用于让我们的程序同时监视多个文件描述符(file descriptor,简称fd)的状态变化,如读就绪、写就绪或异常。其函数原型如下:

#include <sys/select.h>  
#include <sys/time.h>  
#include <unistd.h>  
  
int select(int nfds, fd_set *readfds, fd_set *writefds,  
           fd_set *exceptfds, struct timeval *timeout);

参数说明

  • nfds:是文件描述符集合中最大文件描述符值加1。这个参数实际上被忽略,因为现在的系统不再需要它来确定文件描述符的范围。
  • readfds:指向文件描述符集合的指针,该集合中的文件描述符被检查以查看它们是否准备好被读取。
  • writefds:指向文件描述符集合的指针,该集合中的文件描述符被检查以查看它们是否准备好被写入。
  • exceptfds:指向文件描述符集合的指针,该集合中的文件描述符被检查以查看是否有异常条件发生(例如,带外数据到达)。
  • timeout:是一个指向 timeval 结构的指针,该结构指定了函数等待的最大时间长度。如果 timeout 是 NULL,则 select() 会无限期地等待直到至少有一个文件描述符准备就绪。

返回值

  • 成功时,select() 返回准备就绪的文件描述符的总数(不包括 exceptfds 中的文件描述符)。
  • 如果在调用时没有任何文件描述符准备就绪,并且 timeout 非空且指定的时间已过,则返回 0。
  • 如果出现错误,则返回 -1,并设置 errno 以指示错误原因。

timeval是一个用于表示时间的结构体:

#include <sys/time.h>  
 
struct timeval {  
   time_t tv_sec;  // 秒  
   suseconds_t tv_usec;  // 微秒  
};

tv_sec:秒数,从 Unix 纪元(1970 年 1 月 1 日 00:00:00 UTC)开始计算的秒数(时间戳)。
tv_usec:微秒数,范围从 0 到 999999。

fd_set

fd_set 实际上是一个位图(Bitmask)或位向量(Bitvector),它的每一位代表一个文件描述符。通过设置或清除位的方式,可以将文件描述符添加到或从 fd_set 中移除。

typedef struct {  
    __fd_mask fds_bits[__FD_SETSIZE/__NFDBITS];  
} fd_set;

其中,__fd_mask 通常是一个整数类型(如 long),__FD_SETSIZE 定义了 fd_set 中可以包含的最大文件描述符数量(通常是 1024),__NFDBITS 是 __fd_mask 中包含的位数。

与此同时,系统还提供一些接口来操作fd_set:

  • FD_ZERO(fd_set *set):将 fd_set 中的所有位清零,即将所有文件描述符从集合中移除。
  • FD_SET(int fd, fd_set *set):将指定的文件描述符 fd 添加到 fd_set 中。
  • FD_CLR(int fd, fd_set *set):将指定的文件描述符 fd 从 fd_set 中移除。
  • FD_ISSET(int fd, fd_set *set):检查指定的文件描述符 fd 是否在 fd_set 中。在调用 select() 后,使用此宏来检测哪些文件描述符已准备就绪。

select的执行过程

对于执行过程,大致分为四个步骤:

1.初始化

  • 创建一个或多个fd_set结构,用于存储需要监视的文件描述符。
  • 使用FD_ZERO宏清空这些fd_set结构。
  • 使用FD_SET宏将需要监视的文件描述符添加到相应的fd_set结构中。
  • 对于添加到位图中的事件, 都会在位图表示成1
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(_fd_array[i], &rfds);

2.调用select函数

  • 将最大的文件描述符值加1作为nfds参数。
  • 将之前创建的fd_set结构作为readfdswritefdsexceptfds参数(如果需要监视相应的事件)。
  • 设置timeout参数以控制select的等待时间。
  • 调用select函数。
struct timeval timeout = {0, 0};//设置等待时间
int n=select(max_fd+1,&rfds,nullptr,nullptr,&timeout);

3.处理返回结果

  • 此时经过select函数的执行后,已经影响了fd_set 中的之前添加进来的文件描述符,只要没有就绪的话,那么都会被清空为0
  • 如果select返回大于0的值,表示有文件描述符的事件已经就绪。
  • 利用for循环对每个事件进行检查是否就绪,
  • 使用FD_ISSET宏检查哪些文件描述符的事件已经就绪。
  • 根据就绪的文件描述符执行相应的操作,如读取数据、写入数据或处理异常事件。
if(n>0)
{
	 for (int i = 0; i <1024; i++)
     {    
    	 if (FD_ISSET(_fd_array[i], &rfds))
   		{
    		//处理对应的就绪事件 
  		}
 	 }
}

4.重新设置并继续监视

  • 由于select函数会修改传入的fd_set结构,因此在每次调用select之前都需要重新设置这些结构。
  • 根据需要更新timeout参数。
  • 重复执行上述步骤以继续监视文件描述符的状态变化。
while(1)
{
	//包含以上内容
}

注意事项

  • 每次调用select之前都需要重新设置fd_set结构和timeout参数。
  • select函数只负责等待文件描述符的状态变化,并不负责数据的拷贝。数据的拷贝需要使用如read、write等函数来完成。
  • select函数有一个限制,即它能够监视的文件描述符数量是有限的,通常取决于fd_set结构的大小(在32位系统上通常为1024个)。

利用select()建立一个Server服务器

InetAddr.hpp

包含网络地址的头文件:

#pragma once

#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>

class InetAddr
{
private:
    void GetAddress(std::string *ip, uint16_t *port)
    {
        *port = ntohs(_addr.sin_port);
        *ip = inet_ntoa(_addr.sin_addr);
    }

public:
    InetAddr(const struct sockaddr_in &addr) : _addr(addr)
    {
        GetAddress(&_ip, &_port);
    }
    InetAddr(const std::string &ip, uint16_t port) : _ip(ip), _port(port)
    {
        _addr.sin_family = AF_INET;
        _addr.sin_port = htons(_port);
        _addr.sin_addr.s_addr = inet_addr(_ip.c_str());
    }
    InetAddr()
    {}
    std::string Ip()
    {
        return _ip;
    }
    bool operator == (const InetAddr &addr)
    {
        // if(_ip == addr._ip)
        if(_ip == addr._ip && _port == addr._port) // 方便测试
        {
            return true;
        }
        return false;
    }
    // bool operator = (const struct sockaddr_in &addr)
    // {
    //     _addr = addr;
    // }
    struct sockaddr_in Addr()
    {
        return _addr;
    }
    uint16_t Port()
    {
        return _port;
    }
    ~InetAddr()
    {
    }

private:
    struct sockaddr_in _addr;
    std::string _ip;
    uint16_t _port;
};

Log.hpp

打印日志的头文件:

#pragma once

#include<iostream>
#include<fstream>
#include<ctime>
#include<cstdarg>
#include<string>
#include<sys/types.h>
#include<unistd.h>
#include<cstdio>
#include"LockGuard.hpp"

using namespace std;


bool gIsSave=false;//默认输出到屏幕
const string logname="log.txt";
//1.日志是有等级的
enum Level
{
    DEBUG=0,
    INFO,
    WARNING,
    ERROR,
    FATAL 
};


void SaveFile(const string& filename,const string& messages)
{
    ofstream out(filename,ios::app);
    if(!out.is_open())
    {
        return;
    }
    out<<messages;
    out.close();
}
//等级转化为字符串
string LevelToString(int level)
{
    switch (level)
    {
    case DEBUG:
        return "Debug";
    case INFO:
        return "Info";
    case WARNING:
        return "Warning";
    case ERROR:
        return "Error";
    case FATAL:
        return "Fatal";
    default:
        return "Unkonwn";
    }
}

//获取当前时间
string GetTimeString()
{
    time_t curr_time=time(nullptr);//时间戳
    struct tm* format_time=localtime(&curr_time);//转化为时间结构
    if(format_time==nullptr)
        return "None";
    char time_buffer[1024];
    snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d",
             format_time->tm_year + 1900,
             format_time->tm_mon + 1,
             format_time->tm_mday,
             format_time->tm_hour,
             format_time->tm_min,
             format_time->tm_sec);
    return time_buffer;

}

pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
//获取日志信息
void LogMessage(string filename,int line,bool issave,int level,char* format,...)
{
    string levelstr=LevelToString(level);
    string timestr=GetTimeString();
    pid_t selfid=getpid();

    char buffer[1024];
    va_list arg;
    va_start(arg,format);
    vsnprintf(buffer,sizeof(buffer),format,arg);
    va_end(arg);

    string message= "[" + timestr + "]" + "[" + levelstr + "]" +
                          "[" + std::to_string(selfid) + "]" +
                          "[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";


    LockGuard lockguard(&lock);
    if(!issave)
    {
        cout<<message;
    }
    else
    {
        SaveFile(logname,message);
    }
                                                        
}

 #define LOG(level,format,...)                                               \
    do                                                                        \
    {                                                                          \
        LogMessage(__FILE__,__LINE__,gIsSave,level,format,##__VA_ARGS__);       \
    } while (0)


#define EnableFile()         \
    do                       \
    {                        \
        gIsSave=true;        \  
    } while (0)

#define EnableScreen()         \
    do                         \
    {                          \
        gIsSave=false;         \  
    } while (0)
    

LockGuard.hpp

互斥锁的头文件:

#ifndef __LOCK_GUARD_HPP__
#define __LOCK_GUARD_HPP__

#include <iostream>
#include <pthread.h>

class LockGuard
{
public:
    LockGuard(pthread_mutex_t* mutex):_mutex(mutex)
    {
        pthread_mutex_lock(_mutex);
    }
    ~LockGuard()
    {
        pthread_mutex_unlock(_mutex);
    }
private:
    pthread_mutex_t *_mutex;
};

#endif

Socket.hpp

包含一系列Socket套接字的接口函数,还有TcpSocket专门的接口:

#include <iostream>
#include <string>
#include <functional>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <pthread.h>
#include <sys/types.h>
#include <memory>
#include "InetAddr.hpp"
#include "Log.hpp"

namespace socket_ns
{
    class Socket;
    const static int gbacklog=8;
    using socket_sptr=std::shared_ptr<Socket>;//套接字指针

    enum
    {
        SOCKET_ERROR = 1,
        BIND_ERROR,
        LISTEN_ERROR,
        USAGE_ERROR
    };

    //在基类创建一系列虚函数,只要派生类能用到就在这里创建
    class Socket
    {
    public:
        virtual void CreateSocketOrDie() =0; //创建套接字
        virtual void BindSocketOrDie(InetAddr& addr) =0;  //绑定套接字
        virtual void ListenSocketOrDie()=0; //监听套接字
        virtual int Accepter(InetAddr* addr) =0; //接受客户端
        virtual bool Connector(InetAddr &addr) = 0; //连接客户端
        virtual void SetSocketAddrReuse() = 0; // 重启指定端口
        virtual int SockFd() = 0; //获取Sockfd
        virtual int Recv(std::string *out) = 0; //接收对方信息
        virtual int Send(const std::string &in) = 0; //发送给对方信息

    public:
        //创建监听套接字,将一系列操作细分化,直接引用对应函数直接创建
        void BuildListenSocket(InetAddr& addr)
        {
            CreateSocketOrDie();
            SetSocketAddrReuse();
            BindSocketOrDie(addr);
            ListenSocketOrDie();
        }
        bool BuildClientSocket(InetAddr &addr)
        {
            CreateSocketOrDie();
            return Connector(addr);
        }
    };

    class TcpSocket : public Socket
    {
    public:
        TcpSocket(int sockfd=-1)
        :_sockfd(sockfd)
        {}
        void CreateSocketOrDie() override  //override明确的重写基类函数
        {
            _sockfd=socket(AF_INET,SOCK_STREAM,0);
            if(_sockfd<0)
            {
                LOG(FATAL, "socket error");
                exit(SOCKET_ERROR);
            }
            LOG(DEBUG, "socket create success, sockfd is : %d\n", _sockfd);
        }
        void BindSocketOrDie(InetAddr& addr) override
        {
            struct sockaddr_in local;
            memset(&local, 0, sizeof(local));
            local.sin_family = AF_INET;
            local.sin_port = htons(addr.Port());
            local.sin_addr.s_addr = inet_addr(addr.Ip().c_str());

            int n=bind(_sockfd,(struct sockaddr*)&local,sizeof(local));
            if (n < 0)
            {
                LOG(FATAL, "bind error\n");
                exit(BIND_ERROR);
            }
            LOG(DEBUG, "bind success, sockfd is : %d\n", _sockfd);
        }
        void ListenSocketOrDie() override
        {
            int n=listen(_sockfd,gbacklog);
            if (n < 0)
            {
                LOG(FATAL, "listen error\n");
                exit(LISTEN_ERROR);
            }
            LOG(DEBUG, "listen success, sockfd is : %d\n", _sockfd);
        }
        int Accepter(InetAddr* addr) override
        {
            struct sockaddr_in peer;
            socklen_t len=sizeof(peer);
            int sockfd = accept(_sockfd,(struct sockaddr*)&peer,&len);
            if (sockfd < 0)
            {
                LOG(WARNING, "accept error\n");
                return -1;
            }
            *addr=peer;
            return sockfd;
        }
        virtual bool Connector(InetAddr& addr)
        {
            struct sockaddr_in server;
            memset(&server,0,sizeof(server));
            server.sin_family=AF_INET;
            server.sin_addr.s_addr=inet_addr(addr.Ip().c_str());
            server.sin_port=htons(addr.Port());

            int n=connect(_sockfd,(struct sockaddr*)&server,sizeof(server));
            if (n < 0)
            {
                std::cerr << "connect error" << std::endl;
                return false;
            }
            return true;
        }
        void SetSocketAddrReuse() override
        {
            int opt = 1;
            setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)); //快速重启端口
        }
        int Recv(std::string *out) override
        {
            char inbuffer[1024];
            ssize_t n = recv(_sockfd,inbuffer,sizeof(inbuffer)-1,0);
            if (n > 0)
            {
                inbuffer[n] = 0;
                *out += inbuffer; // 接收次数可能不只一次,一般是多次的,
                
            }
            return n;
        }
        int Send(const std::string &in) override
        {
            int n = send(_sockfd,in.c_str(),in.size(),0);
            return n;
        }
        int SockFd() override
        {
            return _sockfd;
        }
        ~TcpSocket()
        {}
    private:
        int _sockfd;
    };
}


SelectServer.hpp

#pragma once

#include <iostream>
#include <string>
#include <memory>
#include "Socket.hpp"

using namespace socket_ns;

//select服务器要正确的编写,需要借助一个第三方数组来完成,保存合法的,所有的fd到数组中,方便后期批量化统一添加
class SelectServer
{
    const static int defaultfd = -1; //默认sockfd
    const static int N = sizeof(fd_set) * 8; //监视文件描述符的最大值
public:
    SelectServer(uint16_t port)
    :_port(port),
    _listensock(make_unique<TcpSocket>())
    {
        InetAddr addr("0", _port); //网络地址初始化
        _listensock->BuildListenSocket(addr);//创建监听套接字
        //初始化辅助数组
        for (int i = 0; i < N; i++)
        {
            _fd_array[i] = defaultfd;
        }
        _fd_array[0] = _listensock->SockFd();
    }
    void AcceptClient()
    {
        // 我们今天只关心了读,而读有:listensock 和 normal sockfd
        InetAddr clientaddr;
        int sockfd = _listensock->Accepter(&clientaddr); // 这里调用accept会不会阻塞呢??不会。因为事件已经就绪了
        if (sockfd < 0)
            return;

        LOG(DEBUG, "Get new Link, sockfd: %d, client info %s:%d\n", sockfd, clientaddr.Ip().c_str(), clientaddr.Port());

        //select托管(监视状态):将新fd放入辅助数组中
        int pos = 1;
        for (; pos < N; pos++)
        {
            if (_fd_array[pos] == defaultfd)
                break;
        }//让pos到辅助数组的空缺位置
        if (pos == N)//说明监视的文件描述符满了
        {
            ::close(sockfd); // sockfd->Close();
            LOG(WARNING, "server is full!\n");
            return;
        }
        else
        {
            _fd_array[pos] = sockfd;
            LOG(DEBUG, "%d add to select array!\n", sockfd);
        }
        LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
    }
    void ServiceIO(int pos)
    {
        char buffer[1024];
        ssize_t n = ::recv(_fd_array[pos], buffer, sizeof(buffer) - 1, 0); // 这里读取会不会被阻塞?不会
        if (n > 0)//处理接收数据
        {
            buffer[n] = 0; 
            std::cout << "client say# " << buffer << std::endl;
            std::string echo_string = "[server echo]# ";
            echo_string += buffer;
            ::send(_fd_array[pos], echo_string.c_str(), echo_string.size(), 0);//返回给客户端
        }
        else if (n == 0)//说明对方已断开连接
        {
            LOG(DEBUG, "%d is closed\n", _fd_array[pos]);
            ::close(_fd_array[pos]);
            _fd_array[pos] = defaultfd;
            LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
        }
        else//出现错误
        {
            LOG(DEBUG, "%d recv error\n", _fd_array[pos]);
            ::close(_fd_array[pos]);
            _fd_array[pos] = defaultfd;
            LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
        }
    }
    //处理准备就绪的事件
    void HandlerEvent(fd_set &rfds)
    {
         for (int i = 0; i < N; i++)
        {
            if (_fd_array[i] == defaultfd)
                continue;
            if (FD_ISSET(_fd_array[i], &rfds))
            {
                if (_fd_array[i] == _listensock->SockFd())//新的连接
                {
                    AcceptClient();
                }
                else
                {
                    // 普通的sockfd读事件就绪
                    ServiceIO(i);
                }
            }
        }
    }
     
    void Loop()
    {
        while(true)
        {
            //监听套接字在等待对方发送连接
            //新的连接 == 读事件就绪
            //要将listensock添加到select中!
            fd_set rfds; //一个记录文件描述符状态的集合
            FD_ZERO(&rfds);//将所有文件描述符移除集合
            int max_fd = defaultfd;//最大的文件描述符值

            for (int i = 0; i < N; i++)
            {
                if (_fd_array[i] == defaultfd)
                    continue;
                FD_SET(_fd_array[i], &rfds); // 将所有合法的fd添加到rfds中
                if (max_fd < _fd_array[i])
                {
                    max_fd = _fd_array[i]; // 更新出最大的fd的值
                }
            }

            struct timeval timeout = {0, 0};//设置等待时间
            int n=select(max_fd+1,&rfds,nullptr,nullptr,nullptr);
            //timeout 是 NULL,则 select() 会无限期地等待直到至少有一个文件描述符准备就绪
            switch (n)
            {
            case 0://指定时间内没有任何文件描述符准备就绪
                LOG(INFO, "timeout, %d.%d\n", timeout.tv_sec, timeout.tv_usec);
                break;
            case -1://出现错误
                LOG(ERROR, "select error...\n");
                break;
            default://成功状态
                LOG(DEBUG, "Event Happen. n : %d\n", n); // 底层有一个事件就绪,select为什么会一直通知我?因为:我们没有处理!
                HandlerEvent(rfds);
                break;
            }
        }
    }
    //打印出已存在的fd
    std::string RfdsToString()
    {
        std::string fdstr;
        for (int i = 0; i < N; i++)
        {
            if (_fd_array[i] == defaultfd)
                continue;
            fdstr += std::to_string(_fd_array[i]);
            fdstr += " ";
        }
        return fdstr;
    }
    ~SelectServer()
    {
    }
private:
    uint16_t _port; //端口号
    std::unique_ptr<Socket> _listensock;//监听socket
    int _fd_array[N]; // 辅助数组
};

成员:
在这里插入图片描述
辅助数组:由上面select执行过程可以知道,当select执行后,会对fd_set的文件描述符产生影响,所以为了能够在循环中多次调用select函数,就需要一个数组来进行对文件描述符的保存;

初始化:
在这里插入图片描述
对于辅助数组来说,只要没有新的fd进来,那么文件描述符将保持为负;而对于Select来说,监听fd就是第一个事件;所以要在初始化就添加进来;

Loop:
这里操作流程就是跟上面的执行过程是一样的,只是增加了一些细节:
在这里插入图片描述
N是1024,指的是Select能存储的最大fd的数目;我们需要让select监听我们想要监听的事件,就需要通过循环来一个一个添加到rfds中;

select()返回值:
在这里插入图片描述
根据select()的返回值执行不同的代码;
这里所说的底层事件就绪,如果没有处理已就绪的事件,那么select就会一直监测到事件就绪,一直执行default语句的内容;

HandlerEvent():
在这里插入图片描述
在for循环里面通过FD_ISSET函数找出每个已经就绪的事件,然后再判断是不是监听事件的;

Main.cc

#include "SelectSever.hpp"
#include <memory>

// ./selectserver port
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cout << "Usage: " << argv[0] << " port" << std::endl;
        return 0;
    }
    uint16_t port = std::stoi(argv[1]);//获取端口号

    EnableScreen();
    std::unique_ptr<SelectServer> svr = std::make_unique<SelectServer>(port);
    svr->Loop();

    return 0;
}

通过telnet进行测试:

开启Server服务:在这里插入图片描述

telnet进行访问
在这里插入图片描述

Server的监听socket收到就绪的事件,创建一个新的IO服务客户端:
在这里插入图片描述

客户端任意发送内容:
在这里插入图片描述
客户端服务端都能通过Select的底层就绪互相接收发送:
在这里插入图片描述

Select()的优缺点

优点

  • 多路复用:select()函数能够同时监视多个文件描述符,实现I/O多路复用,从而提高了程序的并发处理能力和资源利用率。
  • 简单易用:select()函数的接口相对简单,易于理解和使用,特别是对于初学者来说。
  • 灵活性:select()函数允许程序根据文件描述符的读、写、异常等事件进行灵活的处理,满足不同的I/O需求。

缺点

  • 文件描述符数量的限制:select()函数能够监视的文件描述符数量有限,通常在Linux上默认为1024个(尽管可以通过修改宏定义或重新编译内核来提升这一限制,但这样做可能会降低效率)。这对于需要监视大量文件描述符的应用程序来说是一个显著的限制。
  • 性能瓶颈:当监视的文件描述符数量较多时,select()函数的性能可能会成为瓶颈。因为每次调用select()时,内核都需要扫描所有被监视的文件描述符,这会导致不必要的开销。
  • 内核拷贝开销:在select()调用过程中,由于每次都要事先准本fd_set结构内容,内核与用户空间之间需要进行内存拷贝操作,以传递文件描述符集合和结果。这会增加额外的开销,并可能影响性能。

因此,在选择是否使用select()函数时,需要根据具体的应用场景和需求进行权衡。对于需要监视大量文件描述符或追求高性能的应用程序来说,可能需要考虑使用更高级的I/O多路复用机制,如poll()或epoll()等。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐