关历史文章(阅读本文前,您可能需要先看下之前的系列👇

国内最全的Spring Boot系列之四

享元模式:共享女友 - 第355篇

什么是 ZooKeeper - 第347篇

ZooKeeper安装 - 第348篇

ZooKeeper数据结构和实操  - 第349篇

ZooKeeper的watch机制 - 第350篇

ZooKeeper的acl权限控制  - 第351篇

ZooKeeper内存数据和持久化  - 第352篇

ZooKeeper集群搭建 - 第354篇

ZooKeeper Java客户端的基本使用 - 第356篇

ZooKeeper客户端Curator - 第358篇

ZooKeeper客户端Curator的进阶使用 - 第359篇

ZooKeeper客户端Curator实现Watch事件监听 - 第361篇

Spring Boot 使用 Curator 操作 ZooKeeper - 第363篇

Spring Boot使用Apache Curator实现服务的注册和发现 - 第364篇
Spring Boot使用Apache Curator实现分布式锁(可重入排它锁) - 第365篇

Spring Boot使用Apache Curator实现leader选举 - 第366篇

Spring Boot使用Apache Curator实现分布式计数器 - 367篇

ZooKeeper Session 基本原理 - 第369篇

ZooKeeper分桶策略实现高性能的会话管理 - 第371篇

ZooKeeper集群架构以及读写原理 - 第372篇

这节我们来看看通过java代码怎么和ZK进行交互,这里主要使用apache提供的org.apache.zookeeper来进行和zk server进行连接的。

一、client : zookeeper

1.1 说明

       现在我们要使用代码的方式进行连接到zk server,那么我们一个zookeeper java客户端的依赖包,那这里我们使用的就是apache提供的zookeeper.jar。

       在zookeeper提供了和zk server交互的核心类就是ZooKeeper,接下来我们看下的步骤:

(1)创建项目;

(2)在pom.xml文件中添加zookeeper的依赖包;

(3)编写一个测试类,实现创建一个节点和获取一个节点;

1.2 开发环境

(1)操作系统:Mac OS;

(2)ZK Server : 3.6.2;

(3)开发工具:idea;

(4)JDK:1.8

1.3 hello小栗子

1.3.1 创建一个项目

       使用idea创建一个maven project,取名为:zookeeper-java。

1.3.2 添加依赖

       在pom.xml文件中添加依赖,zookeeper(zk client包)和junit(单元测试包):

<!-- zookeeper java客户端,选择的版本号最好是和zk的服务端是相同的版本,避免引发奇奇怪怪的问题 -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.2</version>
</dependency>

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13.2</version>
    <scope>test</scope>
</dependency>

说明:zookeeper的版本最好是和服务端的版本是一样的。

1.3.3 编写例子

(1)和zk server建立连接:

String connectString = "127.0.0.1:2181";
int timeout = 4000;

/*
 * 连接过程是异步的
 * 底层是使用守护线程,当没有业务线程执行的话,那么守护线程也就结束了。
 * 这里也就是main线程结束守护线程也就结束了。
 */
ZooKeeper zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() {
    public void process(WatchedEvent event) {
        if(event.getType() == Event.EventType.None && event.getState() == Event.KeeperState.SyncConnected){
            System.out.println("连接已经建立");
        }
    }
});

说明:

① 连接过程是异步的;

② 底层是使用守护线程,当没有业务线程执行的话,那么守护线程也就结束了。这里也就是main线程结束守护线程也就结束了。

③ 如何避免main线程不结束呢?使用线程的sleep,休眠一下。

       集群的情况下,多个地址使用逗号分隔:

String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";

(2)创建节点

       有了zooKeeper对象之后,就可以使用这个对象的create方法创建节点了:

String path = "/myconfig";
byte[] bytes = new String("hello").getBytes();
//节点不能多次创建,多次创建会报错: KeeperErrorCode = NodeExists for /myconfig
String rs = zooKeeper.create(path,bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//result:/myconfig
System.out.println("result:"+rs);

说明:

① 创建节点调用的方法是:zooKeeper.create(),第一个参数是节点的path;第二个参数是byte[] data;第三个参数是:ACl权限信息;第四个参数是:节点的类型;

② 节点不能多次创建,否则会报错:KeeperErrorCode = NodeExists for /myconfig。

(3)获取节点的数据:

       获取节点的数据是getData,这里有一个Watcher可以监听到数据的变化:

//通过create创建数据,通过get获取数据
//这种方式只能监听一次
byte[] dataBytes = zooKeeper.getData(path,new Watcher(){
    public void process(WatchedEvent event) {
        if(event.getType() == Event.EventType.NodeDataChanged && event.getPath() != null && event.getPath().equals(path)){
            System.out.println("数据改变了:"+event.getPath());

        }
    }
},null);
System.out.println("获取到的数据是:"+ new String(dataBytes));

整个类的代码如下:

package com.kfit;

import org.apache.zookeeper.*;
import static org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE;

/**
 * TODO
 *
 * @author 悟纤「公众号SpringBoot」
 * @date 2021-03-16
 * @slogan 大道至简 悟在天成
 */
public class ZkJavaClientDemo {


    public static void main(String[] args) throws Exception {
        String connectString = "127.0.0.1:2181";
        int timeout = 4000;

        /*
         * 连接过程是异步的
         * 底层是使用守护线程,当没有业务线程执行的话,那么守护线程也就结束了。
         * 这里也就是main线程结束守护线程也就结束了。
         */
        ZooKeeper zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() {
            public void process(WatchedEvent event) {
                if(event.getType() == Event.EventType.None && event.getState() == Event.KeeperState.SyncConnected){
                    System.out.println("连接已经建立");
                }
            }
        });

        System.out.println("开始创建节点...");
        String path = "/myconfig";
        byte[] bytes = new String("hello").getBytes();
        //节点不能多次创建,多次创建会报错: KeeperErrorCode = NodeExists for /myconfig
        String rs = zooKeeper.create(path,bytes, OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        //result:/myconfig
        System.out.println("result:"+rs);

        //通过create创建数据,通过get获取数据
        //这种方式只能监听一次
        byte[] dataBytes = zooKeeper.getData(path,new Watcher(){
            public void process(WatchedEvent event) {
                if(event.getType() == Event.EventType.NodeDataChanged && event.getPath() != null && event.getPath().equals(path)){
                    System.out.println("数据改变了:"+event.getPath());

                }
            }
        },null);
        System.out.println("获取到的数据是:"+ new String(dataBytes));
        /*
         * 通过sleep 避免主线程结束
         */
        Thread.sleep(Integer.MAX_VALUE);
        System.out.println("main end.");
    }

}

       运行main方法:

       这时候我们通过zkCli.sh进行访问zk server,然后修改节点数据/myconfig:

set /myconfig hello1

 

1.3.4 永久监听

我们发现现在我们的编码只能监听一次,如何实现永久监听呢?很简单,只要在监听的代码里再次监听就可以了,很简单:

Watcher watcher = new Watcher(){
    public void process(WatchedEvent event) {
        if(event.getType() == Event.EventType.NodeDataChanged && event.getPath() != null && event.getPath().equals(path)){
            System.out.println("数据改变了:"+event.getPath());
            try {
                byte[] dataBytes = zooKeeper.getData(path,this,null);
                System.out.println("获取到的数据是:"+ new String(dataBytes));
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
};
byte[] dataBytes = zooKeeper.getData(path,watcher,null);
System.out.println("获取到的数据是:"+ new String(dataBytes));

       多次执行set 都是能够监听到的。

1.3.5 永久监听方式二

       上面的方式不管是之前的版本还是最新的版本都是通用的,如果是3.6.x版本的是支持永久递归监听的,那么怎么玩呐?也很简单:

zooKeeper.addWatch(path, event -> {
    System.out.println("获取到的数据是:"+ event);
},AddWatchMode.PERSISTENT);

event这里使用了Lambda表达式,非Lambda的写法是:

zooKeeper.addWatch(path, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        System.out.println("获取到的数据是:"+ event);
    }
}, AddWatchMode.PERSISTENT);

       这里的AddWatchMode可以是PERSISTENT和PERSISTENT_RECURSIVE。

二、进阶的使用

2.1 修改节点

       修改节点使用的是set的操作,我们看下简单的例子:

//path = "/myconfig";

byte[] bytes = new String("hello-update").getBytes();
Stat stat = zooKeeper.setData(path,bytes,-1);

说明:

(1)执行set的时候,如果node不存在,会抛出异常:KeeperErrorCode = NoNode for /myconfig。在执行zooKeeper.setData()就会抛出异常,就不会有返回值Stat了

(2)这里的第三个参数是version:

       ① 如果不考虑并发修改的问题的话,那么version=-1;

    ② 如果填写的version和我们节点的version对不上的话,那么执行setData会报错:KeeperErrorCode = BadVersion for /myconfig。

2.2 并发修改节点

       对于节点的修改,会出现多个线程进行并发修改的问题,那么我们控制并发节点的修改问题呐,很简单,在前面的例子中的第三个参数version就是用来解决节点的并发修改问题的。具体的一个修改思路:

(1)、通过getData获取到节点的版本信息;

(2)、在执行setData的时候,传递当前获取到的版本号;

       具体的代码如下:

Stat nodeStat = new Stat();
zooKeeper.getData(path, false,nodeStat);

byte[] dataBytes = new String("hello-update").getBytes();
zooKeeper.setData(path,dataBytes,nodeStat.getVersion());

 

2.3 删除节点

       删除节点很简单,我们直接来看下代码:

zooKeeper.delete(path, -1);

版本号的说明:-1 代表匹配所有版本号,直接删除。任意大于-1的代表可以指定数据版本删除。

2.4 异步获取数据

       我们之前getData的方式是同步的,那么如何异步获取呢?对于zookeeper也提供了相应的方式:

    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        System.out.println(Thread.currentThread().getName());//main-EventThread
        System.out.println("ctx="+ctx);//ctx=1000
        System.out.println("data="+new String(data));//data=hello
    }
}, "1000");

三、小结

(1)使用java client的核心思路:引jar包,使用zookeeper类连接上ZK Server,然后就可以调用create/get/set/delete等操作节点的方法。

(2)对于节点的监听watcher操作,只监听一次,可以使用循环监听的方式进行永久监听;在3.6.x的版本可以使用addWatch方法永久递归监听。

(3)不能递归创建和删除节点。

越分享越成长,越分享越自信。

通过本文,你最大的是收获是什么?请在留言区分享你的收获。

我就是我,是颜色不一样的烟火。
我就是我,是与众不同的小苹果。

à悟空学院:https://t.cn/Rg3fKJD

学院中有Spring Boot相关的课程!!

SpringBoot视频:http://t.cn/A6ZagYTi

SpringBoot交流平台:https://t.cn/R3QDhU0

SpringSecurity5.0视频:http://t.cn/A6ZadMBe

ShardingJDBC分库分表:http://t.cn/A6ZarrqS

分布式事务解决方案:http://t.cn/A6ZaBnIr

JVM内存模型调优实战:http://t.cn/A6wWMVqG

Spring入门到精通:https://t.cn/A6bFcDh4

大话设计模式之爱你:https://dwz.cn/wqO0MAy7

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐