zookeeper之原生API和zkClient的API
文章目录1 ZK的原生API1.1 java操作原生API1.1.1 引入依赖1.1.2 创建会话1.1.3 节点的CRUD1 ZK的原生API1.1 java操作原生API1.1.1 引入依赖使用java操作zookeeper,引入maven坐标<dependency><groupId>org.apache.zookeeper</groupId>...
文章目录
1 ZK的原生API
1.1 java操作原生API
1.1.1 引入依赖
使用java
操作zookeeper
,引入maven
坐标
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.5</version>
</dependency>
1.1.2 创建会话
创建会话方法:客户端通过创建一个zookeeper
实例来链接zookeeper
服务器
Zookeeper
提供了4
个构造方法,根据所提供的参数不同
参数说明如下:
connectString:
连接服务器列表,如果有多个则用,
分隔sessionTimeout:
心跳检测时间周期(毫秒
)wather:
事件处理通知器canBeReadOnly:
标识当前会话是否支持只读sessionId和sessionPassword:
提供连接zookeeper
的session
和密码,通过这两个确定唯一一台客户端,目的可以提供重复会话
注意
:zookeeper
客户端和服务器端会话的建立是一个异步过程
,也就是说在程序中,我们程序方法在处理完客户端初始化后立即返回(也就是说程序往下执行代码,这样,大多数情况下我们并没有真正构建一个可用会话,在会话的生命周期处于CONNECTING
时才算真正建立完毕,所以我们需要使用多线程中学习的一个小工具类)
public class ZooKeeperDemo{
//zk 连接地址
private static final String CONNECT_ADDR="192.168.126.130:2181";
//session 超时时间
private static final Integer SESSION_OUTTIME=5000;
//阻塞程序执行,用于等待zookeeper连接成功,发送成功信号
private static CountDownLatch countDown = new CountDownLatch(1);
public static void main(String[] args) throws Exception{
//zk 连接
ZooKeeper zk = new ZooKeeper(CONNECT_ADDR,SESSION_OUTTIME,new Watcher(){
@Override
public void process(WatchedEvent event){
Event.KeeperState state = event.getState();
Event.EventType type = event.getType();
//如果建立了安全连接
if(Event.KeeperState.SyncConnected==state){
if(Event.EventType.None==type){
//如果连接建立成功,则发送信号
countDown.countDown();
System.out.println("zk 建立连接");
}
}
}
});
//进行阻塞
countDown.await();
System.out.println("zk 执行了");
//关闭会话
zk.close();
}
}
1.1.3 节点的CRUD
1.1.3.1 创建节点
创建节点方法:create
有两种创建节点的方法:同步
和异步
创建节点方式
同步方式:
- 参数1,节点路径(名称):
/nodeName
(不允许递归创建节点
,也就是说在父节点不存在的情况下,不允许创建子节点) - 参数2,节点内容: 要求类型是
字节数组
- 参数3,节点权限:使用
Ids.OPEN_ACL_UNSAFE
开发权限即可。(这个参数一般在权限没有太高要求的场景下,没必要关注) - 参数4,节点类型:创建节点的类型:
CreateMode.*
,提供以下几种节点类型:
PERSISENT
:持久节点
PERSISENT_SEQUENTIAL
:持久顺序节点
EPHEMERAL
:临时节点
EPHEMERAL_SEQUENTIAL
:临时顺序节点
CONTAINER
:容器节点,用于Leader
、Lock
等特殊用途,当容器节点不存在任何子节点时,容器将成为服务器在将来某个时候删除的候选节点
PERSISTENT_WITH_TTL
:带TTL
(time-to-live
,存活时间)的永久节点,节点在TTL
时间之内没有得到更新并且没有孩子节点,就会被自动删除
PERSISTENT_SEQUENTIAL_WITH_TTL
:带TTL
(time-to-live
,存活时间)和单调递增序号的永久节点,节点在TTL
时间之内没有得到更新并且没有孩子节点,就会被自动删除
//创建父节点,此处的zk是上一步的会话产生的
String ret = zk.create("/testRoot","testRoot".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISENT);
System.out.println(ret);
注意
:临时节点效率很高,主要用来作为分布式锁
异步方式:(在同步方式参数基础上增加俩参数)
- 参数5,注册一个异步回调函数,要实现
AsynCallBack.VoidCallBack
接口,重写processResult(int rc,String path,Obejct ctx,String name)
方法,当节点创建完毕后执行此方法
rc
:为服务端响应码,0
表示调用成功,-4表示端口连接,-110
表示指定节点存在,-112
表示会话已经过期
path
:接口调用时传入API
的数据节点的路径参数
ctx
:为调用接口传入API
的ctx
的值
name
:实际在服务器端创建节点的名称 - 参数6,传递给回调函数的参数,一般为上下文(
context
)信息
//此处的 -1是跳过版本限制 dataVersion ,在删除时如果传入的版本号不是-1 与现有版本号一致,那么可以删除
zk.delete("/testRoot",-1,new AsynCallBack.VoidCallBack(){
@Override
public void processResult(int rc,String path,Object ctx){
System.out.println(rc);
System.out.println(path);
System.out.println(ctx);
}
},"a");
1.1.3.2 修改节点
//此处的-1 也是跳过版本号的检查
zk.setData("/testRoot","modify data root".getBytes(),-1);
1.1.3.3 判断是否存在
判断节点是否存在
// 参数一:路径,参数二:是否watch监听
zk.exists("/testRoot",false);
1.1.3.4 获取节点值
byte[] data = zk.getData("/testRoot", false, null);
System.out.println(new String(data));
System.out.println(zk.getChildren("/testRoot", false));
1.1.3.5 删除节点
//删除节点
zk.delete("/testRoot/children", -1);
System.out.println(zk.exists("/testRoot/children", false));
2 zkClient的API
2.1 引入pom依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
2.2 操作客户端
2.2.1 创建客户端
/** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
/** session超时时间 */
static final int SESSION_OUTTIME = 5000;//ms
public static void main(String[] args) throws Exception {
ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), 5000);
}
2.2.2 增加和删除节点
可以递归
增加和删除节点
创建节点时,那个true
是否创建父节点
//1. create and delete方法
zkc.createEphemeral("/temp");
//第二个参数createParents==true,表示父节点不存在时创建父节点(递归创建)
zkc.createPersistent("/super/c1", true);
Thread.sleep(10000);
zkc.delete("/temp");
zkc.deleteRecursive("/super");
2.2.3 读取节点
//2. 设置path和data 并且读取子节点和每个节点的内容
zkc.createPersistent("/super", "1234");
zkc.createPersistent("/super/c1", "c1内容");
zkc.createPersistent("/super/c2", "c2内容");
List<String> list = zkc.getChildren("/super");
for(String p : list){
System.out.println(p);
String rp = "/super/" + p;
//此处的readData是原生API
String data = zkc.readData(rp);
System.out.println("节点为:" + rp + ",内容为: " + data);
}
2.2.4 判断是否存在
//3. 更新和判断节点是否存在
zkc.writeData("/super/c1", "新内容");
System.out.println(zkc.readData("/super/c1"));
System.out.println(zkc.exists("/super/c1"));
2.3 zkClinet不需要watcher
zkClient
不用watcher
和watcher
参数,也就是说我们开发人员无需关心反复注册Watcher
的问题,zkclient
给我们提供了一套监听方式,可以使用监听节点的方式进行操作,剔除了繁琐的反复watcher
操作,简化了代码的复杂程度
2.3.1 subscribeChildChanges方法
只监听父子节点的新增和删除,但是节点变化不监听
- 参数一:
path
路径 - 参数二:实现了
IZkChildListener
接口的类
public static void main(String[] args) throws Exception {
ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), 5000);
//对父节点添加监听子节点变化。
zkc.subscribeChildChanges("/super", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println("parentPath: " + parentPath);
System.out.println("currentChilds: " + currentChilds);
}
});
Thread.sleep(3000);
zkc.createPersistent("/super");
Thread.sleep(1000);
zkc.createPersistent("/super" + "/" + "c1", "c1内容");
Thread.sleep(1000);
zkc.createPersistent("/super" + "/" + "c2", "c2内容");
Thread.sleep(1000);
zkc.delete("/super/c2");
Thread.sleep(1000);
zkc.deleteRecursive("/super");
Thread.sleep(Integer.MAX_VALUE);
}
2.3.2 subscribeDataChanges方法
主要监听节点的删除和改变
public static void main(String[] args) throws Exception {
ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), 5000);
zkc.createPersistent("/super", "1234");
//对父节点添加监听子节点变化。
zkc.subscribeDataChanges("/super", new IZkDataListener() {
@Override
public void handleDataDeleted(String path) throws Exception {
System.out.println("删除的节点为:" + path);
}
@Override
public void handleDataChange(String path, Object data) throws Exception {
System.out.println("变更的节点为:" + path + ", 变更内容为:" + data);
}
});
Thread.sleep(3000);
zkc.writeData("/super", "456", -1);
Thread.sleep(1000);
zkc.delete("/super");
Thread.sleep(Integer.MAX_VALUE);
}
更多推荐
所有评论(0)