首先,ZooKeeper 提供了分布式数据的发布/订阅功能。

这让我想到一种模式,观察者模式(发布订阅模式):一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能够让多个订阅者同时监听某一个主题对象,当这个主题自身状态变化时,会通知所有订阅者,试它们能够做出相应的处理。那 ZooKeeper 是不是也是使用了这个经典的模式呢?

在 ZooKeeper 中,引入了 Watcher 机制来实现这种分布式的通知功能。ZooKeeper 允许客户端向服务端注册一个 Watcher 监听,当服务器的一些特定事件触发了这个 Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。

以下引用代码以及说辞都是基于 ZooKeeper 3.4.8。

ZooKeeper 的 Watcher 机制主要包括客户端线程、客户端 WatchManager 和 ZooKeeper 服务器三部分。
这里写图片描述
在上图中:

  • ZooKeeper :部署在远程主机上的 ZooKeeper 集群,当然,也可能是单机的。
  • Client :分布在各处的 ZooKeeper 的 jar 包程序,被引用在各个独立应用程序中。
  • WatchManager :一个接口,用于管理各个监听器,只有一个方法 materialize(),返回一个 Watcher 的 set。

在具体流程上,简单讲,客户端在向 ZooKeeper 服务器注册 Watcher 的同时,会将 Watcher 对象存储在客户端的 WatchManager 中。当ZooKeeper 服务器触发 Watcher 事件后,会向客户端发送通知,客户端线程从 WatchManager 的实现类中取出对应的 Watcher 对象来执行回调逻辑。

工作机制

ZooKeeper 的 Watcher 机制,总的来说可以分为三个过程:客户端注册 Watcher、服务器处理 Watcher 和客户端回调 Watcher,其内部各组件之间的关系如图:

组件之间的关系图

本来想自己画画的,画不好,就截了别人的一个。

客户端注册 Watcher

在创建一个 ZooKeeper 客户端对象实例时,可以向构造方法中传入一个默认的 Watcher:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

这个 Watcher 将作为整个 ZooKeeper会话期间的默认 Watcher,会一直被保存在客户端 ZKWatchManager 的 defaultWatcher 中。另外,ZooKeeper 客户端也可以通过 getData、exists 和 getChildren 三个接口来向 ZooKeeper 服务器注册 Watcher,无论哪种方式,注册 Watcher 的工作原理都是一致的。下面以 getData 这个接口为例说明,这个接口用于获取指定节点的数据内容,主要有两个方法:

    public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
        PathUtils.validatePath(path);//路径校验
        ZooKeeper.DataWatchRegistration wcb = null;
        if(watcher != null) {
            wcb = new ZooKeeper.DataWatchRegistration(watcher, path);//封装 Watcher
        }

        String serverPath = this.prependChroot(path);
        RequestHeader h = new RequestHeader();
        h.setType(4);
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
        GetDataResponse response = new GetDataResponse();
        ReplyHeader r = this.cnxn.submitRequest(h, request, response, wcb);//发送到服务端
        if(r.getErr() != 0) {
            throw KeeperException.create(Code.get(r.getErr()), path);
        } else {
            if(stat != null) {
                DataTree.copyStat(response.getStat(), stat);
            }

            return response.getData();
        }
    }

    public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException {
        return this.getData(path, watch?this.watchManager.defaultWatcher:null, stat);
    }

以上两个接口都可以进行 Watcher 的注册,第二个接口通过一个 Boolean 参数来标识是否使用上文提到的默认 Watcher 来进行注册,看代码可知,然后仍然还要去调用第一个方法来完成注册逻辑。而在第一个方法中,客户端使用 this.cnxn.submitRequest(h, request, response, wcb) 方法向服务器发送这个注册请求,同时等待请求的返回。完成请求发送后,会由客户端 SendTread 线程的 readResponse 方法负责接收来自服务端的响应,readResponse 方法的最后会调用finishPacket 方法,它会从 Packet 中取出对应的 Watcher 并注册到 ZKWatchManager 中去:

    private void finishPacket(ClientCnxn.Packet p) {
        if(p.watchRegistration != null) {
            p.watchRegistration.register(p.replyHeader.getErr());
        }

        if(p.cb == null) {
            synchronized(p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {
            p.finished = true;
            this.eventThread.queuePacket(p);
        }

    }

watchRegistration.register方法就是把 WatchRegistration 子类里面的 Watcher 实例放到 ZKWatchManager 的 dataWatches 中存储起来。从上面UML图上可以知道,ZKWatchManager .dataWatches 是一个 Map<String, Set> 结构,用于将数据节点的路径和 Watcher 对象进行一一映射后管理起来。

    abstract class WatchRegistration {
        private Watcher watcher;
        private String clientPath;

        protected abstract Map<String, Set<Watcher>> getWatches(int var1);

        public void register(int rc) {
            if(this.shouldAddWatch(rc)) {
                Map watches = this.getWatches(rc);//通过子类的实现取得ZKWatchManager 中的 dataWatches
                synchronized(watches) {
                    Object watchers = (Set)watches.get(this.clientPath);
                    if(watchers == null) {
                        watchers = new HashSet();
                        watches.put(this.clientPath, watchers);
                    }
                    ((Set)watchers).add(this.watcher);//将 Watcher 对象放到 ZKWatchManager 中的 dataWatches里面
                }
            }
        }
    }

这个过程,简单来说,就是:当使用ZooKeeper 构造方法或者使用 getData、exists 和 getChildren 三个接口来向 ZooKeeper 服务器注册 Watcher 的时候,首先将此消息传递给服务端,传递成功后,服务端会通知客户端,然后客户端将该路径和Watcher对应关系存储起来备用。

服务端处理 Watcher

客户端watcher注册

下面是服务端接收watcher并将其存储起来的过程的序列图。

存储watcher的序列图

在方法 org.apache.zookeeper.server.FinalRequestProcessor#processRequest 中,会判断当前的请求是否需要注册,代码如下:

            case OpCode.getData: {
                lastOp = "GETD";
                GetDataRequest getDataRequest = new GetDataRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request,
                        getDataRequest);
                DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
                if (n == null) {
                    throw new KeeperException.NoNodeException();
                }
                Long aclL;
                synchronized(n) {
                    aclL = n.acl;
                }
                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),
                        ZooDefs.Perms.READ,
                        request.authInfo);
                Stat stat = new Stat();
                byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                        getDataRequest.getWatch() ? cnxn : null); //关键点
                rsp = new GetDataResponse(b, stat);
                break;
            }

从上面可以看出来,如果需要注册的话,会把cnxn保存到watcherManager里面,而cnxn其实就是一个ServerCnxn的子类,ServerCnxn是什么呢?下面看一下类描述。

/**
 * Interface to a Server connection - represents a connection from a client
 * to the server.
 */
public abstract class ServerCnxn implements Stats, Watcher {

它其实是client和server之间的连接的接口,并且,实现了watcher接口。到这里,就算是将客户端注册到服务端了,只需要有数据的变更来触发事件,然后通知客户端。

触发watcher

以NodeDataChanged事件为例来说的话,当调用org.apache.zookeeper.server.DataTree#setData方法时,就会调用org.apache.zookeeper.server.WatchManager#triggerWatch(String, EventType) 这个方法,具体源码如下:

    public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
        HashSet<Watcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);//在路径-watcher关系中,根据路径删除watcher
            if (watchers == null || watchers.isEmpty()) {
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG,
                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                            "No watchers for " + path);
                }
                //如果watcher不存在,直接返回
                return null;
            }
            // 在watcher-路径 映射关系中,删除路径
            for (Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            //调用ServerCnxn的process方法,通知客户端
            w.process(e);
        }
        return watchers;
    }

从上面的代码可以看出来,watcher在服务端的触发也是一次性的。而ServerCnxn的process方法也很简单,以NettyServerCnxn的process方法为例,代码如下:

    @Override
    public void process(WatchedEvent event) {
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();
        try {
            sendResponse(h, e, "notification");
        } catch (IOException e1) {
            close();
        }
    }

可以看出来,process方法只是传输WatcherEvent对象而已,并没有什么逻辑,真正的客户端watcher回调和业务逻辑的执行,都在客户端。

客户端回调 Watcher

最终服务端会通过使用 ServerCnxn 对应的 TCP 连接来向客户端发送一个 WatcherEvent 事件。

    class SendThread extends ZooKeeperThread {
		....
        void readResponse(ByteBuffer incomingBuffer) throws IOException {
            ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();
            replyHdr.deserialize(bbia, "header");
            if(replyHdr.getXid() == -2) {
                ....
            } else if(replyHdr.getXid() == -4) {
                .....
            } else if(replyHdr.getXid() == -1) {
                WatcherEvent packet2 = new WatcherEvent();
                packet2.deserialize(bbia, "response");
                if(ClientCnxn.this.chrootPath != null) {
                    String we = packet2.getPath();
                    if(we.compareTo(ClientCnxn.this.chrootPath) == 0) {
                        packet2.setPath("/");
                    } else if(we.length() > ClientCnxn.this.chrootPath.length()) {
                        packet2.setPath(we.substring(ClientCnxn.this.chrootPath.length()));
                    } else {
                        ClientCnxn.LOG.warn("Got server path " + packet2.getPath() + " which is too short for chroot path " + ClientCnxn.this.chrootPath);
                    }
                }
                WatchedEvent we1 = new WatchedEvent(packet2);
                
                ClientCnxn.this.eventThread.queueEvent(we1); 
            } else if(this.clientTunneledAuthenticationInProgress()) {
                GetSASLRequest packet1 = new GetSASLRequest();
                packet1.deserialize(bbia, "token");
                ClientCnxn.this.zooKeeperSaslClient.respondToServer(packet1.getToken(), ClientCnxn.this);
            } else {

对于一个来自服务端的响应,都是经过一堆的 NIO 处理类到达客户端,然后由 SendThread.readResponse(ByteBuffer incomingBuffer) 方法来统一进行处理的。如果响应头 replyHdr 中标识了 xid 为 -1,表面这是一个通知类型的响应,对其的处理大体上分为 4 个步骤。

  1. 反序列化
    packet2.deserialize(bbia, “response”);将字节流转换成 WatcherEvent 对象。

  2. 处理 chrootPath
    如果chrootPath != null,然后。。。

  3. 还原 WatchedEvent
    WatchedEvent we1 = new WatchedEvent(packet2);

  4. 回调 Watcher
    ClientCnxn.this.eventThread.queueEvent(we1); 最后将 WatchedEvent 对象交给 eventThread 线程,在下一个轮询周期中进行回调。

下面来看一下 eventThread.queueEvent(we1) 里面的逻辑:

        public void queueEvent(WatchedEvent event) {
            if(event.getType() != EventType.None || this.sessionState != event.getState()) {
                this.sessionState = event.getState();
                ClientCnxn.WatcherSetEventPair pair = new ClientCnxn.WatcherSetEventPair(ClientCnxn.this.watcher.materialize(event.getState(), event.getType(), event.getPath()), event);
                this.waitingEvents.add(pair);
            }
        }

对于这个方法,首先使用该 event 来生成一个 WatcherSetEventPair 类型的pari,这个pari只是把 event 加了一个壳,然后附加上了 这个节点上所有的 Watcher :

    private static class WatcherSetEventPair {
        private final Set<Watcher> watchers;
        private final WatchedEvent event;

那么又是如何获取到注册该节点的所有 Watcher 呢?看一下上面的 ClientCnxn.this.watcher.materialize(event.getState(), event.getType(), event.getPath()) 这个方法,以 NodeCreated 事件为例:

        public Set<Watcher> materialize(KeeperState state, EventType type, String clientPath) {
            HashSet result = new HashSet();
            Map msg;
            switch(null.$SwitchMap$org$apache$zookeeper$Watcher$Event$EventType[type.ordinal()]) {
            case 1:
				....
            case 2:
            case 3:
                msg = this.dataWatches;
                synchronized(this.dataWatches) {
                    this.addTo((Set)this.dataWatches.remove(clientPath), result);
                }

                msg = this.existWatches;
                synchronized(this.existWatches) {
                    this.addTo((Set)this.existWatches.remove(clientPath), result);
                    break;
                }
            case 4:

客户端在识别出事件类型 EventType 后,会从相应的 Watcher 存储(即 dataWatches、existWatches 或 childWatches 中的一个或多个,本例中就是从 dataWatches 和 existWatches 两个存储中获取,因为,节点创建事件不会在 childWatches 中存储)中去除对应的 Watcher。需要注意的是,这里使用的是 remove 接口,因此也表明了客户端的 Watcher 机制同样也是一次性的,即一旦被触发后,该 Watcher 就失效了。

取到所有的 Watcher 后,放到 pari 的 Set 里面,然后再把这个 pari 放到 waitingEvents 里面,而 waitingEvents 是啥玩意儿呢?

private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue();

就是说,把这个 pari 放到一个单向链表实现的阻塞队列里面。到这,感觉就很熟悉了。

        public void run() {
            try {
                this.isRunning = true;

                while(true) {
                    Object e = this.waitingEvents.take();//循环取pari
                    if(e == ClientCnxn.this.eventOfDeath) {
                        this.wasKilled = true;
                    } else {
                        this.processEvent(e);//进行处理
                    }

                    if(this.wasKilled) {
                        LinkedBlockingQueue var2 = this.waitingEvents;
                        synchronized(this.waitingEvents) {
                            if(this.waitingEvents.isEmpty()) {
                                this.isRunning = false;
                                break;
                            }
                        }
                    }
                }
            } catch (InterruptedException var5) {
                ClientCnxn.LOG.error("Event thread exiting due to interruption", var5);
            }

        }

waitingEvents 是一个待处理 Watcher 的队列,EventThread 的 run() 方法会不断从队列中取数据,交由 processEvent 方法处理:

        private void processEvent(Object event) {
            try {
                if(event instanceof ClientCnxn.WatcherSetEventPair) {
                    ClientCnxn.WatcherSetEventPair t = (ClientCnxn.WatcherSetEventPair)event;
                    Iterator rc = t.watchers.iterator();

                    while(rc.hasNext()) {
                        Watcher clientPath = (Watcher)rc.next();

                        try {
                            clientPath.process(t.event);
                        } catch (Throwable var11) {
                            ClientCnxn.LOG.error("Error while calling watcher ", var11);
                        }
                    }
                } else {

OK,针对于本次事件,取出所有的 Watcher 类型的对象,遍历运行process方法,进行串行同步处理。此处 processEvent 方法中的 Watcher 才是之前客户端真正注册的 Watcher,调用其 process 方法就可以实现 Watcher 的回调了。

Watcher 特性总结

ZooKeeper 的 Watcher 具有以下几个特性。

  • 一次性
    无论是服务端还是客户端,一旦一个 Watcher 被触发,ZooKeeper 都会将其从相应的存储中移除。因此,在 Watcher 的使用上,需要反复注册。这样的设计有效地减轻了服务端的压力。

  • 客户端串行执行
    客户端 Watcher 回调的过程是一个串行同步的过程,这为我们保证了顺序,同时,需要注意的一点是,一定不能因为一个 Watcher 的处理逻辑影响了整个客户端的 Watcher 回调,所以,我觉得客户端 Watcher 的实现类要另开一个线程进行处理业务逻辑,以便给其他的 Watcher 调用让出时间。

  • 轻量
    WatcherEvent 是 ZooKeeper 整个 Watcher 通知机制的最小通知单元,这个数据结构中只包含三部分内容:通知状态、事件类型和节点路径。也就是说,Watcher 通知非常简单,只会告诉客户端发生了事件,而不会说明事件的具体内容。例如针对 NodeDataChanged 事件,ZooKeeper 的Watcher 只会通知客户端指定数据节点的数据内容发生了变更,而对于原始数据以及变更后的新数据都无法从这个事件中直接获取到,而是需要客户端主动重新去获取数据——这也是 ZooKeeper 的 Watcher 机制的一个非常重要的特性。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐