在正式的生产环境中,为了应付高并发、高负载的业务需求,集群部署已然成为了家常便饭。这样我们就需要监控服务器集群中所有服务器的运行状态和服务器当前的负载情况,根据服务器的运行状态和负载情况将可用且负载较低的服务器信息放回给服务器的调用方使用,以达到集群负载均衡的目的,如果服务器宕机,则需迅速响应并将信息反馈给调用方,通知调用方切换可用的服务器,同时启动相应的报警机制。
本例子就是利用Zookeeper的文件目录特性和事件通知机制,来实现服务器集群的监控的一个简单例子。在实际的开发中,可借鉴此例子的思想来实现自己的集群负载均衡管理器。
实现思路

  • 在Zookeeper里面创建名为/sgroup的永久节点,表示是整个服务器集群的根节点
  • 每一个服务器节点启动时,在/sgroup的节点下创建自己的EPHEMERAL节点,表示此服务器在运行状态,并每隔10秒上传自己的负载信息,存为此节点的数据。EPHEMERAL有个重要的特性,当创建此类节点的客户端与Zookeeper服务器的连接关闭时,此节点自动删除,可利用此特性来监控服务器的上下线状态。
  • 监控服务器一直不断的监视集群节点/sgroup下子节点的状态,达到监控相应的服务器的运行状态和负载情况的目录,如有异常,可迅速地启动报警机制。

首先我们来创建自己的服务器,代码如下:

package com.laizs.test.zookeeper;
import java.io.IOException;
import java.util.Random;
import java.util.Scanner;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class AppServer {
    /**
     * zookeeper中集群服务器的总节点
     */
    private String groupNode = "sgroup";
    private ZooKeeper zooKeeper;
    /**
     * 服务器创建的节点的路径
     */
    private String serverNodePath="";
    /**
     * 当前服务器的负载
     */
    private int loadBalance=0;
    /**
     * 连接zookeeper服务器,并在集群总结点下创建EPHEMERAL类型的子节点,把服务器名称存入子节点的数据
     * @param zookeeperServerHost
     * @param serverName
     * @throws IOException
     * @throws KeeperException
     * @throws InterruptedException
     */
    public void connectZookeeper(String zookeeperServerHost, String serverName)
            throws IOException, KeeperException, InterruptedException {
         zooKeeper = new ZooKeeper(zookeeperServerHost, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 啥都不做

            }
        });
        // 先判断sgroup节点是否存在
        String groupNodePath = "/" + groupNode;
        Stat stat = zooKeeper.exists(groupNodePath, false);
        if (null == stat) {
            zooKeeper.create(groupNodePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        // 将server的地址数据关联到新创建的子节点上 
        serverNodePath=zooKeeper.create(groupNodePath+"/"+serverName, 
                String.valueOf(loadBalance).getBytes("utf-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("创建了server节点:"+serverNodePath);
        //定时上传服务器的负载
        uploadBarance();
    }
    /**
     * 关闭于zookeeper服务器的连接
     * @throws InterruptedException
     */
    public void closeZookeeper() throws InterruptedException{
        if(null!=zooKeeper){
            zooKeeper.close();
        }
    }
    /**
     * 每隔10秒上传一次负载
     * 
     */
    private void uploadBarance(){
        new Thread(new  Runnable() {
            public void run() {
                while(true){
                    try {
                        Thread.sleep(10000);
                        loadBalance=new Random().nextInt(100000);
                        String l=String.valueOf(loadBalance);
                        System.out.println("服务器上传负载:"+loadBalance);
                        zooKeeper.setData(serverNodePath, l.getBytes("utf-8"), -1);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }


            }
        }).start();
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
          System.out.print("请输入服务器名称(如server001):");
          Scanner scan = new Scanner(System.in);
          String serverName = scan.nextLine();
          AppServer appServer=new AppServer();
          appServer.connectZookeeper("192.168.0.5:2181", serverName);
          while(true){
              System.out.println("请输入您的操作指令(exit 退出系统):");
              String command = scan.nextLine();
              if("exit".equals(command)){
                  System.out.println("服务器关闭中....");
                  appServer.zooKeeper.close();
                  System.exit(0);
                  break;
              }else{
                  continue;
              }
          }
    }
}

然后,我们再完成监控服务器的代码:

package com.laizs.test.zookeeper;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * 服务器集群监听管理器
 * @author laizs
 * @time 2016年3月17日上午11:02:01
 * @file AppServerMonitor.java
 */
public class AppServerMonitor implements Watcher{
    private String groupNode = "sgroup";
    private ZooKeeper zk;
    private Stat stat = new Stat();
    //服务器信息,包含了服务器名称、负载两个信息,使用map存储,key是服务器节点path,value是服务器信息对象
    private volatile Map<String,ServerInfo> serverList=new TreeMap<String, ServerInfo>();

    /**
     * 连接zookeeper服务器
     * 
     * @throws IOException
     * @throws InterruptedException 
     * @throws KeeperException 
     */
    public void connectZookeeper() throws IOException, KeeperException, InterruptedException {
        zk = new ZooKeeper("192.168.0.5:2181", 5000, this);
        //查看要检测的服务器集群的根节点是否存在,如果不存在,则创建
        if(null==zk.exists("/"+groupNode, false)){
            zk.create("/"+groupNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        updateServerList();
    }
    /**
     * 更新服务器列表信息
     * @throws KeeperException
     * @throws InterruptedException
     * @throws UnsupportedEncodingException
     */
    private void updateServerList() throws KeeperException, InterruptedException, UnsupportedEncodingException {
        Map<String,ServerInfo> newServerList=new TreeMap<String,ServerInfo>();
        // 获取并监听groupNode的子节点变化  
        // watch参数为true, 表示监听子节点变化事件.   
        // 每次都需要重新注册监听, 因为一次注册, 只能监听一次事件, 如果还想继续保持监听, 必须重新注册  
        List<String> subList=zk.getChildren("/"+groupNode,true);
        for(String subNode:subList){
            ServerInfo serverInfo=new ServerInfo();
            serverInfo.setPath("/"+groupNode+"/"+subNode);
            serverInfo.setName(subNode);
            //获取每个子节点下关联的服务器负载的信息
            byte[] data=zk.getData(serverInfo.getPath(), true, stat);
            String loadBalance=new String(data,"utf-8");
            serverInfo.setLoadBalance(loadBalance);
            newServerList.put(serverInfo.getPath(), serverInfo);

        }
        // 替换server列表  
        serverList=newServerList;
        System.out.println("$$$更新了服务器列表:"+serverList);
    }
    /**
     * 更新服务器节点的负载数据
     * @param serverNodePath
     * @throws InterruptedException 
     * @throws KeeperException 
     * @throws UnsupportedEncodingException 
     */
    private void updateServerLoadBalance(String serverNodePath) throws KeeperException, InterruptedException, UnsupportedEncodingException{
        ServerInfo serverInfo=serverList.get(serverNodePath);
        if(null!=serverInfo){
            //获取每个子节点下关联的服务器负载的信息
            byte[] data=zk.getData(serverInfo.getPath(), true, stat);
            String loadBalance=new String(data,"utf-8");
            serverInfo.setLoadBalance(loadBalance);
            serverList.put(serverInfo.getPath(), serverInfo);
            System.out.println("@@@更新了服务器的负载:"+serverInfo);
            System.out.println("------");
            System.out.println("###更新服务器负载后,服务器列表信息:"+serverList);
        }
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("监听到zookeeper事件-----eventType:"+event.getType()+",path:"+event.getPath());
        //集群总节点的子节点变化触发的事件
        if (event.getType() == EventType.NodeChildrenChanged && 
                event.getPath().equals("/" + groupNode)) {
             //如果发生了"/sgroup"节点下的子节点变化事件, 更新server列表, 并重新注册监听  
            try {
                updateServerList();
            } catch (Exception e) {
                e.printStackTrace();
            } 
        }
        if (event.getType() == EventType.NodeDataChanged && 
                event.getPath().startsWith("/" + groupNode)) {
             //如果发生了服务器节点数据变化事件, 更新server列表, 并重新注册监听  
            try {
                updateServerLoadBalance(event.getPath());
            } catch (Exception e) {
                e.printStackTrace();
            } 
        }
    }
    /**
     * client的工作逻辑写在这个方法中 
     * 此处不做任何处理, 只让client sleep 
     * @throws InterruptedException 
     */
    public void handle() throws InterruptedException{
        Thread.sleep(Long.MAX_VALUE);
    }
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        AppServerMonitor ac=new AppServerMonitor();
        ac.connectZookeeper();
        ac.handle();
    }
    /**
     * 内部类,服务器信息
     * @author Administrator
     *
     */
    class ServerInfo{
        //服务节点在zookeeper上的路径
        private String path;
        //服务器名称
        private String name;
        //服务器负载量
        private String loadBalance;

        public String getPath() {
            return path;
        }
        public void setPath(String path) {
            this.path = path;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public String getLoadBalance() {
            return loadBalance;
        }
        public void setLoadBalance(String loadBalance) {
            this.loadBalance = loadBalance;
        }
        @Override
        public String toString() {
            return " [服务器节点路径=" + path + ", 服务器名称=" + name + ", 服务器负载=" + loadBalance + "]";
        }




    }

}

代码搞定,我们运行AppServerMonitor代表我们的监控服务器;然后分别启动两次AppServer,表示运行了两个服务器,然后再关闭其中一个的运行。
AppServerMonitor控制台输出的信息如下:

INFO : (ClientCnxn.java:1235)    Session establishment complete on server 192.168.0.5/192.168.0.5:2181, sessionid = 0x153a16c686e000b, negotiated timeout = 5000
监听到zookeeper事件-----eventType:None,path:null
$$$更新了服务器列表:{}
监听到zookeeper事件-----eventType:NodeChildrenChanged,path:/sgroup
$$$更新了服务器列表:{/sgroup/server001= [服务器节点路径=/sgroup/server001, 服务器名称=server001, 服务器负载=0]}
监听到zookeeper事件-----eventType:NodeDataChanged,path:/sgroup/server001
@@@更新了服务器的负载: [服务器节点路径=/sgroup/server001, 服务器名称=server001, 服务器负载=64581]
------
###更新服务器负载后,服务器列表信息:{/sgroup/server001= [服务器节点路径=/sgroup/server001, 服务器名称=server001, 服务器负载=64581]}
监听到zookeeper事件-----eventType:NodeChildrenChanged,path:/sgroup
$$$更新了服务器列表:{/sgroup/server001= [服务器节点路径=/sgroup/server001, 服务器名称=server001, 服务器负载=64581], /sgroup/server002= [服务器节点路径=/sgroup/server002, 服务器名称=server002, 服务器负载=0]}
监听到zookeeper事件-----eventType:NodeDataChanged,path:/sgroup/server001
@@@更新了服务器的负载: [服务器节点路径=/sgroup/server001, 服务器名称=server001, 服务器负载=38594]
------
###更新服务器负载后,服务器列表信息:{/sgroup/server001= [服务器节点路径=/sgroup/server001, 服务器名称=server001, 服务器负载=38594], /sgroup/server002= [服务器节点路径=/sgroup/server002, 服务器名称=server002, 服务器负载=0]}
监听到zookeeper事件-----eventType:NodeDeleted,path:/sgroup/server001
监听到zookeeper事件-----eventType:NodeChildrenChanged,path:/sgroup
$$$更新了服务器列表:{/sgroup/server002= [服务器节点路径=/sgroup/server002, 服务器名称=server002, 服务器负载=0]}
监听到zookeeper事件-----eventType:NodeDataChanged,path:/sgroup/server002
@@@更新了服务器的负载: [服务器节点路径=/sgroup/server002, 服务器名称=server002, 服务器负载=43131]
------
###更新服务器负载后,服务器列表信息:{/sgroup/server002= [服务器节点路径=/sgroup/server002, 服务器名称=server002, 服务器负载=43131]}

由此可见,AppServerMonitor能及时地观察到集群服务器的状态的变化。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐