本文介绍如何连接zookeeper集群、对zookeeper数据的crud、以及循环监听器的使用

引入pom依赖

引入的zookeeper版本依赖应该与安装的zookeeper版本一致。注,需要注释掉type标签

    <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.9</version>
      <!--<type>pom</type>-->
    </dependency>

CRUD测试

针对crud命令做测试的几个方法

package com.mym.test;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

public class ZookeeperTest {

    ZooKeeper zkClient = null;

    String createNodeName = "/apiTest10000000007"; //记录创建的节点名称

    /**
     * 建立连接
     * @throws IOException
     */
    @Before
    public void connect() throws IOException {
        if(zkClient == null){
            zkClient = new ZooKeeper("192.168.31.201:2181,192.168.31.202:2181,192.168.31.203:2181", 1000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("success to connect zk cluster!");
                }
            });
        }
    }

    /**
     * 测试创建节点
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void testCreate() throws KeeperException, InterruptedException {
        createNodeName = zkClient.create("/apiTest1", "this is api test1 data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        System.out.println("创建节点的返回值是:"+createNodeName);
    }

    /**
     * ls命令
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void testLs() throws KeeperException, InterruptedException {
        List<String> children = zkClient.getChildren("/", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("获得监听事件,path:" + watchedEvent.getPath() + ";state" + watchedEvent.getState() + ";type:" + watchedEvent.getType());
            }
        });

        for (int i = 0; i < children.size(); i++) {
            System.out.println("ls / 数据:"+children.get(i));
        }
    }

    /**
     * set命令
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void testSet() throws KeeperException, InterruptedException {
        Stat stat = zkClient.setData(createNodeName, ("this is update data! "+System.currentTimeMillis()).getBytes(), -1);//-1表示让系统维护version
        System.out.println("set 返回数据::"+stat);
    }

    /**
     * del命令
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void testDel() throws KeeperException, InterruptedException {
        zkClient.delete(createNodeName,-1);
        System.out.println("success to del "+createNodeName+" Znode!");
    }


    /**
     * get
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void testGet() throws KeeperException, InterruptedException {
        byte[] data = zkClient.getData(createNodeName, false, null);
        System.out.println("节点"+createNodeName+"的数据是:"+new String(data));
    }

    /**
     * 测试watch
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void testWatch() throws KeeperException, InterruptedException {

       //先重置zkClient的watch对象(也可以在实例化zkClient时指定)
        zkClient.register(watcher);
        //进行监听
        byte[] data = zkClient.getData(createNodeName, true, null);
        System.out.println("获得节点"+createNodeName+"的数据是:"+new String(data));

        //第一次进行修改,触发watch
        this.testSet();

        //第二次进行修改,触发watch
        this.testSet();
    }

    /**
     * 关闭连接
     * @throws InterruptedException
     */
    @After
    public void close() throws InterruptedException {
        zkClient.close();
    }


    /**定义watch对象*/
    private Watcher watcher = new Watcher() {
        public int watchCount = 0;  //记录监听次数

        @Override
        public void process(WatchedEvent watchedEvent) {
            System.out.println("获得监听事件,path:" + watchedEvent.getPath() + ";state:" + watchedEvent.getState() + ";type:" + watchedEvent.getType());

            //循环重复监听
            try {
                zkClient.exists(watchedEvent.getPath(), true);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            watchCount++;
            System.out.println("第 "+watchCount+" 次监听到!");
        }
    };

}


测试结果如下

  • 连接上集群时
success to connect zk cluster!
  • ls
    类命令输入:ls / 对应方法testLs()
ls / 数据:watchTest
ls / 数据:shunxu20000000001
ls / 数据:shunxu10000000000
ls / 数据:zookeeper
ls / 数据:a
  • create
    类命令输入:create /apiTest1 'this is api test1 data' 对应方法testCreate
创建节点的返回值是:/apiTest10000000007
  • get
    类命令输入:get /名称 对应方法testGet()
节点/apiTest10000000007的数据是:this is api test1 data
  • set
    类命令输入:set /名称 数据 对应方法testSet()
set 返回数据::4294967336,4294967345,1526852099431,1526852869041,1,0,0,0,20,0,4294967336

注,这些返回的数据就是节点的所有属性,可以进去看下Stat类的定义:

public class Stat implements Record {
    private long czxid;
    private long mzxid;
    private long ctime;
    private long mtime;
    private int version;
    private int cversion;
    private int aversion;
    private long ephemeralOwner;
    private int dataLength;
    private int numChildren;
    private long pzxid;

    ....
  • delete
    类命令输入:delete /名称 对应方法testDel()
success to del /apiTest10000000007 Znode!

测试Watch(循环监听器)

逻辑就是对某个路径在本次监听完后主动用简单(exist()方法)的调用再次注册监听

具体方法:

(注,这只是一个测试类,故直接把Watch匿名类对象直接实例化,不太友好,实际使用时应通过实现Watch接口,创造一个Watch实现来,从而获得watcher对象来使用)

   /**
     * 测试watch
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void testWatch() throws KeeperException, InterruptedException {

       //先重置zkClient的watch对象(也可以在实例化zkClient时指定)
        zkClient.register(watcher);
        //进行监听
        byte[] data = zkClient.getData(createNodeName, true, null);
        System.out.println("获得节点"+createNodeName+"的数据是:"+new String(data));

        //第一次进行修改,触发watch
        this.testSet();

        //第二次进行修改,触发watch
        this.testSet();
    }


    /**定义watch对象*/
    private Watcher watcher = new Watcher() {
        public int watchCount = 0;  //记录监听次数

        @Override
        public void process(WatchedEvent watchedEvent) {
            System.out.println("获得监听事件,path:" + watchedEvent.getPath() + ";state:" + watchedEvent.getState() + ";type:" + watchedEvent.getType());

            //循环重复监听
            try {
                zkClient.exists(watchedEvent.getPath(), true);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            watchCount++;
            System.out.println("第 "+watchCount+" 次监听到!");
        }
    };

测试结果:

获得监听事件,path:null;state:SyncConnected;typeNone
获得节点/apiTest10000000007的数据是:this is update data! 1532258475593
获得监听事件,path:/apiTest10000000007;state:SyncConnected;typeNodeDataChanged
set 返回数据::4294967352,4294967398,1526853419126,1526855521206,21,0,0,0,34,0,42949673521 次监听到!
获得监听事件,path:/apiTest10000000007;state:SyncConnected;typeNodeDataChanged
set 返回数据::4294967352,4294967399,1526853419126,1526855521216,22,0,0,0,34,0,42949673522 次监听到!

注:第一个 获得监听事件 是客户端连接服务器的自动监听。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐