btcd源码解析——节点P2P连接建立的过程 (1)
1. 写在前面从这一篇博客开始,我们将介绍btcd节点之间P2P连接建立的过程。考虑到内容太长,分为上下两篇博客来讲解。这两篇博客之后,我们将继续介绍节点之间数据的同步过程。源码解析是基于https://github.com/btcsuite/btcd仓库c26ffa870fd817666a857af1bf6498fabba1ffe3的commit id 版本。2. 从节点的“启动”说起b...
文章目录
1. 写在前面
从这一篇博客开始,我们将介绍btcd
节点之间P2P
连接建立的过程。考虑到内容太长,分为上下两篇博客来讲解。这两篇博客之后,我们将继续介绍节点之间数据的同步过程。
源码解析是基于btcd仓库c26ffa870fd817666a857af1bf6498fabba1ffe3
的commit id
版本。
2. 从节点的“启动”说起
btcd
节点的启动主要由btcd.go
文件中的btcdMain
函数完成,其中P2P
的连接过程又是由 server.Start()
代码完成,如下所示:
// btcdMain [btcd.go]
func btcdMain(serverChan chan<- *server) error { // L43
...
server, err := newServer(...) // L149
...
server.Start() // L162
...
}
Start
函数中和P2P
连接相关的部分在go s.peerHandler()
完成,如下代码所示:
// btcdMain [btcd.go] -> Start [server.go]
func (s *server) Start() { // L2291
...
go s.peerHandler() // L2305
...
}
在比特币P2P
连接的语境中,一个节点就是一个peer
. peerHandler
函数包含三个最关键的启动工作:
- 启动
addrManager
,进行peer
地址的管理 - 启动
syncManager
,进行peer
之间数据的同步 - 启动
connManager
,进行peer
之间连接的管理
代码如下所示:
// btcdMain [btcd.go] -> Start [server.go] -> peerHandler
func (s *server) peerHandler() { // L2062
...
s.addrManager.Start() // L2068
s.syncManager.Start() // L2069
...
go s.connManager.Start() // L2093
...
}
其中跟P2P
连接相关的主要是addrManager
和connManager
的启动。具体而言:
addrManager
负责对其他peer
地址的管理,主要是一些本地的工作,不涉及直接的网络连接或传输;connManager
则主要负责与其他peer
建立P2P
连接,建立连接是需要对方peer
的地址,这便依赖于addrManager
中管理的地址。
3. P2P连接中peer地址的管理
3.1 AddrManager数据结构相关
P2P
连接中peer
地址的管理主要由addrManager
完成,addrManager
变量中包含了各种用于地址管理的信息,其数据结构如下所示:
// AddrManager [addrmanager.go]
type AddrManager struct { // L32
...
peersFile string // L34
...
addrIndex map[string]*KnownAddress // L38
addrNew [newBucketCount]map[string]*KnownAddress
addrTried [newBucketCount]map[string]*KnownAddress
...
}
其中最重要的四个字段是peersFile
, addrIndex
, addrNew
, 和addrTried
:
peersFile
对应于一个文件名,该文件主要保存序列化后的addrManager
,用于节点重启时能快速建立连接。该文件路径名默认为$data-dir/data/mainnet/peers.json
addrIndex
缓存所有KnownAddress
的map
addrNew
缓存所有新地址的map slice
addrTried
缓存所有已经尝试连接过的地址的list slice
准确来说,peersFile
中保存的并不是直接序列化后的addrManager
,因为addrManager
中的一些信息是运行时信息,并不需要保存下来。因此源码中构造了专门用于序列化addrManager
的数据结构,如下所示:
// serializedAddrManager [addrmanager.go]
type serializedAddrManager struct { // L64
Version int
Key [32]byte
Addresses []*serializedKnownAddress
NewBuckets [newBucketCount][]string // string is NetAddressKey
TriedBuckets [triedBucketCount][]string
}
3.2 从peersFile中反序列化填充AddrManager变量
addrManager
的Start
函数中的loadPeers
函数用来从peersFile
中反序列化出peers
的信息,并填充到addrManager
中.
// Start [addrmanager.go]
func (a *AddrManager) Start() { // L567
...
a.loadPeers()
...
go a.addressHandler() // L580
}
loadPeers
函数中主要做事的是deserializePeers
函数,如下所示:
// Start [addrmanager.go] -> loadPeers
func (a *AddrManager) loadPeers() { // L423
...
err := a.deserializePeers(a.peersFile)
...
}
deserializePeers
函数中的工作主要包括两个部分:
- 将
peersFile
文件中的数据反序列化成serializedAddrManager
变量 (sam
) - 将
sam
中的Addresses
,NewBuckets
和TriedBuckets
字段处理赋值给AddrManager
变量 (a
) 的addrIndex
,addrNew
和addrTried
字段
deserializePeers
函数代码如下所示, 其中:
- L444 - L456行代码将
peersFile
文件中的数据反序列化成sam
变量, - L471 - L502行代码将
sam
中的Addresses
字段处理赋值给的addrIndex
, - L504 - L518将
sam
中的NewBuckets
字段处理赋值给的addrNew
, - L519 - L531将
sam
中的TriedBuckets
字段处理赋值给的addrTried
// Start [addrmanager.go] -> loadPeers -> deserializePeers
func (a *AddrManager) deserializePeers(filePath string) error { // L442
...
r, err := os.Open(filePath) // L444
...
var sam serializedAddrManager
dec := json.NewDecoder(r)
err = dec.Decode(&sam) // L456
...
for _, v := range sam.Addresses { // L471
ka := new(KnownAddress)
...
ka.na, err = a.DeserializeNetAddress(v.Addr, v.Services)
...
ka.srcAddr, err = a.DeserializeNetAddress(v.Src, v.SrcServices)
...
a.addrIndex[NetAddressKey(ka.na)] = ka
} // L502
...
for i := range sam.NewBuckets { // L504
for _, val := range sam.NewBuckets[i] {
ka, ok := a.addrIndex[val]
...
a.addrNew[i][val] = ka
}
} // L518
for i := range sam.TriedBuckets { // L519
for _, val := range sam.TriedBuckets[i] {
ka, ok := a.addrIndex[val]
...
a.addrTried[i].PushBack(ka)
}
} // L531
...
}
3.3 将AddrManager变量序列化存储到peersFile中
3.2小节中Start()
函数的L580行启动了一个addressHandler
函数的协程,该函数每隔10分钟调用一次savePeers
函数,将peers
信息 (sam
变量) 序列化并保存到peersFile
文件中。addressHandler
函数和savePeers
函数的代码分别如下所示:
// Start [addrmanager.go] -> addressHandler
func (a *AddrManager) addressHandler() { // L567
...
for {
select {
case <- dumpAddressTicker.C:
a.savePeers()
...
}
}
...
}
// Start [addrmanager.go] -> addressHandler -> savePeers
func (a *AddrManager) savePeers() { // L361
...
w, err := os.Create(a.peersFile) // L408
...
enc := json.NewEncoder(w) // L413
...
enc.Encode(&sam) // L415
...
}
4. 与其他peer建立P2P连接
第2小节的peerHandler
函数中的L2093启动了一个新协程,该协程运行s.connManager.Start()
代码启动了ConnManager
管理器,代码如下所示:
// Start [connmanager.go]
func (cm *ConnManager) Start() {
...
go cm.connHandler() // L518
...
if cm.cfg.OnAccept != nil {
for _, listener := range cm.cfg.Listeners {
cm.wg.Add(1)
go cm.listenerHandler(listener) // L525
}
}
for i := atomic.LoadUnit64(&cm.connReqCount); i < uint64(cm.cfg.TargetOutbound); i++ {
go cm.NewConnReq() // L530
}
}
Start
函数中最重要的代码包括三部分:
- L530行主动发起与其他
peer
节点的连接 - L518行对所有的主动连接进行管理
- L525行被动接受其他
peer
节点的连接
需要进一步说明的是,1主要是完成主动连接的建立过程,2主要完成主动连接建立之后的管理。1每建立一次连接后,都需要由2完成后续的管理工作,如登记到conns
变量中。
4.1 主动发起连接
节点主动发起连接的行为由NewConnReq
函数完成,代码如下所示:
// Start [connmanager.go] -> NewConnReq [server.go]
func (cm *ConnManager) NewConnReq() {
...
done := make(chan struct{}) // L376
select {
case cm.requests <- registerPending{c, done}:
case <-cm.quit:
return
}
...
select {
case <-done:
case <-cm.quit:
return
} // L398
addr, err := cm.cfg.GetNewAddress() // L400
...
c.Addr = addr
cm.Connect(c) // L402
}
其中需要解释的代码包括三个部分:
- L376-L398: 将当前连接请求登记到
pending
变量中,方便对该连接进行后续管理。登记过程是通过requests
管道完成的,管道的另一端连接connHandler
函数,后续在讲解connHandler
函数的再做详细介绍; - L400: 获取将要连接的peer地址,将在4.1.1节讲述
- L402: 完成实际的连接过程,将在4.1.2节讲述
4.1.1 获取将要连接的peer地址
由上可知,获取将要连接的peer
地址是由GetNewAddress
函数实现的,该函数是connmanager.go
文件中config
数据结构的字段. 在初始化cmg
r变量时,该字段被赋值为newAddressFunc
,代码如下所示:
// newServer [server.go]
func newServer(...) (*server, error) {
...
cmgr, err := connmgr.New(&connmgr.Config{ // L2818
...
GetNewAddress: newAddressFunc,
})
...
...
}
进一步查看newAddressFunc
函数的定义,其也定义在newServer
函数中,代码如下所示:
// newServer [server.go]
func newServer(...) (*server, error) {
...
var newAddressFunc func() (net.Addr, error) // L2773
if !cfg.SimNet && len(cfg.ConnectPeers) == 0 {
newAddressFunc = func() (net.Addr, error) {
for tries := 0; tries < 100; tries++ {
addr := s.addrManager.GetAddress() // L2777
...
addrString := addrmgr.NetAddressKey(addr.NetAddress())
return addrStringToNetAddr(addrString)
}
...
}
}
...
}
其中最重要的代码在L2777行,利用addrManager
的GetAddress
函数获取可用的连接地址。GetAddress
函数主要在addrTried
和addrNew
两个列表中随机地挑选可用的地址,这就与第3节的内容联系起来了。
4.1.2 完成实际的连接过程
实际的连接过程是由cm.Connect(c)
完成的,其代码如下所示:
// Start [connmanager.go] -> NewConnReq [server.go] -> Connect [connmanager.go]
func (cm *ConnManager) Connect(c *ConnReq) {
...
conn, err := cm.cfg.Dial(c.Addr) // L444
if err != nil {
select {
case cm.requests <- handleFailed{c, err}: // L447
case <-cm.quit:
}
return
}
select {
case cm.requests <- handleConnected{c, conn}: // L454
case <-cm.quit:
}
}
L444行通过调用Dial
函数进行连接,该函数主要对golang
中net
包的Dial
方法进行了一些包装。
L447和L454行分别用来处理连接失败和成功的情况,具体处理过程也是通过向requests
通道传递数据来完成。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)