zookeeper使用(二):javaAPI基本操作和循环监听器使用
本文介绍如何连接zookeeper集群、对zookeeper数据的crud、以及循环监听器的使用引入pom依赖CRUD测试测试Watch(循环监听器)zookeeper使用(一):简洁概述与shell客户端使用云服务器环境安装与配置:zookeeper集群引入pom依赖引入的zookeeper版本依赖应该与安装的zookeeper版...
·
本文介绍如何连接zookeeper集群、对zookeeper数据的crud、以及循环监听器的使用
引入pom依赖
引入的zookeeper版本依赖应该与安装的zookeeper版本一致。注,需要注释掉type标签
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
<!--<type>pom</type>-->
</dependency>
CRUD测试
针对crud命令做测试的几个方法
package com.mym.test;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
public class ZookeeperTest {
ZooKeeper zkClient = null;
String createNodeName = "/apiTest10000000007"; //记录创建的节点名称
/**
* 建立连接
* @throws IOException
*/
@Before
public void connect() throws IOException {
if(zkClient == null){
zkClient = new ZooKeeper("192.168.31.201:2181,192.168.31.202:2181,192.168.31.203:2181", 1000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("success to connect zk cluster!");
}
});
}
}
/**
* 测试创建节点
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testCreate() throws KeeperException, InterruptedException {
createNodeName = zkClient.create("/apiTest1", "this is api test1 data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println("创建节点的返回值是:"+createNodeName);
}
/**
* ls命令
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testLs() throws KeeperException, InterruptedException {
List<String> children = zkClient.getChildren("/", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("获得监听事件,path:" + watchedEvent.getPath() + ";state" + watchedEvent.getState() + ";type:" + watchedEvent.getType());
}
});
for (int i = 0; i < children.size(); i++) {
System.out.println("ls / 数据:"+children.get(i));
}
}
/**
* set命令
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testSet() throws KeeperException, InterruptedException {
Stat stat = zkClient.setData(createNodeName, ("this is update data! "+System.currentTimeMillis()).getBytes(), -1);//-1表示让系统维护version
System.out.println("set 返回数据::"+stat);
}
/**
* del命令
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testDel() throws KeeperException, InterruptedException {
zkClient.delete(createNodeName,-1);
System.out.println("success to del "+createNodeName+" Znode!");
}
/**
* get
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testGet() throws KeeperException, InterruptedException {
byte[] data = zkClient.getData(createNodeName, false, null);
System.out.println("节点"+createNodeName+"的数据是:"+new String(data));
}
/**
* 测试watch
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testWatch() throws KeeperException, InterruptedException {
//先重置zkClient的watch对象(也可以在实例化zkClient时指定)
zkClient.register(watcher);
//进行监听
byte[] data = zkClient.getData(createNodeName, true, null);
System.out.println("获得节点"+createNodeName+"的数据是:"+new String(data));
//第一次进行修改,触发watch
this.testSet();
//第二次进行修改,触发watch
this.testSet();
}
/**
* 关闭连接
* @throws InterruptedException
*/
@After
public void close() throws InterruptedException {
zkClient.close();
}
/**定义watch对象*/
private Watcher watcher = new Watcher() {
public int watchCount = 0; //记录监听次数
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("获得监听事件,path:" + watchedEvent.getPath() + ";state:" + watchedEvent.getState() + ";type:" + watchedEvent.getType());
//循环重复监听
try {
zkClient.exists(watchedEvent.getPath(), true);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
watchCount++;
System.out.println("第 "+watchCount+" 次监听到!");
}
};
}
测试结果如下
- 连接上集群时
success to connect zk cluster!
- ls
类命令输入:ls /
对应方法testLs()
ls / 数据:watchTest
ls / 数据:shunxu20000000001
ls / 数据:shunxu10000000000
ls / 数据:zookeeper
ls / 数据:a
- create
类命令输入:create /apiTest1 'this is api test1 data'
对应方法testCreate
创建节点的返回值是:/apiTest10000000007
- get
类命令输入:get /名称
对应方法testGet()
节点/apiTest10000000007的数据是:this is api test1 data
- set
类命令输入:set /名称 数据
对应方法testSet()
set 返回数据::4294967336,4294967345,1526852099431,1526852869041,1,0,0,0,20,0,4294967336
注,这些返回的数据就是节点的所有属性,可以进去看下Stat
类的定义:
public class Stat implements Record {
private long czxid;
private long mzxid;
private long ctime;
private long mtime;
private int version;
private int cversion;
private int aversion;
private long ephemeralOwner;
private int dataLength;
private int numChildren;
private long pzxid;
....
- delete
类命令输入:delete /名称
对应方法testDel()
success to del /apiTest10000000007 Znode!
测试Watch(循环监听器)
逻辑就是对某个路径在本次监听完后主动用简单(exist()方法)的调用再次注册监听
具体方法:
(注,这只是一个测试类,故直接把Watch匿名类对象直接实例化,不太友好,实际使用时应通过实现Watch接口,创造一个Watch实现来,从而获得watcher对象来使用)
/**
* 测试watch
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testWatch() throws KeeperException, InterruptedException {
//先重置zkClient的watch对象(也可以在实例化zkClient时指定)
zkClient.register(watcher);
//进行监听
byte[] data = zkClient.getData(createNodeName, true, null);
System.out.println("获得节点"+createNodeName+"的数据是:"+new String(data));
//第一次进行修改,触发watch
this.testSet();
//第二次进行修改,触发watch
this.testSet();
}
/**定义watch对象*/
private Watcher watcher = new Watcher() {
public int watchCount = 0; //记录监听次数
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("获得监听事件,path:" + watchedEvent.getPath() + ";state:" + watchedEvent.getState() + ";type:" + watchedEvent.getType());
//循环重复监听
try {
zkClient.exists(watchedEvent.getPath(), true);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
watchCount++;
System.out.println("第 "+watchCount+" 次监听到!");
}
};
测试结果:
获得监听事件,path:null;state:SyncConnected;type:None
获得节点/apiTest10000000007的数据是:this is update data! 1532258475593
获得监听事件,path:/apiTest10000000007;state:SyncConnected;type:NodeDataChanged
set 返回数据::4294967352,4294967398,1526853419126,1526855521206,21,0,0,0,34,0,4294967352
第 1 次监听到!
获得监听事件,path:/apiTest10000000007;state:SyncConnected;type:NodeDataChanged
set 返回数据::4294967352,4294967399,1526853419126,1526855521216,22,0,0,0,34,0,4294967352
第 2 次监听到!
注:第一个 获得监听事件 是客户端连接服务器的自动监听。
更多推荐
已为社区贡献1条内容
所有评论(0)