原文地址,转载请注明出处: https://blog.csdn.net/qq_34021712/article/details/82872311     ©王赛超 

上一篇文章 介绍了zookeeper原生API的使用,使用过原生API不得不说,有很多的问题,比如:不能递归创建和删除节点、Watcher只能使用一次、还有很多可以解决分布式应用问题的api(比如分布式锁,leader选举等),但由于ZooKeeper提供的原始API并不是很易用,在其基础上封装一些高级应用又是一件很复杂的事情。

这个时候,Curator出现了,Curator是Netflix公司开源的一个Zookeeper客户端,后捐献给Apache,Curator框架在zookeeper原生API接口上进行了包装,解决了很多ZooKeeper客户端非常底层的细节开发。提供ZooKeeper各种应用场景(recipe, 比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装,实现了Fluent风格的API接口,是最好用,最流行的zookeeper的客户端。

Curator主要从以下几个方面降低了zk使用的复杂性:

  • 重试机制:提供可插拔的重试机制, 它将给捕获所有可恢复的异常配置一个重试策略,并且内部也提供了几种标准的重试策略(比如指数补偿)
  • 连接状态监控: Curator初始化之后会一直对zk连接进行监听,一旦发现连接状态发生变化将会作出相应的处理
  • zk客户端实例管理:Curator会对zk客户端到server集群的连接进行管理,并在需要的时候重建zk实例,保证与zk集群连接的可靠性
  • 各种使用场景支持:Curator实现了zk支持的大部分使用场景(甚至包括zk自身不支持的场景),这些实现都遵循了zk的最佳实践,并考虑了各种极端情况

参考官网
http://curator.apache.org/index.html

兼容性介绍
虽然ZooKeeper开发团队仍然将ZooKeeper 3.5.x视为“测试版”,但实际情况是它被许多用户用于生产,当然了生产上也有很多3.4.x的版本,Curator现在最新版本是Curator 4.0,Curator 4.0对ZooKeeper 3.5.x有很强的依赖性如果您使用的是ZooKeeper 3.5.x,则无需执行任何操作 - 只需使用Curator 4.0即可。对ZooKeeper 3.4.x也是兼容的,但是你必须在maven依赖中排除 zookeeper依赖包,如下:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.1</version>
</dependency>

项目组成
git地址:https://github.com/apache/curator 整个框架由下面这些模块组成:

模块名称描述
curator-recipes所有典型应用场景。例如:分布式锁,Master选举等。需要依赖client和framework,需设置自动获取依赖。
curator-asyncjdk8 异步操作
curator-frameworkZookeeper API的高层封装,大大简化Zookeeper客户端编程,添加了例如Zookeeper连接管理、重试机制等。
curator-clientZookeeper client的封装,用于取代原生的Zookeeper客户端(ZooKeeper类),提供一些非常有用的客户端特性。
curator-test包含TestingServer,TestingCluster和一些其他有助于测试的工具。
curator-examples各种使用Curator特性的例子。
curator-x-discovery服务注册发现,在SOA /分布式系统中,服务需要相互寻找。curator-x-discovery提供了服务注册,找到特定服务的单个实例,和通知服务实例何时更改。
curator-x-discovery-server服务注册发现管理器,可以和curator-x-discovery 或者非java程序程序使用RESTful Web服务以注册,删除,查询等服务。

下面学习Curator常用节点的增删改查操作

pom添加依赖

一般依赖recipes就可以了,内部依赖有client和framework 能用的上的功能都有了,另外注意,记得排除zookeeper的包,我使用的客户端是3.5.3,所以使用当前最新的4.0.1。

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.1</version>
</dependency>
创建会话

上面已经说过了,curator是Fluent风格API,如果您以前没有使用它,它可能看起来很奇怪,所以建议您熟悉这种风格。

Curator的创建会话方式与原生的API创建方式区别很大。Curator创建客户端为CuratorFramework,是由CuratorFrameworkFactory工厂类来实现的,CuratorFramework是线程安全的,要连接的每个ZooKeeper集群只需要一个 CuratorFramework对象就可以了。

CuratorFrameworkFactory提供了两种构造CuratorFramework的方法,一种是直接newClient,还有一种方式是使用CuratorFrameworkFactory.builder(),提供了细粒度的控制。

CuratorFrameworkFactory提供了两个默认的直接构造CuratorFramework的方法,如下:

public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy);
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy);

其中connectString参数就是zk的地址,参数RetryPolicy提供重试策略的接口,可以让用户实现自定义的重试策略。
使用上面方法创建出一个CuratorFramework之后,需要再调用其start()方法完成会话创建。

下面给出创建CuratorFramework例子代码:
package com.zookeeper.test.framework;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @author: WangSaiChao
 * @date: 2018/9/12
 * @description: 创建会话的例子
 */
public class CreateClientExamples{

    public static void main(String[] args) {

        String connectionString = "127.0.0.1";
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE);

        /**
         * connectionString zk地址
         * retryPolicy 重试策略
         * 默认的sessionTimeoutMs为60000
         * 默认的connectionTimeoutMs为15000
         */
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
        curatorFramework.start();

        /**
         * connectionString zk地址
         * sessionTimeoutMs 会话超时时间
         * connectionTimeoutMs 连接超时时间
         * retryPolicy 重试策略
         */
        CuratorFramework curatorFramework1 = CuratorFrameworkFactory.newClient(connectionString, 60000, 15000, retryPolicy);
        curatorFramework1.start();

        /**
         * connectionString zk地址
         * sessionTimeoutMs 会话超时时间
         * connectionTimeoutMs 连接超时时间
         * namespace 每个curatorFramework 可以设置一个独立的命名空间,之后操作都是基于该命名空间,比如操作 /app1/message 其实操作的是/node1/app1/message
         * retryPolicy 重试策略
         */
        CuratorFramework curatorFramework2 = CuratorFrameworkFactory.builder().connectString(connectionString)
                .sessionTimeoutMs(60000)
                .connectionTimeoutMs(15000)
                .namespace("/node1")
                .retryPolicy(retryPolicy)
                .build();
        curatorFramework2.start();


    }
}
重试策略

上面创建会话的例子中,我们使用了ExponentialBackoffRetry的重试策略, retryPolicy默认提供了以下实现,分别为:

ExponentialBackoffRetry:
/**
 * 随着重试次数增加重试时间间隔变大,指数倍增长baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))。
 * 有两个构造方法
 * baseSleepTimeMs初始sleep时间
 * maxRetries最多重试几次
 * maxSleepMs最大的重试时间
 * 如果在最大重试次数内,根据公式计算出的睡眠时间超过了maxSleepMs,将打印warn级别日志,并使用最大sleep时间。
 * 如果不指定maxSleepMs,那么maxSleepMs的默认值为Integer.MAX_VALUE。
 */
public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries);
public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs);
BoundedExponentialBackoffRetry:
/**
 * 继承与ExponentialBackoffRetry ,BoundedExponentialBackoffRetry只有一个三个参数构造器,效果跟ExponentialBackoffRetry三个函数构造器是一样的,只是内部实现不一样。
 * baseSleepTimeMs初始sleep时间
 * maxSleepTimeMs最大sleep时间
 * maxRetries最大重试次数
 */
public BoundedExponentialBackoffRetry(int baseSleepTimeMs, int maxSleepTimeMs, int maxRetries);
RetryForever:永远重试策略
/**
 * RetryForever:永远重试策略
 * retryIntervalMs重试时间间隔
 */
public RetryForever(int retryIntervalMs);
RetryNTimes:重试N次
/**
 * RetryNTimes:重试N次
 * n重试几次
 * sleepMsBetweenRetries每次重试间隔时间
 */
public RetryNTimes(int n, int sleepMsBetweenRetries);
RetryOneTime:只重试一次
/**
 * RetryOneTime:只重试一次
 * sleepMsBetweenRetry:每次重试间隔的时间
 */
public RetryOneTime(int sleepMsBetweenRetry);
RetryUntilElapsed:一直重试,直到超过指定时间
/**
 * RetryUntilElapsed:一直重试,直到超过指定时间
 * maxElapsedTimeMs最大重试时间
 * sleepMsBetweenRetries每次重试间隔时间
 */
public RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries);
SleepingRetry:抽象类,上面的RetryNTimes就是继承它
常用的API操作
package com.zookeeper.test.framework;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;

/**
 * @author: WangSaiChao
 * @date: 2018/9/12
 * @description: 节点操作例子
 */
public class CrudExamples {

    public static void main(String[] args) throws Exception {

        String connectionString = "192.168.58.42:2181";
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE);
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
        curatorFramework.start();

        //=========================创建节点=============================

        /**
         * 创建一个 允许所有人访问的 持久节点
         */
        curatorFramework.create()
                .creatingParentsIfNeeded()//递归创建,如果没有父节点,自动创建父节点
                .withMode(CreateMode.PERSISTENT)//节点类型,持久节点
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//设置ACL,和原生API相同
                .forPath("/node10/child_01","123456".getBytes());


        /**
         * 创建一个容器节点
         */
        curatorFramework.create()
                .creatingParentContainersIfNeeded()//递归创建,如果没有父节点,自动创建父节点
                .withMode(CreateMode.CONTAINER)//节点类型,容器节点,当最后一个子节点删除的时候,服务器会在未来的一段时间将该容器节点删除(不是立刻删除)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//设置ACL,和原生API相同
                .forPath("/node11","123".getBytes());

        /**
         * 创建一个定时节点
         * 临时节点存在时间为3秒
         * ACL为 digest:wangsaichao:123456:cdrwa
         */
        curatorFramework.create()
                .withTtl(3000)//设置临时节点的有效期,单位是毫秒
                .creatingParentContainersIfNeeded()//递归创建,如果没有父节点,自动创建父节点
                .withMode(CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL)//节点类型,定时节点,在一定的时间内,如果没有操作该节点,并且该节点没有子节点,那么服务器将自动删除该节点
                .withACL(Collections.singletonList(new ACL(ZooDefs.Perms.ALL, new Id("digest", "wangsaichao:G2RdrM8e0u0f1vNCj/TI99ebRMw="))))//ACL
                .forPath("/node12","123".getBytes());


        //=========================获取节点=============================

        /**
         * 获取节点 /node10/child_01 数据 和stat信息
         */
        Stat node10Stat = new Stat();
        byte[] node10 = curatorFramework.getData()
                .storingStatIn(node10Stat)//获取stat信息存储到stat对象
                .forPath("/node10/child_01");
        System.out.println("=====>该节点信息为:" + new String(node10));
        System.out.println("=====>该节点的数据版本号为:" + node10Stat.getVersion());

        /**
         * 获取节点信息 并且留下 Watcher事件 该Watcher只能触发一次
         */
        byte[] bytes = curatorFramework.getData()
                .usingWatcher(new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        System.out.println("=====>wathcer触发了。。。。");
                        System.out.println(event);
                    }
                })
                .forPath("/node10/child_01");
        System.out.println("=====>获取到的节点数据为:"+new String(bytes));
        Thread.sleep(1000000);


        //=========================设置节点=============================

        Stat stat = curatorFramework.setData()
                .withVersion(-1)
                .forPath("/node10/child_01", "I love you".getBytes());
        System.out.println("=====>修改之后的版本为:" + stat.getVersion());


        //=========================删除节点=============================

        /**
         * 删除node节点 不递归  如果有子节点,将报异常
         */
        Void aVoid = curatorFramework.delete()
                .forPath("/node10");
        System.out.println("=====>" + aVoid);

        /**
         * 递归删除,如果有子节点 先删除子节点
         */
        curatorFramework.delete()
                .deletingChildrenIfNeeded()
                .forPath("/node10");

        curatorFramework.close();

        /**
         * 删除
         */
        curatorFramework.delete()
                .guaranteed()
                .forPath("/node11");

        //=========================判断节点是否存在=============================

        /**
         * CuratorFramework类有一个判断节点是否存在的接口checkExists(),该接口返回一个org.apache.zookeeper.data.Stat对象,对象中有一个ephemeralOwner属性。
         * 如果该节点是持久化节点,ephemeralOwner的值为0
         * 如果该节点是临时节点,ephemeralOwner的值大于0
         */

        Stat existsNodeStat = curatorFramework.checkExists().forPath("/node10");
        if(existsNodeStat == null){
            System.out.println("=====>节点不存在");
        }
        if(existsNodeStat.getEphemeralOwner() > 0){
            System.out.println("=====>临时节点");
        }else{
            System.out.println("=====>持久节点");
        }


    }
}
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐