项目终于不忙了.闲来无事,想起上次面试被人问了一个zk把我给问住了.看来要好好了解一下zk了.于是开始了zk的学习之路.


首先带大家搭建一下zk集群环境,这个很简单,我就不说那么详细了,


首先准备三台机器,来安装zk集群  要有root权限,因为要改hosts


下载zk安装包上传到liunx服务器  我用的zookeeper.3.4.10.tar.gz   你们用其他版本也行


解压  tar -zxvf  .....


个人习惯重命名  为zookeeper 在当前用户的家目录  


然后cd到conf这个目录下面  


重命名zoo.cfg   然后打开这个文件  重点两点  一个是datadir  我是直接在zookeeper下创建了data目录  所以我的配置是  datadir=/home/gang/zookeeper/data


然后配置 server.0=uamserver:2888:3888  
server.1=uamserver1:2888:3888  
server.2=uamserver2:2888:3888  

uamserver 和uamserver1 ,uamserver2分别对应三个机器.三个机器都配置一样  


然后在data目录下创建myid文件 内容分别是0  1   2  这三个数server0对应的myid为0  切换到root用户修改hosts文件


然后启动切换到zookeeper/bin目录下./zkServer.sh start  启动三台后 验证是否成功  ./zkServer.sh status  


这时候 

[was@uamserver bin]$ ./zkServer.sh  status
ZooKeeper JMX enabled by default
Using config: /home/was/zookeeper1/bin/../conf/zoo.cfg
Mode: follower

[was@uamserver bin]$ ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/was/zookeeper3/bin/../conf/zoo.cfg
Mode: leader

这时候说明集群已经搭建成功


现在我们通过zk原生api来操作下zk,java客户端


public class Zookeeper {

   private static CountDownLatch countDownLatch=new CountDownLatch(1);

   private final static String zk_url="1.11.11.1:2181,1.11.11.2:2182,1.11.11.3:2183";

   private final static int time_out=5000;

   public static void main(String[] arges) throws IOException, InterruptedException {
       //初始化zk
       ZooKeeper zooKeeper=new ZooKeeper(zk_url, time_out, new Watcher() {
           public void process(WatchedEvent watchedEvent) {
               Event.KeeperState state = watchedEvent.getState();
               Event.EventType type = watchedEvent.getType();
               if(Event.KeeperState.SyncConnected==state){
                   if(Event.EventType.None==type){
                       //调用此方法测计数减一
                       countDownLatch.countDown();
                   }
               }
           }
       });
       //阻碍当前线程进行,除非计数归零
       countDownLatch.await();
       try {
           //创建持久化节点
           zooKeeper.create("/gang","你好".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
           //获取节点数据
           byte[] data = zooKeeper.getData("/gang", false, null);
           System.out.println(new String(data));
           //修改节点数据
           zooKeeper.setData("/gang","吕金刚".getBytes(),0);
           //删除节点数据
           zooKeeper.delete("/gang",-1);
           //创建临时节点 异步创建
           zooKeeper.create("/jingang", "临时节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new AsyncCallback.StringCallback() {
               @Override
               public void processResult(int i, String s, Object o, String s1) {
                    System.out.println(o);
                    System.out.println(i);
                    System.out.println(s1);
                    System.out.println(s);
               }
           },"a");
           //获取临时节点数据
           byte[] jingangs = zooKeeper.getData("/jingang", false, null);
           System.out.println(new String(jingangs));
           //验证节点是否存在
           Stat exists = zooKeeper.exists("/jingang", false);
           System.out.println(exists);
       } catch (KeeperException e) {
           e.printStackTrace();
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       zooKeeper.close();
   }
}

咱们把他简单的封装一下


public class ZookWatcher implements Watcher {
    //默认从0开始,原子性
    private AtomicInteger count = new AtomicInteger();

    private static ZooKeeper zk;

    private static final int time_out=10000;

    private static CountDownLatch countDownLatch=new CountDownLatch(1);

    private static final String  url="1.11.11.1:2181,1.11.11.2:2182,1.11.11.3:2183";

    /**
     * 重写process用来监控watcher事件
     * @param watchedEvent
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        Event.EventType type = watchedEvent.getType();
        Event.KeeperState state = watchedEvent.getState();
        //是否连接成功
        if(Event.KeeperState.SyncConnected==state){
            if(Event.EventType.None==type){
                System.out.println("成功连接到zk服务器");
                //zk连接成功将计数器清零
                countDownLatch.countDown();
            }
            //如果是创建节点事件
            else if(Event.EventType.NodeCreated==type){
                System.out.println("创建节点成功");
            }
            //如果节点中的数据被改变
            else if(Event.EventType.NodeDataChanged==type){
                System.out.println("节点数据发生变化");
            }
            //如果子节点的数据发生改变
            else if(Event.EventType.NodeChildrenChanged==type){
                System.out.println("子节点数据发生改变");
            }
            //节点被删除事件
            else if(Event.EventType.NodeDeleted==type){
                System.out.println("节点被删除");
            }else;

        }else if(Event.KeeperState.Disconnected==state){
            //连接失败直接退出
            return;
        }
        try {
            //阻塞当前线程,直到zk初始化完毕.
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 获取zk连接
     * @return
     * @throws InterruptedException
     */
    public ZooKeeper createConntection() throws InterruptedException {
        try {
            zk=new ZooKeeper(url, time_out, this);
            //阻塞当前线程,当创好连接以后再继续执行当前线程
            countDownLatch.await();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return zk;
    }

    /**
     * 关闭zk连接
     * @param zooKeeper
     */
    public void close(ZooKeeper zooKeeper){
        try {
            zooKeeper.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 验证此节点是否存在
     * @param path
     * @param watcher
     * @return
     */
    public Stat extis(String path,boolean watcher){
        try {
            Stat exists = zk.exists(path, watcher);
            return exists;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 创建节点
     * @param path   节点路径
     * @param data   数据
     * @return
     */
    public String createNode(String path,String data,boolean watcher){
        String result=null;
        try {
            Stat exists = zk.exists(path, watcher);
            if(null==exists){
                result= zk.create(path,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            }else{
                System.out.println("节点已经存在");
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * 修改节点的数据
     * @param path      节点路径
     * @param data      数据
     * @param version   版本号
     * @return
     */
    public Stat setNodeData(String path,String data,int version,boolean watcher){
        Stat stat =null;
        try {
            Stat exists = zk.exists(path, watcher);
            stat = zk.setData(path, data.getBytes(), version);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return stat;
    }

    /**
     * 删除节点
     * @param path    节点的路径
     * @param version 版本号,当输入-1时候是指所有数据
     */
    public void deleteNode(String path,int version,boolean watcher){
        try {
            Stat exists = zk.exists(path, watcher);
            zk.delete(path,version);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    /**
     *  获取节点数据
     * @param path
     * @return
     */
    public String getNodeData(String path,boolean watcher){
        byte[] data=null;
        try {
            data = zk.getData(path, watcher, null);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new String(data);
    }
}

下篇重点介绍一下zk的特性,以及watcher和watcher事件








Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐