[muduo网络库]——muduo库TcpConnection类,万字总结(剖析muduo网络库核心部分、设计思想)
muduo 是由陈硕大佬个人开发的 TCP 网络库,本专栏使用c++11重构了Muduo库中核心的Multi-Reactor架构,剖析大佬核心的代码设计思想,希望能够理清自己的思路,也希望能够帮助到大家。
接着之前我们[muduo网络库]——muduo库Buffer类(剖析muduo网络库核心部分、设计思想),我们接下来继续看muduo库中的TcpConnection类。
TcpConnection类
TcpConnection类是muduo最核心的类,这个类主要封装了一个已建立的TCP连接,以及控制该TCP连接的方法(连接建立和关闭和销毁),以及该连接发生的各种事件(读/写/错误/连接)对应的处理函数,以及这个TCP连接的服务端和客户端的套接字地址信息等。
重要成员变量
EventLoop *loop_; //绝对不是baseloop,因为TcpConnetion都是在subloop中管理的
const std::string name_;
std::atomic_int state_;
bool reading_;
std::unique_ptr<Socket> socket_;
std::unique_ptr<Channel> channel_;
const InetAddress localAddr_;
const InetAddress peerAddr_;
ConnectionCallback connectionCallback_; //有新连接时的回调
MessageCallback messageCallback_; //有读写消息时的回调
WriteCompleteCallback writeCompleteCallback_; //消息发送完成以后的回调
CloseCallback closeCallback_;
HighWaterMarkCallback highWaterMarkCallback_;
size_t highWaterMark_;
Buffer inputBuffer_; //接受数据的缓冲区
Buffer outputBuffer_; //发送数据的缓冲区
loop_
该Tcp连接的Channel注册到了哪一个sub EventLoop上。这个loop_就是那一个sub EventLoop。name_
客户端的名字state_
客户端的状态,对应的有一个枚举类型,分别对应着已经断开连接,正在连接,已经连接,正在断开连接。
enum StateE {kDisconnected, kConnecting, kConnected, kDisconnecting};
reading_
连接是否正在监听读事件socket_
连接套接字, 用于对连接进行底层操作channel_
通道, 用于绑定要监听的事件localAddr_
本地IP地址peerAddr_
对端IP地址connectionCallback_
,messageCallback_
,writeCompleteCallback_
,highWaterMarkCallback_
,closeCallback_
对应的连接建立/关闭后的处理函数,收到消息后的处理函数,消息发送完后的处理函数,高水位回调,连接关闭后的处理函数。highWaterMark_
因为发送数据,应用写得快,内核发送数据慢,需要把待发送的数据写入缓冲区,且设置了水位回调,防止发送太快inputBuffer_
,outputBuffer_
输入输出缓冲区,在输出缓冲区是用于暂存那些暂时发送不出去的待发送数据。因为Tcp发送缓冲区是有大小限制的,假如达到了高水位线,就没办法把发送的数据通过send()直接拷贝到Tcp发送缓冲区,而是暂存在这个outputBuffer_中,等TCP发送缓冲区有空间了,触发可写事件了,再把outputBuffer_中的数据拷贝到Tcp发送缓冲区中。
重要成员函数
- 构造函数
TcpConnection::TcpConnection(EventLoop* loop,
const string& nameArg,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr)
: loop_(CHECK_NOTNULL(loop)),
name_(nameArg),
state_(kConnecting),
reading_(true),
socket_(new Socket(sockfd)),
channel_(new Channel(loop, sockfd)),
localAddr_(localAddr),
peerAddr_(peerAddr),
highWaterMark_(64*1024*1024)
{
// 向Channel对象注册可读事件
channel_->setReadCallback(std::bind(&TcpConnection::handleRead, this, _1));
channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite, this));
channel_->setCloseCallback(std::bind(&TcpConnection::handleClose, this));
channel_->setErrorCallback(std::bind(&TcpConnection::handleError, this));
LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this << " fd=" << sockfd;
socket_->setKeepAlive(true);
}
- 构造函数给channel设置相应的回调函数
//当通道有读事件时候在Channel::handleEvent()内调用:TcpConnection::handleRead()
channel_->setReadCallback(std::bind(&TcpConnection::handleRead,this,std::placeholders::_1));
//当通道有写事件的时候在Channel::handleEven()内调用:TcpConnection::handleWrite()
channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite,this));
//当通道有关闭事件的时候在Channel::handleEvent()内调用:TcpConnection::handleClose()
channel_->setCloseCallback(std::bind(&TcpConnection::handleClose,this));
//当通道有错误事件的时候在Channel::handleEvent()内调用:TcpConnection::handleError()
channel_->setErrorCallback(std::bind(&TcpConnection::handleError,this));
LOG_INFO("TcpConnection::ctor[%s] at fd=%d\n",name_.c_str(),sockfd);
//开启Tcp/Ip层的心跳包检测
socket_->setKeepAlive(true);
- 一系列的获取
loop_
,name_
,地址,状态的函数。
EventLoop* getLoop() const { return loop_;}
const std::string& name() const { return name_;}
const InetAddress& localAddress() const { return localAddr_;}
const InetAddress& peerAddress() const { return peerAddr_; }
bool connected() const { return state_ == kConnected;}
- 发送数据
void TcpConnection::send(const std::string &buf) //直接引用buffer
{
if(state_ == kConnected)
{
if(loop_->isInLoopThread())
{
//string.c_str是Borland封装的String类中的一个函数,它返回当前字符串的首字符地址。
sendInLoop(buf.c_str(),buf.size());
}
else
{
loop_->runInLoop(std::bind(&TcpConnection::sendInLoop
, this
, buf.c_str()
, buf.size()
));
}
}
}
void TcpConnection::sendInLoop(const void* data, size_t len)
{
ssize_t nwrote = 0;
size_t remaining = len; //未发送的数据
bool faultError = false; //记录是否产生错误
//之前调用过connection的shutdown 不能在发送了
if(state_ == kDisconnected)
{
LOG_ERROR("disconnected,give up writing!");
return ;
}
//channel 第一次开始写数据,且缓冲区没有待发送数据
if(!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
nwrote = ::write(channel_->fd(),data,len);
if(nwrote >= 0)
{
remaining = len - nwrote;
if(remaining == 0 && writeCompleteCallback_)
{
loop_->queueInLoop(
std::bind(writeCompleteCallback_,shared_from_this()));
}
}
else
{
nwrote = 0;
if(errno != EWOULDBLOCK) //用于非阻塞模式,不需要重新读或者写
{
LOG_ERROR("TcpConnection::sendInLoop");
if(errno == EPIPE || errno == ECONNRESET) //SIGPIPE RESET
{
faultError = true;
}
}
}
}
if(!faultError && remaining > 0)
{
//目前发送缓冲区剩余的待发送数据的长度
size_t oldlen = outputBuffer_.readableBytes();
if(oldlen + remaining >= highWaterMark_
&& oldlen < highWaterMark_
&& highWaterMark_)
{
loop_->queueInLoop(
std::bind(highWaterMarkCallback_,shared_from_this(),oldlen + remaining)
);
}
outputBuffer_.append((char*)data + nwrote,remaining);
if(!channel_->isWriting())
{
channel_->enableWriting(); //注册channel写事件,否则poller不会向channel通知epollout
}
}
}
1) 发送数据要发送的数据长度是len
,如果在loop_
在当前的线程里面,就调用sendInLoop
,sendInLoop
内部实际上是调用了系统的write
,如果一次性发送完了,就设置writeCompleteCallback_
,表明不要再给channel设置epollout事件了
2)如果没有写完,先计算一下oldlen
目前发送缓冲区剩余的待发送数据的长度。满足:
if(oldlen + remaining >= highWaterMark_ && oldlen < highWaterMark_&& highWaterMark_)
就会触发高水位回调
3)不满足以上的话,直接写入outputBuffer_
4) 剩余的数据保存到缓冲区当中,要给给channel
注册epollout
事件(切记,一定要注册channel的写事件,否则poller不会向channel通知epollout),这样poller
发现tcp发送缓冲区有空间,会通知相应的socket-channel调用相应的writeCallback()
回调方法,也就是调用TcpConnection::handleWrite
,把发送缓冲区中数据全部发送出去。
- 重中之重
TcpConnection::handleRead
负责处理Tcp连接的可读事件,它会将客户端发送来的数据拷贝到用户缓冲区中inputBuffer_
,然后再调用connectionCallback_保存的连接建立后的处理函数。
void TcpConnection::handleRead(TimeStamp receiveTime)
{
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(),&savedErrno);
if(n > 0)
{
messageCallback_(shared_from_this(),&inputBuffer_,receiveTime);
}
else if(n==0) //客户端断开
{
handleClose();
}
else
{
errno = savedErrno;
LOG_ERROR("TcpConnection::hanleRead");
handleError();
}
}
- 关于readFd在Buffer类中我们已经剖析过了Buffer类,接着已建立连接的用户,有可读事件发生了,调用用户传入的回调操作
onMessage
,shared_from_this()
获取了当前TcpConnection对象的智能指针. - n=0,说明客户端断开了,调用连接关闭后的处理函数。
- n<0 出错了,调用错误处理回调
handleWrite( )
负责处理Tcp连接的可写事件
void TcpConnection::handleWrite()
{
if(channel_->isWriting())
{
int savedErrno = 0;
ssize_t n = outputBuffer_.writeFd(channel_->fd(),&savedErrno);
if(n > 0)
{
outputBuffer_.retrieve(n); //处理了n个
if(outputBuffer_.readableBytes() == 0) //发送完成
{
channel_->disableWriting(); //不可写了
if(writeCompleteCallback_)
{
//唤醒loop对应的thread线程,执行回调
loop_->queueInLoop(
std::bind(writeCompleteCallback_,shared_from_this())
);
}
if(state_ == kDisconnecting)
{
shutdownInLoop();// 在当前loop中删除TcpConnection
}
}
}
else
{
LOG_ERROR("TcpConnection::handleWrite");
}
}
else
{
LOG_ERROR("TcpConnection fd=%d is down, no more writing \n",channel_->fd());
}
}
- 如果可写,通过fd发送数据,直到发送完成
- 设置不可写,如果writeCompleteCallback_,唤醒loop对应的thread线程,执行回调
- 当前TCP正在断开连接,调用
shutdownInLoop
,在当前loop中删除TcpConnection
- 处理Tcp连接关闭的事件
handleClose()
void TcpConnection::handleClose()
{
LOG_INFO("TcpConnection::handleClose fd=%d state=%d \n",channel_->fd(),(int)state_);
setState(kDisconnected);
channel_->disableAll();
TcpConnectionPtr connPtr(shared_from_this());
connectionCallback_(connPtr); //执行连接关闭的回调
closeCallback_(connPtr); //关闭连接的回调 TcpServer => TcpServer::removeConnection
}
处理逻辑就是将这个TcpConnection对象中的channel_
从事件监听器中移除。然后调用connectionCallback_
和closeCallback_
保存的回调函数。closeCallback_
在TcpServer::newConnection()
为新连接新建TcpConnection
时,已设为TcpServer::removeConnection()
,而removeConnection()
最终会调用TcpConnection::connectDestroyed()
来销毁连接资源。
void TcpConnection::connectDestroyed()
{
if(state_ == kConnected)
{
setState(kDisconnected);
channel_->disableAll(); //把channel所有感兴趣的事件,从poller中del掉
connectionCallback_(shared_from_this());
}
channel_->remove();//把channel从poller中删除掉
}
只有处于已连接状态(kConnected)的tcp连接, 才需要先更新状态, 关闭通道事件监听。
- 错误处理回调
void TcpConnection::handleError()
{
int optval;
socklen_t optlen = sizeof optval;
int err = 0;
if(::getsockopt(channel_->fd(),SOL_SOCKET,SO_ERROR,&optval,&optlen) < 0)
{
err = errno;
}
else
{
err = optval;
}
LOG_ERROR("TcpConnection::handleError name:%s - SO_ERROR:%d \n",name_.c_str(),err);
}
getsockopt()函数用于获取任意类型、任意状态套接口的选项当前值,并把结果存入optval,最后输出错误日志。
- 关闭连接
//关闭连接
void TcpConnection::shutdown()
{
if(state_ == kConnected)
{
setState(kDisconnecting);
loop_->runInLoop(
std::bind(&TcpConnection::shutdownInLoop,this)
);
}
}
void TcpConnection::shutdownInLoop()
{
if(!channel_->isWriting()) //说明当前outputBuffer中的数据已经全部发送完成
{
socket_->shutdowmWrite(); // 关闭写端
}
}
注意: 为什么是关闭了写端呢?在TcpConnection::shutdownInLoop()
中,我们会发现它调用了Socket的shutdowmWrite
,这里并没有使用close,陈硕大佬原话是这样的:Muduo TcpConnection 没有提供 close,而只提供 shutdown ,这么做是为了收发数据的完整性。因为TCP 是一个全双工协议,同一个文件描述符既可读又可写, shutdownWrite() 关闭了“写”方向的连接,保留了“读”方向,这称为 TCP half-close。如果直接 close(socket_fd),那么 socket_fd 就不能读或写了。用 shutdown 而不用 close 的效果是,如果对方已经发送了数据,这些数据还“在路上”,那么 muduo 不会漏收这些数据。换句话说,muduo 在 TCP 这一层面解决了“当你打算关闭网络连接的时候,如何得知对方有没有发了一些数据而你还没有收到?”这一问题。当然,这个问题也可以在上面的协议层解决,双方商量好不再互发数据,就可以直接断开连接。等于说 muduo 把“主动关闭连接”这件事情分成两步来做,如果要主动关闭连接,它会先关本地“写”端,等对方关闭之后,再关本地“读”端。
另外如果当前outputbuffer
里面还有数据尚未发出的话,Muduo也不会立刻调用shutwownWrite
,而是等到数据发送完毕再shutdown
,可以避免对方漏收数据。
关闭连接事件很重要,涉及到TcpConnection和Channel的生命周期以及是否能合理销毁,用了智能指针来管理和控制生命周期。下面我们就来分析一下断开流程中**TcpConnection的引用计数问题**:
1.首先连接到来创建TcpConnection,并存入容器。引用计数+1 总数:1
2.客户端断开连接,在Channel的handleEvent
函数中会将Channel中的TcpConnection
弱指针提升,引用计数+1 总数:2
3.触发HandleRead
,可读字节0,进而触发HandleClose
,HandleClose
函数中栈上的TcpConnectionPtr guardThis
会继续将引用计数+1 总数:3
4.触发HandleClose
的回调函数 在TcpServer::removeConnection结束后(回归主线程队列),释放HandleClose的栈指针,以及Channel里提升的指针引用计数-2 总数:1
5.主线程执行回调removeConnectionInLoop,在函数内部将tcpconnection从TcpServer中保存连接容器中erase
掉。但在removeConnectionInLoop结尾用conn
为参数构造了bind
。引用计数不变 总数:1
6.回归次线程处理connectDestroyed
事件,结束完释放参数传递的最后一个shard_ptr,释放TcpConnection。引用计数-1 总数:0
- 建立连接
void TcpConnection::connectEstablished()
{
setState(kConnected);
channel_->tie(shared_from_this());
channel_->enableReading(); //向poller注册channel的epollin事件
//新连接建立 执行回调
connectionCallback_(shared_from_this());
}
思考一下:什么时候调用WriteCompleCallback?什么时候调用HighWaterMarkCallback?
- 如果发送缓存区被清空,就调用WriteCompleCallback。TcpConnection有两处可能触发此回调。
- TcpConnection::sendInLoop()。
- TcpConnection::handleWrite()。
- 如果输出缓冲的长度超过用户指定大小,就会触发回调HighWaterMarkCallback(只在上升沿触发一次)。
在非阻塞的发送数据情况下,假设Server发给Client数据流,为防止Server发过来的数据撑爆Client的输出缓冲区,一种做法是在Client的HighWaterMarkCallback中停止读取Server的数据,而在Client的WriteCompleteCallback中恢复读取Server的数据
需要注意的是: TcpConnection类是唯一默认用shared_ptr来管理的类,唯一继承自enable_shared_from_this的类。这是因为其生命周期模糊:可能在连接断开时,还有其他地方持有它的引用,贸然delete会造成空悬指针。只有确保其他地方没有持有该对象的引用的时候,才能安全地销毁对象。
代码地址:https://github.com/Cheeron955/mymuduo/tree/master
好了~ 有关于muduo库的TcpConnection类的细节就到此结束了,再次强调一点,当channel有相应的事件的时候,会调用TcpConnection对应的回调。到此为止,我们介绍了Channel,Poller,EPollPoller,EventLoop,Acceptor,Socket,Buffer,TcpConnection八大类,还有单独的CurrentThread,DefaultPoller,接下来我们会剖析一下剩下的,比如Logger,TimeStamp等比较简单的,然后最后在从连接建立,断开,发送消息等过程在进行一个总结剖析,希望大家多多支持,我们下一节见~~
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)