【zookeeper】事件 watch 机制 原理
zk作为一款成熟的分布式协调框架,订阅-发布功能是很重要的一个。所谓订阅发布功能,其实说白了就是观察者模式。观察者会订阅一些感兴趣的主题,然后这些主题一旦变化了,就会自动通知到这些观察者。zk的订阅发布也就是watch机制,是一个轻量级的设计。因为它采用了一种推拉结合的模式。一旦服务端感知主题变了,那么只会发送一个事件类型和节点信息给关注的客户端,而不会包括具体的变更内容,所以事件本身是轻量级
zk作为一款成熟的分布式协调框架,订阅-发布功能是很重要的一个。所谓订阅发布功能,其实说白了就是观察者模式。观察者会订阅一些感兴趣的主题,然后这些主题一旦变化了,就会自动通知到这些观察者。
zk的订阅发布也就是watch机制,是一个轻量级的设计。因为它采用了一种推拉结合的模式。一旦服务端感知主题变了,那么只会发送一个事件类型和节点信息给关注的客户端,而不会包括具体的变更内容,所以事件本身是轻量级的,这就是所谓的“推”部分。然后,收到变更通知的客户端需要自己去拉变更的数据,这就是“拉”部分。
订阅-发布在zk中是通过事件注册和回调机制实现的,下面看下这部分内容。
整个注册回调过程分为三个大的部分:客户端注册,服务端发回事件,客户端回调
1.客户端注册:
回调接口:
public interface Watcher {
abstract public void process(WatchedEvent event);
}
所有的事件回调接口都需要实现这个接口,并在process内部实现回调逻辑。event封装了事件的信息。event有两个层级,第一个是state,第二个是evetType。不同的state有不同的type。
下面是对应关系:
zk的事件注册接口:
zk的事件注册接口主要有有以下的四类:
1.默认watch,也就是在new一个ZooKeeper实例代表了一个zk客户端去连接服务器的时候,在构造方法里面传入的一个默认watch的回调接口,这个主要解决连接事件。在event中对应了syncConnected的state和none的type。
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
2.通过getData,getChildren和exist三个接口。每一种又有同步和异步两种版本。下面只看getData版本的:
public byte[] getData(final String path, Watcher watcher, Stat stat)
public void getData(final String path, Watcher watcher,
DataCallback cb, Object ctx)
第一个有返回值的是同步的,第二个无返回值有回调cb的是异步的。当然,每一个又有几个重载版本,这里只贴了其中的一种。
所以注册的接口基本上是我们先实现一个watch接口,作为回调处理逻辑,然后调用以上的接口来注册感兴趣的事件。那么这个注册过程是怎样的?
我们重点以getData同步版本来说明,异步的其实在注册这一块是一样的,都是通过构造packet来完成。
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new DataWatchRegistration(watcher, clientPath);
}
。。。
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
在getData内部,首先构建了一个watchRegistration实例,这个类后面说,总之它封装了了回调接口和关注节点。然后把这个注册对象和packetheader一起传入了submit方法。再看submit方法:
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
里面构造了一个packet,再看是如何构造的:
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration)
{
Packet packet = null;
synchronized (outgoingQueue) {
if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {
h.setXid(getXid());
}
packet = new Packet(h, r, request, response, null,
watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!zooKeeper.state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
outgoingQueue.add(packet);
}
}
sendThread.wakeup();
return packet;
}
主要就是设置了packet的属性,然后把这个请求packet送入了发送队列。要知道我们注册回调的接口本来是用来获取数据的,所以回调依附在了获取这个过程中,这里的packet构造主要是为了获取一次数据,构建的一个请求包,我们的事件回调依附了这个过程,然后作为了这个请求packet的属性保存了起来。因为我们的是同步版本,所以packet的异步接口cb在上一步设置为了null。这里和回调相关的就是设置了packet的watchRegistration属性,也就是我们传入的回调接口,这是通过packet的构造方法完成的。所以有必要看下一个请求packet的内部:
static class Packet {
RequestHeader header;
ByteBuffer bb;
/** Client's view of the path (may differ due to chroot) **/
String clientPath;
/** Servers's view of the path (may differ due to chroot) **/
String serverPath;
ReplyHeader replyHeader;
Record request;
Record response;
boolean finished;
AsyncCallback cb;
Object ctx;
WatchRegistration watchRegistration;
这是packet的属性,这里的wathRegistration就是回调接口,cb是getData的异步版本的回调,在得到数据以后的回调函数,也就是上面我们谈到的设为null的属性,因为我们看的是getData的同步版本,所以为null。需要明确两个回调的区别。
到这里,我们的事件回调函数已经和这次getData请求的packet关联起来的。
那么,最后这个packet就会进入到outgoingQueue中被发送。
也就是在SendThread的一次write过程中。
然后getData请求的数据就会被服务器返回,在SendThread的一次read过程中,具体在readResponse函数中的最后部分,也就是finishPacket函数中,完成最后的注册:
private void finishPacket(Packet p) {
if (p.watchRegistration != null) {
p.watchRegistration.register(p.replyHeader.getErr());
}
if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
eventThread.queuePacket(p);
}
}
可以看到这里调用了一个register的方法。
下面需要了解下zk客户端与注册有关的数据结构:
在ZooKeeper类中,有一个内部类ZKWatchManager,是客户端存储所有的事件注册的类,里面有以下几个重要的属性,存储回调:
private static class ZKWatchManager implements ClientWatchManager {
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();
private volatile Watcher defaultWatcher;
从名字上就可以看出各个属性的作用,正好对应了我们开始所说的4种回调。
map中的key就是节点的path,set就是该节点上所有的回调。因为默认的回调处理只有一个,所以就不是map,其余的事件,每一个节点都可能会有多个,所以是一个set。
再看一直出现的WatchRegistration结构:
abstract class WatchRegistration {
private Watcher watcher;
private String clientPath;
public WatchRegistration(Watcher watcher, String clientPath)
{
this.watcher = watcher;
this.clientPath = clientPath;
}
是一个抽象类,其实就是封装了一个事件注册,包括了感兴趣的节点和回调函数。data,children和exist三种事件都有一个对应的实现类。这个抽象类有一个非抽象方法register,负责将packet里面的watchRegistration放到之前的watchmanager中:
public void register(int rc) {
if (shouldAddWatch(rc)) {
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized(watches) {
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
watchers.add(watcher);
}
}
}
首先根据事件类型拿到正确的map,然后把watch回调放入map里面。
至此客户端注册一个事件回调的逻辑就清晰了,总结就是,通过注册函数来设置回调接口为packet的属性。然后在注册函数收到其自身希望得到的数据的时候,来把回调函数注册到manager上。
服务端处理:
主要分为了两部分,服务端添加事件,服务端触发事件以后的处理。
先看服务端添加事件:
还是以刚才的getData为例,服务端的process收到了getData请求,就会返回数据,这个procesor是FinalRequestProcessor,其中处理getData请求的部分代码:
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ZooKeeperServer.byteBuffer2Record(request.request,
getDataRequest);
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
Long aclL;
synchronized(n) {
aclL = n.acl;
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),
ZooDefs.Perms.READ,
request.authInfo);
Stat stat = new Stat();
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);
rsp = new GetDataResponse(b, stat);
break;
}
重点是getData函数的调用,检测客户端是否注册了watch,如果注册了,那么就传cnxn,否则就传null。这里的cnxn其实是服务端处理io的线程类,后面说。getData最终会到dataTree的getData函数:
public byte[] getData(String path, Stat stat, Watcher watcher)
throws KeeperException.NoNodeException {
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
n.copyStat(stat);
if (watcher != null) {
dataWatches.addWatch(path, watcher);
}
return n.data;
}
}
会在datawatches里面添加watch,因为我们是data类型的watch。
在Datatree类有两个和watch相关的属性:
private final WatchManager dataWatches = new WatchManager();
private final WatchManager childWatches = new WatchManager();
分别存储了数据的子节点的watch。再看WatchManager结构:
private final HashMap<String, HashSet<Watcher>> watchTable =
new HashMap<String, HashSet<Watcher>>();
private final HashMap<Watcher, HashSet<String>> watch2Paths =
new HashMap<Watcher, HashSet<String>>();
主要有两个map,存储了节点到watch 和 watch到节点的双向映射,这也是服务端存储事件的结构。这样服务端就在相应的节点上添加了一个watch。
再看服务端触发watch事件逻辑,比如通过setData改变数据:
在datatree的
public Stat setData(String path, byte data[], int version, long zxid,
long time) throws KeeperException.NoNodeException {
函数的最后有一段:
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
会触发事件:
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e);
}
return watchers;
}
最终会调用process函数,这里process函数是watch接口的实现,但是这个只有客户端才有啊。实际上,服务端这里的实现类就是服务端的线程类:NIOServerCnxn。
public class NIOServerCnxn implements Watcher, ServerCnxn
再看它的process方法:
synchronized public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x"
+ Long.toHexString(this.sessionId)
+ " through " + this);
}
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
sendResponse(h, e, "notification");
}
可以看到,只是发了一个事件类型的信息,header为-1。
客户端执行回调:
从上面可以看到,服务端触发了时间以后会发送一个-1为header的相应。
那么客户端就会在io线程的read部分读到这个信息,最后会到readResponse函数里处理:
if (replyHdr.getXid() == -1) {
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
// convert from a server path to a client path
if (chrootPath != null) {
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else
event.setPath(serverPath.substring(chrootPath.length()));
}
WatchedEvent we = new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
eventThread.queueEvent( we );
return;
}
把事件event反序列化出来,构建一个watchedevent对象,然后把这个event扔进eventQueue里面,通过的是queueEvent函数:
public void queueEvent(WatchedEvent event) {
if (event.getType() == EventType.None
&& sessionState == event.getState()) {
return;
}
sessionState = event.getState();
// materialize the watchers based on the event
WatcherSetEventPair pair = new WatcherSetEventPair(
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event);
// queue the pair (watch set & event) for later processing
waitingEvents.add(pair);
}
这个函数会从之前的WatchManager中恢复出之前的回调注册。然后就会等待eventThread来处理。
EventThread也是一个线程,会周期性的处理队列里的事件。
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
调动事件的process函数即可。
更多推荐
所有评论(0)