Thrift是Facebook开源的一款rpc调用框架,提供给Apache之后叫做Apache Thrift。它是一种高效的、支持多种编程语言的远程服务调用框架。

Thrift是一个融合了序列化和rpc调用的框架,通过一个中间语言(IDL, 接口定义语言)来定义接口和数据类型,然后通过编译器生成不同语言的代码。


1 简介

Thrift框架一个很重要的特性就是跨语言。国内使用thrift的互联网公司有:美团,小米,喜马拉雅FM,欢聚时代等。
优点是使用简单,跨语言。缺点是完全静态化,使用的数据结构必须事先定义好,中途不能更改。线程不安全,优质教程和书籍比较少。

重要概念

Thrift中有几个重要的概念,下面将依次介绍。

Transport
Transport是网络读写(socket,http)的抽象,用于和其他thrift组件解耦。Transport的接口包括:open,close,read,write,flush,isOpen,readAll。

服务端需要ServerTransport(对监听socket的一种抽象),用于接收客户端连接,接口包括:listen,accept,close。

python中Transport的实现包括:TSocket,THttpServer,TSSLSocket,TTwisted,TZlibTransport,都是对某种协议或框架的实现。还有两个装饰器,用于装饰已有的Transport,分别是TBufferedTransport和TFramedTransport,前者用于增加缓冲,后者用于添加帧。在创建server时,传入的是Transport的工厂,这些factory包括:TTransportFactoryBase(没有任何修饰,直接返回),
TBufferedTransportFactory(返回带缓冲的Transport)和TFramedTransportFactory(返回带帧定位的Transport)

Protocol
Protocol用于对数据格式抽象,在rpc调用时序列化请求和响应。

Processor
Processor对stream读写抽象,最终会调用用户编写的handler以响应对应的service。具体的Processor由编译器生成,用户需要实现service的实现类。

Server
Server端创建Transport,输入,输出的Protocol,以及响应service的handler,监听到client的请求然后委托给processor处理。Tserver是基类,构造函数的参数包括:

  • processor,serverTransport
  • processor,serverTransport,transportFactory,protocolFactory
  • processor,serverTransport,inputTransportFactory,outputTransportFactory,inputProtocolFactory, outputProtocolFactory

TServer内部实际上需要完整的参数,不完整的参数会导致对应的参数使用默认值。
TServer的子类包括:TSimpleServer, TThreadedServer, TThreadPoolServer, TForkingServer, THttpServer, TNonblockingServer, TProcessPoolServer
TServer的serve方法用于开始服务,接收client的请求。

2 IDL与代码生成

一般来说,跨语言的rpc框架都需要一种通用语言来定义接口和数据结构,thrift也不例外。IDL为接口文件,通过thrift工具可以生成对应的服务端代理和客户端存根。首先安装thrift,通过thrift --version可以查看版本:

Thrift version 0.9.1

新建thrift_file文件夹,在此目录下新建thrift文件example.thrift:

namespace example

struct Data {
    1: string text
}

service format_data {
    Data do_format(1:Data data),
}

进入example.thrift所在目录,执行命令如下:
thrift -out .. -gen py example.thrift
其中,-out参数指明生成文件夹的路径,-gen参数指明代码语言。
执行后会在thrift_file同级目录下生成example文件夹,里面包含系统自动生成的代码:
在这里插入图片描述

其中,contans.py包含声明的所有常量
ttypes.py:声明的struct,实现了具体的序列化和反序列化
服务名.py:对应service的描述文件,包含了service的接口定义Iface和客户端的rpc调用桩Client。

3 调用示例

以上文中生成的代码为基础,实现服务端接口和客户端调用如下:
客户端client.py代码如下:

# -*- coding: utf-8 -*-

from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from example.format_data import Client
from example.format_data import Data

__HOST = 'localhost'
__PORT = 8080

tsocket = TSocket.TSocket(__HOST, __PORT)
transport = TTransport.TBufferedTransport(tsocket)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Client(protocol)

data = Data('hello,world!')
transport.open()

print(client.do_format(data).text)

服务端server.py代码如下:

# coding=utf-8
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer
from example import ttypes
from example import format_data


class FormatDataHandler:
    def __init__(self):
        pass

    def do_format(self, data):
        print(data.text)
        return ttypes.Data(data.text.upper())


if __name__ == '__main__':
    handler = FormatDataHandler()
    processor = format_data.Processor(handler)
    transport = TSocket.TServerSocket('localhost', 8080)
    # 传输方式,使用buffer
    tfactory = TTransport.TBufferedTransportFactory()
    # 传输的数据类型:二进制
    pfactory = TBinaryProtocol.TBinaryProtocolFactory()

    # 创建一个thrift服务
    rpcServer = TServer.TSimpleServer(processor, transport, tfactory, pfactory)

    print("starting thrift server...")
    rpcServer.serve()

当然,依赖包肯定是需要thrift的。

先启动服务端,再启动客户端,输出如下:

HELLO,WORLD!

4 Server与Client封装

将服务端初始化和客户端调用过程进行封装,以便代码复用。
服务端封装如下:

# coding=utf-8
import logging

from thrift.protocol import TBinaryProtocol
from thrift.server import TServer
from thrift.transport import TSocket, TTransport


class ThriftServer(object):

    def __init__(self, service_class, handler_class):
        self.service_class = service_class
        self.handler_class = handler_class
        self.service_name = self.service_class.__name__.split('.')[-1]

    def start_server(self, address=None, port=8080):
        # 创建processor
        processor = self.service_class.Processor(self.handler_class())
        transport = TSocket.TServerSocket(address, port)
        tfactory = TTransport.TBufferedTransportFactory()
        pfactory = TBinaryProtocol.TBinaryProtocolFactory()
        server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
        logging.info("starting thrift server: %s, %s", self.service_name, port)
        server.serve()

服务端初始化:

import logging
import sys
from thrift_core.thrift_server import ThriftServer
from thrift_file.transmit_service import ttypes
from thrift_file.transmit_service import format_data

class FormatDataHandler:

    def __init__(self):
        pass

    def do_format(self, data):
        print(data.text)
        return ttypes.Data(data.text.upper())


if __name__ == '__main__':

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    logging.info("starting server...")
    server = ThriftServer(format_data, FormatDataHandler)
    server.start_server('localhost', '8080')

客户端封装如下:

from thrift.protocol import TBinaryProtocol
from thrift.transport import TTransport, TSocket

class ThriftClient(object):

    def __init__(self, service_class,
                 transport_type=TTransport.TBufferedTransport,
                 protocol_type=TBinaryProtocol.TBinaryProtocol):
        self.service_class = service_class
        self.transport_type = transport_type
        self.protocol_type = protocol_type

    def get_client(self, address=None, port=8080):
        tsocket = TSocket.TSocket(address, port)
        transport = self.transport_type(tsocket)
        protocol = self.protocol_type(transport)
        client = self.service_class.Client(protocol)
        transport.open()
        return client

客户端调用:

# coding=utf-8
from thrift_core.thrift_client import ThriftClient
from thrift_file.transmit_service import format_data
from thrift_file.transmit_service.format_data import Data

_HOST = 'localhost'
_PORT = 8080

client = ThriftClient(format_data).get_client(_HOST, _PORT)

data = Data('hello,world!')

print(client.do_format(data).text)

5 跨语言调用

上文中服务端和客户端都是使用的python,下面将以python作为客户端,java作为服务端来实现一个简单调用。
IDL文件test_service.thrift定义如下:

service TestService {
    string getMsg(1:string msg)
}

在thrift文件目录下执行以下命令,生成java代码:
thrift -out .. -gen java test_service.thrift
在项目中新建thrift_service包,将生成的java类拷贝至其中(注意,默认生成的java代码的package为default package)。
项目目录如下:
在这里插入图片描述
接着实现服务接口:

public class TestServiceImpl implements TestService.Iface {

    @Override
    public String getMsg(String msg) throws TException {
        return msg + " thx!";
    }
}

启动Java服务端:

public class GetMsgServer {

    public static void main(String[] args) {
        try {
            TProcessor processor = new TestService.Processor<TestService.Iface>(new TestServiceImpl());
            TServerSocket serverSocket = new TServerSocket(8080);
            TServer.Args tArgs = new TServer.Args(serverSocket);
            tArgs.processor(processor);
            tArgs.protocolFactory(new TBinaryProtocol.Factory());
            TServer server = new TSimpleServer(tArgs);
            server.serve();
        } catch (TTransportException e) {
            e.printStackTrace();
        }
    }
}

python客户端同样要生成python端的代码,生成方式前文已经介绍过。客户端调用如下:

# coding=utf-8

from thrift_core.thrift_client import ThriftClient
from thrift_file.test_service import TestService

_HOST = 'localhost'
_PORT = 8080

client = ThriftClient(TestService).get_client(_HOST, _PORT)

print(client.getMsg("deal user"))

执行后输出结果:

deal user thx!

6 RPC与服务化

上文通过简单的示例对thrift进行了基本的使用介绍,本小节将继续深入介绍其原理和高级用法。

thrift原理分析

Thrift是一个RPC框架,通过屏蔽网络通讯的复杂底层细节,使得调用远程的过程就像调用本地方法一样。
RPC是一种C/S架构的服务模型,server端提供接口供client调用,client端向server端发送数据,server端接收client端的数据进行处理之后将结果返回给client端。其本质过程如下图所示:

在这里插入图片描述
在日常开发中,通常有两种方式使用rpc。一种是完全自主实现rpc的几个过程,自己序列化数据,通过socket或者http进行数据传输。游戏开发领域通常会采用此种方式。另外就是使用现成的RPC框架,如thrift。

Thrift协议栈如下图所示:
在这里插入图片描述
底层I/O模块:负责实际的数据传输,可以是Socket、文件和压缩数据流等。

TTransport:以字节流的方式发送和接收消息,是底层IO模块在thrift框架中的实现。每一个底层io模块都有一个对应的TTransport来负责Thrift的字节流的数据在该io模块上的传输。如TSocket对应Socket传输,TFileTransport对应文件传输。

TProtocol:负责结构化数据与字节流消息的转化,即消息的序列化/反序列化。与TTransport类似,TProtocol有不同的子类实现,分别对应不同的消息格式转化,如TBinaryProtocol对应字节流。

TServer:负责接收Client的请求,并转发到Processor进行处理。TServer的各子类实现机制不同,性能也差距巨大。

Processor:负责处理客户端请求并返回响应,包括RPC请求转发、参数解析、调用用户业务代码等。Processor代码是Thrift自动生成的,是服务端从thrift框架转入用户逻辑的关键。

ServerClient:负责客户端发送rpc请求,也是由thrift自动生成的。

Server实现子类介绍

TServer主要分为两类,分别是非阻塞式和阻塞式的。Java端的类结构如下图所示:
在这里插入图片描述

非阻塞式的Server有三种:TNonblockingServer,THsHaServer和TThreadedSelectorServer,均继承于AbstractNonblockingServer抽象类。

阻塞服务Server有两种:TThreadPoolServer和TSimpleServer。

TSimpleServer
TSimpleServer使用的是单线程阻塞模式,通过主线程循环监听server的端口,并接收请求来进行处理。每次只能接收和处理一个Socket连接。一般只用于测试,不会在生产环境中使用。

TThreadPoolServer
为了解决单线程server问题,TThreadPoolServer通过线程池的方式来处理请求。主线程循环接收请求,并在接收到请求时将其封装为WorkerProcess丢到线程池中处理:
在这里插入图片描述
这种处理方式是多线程阻塞模式。

TNonblockingServer
如果使用阻塞式多线程模型,在并发场景下,当并发数大于线程池大小时,同样会产生并发问题。因此在TNonblockingServer中引入了java NIO的方法来解决这个问题。
TNonblockingServer采用单线程非阻塞模式,使用了Channel/Selector机制。所有的socket都被注册到selector上,在一个线程中循环监控所有的socket。后面的任务处理使用的是单线程,吞吐量较低。

THsHaServer
THsHaServer继承了TNonblockingServer,通过线程池来提高任务处理的并发能力。THsHaServer是半同步半异步(Half-Aysnc-Half-Sync)的处理模式。半异步是指IO事件处理,半同步是指业务逻辑的处理。

THsHaServer类是TNonblockingServer的子类,在TNonblockingServer模式中,采用一个线程来完成对所有socket的监听和业务处理,效率低下。THsHaServer的引入部分解决了这些问题。

在THsHaServer模式中,引入了一个线程池专门进行业务的处理。

THsHaServer的优点是:
THsHaServer的I/O监听线程在完成数据读取后,将业务处理过程交给业务线程池来完成,主线程也就是I/O处理线程则继续监听Socket和进行数据读写。效率大大提升。

缺点是:
主线程负责的工作虽然很简单,但毕竟只有一个线程。当并发量比较大且发送数据量较多时,监听socket上新连接请求不能被及时接受。

TThreadedSelectorServer
TThreadedSelectorServer是当前Thrift中能提供的最高级的线程服务模型。其内部由几个部分构成:

  • (1) 一个AcceptThread线程对象,专门用于处理监听socket上的新连接。
  • (2) 若干个SelectorThread对象专门用于处理业务socket的网络I/O操作,所有网络数据的读写均是有这些线程来完成。
  • (3) 一个负载均衡器SelectorThreadLoadBalancer对象,主要用于AcceptThread线程接收到一个新socket连接请求时,决定将这个新连接请求分配给哪个SelectorThread线程。
  • (4) 一个ExecutorService类型的工作线程池,在SelectorThread线程中,监听到有业务socket中有调用请求过来,将请求读取之后,交给ExecutorService线程池中的线程完成此次调用的具体执行。

参考资料

[1]https://blog.csdn.net/weixin_34348174/article/details/86028879
[2]https://blog.csdn.net/luoyexuge/article/details/80433139

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐