一、watch基于事件的应用

zookeeper有watch事件,是一次性触发的,当watch监视的数据发生变化时,通知设置了该watch的client.即watcher.
同样:其watcher是监听数据发送了某些变化,那就一定会有对应的事件类型和状态类型。

事件类型:(znode节点相关的)

	 EventType:NodeCreated            //节点创建
	 EventType:NodeDataChanged        //节点的数据变更
	 EventType:NodeChildrentChanged   //子节点下的数据变更
	 EventType:NodeDeleted

状态类型:(是跟客户端实例相关的)

	 KeeperState:Disconneced        //连接失败
	 KeeperState:SyncConnected	//连接成功	 
	 KeeperState:AuthFailed         //认证失败
	 KeeperState:Expired            //会话过期

watcher的特性:一次性、客户端串行执行、轻量

一次性:

对于ZK的watcher,只需要记住一点:zookeeper有watch事件,是一次性触发的,当watch监视的数据发生变化时,通知设置了该watch的client,即watcher,由于zookeeper的监控都是一次性的,所以每次必须设置监控。

客户端串行执行:

客户端Watcher回调的过程是一个串行同步的过程,这为我们保证了顺序,同事需要开发人员注意一点,千万不要因为一个Watcher的处理逻辑影响了整个客户端的Watcher回调。

轻量:

WatchedEvent 是Zookeeper整个Watcher通知机制的最小通知单元,整个结构只包含三部分:通知状态、事件类型和节点路径。也就是说Watcher通知非常的简单,只会告诉客户端发生了事件,而不会告知其具体内容,需要客户端自己去进行获取,比如NodeDataChanged事件,Zookeeper只会通知客户端指定节点的数据发生了变更,而不会直接提供具体的数据内容。

二、watch底层机制

ZooKeeper 提供了分布式数据的发布/订阅功能。

这让我想到一种模式,观察者模式(发布订阅模式):一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能够让多个订阅者同时监听某一个主题对象,当这个主题自身状态变化时,会通知所有订阅者,试它们能够做出相应的处理。那 ZooKeeper 是不是也是使用了这个经典的模式呢?

在 ZooKeeper 中,引入了 Watcher 机制来实现这种分布式的通知功能。ZooKeeper 允许客户端向服务端注册一个 Watcher 监听,当服务器的一些特定事件触发了这个 Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。

以下引用代码以及说辞都是基于 ZooKeeper 3.4.8。

ZooKeeper 的 Watcher 机制主要包括客户端线程、客户端 WatchManager 和 ZooKeeper 服务器三部分。

在这里插入图片描述
在上图中:

  • ZooKeeper :部署在远程主机上的 ZooKeeper 集群,当然,也可能是单机的。
  • Client :分布在各处的 ZooKeeper 的 jar 包程序,被引用在各个独立应用程序中。
  • WatchManager :一个接口,用于管理各个监听器,只有一个方法 materialize(),返回一个 Watcher 的 set。

工作机制:

客户端在向zk服务器注册Watcher的同时,会将Watcher对象存储在客户端的WatchManager中。
当zk服务端触发Watcher事件后,会向客户端发送通知,客户端线程从WatcherManager中取出对应的Watcher对象。来执行回调逻辑。

Watcher接口

Watcher是一个标准的事件处理器,其定义了事件通知相关的逻辑,包含KeeperState和EventState,分别代表通知状态和事件类型,在回调process中进行处理。

public abstract void process(WatchedEvent event); 

WatchedEvent类

public class WatchedEvent
  {
    //是一个枚举类,代表通知状态,是链接的,还是断开的。
    keeperState;
    //是一个枚举类,代表事件类型,当处于连接时,事件是子节点变化等事件
    eventType
    //节点路径
    path
  }

WatcherEvent类

WatcherEvent
 {
   //代表事件类型
   type:int;
   //代表节点状态
   state:int;
   path:String
 }
总结:

WatchedEvent和WatcherEvent表示的是同一个事物。WatchedEvent是一个逻辑事件,用于服务端和客户端程序执行过程中所需的逻辑对象。而WatcherEvent因为实现了序列化接口,因此,可以用于网络传输。
服务端在生成WatchedEvent事件之后,会调用getWrapper方法将自己包装成一个WatcherEvent,以便通过网络传输到客户端,客户端在接收到服务端的这个事件对象后,首先会将WatcherEvent事件还原成一个WatchedEvent事件,并传递给process方法进行处理。
从事件可以看到,客户端无法直接从该事件中获取到对应数据节点的原始数据内容以及变更后的新数据内容。

客户端注册 Watcher

1、在创建一个 ZooKeeper 客户端对象实例时,可以向构造方法中传入一个默认的 Watcher。默认的Watcher作为整个zk会话期间的默认Watcher,会一直保存在客户端的ZkWatchManager的defaultWatcher中。

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

ZkWatchManager数据结构

public class  ZkWatchManager
{
  dataWatches:Map<String,Set<Watcher>>
  existWatches: Map<String,Set<Watcher>>
  childWatches: Map<String,Set<Watcher>>
  defaultWatches: Watcher
}

这个 Watcher 将作为整个 ZooKeeper会话期间的默认 Watcher,会一直被保存在客户端 ZKWatchManager 的 defaultWatcher 中。另外,ZooKeeper 客户端也可以通过 getData、exists 和 getChildren 三个接口来向 ZooKeeper 服务器注册 Watcher,无论哪种方式,注册 Watcher 的工作原理都是一致的。下面以 getData 这个接口为例说明,这个接口用于获取指定节点的数据内容,主要有两个方法:

public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
        PathUtils.validatePath(path);//路径校验
        ZooKeeper.DataWatchRegistration wcb = null;
        if(watcher != null) {
            wcb = new ZooKeeper.DataWatchRegistration(watcher, path);//封装 Watcher
        }

        String serverPath = this.prependChroot(path);
        RequestHeader h = new RequestHeader();
        h.setType(4);
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
        GetDataResponse response = new GetDataResponse();
        ReplyHeader r = this.cnxn.submitRequest(h, request, response, wcb);//发送到服务端
        if(r.getErr() != 0) {
            throw KeeperException.create(Code.get(r.getErr()), path);
        } else {
            if(stat != null) {
                DataTree.copyStat(response.getStat(), stat);
            }

            return response.getData();
        }
    }

    public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException {
        return this.getData(path, watch?this.watchManager.defaultWatcher:null, stat);
    }

以上两个接口都可以进行 Watcher 的注册,第二个接口通过一个 Boolean 参数来标识是否使用上文提到的默认 Watcher 来进行注册,看代码可知,然后仍然还要去调用第一个方法来完成注册逻辑。而在第一个方法中,客户端使用 this.cnxn.submitRequest(h, request, response, wcb) 方法向服务器发送这个注册请求,同时等待请求的返回。
完成请求发送后,会由客户端 SendTread 线程的 readResponse 方法负责接收来自服务端的响应,readResponse 方法的最后会调用finishPacket 方法,它会从 Packet 中取出对应的 Watcher 并注册到 ZKWatchManager 中去。

private void finishPacket(ClientCnxn.Packet p) {
        if(p.watchRegistration != null) {
            p.watchRegistration.register(p.replyHeader.getErr());
        }

        if(p.cb == null) {
            synchronized(p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {
            p.finished = true;
            this.eventThread.queuePacket(p);
        }

    }

watchRegistration.register方法就是把 WatchRegistration 子类里面的 Watcher 实例放到 ZKWatchManager 的 dataWatches 中存储起来。从上面UML图上可以知道,ZKWatchManager .dataWatches 是一个 Map

abstract class WatchRegistration {
        private Watcher watcher;
        private String clientPath;

        protected abstract Map<String, Set<Watcher>> getWatches(int var1);

        public void register(int rc) {
            if(this.shouldAddWatch(rc)) {
                Map watches = this.getWatches(rc);//通过子类的实现取得ZKWatchManager 中的 dataWatches
                synchronized(watches) {
                    Object watchers = (Set)watches.get(this.clientPath);
                    if(watchers == null) {
                        watchers = new HashSet();
                        watches.put(this.clientPath, watchers);
                    }

                    ((Set)watchers).add(this.watcher);//将 Watcher 对象放到 ZKWatchManager 中的 dataWatches里面
                }
            }
        }
    }

这个过程,简单来说,就是。当使用ZooKeeper 构造方法或者使用 getData、exists 和 getChildren 三个接口来向 ZooKeeper 服务器注册 Watcher 的时候,首先将此消息传递给服务端,传递成功后,服务端会通知客户端,然后客户端将该路径和Watcher对应关系存储起来备用。

服务端处理Watcher

说白了也就是存起来,当某个节点变动时,取出来该节点,然后调用 sendResponse 方法发送event 事件。
1、在请求报文中判断是否需要注册。

getDataRequest.getWatch?cnxn:null
 ...
 byte[] b = zks.getZkDatabase().getData(getDataRequest,getPath(),stat,getDataRequest.getWatch()?cnxn:null);
 rsp = new GetDataResponse(b,stat);
 break;

ServerCnxn代表一个客户端和服务端的链接。
是基于netty的实现;NettyServerCnxn,

ServerCnxn实现了Watcher的process接口

数据节点的节点路径和serverCnxn最终会存储在WatchManager的watchTable和watch2Paths中。WatchManager是服务端的Watcher的管理者。WatchManager只是一个统称,在服务端,DataTree中会托管两个watchManager,分别是dataWatches和childWatches;

WatchManager
//watchTable是从数据节点的路径的粒度来托管Watcher。
--watchTable:HashMap<String,HashSet<Watcher>>
//watch2Paths;是从Watcher的粒度来控制事件触发需要触发的数据节点。
--watch2Paths:HashMap<Watcher,HashSet<String>>;
//定义对watcher的增删改查和触发watcher
triggerWatch();
addWatch();
removeWatch();

2、zk服务端的数据存储

DataNode n = nodes.get(path);
 setData(String path,byte[] data,int version,long zxid,long time)
 {
   DataNode n = nodes.get(path);
   synchronized(n);
   //触发节点通知
   dataWatches.triggerWatch(path,EventType,NodeDataChanged);
 }
 //触发watcher实现
 public set<Watcher> triggerWatch(String path,EventTye type ,Set<Watcher> process)
 {
    WatchedEvent e = new WatchedEvent(type,KeeperState,path);
    HashSet<Watcher> watches;
    synchronized(this)
    {
      watchers = watchTable.remove(path);
      //...
      for(Watcher w:watchers)
      {
        HashSet<String> paths = watch2Paths..
        if (paths != null)
        {
          paths.remove(path);
        }
      }
     for(Watch w:watchers)
     {
       w.process(e);//进行处理
     }
    }
    return watchers;
 }

服务端处理watcher总结

1、封装WatchedEvent; 首先将通知状态(KeeperState),事件类型(EventType)以及节点路径(path)封装成WatchedEvent。

2、查询Watcher,根据数据节点的路径从watchTable中取出对应的watcher,如果没有,说明没有注册watcher。如果找到这个watcher,将其提取出来,同时会直接从watchTable和watch2Paths中将其删除。watcher在server端是一次性的。

3、调用process触发watcher。逐个调用触发;zk会把当前请求的ServerCnxn作为一个Watcher进行存储。

public class NIOServerCnxn extends ServerCnxn
 {
   synchronized void process (WathedEvent event)
   {
     ReplayHeader h = new ReplayHeader(-1,-1l,0);
     ,,,
     watcherEvent e = event.getWrapper();
     sendResponse(h,e,"notification");
   }
 }
 在请求头中,标记为"-1",表面当前是一个通知。
 将watched包装成watcherEvent,以便进行网络传输。
 

客户端回调 Watcher

最终服务端会通过使用 ServerCnxn 对应的 TCP 连接来向客户端发送一个 WatcherEvent 事件。

//SendThread接收事件通知
    class SendThread extends ZooKeeperThread {
        ....
        void readResponse(ByteBuffer incomingBuffer) throws IOException {
            ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();
            replyHdr.deserialize(bbia, "header");
            if(replyHdr.getXid() == -2) {
                ....
            } else if(replyHdr.getXid() == -4) {
                .....
            } else if(replyHdr.getXid() == -1) {
                WatcherEvent packet2 = new WatcherEvent();
                packet2.deserialize(bbia, "response");
                if(ClientCnxn.this.chrootPath != null) {
                    String we = packet2.getPath();
                    if(we.compareTo(ClientCnxn.this.chrootPath) == 0) {
                        packet2.setPath("/");
                    } else if(we.length() > ClientCnxn.this.chrootPath.length()) {
                        packet2.setPath(we.substring(ClientCnxn.this.chrootPath.length()));
                    } else {
                        ClientCnxn.LOG.warn("Got server path " + packet2.getPath() + " which is too short for chroot path " + ClientCnxn.this.chrootPath);
                    }
                }
                WatchedEvent we1 = new WatchedEvent(packet2);

                ClientCnxn.this.eventThread.queueEvent(we1); 
            } else if(this.clientTunneledAuthenticationInProgress()) {
                GetSASLRequest packet1 = new GetSASLRequest();
                packet1.deserialize(bbia, "token");
                ClientCnxn.this.zooKeeperSaslClient.respondToServer(packet1.getToken(), ClientCnxn.this);
            } else {

处理步骤:

对于一个来自服务端的响应,都是经过一堆的 NIO 处理类到达客户端,然后由 SendThread.readResponse(ByteBuffer incomingBuffer) 方法来统一进行处理的。如果响应头 replyHdr 中标识了 xid 为 -1,表面这是一个通知类型的响应,对其的处理大体上分为 4 个步骤。

1、反序列化

packet2.deserialize(bbia, “response”);将字节流转换成 WatcherEvent 对象。

2、处理 chrootPath

如果chrootPath != null,然后。。。

3、还原 WatchedEvent

WatchedEvent we1 = new WatchedEvent(packet2);

4、回调 Watcher

ClientCnxn.this.eventThread.queueEvent(we1); 最后将 WatchedEvent 对象交给 eventThread 线程,在下一个轮询周期中进行回调。

下面来看一下 eventThread.queueEvent(we1) 里面的逻辑:

 public void queueEvent(WatchedEvent event) {
            if(event.getType() != EventType.None || this.sessionState != event.getState()) {
                this.sessionState = event.getState();
                ClientCnxn.WatcherSetEventPair pair = new ClientCnxn.WatcherSetEventPair(ClientCnxn.this.watcher.materialize(event.getState(), event.getType(), event.getPath()), event);
                this.waitingEvents.add(pair);
            }
        }

对于这个方法,首先使用该 event 来生成一个 WatcherSetEventPair 类型的pari,这个pari只是把 event 加了一个壳,然后附加上了 这个节点上所有的 Watcher :

  private static class WatcherSetEventPair {
        private final Set<Watcher> watchers;
        private final WatchedEvent event;

那么又是如何获取到注册该节点的所有 Watcher 呢?看一下上面的 ClientCnxn.this.watcher.materialize(event.getState(), event.getType(), event.getPath()) 这个方法,以 NodeCreated 事件为例:

public Set<Watcher> materialize(KeeperState state, EventType type, String clientPath) {
            HashSet result = new HashSet();
            Map msg;
            switch(null.$SwitchMap$org$apache$zookeeper$Watcher$Event$EventType[type.ordinal()]) {
            case 1:
                ....
            case 2:
            case 3:
                msg = this.dataWatches;
                synchronized(this.dataWatches) {
                    this.addTo((Set)this.dataWatches.remove(clientPath), result);
                }

                msg = this.existWatches;
                synchronized(this.existWatches) {
                    this.addTo((Set)this.existWatches.remove(clientPath), result);
                    break;
                }
            case 4:

客户端在识别出事件类型 EventType 后,会从相应的 Watcher 存储(即 dataWatches、existWatches 或 childWatches 中的一个或多个,本例中就是从 dataWatches 和 existWatches 两个存储中获取,因为,节点创建事件不会在 childWatches 中存储)中去除对应的 Watcher。需要注意的是,这里使用的是 remove 接口,因此也表明了客户端的 Watcher 机制同样也是一次性的,即一旦被触发后,该 Watcher 就失效了。

取到所有的 Watcher 后,放到 pari 的 Set 里面,然后再把这个 pari 放到 waitingEvents 里面,而 waitingEvents 是啥玩意儿呢?

private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue();

就是说,把这个 pari 放到一个单向链表实现的阻塞队列里面。到这,感觉就很熟悉了。

 public void run() {
            try {
                this.isRunning = true;

                while(true) {
                    Object e = this.waitingEvents.take();//循环取pari
                    if(e == ClientCnxn.this.eventOfDeath) {
                        this.wasKilled = true;
                    } else {
                        this.processEvent(e);//进行处理
                    }

                    if(this.wasKilled) {
                        LinkedBlockingQueue var2 = this.waitingEvents;
                        synchronized(this.waitingEvents) {
                            if(this.waitingEvents.isEmpty()) {
                                this.isRunning = false;
                                break;
                            }
                        }
                    }
                }
            } catch (InterruptedException var5) {
                ClientCnxn.LOG.error("Event thread exiting due to interruption", var5);
            }

        }

waitingEvents 是一个待处理 Watcher 的队列,EventThread 的 run() 方法会不断从队列中取数据,交由 processEvent 方法处理:

        private void processEvent(Object event) {
            try {
                if(event instanceof ClientCnxn.WatcherSetEventPair) {
                    ClientCnxn.WatcherSetEventPair t = (ClientCnxn.WatcherSetEventPair)event;
                    Iterator rc = t.watchers.iterator();

                    while(rc.hasNext()) {
                        Watcher clientPath = (Watcher)rc.next();

                        try {
                            clientPath.process(t.event);
                        } catch (Throwable var11) {
                            ClientCnxn.LOG.error("Error while calling watcher ", var11);
                        }
                    }
                } else {

OK,针对于本次事件,取出所有的 Watcher 类型的对象,遍历运行process方法,进行串行同步处理。此处 processEvent 方法中的 Watcher 才是之前客户端真正注册的 Watcher,调用其 process 方法就可以实现 Watcher 的回调了。

三、Watcher 特性总结

ZooKeeper 的 Watcher 具有以下几个特性。

一次性

无论是服务端还是客户端,一旦一个 Watcher 被触发,ZooKeeper 都会将其从相应的存储中移除。因此,在 Watcher 的使用上,需要反复注册。这样的设计有效地减轻了服务端的压力。

客户端串行执行

客户端 Watcher 回调的过程是一个串行同步的过程,这为我们保证了顺序,同时,需要注意的一点是,一定不能因为一个 Watcher 的处理逻辑影响了整个客户端的 Watcher 回调,所以,我觉得客户端 Watcher 的实现类要另开一个线程进行处理业务逻辑,以便给其他的 Watcher 调用让出时间。

轻量

WatcherEvent 是 ZooKeeper 整个 Watcher 通知机制的最小通知单元,这个数据结构中只包含三部分内容:通知状态、事件类型和节点路径。也就是说,Watcher 通知非常简单,只会告诉客户端发生了事件,而不会说明事件的具体内容。例如针对 NodeDataChanged 事件,ZooKeeper 的Watcher 只会通知客户端指定数据节点的数据内容发生了变更,而对于原始数据以及变更后的新数据都无法从这个事件中直接获取到,而是需要客户端主动重新去获取数据——这也是 ZooKeeper 的 Watcher 机制的一个非常重要的特性。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐