🤖zookeeper–Java的基本使用🤖

🍗本篇文章主要介绍如何通过Java代码与zookeeper进行交互,👉非常实用👈,拷贝到项目中就可以使用!
代码中主要包含:

  • 🍟创建一个zookeeper连接
  • 🍔 获取子节点列表
  • ☕递归创建一个节点,没有默认值
  • 🥞递归创建一个节点,带默认值
  • 🥩获取指定节点中信息
  • 🥛获取指定节点中信息,返回字节数组
  • 🍍设置节点中的信息
  • 🍞删除该节点以及该节点下子节点
  • 🧀强制删除该节点及以下子节点
  • 🍕检查节点是否存在
  • 🍖设置zookeeper监听
🧇话不多说直接上代码:
public class CuratorZookeeperUtils {
	private static Log log = LogFactory.getLog(CuratorZookeeperUtils.class);

	/**
	 *
	 * @描述:创建一个zookeeper连接
	 * @return CuratorFramework
	 * @exception @createTime:2021年2月1日
	 * @author: ws
	 */
	public static CuratorFramework initClient(ZKConfig zkConfig) {
		String connectString = "";
		for (String tempConnect : zkConfig.getConnectString()) {
			connectString += tempConnect + ",";
		}
		// 为了实现不同的ZooKeeper也无语之间的隔离,往往会为每个业务分配一个独立的命名空间,即指定一个ZooKeeper根路径,那么该客户端对ZooKeeper上数据节点的任何操作,都是基于该相对目录进行的
		CuratorFramework client = CuratorFrameworkFactory
				.builder()
				.connectionTimeoutMs(zkConfig.getConnectionTimeoutMs()).sessionTimeoutMs(zkConfig.getSessionTimeoutMs())
				.connectString(connectString.substring(0, connectString.length() - 1))
				.namespace(zkConfig.getNamespace())
				.retryPolicy(new RetryNTimes(zkConfig.getRetryCount(), zkConfig.getSleepMsBetweenRetries())).build();
		client.start();
		return client;
	}

	/**
	 *
	 * @描述:获取子节点列表 打印
	 * @return void
	 * @exception @createTime:2021年2月1日
	 * @author: ws
	 * @throws Exception
	 */
	public static List<String> nodesList(CuratorFramework client, String parentPath) throws Exception {
		Stat stat = client.checkExists().forPath(parentPath);
		if (stat == null) {
			return null;
		}
		List<String> paths = client.getChildren().forPath(parentPath);
		return paths;
	}

	/**
	 *
	 * @描述:递归创建一个节点,没有默认值
	 * @param path
	 * @return void
	 * @exception @createTime:2021年2月1日
	 * @author: ws
	 * @throws Exception
	 */
	public static Boolean createNode(CuratorFramework client, String path) throws Exception {
		Stat stat = client.checkExists().forPath(path);
		if (stat == null) {
			client.create().creatingParentsIfNeeded().forPath(path);
		}
		return true;
	}

	/**
	 *
	 * @描述:递归创建一个节点,带默认值
	 * @param path
	 * @return void
	 * @exception @createTime:2021年2月1日
	 * @author: ws
	 * @throws Exception
	 */
	public static Boolean createNode(CuratorFramework client, String path, String value) throws Exception { byte[] data = getByteDataNode(client, path);
		if (data != null) {
			if (Arrays.equals(data, value.getBytes())) {
				return true;
			} else {
				delNodeAndChild(client, path);
			}
		}
		client.create().creatingParentsIfNeeded().forPath(path, value.getBytes());
		return true;
	}

	/**
	 * 获取指定节点中信息
	 * 
	 * @throws Exception
	 * @createTime:2021年2月1日 @author: ws
	 */
	public static String getDataNode(CuratorFramework client, String path) throws Exception {
		Stat stat = client.checkExists().forPath(path);
		if (stat == null) {
			return null;
		}
		byte[] datas = client.getData().forPath(path);
		return new String(datas, "UTF-8");
	}

	/**
	 * 获取指定节点中信息,返回字节数组
	 * 
	 * @throws Exception
	 * @createTime:2021年2月1日 @author: ws
	 */
	public static byte[] getByteDataNode(CuratorFramework client, String path) throws Exception {
		Stat stat = client.checkExists().forPath(path);
		if (stat == null) {
			return null;
		}
		byte[] data = client.getData().forPath(path);
		return data;
	}

	/**
	 *
	 * @描述:设置节点中的信息
	 * @param client
	 * @param path
	 * @param message
	 * @return void
	 * @exception @createTime:2021年2月1日
	 * @author: ws
	 * @throws Exception
	 */
	public static Boolean setDataNode(CuratorFramework client, String path, String message) throws Exception {
		Stat stat = client.checkExists().forPath(path);
		if (stat == null) {
			return null;
		}
		client.setData().withVersion(stat.getVersion()).forPath(path, message.getBytes());
		return true;
	}

	/**
	 * 删除该节点以及该节点下子节点
	 * 
	 * @param client
	 * @param path
	 * @throws Exception
	 * @author: ws
	 */
	public static Boolean delNodeAndChild(CuratorFramework client, String path) throws Exception {
		Stat stat = client.checkExists().forPath(path);
		if (stat == null) {
			return null;
		}
		client.delete().deletingChildrenIfNeeded().forPath(path);
		return true;
	}

	/**
	 * 删除当前节点
	 * 
	 * @param client
	 * @param path
	 * @throws Exception
	 * @author: ws
	 */
	public static Boolean delCurrentNode(CuratorFramework client, String path) throws Exception {
		Stat stat = client.checkExists().forPath(path);
		if (stat == null) {
			return null;
		}
		client.delete().forPath(path);
		return true;
	}

	/**
	 * 强制删除该节点及以下子节点
	 * 
	 * @param client
	 * @param path
	 * @throws Exception
	 * @author: ws
	 */
	public static Boolean guarantDelNodeAndChild(CuratorFramework client, String path) throws Exception {
		Stat stat = client.checkExists().forPath(path);
		if (stat == null) {
			return null;
		}
		client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
		return true;
	}

	/**
	 * 检查节点是否存在
	 * 
	 * @param client
	 * @param path
	 * @return
	 * @throws Exception
	 * @author: ws
	 */
	public static Boolean checkNode(CuratorFramework client, String path) throws Exception {
		Stat stat = client.checkExists().forPath(path);
		if (stat == null) {
			return false;
		}
		return true;
	}

	/**
	 *
	 * @描述:设置zookeeper监听
	 * @return CuratorFramework
	 * @exception @createTime:2021年2月1日
	 * @author: ws
	 */
	public static void setListenter(CuratorFramework client, ZookeeperOpService curatorZookeeperOp, String path)
			throws Exception {
		ExecutorService pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
				new SynchronousQueue<Runnable>());
		// 设置监听器和处理过程
		if (StringHelper.isEmpty(path)) {
			path = "/";
		}
		if (!path.startsWith("/")) {
			path = "/" + path;
		}
		delNodeAndChild(client,path);
		CuratorCache curatorCache = CuratorCache.builder(client, path).build();
		curatorCache.listenable()
				.addListener(CuratorCacheListener.builder().forTreeCache(client, new TreeCacheListener() {
					@Override
					public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
						ChildData data = event.getData();
						if (data != null) {
							switch (event.getType()) {
							case NODE_ADDED:
								log.info("NODE_ADDED : " + data.getPath() + "  数据:" + new String(data.getData()));
								curatorZookeeperOp.addNode(data);
								break;
							case NODE_REMOVED:
								log.info("NODE_ADDED : " + data.getPath() + "  数据:" + new String(data.getData()));
								curatorZookeeperOp.delNode(data);
								break;
							case NODE_UPDATED:
								log.info("NODE_ADDED : " + data.getPath() + "  数据:" + new String(data.getData()));
								curatorZookeeperOp.updNode(data);
								break;
							default:
								break;
							}
						} else {
							log.info("data is null : " + event.getType());
						}
					}
				}).build());
		// 开始监听
		curatorCache.start();
	}
}

如果对各位有帮助,欢迎各位👍点赞📢评论⭐️收藏

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐