在一套分布式的online services系统中,各service通常不会放在一台服务器上,而是通过Zookeeper这样的东西,将自己的service信息注册到上面,service的使用者通过Zookeeper来发现各service的信息,从而可以将request发送到不同的service上去处理。

                  

        如上图所示,两个Service Provider 1和2分别在192.168.1.5和192.168.1.6这两台服务器的2688端口上提供服务,服务的地址和端口注册到了Zookeeper中。Service User通过查询Zookeeper,可得知这些服务的信息。通常,Service User与Service Provider之间的通信,是通过connection pool实现的,因为Service User不可能假定在第一次查询到所有Service Provider的信息之后,它们就是一直存活的,假如某个Service Provider因为程序问题死掉了,向它发送request只会造成大量的失败结果,因此通常会实现一个connection pool来保证实时更新节点的信息,当有一个Service Provider从Zookeeper上消失之后,从connection pool中取出的connection总是可用的(即:总能通过它把request发送到一个有效的Service Provider那里)。

      代码如下:

    

package com.lyh.zk;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;

import com.lyh.common.ZKCommon;

/**
 * ZooKeeper监听
 * @author liuyuehu
 */
public class MyZooKeeper {
	
	Logger logger = Logger.getLogger(MyZooKeeper.class);
	
	protected CountDownLatch countDownLatch = new CountDownLatch(1);
	public static ZooKeeper zooKeeper = null;

	
	private Object waiter = new Object();
	
	
	/**
	 * 监控所有被触发的事件
	 */
	public void process(WatchedEvent event) {
		logger.info("收到事件通知:" + event.getState() );
		if(event.getState()==KeeperState.SyncConnected){  
            countDownLatch.countDown();  
        }
		
	}
	
	
	
	/**
     * <p>连接Zookeeper</p>
     * 启动zk服务 本实例基于自动重连策略,如果zk连接没有建立成功或者在运行时断开,将会自动重连. 
     * @param connectString  Zookeeper服务地址
     * @param sessionTimeout Zookeeper连接超时时间
     */
	public void connect(){     
        try {
        	synchronized (waiter) {
        		SessionWatcher watcher = new SessionWatcher();
        		if(zooKeeper == null){
            		// ZK客户端允许我们将ZK服务器的所有地址都配置在这里
        			zooKeeper = new ZooKeeper(ZKCommon.connectAddress,ZKCommon.sessionTimeout,watcher);     
        			// 使用CountDownLatch.await()的线程(当前线程)阻塞直到所有其它拥有
        			//CountDownLatch的线程执行完毕(countDown()结果为0)
        			countDownLatch.await();
            	}
			}
		} catch (IOException e) {
			logger.error("连接创建失败,发生 InterruptedException , e " + e.getMessage(), e);
		} catch (InterruptedException e) {
			logger.error( "连接创建失败,发生 IOException , e " + e.getMessage(), e );
		}
        waiter.notifyAll();
	}   
	
	/**
	 * 关闭连接
	 */
	public void close(){     
        try {
        	synchronized (waiter) {  
	        	if(zooKeeper != null){
	        		zooKeeper.close();
	        	}
	        	waiter.notifyAll();  
        	}
		} catch (InterruptedException e) {
			logger.error("release connection error ," + e.getMessage() ,e);
		}     
    } 
	
	class SessionWatcher implements Watcher {

		public void process(WatchedEvent event) {
			// 如果是“数据变更”事件  
            if (event.getType() != Event.EventType.None) {  
                return;  
            } 
            synchronized (waiter){
            	switch(event.getState()) {
            		case SyncConnected:
            			//zk连接建立成功,或者重连成功 
            			waiter.notifyAll();
            			logger.info("Connected...");
            			break;
            		case Expired:
            			// session过期,这是个非常严重的问题,有可能client端出现了问题,也有可能zk环境故障  
                        // 此处仅仅是重新实例化zk client  
            			logger.info("Expired(重连)...");
            			connect();
            			break;
            		case Disconnected:
            			logger.info("链接断开,或session迁移....");
            			break;
            		case AuthFailed:
            			close();  
                        throw new RuntimeException("ZK Connection auth failed...");
                    default:
                    	break;
            	}
            }
		}
	}
}


参考地址网页:http://iwinit.iteye.com/blog/1844177

http://www.codelast.com/%e5%8e%9f%e5%88%9b-zookeeper%e6%b3%a8%e5%86%8c%e8%8a%82%e7%82%b9%e7%9a%84%e6%8e%89%e7%ba%bf%e8%87%aa%e5%8a%a8%e9%87%8d%e6%96%b0%e6%b3%a8%e5%86%8c%e5%8f%8a%e6%b5%8b%e8%af%95%e6%96%b9%e6%b3%95/


Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐