gRPC运行过程与流量控制

gRPC中的流量控制

流量控制是一个网络组件的基本功能, 我们熟知的 TCP 协议就规定了流量控制算法. gRPC 建立在 TCP 之上, 也依赖于 HTTP/2 WindowUpdate Frame 实现了自己在应用层的流量控制。

流量控制, 一般来说指的是在网络传输中, 发送者主动限制自身发送数据的速率或发送的数据量, 以适应接收者处理数据的速度——当接收者的处理速度较慢时, 来不及处理的数据会被存放在内存中, 而当内存中的数据缓存区被填满之后, 新收到的数据就会被扔掉, 导致发送者不得不重新发送, 就会造成网络带宽的浪费.

流量控制是双向的,并且,gRPC中控制流量是作用在HTTP2 data frame上的。

讲解思路

目前,grpc(v1.49.0) 存在三种流量控制的方式,BDP采样流量控制、connect level 流量控制、steam level流量控制。接下来我会详细介绍这三者的过程,并结合关键源代码来分析讲解。

因为这三种流量控制的作用细粒度存在递进关系。所以,会先单独地讲前两种控制,在第三种stream level流量控制的时候,再将它们组合连贯起来,通过源码步骤来看gRPC的运行过程。

另外,讲解源代码的方式通过这样展示:

// 下面的每一句都是源代码中copy出来的
s.Serve(listen)  // 初步的入口
  s.handleRawConn(lis.Addr().String(), rawConn)  // 缩进表示进入了Server函数里面
    s.newHTTP2Transport(...)                     // ...表示省略了一些参数
      st.HandleStreams --> func (t *http2Server) HandleStreams()  // 表示接口实现

这样可以看到源代码的运行流程,篇幅也易于观看。
更重要的是,希望读者可以跟着点击进入函数来查看,手动查找这些关键代码来了解它的运行,并且,我相信找的过程一定会有额外的收获。毕竟实践总能收获更多且会印象深刻。

!!!

文章需要具备有gRPC基础知识,了解gRPC主要结构对象的作用,以及知道client与server之间的过程。
这部分知识可以看我之前的一篇grpc介绍文章:
gRPC基础解读:https://blog.csdn.net/m0_60647847/article/details/126713300
或者,看下面这篇文章:
gRPC概述:https://juejin.cn/post/7089739785035579429

这里放一张图,涉及grpc交互时候的一些结构,方便查看。
请添加图片描述

期待这篇文章更够给你带来一些收获。😃

BDP采样流量控制

采样流量控制, 准确来说应该叫做 BDP 估算和动态流量控制窗口, 是一种通过在接收端收集数据, 以决定发送端流量控制窗口大小的流量控制方法.
BDP 采样目前在 gRPC-go 的 server 端是默认开启的.

  • 什么是BDP?
    Bandwidth Delay Product (BDP), 即带宽延迟积, 是网络连接的带宽和数据往返延迟的乘积. BDP 能够有效地告诉我们, 如果充分利用了网络连接, 那么在某一刻在网络连接上可以存在多少字节的数据.

  • BDP的原理是什么?
    在server收到一个data frame的时候,会发送一个独特的frame给client——BDP ping frame(专属的ping frame),然后server会统计在发出BDP frame后到收到这个frame对应的ACK应答这段时间内的字节数。这个大约在 1.5RTT (往返时间) 中收到的所有字节的总和就是有效 BDP1.5 的近似值。所以就通过这个得到当前的”流量大小“。当前状态知道后,就开始我们设定的调整策略。如果该值接近当前流量窗口的大小 (例如超过 2/3), 接收者就需要增加窗口的大小. 窗口的大小被设定为 BDP (所有采样期间接受到的字节总和) 的两倍.

结构分析

BDP的结构:

// grpc@v1.49.0/internal/transport/bdp_estimator.go
type bdpEstimator struct {
    // 记录ping frame发送的时间
    sentAt time.Time
    mu sync.Mutex
    // 当前的bdp estimate.
    bdp uint32
    // 收到的字节数
    sample uint32
    // bwMax is the maximum bandwidth noted so far (bytes/sec).
    bwMax float64
    // 是否发送ping frame的状态.
    isSent bool
    // 回调去更新窗口
    updateFlowControl func(n uint32)
    // sampleCount is the number of samples taken so far.
    sampleCount uint64
    // 往返时间 (seconds)
    rtt float64
}

var bdpPing = &ping{data: [8]byte{2, 4, 16, 16, 9, 14, 7, 7}}

// controlbuf.go
type loopyWriter struct {
    // ···
    bdpEst        *bdpEstimator
    // ···
}

// http2_server.go
type http2Server struct {
    // ···
  loopy       *loopyWriter
  // ...
    bdpEst        *bdpEstimator
    // ···
}

(至于http2Server与loopyWriter的作用,需要明白gRPC的基本过程,链接在上面。)
bdpEstimator结构的两个主要方法: add 与 calculate。

  • add:决定 client 在接收到数据时是否开始采样,并记录采样开始的时间和初始数据量.
  • calculate:收到ack的时候,计算得到当前的 bdp 的值。

方法:(明白什么场景下做什么动作)

func (b *bdpEstimator) add(n uint32) bool {
  // ...
    if b.bdp == bdpLimit { // 达到上限就不采样
        return false
    }
    if !b.isSent {  // 通过isSent控制是否发送BDP ping frame,如果当前没有发送,就要发送
        b.isSent = true
        b.sample = n
        b.sentAt = time.Time{}
        b.sampleCount++
        return true
    }
  // ping frame发出去了但是还没有收到ACK
    b.sample += n
    return false
}

// 挂钩时间来计算BDP
func (b *bdpEstimator) calculate(d [8]byte) {
    // ...
  // 针对往返时间的计算
    rttSample := time.Since(b.sentAt).Seconds()
    if b.sampleCount < 10 {
        // 拿前十个rtt的平均值计入
        b.rtt += (rttSample - b.rtt) / float64(b.sampleCount)
    } else {
        // Heed to the recent past more.
        b.rtt += (rttSample - b.rtt) * float64(alpha)
    }
    b.isSent = false

  // 当前已经计算的字节数 <= 实际的BDP的1.5倍
    bwCurrent := float64(b.sample) / (b.rtt * float64(1.5))
    if bwCurrent > b.bwMax {
        b.bwMax = bwCurrent
    }

    if float64(b.sample) >= beta*float64(b.bdp) && bwCurrent == b.bwMax && b.bdp != bdpLimit {
        sampleFloat := float64(b.sample)
        b.bdp = uint32(gamma * sampleFloat)
        if b.bdp > bdpLimit {
            b.bdp = bdpLimit
        }
        bdp := b.bdp
        b.mu.Unlock()
        b.updateFlowControl(bdp)  // 回调控制更新
        return
    }
    b.mu.Unlock()
}
过程分析

server端:

s.Serve(listen)
  s.handleRawConn(lis.Addr().String(), rawConn)
    s.newHTTP2Transport(rawConn)      // cs之间进行HTTP2握手 -- HTTP2 frame的传递
      transport.NewServerTransport(c, config) --> func NewServerTransport(...)(.) // http2_server.go
        t.bdpEst = &bdpEstimator{...}  // 这里创建bdpEstimator
      s.serveStreams(st)            // 对 frame 类型的分类和处理
        st.HandleStreams --> func (t *http2Server) HandleStreams()  // 不同的frame不同的处理
          t.handleData(frame)  //  server收到data frame之后的处理
            sendBDPPing = t.bdpEst.add(size)
            t.controlBuf.put(bdpPing)
          t.handlePing(frame)  // server 在收到一个 HTTP/2 ping frame 之后调用的函数
            f.IsAck()
            t.bdpEst.calculate(f.Data)  // 收到ping frame的处理
              b.updateFlowControl(bdp) --> func (t *http2Server) updateFlowControl(n uint32)
                s.fc.newLimit(n)
                t.controlBuf.put(&outgoingWindowUpdate{...})
                t.controlBuf.put(&outgoingSettings{...})
            t.controlBuf.put(...)

client端:

// 直接看client的建立流程开始 -- 目前只注重BDP流量控制相关的
transport.NewClientTransport(...)
  newHTTP2Client()
    fc:&trInFlow{limit: uint32(icwz)}
    t.bdpEst = &bdpEstimator{...}
    t.controlBuf = newControlBuffer(t.ctxDone)
    go t.reader()  // 创建连接的同时,根据不同的frame做不同处理,并进行connction、stream流量控制 【重要函数】
      frame, err := t.framer.fr.ReadFrame()
      t.handleSettings(sf, true)
      t.handleData(frame)
        if t.bdpEst != nil
        sendBDPPing = t.bdpEst.add(size)
        w := t.fc.onData(size)
        w := t.fc.reset()
        t.controlBuf.put(bdpPing)
      t.handlePing(frame)
        f.IsAck()
          t.bdpEst.calculate(f.Data)
            b.updateFlowControl(bdp) --> func (t *http2Client) updateFlowControl(n uint32)
              t.controlBuf.executeAndPut(...)
              t.controlBuf.put(&outgoingSettings{...})
        pingAck := &ping{ack: true}
        t.controlBuf.put(pingAck)

通过上面的代码过程,我们可以知道:

  • gRPC client 和每一个 server 之间都维护着一个 bdpEstimator。

  • 每次收到一个 data frame, gRPC client 都会判断是否需要进行采样. 同一时刻, 同一个 client 只会进行一次采样。而如果需要进行采样, 就向 server 发送一个 bdpPing frame.

  • Server 端在收到一个 bdpPing frame 之后, 会立刻返回一个 ping frame 并且标志了 ACK 这个 flag, client 会捕捉到这个 ACK

  • handlePing 是收到一个 HTTP/2 ping frame 之后调用的函数, 可以看到当 ping frame 是一个 ack 时, 会调用 calculate 这个函数.

  • 在 calculate 中, 经过一系列的计算得到了当前的 bdp 的值, 如果需要更新流量控制的话, 会调用之前注册在 bdpEstimator 中的updateFlowControl 函数(transport.NewServerTransport(c, config)函数中), 并将新的窗口大小传递进去。

值得注意的是, BDP 采样结果会影响 HTTP/2 的窗口大小, connection level 的窗口大小 以及 stream level 的窗口大小. BDP 对 gRPC 的影响是动态的, 全面的。

这个部分就是ping frame导致的消耗成本,在考虑的时候需要综合考量。

Connection Level 流量控制

我们知道gRPC是基于HTTP/2在通信,所以cilent与server之间维护有一个TCP链接。Connection Level流量控制就是针对这个TCP链接做流量控制。
在连接建立之初, server 会被分配一个流量 quota, 默认为 65535 字节. 流量控制就是围绕着这个 quota 展开的:
(这里的流量控制就是限制server的流量,而不是client限流)

  • 当 server 发送 n 个字节的数据给 client 时, quota -= n
  • 当 server 收到来自 client 的 HTTP/2 WindowUpdate Frame (简称 window update) 后, 根据 window update 中指定的数值 m, quota += m
  • 当 server 的 quota 为 0 时, server 将 不能发送任何数据给 client.

这里有种令牌桶限流的思想,通过quota限制总量,如果没有了,要么等待,要么就返回。

当然,流量控制不仅仅是server端,client端也是需要配合:

  • client在连接始化的时候,会分配一个limit,限制收到的数据量和unacked(表示server没有发过来)。
  • client会记录收到的数据量的总和和unacked。当 unacked 超过了 limit 的四分之一后(表示居然有1/4的包没有应答了), client 端就会向 server 发送一个 window update (数值为 unacked), 通知 server 可以把 quota 加回来(相当于就是增加令牌), 并且将 unacked 重新置零。
  • 之所以使用这个limit的机制,为了避免频繁地发送 window udpate 占用网络带宽, client 并不会在每次接收到数据之后就发送 window update, 而是等待接受的数据量达到某一阈值后再发送. 注意 limit * 1/4 的阈值时不可修改的.
结构分析

涉及到connection level流量控制的结构:

// ==server端==
// contrlobuf.go
type loopyWriter struct {  // 负责发送
  // ······
  sendQuota uint32  // server 流量 quota
  // ······
}

// http2_server.go
type http2Server struct {  // 针对建立http2server的transport
  // ·····
  loopy       *loopyWriter
  // ...
  fc         *trInFlow
  // ·····
}

// ==client端==
// http2_client.go
type http2Client struct {
  // ······
  loopy      *loopyWriter
  fc         *trInFlow
  // ······
}

// ==核心控制结构==
// flowcontrol.go  -- 与client关系比较密切
type trInFlow struct {
  limit               uint32 // server 端能够发送数据的上限, 会被 server 端根据采用控制的结果更新
  unacked             uint32 // client 端已经接收到的数据
  effectiveWindowSize uint32 // 用于 metric 记录, 不影响流量控制
}

方法:
核心方法是onData(n).

// 参数 n 是 收到传递的数据大小, 返回值表示需要发送的 window update 中的数值大小. 返回 0 代表不需要发送 window update
func (f *trInFlow) onData(n uint32) uint32 {
  f.unacked += n
  if f.unacked >= f.limit/4 {  // 超过 1/4 * limit 才会发送 window update, 且数值为已经接收到的数据总量
    w := f.unacked
    f.unacked = 0
    f.updateEffectiveWindowSize()
    return w
  }
  f.updateEffectiveWindowSize()
  return 0
}

func (f *trInFlow) newLimit(n uint32) uint32 {
    d := n - f.limit
    f.limit = n
    f.updateEffectiveWindowSize()
    return d
}

func (f *trInFlow) reset() uint32 {
    w := f.unacked
    f.unacked = 0
    f.updateEffectiveWindowSize()
    return w
}
过程分析
  1. server端
  • Connection流量控制的是TCP的连接,所以,代码应该从建立连接之后的部分代码看起。
    在grpc完成连接并开始交互时,都是由HTTP2建立连接在最后启动 loopyWriter , 开始了 RPC 交互阶段. loopyWriter 不断地从 controlBuf 中读取 control frames (包括 setting frame), 并将缓存中的 frame 发送给 client. 可以说 loopyWriter 就是 gRPC server 控制流量以及发送数据的地方。

    建立连接相关:

    s.Serve(listen)
      s.handleRawConn(lis.Addr().String(), rawConn)
        s.newHTTP2Transport(rawConn)
          transport.NewServerTransport(c, config)  // 建立连接
            t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
              sendQuota:defaultWindowSize,
              bdpEst:bdpEst,
            t.loopy.run()
              l.handle(it)     // 接收frame
                l.incomingWindowUpdateHandler(i) //处理来自 client 的 window update frame
                  l.sendQuota += w.increment
                l.outgoingWindowUpdateHandler(i)
              l.processData()  // 发送交互
                if l.sendQuota == 0 {return true, nil} //判断sendQuota为0时候不发送数据
                l.sendQuota -= uint32(size) // 发送数据之后会减少
    

    交互相关:

    s.Serve(listen)
      s.handleRawConn(lis.Addr().String(), rawConn)
        s.newHTTP2Transport(rawConn)
          transport.NewServerTransport(c, config)
           go func() {..s.serveStreams(st)..}()
             st.HandleStreams --> func (t *http2Server) HandleStreams()  //http2_server.go
               t.handleData(frame)
                 if w := t.fc.onData(size)  // 调用onData方法(流量控制与数据读取解耦)
                 t.controlBuf.put(&outgoingWindowUpdate{...})
                 if w := t.fc.reset()
                 t.controlBuf.put(bdpPing)
                 s.write(recvMsg{buffer: buffer})
    
  • 服务端的Quota就是sendQuota,在连接的时候,调用了newLoopyWriter的run方法。而l.processData负责发送data frame给client。

  • 接受到client的window updata,incomingWindowUpdateHandler处理,会增加Quota。

  • 在processData发送数据之后,Quota会减少。

  1. client端
  • Connection level流量控制中,关于client端的任务,一个是负责初始化limit限制值;另外一个是负责监控并发送window update。两个任务都是运行在交互阶段的。而client的交互通过一个stream对象管理。我们还是应该先从客户端http2Client连接看起。
    控制流程:

    grpc.Dial(.)
      DialContext()
        cc.parseTargetAndFindResolver()
        newCCBalancerWrapper(.)  // 核心处理函数
          go ccb.watcher()
        newCCResolverWrapper(.)
        cc.Connect()
          cc.balancerWrapper.exitIdle()
            ccb.updateCh.Put(&exitIdleUpdate{}) --> // 借助了watch机制,chan传递
              func (ccb *ccBalancerWrapper) watcher()
                ccb.handleExitIdle()
                  ccb.balancer.ExitIdle()
                    sc.Connect()
                      go acbw.ac.connect()
                        ac.resetTransport()
                           ac.tryAllAddrs(.)
                             ac.createTransport(addr, copts, connectDeadline)
    // clientconn.go
    func (ac *addrConn) createTransport(.)
      transport.NewClientTransport(.)
        newHTTP2Client()
          fc:&trInFlow{limit: uint32(icwz)}
          go t.reader()
            t.handleData(frame)
               t.fc.onData(size)
                 if f.unacked >= f.limit/4 // 超过 1/4 * limit 才会发送 window update, 且数值为已经接收到的数据总量
                 f.updateEffectiveWindowSize()
    
  • 从上面可以看到,针对流量控制的设置是在Dial获取连接的时候就进行的

  • go t.reader()以goroutine的方式运行在后台,实时监控并控制流量。

  • trInFlow 是 client 端控制是否发送 window update 的核心. 值得注意的是 client 端是否发送 window update 只取决于已经接收到的数据量, 而不管这些数据是否被某些 stream 读取. 这一点是 gRPC 在流量控制中的优化。

    即因为多个 stream 共享同一个 connection, 不应该因为某个 stream 读取数据较慢而影响到 connection level 的流量控制, 影响到其他 stream.

Stream level流量控制

上面介绍的BDP采样流量控制,是针对全局端到端的流量控制。第二种流量控制是Connnect level流量控制,针对的是HTTP/2协议下的TCP连接做流量控制,在服务端限制额度,然后通过客户端限制与交互调整。

现在这个Stream level流量控制进一步着眼于client与server之间控制交互的stream,完成更加精细化的控制。

stream level与Connection level十分相似:

  1. stream level中的quota只针对单个stream,也就是说现增加了这个流量控制以后,每个stream受stream level控制,又受connection level控制。(当然,这个还受控于全局的BDP采样流量控制)
  2. client端决定反馈window update frame的机制变得更加复杂。

Stream level的原理
既然原理相似,我们可以再顺着connection level的原理往下走看看,connection level在server端设置了quota,在client端设置了limit。quota控制着流量的发送时机,limit控制window update更新,可以理解为是在server端记录已经收到的请求与流量(通过 quota-=x 的形式)。现在我们要精细化到stream维度来控制流量,就需要同时记录被stream消费掉的流量,达到更加精细化的控制。
也就是通过client记录:

  1. pendingData: stream 收到但还未被应用消费 (未被读取) 的数据量.
  2. pendingUpdate: stream 收到且已经被应用消费 (已被读取) 的数据量.
  3. limit: stream 能接受的数据上限, 被初始为 65535 字节, 受到采样流量控制的影响.
  4. delta: delta 是在 limit 基础上额外增加的数据量, 当应用试着去读取超过 limit 大小的数据是, 会临时在 limit 上增加 delta, 来允许应用读取数据.

代码过程
上面介绍了Stream level的原理与实现精细化控制增加的结构(client记录的内容)。既然和Connection level类似,那么还是和底层的TCP连接相关。
可以先看看控制结构,client与server的交互是通过stream对象管理,我们应该通过stream对象来查看结构。

因为Stream level精细化控制,涉及到了client端与server端,还有涉及到了上两个流量控制。所以通过Stream流程可以串联起前两个流量控制过程,并且,我们可以重新看看它在grpc服务启动与连接交互过程中的表现。

结构分析
// ==client端==
// stream.go
type clientStream struct {
  // ······
  attempt *csAttempt
  // ······
}

type csAttempt struct {
  // ·····
  t  transport.ClientTransport  // http2Client{}
  s  *transport.Stream
  // ······
}

// Stream represents an RPC in the transport layer.
type Stream struct {
  // ······
  st   ServerTransport    // nil for client side Stream
  ct   *http2Client       // nil for server side Stream
  // ······
  fc   *inFlow
  // ······
}

// ==inFlow结构==
// inFlow deals with inbound flow control
// 通过client记录Stream level相关的字段
type inFlow struct {
    mu sync.Mutex
    // limit: stream 能接受的数据上限, 被初始为 65535 字节, 受到采样流量控制的影响.
    limit uint32
    // pendingData: stream 收到但还未被应用消费 (未被读取) 的数据量.
    pendingData uint32
    // pendingUpdate: stream 收到且已经被应用消费 (已被读取) 的数据量
    pendingUpdate uint32
    // delta: delta 是在 limit 基础上额外增加的数据量, 当应用试着去读取超过 limit 大小的数据是, 会临时在 limit 上增加 delta, 来允许应用读取数据.
    delta uint32
}

// ==Server端==
// Server.go
type Server struct {
    // ···
    conns    map[string]map[transport.ServerTransport]bool
    // ···
}

// 实现了transport.ServerTransport
type http2Server struct {
  // ···
  activeStreams map[uint32]*Stream  // Stream在这里
  // ···
}

Stream level流量控制策略运行过程:(应该结合着上面的字段功能介绍来看)。
相关方法:

// client端更新Limit
func (f *inFlow) newLimit(n uint32) {
    f.mu.Lock()
    f.limit = n
    f.mu.Unlock()
}

// 复杂过程体现在这里,需要在流量被传递给client下游的application之前,做好某些判断
// client能知道会有多少数据被读取走,因为data frame 的大小被包含在 headers frame 中提前发送给了client.
// Client 端会尝试估算 server 端此时的 quota, 做好读取数据之前的预热。 这一步与结构中的delta相关
func (f *inFlow) maybeAdjust(n uint32) uint32 {
    if n > uint32(math.MaxInt32) {
        n = uint32(math.MaxInt32)
    }
    f.mu.Lock()
    defer f.mu.Unlock()
    // estSenderQuota 是接收方对发送方的最大字节数的推测。表示可以在没有window update的情况下发送的最大字节数(因为)
    estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))  // 推测出 server 端剩余 quota 的最大值

    estUntransmittedData := int32(n - f.pendingData) // 推测出 server 还未发送数据的最大值

    if estUntransmittedData > estSenderQuota { // 如果预估 server 还未发送的数据的最大值, 大于了 server 剩余 quota 的最大值
        // Sender's window shouldn't go more than 2^31 - 1 as specified in the HTTP spec.
        if f.limit+n > maxWindowSize {
            f.delta = maxWindowSize - f.limit
        } else {  // 发送一个 window update 给 serve ,临时提高 server 的 quota 上限,并记录到delta增量中
            f.delta = n
        }
        return f.delta
    }
    return 0
    // 到这里都只是预热阶段,预热是为了提高效率
}

// client接收到来自server的data frame的时候,pendingData增加
func (f *inFlow) onData(n uint32) error {
    f.mu.Lock()
    f.pendingData += n
    if f.pendingData+f.pendingUpdate > f.limit+f.delta {
        limit := f.limit
        rcvd := f.pendingData + f.pendingUpdate
        f.mu.Unlock()
        return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit)
    }
    f.mu.Unlock()
    return nil
}

// application开始从stream读取数据,pendingData减少
func (f *inFlow) onRead(n uint32) uint32 {
    f.mu.Lock()
    if f.pendingData == 0 {
        f.mu.Unlock()
        return 0
    }
    f.pendingData -= n
    if n > f.delta {  // n 和 delta 比较, 需要再一次判断需不需要window update。这个步骤是自适应,慢慢调节的过程。可以渐渐平衡之前预热阶段增加的delta额度
        n -= f.delta
        f.delta = 0
    } else {
        f.delta -= n
        n = 0
    }
    f.pendingUpdate += n  //表示 stream 上将会有 n 字节的数据被读取且未向 server 端发送反馈
    if f.pendingUpdate >= f.limit/4 {  // 大于的情况需要发送window update
        wu := f.pendingUpdate
        f.pendingUpdate = 0  // 然后清空pendingUpdate
        f.mu.Unlock()
        return wu
    }
    f.mu.Unlock()
    return 0
}
过程分析

因为下面的过程分析会整体上讲解一遍,包含了上述的两种流量控制以及grpc启动的过程。不只着眼于stream level流量控制,所以,这里先介绍运行的过程,再开始源代码分析。

Stream level的控制流量过程
Client 端的逻辑是这样的:

  • 每当 client 接收到来自 server 的 data frame 的时候, pendingData += 接收到的数据量 .

  • 每当 application 在从 stream 中读取数据之前 (即 pendingData 将被消费的时候), client 知晓它应该能够读取到 n 字节的数据, 因为 data frame 的大小被包含在 headers frame 中提前发送给了 client. Client 端会尝试估算 server 端此时的 quota, 方法是:

    • 因为 client 端知道目前已经收到的仍为给予 server 反馈的数据总量是 pendingData + pendingUpdate , 所以可以推测出 server 端剩余 quota 的最大值为 limit - (pendingData + pendingUpdate) , 之所以只能推测出最大值, 是因为 client 不能精确地知道在有多少数据是 server 已经发送但 client 仍未收到的.

    • 又因为 client 知道它应该能读取 n 字节的数据, 因此可以推测出 server 还未发送数据的最大值是 n - pendingData , 同理, 因为不知道又多少数据正在网络上传输, 因此只能推测出最大值.

    • 那么如果 server 仍为发送的数据的最大值, 大于了 server 剩余 quota 的最大值, 就意味着 client 必须要发送一个 window update 给 server, 以临时提高 server 的 quota 上限, 才能让 server 把数据顺利发送出来. 具体应该提高多少呢? gRPC client 选择让 server 的 quota 提高 n 字节. 并将这一 n 值记录在 delta 中.

    • 这样的逻辑保证了 server 端在一次 data frame 的传输中发送大量数据时, 不会因为 quota 上限过低而陷入停滞中.
      (~注意知道现在为止, 应用并没有真正地从 stream 中读取数据. 上述的调整均发生在读取之前, 相当于读取数据之前的预热.~)

  • 每当 application 真正地从 stream 中读取 n 字节数据的时候, client 端还需要再一次衡量是否需要向 server 发送 window update 来更新 server 的 quota. 方法是:

    • pendingData -= n
    • 将要读取的数据量大小 n 和 delta 比较, 并试着将 delta 抵消掉. 这样做是因为 delta 这个值时额外的临时增加的 quota, delta 这么多的数据已经被加到了 server 端的 quota 中, client 端就不需要为了这些数据而向 server 发送 window update 了. 这一切是为了渐渐消除之前为了允许 server 发送大量数据而临时增加的额度.

server端的流程比较简单,只是在处理data frame调用OnData的方法。
记录
stream level 还需要总和考虑应用读取 stream 中数据的速度, 才能更好地控制 stream 上的流量. 并且 gRPC 还为此额外地增加了 delat 这一数值, 来避免因流量控制造成的 stream 阻塞.

源码过程
方便查看,我把上面的图拿下来放在这里,不用拖回上面查看。

请添加图片描述

server端的代码过程:
server过程分为:创建服务、注册服务、启动服务、连接、交互的情况。

// ==Server端==

// 创建服务相关的逻辑
s := grpc.NewServer()
  s := &Server{...}         // 补充服务相关的字段
  if s.opts.numServerWorkers > 0 {s.initServerWorkers()}    // 初始化工作线程(在启动的时候,HandleStreams函数会使用到工作线程)
    go s.serverWorker(s.serverWorkerChannels[i])
      data, ok := <-ch      // 接收信号
      s.handleStream(...)  // 重要函数
  s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")  // grpc更加底层的结构关系,与socket相关

// 注册服务相关的逻辑
pb.RegisterHelloServer(s,xxxx)
  s.RegisterService(&_xxxx_serviceDesc, srv)  // 第一个参数_xxx_serviceDesc是在pb文件上生成的,相关属性来自proto文件
     s.register(sd, ss)
       info := &serviceInfo{...}
         s.services[sd.ServiceName] = info  // 建立起全局server属性 service name -> service info

// 启动服务与连接相关的逻辑
listen, err := net.Listen("tcp", Address) // listen是通用的面向流协议的网络监听者
s.Serve(listen)
  ls := &listenSocket{Listener: lis}
  rawConn, err := lis.Accept()
  go func(){ s.handleRawConn(lis.Addr().String(), rawConn) ... }()  // 会开启goroutine处理连接
    st := s.newHTTP2Transport(rawConn)   // 进行HTTP/2握手,得到http2Server
      st, err := transport.NewServerTransport(c, config)
        framer := newFramer(...)    // 使用了原生的HTTP2 frame对象
        framer.fr.WriteSettings(isettings...)  // frame约定配置部分
        framer.fr.WriteWindowUpdate(0, delta)  // 流量窗口更新
        t := &http2Server{.. fc:&trInFlow{limit: uint32(icwz)},  ..}  // 创建connection level流量控制的结构
        t.bdpEst = &bdpEstimator{..}   // 创建http2Server对象字段中BDP流量控制器
        t.controlBuf = newControlBuffer(t.done)  // 建立controlBuf结构--通讯使用
        t.framer.fr.ReadFrame()               // 第一次连接需要有setting frame规范彼此参数
        t.handleSettings(sf)
        go func() {.. // 通过单独的goroutine运行
          t.loopy = newLoopyWriter(...t.bdpEst)  // 建立loopyWriter结构
            l := &loopyWriter{.. bdpEst: bdpEst, ..}
            activeStreams: newOutStreamList()   // loopyWriter下的activeStream结构 流结构
          t.loopy.run()  // 【建议看看源代码中关于这个函数的介绍注释】
            it, err := l.cbuf.get(true)  // 阻塞运行
            l.handle(it)     // 接收到client的frame
              l.incomingWindowUpdateHandler(i)
                l.sendQuota += w.increment  // 收到更新connection level流量控制的quota
                l.activeStreams.enqueue(str)
              l.pingHandler(i)
                l.bdpEst.timesnap(p.data)  // ping frame的BDP流量控制相关
            l.processData()  // 判断sendQuota为0时候不发送数据 【维护steam链表】
              l.sendQuota -= uint32(size) // 发送数据之后quota会减少
              if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]) // 将stream中的数据发送出去给client
          }

// 交互相关的逻辑
s.Serve(listen)
  ls := &listenSocket{Listener: lis}
  rawConn, err := lis.Accept()
  go func(){ s.handleRawConn(lis.Addr().String(), rawConn) ... }()
    st := s.newHTTP2Transport(rawConn)    // cs之间进行HTTP2握手 -- HTTP2 frame的传递 -- 结束则连接完成
    s.addConn(lisAddr, st)                // 将http2server结构加入server的conns字段的map结构中
    go func() {..s.serveStreams(st)..}()  // 传递进一个函数参数,待会儿会回来执行这个参数
      st.HandleStreams --> func (t *http2Server) HandleStreams()  //http2_server.go
        t.controlBuf.throttle()  // 拿到一个chan并阻塞等待有值可以获取
        frame, err := t.framer.fr.ReadFrame()
        switch frame := frame.(type) {...}   // 针对不同frame做不同的处理
        t.operateHeaders(frame, handle, traceCtx)  // 处理header frame
          streamID := frame.Header().StreamID  // server与client都会对应一致的streamID
          s := &Stream{..fc:&inFlow{limit: uint32(t.initialWindowSize)},..}  // 建立stream flow 流量控制相关
          t.activeStreams[streamID] = s  // 创建出来的stream存放在activeStream中,交由下面的handle处理
          s.requestRead = func(n int) {t.adjustWindow(s, uint32(n))}  // stream flow 流量控制相关
          handle(s) --> st.HandleStreams(func(stream *transport.Stream) {..传入的函数参数..}  // server.go中处理stream
            s.handleStream(st, stream, s.traceInfo(st, stream))  // 当一个新的 stream 被创建之后, 进行一些配置并处理逻辑
              s.processUnaryRPC(t, stream, srv, md, trInfo)      // 根据 headers frame 中 path 和 method 的信息, gRPC server 找到注册好的method并执行
                d, err := recvAndDecompress(&parser{r: stream}, stream, dc ...)  // 读取data frame的参数信息
                reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)  //执行已经注册好的方法
                err := s.sendResponse(t, stream, reply, cp, opts, comp)  //发送回去
                  err = t.Write(stream, hdr, payload, opts) --> func (t *http2Server) Write(...)
                    df := &dataFrame{...} // 封装dataFrame
                    t.controlBuf.put(df)  // 放到controlbuf中 ----》 剩下的交给loopyWriter处理了
              s.processStreamingRPC(t, stream, srv, sd, trInfo)  // stream的方式访问
                rc := stream.RecvCompress()  // 接受信息
                appErr = sd.Handler(server, ss)
                err = t.WriteStatus(ss.s, statusOK) --> func (t *http2Server) WriteStatus(...)
                  success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
                  t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
                 err := s.sendResponse(t, stream, reply, cp, opts, comp);
              err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc))  // 响应,写回流中
        t.handleData(frame)  // 处理data frame
          sendBDPPing = t.bdpEst.add(size)  // BDP流量控制
          s.fc.onData(size)  // stream level 流量控制
          s.fc.onRead(size - uint32(len(f.Data())))  //stream level 流量控制
          s.write(recvMsg{err: io.EOF})
        t.handlePing(frame)  // 处理ping frame
          f.IsAck()
            t.bdpEst.calculate(f.Data)  // BDP流量控制
              b.updateFlowControl(bdp) --> func (t *http2Server) updateFlowControl(n uint32)
                t.controlBuf.put(&outgoingWindowUpdate{..    increment: t.fc.newLimit(n)})
                t.controlBuf.put(&outgoingSettings{...})

client端代码过程:

与client流程关联比较密切的结构就是

  • Balancer(balancerWrapper、balancer.ClientConn)
    • balancerWrapper(Balancer、subconns)
    • balancer.ClientConn – 【ccBalancerWrapper】(ClientConn)
  • ClientConn(ccBalancerWrapper、ccResolverWrapper、conns map[*addrConn]struct{} )
    • addrConn(ClientConn、transport.ClientTransport )
    • transport.ClientTransport – 【http2Client】
  • http2Client(loopyWriter、controlBuffer、trInFlow、bdpEstimator)
// client.go
// 连接相关逻辑
grpc.Dial(.)
  DialContext()
    cc := &ClientConn{...}
    resolverBuilder, err := cc.parseTargetAndFindResolver()
      rb = cc.getResolver(parsedTarget.Scheme)
    newCCBalancerWrapper(...)  // 建立balance对象处理请求
      go ccb.watcher()        // 启动watch监听机制【建议看看源代码上的解读】
    rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
    cc.Connect()  // 这里是通过参数控制是否阻塞与非阻塞,阻塞的话,会等链接准备完成之后再开始完成
      // 下面是balancer机制负责控制
      cc.balancerWrapper.exitIdle()  // 通过balance的方法,发现存在空闲的对象
        ccb.updateCh.Put(&exitIdleUpdate{}) --> // 借助了watch机制,chan通信
          func (ccb *ccBalancerWrapper) watcher()
            ccb.handleExitIdle()
              ccb.balancer.ExitIdle()
                sc.Connect()  // sc是balancerWrapper对象中的subconns map[]bool
                  go acbw.ac.connect()
                    ac.resetTransport()
                      ac.tryAllAddrs(.)
                        // 准备工作完成,开始创建TCP连接
                        for _, addr := range addrs {}
                        ac.createTransport(addr, copts, connectDeadline) -->  func (ac *addrConn) createTransport(...)
                          transport.NewClientTransport(.)
                            newHTTP2Client()
                              fc:&trInFlow{limit: uint32(icwz)}
                              t.controlBuf = newControlBuffer(t.ctxDone)
                              go t.reader()  // 创建连接的同时,根据不同的frame做不同处理,处理过程会进行connction level、stream level流量控制 【重要函数】
                                t.handleData(frame)
                                  t.fc.onData(size)
                              n, err := t.conn.Write(clientPreface)  // 发送给server连接
                              err = t.framer.fr.WriteSettings(ss...)  // 确定窗口配置
                              if delta := uint32(icwz - defaultWindowSize); delta > 0 {...} // connection level流量控制
                              t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
                              err := t.loopy.run()  // 等待交互
                                l.cbuf.get(true)
                                  h.isTransportResponseFrame()
                                l.handle(it)  // 针对不同的frame处理
                                l.processData()  // 写出数据并维护steam链表的结构
                                  strQuota := int(l.oiws) - str.bytesOutStanding  // stream level流量控制
                                  maxSize > int(l.sendQuota) { // connection-level 流量控制

// client交互处理阶段逻辑
c.xxxClient()-->
  c.cc.Invoket()
    invoke()-->
      newClientStream()-->
        newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
          cs := &clientStream{...}
          op := func(a *csAttempt) error {...}
          err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
            cs.attempt, err = cs.newAttemptLocked(...)
            a := cs.attempt
            err := op(a) --> // 链接回上面传入的op函数(闭包)
              err := a.getTransport()
                cs.cc.getTransport(...) -->
                  cc.blockingpicker.pick --> func (pw *pickerWrapper) pick(...) (transport.ClientTransport, func(balancer.DoneInfo), error)
                    t := acw.getAddrConn().getReadyTransport()
              err := a.newStream()
                s, err := a.t.NewStream(a.ctx, cs.callHdr) --> func (t *http2Client) NewStream(...)
                  s := t.newStream(ctx, callHdr)
                    t.adjustWindow(s, uint32(n))  // stream level 流量控制 (这里只是定义了方法,并没有调用)
                  a.p = &parser{r: s}
      SendMsg()  // 发送
        hdr, payload, data, err := prepareMsg(...)
        op := func(a *csAttempt) error {return a.sendMsg(m, hdr,payload, data)}
        err = cs.withRetry(op,...)
          err := op(a) -->  func (a *csAttempt) sendMsg(...)
            err := a.t.Write(...)
              t.controlBuf.put(df)  // 发送到controlBuf中
          <-a.s.Done()
          onSuccess()--> func (cs *clientStream) bufferForRetryLocked(...)

      cs.RecvMsg(reply)  // 接收
        err := cs.withRetry(func(a *csAttempt) error {return a.recvMsg(m, recvInfo)}, cs.commitAttemptLocked)
          cs.attempt, err = cs.newAttemptLocked(false)
          err := op(a) --> func (a *csAttempt) recvMsg(m interface{},...)
            err = recv(a.p,...)  // 从asAttempt中接收
              d, err := recvAndDecompress(...)
                pf, d, err := p.recvMsg(maxReceiveMessageSize)
                  p.r.Read(msg)
                d, err = dc.Do(bytes.NewReader(d))
                d, size, err = decompress(compressor, d, maxReceiveMessageSize)
              c.Unmarshal(d, m)
        cs.finish(err)

总结

总结一下,上面讲解了gRPC中三种流量控制(注意当前版本是v1.49.0),分别为BDP、connection level 、 sream level 流量控制。按照层次关系,先从整体的BDP介绍了原理与结构,然后顺着思路,从“唯一的TCP连接”引入了connection level流量控制,最后讲解最细粒度的stream level流量控制,并且,通过gRPC的源代码步骤,从连接建立到交互完成,我们过了一遍关键的源码,在源码中看流量控制机制的过程。

另外,我建议把核心的那些结构(server、http2Server、http2Client、loopyWriter等)都自己再看一遍,上面的源码只是过了一遍,核心细节需要再不断阅读源码才能够清楚。

从技术体系走到方法论——并且去超越这个方法论的选择,这个才是我们阅读源代码的态度。(看 - 仿 - 改)

上面的代码还有图以及部分结构体的注释翻译我放在这里了:
https://gitee.com/zeng-jinghao/go101stu/tree/master/grpc_stu/%E5%AE%9E%E8%B7%B5%E4%B8%8E%E8%AE%B0%E5%BD%95


参考

强力推荐的流量控制详解:https://juejin.cn/post/7094606537036922917
服务端源码分析:https://juejin.cn/post/6972066636811632671

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐