package com.billstudy.zookeeper;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * 注册服务,并且自动监听可用服务列表
 * @author Bill
 * @since V1.0 2015年6月24日 - 上午10:03:24
 */
public class AppServer {

	private ZooKeeper zk = null;
	
	// 树前缀
	private static final String zkParentPrefix = "/appserver";

	private static final String zkChildPrefix = "/app"; 
	
	// 维护可用列表
	private final ArrayList<String> availableServerList = new ArrayList<String>();
	
	public void connectZk(String address){
		try {
			zk = new ZooKeeper("hadoop-server05:2181,hadoop-server06:2181,hadoop-server07:2181", 5000, new Watcher() {
				@Override
				public void process(WatchedEvent event) {
					
					System.out.println(event.toString());
					
					if (event.getType() == EventType.NodeChildrenChanged
						&& event.getPath().startsWith(zkParentPrefix)
							) {
						flushServerList();
					}
				}
			});
			
			// 如果根节点没有,则先创建
			if (zk.exists(zkParentPrefix, true) == null) {
				zk.create(zkParentPrefix, "AppServer root dir ".getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
				System.out.println("znode :" + zkParentPrefix + "is not exists , create successful !");
			}
			
			// 根据当前address创建临时连续子节点,这样多个不同的app child节点不会重复。 zk会自己维护序列
			String childPath = zk.create(zkParentPrefix + zkChildPrefix, address.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
			
			System.out.println("create " + childPath + " successful, address is :" + address);
			
			
			flushServerList();
		} catch (Exception e) {
			e.printStackTrace();
		}
		
	}

	/**
	 * 收到子节点更新后,刷新当前可用服务列表
	 * @author Bill
	 * @since V1.0 2015年6月24日 - 上午10:08:19
	 */
	protected void flushServerList() {
		availableServerList.clear();
		try {
			List<String> children = zk.getChildren(zkParentPrefix, true);
			for (String child : children) {
				byte[] data = zk.getData(zkParentPrefix + "/" + child, true,new Stat());
				availableServerList.add(new String(data,"UTF-8"));
			}
			System.out.println("current available server list:" + availableServerList);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * 此处可以用来处理业务逻辑,目前让主线程挂起
	 * @author Bill
	 * @since V1.0 2015年6月24日 - 上午10:24:00
	 */
	public void handle(){
		try {
			// System.out.println("handle ...");
			TimeUnit.HOURS.sleep(Long.MAX_VALUE);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) {
		
		if (args.length != 1) {
			System.err.println("The program first argument must be address !");
			System.exit(1);
		}
		AppServer appServer = new AppServer();
		appServer.connectZk(args[0]);
		appServer.handle();
	}
	
}

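// Footer residue from the web page this listing was copied from (not part of the program):
//   "Logo"
//   "Authoritative | Cutting-edge | Technical | Practical - China's first developer community covering the full API lifecycle"
//   "More recommendations"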