Zookeeper的Watcher机制及Watcher原理分析
目录1、什么是Watcher监听机制2、Zookeeper命令实现3、Java API实现4、源码解析4.1 Watcher接口4.2 注册全局监听器4.3 注册监听器(getChildren)4.4 请求包入列并发送4.5 服务器端循环监听4.6 触发Watcher(setData)1、什么是Watcher监听机制Watcher 监听机制是 Zookeeper 中非常重要的特性,我们基于 zoo
目录
1、什么是Watcher监听机制
Watcher 监听机制是 Zookeeper 中非常重要的特性,我们基于 zookeeper 上创建的节点,可以对这些节点绑定监听事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于 zookeeper实现分布式锁,发布订阅(多个订阅者同时监听某一个主题对象,当这个主题对象自身状态发生变化时,会通知所有订阅者)等功能。
Watcher 特性:当数据发生变化的时候, zookeeper 会产生一个 watcher 事件,并且会发送到客户端。但是客户端只会收到一次通知。如果后续这个节点再次发生变化,那么之前设置 watcher 的客户端不会再次收到消息。(Watcher 是一次性的操作)。 当然,可以通过循环监听去达到永久监听效果。
如何注册事件机制:
ZooKeeper 的 Watcher 机制,总的来说可以分为三个过程:客户端注册 Watcher、服务器处理 Watcher 和客户端回调 Watcher客户端。注册 watcher 有 3 种方式,getData、exists、getChildren;
ZK的所有读操作都可以设置watch监视点: getData, getChildren, exists. 写操作则是不能设置监视点的。
监视有两种类型:数据监视点和子节点监视点。创建、删除或者设置znode都会触发这些监视点。exists,getData 可以设置数据监视点。getChildren 可以设置子节点变化。
而可能监测的事件类型有: None,NodeCreated, NodeDataChanged, NodeDeleted, NodeChildrenChanged.
None // 客户端连接状态发生变化的时候 会收到None事件
NodeCreated // 节点创建事件
NodeDeleted // 节点删除事件
NodeDataChanged // 节点数据变化
NodeChildrenChanged // 子节点被创建 删除触发该事件
ZK 可以做到,只要数据一发生变化,就会通知相应地注册了监听的客户端。那么,它是怎么做到的呢?
其实原理应该是很简单的,四个步骤:
1. 客户端注册Watcher到服务端;
2. 服务端发生数据变更;
3. 服务端通知客户端数据变更;
4. 客户端回调Watcher处理变更应对逻辑;
2、Zookeeper命令实现
#起两个客户端,都监听同一个节点
[root@ydt1 zookeeper-3.4.6]# ./bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 6] create /watchtest "abc"
Created /watchtest
[zk: localhost:2181(CONNECTED) 0] get /watchtest watch #监听节点变化
"111"
cZxid = 0xf0000000a
ctime = Wed Jul 29 17:00:30 CST 2020
mZxid = 0xf0000000b
mtime = Wed Jul 29 17:01:03 CST 2020
pZxid = 0xf0000000a
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0
#再起一个客户端,用来修改上一个客户端的节点数据
[root@ydt2 zookeeper-3.4.6]# ./bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] set /watchtest "111"
cZxid = 0xf0000000a
ctime = Wed Jul 29 17:00:30 CST 2020
mZxid = 0xf0000000b
mtime = Wed Jul 29 17:01:03 CST 2020
pZxid = 0xf0000000a
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0
#回过头再看两个监听节点,都可以看大节点数据变化的信息
[zk: localhost:2181(CONNECTED) 1]
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/watchtest
#客户端再次修改该节点数据,监听客户端不会有响应,说明监听是一次性的
3、Java API实现
package com.ydt.zookeeper.watcher;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
public class WatcherTest {
ZooKeeper zk;
@Before
public void init() throws IOException, KeeperException, InterruptedException {
zk= new ZooKeeper("ydt1:2181,ydt2:2181,ydt3:2181"
, Integer.MAX_VALUE,new Watcher() {
//全局监听
public void process(WatchedEvent watchedEvent) {
//客户端回调Watcher
System.out.println("-----------------------------------------");
System.out.println("connect state:" + watchedEvent.getState());
System.out.println("event type:" + watchedEvent.getType());
System.out.println("znode path:" + watchedEvent.getPath());
System.out.println("-----------------------------------------");
}
}
);
}
/**
* exists监听事件:
* NodeCreated:节点创建
* NodeDeleted:节点删除
* NodeDataChanged:节点内容
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void test1() throws KeeperException, InterruptedException {
//exists注册监听
zk.exists("/watcher-exists", new Watcher() {
public void process(WatchedEvent watchedEvent) {
System.out.println("-----------------------------------------");
System.out.println("connect state:" + watchedEvent.getState());
System.out.println("event type:" + watchedEvent.getType());
System.out.println("znode path:" + watchedEvent.getPath());
System.out.println("-----------------------------------------");
try {
zk.exists("/watcher-exists",this);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
//不开启ACL,以持久化自动生成序列方式创建
zk.create("/watcher-exists", "watcher-exists".getBytes()
, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//通过修改的事务类型操作来触发监听事件
zk.setData("/watcher-exists", "watcher-exists2".getBytes(), -1);
//删除节点看看能否触发监听事件
zk.delete("/watcher-exists", -1);
}
/**
* getData监听事件:
* NodeDeleted:节点删除
* NodeDataChange:节点内容发生变化
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void test2() throws IOException, KeeperException, InterruptedException {
//不开启ACL,以持久化自动生成序列方式创建
zk.create("/watcher-getData", "watcher-getData".getBytes()
, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//getData注册监听
zk.getData("/watcher-getData", new Watcher() {
public void process(WatchedEvent watchedEvent) {
System.out.println("-----------------------------------------");
System.out.println("connect state:" + watchedEvent.getState());
System.out.println("event type:" + watchedEvent.getType());
System.out.println("znode path:" + watchedEvent.getPath());
System.out.println("-----------------------------------------");
try {
zk.getData("/watcher-getData",this,null);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},null);
//通过修改的事务类型操作来触发监听事件
zk.setData("/watcher-getData", "watcher-getData2".getBytes(), -1);
//删除节点看看能否触发监听事件
zk.delete("/watcher-getData", -1);
}
/**
* getChildren监听事件:
* NodeChildrenChanged:子节点发生变化
* NodeDeleted:节点删除
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void test3() throws IOException, KeeperException, InterruptedException {
zk.create("/watcher-getChildren",null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create("/watcher-getChildren/watcher-getChildren01","watcher-getChildren01".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//getChildren注册监听
zk.getChildren("/watcher-getChildren", new Watcher() {
public void process(WatchedEvent watchedEvent) {
System.out.println("-----------------------------------------");
System.out.println("connect state:" + watchedEvent.getState());
System.out.println("event type:" + watchedEvent.getType());
System.out.println("znode path:" + watchedEvent.getPath());
System.out.println("-----------------------------------------");
try {
zk.getChildren("/watcher-getChildren",this);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
zk.setData("/watcher-getChildren/watcher-getChildren01","watcher-getChildren02".getBytes(), -1);//修改子节点
zk.delete("/watcher-getChildren/watcher-getChildren01", -1);//删除子节点
zk.delete("/watcher-getChildren", -1);//删除根节点
}
}
4、源码解析
原理流程:
客户端首先将 Watcher注册到服务端,同时将Watcher对象保存到客户端的Watcher管理器中。当Zookeeper
服务端监听的数据状态发生变化时,服务端会主动通知客户端,接着客户端的 Watcher管理器会触发相关 Watcher来回调相应处理逻辑,从而完成整体的数据发布/订阅流程
4.1 Watcher接口
public interface Watcher {
/**
* Event的状态
*/
public interface Event {
/**
* 在事件发生时,ZooKeeper的状态
*/
public enum KeeperState {
@Deprecated
Unknown (-1),
Disconnected (0),
@Deprecated
NoSyncConnected (1),
SyncConnected (3),
AuthFailed (4),
ConnectedReadOnly (5),
SaslAuthenticated(6),
Expired (-112);
private final int intValue;
KeeperState( int intValue) {
this.intValue = intValue;
}
......
}
/**
* ZooKeeper中的事件
*/
public enum EventType {
None (-1),
NodeCreated (1),
NodeDeleted (2),
NodeDataChanged (3),
NodeChildrenChanged (4);
private final int intValue; // Integer representation of value
// for sending over wire
EventType( int intValue) {
this.intValue = intValue;
}
......
}
}
//Watcher的回调方法
abstract public void process(WatchedEvent event);
}
4.2 注册全局监听器
//-------------------------------------ZooKeeper.java----------------------------------------
//初始化zookeeper客户端时会将默认的监听器设置到监听器管理中
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly)
throws IOException
{
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
//设置默认监听器
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
HostProvider hostProvider = new StaticHostProvider(
connectStringParser.getServerAddresses());
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
//事件信息的传输,通知处理,可以从下面方法可以看出来
cnxn.start();
}
------->
public void start() {
//负责客户端和服务器端的数据通信, 也包括事件信息的传输和心跳
sendThread.start();
//主要在客户端回调注册的 Watchers 进行通知处理
eventThread.start();
}
4.3 注册监听器(getChildren)
public List<String> getChildren(final String path, Watcher watcher)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils. validatePath(clientPath);
WatchRegistration wcb = null;
//如果watcher不等于null, 构建WatchRegistration对象,执行回调的时候会用到
//该对象描述了watcher和path之间的关系
if (watcher != null) {
wcb = new ChildWatchRegistration(watcher, clientPath);
}
//在传入的path加上root path前缀,构成服务器端的绝对路径
final String serverPath = prependChroot(clientPath);
//构建RequestHeader对象
RequestHeader h = new RequestHeader();
//设置操作类型为OpCode. getChildren
h.setType(ZooDefs.OpCode. getChildren);
//构建GetChildrenRequest对象
GetChildrenRequest request = new GetChildrenRequest();
//设置path
request.setPath(serverPath);
//设置是否使用watcher
request.setWatch(watcher != null);
//构建GetChildrenResponse对象
GetChildrenResponse response = new GetChildrenResponse();
//提交请求,通过客户端的网络处理类去提交请求,并阻塞等待结果
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code. get(r.getErr()),
clientPath);
}
return response.getChildren();
}
4.4 请求包入列并发送
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();//响应头
//组装请求入队
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
synchronized (packet) {
//阻塞等待请求结果
while (!packet.finished) {
packet.wait();
}
}
return r;
}
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration)
{
Packet packet = null;
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
// 这个队列就是存放我们请求的队列,注意,我们还没有为包生成Xid。
// 它是在发送时生成,通过实现ClientCnxnSocket::doIO(),数据包实际发送的地方
synchronized (outgoingQueue) {
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
//请求包入队
outgoingQueue.add(packet);
}
}
//请求包入列后,唤醒客户发送线程,将请求包发送出去,进入到发送线程SendThread.run()方法中查看
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
public void run() {
clientCnxnSocket.introduce(this,sessionId);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
while (state.isAlive()) {
try {
//判断当前session会话中socket请求是否还连接着
if (!clientCnxnSocket.isConnected()) {
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establish connection if we are closing
if (closing || !state.isAlive()) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
serverAddress = hostProvider.next(1000);
}
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
//校验判断客户端与服务器连接状态
if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent == true) {
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
authState,null));
}
}
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
+ clientCnxnSocket.getIdleRecv()
+ "ms"
+ " for sessionid 0x"
+ Long.toHexString(sessionId);
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
//心跳维持操作,心跳次数复位
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
// If we are in read-only mode, seek for read/write server
//如果我们处于只读模式节点,请根据情况查找读or写服务器
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
//调用ClientCnxnSocketNIO发起socket网络请求
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
}
//省略......................................................
//发送前一些判断
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
// readyOps :获取此键上ready操作集合.即在当前通道上已经就绪的事件
// SelectKey.OP_CONNECT 连接就绪事件,表示客户与服务器的连接已经建立成功
// 如果全部已经就绪,进行一次复位处理,包括心跳时间和发送时间
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
//读或者写通道准备完毕,进行IO传输
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
synchronized(outgoingQueue) {
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
enableWrite();
}
}
}
selected.clear();
}
4.5 服务器端循环监听
//...................................NIOServerCnxnFactory.java.................................
public void run() {
// socket不是关闭状态就进入阻塞
while (!ss.socket().isClosed()) {
try {
selector.select(1000);
Set<SelectionKey> selected;
synchronized (this) {
//获取事件键列表
selected = selector.selectedKeys();
}
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
selected);
Collections.shuffle(selectedList);
//遍历事件keys
for (SelectionKey k : selectedList) {
//就绪等待连接事件那就先创建连接
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k
.channel()).accept();
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
LOG.warn("Too many connections from " + ia
+ " - max is " + maxClientCnxns );
sc.close();
} else {
LOG.info("Accepted socket connection from "
+ sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
SelectionKey sk = sc.register(selector,
SelectionKey.OP_READ);
NIOServerCnxn cnxn = createConnection(sc, sk);
sk.attach(cnxn);
addCnxn(cnxn);
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
// 就绪读写事件,开始执行(包括处理消息,返回监听)
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected ops in select "
+ k.readyOps());
}
}
}
selected.clear();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring exception", e);
}
}
closeAll();
LOG.info("NIOServerCnxn factory exited run method");
}
4.6 触发Watcher(setData)
在Zookeeper二阶段提交的COMMIT阶段。当Follower从Leader那接收到一个写请求的Leader.COMMIT数据包,会调用FinalRequestProcessor.processRequest()方法。Leader本身在发送完Leader.COMMIT数据包,也会调用FinalRequestProcessor.processRequest()方法。
如果是setData修改数据请求,那么FinalRequestProcessor.processRequest()方法最终会调用到DataTree.setData方法将txn应用到指定znode上,同时触发Watcher,并发送notification给Client端。
其关SetData请求的时序图如下:
4.6.1 DataTree.setData()方法
根据上面的时序图,一路跟踪到DataTree.setData方法:
public Stat setData(String path, byte data[], int version, long zxid,
long time) throws KeeperException.NoNodeException {
Stat s = new Stat();
//根据path, 获得DataNode对象n
DataNode n = nodes.get(path);
//如果n为null, 则抛出NoNodeException异常
if (n == null) {
throw new KeeperException.NoNodeException();
}
byte lastdata[] = null;
synchronized (n) {
lastdata = n.data;
n.data = data;
n.stat.setMtime(time);
n.stat.setMzxid(zxid);
n.stat.setVersion(version);
n.copyStat(s);
}
// now update if the path is in a quota subtree.
// 更新数据
String lastPrefix;
if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
- (lastdata == null ? 0 : lastdata.length));
}
//触发Watcher
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
4.6.2 WatchManager触发Watcher
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
//从watchTable删除掉path对应的watcher
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
//循环处理所有关于path的Watcher, 这里Watcher对象实际上就是ServerCnxn类型对象
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
//ServerCnxn执行监控事件对象
w.process(e);
}
return watchers;
}
4.6.3 ServerCnxn发送notification
包括NIOServerCnxn和NettyServerCnxn,其实代码逻辑都一样
//NIO
synchronized public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x"
+ Long.toHexString(this.sessionId)
+ " through " + this);
}
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
//发送notification给Client端
sendResponse(h, e, "notification");
}
//Netty
public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x"
+ Long.toHexString(this.sessionId)
+ " through " + this);
}
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
try {
//发送notification给Client端
sendResponse(h, e, "notification");
} catch (IOException e1) {
if (LOG.isDebugEnabled()) {
LOG.debug("Problem sending to " + getRemoteSocketAddress(), e1);
}
close();
}
}
protected void internalSendBuffer(ByteBuffer bb) {
if (bb != ServerCnxnFactory.closeConn) {
// We check if write interest here because if it is NOT set,
// nothing is queued, so we can try to send the buffer right
// away without waking up the selector
if(sk.isValid() &&
((sk.interestOps() & SelectionKey.OP_WRITE) == 0)) {
try {
sock.write(bb);//通过socket向客户端写入字节流,调度process
} catch (IOException e) {
// we are just doing best effort right now
}
}
// if there is nothing left to send, we are done
if (bb.remaining() == 0) {
packetSent();
return;
}
}
synchronized(this.factory){
sk.selector().wakeup();
if (LOG.isTraceEnabled()) {
LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
+ " is valid: " + sk.isValid());
}
outgoingBuffers.add(bb);
if (sk.isValid()) {
sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
}
}
}
//至于远程客户端地址端口怎么知道,初始化ServerCnxn的时候就设置了
public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
SelectionKey sk, NIOServerCnxnFactory factory) throws IOException {
this.zkServer = zk;
this.sock = sock;//保存客户端sock对象
this.sk = sk;
this.factory = factory;
if (this.factory.login != null) {
this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
}
if (zk != null) {
outstandingLimit = zk.getGlobalOutstandingLimit();
}
sock.socket().setTcpNoDelay(true);
/* set socket linger to false, so that socket close does not
* block */
sock.socket().setSoLinger(false, -1);
InetAddress addr = ((InetSocketAddress) sock.socket()
.getRemoteSocketAddress()).getAddress();
authInfo.add(new Id("ip", addr.getHostAddress()));
sk.interestOps(SelectionKey.OP_READ);
}
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)