zk作为一款成熟的分布式协调框架,订阅-发布功能是很重要的一个。所谓订阅发布功能,其实说白了就是观察者模式。观察者会订阅一些感兴趣的主题,然后这些主题一旦变化了,就会自动通知到这些观察者。

zk的订阅发布也就是watch机制,是一个轻量级的设计。因为它采用了一种推拉结合的模式。一旦服务端感知主题变了,那么只会发送一个事件类型和节点信息给关注的客户端,而不会包括具体的变更内容,所以事件本身是轻量级的,这就是所谓的“推”部分。然后,收到变更通知的客户端需要自己去拉变更的数据,这就是“拉”部分。


订阅-发布在zk中是通过事件注册和回调机制实现的,下面看下这部分内容。

整个注册回调过程分为三个大的部分:客户端注册,服务端发回事件,客户端回调


1.客户端注册:

回调接口:

public interface Watcher {
    abstract public void process(WatchedEvent event);
}
所有的事件回调接口都需要实现这个接口,并在process内部实现回调逻辑。event封装了事件的信息。event有两个层级,第一个是state,第二个是evetType。不同的state有不同的type。

下面是对应关系:



zk的事件注册接口:

zk的事件注册接口主要有有以下的四类:

1.默认watch,也就是在new一个ZooKeeper实例代表了一个zk客户端去连接服务器的时候,在构造方法里面传入的一个默认watch的回调接口,这个主要解决连接事件。在event中对应了syncConnected的state和none的type。

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

2.通过getData,getChildren和exist三个接口。每一种又有同步和异步两种版本。下面只看getData版本的:

 public byte[] getData(final String path, Watcher watcher, Stat stat)
 public void getData(final String path, Watcher watcher,
            DataCallback cb, Object ctx)

第一个有返回值的是同步的,第二个无返回值有回调cb的是异步的。当然,每一个又有几个重载版本,这里只贴了其中的一种。

所以注册的接口基本上是我们先实现一个watch接口,作为回调处理逻辑,然后调用以上的接口来注册感兴趣的事件。那么这个注册过程是怎样的?

我们重点以getData同步版本来说明,异步的其实在注册这一块是一样的,都是通过构造packet来完成。

        WatchRegistration wcb = null;
        if (watcher != null) {
            wcb = new DataWatchRegistration(watcher, clientPath);
        }
。。。
 ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
在getData内部,首先构建了一个watchRegistration实例,这个类后面说,总之它封装了了回调接口和关注节点。然后把这个注册对象和packetheader一起传入了submit方法。再看submit方法:

 Packet packet = queuePacket(h, r, request, response, null, null, null,
                    null, watchRegistration);
里面构造了一个packet,再看是如何构造的:

Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration)
    {
        Packet packet = null;
        synchronized (outgoingQueue) {
            if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {
                h.setXid(getXid());
            }
            packet = new Packet(h, r, request, response, null,
                    watchRegistration);
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath = clientPath;
            packet.serverPath = serverPath;
            if (!zooKeeper.state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                // If the client is asking to close the session then
                // mark as closing
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                outgoingQueue.add(packet);
            }
        }

        sendThread.wakeup();
        return packet;
    }
主要就是设置了packet的属性,然后把这个请求packet送入了发送队列。要知道我们注册回调的接口本来是用来获取数据的,所以回调依附在了获取这个过程中,这里的packet构造主要是为了获取一次数据,构建的一个请求包,我们的事件回调依附了这个过程,然后作为了这个请求packet的属性保存了起来。因为我们的是同步版本,所以packet的异步接口cb在上一步设置为了null。这里和回调相关的就是设置了packet的watchRegistration属性,也就是我们传入的回调接口,这是通过packet的构造方法完成的。所以有必要看下一个请求packet的内部:

static class Packet {
        RequestHeader header;

        ByteBuffer bb;

        /** Client's view of the path (may differ due to chroot) **/
        String clientPath;
        /** Servers's view of the path (may differ due to chroot) **/
        String serverPath;

        ReplyHeader replyHeader;

        Record request;

        Record response;

        boolean finished;

        AsyncCallback cb;

        Object ctx;

        WatchRegistration watchRegistration;

这是packet的属性,这里的wathRegistration就是回调接口,cb是getData的异步版本的回调,在得到数据以后的回调函数,也就是上面我们谈到的设为null的属性,因为我们看的是getData的同步版本,所以为null。需要明确两个回调的区别。
到这里,我们的事件回调函数已经和这次getData请求的packet关联起来的。

那么,最后这个packet就会进入到outgoingQueue中被发送。

也就是在SendThread的一次write过程中。

然后getData请求的数据就会被服务器返回,在SendThread的一次read过程中,具体在readResponse函数中的最后部分,也就是finishPacket函数中,完成最后的注册:

    private void finishPacket(Packet p) {
        if (p.watchRegistration != null) {
            p.watchRegistration.register(p.replyHeader.getErr());
        }

        if (p.cb == null) {
            synchronized (p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {
            p.finished = true;
            eventThread.queuePacket(p);
        }
    }
可以看到这里调用了一个register的方法。

下面需要了解下zk客户端与注册有关的数据结构:

在ZooKeeper类中,有一个内部类ZKWatchManager,是客户端存储所有的事件注册的类,里面有以下几个重要的属性,存储回调:

   private static class ZKWatchManager implements ClientWatchManager {
        private final Map<String, Set<Watcher>> dataWatches =
            new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> existWatches =
            new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> childWatches =
            new HashMap<String, Set<Watcher>>();

        private volatile Watcher defaultWatcher;
从名字上就可以看出各个属性的作用,正好对应了我们开始所说的4种回调。

map中的key就是节点的path,set就是该节点上所有的回调。因为默认的回调处理只有一个,所以就不是map,其余的事件,每一个节点都可能会有多个,所以是一个set。

再看一直出现的WatchRegistration结构:

 abstract class WatchRegistration {
        private Watcher watcher;
        private String clientPath;
        public WatchRegistration(Watcher watcher, String clientPath)
        {
            this.watcher = watcher;
            this.clientPath = clientPath;
        }

是一个抽象类,其实就是封装了一个事件注册,包括了感兴趣的节点和回调函数。data,children和exist三种事件都有一个对应的实现类。这个抽象类有一个非抽象方法register,负责将packet里面的watchRegistration放到之前的watchmanager中:

        public void register(int rc) {
            if (shouldAddWatch(rc)) {
                Map<String, Set<Watcher>> watches = getWatches(rc);
                synchronized(watches) {
                    Set<Watcher> watchers = watches.get(clientPath);
                    if (watchers == null) {
                        watchers = new HashSet<Watcher>();
                        watches.put(clientPath, watchers);
                    }
                    watchers.add(watcher);
                }
            }
        }
首先根据事件类型拿到正确的map,然后把watch回调放入map里面。


至此客户端注册一个事件回调的逻辑就清晰了,总结就是,通过注册函数来设置回调接口为packet的属性。然后在注册函数收到其自身希望得到的数据的时候,来把回调函数注册到manager上。


服务端处理:

主要分为了两部分,服务端添加事件,服务端触发事件以后的处理。

先看服务端添加事件:

还是以刚才的getData为例,服务端的process收到了getData请求,就会返回数据,这个procesor是FinalRequestProcessor,其中处理getData请求的部分代码:

 case OpCode.getData: {
                lastOp = "GETD";
                GetDataRequest getDataRequest = new GetDataRequest();
                ZooKeeperServer.byteBuffer2Record(request.request,
                        getDataRequest);
                DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
                if (n == null) {
                    throw new KeeperException.NoNodeException();
                }
                Long aclL;
                synchronized(n) {
                    aclL = n.acl;
                }
                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),
                        ZooDefs.Perms.READ,
                        request.authInfo);
                Stat stat = new Stat();
                byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                        getDataRequest.getWatch() ? cnxn : null);
                rsp = new GetDataResponse(b, stat);
                break;
            }

重点是getData函数的调用,检测客户端是否注册了watch,如果注册了,那么就传cnxn,否则就传null。这里的cnxn其实是服务端处理io的线程类,后面说。getData最终会到dataTree的getData函数:

    public byte[] getData(String path, Stat stat, Watcher watcher)
            throws KeeperException.NoNodeException {
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (n) {
            n.copyStat(stat);
            if (watcher != null) {
                dataWatches.addWatch(path, watcher);
            }
            return n.data;
        }
    }
会在datawatches里面添加watch,因为我们是data类型的watch。

在Datatree类有两个和watch相关的属性:

    private final WatchManager dataWatches = new WatchManager();

    private final WatchManager childWatches = new WatchManager();
分别存储了数据的子节点的watch。再看WatchManager结构:

    private final HashMap<String, HashSet<Watcher>> watchTable =
        new HashMap<String, HashSet<Watcher>>();

    private final HashMap<Watcher, HashSet<String>> watch2Paths =
        new HashMap<Watcher, HashSet<String>>();

主要有两个map,存储了节点到watch 和 watch到节点的双向映射,这也是服务端存储事件的结构。这样服务端就在相应的节点上添加了一个watch。


再看服务端触发watch事件逻辑,比如通过setData改变数据:

在datatree的

  public Stat setData(String path, byte data[], int version, long zxid,
            long time) throws KeeperException.NoNodeException {
函数的最后有一段:

  dataWatches.triggerWatch(path, EventType.NodeDataChanged);
会触发事件:

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type,
                KeeperState.SyncConnected, path);
        HashSet<Watcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);
            if (watchers == null || watchers.isEmpty()) {
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG,
                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                            "No watchers for " + path);
                }
                return null;
            }
            for (Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            w.process(e);
        }
        return watchers;
    }
最终会调用process函数,这里process函数是watch接口的实现,但是这个只有客户端才有啊。实际上,服务端这里的实现类就是服务端的线程类:NIOServerCnxn。

public class NIOServerCnxn implements Watcher, ServerCnxn
再看它的process方法:

  synchronized public void process(WatchedEvent event) {
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                     "Deliver event " + event + " to 0x"
                                     + Long.toHexString(this.sessionId)
                                     + " through " + this);
        }

        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();

        sendResponse(h, e, "notification");
    }
可以看到,只是发了一个事件类型的信息,header为-1。



客户端执行回调:

从上面可以看到,服务端触发了时间以后会发送一个-1为header的相应。

那么客户端就会在io线程的read部分读到这个信息,最后会到readResponse函数里处理:

            if (replyHdr.getXid() == -1) {
                // -1 means notification
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got notification sessionid:0x"
                        + Long.toHexString(sessionId));
                }
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");

                // convert from a server path to a client path
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if(serverPath.compareTo(chrootPath)==0)
                        event.setPath("/");
                    else
                        event.setPath(serverPath.substring(chrootPath.length()));
                }

                WatchedEvent we = new WatchedEvent(event);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x"
                            + Long.toHexString(sessionId));
                }

                eventThread.queueEvent( we );
                return;
            }
把事件event反序列化出来,构建一个watchedevent对象,然后把这个event扔进eventQueue里面,通过的是queueEvent函数:

public void queueEvent(WatchedEvent event) {
            if (event.getType() == EventType.None
                    && sessionState == event.getState()) {
                return;
            }
            sessionState = event.getState();

            // materialize the watchers based on the event
            WatcherSetEventPair pair = new WatcherSetEventPair(
                    watcher.materialize(event.getState(), event.getType(),
                            event.getPath()),
                            event);
            // queue the pair (watch set & event) for later processing
            waitingEvents.add(pair);
        }
这个函数会从之前的WatchManager中恢复出之前的回调注册。然后就会等待eventThread来处理。

EventThread也是一个线程,会周期性的处理队列里的事件。

public void run() {
           try {
              isRunning = true;
              while (true) {
                 Object event = waitingEvents.take();
                 if (event == eventOfDeath) {
                    wasKilled = true;
                 } else {
                    processEvent(event);
                 }
                 if (wasKilled)
                    synchronized (waitingEvents) {
                       if (waitingEvents.isEmpty()) {
                          isRunning = false;
                          break;
                       }
                    }
              }
调动事件的process函数即可。












Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐