前提:

zkClient是对zookeeper原生API操作的一个封装,简化客户端对zk的操作。同时zkClient内部实现了诸如Session超时重连、Watcher反复注册(watcher只能生效一次,所以如果使用原生zookeeper api操作,需要反复注册watcher),说白了就是开发人员轻松了很多。

一顿操作猛如虎,直奔主题。

 

  • ZKClient maven坐标
<dependency>
    <groupId>com.github.sgroschupf</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.1</version>
</dependency>

 

  • 初始化ZKClient对象

// 建立zk链接,同步方法,原生的zookeeper客户端连接初始化时是一个异步操作(Zookeeper zk = new Zookeeper())。

参数说明:

192.168.1.28:2181:服务器地址

50000:连接超时时间,在此范围内如果还未成功链接,则抛异常

ZkClient zkClient = new ZkClient("192.168.1.28:2181",50000);

 

  • ZKClient源码

只列出了相关的构造函数,其他的无关代码没列出

参数说明:

zkServers:zookeeper服务信息,ip:port

connectionTimeout:连接超时时间,连接时超过设置的时间会抛异常

sessionTimeout:会话的超时时间,一个会话断了可以在这个时间内进行重连

zkSerializer:对内容的序列化和反序列化实现。可自定义,如hession等;

zkConnection:zookeeper的连接对象,默认sessionTimeout时间为30000ms


/**
 * Abstracts the interaction with zookeeper and allows permanent (not just one time) watches on nodes in ZooKeeper
 */
public class ZkClient implements Watcher {

    private final static Logger LOG = Logger.getLogger(ZkClient.class);

    public ZkClient(String serverstring) {
        this(serverstring, Integer.MAX_VALUE);
    }

    public ZkClient(String zkServers, int connectionTimeout) {
        this(new ZkConnection(zkServers), connectionTimeout);
    }

    public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout) {
        this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout);
    }

    public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer) {
        this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer);
    }

    public ZkClient(IZkConnection connection) {
        this(connection, Integer.MAX_VALUE);
    }

    public ZkClient(IZkConnection connection, int connectionTimeout) {
        this(connection, connectionTimeout, new SerializableSerializer());
    }

    public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer) {
        _connection = zkConnection;
        _zkSerializer = zkSerializer;
        connect(connectionTimeout, this);
    }
}

 

  • 创建节点

特别注意:

 

1. createParents可以递归创建节点(public void createPersistent(String path, boolean createParents))

2. 无需注册watcher(前面也说了,ZKClient帮我们做好了)

3. 节点内容可以传任意类型数据

4. 可以自定义内容的序列化和反序列化

5. 在没指定zkSerializer时,默认使用java自动的序列化和反序列化

package cn.lxm.attendance.controller;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkConnection;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.zookeeper.CreateMode;

import java.util.List;

/**
 * @author lxm
 * @version 1.1
 * @date 2019/12/10 0010 16:01
 */
public class ZkClientTest {


    private static final String PATH = "/lxm/test";

    // 建立zk链接,同步方法。192.168.1.28:2181:服务器地址,50000:连接超时时间,在此范围内如果还未成功链接,则抛异常
    static ZkClient zkClient = new ZkClient("192.168.1.28:2181",50000);

    public static void main(String[] args) throws InterruptedException {
        createNode();
        List<String> children = zkClient.getChildren("/");
        System.out.println(children);
    }

    // 创建node
    public static void createNode(){

        // 创建一个永久的node节点a
        zkClient.createPersistent("/a");

        // 创建一个永久性node节点b,这种创造方式和zookeeper的原生API很相识,不太一样的是隐藏了watcher注册和内容可以传object对象,zookeeper原生直接传数组
        // 节点类型:CreateMode有四种取值,大家可以自行脑补
        zkClient.create("/b","这里是内容", CreateMode.PERSISTENT);

        // 创建一个永久性node节点c/cc,true代表可以递归创建(如果c不存在,则会先创建c再创建cc节点),默认false(原生zookeeper Api不能跨层级创建,这就是ZKClient优势)
        zkClient.createPersistent("/c/cc",true);

        // 创建永久性node节点d,里面内容为‘内容d’
        zkClient.createPersistent("/d","内容d");

        // 创建一个临时节点f
        zkClient.createEphemeral("/f");

        // 创建一个临时节点g,内容为  ‘内容g’
        zkClient.createEphemeral("/g","内容g");

        // 创建一个  递增序号  临时节点h
        zkClient.createEphemeralSequential("/h","内容h");

        // 创建一个  递增序号  的永久节点i
        zkClient.createPersistentSequential("/i","内容i");
    }
}

 

运行结果:

[lxm, a, b, h0000000036, c, i0000000037, zookeeper, test, d, f, g, dubbo, zk, newnode3, newnode2, mytest110000000016, mytest110000000018, mytest110000000017]

 

  • 删除节点

注意点:

deleteRecursive可以递归删除节点下所有的子节点

// 删除node
public static void deleteNode(){
    // 删除节点c,以及节点c下面所有的子节点
    // 节点c下面,还有子节点cc,deleteRecursive方法可以递归的删除子节点,而zookeeper原生api不允许删除存在子节点的节点
    zkClient.deleteRecursive("/c");

    // 删除节点a
    zkClient.delete("/a");
}

 

  • ZKClient注册监听

好奇的你可能发现了,zkClient是怎么来监听子节点的状态变化的,比如新增子节点,删除子节点操作。你考虑到的,我们的zkClient开发人员也考虑到了,那么就是使用listen来监听节点的变化。

 

subscribeChildChanges:在节点上加上一个listen监听事件,用来监听子节点的变化

subscribeDataChanges:监听当前节点上数据的变化;handleDataChange数据变化, handleDataDeleted节点数据被删除

subscribeStateChanges: 监听zookeeper的状态

 

// 注册一个监听事件,subscribeChildChanges,通过使用listen方式来监听来达到消息广播效果,监听子节点变化
zkClient.subscribeChildChanges("/zz", new IZkChildListener() {
    @Override
    public void handleChildChange(String s, List<String> list) throws Exception {
        System.out.println(">>>进入了handleChildChange");
        System.out.println(">>>s = " + s);
        System.out.println(">>>list = " + list);
    }
});
// 注册一个监听事件,subscribeDataChanges 监听节点上数据的变化
zkClient.subscribeDataChanges("/zz", new IZkDataListener() {
    @Override
    public void handleDataChange(String dataPath, Object data) throws Exception {
        System.out.println(">>> 进入了handleDataChange");
        System.out.println(">>> dataPath = " + dataPath);
        System.out.println(">>> data = " + data);
    }

    @Override
    public void handleDataDeleted(String dataPath) throws Exception {
        System.out.println(">>> 进入了handleDataDeleted");
        System.out.println(">>> dataPath = " + dataPath);
    }
});

// 监听zookeeper状态变化
zkClient.subscribeStateChanges(new IZkStateListener() {
    @Override
    public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
        System.out.println(">>> 进入了subscribeStateChanges");
        System.out.println(">>> state = " + state);
    }

    @Override
    public void handleNewSession() throws Exception {
        System.out.println(">>> 进入了handleNewSession");
    }
});

 

  • 获取子节点

返回PATH下的所有子节点,

zkClient.getChildren(PATH);

 

  • 其他的操作

写数据:zkClient.writeData(PATH,"456");

检查节点是否存在:zkClient.exists(PATH);

 

  • 完整代码
package cn.lxm.attendance.controller;

import org.I0Itec.zkclient.*;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;

import java.util.List;

/**
 * @author lxm
 * @version 1.1
 * @date 2019/12/10 0010 16:01
 */
public class ZkClientTest {


    private static final String PATH = "/lxm/test";

    // 建立zk链接,同步方法。192.168.1.28:2181:服务器地址,50000:连接超时时间,在此范围内如果还未成功链接,则抛异常
    static ZkClient zkClient = new ZkClient("192.168.1.28:2181",50000);

    public static void main(String[] args) throws InterruptedException {

        // 注册一个监听事件,subscribeChildChanges,通过使用listen方式来监听来达到消息广播效果,监听子节点变化
        zkClient.subscribeChildChanges(PATH, new IZkChildListener() {
            @Override
            public void handleChildChange(String s, List<String> list) throws Exception {
                System.out.println(">>>进入了handleChildChange");
                System.out.println(">>>s = " + s);
                System.out.println(">>>list = " + list);
            }
        });
        // 注册一个监听事件,subscribeDataChanges 监听节点上数据的变化
        zkClient.subscribeDataChanges(PATH, new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println(">>> 进入了handleDataChange");
                System.out.println(">>> dataPath = " + dataPath);
                System.out.println(">>> data = " + data);
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println(">>> 进入了handleDataDeleted");
                System.out.println(">>> dataPath = " + dataPath);
            }
        });

        // 监听zookeeper状态变化
        zkClient.subscribeStateChanges(new IZkStateListener() {
            @Override
            public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
                System.out.println(">>> 进入了subscribeStateChanges");
                System.out.println(">>> state = " + state);
            }

            @Override
            public void handleNewSession() throws Exception {
                System.out.println(">>> 进入了handleNewSession");
            }
        });

        // 获取子节点
        List<String> children = zkClient.getChildren(PATH);

        // 创建节点
        createNode();

        // 删除节点
        deleteNode();


        zkClient.createEphemeral(PATH,"123");
        Thread.sleep(1000);
        // 往节点上写数据
        zkClient.writeData(PATH,"456");

        // 检查节点是否存在
        zkClient.exists(PATH);

        Thread.sleep(Integer.MAX_VALUE);
    }

    // 创建node
    public static void createNode(){

        // 创建一个永久的node节点a
        zkClient.createPersistent("/a");

        // 创建一个永久性node节点b,这种创造方式和zookeeper的原生API很相识,不太一样的是隐藏了watcher注册和内容可以传object对象,zookeeper原生直接传数组
        // 节点类型:CreateMode有四种取值,大家可以自行脑补
        zkClient.create("/b","这里是内容", CreateMode.PERSISTENT);

        // 创建一个永久性node节点c/cc,true代表可以递归创建(如果c不存在,则会先创建c再创建cc节点),默认false(原生zookeeper Api不能跨层级创建,这就是ZKClient优势)
        zkClient.createPersistent("/c/cc",true);

        // 创建永久性node节点d,里面内容为‘内容d’
        zkClient.createPersistent("/d","内容d");

        // 创建一个临时节点f
        zkClient.createEphemeral("/f");

        // 创建一个临时节点g,内容为  ‘内容g’
        zkClient.createEphemeral("/g","内容g");

        // 创建一个  递增序号  临时节点h
        zkClient.createEphemeralSequential("/h","内容h");

        // 创建一个  递增序号  的永久节点i
        zkClient.createPersistentSequential("/i","内容i");

    }


    // 删除node
    public static void deleteNode(){
        // 删除节点c,以及节点c下面所有的子节点
        // 节点c下面,还有子节点cc,deleteRecursive方法可以递归的删除子节点,而zookeeper原生api不允许删除存在子节点的节点
        zkClient.deleteRecursive("/c");

        // 删除节点a
        zkClient.delete("/a");
    }
}

 

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐