使用CuratorCache创建TreeCacheListener
最新版Curator framework recipes高级API中包含三种基于zookeeper节点的本地缓存文件监听器TreeCache/NodeCache/PathCache的构造方法已经过时。官方提示改用CuratorCache builder构造缓存监听器,这里给出了其中treecache监听的注册示例
·
由于新版本TreeCache/NodeCache/PathCache 监听器已经过时了,新的构造方法贴出来了,记录一下,三种缓存监听器都写在一起了,按需选择,有问题欢迎指正,谢谢
package com.example.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.File;
import java.util.concurrent.TimeUnit;
/**
* config for curator framework and cache listener build within
*
* @author Hongyi Zheng
* @date 2020/12/14
*/
@Configuration
@Slf4j
public class ZkConfiguration {
// 这些配置最好抽出来放在配置中心
private final String hosts = "localhost:2181";
private final String namespace = "zk-demo";
private int sessionTimeoutMs = 10_000;
private int connectionTimeoutMs = 15_000;
private int maxWaitTimeMs = 10_000;
private CuratorFramework client;
@Bean("curator")
public CuratorFramework getClient() throws InterruptedException {
/*
重试策略使用指数补偿重试机制 arg1 = 基础等待时间Ms 如果重试失败 第二次的时长会基于baseSleepTime指数增长
arg2 = 最大重试次数 源码可以看到最大重试29次 大于29次的会默认等于29
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
/*
指定命名空间来隔离不同环境的节点,这里我用来隔离不同项目的节点
*/
client = CuratorFrameworkFactory.builder()
.connectString(hosts)
.sessionTimeoutMs(sessionTimeoutMs)
.connectionTimeoutMs(connectionTimeoutMs)
.retryPolicy(retryPolicy)
.namespace(namespace)
.build();
client.start();
startListener();
destroyOnFailure();
return client;
}
/**
* close client manually
*
* @throws InterruptedException e
*/
private void destroyOnFailure() throws InterruptedException {
// block & wait until connected or maxWaitTime
// if client is not null close and shut down
if (!client.blockUntilConnected(maxWaitTimeMs, TimeUnit.MILLISECONDS) && client != null) {
client.close();
System.exit(0);
}
}
/**
* create a TreeCacheListener for childEvent listening
*/
public void startListener() {
//由于new TreeCacheListener/NodeCache/PathChildrenCache 新版本已经过时,
//这里使用curatorCache提供的工厂方法 构造treeCacheListener
CuratorCacheListener listener = CuratorCacheListener
.builder()
// 3 types of cache listener, choose one in case
.forPathChildrenCache(File.separator, client, (c, e) -> {
// child node listener
switch (e.getType()) {
default:
break;
}
})
.forNodeCache(() -> {
//do something when node changed
})
.forTreeCache(client, (c, e) -> {
log.info("status changed:{}", e.getType());
switch (e.getType()) {
case INITIALIZED:
whenInit(e);
break;
case NODE_ADDED:
whenAdded(e);
break;
case NODE_UPDATED:
whenUpdated(e);
break;
case NODE_REMOVED:
whenRemoved(e);
break;
default:
break;
}
})
.build();
CuratorCache cache = CuratorCache.builder(client, File.separator).build();
log.info("CuratorCache building complete ,listener path :{}", File.separator + namespace);
cache.listenable().addListener(listener);
cache.start();
}
private void whenInit(TreeCacheEvent e) {
log.info("TreeCacheListener initialized, status :{}", e.getType());
}
private void whenAdded(TreeCacheEvent e) {
Stat stat = e.getData().getStat();
log.info("node stat :{}", stat);
log.info("node created: {}", e.toString());
}
private void whenUpdated(TreeCacheEvent e) {
log.info("node update: Old :{}-{} ,New :{}-{} ",
e.getOldData().getPath(), e.getOldData().getData(),
e.getData().getPath(), e.getData().getData());
}
private void whenRemoved(TreeCacheEvent e) {
log.info("node removed:{}-{}", e.getData().getPath(), e.getData().getData());
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)