用java来实现对zookeeper节点添加、更新、删除的监听(非常的浅显易懂,代码可以跑通)
一、首先说明的一个非常重要的点:使用zookeeper第一件事就是要连接zookeeper,那么连接就是耗时间的。先做一个例子:比如你写了一段代码,这段代码主要干了是三件事:第一件事连接zookeeper、第二件事操作zookeeper(比如更新删除节点了)、第三件事你的业务代码并且这些业务代码和zookeeper一点关系都没有。那么问题来了 第一件事连接zookeeper这是非常消耗时间的,..
一、首先说明的一个非常重要的点:
使用zookeeper第一件事就是要连接zookeeper,那么连接就是耗时间的。先做一个例子:比如你写了一段代码,这段代码主要干了是三件事:第一件事连接zookeeper、第二件事操作zookeeper(比如更新删除节点了)、第三件事你的业务代码并且这些业务代码和zookeeper一点关系都没有。
那么问题来了 第一件事连接zookeeper这是非常消耗时间的,导致你的业务代码不能执行,这样用户体验很不好。
所以zookeeper 的开发人员就把zookeeper的连接放在了一个单独的线程,就是在你的主线程上又开启了一个线程用于zoookeeper的连接。
那么问题又来了,此时第一件事和第二件事是并列执行的,但是你发现第二件事是依赖第一件事成功才能执行的,也是就说只有zookeeper成功连接后才能操作zookeeper上的节点。那么怎么解决请看第二点。
二、CountDownLatch
这个类很是神奇,以前没听过,也没研究过,主要说一下这个类是干嘛的和怎么用?
这个类主要两个方法:1.countDownLatch.await() 让线程阻塞 2.countDownLatch.countDown() 唤醒阻塞的线程
具体怎么用看三代码实现
三、代码实现
public class Test002 implements Watcher {
// zk 连接地址
private static final String CONNECTSTRING = "******:2181";// 换成你实际的zookeeper地址
// 连接zk 的超时时间
private static final int SESSION_OUTTIME = 2000;
private static final CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zk = null;
// zk 创建连接
public void createContection(String connectString, int sessionTimeout) throws IOException, InterruptedException {
zk = new ZooKeeper(connectString, sessionTimeout, this);
System.out.println("#####zk开始启动连接#####");
}
// 创建持久化节点
public boolean createNode(String path, String data) throws InterruptedException, KeeperException {
countDownLatch.await();// 让当前方法处于等待状态
exists(path, true);
// 创建节点 参数 1 节点 2 节点数据 3 权限 4 节点类型
zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("#####新增节点信息path+" + path + "data" + data);
return true;
}
// 时间通知
public void process(WatchedEvent event) {
System.out.println("#####事件通知开始######");
// 1. 获取事件状态
KeeperState keeperState = event.getState();
// 获取节点的路径
String path = event.getPath();
// 获取节点类型
EventType eventType = event.getType();
System.out.println("节点的详细信息:节点的状态" + keeperState + "节点路径" + path + "节点类型" + eventType);
// 2. 判断连接状态
if (KeeperState.SyncConnected == keeperState) {
// 3. 判断获取事件类型
if (eventType == EventType.None) {// 连接状态
countDownLatch.countDown();// 连接成功 此时可以释放countDownLatch.await()
System.out.println("#####zk连接成功#####");
} else if (EventType.NodeCreated == eventType) {
System.out.println("获取新增node节点的事件通知" + "path:" + path);
} else if (EventType.NodeDataChanged == eventType) {
System.out.println("获取了节点修改的时间通知:修改的节点Node:" + path);
}
}
System.out.println("#####事件通知结束######");
}
// 设置节点是可以监听的状态
public Stat exists(String path, boolean isWatch) throws KeeperException, InterruptedException {
return zk.exists(path, isWatch);
}
// 更新节点数据
public void update(String path, String data) throws KeeperException, InterruptedException {
exists(path, true);
zk.setData(path, data.getBytes(), 0);
}
// 关闭节点
public void close() throws InterruptedException {
if (zk != null) {
zk.close();
}
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
Test002 test002 = new Test002();
test002.createContection(CONNECTSTRING, SESSION_OUTTIME);
test002.update("/zzy_test001", "007");
test002.close();
}
}
更多推荐
所有评论(0)