https://bbs.comblockengine.com/forum.php?mod=viewthread&tid=2948

 

开源项目嘛 大家了解了读懂了才敢用 不然到时候出问题了不好排查;对于我这种从来没碰过服务器的还是很难理解的;抱着接受新事物的心态来读了下源码理解下流程,先读了下NetworkInterface


        NetworkInterface        {
                /*
                        NetworkInterface的流程
                        
                        创建内网和外网两个socket终端(EndPoint),并开始监听来自于客户端(或者其他内部服务器)的消息。    
                        创建一个socekt响应事件ListenerReceiver   并将该socket终端(EndPoint)   通过epoll 与其绑定,来处理来自于客户端的连接
                        
                        这样每当该socket终端(EndPoint)有客户端(或者内联的其他服务器)连接时,(已经注册过的客户端直接执行已经在epoll注册已经绑定的PacketReceiver处理packet数据)
                        新客户端连接就会调用epoll绑定的对应ListenerReceiver::handleInputNotification()
                        在handleInputNotification中accept该新客户端对应的的socket终端(EndPoint) ,
                        并将该socket终端封装到一个 Channel 来代表与该客户端的通信频道,这样所有的该客户端的事件都在其对应的的channel中处理,
                        创建PacketReceiver 来处理来自于各自客户端的事件,将该PacketReceiver与该链接EndPoint 绑定到epoll中,
                        这样每次客户端有消息来,就自动调用PacketReceiver,暂时放在该客户端对应的Channel中的addReceiveWindow放在 bufferedReceives_待处理
                        然后通过周期调用NetworkInterface::handleMainTick==>processChannels==>processPacket
                        ==>pPacketReader_->processMessages(pMsgHandlers, pPacket);==>pMsgHandler->handle(pChannel_, *pPacket);
                        在注册过的消息中搜索msgID来找到对应的句柄函数来处理该消息                        
                        


                        主要的三个函数
                        构造函数中 : 创建内部链接和外部连接的 监听函数 ,
                        recreateListeningSocket    用客户端的连接创建socket  将其与 监听函数    绑定到 epoll去处理
                        
                        //定时处理所有注册过的Channel的绑定解除和处理
                        processChannels{
                                foreach (pChannel in channelMap)
                                        pChannel->processPackets(pMsgHandlers);
                        }


                */


        
                std::map<Address, Channel *>        ChannelMap;
                EndPoint extEndpoint_, intEndpoint_;
                DelayedChannels::Task;

                
                new ListenerReceiver *        pExtListenerReceiver_;
                new ListenerReceiver *        pIntListenerReceiver_;


                recreateListeningSocket;(const char* pEndPointName, uint16 listeningPort_min, uint16 listeningPort_max, 
                                                                                const char * listeningInterface, EndPoint* pEP, ListenerReceiver* pLR, uint32 rbuffer, 
                                                                                uint32 wbuffer)

                {
                        //http://www.cnblogs.com/dolphinX/p/3460545.html
                        pEP->socket(SOCK_STREAM);//create socket


                        //注册到 epoll   ep为socket   
                        //LR为ListenerReceiver即InputNotificationHandler;为epoll绑定的handler  
                        //这样每当socket有客户端连接的时候 handler绑定的函数handleInputNotification 被执行
                        this->dispatcher().registerReadFileDescriptor(*pEP, pLR); --> pPoller_->registerForWrite(fd, handler);
                        


                        pEP->bind(listeningPort, ifIPAddr)        //绑定网络地址
                                
                        pEP->listen(backlog)                  //开始监听
                
                }==
                //当socket有连接进来时 会调用绑定的 ListenerReceiver  handleInputNotification
                ===>>>        int ListenerReceiver::handleInputNotification(int fd)
                {
                        EndPoint* pNewEndPoint = endpoint_.accept();//获取socket的连接事件  交给channel处理
                        

                        Channel* pChannel = Network::Channel::ObjPool().createObject();
                        
                        bool ret = pChannel->initialize(networkInterface_, pNewEndPoint, traits_){
                                        pPacketReceiver_ = new TCPPacketReceiver(*pEndPoint_, *pNetworkInterface_);

                                        //这里 又将事件注册回epoll
                                        this->dispatcher().registerReadFileDescriptor(*pEndPoint_, pPacketReceiver_);
                                        
                                        /*        epoll接收到该客户端再次来的数据后
                                        对应的处理在TCPPacketReceiver{
                                        handleInputNotification{
                                                TCPPacketReceiver::processRecv(bool expectingPacket)
                                                {
                                                        TCPPacket* pReceiveWindow = TCPPacket::ObjPool().createObject();
                                                        int len = pReceiveWindow->recvFromEndPoint(*pEndpoint_);
        
                                                        Reason ret = this->processPacket(pChannel, pReceiveWindow)
                                                        {
                                                        
                                                                        processFilteredPacket(pChannel, pPacket)
                                                                        {

                                                                                        pChannel->addReceiveWindow(pPacket)
                                                                                        {
                                                                                                //将客户端的消息放在了数组中待处理
                                                                                                bufferedReceives_.push_back(pPacket);

                                                                                        }


                                                                         }
                                                        }
                                                                                

                                                }

                                                
                                        }


                                        }*/
                        

                                }

                        this.registerChannel pChannel
                        {
                                const Address & addr = pChannel->addr();
                                channelMap_[addr] = pChannel;
                        }

                }


                NetworkInterface::handleMainTick{
                        this.processChannels{
                                foreach (pChannel in channelMap_)
                                        pChannel->processPackets(pMsgHandlers);
                                        {

                                                pPacketReader_->processMessages(pMsgHandlers, pPacket);
                                                {
                                                        pMsgHandler->handle(pChannel_, *pPacket);
                                                }
                                                
                                        }

                        }
                }




        }

更新发送消息
                /*
                        NetworkInterface的流程
                       
                        创建内网和外网两个socket终端(EndPoint),并开始监听来自于客户端(或者其他内部服务器)的消息。    
                        创建一个socekt响应事件ListenerReceiver   并将该socket终端(EndPoint)   通过epoll 与其绑定,来处理来自于客户端的连接
                        
                        这样每当该socket终端(EndPoint)有客户端(或者内联的其他服务器)连接时,(已经注册过的客户端直接执行已经在epoll注册已经绑定的PacketReceiver处理packet数据)
                        新客户端连接就会调用epoll绑定的对应ListenerReceiver::handleInputNotification()
                        在handleInputNotification中accept该新客户端对应的的socket终端(EndPoint) ,
                        并将该socket终端封装到一个 Channel 来代表与该客户端的通信频道,这样所有的该客户端的事件都在其对应的的channel中处理,
                        创建PacketReceiver 来处理来自于各自客户端的事件,将该PacketReceiver与该链接EndPoint 绑定到epoll中,
                        这样每次客户端有消息来,就自动调用PacketReceiver,暂时放在该客户端对应的Channel中的addReceiveWindow放在 bufferedReceives_待处理
                        然后通过周期调用NetworkInterface::handleMainTick==>processChannels==>processPacket
                        ==>pPacketReader_->processMessages(pMsgHandlers, pPacket);==>pMsgHandler->handle(pChannel_, *pPacket);
                        在注册过的消息中搜索msgID来找到对应的句柄函数来处理该消息        

                        而在服务器向外发送数据时候流程为
                        Channel::send==> pPacketSender_->processSend(this);
                        // 如果不能立即发送到系统缓冲区,那么交给poller处理PacketSender::handleOutputNotification==>processSend()??啥时候被触发呢?
                        ==>TCPPacketSender::processPacket
                        ==>processFilterPacket(pChannel, pPacket); ==>EndPoint* pEndpoint = pChannel->pEndPoint();        
                                                                                                         pEndpoint->send();
                                                                                                         
                       

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐