zookeeper作为一个分布式服务框架,主要用来解决分布式数据一致性问题,对多种语言提供了API。这里主要记录下JAVA客户端API的使用。


1.创建会话

客户端可以通过创建一个ZooKeeper实例来连接zookeeper服务器

ZooKeeper的4个构造函数如下:

ZooKeeper(connectString, sessionTimeout, watcher);
ZooKeeper(connectString, sessionTimeout, watcher,canBeReadOnly);
ZooKeeper(connectString, sessionTimeout, watcher, sessionId, sessionPasswd);
ZooKeeper(connectString, sessionTimeout, watcher, sessionId, sessionPasswd, canBeReadOnly);

 

参数说明:

connectString: 连接字符串 例如 "127.0.0.1:2181"

sessionTimeout: 会话超时时间 以毫秒为单位的整型值 在sessionTimeout时间内服务端与客户端没有有效的心跳检测 则会话失效

watcher: 默认的事件通知处理器

sessionId: 会话ID

sessionPasswd: 会话秘钥

canBeReadOnly: 是否是只读  

      

public class ZookeeperSampleTest implements Watcher {

	private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
	
	public static void main(String args[]) throws IOException, InterruptedException{
		ZooKeeper zk = new ZooKeeper("192.168.1.138:2181",5000,new ZookeeperSampleTest());
		System.out.println(zk.getState());
		try {
			connectedSemaphore.await();
		} catch (Exception e) {
			// TODO: handle exception
		}
		Long sessionId = zk.getSessionId();
		byte[] password = zk.getSessionPasswd();
		zk = new ZooKeeper("192.168.1.138:2181",5000,new ZookeeperSampleTest(),1L,"test".getBytes());
		zk = new ZooKeeper("192.168.1.138:2181",5000,new ZookeeperSampleTest(),sessionId,password);
		Thread.sleep(Integer.MAX_VALUE);
		//System.out.println("ZooKeeper session established");
	}

	@Override
	public void process(WatchedEvent event) {
		// TODO Auto-generated method stub
		System.out.println("Receive watched event :" + event);
		if(event.getState()==KeeperState.SyncConnected){
			connectedSemaphore.countDown();
		}
	}

}

2.创建节点

创建节点的方式分为同步和异步,构造函数如下:

                zk.create(path, data, acl, createMode);
		zk.create(path, data, acl, createMode, cb, ctx);
参数说明:

path: 创建节点的路径 如:/zk-test

data[]: 字节数组,节点数据

acl: 权限  

createMode: 节点类型  分为4种: 持久性节点,持久性有序节点,临时节点,临时有序节点

cb: 回调函数 需要实现StringCallback接口

ctx: 上下文 一般用于回调函数时候使用

public class ZookeeperSampleCreateTest implements Watcher {

	public static CountDownLatch connectedSemaphore = new CountDownLatch(1);
	
	public static void main(String[] args) throws IOException, InterruptedException, KeeperException{
		ZooKeeper zk = new ZooKeeper("192.168.1.138:2181",5000,new ZookeeperSampleCreateTest());
		connectedSemaphore.await();
		/*同步的方式*/
		/*String path1 = zk.create("/zk-test-ephemeral-", "test-ephemeral".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
		System.out.println("Success create node :" + path1);
		String path2 = zk.create("/zk-test-ephemeral-", "test-ephemeral-sequential".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
		System.out.println("Success create node :" + path2);*/
		
		/*异步的方式*/
		zk.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,new IStringCallback(),"i am context");
		zk.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,new IStringCallback(),"i am context");
		zk.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,new IStringCallback(),"i am context");
		Thread.sleep(Integer.MAX_VALUE);
	}
	
	@Override
	public void process(WatchedEvent event) {
		// TODO Auto-generated method stub
		if(KeeperState.SyncConnected == event.getState()){
			connectedSemaphore.countDown();
		}
	}
}

class IStringCallback implements AsyncCallback.StringCallback{

	@Override
	public void processResult(int rc, String path, Object ctx, String name) {
		// TODO Auto-generated method stub
		System.out.println("Create path result: [" + rc + "," + path + "," + ctx + ", real path name " + name);
	}
	
}

3.删除节点

zk.delete(path,version);
zk.delete(path,version,cb,ctx);
参数说明:

path:节点路径

version:版本号

cb: 回调函数

ctx: 上下文


4.获取子节点

getChildren有8个重载方法

                zk.getChildren(path, watch);
		zk.getChildren(path, watcher);
		zk.getChildren(path, watch, stat);
		zk.getChildren(path, watcher, stat);
		zk.getChildren(path, watch, cb, ctx);
		zk.getChildren(path, watcher, cb, ctx);
其中 回调函数有两种  ChildrenCallback  Children2Callback

参数说明:

path:节点路径

watch: boolean类型 如果为true 表示使用默认的Watcher 为false表示不需要Watcher

watcher: 通知处理器 在本次获取子节点以后  一旦子节点有变化机会收到服务端传来的通知

stat: 指定数据节点的节点状态信息,传入一个旧的stat对象,当执行方法后 stat会被服务器响应的新stat替换

cb:回调函数 有两种类型 上面已经说过

ctx: 上下文

 5.获取节点数据

获取节点数据有4个重载方法

                zk.getData(path, watch, stat);
		zk.getData(path, watcher, stat);
		zk.getData(path, watch, cb, ctx);
		zk.getData(path, watcher, cb, ctx);
参数说明:

path: 节点路径

watch: boolean类型 如果为true 表示使用默认的Watcher 为false表示不需要Watcher

stat: 指定数据节点的节点状态信息,传入一个旧的stat对象,当执行方法后 stat会被服务器响应的新stat替换

cb:回调函数 有两种类型 上面已经说过

ctx: 上下文

在获取节点数据时候 如果注册watcher 在节点数据发送变化的时候会通知客户端,当客户端收到通知以后,如果想下次数据发送变化再次收到通知,

需要重新注册watcher,获取子节点机制也如此

6.更新节点数据

更新节点数据也分为同步异步两个方法

zk.setData(path, data, version);
zk.setData(path, data, version, cb, ctx);
参数说明:同上

public class ZookeeperSampleGetDataTest implements Watcher {

	public static CountDownLatch connectedSemaphore = new CountDownLatch(1);
	
	public static Stat stat = new Stat();
	
	public static ZooKeeper zk = null;
	
	public static void main(String[] args) throws IOException, KeeperException, InterruptedException{
		String path = "/zk-test";
		zk = new ZooKeeper("192.168.1.138:2181", 5000, new ZookeeperSampleGetDataTest());
		connectedSemaphore.await();
		
		//获取子节点
		/*zk.delete(path, 0);
		zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		zk.create(path+"/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
		//同步方法获取children
		//List<String> childs = zk.getChildren(path, true);
		//System.out.println(childs);
		//异步方法获取children
		zk.getChildren(path, true, new IChildren2Callback(), null);
		
		zk.create(path+"/c2", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
		Thread.sleep(Integer.MAX_VALUE);*/
		//获取节点数据
		zk.delete(path, 0);
		zk.create(path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
		
		System.out.println(new String(zk.getData(path, true, stat)));
		/** czxid: 创建该节点的事务ID  mzxid: 更新该节点的事务ID  version: 数据版本*/
		System.out.println("Czxid: " + stat.getCzxid() + "Mzxid: " + stat.getMzxid() + "Version: " + stat.getVersion());
		
		zk.setData(path, "123".getBytes(), -1);
		
		Thread.sleep(Integer.MAX_VALUE);
	}
	
	
	@Override
	public void process(WatchedEvent event) {
		// TODO Auto-generated method stub
		if(KeeperState.SyncConnected==event.getState()){
			if(EventType.None==event.getType() && null==event.getPath()){
				connectedSemaphore.countDown();
			}else if(event.getType()==EventType.NodeChildrenChanged){
				try {
					System.out.println("Get Child :" + zk.getChildren(event.getPath(), true));
				} catch (KeeperException | InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}else if(event.getType()==EventType.NodeDataChanged){
				try {
					System.out.println(new String(zk.getData(event.getPath(), true, stat)));
					System.out.println("Czxid: " + stat.getCzxid() + "Mzxid: " + stat.getMzxid() + "Version: " + stat.getVersion());
				} catch (KeeperException | InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}	 
		}
	}
}

class IChildren2Callback implements AsyncCallback.Children2Callback{

	@Override
	public void processResult(int rc, String path, Object ctx,
			List<String> childrens, Stat stat) {
		// TODO Auto-generated method stub
		System.out.println("Get Children znode result: [response code: " + rc + ", param path: " + path + ", ctx " + ctx
				+ ", childrens :" +  childrens + ", stat: " + stat);
	}
	
}
其中 更新节点数据 版本version问题 -1表示基于数据的最新版本更新 这里可以作为分布式锁的一个思路 如果客户端参入的version不是数据最新版本则会更新失败

比如目前节点"/zk-test"的数据版本为 2 而某个客户端尝试 执行 setData("/zk-test","test".getBytes(),1) 由于传入version为1 < 服务器目前版本2 这样就会更新失败

7.检测节点是否存在

                zk.exists(path, watch);
		zk.exists(path, watcher);
		zk.exists(path, watch, cb, ctx);
		zk.exists(path, watcher, cb, ctx);

如果判断节点是否存在是 注册watcher 会对节点是否存在进行监听--创建节点,删除节点,节点数据更新都会通知客户端

8.权限控制

zookeeper提供了ACL的权限控制机制,简单来说就是通过控制zookeeper服务器上数据节点的ACL,来控制客户端对节点的访问权限

addAuthInfo(String scheme,byte[] auth);
参数说明:

scheme: 权限控制模式 分为: world ,auth,digest,ip和super

auth: 具体的权限信息  类似于shiro的权限字符串

如下代码:

		ZooKeeper zk1 = new ZooKeeper("192.168.1.138:2181",5000,new ZookeeperSampleCreateTest());
		zk1.addAuthInfo("digest", "test:true".getBytes());
		zk1.create("/zk-test-auth", "123".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL);
		ZooKeeper zk2 = new ZooKeeper("192.168.1.138:2181",5000,new ZookeeperSampleCreateTest());
		zk2.addAuthInfo("digest", "test:true".getBytes());
		System.out.println(new String(zk2.getData("/zk-test-auth", false, null)));
		ZooKeeper zk3 = new ZooKeeper("192.168.1.138:2181",5000,new ZookeeperSampleCreateTest());
		zk3.addAuthInfo("digest", "test:false".getBytes());
		zk3.getData("/zk-test-auth", false, null);
zk2设置了正确的权限 所以可以获取到节点数据  zk3则会抛异常

 org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /zk-test-auth

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐