zookeeper--Java的基本使用
zookeeper工具类简单实用、废话不说、直接上代码!public class CuratorZookeeperUtils {private static Log log = LogFactory.getLog(CuratorZookeeperUtils.class);/**** @描述:创建一个zookeeper连接* @return CuratorFramework* @exception
·
🤖zookeeper–Java的基本使用🤖
🍗本篇文章主要介绍如何通过Java代码与zookeeper进行交互,👉非常实用👈,拷贝到代码中就可以使用!
代码中主要包含:
- 🍟创建一个zookeeper连接
- 🍔 获取子节点列表
- ☕递归创建一个节点,没有默认值
- 🥞递归创建一个节点,带默认值
- 🥩获取指定节点中信息
- 🥛获取指定节点中信息,返回字节数组
- 🍍设置节点中的信息
- 🍞删除该节点以及该节点下子节点
- 🧀强制删除该节点及以下子节点
- 🍕检查节点是否存在
- 🍖设置zookeeper监听
🧇话不多说直接上代码:
public class CuratorZookeeperUtils {
private static Log log = LogFactory.getLog(CuratorZookeeperUtils.class);
/**
*
* @描述:创建一个zookeeper连接
* @return CuratorFramework
* @exception @createTime:2021年2月1日
* @author: ws
*/
public static CuratorFramework initClient(ZKConfig zkConfig) {
String connectString = "";
for (String tempConnect : zkConfig.getConnectString()) {
connectString += tempConnect + ",";
}
// 为了实现不同的ZooKeeper也无语之间的隔离,往往会为每个业务分配一个独立的命名空间,即指定一个ZooKeeper根路径,那么该客户端对ZooKeeper上数据节点的任何操作,都是基于该相对目录进行的
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectionTimeoutMs(zkConfig.getConnectionTimeoutMs()).sessionTimeoutMs(zkConfig.getSessionTimeoutMs())
.connectString(connectString.substring(0, connectString.length() - 1))
.namespace(zkConfig.getNamespace())
.retryPolicy(new RetryNTimes(zkConfig.getRetryCount(), zkConfig.getSleepMsBetweenRetries())).build();
client.start();
return client;
}
/**
*
* @描述:获取子节点列表 打印
* @return void
* @exception @createTime:2021年2月1日
* @author: ws
* @throws Exception
*/
public static List<String> nodesList(CuratorFramework client, String parentPath) throws Exception {
Stat stat = client.checkExists().forPath(parentPath);
if (stat == null) {
return null;
}
List<String> paths = client.getChildren().forPath(parentPath);
return paths;
}
/**
*
* @描述:递归创建一个节点,没有默认值
* @param path
* @return void
* @exception @createTime:2021年2月1日
* @author: ws
* @throws Exception
*/
public static Boolean createNode(CuratorFramework client, String path) throws Exception {
Stat stat = client.checkExists().forPath(path);
if (stat == null) {
client.create().creatingParentsIfNeeded().forPath(path);
}
return true;
}
/**
*
* @描述:递归创建一个节点,带默认值
* @param path
* @return void
* @exception @createTime:2021年2月1日
* @author: ws
* @throws Exception
*/
public static Boolean createNode(CuratorFramework client, String path, String value) throws Exception { byte[] data = getByteDataNode(client, path);
if (data != null) {
if (Arrays.equals(data, value.getBytes())) {
return true;
} else {
delNodeAndChild(client, path);
}
}
client.create().creatingParentsIfNeeded().forPath(path, value.getBytes());
return true;
}
/**
* 获取指定节点中信息
*
* @throws Exception
* @createTime:2021年2月1日 @author: ws
*/
public static String getDataNode(CuratorFramework client, String path) throws Exception {
Stat stat = client.checkExists().forPath(path);
if (stat == null) {
return null;
}
byte[] datas = client.getData().forPath(path);
return new String(datas, "UTF-8");
}
/**
* 获取指定节点中信息,返回字节数组
*
* @throws Exception
* @createTime:2021年2月1日 @author: ws
*/
public static byte[] getByteDataNode(CuratorFramework client, String path) throws Exception {
Stat stat = client.checkExists().forPath(path);
if (stat == null) {
return null;
}
byte[] data = client.getData().forPath(path);
return data;
}
/**
*
* @描述:设置节点中的信息
* @param client
* @param path
* @param message
* @return void
* @exception @createTime:2021年2月1日
* @author: ws
* @throws Exception
*/
public static Boolean setDataNode(CuratorFramework client, String path, String message) throws Exception {
Stat stat = client.checkExists().forPath(path);
if (stat == null) {
return null;
}
client.setData().withVersion(stat.getVersion()).forPath(path, message.getBytes());
return true;
}
/**
* 删除该节点以及该节点下子节点
*
* @param client
* @param path
* @throws Exception
* @author: ws
*/
public static Boolean delNodeAndChild(CuratorFramework client, String path) throws Exception {
Stat stat = client.checkExists().forPath(path);
if (stat == null) {
return null;
}
client.delete().deletingChildrenIfNeeded().forPath(path);
return true;
}
/**
* 删除当前节点
*
* @param client
* @param path
* @throws Exception
* @author: ws
*/
public static Boolean delCurrentNode(CuratorFramework client, String path) throws Exception {
Stat stat = client.checkExists().forPath(path);
if (stat == null) {
return null;
}
client.delete().forPath(path);
return true;
}
/**
* 强制删除该节点及以下子节点
*
* @param client
* @param path
* @throws Exception
* @author: ws
*/
public static Boolean guarantDelNodeAndChild(CuratorFramework client, String path) throws Exception {
Stat stat = client.checkExists().forPath(path);
if (stat == null) {
return null;
}
client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
return true;
}
/**
* 检查节点是否存在
*
* @param client
* @param path
* @return
* @throws Exception
* @author: ws
*/
public static Boolean checkNode(CuratorFramework client, String path) throws Exception {
Stat stat = client.checkExists().forPath(path);
if (stat == null) {
return false;
}
return true;
}
/**
*
* @描述:设置zookeeper监听
* @return CuratorFramework
* @exception @createTime:2021年2月1日
* @author: ws
*/
public static void setListenter(CuratorFramework client, ZookeeperOpService curatorZookeeperOp, String path)
throws Exception {
ExecutorService pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
// 设置监听器和处理过程
if (StringHelper.isEmpty(path)) {
path = "/";
}
if (!path.startsWith("/")) {
path = "/" + path;
}
delNodeAndChild(client,path);
CuratorCache curatorCache = CuratorCache.builder(client, path).build();
curatorCache.listenable()
.addListener(CuratorCacheListener.builder().forTreeCache(client, new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
ChildData data = event.getData();
if (data != null) {
switch (event.getType()) {
case NODE_ADDED:
log.info("NODE_ADDED : " + data.getPath() + " 数据:" + new String(data.getData()));
curatorZookeeperOp.addNode(data);
break;
case NODE_REMOVED:
log.info("NODE_ADDED : " + data.getPath() + " 数据:" + new String(data.getData()));
curatorZookeeperOp.delNode(data);
break;
case NODE_UPDATED:
log.info("NODE_ADDED : " + data.getPath() + " 数据:" + new String(data.getData()));
curatorZookeeperOp.updNode(data);
break;
default:
break;
}
} else {
log.info("data is null : " + event.getType());
}
}
}).build());
// 开始监听
curatorCache.start();
}
}
如果对各位有帮助,欢迎各位👍点赞📢评论⭐️收藏
更多推荐
已为社区贡献1条内容
所有评论(0)