关历史文章(阅读本文前,您可能需要先看下之前的系列👇

国内最全的Spring Boot系列之四

享元模式:共享女友 - 第355篇

什么是 ZooKeeper - 第347篇

ZooKeeper安装 - 第348篇

ZooKeeper数据结构和实操  - 第349篇

ZooKeeper的watch机制 - 第350篇

ZooKeeper的acl权限控制  - 第351篇

ZooKeeper内存数据和持久化  - 第352篇

ZooKeeper集群搭建 - 第354篇

ZooKeeper Java客户端的基本使用 - 第356篇

ZooKeeper客户端Curator - 第358篇

ZooKeeper客户端Curator的进阶使用 - 第359篇

ZooKeeper客户端Curator实现Watch事件监听 - 第361篇

Spring Boot 使用 Curator 操作 ZooKeeper - 第363篇

Spring Boot使用Apache Curator实现服务的注册和发现 - 第364篇
Spring Boot使用Apache Curator实现分布式锁(可重入排它锁) - 第365篇

Spring Boot使用Apache Curator实现leader选举 - 第366篇

Spring Boot使用Apache Curator实现分布式计数器 - 367篇

ZooKeeper Session 基本原理 - 第369篇

ZooKeeper分桶策略实现高性能的会话管理 - 第371篇

ZooKeeper集群架构以及读写原理 - 第372篇

在前面我们对于Curator有了一个基本的了解和使用,这一节我们看一下更深入的知识。

一、Curator进阶使用

1.1 Curator构建CuratorFramework推荐方式

       在前面我们说到了,第二种方式会比较好,直接上源码:

/*
    方式二:
     * connectionString zk地址
     * sessionTimeoutMs 会话超时时间
     * connectionTimeoutMs 连接超时时间
     * namespace 每个curatorFramework 可以设置一个独立的命名空间,之后操作都是基于该命名空间,比如操作 /user/message 其实操作的是/curator/user/message
     * retryPolicy 重试策略
 */
String connectString = "127.0.0.1:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
        .connectString(connectString)
        .sessionTimeoutMs(60000)
        .connectionTimeoutMs(15000)
        .namespace("curator")
        .retryPolicy(retryPolicy)
        .build();
curatorFramework.start();

1.2 连接成功的监听

       我们创建连接是否成功可以通过方法进行监听:

curatorFramework.getConnectionStateListenable().addListener(new ConnectionStateListener() {
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        if(newState == ConnectionState.CONNECTED){
            System.out.println("连接成功!");
        }
    }
});

       添加监听的代码需要放到curatorFramework.start();代码之前。

1.3 创建节点

       上面使用create进行节点的创建,但是如果节点存在的话,也是会报错的:

    创建一个 允许所有人访问的 持久节点
    由于使用了namespace,所以最终创建的节点是:/curator/test2
 */
curatorFramework.create()
        .creatingParentsIfNeeded()//递归创建,如果没有父节点,自动创建父节点
        .withMode(CreateMode.PERSISTENT)//节点类型,持久节点
        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//设置ACL,和原生API相同
        .forPath("/test2", "hello".getBytes());

说明:由于使用了namespace,所以最终创建的节点是:/curator/test2

 

       假设我们多次创建节点的话,是会保存的,那么怎么办呐?

       可以先判断节点是否存在,在决定是否进行创建:

/*
    创建一个 允许所有人访问的 持久节点
    由于使用了namespace,所以最终创建的节点是:/curator/test2
 */
String path = "/test3";
Stat stat = curatorFramework.checkExists().forPath(path);
if(stat == null){
    curatorFramework.create()
            .creatingParentsIfNeeded()//递归创建,如果没有父节点,自动创建父节点
            .withMode(CreateMode.PERSISTENT)//节点类型,持久节点
            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//设置ACL,和原生API相同
            .forPath(path, "hello".getBytes());
}

       protection模式,防止由于异常原因,导致僵尸节点:

curatorFramework.create()
        .creatingParentsIfNeeded()//递归创建,如果没有父节点,自动创建父节点
        .withProtection()//protection模式,防止由于异常原因,导致僵尸节点。
        .withMode(CreateMode.PERSISTENT_SEQUENTIAL)//节点类型,持久节点
        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//设置ACL,和原生API相同
        .forPath("/test4/id-", "hello".getBytes());

       当我们创建节点是持久有序节点的时候,当我们提供请求给ZK Server,ZK Server创建成功了之后会给客户端进行反馈,但在反馈的过程中,由于网络原因导致客户端没有收到反馈结果,由于curator有重试的机制,会造次进行发起请求进行创建,那么就会出现之前的节点没有被使用的情况,也就是所谓的僵尸节点。如果我们配置了withProtection()的话,那么就能够解决僵尸节点的问题。

       如果客户端所连接的服务器崩溃了,但还没来得及返回客户端所创建的有序节点的节点名称(即节点序列号),或者客户端只是连接丢失,客户端没接收到所请求操作的响应信息,结果,客户端并不知道所创建的znode节点路径名称。回忆对于有序节点的应用场景,例如,建立一个有序的所有客户端列表。为了解决这个问题,CreateBuilder提供了一个 withProtection 方法来通知Curator客户端,在创建的有序节点前添加一个唯一标识符,如果create操作失败了,客户端就会开始重试操作,而重试操作的一个步骤就是验证是否存在一个节点包含这个唯一标识符。

       那么具体是什么原理呢?

       我看下底层的代码CreateBuilderImpl:

       跟进setProtectdMode方法:

       说白了调用withProtection方法就是初始化了一个protectedId。那这个家伙又是怎么使用的呐?

       我们看下生成的数据:

       这个protectedId被当做了节点path的前缀了。

       通过这里我们就可以分析出来,底层应该是每次都会把protectedId进行拼凑成一个新的path,重试操作的一个步骤就是验证是否存在一个节点包含这个唯一标识符。。

1.4 异步获取节点数据

       获取节点可以使用get进行获取,如果是异步获取的,使用inBackground:

curatorFramework.getData().inBackground(new BackgroundCallback(){
    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
        System.out.println(event.getPath()+":"+new String(event.getData()));
    }
}).forPath("/test3");

       对于inBackground还可以指定我们自己的线程池对象:

public T inBackground(BackgroundCallback callback, Executor executor);

       这个大家可以去尝试一下。

1.5 修改节点

       修改节点就很简单了:

curatorFramework.setData().forPath("/test3", "hello-update".getBytes());

        根据版本进行修改的话:

Stat stat = curatorFramework.checkExists().forPath("/test3");
curatorFramework.setData().withVersion(stat.getVersion()).forPath("/test3", "hello-update1".getBytes());

 

1.6 删除节点

       删除节点使用delete().forPath():

/*
    会删除节点:
        /curator/test4
    以及
        /curator/test4下的子节点
 */
curatorFramework.delete().deletingChildrenIfNeeded().forPath("/test4");

 

我就是我,是颜色不一样的烟火。
我就是我,是与众不同的小苹果。

à悟空学院:https://t.cn/Rg3fKJD

学院中有Spring Boot相关的课程!!

SpringBoot视频:http://t.cn/A6ZagYTi

SpringBoot交流平台:https://t.cn/R3QDhU0

SpringSecurity5.0视频:http://t.cn/A6ZadMBe

ShardingJDBC分库分表:http://t.cn/A6ZarrqS

分布式事务解决方案:http://t.cn/A6ZaBnIr

JVM内存模型调优实战:http://t.cn/A6wWMVqG

Spring入门到精通:https://t.cn/A6bFcDh4

大话设计模式之爱你:https://dwz.cn/wqO0MAy7

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐