目录

 

1、什么是Watcher监听机制

2、Zookeeper命令实现

3、Java API实现

4、源码解析

4.1 Watcher接口

4.2 注册全局监听器

4.3 注册监听器(getChildren)

4.4 请求包入列并发送

4.5 服务器端循环监听

4.6 触发Watcher(setData)


1、什么是Watcher监听机制

Watcher 监听机制是 Zookeeper 中非常重要的特性,我们基于 zookeeper 上创建的节点,可以对这些节点绑定监听事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于 zookeeper实现分布式锁,发布订阅(多个订阅者同时监听某一个主题对象,当这个主题对象自身状态发生变化时,会通知所有订阅者)等功能。

  Watcher 特性:当数据发生变化的时候, zookeeper 会产生一个 watcher 事件,并且会发送到客户端。但是客户端只会收到一次通知。如果后续这个节点再次发生变化,那么之前设置 watcher 的客户端不会再次收到消息。(Watcher 是一次性的操作)。 当然,可以通过循环监听去达到永久监听效果。

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

如何注册事件机制:

  ZooKeeper 的 Watcher 机制,总的来说可以分为三个过程:客户端注册 Watcher、服务器处理 Watcher 和客户端回调 Watcher客户端。注册 watcher 有 3 种方式,getData、exists、getChildren

ZK的所有读操作都可以设置watch监视点: getData, getChildren, exists. 写操作则是不能设置监视点的。

  监视有两种类型:数据监视点和子节点监视点。创建、删除或者设置znode都会触发这些监视点。exists,getData 可以设置数据监视点。getChildren 可以设置子节点变化。

  而可能监测的事件类型有: None,NodeCreated, NodeDataChanged, NodeDeleted, NodeChildrenChanged.

   None // 客户端连接状态发生变化的时候 会收到None事件
   NodeCreated // 节点创建事件
   NodeDeleted // 节点删除事件
   NodeDataChanged // 节点数据变化
   NodeChildrenChanged // 子节点被创建 删除触发该事件

  ZK 可以做到,只要数据一发生变化,就会通知相应地注册了监听的客户端。那么,它是怎么做到的呢?

其实原理应该是很简单的,四个步骤:

1. 客户端注册Watcher到服务端;

2. 服务端发生数据变更;

3. 服务端通知客户端数据变更;

4. 客户端回调Watcher处理变更应对逻辑;

2、Zookeeper命令实现

#起两个客户端,都监听同一个节点
[root@ydt1 zookeeper-3.4.6]# ./bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 6] create /watchtest "abc" 
Created /watchtest
[zk: localhost:2181(CONNECTED) 0] get /watchtest watch #监听节点变化
"111"
cZxid = 0xf0000000a
ctime = Wed Jul 29 17:00:30 CST 2020
mZxid = 0xf0000000b
mtime = Wed Jul 29 17:01:03 CST 2020
pZxid = 0xf0000000a
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0
​
#再起一个客户端,用来修改上一个客户端的节点数据
[root@ydt2 zookeeper-3.4.6]# ./bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] set /watchtest "111"
cZxid = 0xf0000000a
ctime = Wed Jul 29 17:00:30 CST 2020
mZxid = 0xf0000000b
mtime = Wed Jul 29 17:01:03 CST 2020
pZxid = 0xf0000000a
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0
​
#回过头再看两个监听节点,都可以看大节点数据变化的信息
[zk: localhost:2181(CONNECTED) 1]
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/watchtest
​
#客户端再次修改该节点数据,监听客户端不会有响应,说明监听是一次性的
​

 

3、Java API实现

package com.ydt.zookeeper.watcher;
​
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
​
import java.io.IOException;
​
public class WatcherTest {
​
    ZooKeeper zk;
    @Before
    public void init() throws IOException, KeeperException, InterruptedException {
        zk= new ZooKeeper("ydt1:2181,ydt2:2181,ydt3:2181"
                , Integer.MAX_VALUE,new Watcher() {
            //全局监听
            public void process(WatchedEvent watchedEvent) {
                //客户端回调Watcher
                System.out.println("-----------------------------------------");
                System.out.println("connect state:" + watchedEvent.getState());
                System.out.println("event type:" + watchedEvent.getType());
                System.out.println("znode path:" + watchedEvent.getPath());
                System.out.println("-----------------------------------------");
            }
        }
        );
    }
​
    /**
     * exists监听事件:
     *      NodeCreated:节点创建
     *      NodeDeleted:节点删除
     *      NodeDataChanged:节点内容
     * @throws IOException
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void test1() throws KeeperException, InterruptedException {
        //exists注册监听
        zk.exists("/watcher-exists", new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("-----------------------------------------");
                System.out.println("connect state:" + watchedEvent.getState());
                System.out.println("event type:" + watchedEvent.getType());
                System.out.println("znode path:" + watchedEvent.getPath());
                System.out.println("-----------------------------------------");
                try {
                    zk.exists("/watcher-exists",this);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
​
            }
        });
        //不开启ACL,以持久化自动生成序列方式创建
        zk.create("/watcher-exists", "watcher-exists".getBytes()
                , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        //通过修改的事务类型操作来触发监听事件
        zk.setData("/watcher-exists", "watcher-exists2".getBytes(), -1);
        //删除节点看看能否触发监听事件
        zk.delete("/watcher-exists", -1);
​
    }
​
    /**
     * getData监听事件:
     *      NodeDeleted:节点删除
     *      NodeDataChange:节点内容发生变化
     * @throws IOException
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void test2() throws IOException, KeeperException, InterruptedException {
        //不开启ACL,以持久化自动生成序列方式创建
        zk.create("/watcher-getData", "watcher-getData".getBytes()
                , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        //getData注册监听
        zk.getData("/watcher-getData", new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("-----------------------------------------");
                System.out.println("connect state:" + watchedEvent.getState());
                System.out.println("event type:" + watchedEvent.getType());
                System.out.println("znode path:" + watchedEvent.getPath());
                System.out.println("-----------------------------------------");
                try {
                    zk.getData("/watcher-getData",this,null);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
​
            }
        },null);
        //通过修改的事务类型操作来触发监听事件
        zk.setData("/watcher-getData", "watcher-getData2".getBytes(), -1);
        //删除节点看看能否触发监听事件
        zk.delete("/watcher-getData", -1);
    }
​
    /**
     * getChildren监听事件:
     *      NodeChildrenChanged:子节点发生变化
     *      NodeDeleted:节点删除
     * @throws IOException
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void test3() throws IOException, KeeperException, InterruptedException {
        zk.create("/watcher-getChildren",null,
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk.create("/watcher-getChildren/watcher-getChildren01","watcher-getChildren01".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        //getChildren注册监听
        zk.getChildren("/watcher-getChildren", new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("-----------------------------------------");
                System.out.println("connect state:" + watchedEvent.getState());
                System.out.println("event type:" + watchedEvent.getType());
                System.out.println("znode path:" + watchedEvent.getPath());
                System.out.println("-----------------------------------------");
                try {
                    zk.getChildren("/watcher-getChildren",this);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        zk.setData("/watcher-getChildren/watcher-getChildren01","watcher-getChildren02".getBytes(), -1);//修改子节点
        zk.delete("/watcher-getChildren/watcher-getChildren01", -1);//删除子节点
        zk.delete("/watcher-getChildren", -1);//删除根节点
    }
}
​

 

4、源码解析

原理流程:

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

客户端首先将 Watcher注册到服务端,同时将Watcher对象保存到客户端的Watcher管理器中。当Zookeeper服务端监听的数据状态发生变化时,服务端会主动通知客户端,接着客户端的 Watcher管理器会触发相关 Watcher来回调相应处理逻辑,从而完成整体的数据发布/订阅流程

4.1 Watcher接口

public interface Watcher {
​
    /**
     * Event的状态
     */
    public interface Event {
        /**
         * 在事件发生时,ZooKeeper的状态
         */
        public enum KeeperState {
​
            @Deprecated
            Unknown (-1),
​
            Disconnected (0),
​
            @Deprecated
            NoSyncConnected (1),
​
            SyncConnected (3),
​
            AuthFailed (4),
​
            ConnectedReadOnly (5),
​
            SaslAuthenticated(6),
​
            Expired (-112);
​
            private final int intValue;  
​
            KeeperState( int intValue) {
                this.intValue = intValue;
            }   
​
            ......
        }
​
        /**
         * ZooKeeper中的事件
         */
        public enum EventType {
            None (-1),
            NodeCreated (1),
            NodeDeleted (2),
            NodeDataChanged (3),
            NodeChildrenChanged (4);
​
            private final int intValue;     // Integer representation of value
                                            // for sending over wire
            EventType( int intValue) {
                this.intValue = intValue;
            }
            ......   
        }
    }
​
    //Watcher的回调方法
    abstract public void process(WatchedEvent event);
}

4.2 注册全局监听器

//-------------------------------------ZooKeeper.java----------------------------------------
    //初始化zookeeper客户端时会将默认的监听器设置到监听器管理中
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly)
        throws IOException
    {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
        //设置默认监听器
        watchManager.defaultWatcher = watcher;
​
        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        //事件信息的传输,通知处理,可以从下面方法可以看出来
        cnxn.start();
    }
    
    ------->
    public void start() {
        //负责客户端和服务器端的数据通信, 也包括事件信息的传输和心跳
        sendThread.start();
        //主要在客户端回调注册的 Watchers 进行通知处理
        eventThread.start();
    }

 

4.3 注册监听器(getChildren)

​
public List<String> getChildren(final String path, Watcher watcher)
    throws KeeperException, InterruptedException
{
    final String clientPath = path;
    PathUtils. validatePath(clientPath);
​
    WatchRegistration wcb = null;
    //如果watcher不等于null, 构建WatchRegistration对象,执行回调的时候会用到
    //该对象描述了watcher和path之间的关系
    if (watcher != null) {
        wcb = new ChildWatchRegistration(watcher, clientPath);
    }
    
    //在传入的path加上root path前缀,构成服务器端的绝对路径
    final String serverPath = prependChroot(clientPath);
    
    //构建RequestHeader对象
    RequestHeader h = new RequestHeader();
    //设置操作类型为OpCode. getChildren
    h.setType(ZooDefs.OpCode. getChildren);
    //构建GetChildrenRequest对象
    GetChildrenRequest request = new GetChildrenRequest();
    //设置path
    request.setPath(serverPath);
    //设置是否使用watcher
    request.setWatch(watcher != null);
    //构建GetChildrenResponse对象
    GetChildrenResponse response = new GetChildrenResponse();
    //提交请求,通过客户端的网络处理类去提交请求,并阻塞等待结果
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (r.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code. get(r.getErr()),
                clientPath);
    }
    return response.getChildren();
}
​
​

4.4 请求包入列并发送

public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();//响应头
        //组装请求入队
        Packet packet = queuePacket(h, r, request, response, null, null, null,
                    null, watchRegistration);
        synchronized (packet) {
            //阻塞等待请求结果
            while (!packet.finished) {
                packet.wait();
            }
        }
        return r;
    }
    
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration)
    {
        Packet packet = null;
​
        // Note that we do not generate the Xid for the packet yet. It is
        // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
        // where the packet is actually sent.
        // 这个队列就是存放我们请求的队列,注意,我们还没有为包生成Xid。
        // 它是在发送时生成,通过实现ClientCnxnSocket::doIO(),数据包实际发送的地方
        synchronized (outgoingQueue) {
            packet = new Packet(h, r, request, response, watchRegistration);
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath = clientPath;
            packet.serverPath = serverPath;
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                // If the client is asking to close the session then
                // mark as closing
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                //请求包入队
                outgoingQueue.add(packet);
            }
        }
        //请求包入列后,唤醒客户发送线程,将请求包发送出去,进入到发送线程SendThread.run()方法中查看
        sendThread.getClientCnxnSocket().wakeupCnxn();
        return packet;
    }
public void run() {
            clientCnxnSocket.introduce(this,sessionId);
            clientCnxnSocket.updateNow();
            clientCnxnSocket.updateLastSendAndHeard();
            int to;
            long lastPingRwServer = Time.currentElapsedTime();
            final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
            InetSocketAddress serverAddress = null;
            while (state.isAlive()) {
                try {
                    //判断当前session会话中socket请求是否还连接着
                    if (!clientCnxnSocket.isConnected()) {
                        if(!isFirstConnect){
                            try {
                                Thread.sleep(r.nextInt(1000));
                            } catch (InterruptedException e) {
                                LOG.warn("Unexpected exception", e);
                            }
                        }
                        // don't re-establish connection if we are closing
                        if (closing || !state.isAlive()) {
                            break;
                        }
                        if (rwServerAddress != null) {
                            serverAddress = rwServerAddress;
                            rwServerAddress = null;
                        } else {
                            serverAddress = hostProvider.next(1000);
                        }
                        startConnect(serverAddress);
                        clientCnxnSocket.updateLastSendAndHeard();
                    }
                    //校验判断客户端与服务器连接状态
                    if (state.isConnected()) {
                        // determine whether we need to send an AuthFailed event.
                        if (zooKeeperSaslClient != null) {
                            boolean sendAuthEvent = false;
                            if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                                try {
                                    zooKeeperSaslClient.initialize(ClientCnxn.this);
                                } catch (SaslException e) {
                                   LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                                    state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                }
                            }
                            KeeperState authState = zooKeeperSaslClient.getKeeperState();
                            if (authState != null) {
                                if (authState == KeeperState.AuthFailed) {
                                    // An authentication error occurred during authentication with the Zookeeper Server.
                                    state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                } else {
                                    if (authState == KeeperState.SaslAuthenticated) {
                                        sendAuthEvent = true;
                                    }
                                }
                            }
​
                            if (sendAuthEvent == true) {
                                eventThread.queueEvent(new WatchedEvent(
                                      Watcher.Event.EventType.None,
                                      authState,null));
                            }
                        }
                        to = readTimeout - clientCnxnSocket.getIdleRecv();
                    } else {
                        to = connectTimeout - clientCnxnSocket.getIdleRecv();
                    }
                    
                    if (to <= 0) {
                        String warnInfo;
                        warnInfo = "Client session timed out, have not heard from server in "
                            + clientCnxnSocket.getIdleRecv()
                            + "ms"
                            + " for sessionid 0x"
                            + Long.toHexString(sessionId);
                        LOG.warn(warnInfo);
                        throw new SessionTimeoutException(warnInfo);
                    }
                    //心跳维持操作,心跳次数复位
                    if (state.isConnected()) {
                        //1000(1 second) is to prevent race condition missing to send the second ping
                        //also make sure not to send too many pings when readTimeout is small 
                        int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - 
                                ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                        //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                        if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                            sendPing();
                            clientCnxnSocket.updateLastSend();
                        } else {
                            if (timeToNextPing < to) {
                                to = timeToNextPing;
                            }
                        }
                    }
​
                    // If we are in read-only mode, seek for read/write server
                    //如果我们处于只读模式节点,请根据情况查找读or写服务器
                    if (state == States.CONNECTEDREADONLY) {
                        long now = Time.currentElapsedTime();
                        int idlePingRwServer = (int) (now - lastPingRwServer);
                        if (idlePingRwServer >= pingRwTimeout) {
                            lastPingRwServer = now;
                            idlePingRwServer = 0;
                            pingRwTimeout =
                                Math.min(2*pingRwTimeout, maxPingRwTimeout);
                            pingRwServer();
                        }
                        to = Math.min(to, pingRwTimeout - idlePingRwServer);
                    }
                    //调用ClientCnxnSocketNIO发起socket网络请求
                    clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
                }
                //省略......................................................
//发送前一些判断
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        // Everything below and until we get back to the select is
        // non blocking, so time is effectively a constant. That is
        // Why we just have to do this once, here
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            // readyOps :获取此键上ready操作集合.即在当前通道上已经就绪的事件
            // SelectKey.OP_CONNECT 连接就绪事件,表示客户与服务器的连接已经建立成功
            // 如果全部已经就绪,进行一次复位处理,包括心跳时间和发送时间
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                //读或者写通道准备完毕,进行IO传输
                doIO(pendingQueue, outgoingQueue, cnxn);
            }
        }
        if (sendThread.getZkState().isConnected()) {
            synchronized(outgoingQueue) {
                if (findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                    enableWrite();
                }
            }
        }
        selected.clear();
    }

4.5 服务器端循环监听

//...................................NIOServerCnxnFactory.java.................................
public void run() {
        // socket不是关闭状态就进入阻塞
        while (!ss.socket().isClosed()) {
            try {
                selector.select(1000);
                Set<SelectionKey> selected;
                synchronized (this) {
                    //获取事件键列表
                    selected = selector.selectedKeys();
                }
                ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
                        selected);
                Collections.shuffle(selectedList);
                //遍历事件keys
                for (SelectionKey k : selectedList) {
                    //就绪等待连接事件那就先创建连接
                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                        SocketChannel sc = ((ServerSocketChannel) k
                                .channel()).accept();
                        InetAddress ia = sc.socket().getInetAddress();
                        int cnxncount = getClientCnxnCount(ia);
                        if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
                            LOG.warn("Too many connections from " + ia
                                     + " - max is " + maxClientCnxns );
                            sc.close();
                        } else {
                            LOG.info("Accepted socket connection from "
                                     + sc.socket().getRemoteSocketAddress());
                            sc.configureBlocking(false);
                            SelectionKey sk = sc.register(selector,
                                    SelectionKey.OP_READ);
                            NIOServerCnxn cnxn = createConnection(sc, sk);
                            sk.attach(cnxn);
                            addCnxn(cnxn);
                        }
                    } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                        // 就绪读写事件,开始执行(包括处理消息,返回监听)
                        NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                        c.doIO(k);
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Unexpected ops in select "
                                      + k.readyOps());
                        }
                    }
                }
                selected.clear();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring exception", e);
            }
        }
        closeAll();
        LOG.info("NIOServerCnxn factory exited run method");
    }

 

4.6 触发Watcher(setData)

在Zookeeper二阶段提交的COMMIT阶段。当Follower从Leader那接收到一个写请求的Leader.COMMIT数据包,会调用FinalRequestProcessor.processRequest()方法。Leader本身在发送完Leader.COMMIT数据包,也会调用FinalRequestProcessor.processRequest()方法。

如果是setData修改数据请求,那么FinalRequestProcessor.processRequest()方法最终会调用到DataTree.setData方法将txn应用到指定znode上,同时触发Watcher,并发送notification给Client端。

其关SetData请求的时序图如下:

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

4.6.1 DataTree.setData()方法

根据上面的时序图,一路跟踪到DataTree.setData方法:

public Stat setData(String path, byte data[], int version, long zxid,
            long time) throws KeeperException.NoNodeException {
        Stat s = new Stat();
        //根据path, 获得DataNode对象n
        DataNode n = nodes.get(path);
        //如果n为null, 则抛出NoNodeException异常
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        byte lastdata[] = null;
        synchronized (n) {
            lastdata = n.data;
            n.data = data;
            n.stat.setMtime(time);
            n.stat.setMzxid(zxid);
            n.stat.setVersion(version);
            n.copyStat(s);
        }
        // now update if the path is in a quota subtree.
        // 更新数据
        String lastPrefix;
        if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
          this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
              - (lastdata == null ? 0 : lastdata.length));
        }
        //触发Watcher
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }

4.6.2 WatchManager触发Watcher

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type,
                KeeperState.SyncConnected, path);
        HashSet<Watcher> watchers;
        synchronized (this) {
            //从watchTable删除掉path对应的watcher
            watchers = watchTable.remove(path);
            if (watchers == null || watchers.isEmpty()) {
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG,
                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                            "No watchers for " + path);
                }
                return null;
            }
            for (Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        //循环处理所有关于path的Watcher, 这里Watcher对象实际上就是ServerCnxn类型对象
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            //ServerCnxn执行监控事件对象
            w.process(e);
        }
        return watchers;
    }

4.6.3 ServerCnxn发送notification

包括NIOServerCnxn和NettyServerCnxn,其实代码逻辑都一样

    //NIO
    synchronized public void process(WatchedEvent event) {
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                     "Deliver event " + event + " to 0x"
                                     + Long.toHexString(this.sessionId)
                                     + " through " + this);
        }
​
        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();
        //发送notification给Client端
        sendResponse(h, e, "notification");
    }
    
    //Netty
    public void process(WatchedEvent event) {
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                     "Deliver event " + event + " to 0x"
                                     + Long.toHexString(this.sessionId)
                                     + " through " + this);
        }
​
        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();
        try {
            //发送notification给Client端
            sendResponse(h, e, "notification");
        } catch (IOException e1) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Problem sending to " + getRemoteSocketAddress(), e1);
            }
            close();
        }
    }
​
    protected void internalSendBuffer(ByteBuffer bb) {
        if (bb != ServerCnxnFactory.closeConn) {
            // We check if write interest here because if it is NOT set,
            // nothing is queued, so we can try to send the buffer right
            // away without waking up the selector
            if(sk.isValid() &&
                    ((sk.interestOps() & SelectionKey.OP_WRITE) == 0)) {
                try {
                    sock.write(bb);//通过socket向客户端写入字节流,调度process
                } catch (IOException e) {
                    // we are just doing best effort right now
                }
            }
            // if there is nothing left to send, we are done
            if (bb.remaining() == 0) {
                packetSent();
                return;
            }
        }
​
        synchronized(this.factory){
            sk.selector().wakeup();
            if (LOG.isTraceEnabled()) {
                LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
                        + " is valid: " + sk.isValid());
            }
            outgoingBuffers.add(bb);
            if (sk.isValid()) {
                sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
            }
        }
    }
​
    //至于远程客户端地址端口怎么知道,初始化ServerCnxn的时候就设置了
    public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
            SelectionKey sk, NIOServerCnxnFactory factory) throws IOException {
        this.zkServer = zk;
        this.sock = sock;//保存客户端sock对象
        this.sk = sk;
        this.factory = factory;
        if (this.factory.login != null) {
            this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
        }
        if (zk != null) { 
            outstandingLimit = zk.getGlobalOutstandingLimit();
        }
        sock.socket().setTcpNoDelay(true);
        /* set socket linger to false, so that socket close does not
         * block */
        sock.socket().setSoLinger(false, -1);
        InetAddress addr = ((InetSocketAddress) sock.socket()
                .getRemoteSocketAddress()).getAddress();
        authInfo.add(new Id("ip", addr.getHostAddress()));
        sk.interestOps(SelectionKey.OP_READ);
    }

 

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐