使用java操作zookeeper(五)
原文地址,转载请注明出处: https://blog.csdn.net/qq_34021712/article/details/82872186     ©王赛超 之前使用的客户端是3.4.6,后来换成了3.5.3-beta 版本,前面的知识中
原文地址,转载请注明出处: https://blog.csdn.net/qq_34021712/article/details/82872186 ©王赛超
参考官网
http://zookeeper.apache.org/doc/r3.5.3-beta/api/org/apache/zookeeper/ZooKeeper.html
java操作zookeeper
pom添加依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.3-beta</version>
</dependency>
主类:org.apache.zookeeper.ZooKeeper
这是ZooKeeper客户端库的主要类。要使用ZooKeeper服务,应用程序必须首先实例化ZooKeeper类的对象。所有操作zookeeper的操作都将通过调用ZooKeeper类的方法来完成。除非另有说明,否则此类的方法是线程安全的。
客户端与服务器建立连接后,会为客户端分配一个会话ID。客户端将定期向服务器发送心跳以保持会话有效。只要客户端的会话ID保持有效,应用程序就可以通过客户端调用ZooKeeper API。
如果由于某种原因,客户端长时间无法向服务器发送心跳(例如,超过sessionTimeout值),则服务器将使会话到期,并且会话ID将失效。客户端对象将不再可用。要进行ZooKeeper API调用,应用程序必须创建一个新的客户端对象。
如果客户端当前连接的ZooKeeper服务器出现故障或者没有响应,则客户端将在其sessionID到期之前自动尝试连接到另一台zookeeper服务器。如果成功,应用程序可以继续使用客户端。
ZooKeeper API方法有同步或异步两种。同步方法会阻塞,直到服务器响应。异步方法只是将发送请求排队并立即返回。它们采用一个回调对象,该回调对象将在成功执行请求时执行,或者在错误时执行,并返回指示错误的返回代码。
一些成功的ZooKeeper API调用可以将监视(Watcher)留在ZooKeeper服务器中的“数据节点”上。其他成功的ZooKeeper API调用可以触发这些Watcher。一旦Watcher被触发,事件将被传递给客户,假如Watcher触发之后,立刻get节点信息,客户端在得到 Watch 消息之前肯定不可能看到更新后的数据。换句话说,更新通知先于更新结果。Watch 只会被触发一次。如果客户端想得到后续更新的通知,必须要在 Watch 被触发后重新注册一个 Watch。
当客户端丢失当前连接后重新连接服务器,所有被认为是触发的监视器,但是没有送达的事件将丢失。为了模式这个场景,客户端将产生一个特殊的事件,去告诉事件处理器连接被删除。这个特殊的事件的类型是EventNone 状态是KeeperStateDiscounnected。
ZooKeeper类有如下几个构造器:
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher);
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly);
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider);
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig);
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, ZKClientConfig clientConfig);
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd);
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly);
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly, HostProvider aHostProvider);
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, ZKClientConfig clientConfig);
注意:每个构造器创建连接都是异步的,构造方法启动与服务器的连接,然后立马返回,此时会话处于CONNECTING状态,通过watcher通知。此通知可以在构造方法调用返回之前或之后的任何时候到来。会话创建成功之后,状态会改为CONNECTED。
构造器将抛出两个异常:
java.io.IOException - 在网络故障的情况下
java.lang.IllegalArgumentException - ZooKeeper无效服务列表,或者 指定了无效的chroot路径(下面介绍connectString参数时会介绍chroot)
参数介绍:
参数名 | 描述 |
---|---|
connectString | 要创建ZooKeeper客户端对象,应用程序需要传递一个连接字符串,其中包含逗号分隔的host:port列表,每个对应一个ZooKeeper服务器。例如:127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183实例化的ZooKeeper客户端对象将从connectString中选择一个任意服务器并尝试连接到它。如果建立连接失败,将尝试连接字符串中的另一个服务器(顺序是非确定性的,因为是随机),直到建立连接。客户端将继续尝试,直到会话显式关闭。在3.2.0版本之后,也可以在connectString后面添加后缀字符串,如:127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/app/a,客户端连接上ZooKeeper服务器之后,所有对ZooKeeper的操作,都会基于这个根目录。例如,客户端对/foo/bar的操作,都会指向节点/app/a/foo/bar——这个目录也叫Chroot,即客户端隔离命名空间。 |
sessionTimeout | 会话超时(以毫秒为单位)客户端和服务端连接创建成功之后,ZooKeeper中会建立一个会话,在一个会话周期内,ZooKeeper客户端和服务端之间会通过心跳检测机制来维持会话的有效性,一旦在sessionTimeout时间内没有进行有效的心跳检测,会话就会失效。 |
watcher | 创建ZooKeeper客户端对象时,ZooKeeper允许客户端在构造方法中传入一个接口Watcher(org.apache.zookeeper.Watcher)的实现类对象来作为默认的Watcher事件通知处理器。当然,该参数可以设置为null以表明不需要设置默认的Watcher处理器。如果设置为null,日志中会有空指针异常,但是并不影响使用。 |
canBeReadOnly | 3.4之后添加的boolean类型的参数,用于标识当前会话是否支持“read-only”模式。默认情况下,在ZooKeeper集群中,一个机器如果和集群中过半以上机器失去了网络连接,那么这个机器将不再处理客户端请求(包括读写请求)。但是在某些使用场景下,当ZooKeeper服务器发生此类故障的时候,我们还是希望ZooKeeper服务器能够提供读服务(当然写服务肯定无法提供)——这就是ZooKeeper的“read-only”模式。 |
sessionId 和 sessionPasswd | 会话id和 会话密码,这两个参数能够唯一确定一个会话,同时客户端使用这两个参数实现客户端会话复用,从而达到恢复会话的效果,使用方法:第一次连接上ZooKeeper服务器后,客户端使用getSessionId()和getSessionPasswd()获取这两个值,如果需要会话复用,在重新创建ZooKeeper客户端对象的时候可以传过去,如果不需要会话复用,请使用不需要这些参数的其他构造函数。 |
HostProvider | 客户端地址列表管理器 |
关于列表管理器,具体参考博客:
从Paxos到Zookeeper 分布式一致性原理与实践7.3.2章_服务器地址列表
https://blog.csdn.net/en_joker/article/details/79310801
ZKClientConfig:3.5.2版本之后添加的参数,传递此conf对象为每个客户端提供了与其他实例相比不同的配置属性,更加的灵活性。
常用javaAPI调用
package com.zookeeper.test;
import org.apache.zookeeper.*;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;
/**
* @author: WangSaiChao
* @date: 2018/9/10
* @description:
*/
public class TestZookeeper implements Watcher{
public static void main(String[] args) throws Exception {
//===============================创建会话======================================
/**
* 使用第一个构造器创建会话,刚创建完立刻打印会话状态为 CONNECTING
* 线程阻塞5秒,这5秒期间收到了服务端的Watcher通知 SyncConnected
* 之后会话状态为 CONNECTED
*/
ZooKeeper zookeeper1 = new ZooKeeper("127.0.0.1:2181,192.168.1.198:2181,172.12.96.123:2181", 5000, new TestZookeeper());
System.out.println(zookeeper1.getState());
Thread.sleep(2000);
System.out.println(zookeeper1.getState());
/**
* 使用第三个构造器,sessionId 和 sessionPasswd用的是上一个连接
*/
long sessionId = zookeeper1.getSessionId();
byte[] sessionPasswd = zookeeper1.getSessionPasswd();
ZooKeeper zookeeper2 = new ZooKeeper("127.0.0.1:2181", 5000, new TestZookeeper(),sessionId,sessionPasswd);
System.out.println(zookeeper2.getState());
Thread.sleep(2000);
System.out.println(zookeeper2.getState());
//===============================创建节点======================================
/**
* CreateMode
* PERSISTENT : 持久节点
* PERSISTENT_SEQUENTIAL : 持久顺序节点
* EPHEMERAL : 临时节点
* EPHEMERAL_SEQUENTIAL : 临时顺序节点
*
* 无论是同步还是异步接口,ZooKeeper都不支持递归创建,即无法在父节点不存在的情况下创建一个子节点。
* 另外,如果一个节点已经存在了,那么创建同名节点的时候,会抛出NodeExistException异常。如果是顺序节点,那么永远不会抛出NodeExistException异常
* 临时节点不能有子节点,创建节点时,如果给定的父节点是临时节点,则会抛出NoChildrenForEphemeralsException
* 创建节点同时,也可以在节点上设置Watcher 当删除节点或者setData时,将会触发Watcher
*/
/**
* 同步创建一个持久节点,ACL为 world:anyone:cdrwa 等同于如下命令:
* create /node 123 world:anyone:cdrwa
*/
zookeeper1.create("/node",
"123".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
/**
* 同步创建一个持久节点,ACL为 world:anyone:cdrwa 所有人只拥有创建的权限,等同于如下命令:
* create /node1 123 world:anyone:c
*/
zookeeper1.create("/node1",
"123".getBytes(),
Collections.singletonList(new ACL(ZooDefs.Perms.CREATE, ZooDefs.Ids.ANYONE_ID_UNSAFE)),
CreateMode.PERSISTENT);
/**
* 异步创建一个 临时的顺序节点,ACL为 ip:127.0.0.1:c 等同于如下命令:
* create /node2 123 ip:127.0.0.1:c
*/
zookeeper1.create("/node2",
"123".getBytes(),
Collections.singletonList(new ACL(ZooDefs.Perms.CREATE, new Id("ip", "127.0.0.1"))),
CreateMode.EPHEMERAL_SEQUENTIAL,
new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("rc:" + rc);
System.out.println("path:" + path);
System.out.println("ctx:" + ctx);
System.out.println("name:" + name);
}
}, "传给服务端的内容,会在异步回调时传回来");
/**
* 注意这里,线程睡眠20秒,因为是创建的临时节点,如果不睡眠,你不能使用命令在控制台看见创建的临时节点
*/
Thread.sleep(20000);
/**
* 异步创建一个持久节点, ACL为 digest:wangsaichao:G2RdrM8e0u0f1vNCj/TI99ebRMw=:cdrwa,等同于如下命令:
* create /node3 123 digest:wangsaichao:G2RdrM8e0u0f1vNCj/TI99ebRMw=:cdrwa
*/
zookeeper1.create("/node3",
"123".getBytes(),
Collections.singletonList(new ACL(ZooDefs.Perms.ALL, new Id("digest", "wangsaichao:G2RdrM8e0u0f1vNCj/TI99ebRMw="))),
CreateMode.PERSISTENT,
new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("rc:" + rc);
System.out.println("path:" + path);
System.out.println("ctx:" + ctx);
System.out.println("name:" + name);
}
}, "传给服务端的内容,会在异步回调时传回来");
/**
* 注意这里,线程睡眠20秒,可以接收到watcher
*/
Thread.sleep(20000);
/**
* 创建一个持久顺序定时节点,如果在10000毫秒内 未修改node,并且没有子节点,那么它将被删掉
*/
zookeeper1.create("/node4",
"123".getBytes(),
Collections.singletonList(new ACL(ZooDefs.Perms.ALL, new Id("digest", "wangsaichao:G2RdrM8e0u0f1vNCj/TI99ebRMw="))),
CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL,
new AsyncCallback.Create2Callback() {
@Override
public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
System.out.println("rc:" + rc);
System.out.println("path:" + path);
System.out.println("ctx:" + ctx);
System.out.println("name:" + name);
System.out.println("stat:" + stat);
}
}, "传给服务端的内容,会在异步回调时传回来", 10000);
Thread.sleep(20000);
//===============================获取节点数据======================================
/**
* 每一步操作/node3都要先执行该语句
* 因为上一步创建的node3 添加了 digest ACL 所以在获取该节点信息的时候,要先添加授权
*/
zookeeper1.addAuthInfo("digest","wangsaichao:123456".getBytes());
/**
* 同步调用
* path 节点路径
* Watcher 监视
* Stat 节点统计信息
* 添加授权之后同步获取节点信息,返回给定路径的节点的数据和统计信息
* 如果调用成功,并且watcher参数不为空,则会在具有给定路径的节点上保留监视,当删除节点 或者 setData时候将会触发监视
*
*/
byte[] data = zookeeper1.getData("/node3", new TestZookeeper(), new Stat());
System.out.println(new String(data));
/**
* 注意这里,线程睡眠2秒,因为是创建的临时节点,如果不睡眠,你不能使用命令在控制台看见创建的临时节点
*/
Thread.sleep(2000);
/**
* 异步调用
* path 节点路径
* watch true使用创建zookeeper时指定的默认watcher 如果为false则不设置监听
* DataCallback 异步通知
* ctx 回调上下文
*/
zookeeper1.getData("/node3", false, new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
System.out.println("rc:" + rc);
System.out.println("path:" + path);
System.out.println("ctx:" + ctx);
System.out.println("data:" + new String(data));
System.out.println("stat:" + stat);
}
},"传给服务端的内容,会在异步回调时传回来");
Thread.sleep(2000);
//===============================修改节点数据======================================
/**
* setData节点有version这一参数,给定版本与节点的版本匹配,则设置给定路径的节点的数据(如果给定版本是-1,则它匹配任何节点的版本)。返回节点的统计信息。
* 如果不存在具有给定路径的节点,则将抛出NoNodeException
* 如果给定版本与节点的版本不匹配,将抛出BadVersionException
* 设置的数据最大允许大小为1 MB
*/
/**
* 每一步操作/node3都要先执行该语句
* 因为上一步创建的node3 添加了 digest ACL 所以在获取该节点信息的时候,要先添加授权
*/
zookeeper1.addAuthInfo("digest","wangsaichao:123456".getBytes());
/**
* 同步设置数据 -1匹配任何版本
*/
Stat stat = zookeeper1.setData("/node3", "嗨喽".getBytes(), -1);
System.out.println(stat);
/**
* 异步设置数据
*/
zookeeper1.setData("/node3", "helloword".getBytes(), -1, new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
System.out.println("rc:" + rc);
System.out.println("path:" + path);
System.out.println("ctx:" + ctx);
System.out.println("stat:" + stat);
}
},"传给服务端的内容,会在异步回调时传回来");
Thread.sleep(2000);
//===============================删除节点======================================
/**
* 删除给定路径的节点。如果存在这样的节点,则调用将成功,并且给定版本与节点的版本匹配(如果给定版本为-1,则它匹配任何节点的版本)。
* 如果节点不存在,将抛出NoNodeException。
* 如果给定版本与节点的版本不匹配,将抛出BadVersionException。
* 如果节点有子节点,将抛出NotEmptyException。
* 如果成功将触发现有API调用留下的给定路径节点上的所有监视,以及getChildren API调用留下的父节点上的监视。
*/
/**
* 同步删除节点
* path 节点路径
* version 版本号 -1 代表匹配所有版本
*/
zookeeper1.delete("/node1",-1);
/**
* 异步删除节点
*/
zookeeper1.delete("/node2", -1, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
System.out.println("rc:" + rc);
System.out.println("path:" + path);
System.out.println("ctx:" + ctx);
}
},"传给服务端的内容,会在异步回调时传回来");
Thread.sleep(2000);
//===============================判断节点是否存在======================================
/**
* 返回给定路径的节点的stat。如果不存在这样的节点,则返回null。
* 如果wathher非空并且调用成功(不会抛出异常),则会在具有给定路径的节点上保留监视。wather将由创建/删除节点或在节点上设置数据的成功时触发。
*/
/**
* 同步检查节点是否存在,并留下监听
*/
Stat exists = zookeeper1.exists("/node2", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("留下监视");
}
});
System.out.println("判断节点是否存在:"+exists);
/**
* 异步检查节点是否存在,并留下监听
*/
zookeeper1.exists("/node2", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("留下监视");
}
}, new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
System.out.println("rc:" + rc);
System.out.println("path:" + path);
System.out.println("ctx:" + ctx);
System.out.println("判断节点是否存在:" + stat);
}
},"传给服务端的内容,会在异步回调时传回来");
//===============================ACL操作======================================
/**
* ACL操作只给 同步 ,异步 自己看文档
*/
/**
* 每一步操作/node3都要先执行该语句
* 因为上一步创建的node3 添加了 digest ACL 所以在获取该节点信息的时候,要先添加授权
*/
/**
* 设置ACL
*/
//先注册一个 helloworld 密码为123456的用户
zookeeper1.addAuthInfo("digest","helloworld:123456".getBytes());
//因为是 /node3 节点 所以还需要 先添加 /node3的授权
zookeeper1.addAuthInfo("digest","wangsaichao:123456".getBytes());
Stat auth = zookeeper1.setACL("/node3", Collections.singletonList(new ACL(ZooDefs.Perms.ALL, new Id("auth", "helloworld:123456"))), -1);
System.out.println(auth);
/**
* 获取ACL
*/
List<ACL> acl = zookeeper1.getACL("/node3", new Stat());
System.out.println(acl);
}
@Override
public void process(WatchedEvent event) {
System.out.println("Receive watched event:" + event);
if(event.getState() == KeeperState.SyncConnected){
System.out.println("zookeeper state is " + KeeperState.SyncConnected);
}
}
}
更多推荐
所有评论(0)