ZKClient详解
前提:zkClient是对zookeeper原生API操作的一个封装,简化客户端对zk的操作。同时zkClient内部实现了诸如Session超时重连、Watcher反复注册(watcher只能生效一次,所以如果使用原生zookeeper api操作,需要反复注册watcher),说白了就是开发人员轻松了很多。一顿操作猛如虎,直奔主题。ZKClient maven坐标&l...
前提:
zkClient是对zookeeper原生API操作的一个封装,简化客户端对zk的操作。同时zkClient内部实现了诸如Session超时重连、Watcher反复注册(watcher只能生效一次,所以如果使用原生zookeeper api操作,需要反复注册watcher),说白了就是开发人员轻松了很多。
一顿操作猛如虎,直奔主题。
- ZKClient maven坐标
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
- 初始化ZKClient对象
// 建立zk链接,同步方法,原生的zookeeper客户端连接初始化时是一个异步操作(Zookeeper zk = new Zookeeper())。
参数说明:
192.168.1.28:2181:服务器地址
50000:连接超时时间,在此范围内如果还未成功链接,则抛异常
ZkClient zkClient = new ZkClient("192.168.1.28:2181",50000);
- ZKClient源码
只列出了相关的构造函数,其他的无关代码没列出
参数说明:
zkServers:zookeeper服务信息,ip:port
connectionTimeout:连接超时时间,连接时超过设置的时间会抛异常
sessionTimeout:会话的超时时间,一个会话断了可以在这个时间内进行重连
zkSerializer:对内容的序列化和反序列化实现。可自定义,如hession等;
zkConnection:zookeeper的连接对象,默认sessionTimeout时间为30000ms
/**
* Abstracts the interaction with zookeeper and allows permanent (not just one time) watches on nodes in ZooKeeper
*/
public class ZkClient implements Watcher {
private final static Logger LOG = Logger.getLogger(ZkClient.class);
public ZkClient(String serverstring) {
this(serverstring, Integer.MAX_VALUE);
}
public ZkClient(String zkServers, int connectionTimeout) {
this(new ZkConnection(zkServers), connectionTimeout);
}
public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout) {
this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout);
}
public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer) {
this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer);
}
public ZkClient(IZkConnection connection) {
this(connection, Integer.MAX_VALUE);
}
public ZkClient(IZkConnection connection, int connectionTimeout) {
this(connection, connectionTimeout, new SerializableSerializer());
}
public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer) {
_connection = zkConnection;
_zkSerializer = zkSerializer;
connect(connectionTimeout, this);
}
}
- 创建节点
特别注意:
1. createParents可以递归创建节点(public void createPersistent(String path, boolean createParents))
2. 无需注册watcher(前面也说了,ZKClient帮我们做好了)
3. 节点内容可以传任意类型数据
4. 可以自定义内容的序列化和反序列化
5. 在没指定zkSerializer时,默认使用java自动的序列化和反序列化
package cn.lxm.attendance.controller;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkConnection;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.zookeeper.CreateMode;
import java.util.List;
/**
* @author lxm
* @version 1.1
* @date 2019/12/10 0010 16:01
*/
public class ZkClientTest {
private static final String PATH = "/lxm/test";
// 建立zk链接,同步方法。192.168.1.28:2181:服务器地址,50000:连接超时时间,在此范围内如果还未成功链接,则抛异常
static ZkClient zkClient = new ZkClient("192.168.1.28:2181",50000);
public static void main(String[] args) throws InterruptedException {
createNode();
List<String> children = zkClient.getChildren("/");
System.out.println(children);
}
// 创建node
public static void createNode(){
// 创建一个永久的node节点a
zkClient.createPersistent("/a");
// 创建一个永久性node节点b,这种创造方式和zookeeper的原生API很相识,不太一样的是隐藏了watcher注册和内容可以传object对象,zookeeper原生直接传数组
// 节点类型:CreateMode有四种取值,大家可以自行脑补
zkClient.create("/b","这里是内容", CreateMode.PERSISTENT);
// 创建一个永久性node节点c/cc,true代表可以递归创建(如果c不存在,则会先创建c再创建cc节点),默认false(原生zookeeper Api不能跨层级创建,这就是ZKClient优势)
zkClient.createPersistent("/c/cc",true);
// 创建永久性node节点d,里面内容为‘内容d’
zkClient.createPersistent("/d","内容d");
// 创建一个临时节点f
zkClient.createEphemeral("/f");
// 创建一个临时节点g,内容为 ‘内容g’
zkClient.createEphemeral("/g","内容g");
// 创建一个 递增序号 临时节点h
zkClient.createEphemeralSequential("/h","内容h");
// 创建一个 递增序号 的永久节点i
zkClient.createPersistentSequential("/i","内容i");
}
}
运行结果:
[lxm, a, b, h0000000036, c, i0000000037, zookeeper, test, d, f, g, dubbo, zk, newnode3, newnode2, mytest110000000016, mytest110000000018, mytest110000000017]
- 删除节点
注意点:
deleteRecursive可以递归删除节点下所有的子节点
// 删除node
public static void deleteNode(){
// 删除节点c,以及节点c下面所有的子节点
// 节点c下面,还有子节点cc,deleteRecursive方法可以递归的删除子节点,而zookeeper原生api不允许删除存在子节点的节点
zkClient.deleteRecursive("/c");
// 删除节点a
zkClient.delete("/a");
}
- ZKClient注册监听
好奇的你可能发现了,zkClient是怎么来监听子节点的状态变化的,比如新增子节点,删除子节点操作。你考虑到的,我们的zkClient开发人员也考虑到了,那么就是使用listen来监听节点的变化。
subscribeChildChanges:在节点上加上一个listen监听事件,用来监听子节点的变化
subscribeDataChanges:监听当前节点上数据的变化;handleDataChange数据变化, handleDataDeleted节点数据被删除
subscribeStateChanges: 监听zookeeper的状态
// 注册一个监听事件,subscribeChildChanges,通过使用listen方式来监听来达到消息广播效果,监听子节点变化
zkClient.subscribeChildChanges("/zz", new IZkChildListener() {
@Override
public void handleChildChange(String s, List<String> list) throws Exception {
System.out.println(">>>进入了handleChildChange");
System.out.println(">>>s = " + s);
System.out.println(">>>list = " + list);
}
});
// 注册一个监听事件,subscribeDataChanges 监听节点上数据的变化
zkClient.subscribeDataChanges("/zz", new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println(">>> 进入了handleDataChange");
System.out.println(">>> dataPath = " + dataPath);
System.out.println(">>> data = " + data);
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println(">>> 进入了handleDataDeleted");
System.out.println(">>> dataPath = " + dataPath);
}
});
// 监听zookeeper状态变化
zkClient.subscribeStateChanges(new IZkStateListener() {
@Override
public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
System.out.println(">>> 进入了subscribeStateChanges");
System.out.println(">>> state = " + state);
}
@Override
public void handleNewSession() throws Exception {
System.out.println(">>> 进入了handleNewSession");
}
});
- 获取子节点
返回PATH下的所有子节点,
zkClient.getChildren(PATH);
- 其他的操作
如
写数据:zkClient.writeData(PATH,"456");
检查节点是否存在:zkClient.exists(PATH);
- 完整代码
package cn.lxm.attendance.controller;
import org.I0Itec.zkclient.*;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import java.util.List;
/**
* @author lxm
* @version 1.1
* @date 2019/12/10 0010 16:01
*/
public class ZkClientTest {
private static final String PATH = "/lxm/test";
// 建立zk链接,同步方法。192.168.1.28:2181:服务器地址,50000:连接超时时间,在此范围内如果还未成功链接,则抛异常
static ZkClient zkClient = new ZkClient("192.168.1.28:2181",50000);
public static void main(String[] args) throws InterruptedException {
// 注册一个监听事件,subscribeChildChanges,通过使用listen方式来监听来达到消息广播效果,监听子节点变化
zkClient.subscribeChildChanges(PATH, new IZkChildListener() {
@Override
public void handleChildChange(String s, List<String> list) throws Exception {
System.out.println(">>>进入了handleChildChange");
System.out.println(">>>s = " + s);
System.out.println(">>>list = " + list);
}
});
// 注册一个监听事件,subscribeDataChanges 监听节点上数据的变化
zkClient.subscribeDataChanges(PATH, new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println(">>> 进入了handleDataChange");
System.out.println(">>> dataPath = " + dataPath);
System.out.println(">>> data = " + data);
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println(">>> 进入了handleDataDeleted");
System.out.println(">>> dataPath = " + dataPath);
}
});
// 监听zookeeper状态变化
zkClient.subscribeStateChanges(new IZkStateListener() {
@Override
public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
System.out.println(">>> 进入了subscribeStateChanges");
System.out.println(">>> state = " + state);
}
@Override
public void handleNewSession() throws Exception {
System.out.println(">>> 进入了handleNewSession");
}
});
// 获取子节点
List<String> children = zkClient.getChildren(PATH);
// 创建节点
createNode();
// 删除节点
deleteNode();
zkClient.createEphemeral(PATH,"123");
Thread.sleep(1000);
// 往节点上写数据
zkClient.writeData(PATH,"456");
// 检查节点是否存在
zkClient.exists(PATH);
Thread.sleep(Integer.MAX_VALUE);
}
// 创建node
public static void createNode(){
// 创建一个永久的node节点a
zkClient.createPersistent("/a");
// 创建一个永久性node节点b,这种创造方式和zookeeper的原生API很相识,不太一样的是隐藏了watcher注册和内容可以传object对象,zookeeper原生直接传数组
// 节点类型:CreateMode有四种取值,大家可以自行脑补
zkClient.create("/b","这里是内容", CreateMode.PERSISTENT);
// 创建一个永久性node节点c/cc,true代表可以递归创建(如果c不存在,则会先创建c再创建cc节点),默认false(原生zookeeper Api不能跨层级创建,这就是ZKClient优势)
zkClient.createPersistent("/c/cc",true);
// 创建永久性node节点d,里面内容为‘内容d’
zkClient.createPersistent("/d","内容d");
// 创建一个临时节点f
zkClient.createEphemeral("/f");
// 创建一个临时节点g,内容为 ‘内容g’
zkClient.createEphemeral("/g","内容g");
// 创建一个 递增序号 临时节点h
zkClient.createEphemeralSequential("/h","内容h");
// 创建一个 递增序号 的永久节点i
zkClient.createPersistentSequential("/i","内容i");
}
// 删除node
public static void deleteNode(){
// 删除节点c,以及节点c下面所有的子节点
// 节点c下面,还有子节点cc,deleteRecursive方法可以递归的删除子节点,而zookeeper原生api不允许删除存在子节点的节点
zkClient.deleteRecursive("/c");
// 删除节点a
zkClient.delete("/a");
}
}
更多推荐
所有评论(0)