基于Curator操作ZooKeeper(二)-Watcher操作-补充TreeCache
转自:https://blog.csdn.net/Leafage_M/article/details/78735485#treecacheJava原生API操作ZooKeeper可参看:Java原生API操作Zookeeper(一)Java原生API操作Zookeeper(二)相关内容:基于Curator操作ZooKeeper(一)-基本操作基于Curator操作ZooKe...
转自:https://blog.csdn.net/Leafage_M/article/details/78735485#treecache
Java原生API操作ZooKeeper可参看:
相关内容:
基于Curator操作ZooKeeper(二)-Watcher操作
基于Curator操作ZooKeeper(三)-Curator整合Spring
TreeCache
TreeCache有点像上面两种Cache的结合体,NodeCache能够监听自身节点的数据变化(或者是创建该节点),PathChildrenCache能够监听自身节点下的子节点的变化,而TreeCache既能够监听自身节点的变化、也能够监听子节点的变化。
TreeCache的话只有一种构造方法了:
TreeCache(CuratorFramework client, String path)//Create a TreeCache for the given client and path with default options.
与上面的PathChildrenCache不同的是,如果指定的节点路径不存在的话,不会自动创建。但是也能够监听到一个INITIALIZED类型的事件。
TreeCache可以添加一个节点变化的监听器,同样的也可以添加一个异常的监听器。
假如ZooKeeper服务器中还是有如下的节点内容:
使用如下代码进行测试,就是当节点的内容发生变化时,输出节点的变化类型和变化的节点路径。
public class TreeCacheTest {
private static final String zkAddress = "centos3";
private static final int sessionTimeout = 2000;
private static String parentPath = "/Curator-Recipes1";//父节点
private static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zkAddress)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(new ExponentialBackoffRetry(1000,3))
.build();
public static void main(String[] args) throws InterruptedException {
client.start();
final TreeCache treeCache = new TreeCache(client, parentPath);
try {
treeCache.start();
} catch (Exception e) {
e.printStackTrace();
}
//添加错误监听器
treeCache.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
public void unhandledError(String s, Throwable throwable) {
System.out.println(".错误原因:" + throwable.getMessage() + "\n==============\n");
}
});
//节点变化的监听器
treeCache.getListenable().addListener(new TreeCacheListener() {
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println("treeCache ------ Type:" + treeCacheEvent.getType() + ",");
System.out.println(treeCacheEvent.getData().getPath());
}
});
Thread.sleep(Integer.MAX_VALUE);
}
}
运行时输出的内容如下:
可以看到缓存父节点和子节点的时候触发了NODE_ADDED事件,同时也会触发一个INITIALIZED事件,并且会在异常监听器中发生回调事件,异常的原因就是我们回调函数因为INITIALIZED事件被触发了,但是此时执行第条输出语句的时候发现treeCacheEvent.getData()是null,而我们在null上调用了getPath方法,所以会触发异常监听。
注意:
- 无论如何都会收到一个INITIALIZED事件的。
- 无论是TreeCache、PathChildrenCache,所谓的监听都是本地视图和ZooKeeper服务器进行对比。所以如果ZooKeeper节点不为空的话,才会在缓存开始的时候监听到NODE_ADDED事件,这是因为刚开始本地缓存并没有内容,然后本地缓存和服务器缓存进行对比,发现ZooKeeper服务器有节点而本地缓存没有,这才将服务器的节点缓存到本地,所以才会触发NODE_ADDED事件。
示例
来一个综合性的示例,该示例中先使用init进行初始化操作,会创建一个父节点“/Curator-Recipes”以及父节点下的一个子节点“c1”。然后分别使用TreeCache、PathChildrenCache、NodeCache三种缓存方式进行缓存。最后再创建c2、c3两个节点并进行修改和删除观察监听器的输出内容。
package com.leafage.zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
/**
* @Author Leafage
* @Date 2017/12/6 14:03
**/
public class CuratorRecipes {
private static int num = 1;
private static final String zkAddress = "centos3";
private static final int sessionTimeout = 2000;
private static String parentPath = "/Curator-Recipes";//父节点
private static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zkAddress)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(new ExponentialBackoffRetry(1000,3))
.build();
public static void main(String[] args) throws InterruptedException {
init();
treeCache();
pathChildrenCache();
nodeCache();
testData();
Thread.sleep(Integer.MAX_VALUE);
}
/**
* 初始化操作,创建父节点
*/
public static void init() {
client.start();
try {
//可以用用来确保父节点存在,2.9之后弃用
// EnsurePath ensurePath = new EnsurePath(parentPath);
// ensurePath.ensure(client.getZookeeperClient());
if (client.checkExists().forPath(parentPath) == null) {
client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(parentPath, "This is Parent Data!".getBytes());
}
client.create().withMode(CreateMode.EPHEMERAL).forPath(parentPath + "/c1","This is C1.".getBytes());//创建第一个节点
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 监听子节点变化
*/
public static void pathChildrenCache() {
final PathChildrenCache pathChildrenCache = new PathChildrenCache(client, parentPath, true);
try {
pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);//启动模式
} catch (Exception e) {
e.printStackTrace();
}
//添加监听
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println( num++ + ".pathChildrenCache------发生的节点变化类型为:" + pathChildrenCacheEvent.getType() + ",发生变化的节点内容为:" + new String(pathChildrenCacheEvent.getData().getData()) + "\n======================\n");
}
});
}
/**
* 监听节点数据变化
*/
public static void nodeCache() {
final NodeCache nodeCache = new NodeCache(client, parentPath, false);
try {
nodeCache.start(true);//true代表缓存当前节点
} catch (Exception e) {
e.printStackTrace();
}
if (nodeCache.getCurrentData() != null) {//只有start中的设置为true才能够直接得到
System.out.println( num++ + ".nodeCache-------CurrentNode Data is:" + new String(nodeCache.getCurrentData().getData()) + "\n===========================\n");//输出当前节点的内容
}
//添加节点数据监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
System.out.println( num++ + ".nodeCache------节点数据发生了改变,发生的路径为:" + nodeCache.getCurrentData().getPath() + ",节点数据发生了改变 ,新的数据为:" + new String(nodeCache.getCurrentData().getData()) + "\n===========================\n");
}
});
}
/**
* 同时监听数据变化和子节点变化
*/
public static void treeCache() {
final TreeCache treeCache = new TreeCache(client, parentPath);
try {
treeCache.start();
} catch (Exception e) {
e.printStackTrace();
}
//添加错误
treeCache.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
public void unhandledError(String s, Throwable throwable) {
System.out.println(num++ + ".错误原因:" + throwable.getMessage() + "\n==============\n");
}
});
treeCache.getListenable().addListener(new TreeCacheListener() {
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println( num++ + ".treeCache------当前发生的变化类型为:" + treeCacheEvent.getType() + ",发生变化的节点内容为:" + new String(treeCacheEvent.getData().getData()) + "\n=====================\n");
}
});
}
/**
* 创建节点、修改数据、删除节点等操作,用来给其他的监听器测试使用
*/
public static void testData() {
try {
client.create().withMode(CreateMode.EPHEMERAL).forPath(parentPath + "/c2","This is C2.".getBytes());//创建第一个节点
client.create().withMode(CreateMode.EPHEMERAL).forPath(parentPath + "/c3","This is C3.".getBytes());//创建第一个节点
client.setData().forPath(parentPath + "/c2", "This is New C2.".getBytes());//修改节点数据
client.delete().forPath(parentPath + "/c3");//删除一个节点
client.delete().deletingChildrenIfNeeded().forPath(parentPath);//将父节点下所有内容删除
} catch (Exception e) {
e.printStackTrace();
}
}
}
输出内容(每次运行结果输出顺序可能不一致):
1.treeCache------当前发生的变化类型为:NODE_ADDED,发生变化的节点内容为:This is Parent Data!
=====================
2.treeCache------当前发生的变化类型为:NODE_ADDED,发生变化的节点内容为:This is C1.
=====================
4.错误原因:null
==============
5.nodeCache-------CurrentNode Data is:This is Parent Data!
===========================
6.pathChildrenCache------发生的节点变化类型为:CHILD_ADDED,发生变化的节点内容为:This is C1.
======================
8.nodeCache------节点数据发生了改变,发生的路径为:/Curator-Recipes,节点数据发生了改变 ,新的数据为:This is Parent Data!
===========================
9.treeCache------当前发生的变化类型为:NODE_ADDED,发生变化的节点内容为:This is C2.
=====================
10.treeCache------当前发生的变化类型为:NODE_ADDED,发生变化的节点内容为:This is C3.
=====================
11.pathChildrenCache------发生的节点变化类型为:CHILD_ADDED,发生变化的节点内容为:This is C3.
======================
12.pathChildrenCache------发生的节点变化类型为:CHILD_ADDED,发生变化的节点内容为:This is C2.
======================
13.treeCache------当前发生的变化类型为:NODE_UPDATED,发生变化的节点内容为:This is New C2.
=====================
14.pathChildrenCache------发生的节点变化类型为:CHILD_UPDATED,发生变化的节点内容为:This is New C2.
======================
15.pathChildrenCache------发生的节点变化类型为:CHILD_REMOVED,发生变化的节点内容为:This is C3.
======================
16.treeCache------当前发生的变化类型为:NODE_REMOVED,发生变化的节点内容为:This is C3.
=====================
17.treeCache------当前发生的变化类型为:NODE_REMOVED,发生变化的节点内容为:This is C1.
=====================
18.pathChildrenCache------发生的节点变化类型为:CHILD_REMOVED,发生变化的节点内容为:This is C1.
======================
19.pathChildrenCache------发生的节点变化类型为:CHILD_REMOVED,发生变化的节点内容为:This is New C2.
======================
20.treeCache------当前发生的变化类型为:NODE_REMOVED,发生变化的节点内容为:This is New C2.
=====================
21.treeCache------当前发生的变化类型为:NODE_REMOVED,发生变化的节点内容为:This is Parent Data!
=====================
从输出内容可以看到:
1、2 :代表是TreeCache监听到了父节点和c1节点的创建缓存事件。
3、4 :同时会发现并没有3这条语句,而是直接跳到了4,这是因为接收到的事件为:INITIALIZED,所以使用getData会得到null,而我们试图在null上调用getPath,所以才会触发异常。
5:NodeCache缓存进行start的时候传入true参数,所以能够直接得到当前节点的内容。
6: PathChildrenCache缓存成功c1的时候接收到的事件。
7:会发现没有7,因为PathChildrenCache的启动模式是:INITIALIZED,此时也是试图在null上调用GetPath,但是PathChildrenCache没有提供异常监听器,所以没办法获取。
8:第八点最让人疑惑了,因为上面的代码中并没有对父节点的数据进行改变,但是却监听到了这个事件,做了很多的测试发现,触发这个事件的原因为后面的testData方法中调用create导致的,并且只会监听到一次,这一点的具体原因还不太清楚。
9、10、11、12:创建c2、c3节点是TreeCache和PathChildrenCache监听到的事件。
13、14:修改c2节点数据,TreeCache和PathChildrenCache监听到的事件。
15、16、17、18、19、20:删除c2、c1、c3节点时,TreeCache和PathChildrenCache监听到的事件。
21:删除根节点时接收到的监听事件,此时只有TreeCache能够监听到。
更多推荐
所有评论(0)