一、首先说明的一个非常重要的点:
使用zookeeper第一件事就是要连接zookeeper,那么连接就是耗时间的。先做一个例子:比如你写了一段代码,这段代码主要干了是三件事:第一件事连接zookeeper、第二件事操作zookeeper(比如更新删除节点了)、第三件事你的业务代码并且这些业务代码和zookeeper一点关系都没有。
那么问题来了 第一件事连接zookeeper这是非常消耗时间的,导致你的业务代码不能执行,这样用户体验很不好。
所以zookeeper 的开发人员就把zookeeper的连接放在了一个单独的线程,就是在你的主线程上又开启了一个线程用于zoookeeper的连接。
那么问题又来了,此时第一件事和第二件事是并列执行的,但是你发现第二件事是依赖第一件事成功才能执行的,也是就说只有zookeeper成功连接后才能操作zookeeper上的节点。那么怎么解决请看第二点。

二、CountDownLatch
这个类很是神奇,以前没听过,也没研究过,主要说一下这个类是干嘛的和怎么用?
这个类主要两个方法:1.countDownLatch.await() 让线程阻塞 2.countDownLatch.countDown() 唤醒阻塞的线程
具体怎么用看三代码实现

三、代码实现

public class Test002 implements Watcher {
	// zk 连接地址
	private static final String CONNECTSTRING = "******:2181";// 换成你实际的zookeeper地址
	// 连接zk 的超时时间
	private static final int SESSION_OUTTIME = 2000;
	private static final CountDownLatch countDownLatch = new CountDownLatch(1);
	ZooKeeper zk = null;
	// zk 创建连接
	public void createContection(String connectString, int sessionTimeout) throws IOException, InterruptedException {
		zk = new ZooKeeper(connectString, sessionTimeout, this);
		System.out.println("#####zk开始启动连接#####");
	}

	// 创建持久化节点
	public boolean createNode(String path, String data) throws InterruptedException, KeeperException {
		countDownLatch.await();// 让当前方法处于等待状态 
		exists(path, true);
		// 创建节点 参数 1 节点 2 节点数据 3 权限 4 节点类型
		zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		System.out.println("#####新增节点信息path+" + path + "data" + data);
		return true;
	}

	// 时间通知
	public void process(WatchedEvent event) {
		System.out.println("#####事件通知开始######");
		// 1. 获取事件状态
		KeeperState keeperState = event.getState();
		// 获取节点的路径
		String path = event.getPath();
		// 获取节点类型
		EventType eventType = event.getType();
		System.out.println("节点的详细信息:节点的状态" + keeperState + "节点路径" + path + "节点类型" + eventType);
		// 2. 判断连接状态
		if (KeeperState.SyncConnected == keeperState) {
			// 3. 判断获取事件类型
			if (eventType == EventType.None) {// 连接状态 
				countDownLatch.countDown();// 连接成功 此时可以释放countDownLatch.await()
				System.out.println("#####zk连接成功#####");
			} else if (EventType.NodeCreated == eventType) {
				System.out.println("获取新增node节点的事件通知" + "path:" + path);

			} else if (EventType.NodeDataChanged == eventType) {
				System.out.println("获取了节点修改的时间通知:修改的节点Node:" + path);
			}

		}

		System.out.println("#####事件通知结束######");

	}
	
	// 设置节点是可以监听的状态
	public Stat exists(String path, boolean isWatch) throws KeeperException, InterruptedException {
		return zk.exists(path, isWatch);
	}

	// 更新节点数据
	public void update(String path, String data) throws KeeperException, InterruptedException {
		exists(path, true);
		zk.setData(path, data.getBytes(), 0);

	}

	// 关闭节点
	public void close() throws InterruptedException {
		if (zk != null) {
			zk.close();
		}
	}
	public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
		Test002 test002 = new Test002();
		test002.createContection(CONNECTSTRING, SESSION_OUTTIME);
		test002.update("/zzy_test001", "007");
		test002.close();
	}
}
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐