码字不易,转载请附原链,搬砖繁忙回复不及时见谅,技术交流请加QQ群:909211071,或关注公众号:程序猿AirGo

本文源码已上传至 Github:​​​​​​GitHub - why444216978/grpc-cmux: Serve grpc and http on the same port by cmux  

公众号原文:连接多路复用,同一个端口同时提供 HTTP 和 gRPC 服务

gRPC-Gateway

关于 gRPC 和 grpc-gateway 的介绍使用方法不再过多赘述,我的另一篇博客和 gRPC-Gateway 官方文档已经介绍的很全面了:

初探gRPC

Adding gRPC-Gateway annotations to an existing proto file

有两个问题可能大家都会遇到,这里简单提一下,一个是因为缺少 grpc-gateway 提供的 proto 包,需要我们下载对应包并复制到项目根目录中:

google/api/annotations.proto: File not found.
helloworld/hello_world.proto:6:1: Import "google/api/annotations.proto" was not found or had errors.
go get github.com/grpc-ecosystem/grpc-gateway
cp -rf $GOPATH/pkg/mod/github.com/grpc-ecosystem/grpc-gateway@v1.16.0/third_party/googleapis/google ./

另一个问题是因为我们引入的 annotations.proto 文件又会引入 protobuf 包的 descriptor.proto 文件,所以我们需要复制我们上面下载的 protoc 源码中的 protobuf 目录到项目根目录的 googole 目录下:

google/protobuf/descriptor.proto: File not found.
google/api/annotations.proto:20:1: Import "google/protobuf/descriptor.proto" was not found or had errors.
google/api/annotations.proto:28:8: "google.protobuf.MethodOptions" is not defined.
cp -rf /user_dir/protoc/include/google/protobuf ./google

封装多路复用服务

package server

import (
	"context"
	"fmt"
	"net"
	"net/http"
	"time"

	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"

	"github.com/soheilhy/cmux"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/encoding/protojson"
)

type Server struct {
	endpoint      string
	HTTPListener  net.Listener
	GRPCListener  net.Listener
	httpServer    *http.Server
	router        *http.ServeMux
	GRPClientConn *grpc.ClientConn
	registerHTTP  registerFunc
	registerGRPC  registerFunc
	ServerMux     *runtime.ServeMux
	tcpMux        cmux.CMux
}

type registerFunc func(ctx context.Context, s *Server)

type Option func(*Server)

func WithEndpoint(endpoint string) Option {
	return func(s *Server) { s.endpoint = endpoint }
}

func WithHTTPregisterFunc(registerHTTP registerFunc) Option {
	return func(s *Server) { s.registerHTTP = registerHTTP }
}

func WithGRPCregisterFunc(registerGRPC registerFunc) Option {
	return func(s *Server) { s.registerGRPC = registerGRPC }
}

// New returns a Server instance
func New(opts ...Option) *Server {
	s := &Server{}

	for _, o := range opts {
		o(s)
	}

	listener, err := net.Listen("tcp", s.endpoint)
	if err != nil {
		panic(err)
	}
	s.tcpMux = cmux.New(listener)

	return s
}

func (s *Server) Start() error {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	s.GRPCListener = s.tcpMux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldPrefixSendSettings("content-type", "application/grpc"))
	s.HTTPListener = s.tcpMux.Match(cmux.HTTP1Fast())

	go func() {
		s.registerGRPC(ctx, s)
	}()

	go func() {
		if err := s.initGateway(ctx); err != nil {
			panic(err)
		}
		s.registerHTTP(ctx, s)
		s.startGateway()
	}()

	return s.tcpMux.Serve()
}

func (s *Server) Stop() {
	s.tcpMux.Close()
}

func (s *Server) initGateway(ctx context.Context) error {
	var err error

	s.router = http.NewServeMux()

	s.GRPClientConn, err = grpc.Dial(s.endpoint, []grpc.DialOption{
		grpc.WithTimeout(10 * time.Second),
		grpc.WithBlock(),
		grpc.WithInsecure(),
	}...)
	if err != nil {
		return fmt.Errorf("Fail to dial: %v", err)
	}

	s.ServerMux = runtime.NewServeMux(
		runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{
			MarshalOptions: protojson.MarshalOptions{
				UseProtoNames:  true,
				UseEnumNumbers: true,
			},
		}),
		runtime.WithErrorHandler(runtime.DefaultHTTPErrorHandler),
	)

	return nil
}

func (s *Server) startGateway() {
	s.router.Handle("/", s.ServerMux)

	s.httpServer = &http.Server{
		Addr:         s.endpoint,
		Handler:      s.router,
		ReadTimeout:  time.Second,
		WriteTimeout: time.Second,
		IdleTimeout:  time.Second,
	}

	if err := s.httpServer.Serve(s.HTTPListener); err != nil {
		panic(err)
	}
}

这里说明几点,帮助大家理解服务运行逻辑:

  • 定义 registerFunc 方法,通过 Options 将自己的 gRPC 服务注册方法在初始化的时候传入,来达到与具体服务解耦的目的。
  • 注册 gRPC 和 HTTP 服务的 Matcher
  • 注册具体的 gRPC 服务并启动服务
  • 初始化 gRPC-Gateway 并启动服务
  • 启动 cmux 服务

下面是一个简单的使用例子:

package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	pb "helloworld/helloworld"
	"helloworld/server"

	"google.golang.org/grpc"
)

const (
	endpoint = ":8888"
)

type Server struct {
	*server.Server
	pb.UnimplementedGreeterServer
}

func (s *Server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	return &pb.HelloReply{Message: in.Name + " world"}, nil
}

func registerHTTP(ctx context.Context, s *server.Server) {
	if err := pb.RegisterGreeterHandler(ctx, s.ServerMux, s.GRPClientConn); err != nil {
		panic(err)
	}
}

func registerGRPC(ctx context.Context, s *server.Server) {
	grpcServer := grpc.NewServer()
	pb.RegisterGreeterServer(grpcServer, new(Server))
	if err := grpcServer.Serve(s.GRPCListener); err != nil {
		panic(err)
	}
}

func main() {
	s := server.New(
		server.WithEndpoint(endpoint),
		server.WithGRPCregisterFunc(registerGRPC),
		server.WithHTTPregisterFunc(registerHTTP))

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		panic(s.Start())
		wg.Done()
	}()

	cc, err := newClientConn(endpoint)
	if err != nil {
		log.Fatal(err)
	}

	ticker := time.NewTicker(time.Second)
	for range ticker.C {
		client(cc)
	}

	wg.Wait()
}

func client(cc *grpc.ClientConn) {
	client := pb.NewGreeterClient(cc)

	reply, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "why"})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(reply)
}

func newClientConn(target string) (*grpc.ClientConn, error) {
	cc, err := grpc.Dial(
		target,
		grpc.WithInsecure(),
	)
	if err != nil {
		return nil, err
	}
	return cc, nil
}
[why@MacBook-Pro] ~/Desktop/go/grpc-cmux$go run main.go
message:"why world"
message:"why world"
message:"why world"
......
[why@MacBook-Pro] ~/Desktop/go/grpc-cmux$curl http://localhost:8888/v1/example/echo
{"message":" world"}

cmux 源码分析

整体思路:通过实现不同的 net.listener 方法,自定义各自的请求过滤规则,并用相应协议的服务监听对应 listener 来接收 TCP 字节流,达到在同一端口提供不同服务的效果。

cMux 结构体:

type matchersListener struct {
	ss []MatchWriter //匹配规则列表
	l  muxListener //伪Listener
}

type cMux struct {
	root        net.Listener //真实Listener
	bufLen      int //连接队列大小,1024
	errh        ErrorHandler //错误处理方法
	sls         []matchersListener //自定义"Listener"列表
	readTimeout time.Duration //读超时时间,默认不限制
	donec       chan struct{} //连接关闭chan
	mu          sync.Mutex //关闭cmux服务时需要加索,因为既可以主动触发也可以被动触发,避免重复close导致panic
}

MatcherWriter 匹配流量:

  • HTTP1Fast 和 TLS 使用前缀树(patricia Tree)匹配
  • HTTP1 通过硬解析 request 对比 HTTP 版本
  • Header 相关方法通过字符串匹配

核心处理逻辑:

  1. 注册多个 Matcher,根据 Header、HTTP版本等匹配并接管流量(matchers)
  2. cmux.root.Accept 接收所有流量
  3. 双层迭代,外层按照注册 listener 的先后顺序,内层按照 MatchWriter 参数的先后顺序,依次匹配注册的 Matcher
  4. 匹配成功后,用对应  matchersListener 的 connc  接收连接,执行对应协议的后续处理逻辑

匹配 matchersListener 核心源码:

func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()

	muc := newMuxConn(c)
	if m.readTimeout > noTimeout {
		_ = c.SetReadDeadline(time.Now().Add(m.readTimeout))
	}
    //双层迭代,外层按照注册 listener 的先后顺序,内层按照 MatchWriter 参数的先后顺序,依次匹配注册的 Matcher
	for _, sl := range m.sls {
		for _, s := range sl.ss {
			matched := s(muc.Conn, muc.startSniffing())
			if matched {
				muc.doneSniffing()
				if m.readTimeout > noTimeout {
					_ = c.SetReadDeadline(time.Time{})
				}
				select {
				case sl.l.connc <- muc:
				case <-donec:
					_ = c.Close()
				}
				return
			}
		}
	}

	_ = c.Close()
	err := ErrNotMatched{c: c}
	if !m.handleErr(err) {
		_ = m.root.Close()
	}
}

流量分配到对应协议源码:

func (l muxListener) Accept() (net.Conn, error) {
	select {
	case c, ok := <-l.connc:
		if !ok {
			return nil, ErrListenerClosed
		}
		return c, nil
	case <-l.donec:
		return nil, ErrServerClosed
	}
}

可以加个断点验证一下流量转发的逻辑,在 cmux.go 第 264 行加个断电,然后启动调试,可以看到流量被转发到了 gRPC.(*Server).Serve 上 :

参考:Golang: Run multiple services on one port - Dgraph Blog

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐