我们一起来探寻rpcx框架,本系列会详细详解rpcx的源码,会涉及到他的各个模块,看看这款优秀的rpc框架是如何实现的。

概念

我们先了解下几个概念

注册中心:服务注册中心用来实现服务发现和服务的元数据存储。现在主流的做法是通过:zookeepereurekaconsuletcd 等开源框架实现。同时,注册中心需要自动剔除不可用的服务元数据。

服务注册:服务端提供者将服务的元数据信息注册到注册中心的过程,这些元数据包含:服务名,监听地址,监听协议,权重v吞吐率等。

服务发现:客户端获取服务元数据的过程,有了这些元数据,客户端就可以发起服务调用了。获取元数据可以有两种实现方式:pull(自己去注册中心取)、push(注册中心主动告诉我)。

结合下面这张图,这三个过程可以用一句话来概括:注册中心相当于一个令牌寄存处(上图的Register),服务端把自己的令牌寄存在这里(对应于服务注册,上图的register箭头),客户端来令牌寄存处拿令牌发起服务调用(对应于服务发现,上图的pull/push

在这里插入图片描述

好的,接下来我们来看rpcx怎么来实现这些过程的,注册中心以zookeeper为例

源码

服务注册

注册成功后,zookeeper客户端上可以看到的元数据信息:
在这里插入图片描述
我们来拆解 /rpcx/test/Arith/tcp@localhost:8972这个路径,它对应就是我们刚说的元数据信息了。

/rcpx/test:路径,就是我们将服务的元数据信息放在这个路径下面,类似于java项目的package一样,所有的代码都放在了同一个package下,一般推荐做法是:/rpcx/{项目名}/{应用名},项目名:比如我们做一个电商网站,那电商就是项目,应用名:对应于电商下面的一个子系统,比如订单、商品系统等,这个很好理解

Arith:服务标识,告诉你提供的是什么服务,相当于我们的应用下面的某一个service类的名称,推荐做法是:/{package}/{serviceName}

tcp:协议,包含tcpudphttp

localhost:8972localhost:服务提供者所在的ip地址。8972:提供服务的端口。一个socket客户端,通过这个就可以发起socket调用了。

tps=0:服务提供者的tps信息,0代表无限制

好了,通过上面的拆解,我们应该都能理解提供的元数据了。现在,终于到了上代码的时间了。

下面是启动服务的一个例子,也是完成服务注册的过程

package main

import (
	"flag"
	"github.com/rcrowley/go-metrics"
	"github.com/smallnest/rpcx/server"
	"github.com/smallnest/rpcx/serverplugin"
	"log"
	example "github.com/rpcxio/rpcx-examples"
	"time"
)

var (
	addr      = flag.String("addr", "localhost:8972", "server address")
	zkServers = []string{"62.234.15.24:2181"}
	basePath  = "/rpcx/test"
)

// go run -tags zookeeper server.go
func main() {
	flag.Parse()

    //1、new一个服务struct
	s := server.NewServer()
    //2、连接注册中心(这里是zookeeper)
	addRegistryPlugin(s)
	//3、服务注册
	s.RegisterName("Arith", new(example.Arith), "")
    //4、启动服务监听
	s.Serve("tcp", *addr)
}

func addRegistryPlugin(s *server.Server) {
	r := &serverplugin.ZooKeeperRegisterPlugin{
		ServiceAddress:   "tcp@" + *addr,
		ZooKeeperServers: zkServers,
		BasePath:         basePath,
		Metrics:          metrics.NewRegistry(),
		UpdateInterval:   5*time.Second,
	}
	err := r.Start()
	if err != nil {
		log.Fatal(err)
	}
	s.Plugins.Add(r)
}

下面对步骤1、2、3、4做详细讲解

1、初始化Server

每一个服务,对应一个server,关于每个属性这里先不做阐述,以免影响阅读

func NewServer(options ...OptionFn) *Server {
	s := &Server{
		Plugins:    &pluginContainer{},
		options:    make(map[string]interface{}),
		activeConn: make(map[net.Conn]struct{}),
		doneChan:   make(chan struct{}),
		serviceMap: make(map[string]*service),//服务类的反射信息
	}

	for _, op := range options {
		op(s)
	}

	return s
}

2、连接注册中心

rpcx使用的是libkv库来操作zookeeper,同时libkv也可以操作etcd,consul。有兴趣的同学可以了解下,github地址:github.com/docker/libkv

这里需要注意的是:zookeeper没有TTL的概念,只有永久或者临时目录,所以TTL对于zookeeper无效,只能作为是否创建永久目录的依据:

  • TTL>0 :临时目录
  • 其他情况:永久目录
func (p *ZooKeeperRegisterPlugin) Start() error {
	……
    //初始化zookeeper实例
	if p.kv == nil {
		kv, err := libkv.NewStore(store.ZK, p.ZooKeeperServers, p.Options)
		if err != nil {
			log.Errorf("cannot create zk registry: %v", err)
			return err
		}
		p.kv = kv
	}
	//在zookeeper上创建basepath地址:/rpcx/test
	err := p.kv.Put(p.BasePath, []byte("rpcx_path"), &store.WriteOptions{IsDir: true})
	if err != nil {
		log.Errorf("cannot create zk path %s: %v", p.BasePath, err)
		return err
    //保存tps信息
   	//更新TTL(声明周期
    if p.UpdateInterval > 0 {
		ticker := time.NewTicker(p.UpdateInterval)
		go func() {
			defer p.kv.Close()

			// refresh service TTL
			for {
				select {
				case <-p.dying:
					close(p.done)
					return
				case <-ticker.C://每隔p.UpdateInterval(一分钟)更新一次
					var data []byte
					if p.Metrics != nil {
                        //获取注入的元数据(如tps),本例子没有设置,这里留一个TODO,后面会给大家展示
						clientMeter := metrics.GetOrRegisterMeter("clientMeter", p.Metrics)
						data = []byte(strconv.FormatInt(clientMeter.Count()/60, 10))
					}
					//set this same metrics for all services at this server
					for _, name := range p.Services {
						nodePath := fmt.Sprintf("%s/%s/%s", p.BasePath, name, p.ServiceAddress)
						kvPaire, err := p.kv.Get(nodePath)
                        //不存在则创建
						if err != nil {
							log.Infof("can't get data of node: %s, because of %v", nodePath, err.Error())

							p.metasLock.RLock()
							meta := p.metas[name]
							p.metasLock.RUnlock()

							err = p.kv.Put(nodePath, []byte(meta), &store.WriteOptions{TTL: p.UpdateInterval * 2})
							if err != nil {
								log.Errorf("cannot re-create zookeeper path %s: %v", nodePath, err)
							}
						} else {
                            //更新tps,设置目录的ttl
							v, _ := url.ParseQuery(string(kvPaire.Value))
							v.Set("tps", string(data))
							p.kv.Put(nodePath, []byte(v.Encode()), &store.WriteOptions{TTL: p.UpdateInterval * 2})
						}
					}
				}
			}
		}()
	}

	return nil
}

3、服务注册

服务注册,也就是注册服务的元数据,在概念中有详细的阐述,下面是调用的对应方法

func (s *Server) RegisterName(name string, rcvr interface{}, metadata string) error {
    //在注册中心做服务注册
	s.Plugins.DoRegister(name, rcvr, metadata)
    //通过反射,获取rcvr的Type,Value,Method等信息,可以借鉴作者是怎么使用反射和验证的,代码也很简单,节约篇幅,就不贴出来了
	_, err := s.register(rcvr, name, true)
	return err
}

我们来跟踪DoRegister方法,发现调用的是ZooKeeperRegisterPluginRegister方法,该方法完成真正的服务注册,相当于在zookeeper客户端执行了如下3挑命令:

create /rpcx/test rpcx_path

create /rpcx/test/Arith Arith

create /rpcx/test/Arith/tcp@localhost:8972 tps=0

// Register handles registering event.
// this service is registered at BASE/serviceName/thisIpAddress node
func (p *ZooKeeperRegisterPlugin) Register(name string, rcvr interface{}, metadata string) (err error) {
	if strings.TrimSpace(name) == "" {
		err = errors.New("Register service `name` can't be empty")
		return
	}

	if p.kv == nil {
		zookeeper.Register()
		kv, err := libkv.NewStore(store.ZK, p.ZooKeeperServers, nil)
		if err != nil {
			log.Errorf("cannot create zk registry: %v", err)
			return err
		}
		p.kv = kv
	}

	if p.BasePath[0] == '/' {
		p.BasePath = p.BasePath[1:]
	}
    //create /rpcx/test rpcx_path
	err = p.kv.Put(p.BasePath, []byte("rpcx_path"), &store.WriteOptions{IsDir: true})
	if err != nil {
		log.Errorf("cannot create zk path %s: %v", p.BasePath, err)
		return err
	}

    //create /rpcx/test/Arith Arith
	nodePath := fmt.Sprintf("%s/%s", p.BasePath, name)
	err = p.kv.Put(nodePath, []byte(name), &store.WriteOptions{IsDir: true})
	if err != nil {
		log.Errorf("cannot create zk path %s: %v", nodePath, err)
		return err
	}

    //create /rpcx/test/Arith/tcp@localhost:8972 tps=0
	nodePath = fmt.Sprintf("%s/%s/%s", p.BasePath, name, p.ServiceAddress)
	err = p.kv.Put(nodePath, []byte(metadata), &store.WriteOptions{TTL: p.UpdateInterval * 2})
	if err != nil {
		log.Errorf("cannot create zk path %s: %v", nodePath, err)
		return err
	}

	p.Services = append(p.Services, name)

	p.metasLock.Lock()
	if p.metas == nil {
		p.metas = make(map[string]string)
	}
	p.metas[name] = metadata
	p.metasLock.Unlock()
	return
}

参数:

  • name:注册的服务名称,对应例子的Arith
  • rcvr: 对应的服务service,也就是example.Arith
  • metadata:元数据信息,比如tps,为空默认tps=0,代表不对tps做限制

从源码可以看到,服务注册没有那么神秘,这个过程都是非常简单的

4、启动服务监听

也就是启动一个Socket

// Serve starts and listens RPC requests.
// It is blocked until receiving connectings from clients.
func (s *Server) Serve(network, address string) (err error) {
	……
	//启动监听
	return s.serveListener(ln)
}

// serveListener accepts incoming connections on the Listener ln,
// creating a new service goroutine for each.
// The service goroutines read requests and then call services to reply to them.
func (s *Server) serveListener(ln net.Listener) error {
	……
	for {
        //接收一个client发过来的请求
		conn, e := ln.Accept()
		……
	}
}

服务发现

服务发现主要涵盖两个方面:

  • 客户端获取服务元数据
  • 自动剔除失效的服务

下面是启动一个客户端的代码

package main

import (
	"context"
	"flag"
	example "github.com/rpcxio/rpcx-examples"
	"github.com/smallnest/rpcx/client"
	"log"
	"time"
)
var (
	addr      = flag.String("addr", "localhost:8972", "server address")
	zkServers = []string{"62.234.15.24:2181"}
	basePath  = "/rpcx/test"
)
func main()  {
    //1、启动一个ZookeeperDiscovery实例
	d := client.NewZookeeperDiscovery(basePath, "Arith",zkServers, nil)
    //2、启动一个客户端
	xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
	defer xclient.Close()


	args := &example.Args{
		A: 10,
		B: 20,
	}

	for {
		reply := &example.Reply{}
        //3、发起调用
		err := xclient.Call(context.Background(), "Mul", args, reply)
		if err != nil {
			log.Printf("failed to call: %v\n", err)
		}

		log.Printf("%d * %d = %d", args.A, args.B, reply.C)
		time.Sleep(3*time.Second)
	}
}

下面对步骤1、2、3做详细讲解

1、启动一个ZookeeperDiscovery实例

这个方法做了两件事情

  • 获取注册的服务
  • 监听服务动态
// NewZookeeperDiscovery returns a new ZookeeperDiscovery.
func NewZookeeperDiscovery(basePath string, servicePath string, zkAddr []string, options *store.Config) ServiceDiscovery {
	……
	return NewZookeeperDiscoveryWithStore(basePath+"/"+servicePath, kv)
}


// NewZookeeperDiscoveryWithStore returns a new ZookeeperDiscovery with specified store.
func NewZookeeperDiscoveryWithStore(basePath string, kv store.Store) ServiceDiscovery {
	if basePath[0] == '/' {
		basePath = basePath[1:]
	}
	d := &ZookeeperDiscovery{basePath: basePath, kv: kv}
	d.stopCh = make(chan struct{})

	//获取已经注册且正在运行的服务
	ps, err := kv.List(basePath)
	if err != nil {
		log.Infof("cannot get services of %s from registry: %v, err: %v", basePath, err)
		panic(err)
	}

	var pairs = make([]*KVPair, 0, len(ps))
	for _, p := range ps {
		pair := &KVPair{Key: p.Key, Value: string(p.Value)}
		if d.filter != nil && !d.filter(pair) {
			continue
		}
		pairs = append(pairs, pair)
	}
	//将已经注册的服务放入到paires
	d.pairs = pairs
	d.RetriesAfterWatchFailed = -1
	//启动一个多路器,监听注册服务的动态
	go d.watch()

	return d
}



func (d *ZookeeperDiscovery) watch() {
	for {
		var err error
		var c <-chan []*store.KVPair
		var tempDelay time.Duration

		retry := d.RetriesAfterWatchFailed
		for d.RetriesAfterWatchFailed < 0 || retry >= 0 {
            //返回一个监控注册服务目录变化的chan,也就是说,如果这个目录下面的服务发生了变化,则会收到通知
			c, err = d.kv.WatchTree(d.basePath, nil)
			……
			break
		}
		……

	readChanges:
		for {
			select {
			case <-d.stopCh:
				log.Info("discovery has been closed")
				return
			case ps := <-c://取出最新注册服务(所有的)
				if ps == nil {
					break readChanges
				}
				var pairs []*KVPair // latest servers
				for _, p := range ps {
					pair := &KVPair{Key: p.Key, Value: string(p.Value)}
					if d.filter != nil && !d.filter(pair) {
						continue
					}
					pairs = append(pairs, pair)
				}
				d.pairs = pairs

				d.mu.Lock()
                //将注册服务发送给订阅者
				for _, ch := range d.chans {
					ch := ch
					go func() {
						defer func() {
							recover()
						}()
						select {
						case ch <- pairs:
						case <-time.After(time.Minute):
							log.Warn("chan is full and new change has been dropped")
						}
					}()
				}
				d.mu.Unlock()
			}
		}

		log.Warn("chan is closed and will rewatch")
	}
}

c, err = d.kv.WatchTree(d.basePath, nil),这里监听的注册中心的服务变化,之所以能监听到,是因为服务注册的是临时目录,所以在服务宕机或者监听超时的时候,目录就会自动消失,这也是为什么能动态监听注册服务的原因。

2、启动一个客户端

需要注意的是,这个客户端并没有发起连接,只是保存实时的注册服务,以及调用策略等,定义如下:

type xClient struct {
	failMode     FailMode //失败策略:Failover,Failfast,Failtry,Failbackup
	selectMode   SelectMode //调用策略:RandomSelect,RoundRobin,WeightedRoundRobin,WeightedICMP等
    ……
	servers   map[string]string  //实时的注册服务信息
	discovery ServiceDiscovery  //ZookeeperDiscovery
    ……
}

我们看xclient是如何更新servers的

// NewXClient creates a XClient that supports service discovery and service governance.
func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient {
	client := &xClient{
		failMode:     failMode,
		selectMode:   selectMode,
		discovery:    discovery,
		servicePath:  servicePath,
		cachedClient: make(map[string]RPCClient),
		option:       option,
	}
	//从discovery实例中获取servers
	pairs := discovery.GetServices()
	servers := make(map[string]string, len(pairs))
	for _, p := range pairs {
		servers[p.Key] = p.Value
	}
    //服务分组
	filterByStateAndGroup(client.option.Group, servers)
	//先做初始化
	client.servers = servers
	if selectMode != Closest && selectMode != SelectByUser {
		client.selector = newSelector(selectMode, servers)
	}
	
	client.Plugins = &pluginContainer{}
	//获取一个服务变化的监听通道
	ch := client.discovery.WatchService()
	if ch != nil {
		client.ch = ch
        //监听,更新servers
		go client.watch(ch)
	}

	return client
}


// watch changes of service and update cached clients.
func (c *xClient) watch(ch chan []*KVPair) {
	for pairs := range ch {
		servers := make(map[string]string, len(pairs))
		for _, p := range pairs {
			servers[p.Key] = p.Value
		}
		c.mu.Lock()
		filterByStateAndGroup(c.option.Group, servers)
        //更新servers
		c.servers = servers

        //更新调用策略里面的servers
		if c.selector != nil {
			c.selector.UpdateServer(servers)
		}

		c.mu.Unlock()
	}
}

之所以能实时监听服务变化,全在于这句代码:ch := client.discovery.WatchService(),我们一起来看下

func (d *ZookeeperDiscovery) WatchService() chan []*KVPair {
	d.mu.Lock()
	defer d.mu.Unlock()

	ch := make(chan []*KVPair, 10)
  
	d.chans = append(d.chans, ch)
	return ch
}

这里new一个通道,并将通道放在了ZookeeperDiscovery的chans中,根据func (d *ZookeeperDiscovery) watch()就知道,在服务发生变化的时候,会直接网d.chans赋值

服务发现到此就全部讲解完毕,可以看到为了作者全部采用的是通道来完成实时注册服务更新的。

3、发起调用

发起调用,顾名思义,就是走网络请求,通过socket客户端调用服务端的socket,这里涉及到一些选择策略失败策略,我们单独一个章节讲

结语

我打算花足够多的时间来和大家读一读rpcx的源码,来一层层的剖解rpcx,有兴趣的朋友,可以关注我。

下一篇我们分析客户端调用服务的流程,以及过程中涉及到的各种策略

相关阅读

rpc框架之rpcx-简介(1)

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐