Zookeeper实现服务上下线监控服务列表
package com.billstudy.zookeeper;import java.util.ArrayList;import java.util.List;import java.util.concurrent.TimeUnit;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.WatchedE
·
package com.billstudy.zookeeper;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* 注册服务,并且自动监听可用服务列表
* @author Bill
* @since V1.0 2015年6月24日 - 上午10:03:24
*/
public class AppServer {
private ZooKeeper zk = null;
// 树前缀
private static final String zkParentPrefix = "/appserver";
private static final String zkChildPrefix = "/app";
// 维护可用列表
private final ArrayList<String> availableServerList = new ArrayList<String>();
public void connectZk(String address){
try {
zk = new ZooKeeper("hadoop-server05:2181,hadoop-server06:2181,hadoop-server07:2181", 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event.toString());
if (event.getType() == EventType.NodeChildrenChanged
&& event.getPath().startsWith(zkParentPrefix)
) {
flushServerList();
}
}
});
// 如果根节点没有,则先创建
if (zk.exists(zkParentPrefix, true) == null) {
zk.create(zkParentPrefix, "AppServer root dir ".getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("znode :" + zkParentPrefix + "is not exists , create successful !");
}
// 根据当前address创建临时连续子节点,这样多个不同的app child节点不会重复。 zk会自己维护序列
String childPath = zk.create(zkParentPrefix + zkChildPrefix, address.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("create " + childPath + " successful, address is :" + address);
flushServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 收到子节点更新后,刷新当前可用服务列表
* @author Bill
* @since V1.0 2015年6月24日 - 上午10:08:19
*/
protected void flushServerList() {
availableServerList.clear();
try {
List<String> children = zk.getChildren(zkParentPrefix, true);
for (String child : children) {
byte[] data = zk.getData(zkParentPrefix + "/" + child, true,new Stat());
availableServerList.add(new String(data,"UTF-8"));
}
System.out.println("current available server list:" + availableServerList);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 此处可以用来处理业务逻辑,目前让主线程挂起
* @author Bill
* @since V1.0 2015年6月24日 - 上午10:24:00
*/
public void handle(){
try {
// System.out.println("handle ...");
TimeUnit.HOURS.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
if (args.length != 1) {
System.err.println("The program first argument must be address !");
System.exit(1);
}
AppServer appServer = new AppServer();
appServer.connectZk(args[0]);
appServer.handle();
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)