Thrift基础
Thrift是Facebook开源的一款rpc调用框架,提供给Apache之后叫做Apache Thrift。它是一种高效的、支持多种编程语言的远程服务调用框架。本文将以python和java为例来介绍thrift的开发和使用。1 简介Thrift中有几个重要的概念,下面将依次次介绍。TransportTransport是网络读写(socket,http)的抽象,用于和其他thrift...
Thrift是Facebook开源的一款rpc调用框架,提供给Apache之后叫做Apache Thrift。它是一种高效的、支持多种编程语言的远程服务调用框架。
Thrift是一个融合了序列化和rpc调用的框架,通过一个中间语言(IDL, 接口定义语言)来定义接口和数据类型,然后通过编译器生成不同语言的代码。
1 简介
Thrift框架一个很重要的特性就是跨语言。国内使用thrift的互联网公司有:美团,小米,喜马拉雅FM,欢聚时代等。
优点是使用简单,跨语言。缺点是完全静态化,使用的数据结构必须事先定义好,中途不能更改。线程不安全,优质教程和书籍比较少。
重要概念
Thrift中有几个重要的概念,下面将依次介绍。
Transport
Transport是网络读写(socket,http)的抽象,用于和其他thrift组件解耦。Transport的接口包括:open,close,read,write,flush,isOpen,readAll。
服务端需要ServerTransport(对监听socket的一种抽象),用于接收客户端连接,接口包括:listen,accept,close。
python中Transport的实现包括:TSocket,THttpServer,TSSLSocket,TTwisted,TZlibTransport,都是对某种协议或框架的实现。还有两个装饰器,用于装饰已有的Transport,分别是TBufferedTransport和TFramedTransport,前者用于增加缓冲,后者用于添加帧。在创建server时,传入的是Transport的工厂,这些factory包括:TTransportFactoryBase(没有任何修饰,直接返回),
TBufferedTransportFactory(返回带缓冲的Transport)和TFramedTransportFactory(返回带帧定位的Transport)
Protocol
Protocol用于对数据格式抽象,在rpc调用时序列化请求和响应。
Processor
Processor对stream读写抽象,最终会调用用户编写的handler以响应对应的service。具体的Processor由编译器生成,用户需要实现service的实现类。
Server
Server端创建Transport,输入,输出的Protocol,以及响应service的handler,监听到client的请求然后委托给processor处理。Tserver是基类,构造函数的参数包括:
- processor,serverTransport
- processor,serverTransport,transportFactory,protocolFactory
- processor,serverTransport,inputTransportFactory,outputTransportFactory,inputProtocolFactory, outputProtocolFactory
TServer内部实际上需要完整的参数,不完整的参数会导致对应的参数使用默认值。
TServer的子类包括:TSimpleServer, TThreadedServer, TThreadPoolServer, TForkingServer, THttpServer, TNonblockingServer, TProcessPoolServer
TServer的serve方法用于开始服务,接收client的请求。
2 IDL与代码生成
一般来说,跨语言的rpc框架都需要一种通用语言来定义接口和数据结构,thrift也不例外。IDL为接口文件,通过thrift工具可以生成对应的服务端代理和客户端存根。首先安装thrift,通过thrift --version可以查看版本:
Thrift version 0.9.1
新建thrift_file文件夹,在此目录下新建thrift文件example.thrift:
namespace example
struct Data {
1: string text
}
service format_data {
Data do_format(1:Data data),
}
进入example.thrift所在目录,执行命令如下:
thrift -out .. -gen py example.thrift
其中,-out参数指明生成文件夹的路径,-gen参数指明代码语言。
执行后会在thrift_file同级目录下生成example文件夹,里面包含系统自动生成的代码:
其中,contans.py包含声明的所有常量
ttypes.py:声明的struct,实现了具体的序列化和反序列化
服务名.py:对应service的描述文件,包含了service的接口定义Iface和客户端的rpc调用桩Client。
3 调用示例
以上文中生成的代码为基础,实现服务端接口和客户端调用如下:
客户端client.py代码如下:
# -*- coding: utf-8 -*-
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from example.format_data import Client
from example.format_data import Data
__HOST = 'localhost'
__PORT = 8080
tsocket = TSocket.TSocket(__HOST, __PORT)
transport = TTransport.TBufferedTransport(tsocket)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Client(protocol)
data = Data('hello,world!')
transport.open()
print(client.do_format(data).text)
服务端server.py代码如下:
# coding=utf-8
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer
from example import ttypes
from example import format_data
class FormatDataHandler:
def __init__(self):
pass
def do_format(self, data):
print(data.text)
return ttypes.Data(data.text.upper())
if __name__ == '__main__':
handler = FormatDataHandler()
processor = format_data.Processor(handler)
transport = TSocket.TServerSocket('localhost', 8080)
# 传输方式,使用buffer
tfactory = TTransport.TBufferedTransportFactory()
# 传输的数据类型:二进制
pfactory = TBinaryProtocol.TBinaryProtocolFactory()
# 创建一个thrift服务
rpcServer = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
print("starting thrift server...")
rpcServer.serve()
当然,依赖包肯定是需要thrift的。
先启动服务端,再启动客户端,输出如下:
HELLO,WORLD!
4 Server与Client封装
将服务端初始化和客户端调用过程进行封装,以便代码复用。
服务端封装如下:
# coding=utf-8
import logging
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer
from thrift.transport import TSocket, TTransport
class ThriftServer(object):
def __init__(self, service_class, handler_class):
self.service_class = service_class
self.handler_class = handler_class
self.service_name = self.service_class.__name__.split('.')[-1]
def start_server(self, address=None, port=8080):
# 创建processor
processor = self.service_class.Processor(self.handler_class())
transport = TSocket.TServerSocket(address, port)
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()
server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
logging.info("starting thrift server: %s, %s", self.service_name, port)
server.serve()
服务端初始化:
import logging
import sys
from thrift_core.thrift_server import ThriftServer
from thrift_file.transmit_service import ttypes
from thrift_file.transmit_service import format_data
class FormatDataHandler:
def __init__(self):
pass
def do_format(self, data):
print(data.text)
return ttypes.Data(data.text.upper())
if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logging.info("starting server...")
server = ThriftServer(format_data, FormatDataHandler)
server.start_server('localhost', '8080')
客户端封装如下:
from thrift.protocol import TBinaryProtocol
from thrift.transport import TTransport, TSocket
class ThriftClient(object):
def __init__(self, service_class,
transport_type=TTransport.TBufferedTransport,
protocol_type=TBinaryProtocol.TBinaryProtocol):
self.service_class = service_class
self.transport_type = transport_type
self.protocol_type = protocol_type
def get_client(self, address=None, port=8080):
tsocket = TSocket.TSocket(address, port)
transport = self.transport_type(tsocket)
protocol = self.protocol_type(transport)
client = self.service_class.Client(protocol)
transport.open()
return client
客户端调用:
# coding=utf-8
from thrift_core.thrift_client import ThriftClient
from thrift_file.transmit_service import format_data
from thrift_file.transmit_service.format_data import Data
_HOST = 'localhost'
_PORT = 8080
client = ThriftClient(format_data).get_client(_HOST, _PORT)
data = Data('hello,world!')
print(client.do_format(data).text)
5 跨语言调用
上文中服务端和客户端都是使用的python,下面将以python作为客户端,java作为服务端来实现一个简单调用。
IDL文件test_service.thrift定义如下:
service TestService {
string getMsg(1:string msg)
}
在thrift文件目录下执行以下命令,生成java代码:
thrift -out .. -gen java test_service.thrift
在项目中新建thrift_service包,将生成的java类拷贝至其中(注意,默认生成的java代码的package为default package)。
项目目录如下:
接着实现服务接口:
public class TestServiceImpl implements TestService.Iface {
@Override
public String getMsg(String msg) throws TException {
return msg + " thx!";
}
}
启动Java服务端:
public class GetMsgServer {
public static void main(String[] args) {
try {
TProcessor processor = new TestService.Processor<TestService.Iface>(new TestServiceImpl());
TServerSocket serverSocket = new TServerSocket(8080);
TServer.Args tArgs = new TServer.Args(serverSocket);
tArgs.processor(processor);
tArgs.protocolFactory(new TBinaryProtocol.Factory());
TServer server = new TSimpleServer(tArgs);
server.serve();
} catch (TTransportException e) {
e.printStackTrace();
}
}
}
python客户端同样要生成python端的代码,生成方式前文已经介绍过。客户端调用如下:
# coding=utf-8
from thrift_core.thrift_client import ThriftClient
from thrift_file.test_service import TestService
_HOST = 'localhost'
_PORT = 8080
client = ThriftClient(TestService).get_client(_HOST, _PORT)
print(client.getMsg("deal user"))
执行后输出结果:
deal user thx!
6 RPC与服务化
上文通过简单的示例对thrift进行了基本的使用介绍,本小节将继续深入介绍其原理和高级用法。
thrift原理分析
Thrift是一个RPC框架,通过屏蔽网络通讯的复杂底层细节,使得调用远程的过程就像调用本地方法一样。
RPC是一种C/S架构的服务模型,server端提供接口供client调用,client端向server端发送数据,server端接收client端的数据进行处理之后将结果返回给client端。其本质过程如下图所示:
在日常开发中,通常有两种方式使用rpc。一种是完全自主实现rpc的几个过程,自己序列化数据,通过socket或者http进行数据传输。游戏开发领域通常会采用此种方式。另外就是使用现成的RPC框架,如thrift。
Thrift协议栈如下图所示:
底层I/O模块:负责实际的数据传输,可以是Socket、文件和压缩数据流等。
TTransport:以字节流的方式发送和接收消息,是底层IO模块在thrift框架中的实现。每一个底层io模块都有一个对应的TTransport来负责Thrift的字节流的数据在该io模块上的传输。如TSocket对应Socket传输,TFileTransport对应文件传输。
TProtocol:负责结构化数据与字节流消息的转化,即消息的序列化/反序列化。与TTransport类似,TProtocol有不同的子类实现,分别对应不同的消息格式转化,如TBinaryProtocol对应字节流。
TServer:负责接收Client的请求,并转发到Processor进行处理。TServer的各子类实现机制不同,性能也差距巨大。
Processor:负责处理客户端请求并返回响应,包括RPC请求转发、参数解析、调用用户业务代码等。Processor代码是Thrift自动生成的,是服务端从thrift框架转入用户逻辑的关键。
ServerClient:负责客户端发送rpc请求,也是由thrift自动生成的。
Server实现子类介绍
TServer主要分为两类,分别是非阻塞式和阻塞式的。Java端的类结构如下图所示:
非阻塞式的Server有三种:TNonblockingServer,THsHaServer和TThreadedSelectorServer,均继承于AbstractNonblockingServer抽象类。
阻塞服务Server有两种:TThreadPoolServer和TSimpleServer。
TSimpleServer
TSimpleServer使用的是单线程阻塞模式,通过主线程循环监听server的端口,并接收请求来进行处理。每次只能接收和处理一个Socket连接。一般只用于测试,不会在生产环境中使用。
TThreadPoolServer
为了解决单线程server问题,TThreadPoolServer通过线程池的方式来处理请求。主线程循环接收请求,并在接收到请求时将其封装为WorkerProcess丢到线程池中处理:
这种处理方式是多线程阻塞模式。
TNonblockingServer
如果使用阻塞式多线程模型,在并发场景下,当并发数大于线程池大小时,同样会产生并发问题。因此在TNonblockingServer中引入了java NIO的方法来解决这个问题。
TNonblockingServer采用单线程非阻塞模式,使用了Channel/Selector机制。所有的socket都被注册到selector上,在一个线程中循环监控所有的socket。后面的任务处理使用的是单线程,吞吐量较低。
THsHaServer
THsHaServer继承了TNonblockingServer,通过线程池来提高任务处理的并发能力。THsHaServer是半同步半异步(Half-Aysnc-Half-Sync)的处理模式。半异步是指IO事件处理,半同步是指业务逻辑的处理。
THsHaServer类是TNonblockingServer的子类,在TNonblockingServer模式中,采用一个线程来完成对所有socket的监听和业务处理,效率低下。THsHaServer的引入部分解决了这些问题。
在THsHaServer模式中,引入了一个线程池专门进行业务的处理。
THsHaServer的优点是:
THsHaServer的I/O监听线程在完成数据读取后,将业务处理过程交给业务线程池来完成,主线程也就是I/O处理线程则继续监听Socket和进行数据读写。效率大大提升。
缺点是:
主线程负责的工作虽然很简单,但毕竟只有一个线程。当并发量比较大且发送数据量较多时,监听socket上新连接请求不能被及时接受。
TThreadedSelectorServer
TThreadedSelectorServer是当前Thrift中能提供的最高级的线程服务模型。其内部由几个部分构成:
- (1) 一个AcceptThread线程对象,专门用于处理监听socket上的新连接。
- (2) 若干个SelectorThread对象专门用于处理业务socket的网络I/O操作,所有网络数据的读写均是有这些线程来完成。
- (3) 一个负载均衡器SelectorThreadLoadBalancer对象,主要用于AcceptThread线程接收到一个新socket连接请求时,决定将这个新连接请求分配给哪个SelectorThread线程。
- (4) 一个ExecutorService类型的工作线程池,在SelectorThread线程中,监听到有业务socket中有调用请求过来,将请求读取之后,交给ExecutorService线程池中的线程完成此次调用的具体执行。
参考资料
[1]https://blog.csdn.net/weixin_34348174/article/details/86028879
[2]https://blog.csdn.net/luoyexuge/article/details/80433139
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)