初衷:本人做的这个一系列总结的初衷就是为那些Zookeeper的入门者以及想在工作之余提高自己能力的有志之士节省去查资料的时间,帮助大家提高自学的能力,迅速的掌握Zookeeper,以至于在这个饱和的行业中提高自己技术方面的竞争力。现在市场上有很多买卖的架构方面的学习资料,少则几百,多则上千上万,视频水的不行,而且大部分人还是一时脑热,钱花了,视频不看。笔者是过来人,所以我特别希望、建议你们静下心来,去自学一门技术,然后将你的总结分享出来,互相学习,互相进步。

Zookeeper也有自己的客户端库,但是其原生的api有很多不足,看过zk文档的可能都知道,Zookeeper Watcher的属性之一就是:"One-time trigger" 一次性触发,所以我们需要不断的重复注册Wathes,第三方客户端ZkClient解决了这一问题。

首先看一下ZkClient.jar包的结构图:

 

可以看见,ZkClient提供了3种监听器接口:IZKChildListener,IZKDataListener,IZKStateListener;

不多说了,上代码:

(1).子节点监听器:

import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
/**
 * 子节点监听器
 * @author Soong
 *
 */
public class ZkClientWatcher1 {

	/** zookeeper地址 */
	static final String CONNECT_ADDR = "172.21.121.53:2181";
	
	/** connection超时时间  单位ms*/
	static final int CONNECTION_OUTTIME = 5000;
	
	
	public static void main(String[] args) throws Exception {
		ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), CONNECTION_OUTTIME);
		zkc.deleteRecursive("/test");
		//对父节点添加监听子节点变化。
		zkc.subscribeChildChanges("/test", new IZkChildListener() {
			@Override
			public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
				System.out.println("parentPath: " + parentPath);
				System.out.println("currentChilds: " + currentChilds);
			}
		});		
		Thread.sleep(3000);		
		zkc.createPersistent("/test");
		Thread.sleep(5000);
		//parentPath: /test
		//currentChilds: []
		zkc.createPersistent("/test" + "/" + "aa", "aa内容");
		Thread.sleep(5000);
		//parentPath: /test
		//currentChilds: [aa]		
		zkc.createPersistent("/test" + "/" + "bb", "bb内容");
		Thread.sleep(5000);		
		//parentPath: /test
		//currentChilds: [aa, bb]
		zkc.delete("/test/bb");
		Thread.sleep(5000);	
		//parentPath: /test
		//currentChilds: [aa]
		zkc.deleteRecursive("/test");//因为此方法为递归删除,所以触发2次
		//parentPath: /test
		//currentChilds: null //删除后,子节点集合为null
		//parentPath: /test
		//currentChilds: null
		Thread.sleep(Integer.MAX_VALUE);
				
	}
}

(2).节点数据变化监听器:(不监听子节点)

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
/**
 * 节点数据变化监听器
 * @author Soong
 *
 */
public class ZkClientWatcher2 {

	/** zookeeper地址 */
	static final String CONNECT_ADDR = "172.21.121.53:2181";
	
	/** connection超时时间  单位ms*/
	static final int CONNECTION_OUTTIME = 5000; 
	
	
	public static void main(String[] args) throws Exception {
		ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), CONNECTION_OUTTIME);
		zkc.deleteRecursive("/test");
		zkc.createPersistent("/test", "1234");
		
		//监听节点数据变化:包括节点数据变更和节点删除
		zkc.subscribeDataChanges("/test", new IZkDataListener() {
			@Override
			public void handleDataDeleted(String path) throws Exception {
				System.out.println("删除的节点为:" + path);
			}
			
			@Override
			public void handleDataChange(String path, Object data) {
				System.out.println("变更的节点为:" + path + ", 变更内容为:" + data);
			}
		});
		
		Thread.sleep(3000);
		zkc.writeData("/test", "456", -1);
		//变更的节点为:/test, 变更内容为:456
		Thread.sleep(1000);
		//通过zkCli.sh 客户端set /test aaa,发现ZkClient并未监听到此次修改事件
		
		zkc.delete("/test");
		//删除的节点为:/test
		Thread.sleep(Integer.MAX_VALUE);
		
		
	}
}

(3).zookeeper服务状态监听器:

import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.zookeeper.Watcher.Event.KeeperState;

/**
 * zk服务端连接状态监控
 * @author Soong
 *
 */
public class ZkStateWatcher {
//	static final String CONNECT_ADDR = "172.21.121.53:2181,172.21.121.54:2181,172.21.121.55:2181";
	static final String CONNECT_ADDR = "172.21.121.53:2181";
	static final int CONNECTION_OUTTIME = 5000;
	
	public static void main(String[] args) throws InterruptedException{
		ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR),CONNECTION_OUTTIME);
		zkc.subscribeStateChanges(new IZkStateListener() {
			
			@Override
			public void handleStateChanged(KeeperState state) throws Exception {
				if(state==KeeperState.SyncConnected){
					//当我重新启动后start,监听触发
					System.out.println("连接成功");
				}else if(state==KeeperState.Disconnected){
					System.out.println("连接断开");//当我在服务端将zk服务stop时,监听触发
				}else
					System.out.println("其他状态"+state);
				
			}
			
			@Override
			public void handleNewSession() throws Exception {
				System.out.println("---->重建session");
			}
		});
//		zkc.close();
		
		Thread.sleep(Integer.MAX_VALUE);
	}
		
}

当我只连接zk服务集群中单独的一个zk服务时:

./zkServer.sh stop 连接断开

./zkServer.sh start 连接成功

./zkServer.sh restart 连接断开 连接成功

./zkServer.sh stop 连接断开

等待几分钟

./zkServer.sh satrt 其他状态Expired ---->重建session 连接成功

其实无需等待几分钟,在zoo.cfg配置文件中有2个参数来管理session的超时时间:minSessionTimeout和maxSessionTimeout

2个参数的默认值都为-1,这个值在日志中可以看到:



默认情况下,maxSessionTimeout = ticketTime*20; minSessionTimeout = ticketTime*2;

ZkClient还提供了取消订阅监听器的方法。


Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐