zookeeper,CuratorFramework,springMVC整合相关问题
zookeeper 相关问题zookeeper的安装zookeeper的安装请参考:(http://www.abao365.cn/zk/index.html)zookeeper相关命令1、启动、停止、重启zk(bin目录下)启动:zkServer.sh start停止:zkServer.sh stop重启:zkServer.sh restart2、连接zookeeper连接本地:
zookeeper 相关问题
zookeeper的安装
zookeeper的安装请参考:
(http://www.abao365.cn/zk/index.html)
zookeeper相关命令
1、启动、停止、重启zk(bin目录下)
启动:zkServer.sh start
停止:zkServer.sh stop
重启:zkServer.sh restart
2、连接zookeeper
连接本地: ./zkCli.sh
连接其他机器:
./zkCli.sh -server host:port
出现:WatchedEvent state:SyncConnected type:None path:null表示连接成功
3、进入zk里相关命令
get path :获取节点数据
ls path:查看节点信息(包括子节点,和linux的ls差不多)
create path data:创建节点(如果创建的节点有父节点,应先创建父节点)
delete path:删除节点(如果删除的节点有子节点,则需要先删除子节点)
quit:退出zk坏境
4、其他命令
netstat -anp|grep 2181 查看当前机器有哪几个Client在连接
./zkServer.sh status 查看当前机器是leader还是follower
zookeeper,CuratorFramework,springMVC整合配置
引入相关的jar包
因为是maven项目,所以在pom.xml文件中加入下列代码即可:
//引入zookeeper所需要的包
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
//引入curator所需要的包
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.7.1</version>
</dependency>
编写springMVC的配置文档
引入了相关的包之后,需要在springMVC的配置文件中进行配置:
<!-- 重连策略 -->
<bean id="retryPolicy" class="org.apache.curator.retry.ExponentialBackoffRetry">
<!-- 间隔时间基数 -->
<constructor-arg index="0" value="1000" />
<!-- 重连策略 -->
<constructor-arg index="1" value="3" />
</bean>
<bean id="curatorFramework" class="org.apache.curator.framework.CuratorFrameworkFactory" factory-method="newClient" init-method="start">
<constructor-arg index="0" value="server1:port,server2:port,server3:port" />
<!-- sessionTimeoutMs会话超时时间,单位为毫秒。默认是60000ms -->
<constructor-arg index="1" value="5000" />
<!-- connectionTimeoutMs连接创建超时时间,单位毫秒,默认15000ms -->
<constructor-arg index="2" value="3000" />
<constructor-arg index="3" ref="retryPolicy" />
</bean>
<!--应用场景一:查看当前Client是不是主机-->
<bean id="adShowLeader" class="com.chinaso.common.zookeeper.leader.LeaderDispatch" scope="singleton" init-method="init">
<constructor-arg index="0" ref="curatorFramework" />
<constructor-arg index="1" value="${sas.zookeeper.leader}" />
</bean>
<!--应用场景一:监控并同步所有Client的数据-->
<bean id="publishAdFlush" class="com.chinaso.modules.ad.job.PublishAdFlush" scope="singleton" init-method="init">
<constructor-arg index="0" ref="curatorFramework" />
<constructor-arg index="1" value="${sas.zookeeper.publish}" />
</bean>
</beans>
注意:factory-method=”newClient” init-method=”start” 表示curator与zkeeper在初步建立连接的时候就启动,不需要再手动的调用
curatorFramework的start方法。
应用场景一介绍
应用场景一主要是用来判断当前的Client是否为leader,因为zkpeeper用在并发的环境中,当启动多个应用时,也就是本文中所提到的Client,每个Client都会与zkserver产生连接。但是实际运用过程中,有些事情只需要某一个Client去完成,比如本文中的创建数据库表。
ZKBase的代码如下:
public abstract class ZKBase {
protected CuratorFramework curatorFramework;
public ZKBase(CuratorFramework curatorFramework) {
this.curatorFramework = curatorFramework;
}
public void init(){
try {
call();
} catch (Exception e) {
e.printStackTrace();
}
}
protected abstract void call() throws Exception;
}
主要是注入curatorFramework,定义call()接口方法,并且定义了配置文件中的初始化方法init()
LeaderDispatch类的代码如下:
/**
* Leader调度
*
*/
public class LeaderDispatch extends ZKBase {
private static Logger logger = LoggerFactory
.getLogger(LeaderDispatch.class);
private boolean leader = false;
public boolean isLeader() {
return leader;
}
private String path;
public LeaderDispatch(CuratorFramework curatorFramework, String path) {
super(curatorFramework);
this.path = path;
}
/**
* 异步监控,存在初始化过快,无法确定Leader,如必须在初始化时Leader执行,就在方法中直接调用
*/
public void leaderSelector() {
LeaderSelector selector = new LeaderSelector(curatorFramework, path,
new LeaderSelectorListenerAdapter() {
@Override
public void takeLeadership(CuratorFramework curatorFramework)
throws Exception {
System.out.println("成为Master");
logger.info(" leader 选择成功!!!");
leader = true;
while (true) {
Thread.sleep(Integer.MAX_VALUE);
}
}
});
selector.autoRequeue();
selector.start();
}
@Override
public void call() {
leader = false;
leaderSelector();
}
}
定义了leader变量,在 leaderSelector()方法中实例了LeaderSelector类并给她添加了LeaderSelectorListenerAdapter监听器,如果该Client是leader,那么就会把leader变量设为true并让当前线程一直sleep。当当前的Client由于种种原因(比如重启),那么集群另外的Client检测到节点发生变化,则会在另外的client中选一个作为leader。
这个类还暴露出一个isLeader()方法来被其他类使用。
具体应用代码:
LeaderDispatch leaderDispatch = SpringContextHolder.getBean("adShowLeader");
System.out.println("创建日志表");
if(leaderDispatch.isLeader()){ //只有主机创建表格,备机不操作
logger.info("创建日志表!");
ShowLogDao showLogDao = SpringContextHolder.getBean("showLogDao");
try {
String d = DateTool.date2String("yyyy-MM-dd",DateTool.calculateByDate(new Date(), 1));
showLogDao.careteTable(d);
} catch (Exception e) {
e.printStackTrace();
}
}
应用场景二介绍
应用场景二主要是运用了zkeeper的订阅和发布功能,在并发环境中,后台往往都是集群,集群之间往往都需要进行数据之间的同步。传统的做法是一台一台进行更新数据,如果运用zkeeper的订阅和发布功能,可以直接同步到所有的Client。具体做法:
1、创建一个zkeeper节点(在Client启动的时候创建,若果已经创建则不需要再创建)
2、每一个Client启动的时候都会获取已创建的节点,并且给节点添加一个监听事件,在监听事件里编写需要监听的事件和需要做的操作
3、编写更改节点数据的方法。
下面结合应用具体的例子来介绍:
public class PublishAdFlush extends ZKBase {
private static Logger logger = LoggerFactory
.getLogger(PublishAdFlush.class);
private String path;//要监听的路径
public PublishAdFlush(CuratorFramework curatorFramework, String path) {
super(curatorFramework);
this.path = path;
}
/*
*具体监听的方法
*/
private String watcherPath(String path, CuratorWatcher watcher)
throws Exception {
Stat stat = curatorFramework.checkExists().forPath(path);
String date = new Date().toString();
if (stat == null) { // 没有节点创建节点
curatorFramework.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(path, date.getBytes());
}
byte[] buffer = curatorFramework.getData().usingWatcher(watcher)
.forPath(path);
System.out.println(new String(buffer));
return new String(buffer);
}
/*
*获取所监听路径的数据
*/
private String readPath(String path) throws Exception {
byte[] buffer = curatorFramework.getData().forPath(path);
return new String(buffer);
}
/**
* 跟新节点数据
*/
public void updateData() throws Exception {
String date = new Date().toString();
Stat stat = curatorFramework.checkExists().forPath(this.path);
if (stat == null) { // 没有节点创建节点
curatorFramework.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(this.path, date.getBytes());
System.out.println(date+":数据更新,创建新节点!");
} else {
curatorFramework.setData().forPath(this.path, date.getBytes());
System.out.println(date+":数据更新,不用创建新节点!");
}
}
/**
* 实现监听器和监听事件后的操作
*/
private CuratorWatcher pathWatcher = new CuratorWatcher() {
public void process(WatchedEvent event) throws Exception {
String date = new Date().toString();
// 当数据变化后,重新获取数据信息
if (event.getType() == EventType.NodeDataChanged) {
// 获取更改后的数据,进行相应的业务处理
//继续调用监听 curatorFramework.getData().usingWatcher(pathWatcher)
.forPath(path);
System.out.println("处理数据");
String value = readPath(path);
String uuid = StringUtil.getUUID2();
logger.info("{},数据开始更新", uuid);
AdShowService adShowService = SpringContextHolder
.getBean("adShowService");
boolean flag = adShowService .flushData(CommonConstants.COMMON_FLUSH);
logger.info("{},数据更新完成结果为:{}", uuid, flag);
}else if(event.getType() == EventType.NodeDeleted){
Stat stat = curatorFramework.checkExists().forPath(path);
if (stat == null) { // 没有节点创建节点 curatorFramework.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(path, date.getBytes());
}
//继续调用监听 curatorFramework.getData().usingWatcher(pathWatcher)
.forPath(path);
}else if(event.getType() == EventType.NodeCreated){
//继续调用监听 curatorFramework.getData().usingWatcher(pathWatcher)
.forPath(path);
}
}
};
}
需要注意的点(很关键):
(1)、watcherPath()方法只在Client启动的时候才会调用(因为它被写在call里,而call又是被ZKBase的init方法调用,init方法在springMVC的配置文件中有调用到)
(2)、所有的节点都是CreateMode.EPHEMERAL类型的,也就是说某Client在
“curatorFramework.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(path, date.getBytes())”
这段代码中所创建的节点,在改Client重启的过程中,或者是出现异常的过程中,节点是会被删掉的。关于节点的介绍,请参考:
zk客户端和zk服务器间主要可能存在下面几种异常情况
短暂失去连接:此时客户端检测到与服务端的连接已经断开,但是服务端维护的客户端session尚未过期,之后客户端和服务端重新建立了连接;当客户端重新连接后,由于session没有过期,zookeeper能够保证连接恢复后保持正常服务。
失去连接时间很长:此时服务器相对于客户端的session已经过期了,与先前session相关的watcher和ephemeral的路径和数据都会消失;当Curator重新创建了与zk的连接后,会获取到session expired异常,Curator会销毁先前的session,并且会创建一个新的session,需要注意的是,与之前session相关的watcher和ephemeral类型的路径和数据在新的session中也不会存在,需要开发者在CuratorFramework.getConnectionStateListenable().addListener()中添加状态监听事件,对ConnectionState.LOST事件进行监听,当session过期后,使得之前的session状态得以恢复。对于ephemeral类型,在客户端应该保持数据的状态,以便及时恢复。
客户端重新启动:不论先前的zk session是否已经过期,都需要重新创建临时节点、添加数据和watch事件,先前的session也会在稍后的一段时间内过期。
Zk服务器重新启动:由于zk将session信息存放到了硬盘上,因此重启后,先前未过期的session仍然存在,在zk服务器启动后,客户端与zk服务器创建新的连接,并使用先前的session,与1相同。
需要注意的是,当session过期了,在session过期期间另外的客户端修改了zk的值,那么这个修改在客户端重新连接到zk上时,zk客户端不会接收到这个修改的watch事件(尽管添加了watch),如果需要严格的watch逻辑,就需要在curator的状态监控中添加逻辑。
(3) watcher仅仅是一次性的,zookeeper通知了watcher事件后,就会将这个watcher从session中删除,因此,如果想继续监控,就要添加新的watche。因此,在实现CuratorWatcher监听器的过程中,不管监听到了什么事件,都需要继续添加:
“curatorFramework.getData().usingWatcher(pathWatcher)
.forPath(path);”
继续对节点进行跟踪,否则改Client节点只要监听到一次事件之后就不再对该节点进行监听。
在编写好具体的更改节点数据方法之后(如上述例子中的updateData方法),不论哪一个Client掉用了这个方法,那么所有的Client都会监听到这个节点数据的变化,然后会响应:
event.getType() == EventType.NodeDataChanged
下的数据同步代码。
更多推荐
所有评论(0)