(六)、ZooKeeper自动重连
在一套分布式的online services系统中,各service通常不会放在一台服务器上,而是通过Zookeeper这样的东西,将自己的service信息注册到上面,service的使用者通过Zookeeper来发现各service的信息,从而可以将request发送到不同的service上去处理。 如上图所示,两个Service Pr
在一套分布式的online services系统中,各service通常不会放在一台服务器上,而是通过Zookeeper这样的东西,将自己的service信息注册到上面,service的使用者通过Zookeeper来发现各service的信息,从而可以将request发送到不同的service上去处理。
如上图所示,两个Service Provider 1和2分别在192.168.1.5和192.168.1.6这两台服务器的2688端口上提供服务,服务的地址和端口注册到了Zookeeper中。Service User通过查询Zookeeper,可得知这些服务的信息。通常,Service User与Service Provider之间的通信,是通过connection pool实现的,因为Service User不可能假定在第一次查询到所有Service Provider的信息之后,它们就是一直存活的,假如某个Service Provider因为程序问题死掉了,向它发送request只会造成大量的失败结果,因此通常会实现一个connection pool来保证实时更新节点的信息,当有一个Service Provider从Zookeeper上消失之后,从connection pool中取出的connection总是可用的(即:总能通过它把request发送到一个有效的Service Provider那里)。
代码如下:
package com.lyh.zk;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import com.lyh.common.ZKCommon;
/**
* ZooKeeper监听
* @author liuyuehu
*/
public class MyZooKeeper {
Logger logger = Logger.getLogger(MyZooKeeper.class);
protected CountDownLatch countDownLatch = new CountDownLatch(1);
public static ZooKeeper zooKeeper = null;
private Object waiter = new Object();
/**
* 监控所有被触发的事件
*/
public void process(WatchedEvent event) {
logger.info("收到事件通知:" + event.getState() );
if(event.getState()==KeeperState.SyncConnected){
countDownLatch.countDown();
}
}
/**
* <p>连接Zookeeper</p>
* 启动zk服务 本实例基于自动重连策略,如果zk连接没有建立成功或者在运行时断开,将会自动重连.
* @param connectString Zookeeper服务地址
* @param sessionTimeout Zookeeper连接超时时间
*/
public void connect(){
try {
synchronized (waiter) {
SessionWatcher watcher = new SessionWatcher();
if(zooKeeper == null){
// ZK客户端允许我们将ZK服务器的所有地址都配置在这里
zooKeeper = new ZooKeeper(ZKCommon.connectAddress,ZKCommon.sessionTimeout,watcher);
// 使用CountDownLatch.await()的线程(当前线程)阻塞直到所有其它拥有
//CountDownLatch的线程执行完毕(countDown()结果为0)
countDownLatch.await();
}
}
} catch (IOException e) {
logger.error("连接创建失败,发生 InterruptedException , e " + e.getMessage(), e);
} catch (InterruptedException e) {
logger.error( "连接创建失败,发生 IOException , e " + e.getMessage(), e );
}
waiter.notifyAll();
}
/**
* 关闭连接
*/
public void close(){
try {
synchronized (waiter) {
if(zooKeeper != null){
zooKeeper.close();
}
waiter.notifyAll();
}
} catch (InterruptedException e) {
logger.error("release connection error ," + e.getMessage() ,e);
}
}
class SessionWatcher implements Watcher {
public void process(WatchedEvent event) {
// 如果是“数据变更”事件
if (event.getType() != Event.EventType.None) {
return;
}
synchronized (waiter){
switch(event.getState()) {
case SyncConnected:
//zk连接建立成功,或者重连成功
waiter.notifyAll();
logger.info("Connected...");
break;
case Expired:
// session过期,这是个非常严重的问题,有可能client端出现了问题,也有可能zk环境故障
// 此处仅仅是重新实例化zk client
logger.info("Expired(重连)...");
connect();
break;
case Disconnected:
logger.info("链接断开,或session迁移....");
break;
case AuthFailed:
close();
throw new RuntimeException("ZK Connection auth failed...");
default:
break;
}
}
}
}
}
参考地址网页:http://iwinit.iteye.com/blog/1844177
http://www.codelast.com/%e5%8e%9f%e5%88%9b-zookeeper%e6%b3%a8%e5%86%8c%e8%8a%82%e7%82%b9%e7%9a%84%e6%8e%89%e7%ba%bf%e8%87%aa%e5%8a%a8%e9%87%8d%e6%96%b0%e6%b3%a8%e5%86%8c%e5%8f%8a%e6%b5%8b%e8%af%95%e6%96%b9%e6%b3%95/
更多推荐
所有评论(0)