zk+curator监听器机制实现分布式高可用
curator是开源zookeeper客户端,与原生zk client相比,抽象层次更高,功能更加丰富。监听器在分布式中主要应用于服务注册,也是很多开源项目中客户端请求服务的必选技术应用,它不仅可以减少客户端无限for循环导致的服务端流量问题,通过zk的天生分布式能力提供高可用特性。 curator的监听采用3种Watcher机制来监听节点变化: 1. Pa
curator是开源zookeeper客户端,与原生zk client相比,抽象层次更高,功能更加丰富。监听器在分布式中主要应用于服务注册,也是很多开源项目中客户端请求服务的必选技术应用,它不仅可以减少客户端无限for循环导致的服务端流量问题,通过zk的天生分布式能力提供高可用特性。
curator的监听采用3种Watcher机制来监听节点变化:
1. Path Cache:监视一个路径下孩子节点的建立,删除,以及节点数据更新,使用的事件监听类:PathChildrenCacheListener;
2. Node Cache:监视一个节点的创建、更新、删除,并将节点数据更新至本地
3. Tree Cache:Path cache和Node Cache的结合体,监视路径下创建,更新,删除事件,并缓存路径下所有孩子节点的数据。
Path Cache的举例:
public class CuratorWatcher {
/** Zookeeper info */
private static final String ZK_ADDRESS = "192.168.1.90:2181";
private static final String ZK_PATH = "/zktest";
public static void main(String[] args) throws Exception {
// 1.创建Client端
CuratorFramework client = CuratorFrameworkFactory.newClient(
ZK_ADDRESS,
new RetryNTimes(10, 5000)
);
client.start();
System.out.println("zk client start successfully!");
// 2.创建zk的监听器
PathChildrenCache watcher = new PathChildrenCache(
client,
ZK_PATH,
true // if cache data
);
watcher.getListenable().addListener((client1, event) -> {
ChildData data = event.getData();
if (data == null) {
System.out.println("No data in event[" + event + "]");
} else {
System.out.println("Receive event: "
+ "type=[" + event.getType() + "]"
+ ", path=[" + data.getPath() + "]"
+ ", data=[" + new String(data.getData()) + "]"
+ ", stat=[" + data.getStat() + "]");
}
});
watcher.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
System.out.println("Register zk watcher successfully!");
Thread.sleep(Integer.MAX_VALUE);
}
}
在zk client端输入测试:create /zktest/whdp "adfd"
输出结果:
zk client start successfully!
Register zk watcher successfully!
Receive event: type=[CHILD_ADDED], path=[/zktest/whdp], data=[adfd], stat=[34359807061,34359807061,1528098906473,1528098906473,0,0,0,0,4,0,34359807061]
Node Cache 举例:
zk消费者端代码:
public class ConfigBusConsumer extends ConfigBusBase {
private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ConfigBusConsumer.class);
private NodeCache cache; //定义nodecache
private String zkPath;
public static final ObjectMapper mapper = new ObjectMapper();
public ConfigBusConsumer(ZKConfig config, String topic, ConfigChangeCallback callback) {
super(config); //初始化启动zk连接代码
// zkRoot="/alert";
this.zkPath = zkRoot + "/" + topic;
LOG.info("monitor change for zkPath " + zkPath);
cache = new NodeCache(curator, zkPath);
cache.getListenable().addListener(() -> {
ConfigValue v = getConfigValue();
callback.onNewConfig(v); //监听端口,当node发生变化,执行callback接口实现类
}
);
try {
cache.start();
} catch (Exception ex) {
LOG.error("error start NodeCache listener", ex);
throw new RuntimeException(ex);
}
}
public ConfigValue getConfigValue() throws Exception {
byte[] value = curator.getData().forPath(zkPath);//zkPath "/alert/ALERT_UNIT_TOPOLOGY_APP_SANDBOX/spout"
ConfigValue v;
try {
v = mapper.readValue(value, ConfigValue.class);
} catch (JsonParseException e) {
LOG.warn("warn getConfigValue parse exception", e.getMessage());
v = new ConfigValue();
}
return v;
}
}
public interface ConfigChangeCallback {
void onNewConfig(ConfigValue value); //该接口实现类定义监听器监听到变化后的实现
}
zk 生产者端代码:
public class ConfigBusProducer extends ConfigBusBase {
private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ConfigBusProducer.class);
public ConfigBusProducer(ZKConfig config) {
super(config); //初始化链接并启动zk客户端
}
public void send(String topic, ConfigValue config) {
// check if topic exists, create this topic if not existing
String zkPath = zkRoot + "/" + topic; //拼接zk路径
try {
if (curator.checkExists().forPath(zkPath) == null) {
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(zkPath);
}
ObjectMapper mapper = new ObjectMapper();
byte[] content = mapper.writeValueAsBytes(config);
curator.setData().forPath(zkPath, content); // 向zk中写入内容
} catch (Exception ex) {
LOG.error("error creating zkPath " + zkPath, ex);
throw new RuntimeException(ex);
}
}
}
ConfigBusBase实现:
public class ConfigBusBase implements Closeable {
protected String zkRoot; //定义zkroot目录
protected CuratorFramework curator;
public ConfigBusBase(ZKConfig config) {
this.zkRoot = config.zkRoot;
curator = CuratorFrameworkFactory.newClient(
config.zkQuorum,
config.zkSessionTimeoutMs,
config.connectionTimeoutMs,
new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval)
);
curator.start(); //启动zk客户端
}
@Override
public void close() {
curator.close();
}
}
测试类:
public class TestConfigBusProducer {
protected ZKConfig config;
protected ConfigBusProducer producer;
protected ConfigBusConsumer consumer;
protected String topic = "publisher";
final AtomicBoolean validate = new AtomicBoolean(false);
final AtomicReference<String> configValue = new AtomicReference<>();
@Before
public void setUp() throws Exception{
config=new ZKConfig();
config.zkQuorum="192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181";
config.zkRoot = "/alert/ALERT_UNIT_TOPOLOGY_APP_SANDBOX";
config.zkRetryInterval = 1000;
config.zkRetryTimes = 3;
config.connectionTimeoutMs = 3000;
config.zkSessionTimeoutMs = 10000;
producer = new ConfigBusProducer(config);
consumer = new ConfigBusConsumer(config, topic, value -> {
validate.set(value.isValueVersionId());
configValue.set((String) value.getValue());
System.out.println("******** get notified of config " + value);
});
}
@Test
public void testConfigChange() throws Exception {
// first change
String version="spec_version_1526286651841";
producer.send("publisher", createConfigValue(true, version));
producer.send("alert", createConfigValue(true, version));
producer.send("router", createConfigValue(true, version));
producer.send("spout", createConfigValue(true, version));
Thread.sleep(1000);
}
@After
public void shutdown() throws IOException {
producer.close();
consumer.close();
}
private ConfigValue createConfigValue(boolean isValueVersionId, String value) {
ConfigValue configValue = new ConfigValue();
configValue.setValueVersionId(isValueVersionId);
configValue.setValue(value);
return configValue;
}
}
总结:
zookeeper因为具备得天独厚的高可用优势,在分布式系统中应用及其广泛,curator则提供了一整套分布式解决方案,包括注册监听,分布式锁等问题。
代码请参考:
https://github.com/whpHarper/java
更多推荐
所有评论(0)