(Kafka源码五)Kafka服务端处理消息
Kafka 服务端(Broker)采用 Reactor 的架构思想,通过1 个 Acceptor,N 个 Processor(N默认为3),M 个 KafkaRequestHandler(M默认为8),来处理客户端请求,这种模式结合了多线程和事件驱动的设计,优点是能够有效地利用系统资源,可以实现高效地处理请求,无需为每个连接或请求创建新的线程,减少了线程上下文切换的开销,以实现高并发和高吞吐量。
Kafka 服务端(Broker)采用 Reactor 的架构思想,通过1 个 Acceptor,N 个 Processor(N默认为3),M 个 KafkaRequestHandler(M默认为8),来处理客户端请求,这种模式结合了多线程和事件驱动的设计,优点是能够有效地利用系统资源,可以实现高效地处理请求,无需为每个连接或请求创建新的线程,减少了线程上下文切换的开销,以实现高并发和高吞吐量。
服务端整体架构
Kafka 服务端的网络结构主要包含以下三层:
- 网络连接层:Acceptor 线程接收客户端的连接请求并创建网络连接。
- 请求转发层:Acceptor 线程以轮询的方式分发给Processor 线程,从而实现负载均衡的效果,Processor
线程将请求放到请求队列中。 - 请求处理层:KafkaRequestHandler线程不断地从请求队列中获取请求,解析请求,调用KafkaAPIs获取对应的操作结果,并将结果返回给客户端。
执行流程
Acceptor
线程在初始化的时候会往selector注册 OP_ACCEPT事件
,表示可以接受客户端的连接请求,当客户端有请求连接过来时,根据selectionkey可以得到socketChannel,再将socketChannel以轮询的方式交给Processor线程(默认有3个Processor线程)
处理。- Processor线程收到Acceptor线程分发的连接后,会先将连接放入自己的队列
newConnections
中,然后在selector注册OP_READ事件
,表示可以读取客户端的请求,当客户端发送消息过来时,Processor线程就会处理OP_READ事件,然后Processor线程会将客户端的请求连接放入requestChannel的RequestQueue(请求队列被所有Processor线程共享)里并取消OP_READ事件的监听
。 KafkaRequestHandler线程(默认会创建8个线程)
会从RequestQueue取出请求进行处理,通过KafkaApis调用得到响应结果,将处理后的响应结果放入responseQueues中(每个Processor线程对应一个responseQueues)。- Processor线程往selector
注册OP_WRITE事件
,表示可以将响应结果发送给客户端,当Processor线程检测到有OP_WRITE事件时,Processor线程就会从对应的responseQueues中取出响应结果,并通过selector.poll()方法将响应结果发送给对应的客户端且取消OP_WRITE事件的监听
,最后Processor线程就会重新注册OP_READ事件
,准备下一个请求的处理。
源码剖析
服务端的核心代码都在kafka.scala
这个类,首先是main入口方法,该方法主要设置参数,然后调用启动方法
def main(args: Array[String]): Unit = {
try {
//启动服务端的时候,在这里解析参数
val serverProps = getPropsFromArgs(args)
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
//启动的核心代码方法
kafkaServerStartable.startup
//...
}
kafka的服务端核心方法都在startup()里面
def startup() {
//启动服务
server.startup()
//...
}
创建SocketServer,startup启动后,会创建Acceptor线程
和三个Processor线程
并启动
//Kafka 服务端的功能 都是在这里面实现
def startup() {
//创建NIO的服务端
socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
socketServer.startup()
}
def startup() {
this.synchronized {
// 设置发送和接收的缓冲区大小
val sendBufferSize = config.socketSendBufferBytes
val recvBufferSize = config.socketReceiveBufferBytes
//获取当前broker主机id
val brokerId = config.brokerId
var processorBeginIndex = 0
//endpoints表示Kafka配置文件config/server.properties中的信息
//正常情况下,只有一个服务实例
endpoints.values.foreach { endpoint =>
val protocol = endpoint.protocolType
//processorEndIndex = 0 + 3
val processorEndIndex = processorBeginIndex + numProcessorThreads
//创建了三个Processor的线程
for (i <- processorBeginIndex until processorEndIndex)
//默认新建3个Processor线程
processors(i) = newProcessor(i, connectionQuotas, protocol)
//Acceptor类的主构造函数会启动3个Processor线程
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
acceptors.put(endpoint, acceptor)
// Utils是线程工具类,启动acceptor线程,
Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
acceptor.awaitStartup()
processorBeginIndex = processorEndIndex
}
}
}
Acceptor 线程处理
Utils.newThread启动acceptor
线程的start()
方法后,就会执行该线程的run
方法
-
首先
serverChannel
向nioSelector
注册OP_ACCEPT
事件,nioSelector
就会监听serverChannel
是否有新的连接请求 -
若有新的连接请求到来,根据该连接的key创建
SocketChannel
,然后通过轮询的方式分发给不同的processors线程
处理,从而保证processor线程的负载均衡。
def run() {
//ServerChannel往Selector注册OP_ACCEPT事件,表示可以接收客户端的请求,
//Selector就会检查ServerChannel是否有新的请求到达
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
startupComplete()
try {
var currentProcessor = 0
//死循环,不断轮询
while (isRunning) {
try {
//selecotr 查看是否有新的注册事件
val ready = nioSelector.select(500)
//大于0,说明有新事件到来
if (ready > 0) {
//获取事件的key
val keys = nioSelector.selectedKeys()
//遍历注册的所有key
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
try {
val key = iter.next
//遍历完就删除
iter.remove()
//如果事件是OP_ACCEPT,就会调用accept()方法接收请求
if (key.isAcceptable)
// 创建SocketChannel,将其分发给Processor线程处理
accept(key, processors(currentProcessor))
else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
// 轮询遍历下一个Processor线程
currentProcessor = (currentProcessor + 1) % processors.length
} catch {
case e: Throwable => error("Error while accepting connection", e)
}
}
}
}
}
根据key封装socketChannel
,分发给processor线程
处理,processor线程将socketChannel放入自己的队列newConnections
中,该队列是由ConcurrentLinkedQueue
实现的队列,然后唤醒processor
的 selector
处理
def accept(key: SelectionKey, processor: Processor) {
//根据SelectionKey获取到serverSocketChannel
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
//获取到一个socketChannel
val socketChannel = serverSocketChannel.accept()
try {
connectionQuotas.inc(socketChannel.socket().getInetAddress)
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
socketChannel.socket().setKeepAlive(true)
// processor调用accept方法对socketChannel进行处理
processor.accept(socketChannel)
}
}
def accept(socketChannel: SocketChannel) {
//将接收的 SocketChannel放入到自己的队列
newConnections.add(socketChannel)
// 唤醒 Processor 的 selector 进行处理
wakeup()
}
Processor 线程处理
在上面的startup()中已经创建了3个Processor线程,然后在Acceptor的主构造函数中进行启动
//主构造函数,new出来的时候会被运行
private[kafka] class Acceptor(val endPoint: EndPoint,
val sendBufferSize: Int,
val recvBufferSize: Int,
brokerId: Int,
processors: Array[Processor],
connectionQuotas: ConnectionQuotas)
extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
this.synchronized {
//启动在startup()创建的3个Processor线程
processors.foreach { processor =>
Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()
}
}
Processor启动之后就会执行run方法
override def run() {
startupComplete()
while (isRunning) {
try {
//读取队列中的每个SocketChannel,都往Selector上面注册OP_READ事件
configureNewConnections()
//处理响应,并注册OP_WRITE事件
processNewResponses()
//读取和发送请求的代码应该都是在这个方法完成,用于处理OP_READ事件与OP_WRITE事件
poll()
//处理接收到的新请求,将这些请求放入requestChannel请求队列中并取消OP_READ事件
processCompletedReceives()
//处理已经发送出去的响应并重新监听OP_READ事件
processCompletedSends()
processDisconnected()
}
swallowError(closeAll())
shutdownComplete()
}
不断获取连接队列里所有的SocketChannel,解析参数得到ConnectionId,再往selector注册OP_READ事件,注册之后就可以读取客户端的请求。
private def configureNewConnections() {
//当连接队列不为空
while (!newConnections.isEmpty) {
//不断获取连接队列里面的SocketChannel
val channel = newConnections.poll()
try {
//解析SocketChannel,获取对应的参数
val localHost = channel.socket().getLocalAddress.getHostAddress
val localPort = channel.socket().getLocalPort
val remoteHost = channel.socket().getInetAddress.getHostAddress
val remotePort = channel.socket().getPort
val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
//往selector注册OP_READ事件
selector.register(connectionId, channel)
}
}
}
从这段代码可以知道kafka对SocketChannel进行了封装,封装成KakaChannel,并将SelectionKey和KakaChannel进行二者的绑定,除此之外,Kafka还实现了channel复用,将connectionId和KakaChannel放入map中,避免每次发起请求都新建channel,减少了资源的消耗
。
public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
//往自己的Selector上面注册OP_READ事件
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
//kafka对SocketChannel进行了封装,封装成KakaChannel
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
//将key和channel进行绑定
key.attach(channel);
//channels护了多个网络连接,实现channel复用
this.channels.put(id, channel);
}
将客户端的请求放入请求队列中,并取消OP_READ事件
private def processCompletedReceives() {
//遍历每一个请求
selector.completedReceives.asScala.foreach { receive =>
try {
val channel = selector.channel(receive.source)
val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
channel.socketAddress)
//对于获取到的请求进行解析,得到request
val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
//将request放入请求队列中
requestChannel.sendRequest(req)
//取消OP_READ事件
selector.mute(receive.source)
}
}
}
KafkaRequestHandlerPool 处理请求
接下来就会通过KafkaRequestHandler
线程去处理请求队列中的请求。回到最开始的 startup(),
def startup() {
//主要用于处理请求队列里面的请求
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
//...
}
新建的KafkaRequestHandlerPool
,会在主构造函数创建8个KafkaRequestHandler
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
numThreads: Int) extends Logging with KafkaMetricsGroup {
val threads = new Array[Thread](numThreads)
val runnables = new Array[KafkaRequestHandler](numThreads)
//默认启动8个线程,一般情况下可以根据消息的吞吐量去设置这个参数
for(i <- 0 until numThreads) {
//创建线程
runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
//线程启动,就会执行run方法
threads(i).start()
}
}
KafkaRequestHandler启动之后就会执行run方法,将客户端的请求交由KafkaAPIs
进行最终的处理。
def run() {
while(true) {
try {
var req : RequestChannel.Request = null
while (req == null) {
val startSelectTime = SystemTime.nanoseconds
//获取request对象
req = requestChannel.receiveRequest(300)
val idleTime = SystemTime.nanoseconds - startSelectTime
aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
}
//将请求交给KafkaApis进行处理
apis.handle(req)
}
}
}
def handle(request: RequestChannel.Request) {
//处理生产者发送过来的请求
case ApiKeys.PRODUCE => handleProducerRequest(request)
}
def handleProducerRequest(request: RequestChannel.Request) {
//获取到生产发送过来的请求信息
val produceRequest = request.body.asInstanceOf[ProduceRequest]
val numBytesAppended = request.header.sizeOf + produceRequest.sizeOf
//按照分区的方式去遍历数据
val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = produceRequest.partitionRecords.asScala.partition {
//对发送过来的数据进行权限等判断。
case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic)
}
//判断是否有写权限。
val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
case (topicPartition, _) => authorize(request.session, Write, new Resource(auth.Topic, topicPartition.topic))
}
//把接收的数据追加到磁盘上
replicaManager.appendMessages(
produceRequest.timeout.toLong,
produceRequest.acks,
internalTopicsAllowed,
authorizedMessagesPerPartition,
sendResponseCallback)
}
数据存储到磁盘后,调用sendResponseCallback()回调函数处理响应。
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
//...
quotas.produce.recordAndMaybeThrottle(
request.session.sanitizedUser,
request.header.clientId,
numBytesAppended,
produceResponseCallback)
}
}
继续调用回调函数produceResponseCallback(),根据ack
的值进行处理
- acks=0:生产者不会等待任何来自broker的确认。
- acks=1(默认):生产者会等待leader broker接收到消息并确认(但不保证所有副本都已同步)。
- acks=all 或 acks=-1:生产者会等待所有同步副本都确认接收到消息。
def produceResponseCallback(delayTimeMs: Int) {
//acks = 0,表示生产者不关心数据的处理结果,所以不需要返回响应信息
if (produceRequest.acks == 0) {
//...
} else {
//acks不为0,表明生产者需要响应消息
//封装请求头
val respHeader = new ResponseHeader(request.header.correlationId)
//封装请求体,也就是响应消息
val respBody = request.header.apiVersion match {
case 0 => new ProduceResponse(mergedResponseStatus.asJava)
case version@(1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version)
}
//将响应消息发送给客户端
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, respBody)))
}
}
将响应放入processor对应的responseQueue中,默认情况下有3个responseQueue。
def sendResponse(response: RequestChannel.Response) {
//先从数组获取Processor对应的队列,再将响应放到这个队列
responseQueues(response.processor).put(response)
for(onResponse <- responseListeners)
onResponse(response.processor)
}
接着服务端需要响应客户端,回到processor的run方法
override def run() {
//处理响应,并注册OP_WRITE事件
processNewResponses()
//处理已经发送出去的响应并重新监听OP_READ事件
processCompletedSends()
}
处理responseQueues中的响应可以分为三种类型:
- NoOpAction:对于不需要返回响应的请求,重新注册OP_READ监听事件。
- SendAction:需要发送响应的情况,接下来注册OP_WRITE监听事件,并最终通过selector.poll()方法将响应结果发送给客户端。
- CloseConnectionAction:需要关闭的响应。
private def processNewResponses() {
//通过Process线程的id获取Response对象
var curr = requestChannel.receiveResponse(id)
while (curr != null) {
try {
curr.responseAction match {
//对于不需要返回响应的请求
case RequestChannel.NoOpAction =>
curr.request.updateRequestMetrics
//重新监听OP_READ事件
selector.unmute(curr.request.connectionId)
//需要发送响应的情况
case RequestChannel.SendAction =>
//注册OP_WRITE事件,发送响应
sendResponse(curr)
// 需要关闭的响应,关闭连接
case RequestChannel.CloseConnectionAction =>
curr.request.updateRequestMetrics
close(selector, curr.request.connectionId)
}
} finally {
curr = requestChannel.receiveResponse(id)
}
}
}
正常情况下处理已经发送出去的响应,将响应从响应队列中移除,并重新监听OP_READ事件,准备处理客户端的下一个请求。
private def processCompletedSends() {
selector.completedSends.asScala.foreach { send =>
//移除响应队列的响应
val resp = inflightResponses.remove(send.destination).getOrElse {
//...
}
resp.request.updateRequestMetrics()
selector.unmute(send.destination)
}
}
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)