zookeeper开源客户端Curator介绍(六)
原文地址,转载请注明出处: https://blog.csdn.net/qq_34021712/article/details/82872311     ©王赛超 上一篇文章介绍了zookeeper原生API的使用,使用过原生API不得不说,有
原文地址,转载请注明出处: https://blog.csdn.net/qq_34021712/article/details/82872311 ©王赛超
上一篇文章 介绍了zookeeper原生API的使用,使用过原生API不得不说,有很多的问题,比如:不能递归创建和删除节点、Watcher只能使用一次、还有很多可以解决分布式应用问题的api(比如分布式锁,leader选举等),但由于ZooKeeper提供的原始API并不是很易用,在其基础上封装一些高级应用又是一件很复杂的事情。
这个时候,Curator出现了,Curator是Netflix公司开源的一个Zookeeper客户端,后捐献给Apache,Curator框架在zookeeper原生API接口上进行了包装,解决了很多ZooKeeper客户端非常底层的细节开发。提供ZooKeeper各种应用场景(recipe, 比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装,实现了Fluent风格的API接口,是最好用,最流行的zookeeper的客户端。
Curator主要从以下几个方面降低了zk使用的复杂性:
- 重试机制:提供可插拔的重试机制, 它将给捕获所有可恢复的异常配置一个重试策略,并且内部也提供了几种标准的重试策略(比如指数补偿)
- 连接状态监控: Curator初始化之后会一直对zk连接进行监听,一旦发现连接状态发生变化将会作出相应的处理
- zk客户端实例管理:Curator会对zk客户端到server集群的连接进行管理,并在需要的时候重建zk实例,保证与zk集群连接的可靠性
- 各种使用场景支持:Curator实现了zk支持的大部分使用场景(甚至包括zk自身不支持的场景),这些实现都遵循了zk的最佳实践,并考虑了各种极端情况
参考官网
http://curator.apache.org/index.html
兼容性介绍
虽然ZooKeeper开发团队仍然将ZooKeeper 3.5.x视为“测试版”,但实际情况是它被许多用户用于生产,当然了生产上也有很多3.4.x的版本,Curator现在最新版本是Curator 4.0,Curator 4.0对ZooKeeper 3.5.x有很强的依赖性如果您使用的是ZooKeeper 3.5.x,则无需执行任何操作 - 只需使用Curator 4.0即可。对ZooKeeper 3.4.x也是兼容的,但是你必须在maven依赖中排除 zookeeper依赖包,如下:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
项目组成
git地址:https://github.com/apache/curator 整个框架由下面这些模块组成:
模块名称 | 描述 |
---|---|
curator-recipes | 所有典型应用场景。例如:分布式锁,Master选举等。需要依赖client和framework,需设置自动获取依赖。 |
curator-async | jdk8 异步操作 |
curator-framework | Zookeeper API的高层封装,大大简化Zookeeper客户端编程,添加了例如Zookeeper连接管理、重试机制等。 |
curator-client | Zookeeper client的封装,用于取代原生的Zookeeper客户端(ZooKeeper类),提供一些非常有用的客户端特性。 |
curator-test | 包含TestingServer,TestingCluster和一些其他有助于测试的工具。 |
curator-examples | 各种使用Curator特性的例子。 |
curator-x-discovery | 服务注册发现,在SOA /分布式系统中,服务需要相互寻找。curator-x-discovery提供了服务注册,找到特定服务的单个实例,和通知服务实例何时更改。 |
curator-x-discovery-server | 服务注册发现管理器,可以和curator-x-discovery 或者非java程序程序使用RESTful Web服务以注册,删除,查询等服务。 |
下面学习Curator常用节点的增删改查操作
pom添加依赖
一般依赖recipes就可以了,内部依赖有client和framework 能用的上的功能都有了,另外注意,记得排除zookeeper的包,我使用的客户端是3.5.3,所以使用当前最新的4.0.1。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
创建会话
上面已经说过了,curator是Fluent风格API,如果您以前没有使用它,它可能看起来很奇怪,所以建议您熟悉这种风格。
Curator的创建会话方式与原生的API创建方式区别很大。Curator创建客户端为CuratorFramework,是由CuratorFrameworkFactory工厂类来实现的,CuratorFramework是线程安全的,要连接的每个ZooKeeper集群只需要一个 CuratorFramework对象就可以了。
CuratorFrameworkFactory提供了两种构造CuratorFramework的方法,一种是直接newClient,还有一种方式是使用CuratorFrameworkFactory.builder(),提供了细粒度的控制。
CuratorFrameworkFactory提供了两个默认的直接构造CuratorFramework的方法,如下:
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy);
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy);
其中connectString参数就是zk的地址,参数RetryPolicy提供重试策略的接口,可以让用户实现自定义的重试策略。
使用上面方法创建出一个CuratorFramework之后,需要再调用其start()方法完成会话创建。
下面给出创建CuratorFramework例子代码:
package com.zookeeper.test.framework;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
* @author: WangSaiChao
* @date: 2018/9/12
* @description: 创建会话的例子
*/
public class CreateClientExamples{
public static void main(String[] args) {
String connectionString = "127.0.0.1";
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE);
/**
* connectionString zk地址
* retryPolicy 重试策略
* 默认的sessionTimeoutMs为60000
* 默认的connectionTimeoutMs为15000
*/
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
curatorFramework.start();
/**
* connectionString zk地址
* sessionTimeoutMs 会话超时时间
* connectionTimeoutMs 连接超时时间
* retryPolicy 重试策略
*/
CuratorFramework curatorFramework1 = CuratorFrameworkFactory.newClient(connectionString, 60000, 15000, retryPolicy);
curatorFramework1.start();
/**
* connectionString zk地址
* sessionTimeoutMs 会话超时时间
* connectionTimeoutMs 连接超时时间
* namespace 每个curatorFramework 可以设置一个独立的命名空间,之后操作都是基于该命名空间,比如操作 /app1/message 其实操作的是/node1/app1/message
* retryPolicy 重试策略
*/
CuratorFramework curatorFramework2 = CuratorFrameworkFactory.builder().connectString(connectionString)
.sessionTimeoutMs(60000)
.connectionTimeoutMs(15000)
.namespace("/node1")
.retryPolicy(retryPolicy)
.build();
curatorFramework2.start();
}
}
重试策略
上面创建会话的例子中,我们使用了ExponentialBackoffRetry的重试策略, retryPolicy默认提供了以下实现,分别为:
ExponentialBackoffRetry:
/**
* 随着重试次数增加重试时间间隔变大,指数倍增长baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))。
* 有两个构造方法
* baseSleepTimeMs初始sleep时间
* maxRetries最多重试几次
* maxSleepMs最大的重试时间
* 如果在最大重试次数内,根据公式计算出的睡眠时间超过了maxSleepMs,将打印warn级别日志,并使用最大sleep时间。
* 如果不指定maxSleepMs,那么maxSleepMs的默认值为Integer.MAX_VALUE。
*/
public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries);
public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs);
BoundedExponentialBackoffRetry:
/**
* 继承与ExponentialBackoffRetry ,BoundedExponentialBackoffRetry只有一个三个参数构造器,效果跟ExponentialBackoffRetry三个函数构造器是一样的,只是内部实现不一样。
* baseSleepTimeMs初始sleep时间
* maxSleepTimeMs最大sleep时间
* maxRetries最大重试次数
*/
public BoundedExponentialBackoffRetry(int baseSleepTimeMs, int maxSleepTimeMs, int maxRetries);
RetryForever:永远重试策略
/**
* RetryForever:永远重试策略
* retryIntervalMs重试时间间隔
*/
public RetryForever(int retryIntervalMs);
RetryNTimes:重试N次
/**
* RetryNTimes:重试N次
* n重试几次
* sleepMsBetweenRetries每次重试间隔时间
*/
public RetryNTimes(int n, int sleepMsBetweenRetries);
RetryOneTime:只重试一次
/**
* RetryOneTime:只重试一次
* sleepMsBetweenRetry:每次重试间隔的时间
*/
public RetryOneTime(int sleepMsBetweenRetry);
RetryUntilElapsed:一直重试,直到超过指定时间
/**
* RetryUntilElapsed:一直重试,直到超过指定时间
* maxElapsedTimeMs最大重试时间
* sleepMsBetweenRetries每次重试间隔时间
*/
public RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries);
SleepingRetry:抽象类,上面的RetryNTimes就是继承它
常用的API操作
package com.zookeeper.test.framework;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
/**
* @author: WangSaiChao
* @date: 2018/9/12
* @description: 节点操作例子
*/
public class CrudExamples {
public static void main(String[] args) throws Exception {
String connectionString = "192.168.58.42:2181";
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE);
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
curatorFramework.start();
//=========================创建节点=============================
/**
* 创建一个 允许所有人访问的 持久节点
*/
curatorFramework.create()
.creatingParentsIfNeeded()//递归创建,如果没有父节点,自动创建父节点
.withMode(CreateMode.PERSISTENT)//节点类型,持久节点
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//设置ACL,和原生API相同
.forPath("/node10/child_01","123456".getBytes());
/**
* 创建一个容器节点
*/
curatorFramework.create()
.creatingParentContainersIfNeeded()//递归创建,如果没有父节点,自动创建父节点
.withMode(CreateMode.CONTAINER)//节点类型,容器节点,当最后一个子节点删除的时候,服务器会在未来的一段时间将该容器节点删除(不是立刻删除)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//设置ACL,和原生API相同
.forPath("/node11","123".getBytes());
/**
* 创建一个定时节点
* 临时节点存在时间为3秒
* ACL为 digest:wangsaichao:123456:cdrwa
*/
curatorFramework.create()
.withTtl(3000)//设置临时节点的有效期,单位是毫秒
.creatingParentContainersIfNeeded()//递归创建,如果没有父节点,自动创建父节点
.withMode(CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL)//节点类型,定时节点,在一定的时间内,如果没有操作该节点,并且该节点没有子节点,那么服务器将自动删除该节点
.withACL(Collections.singletonList(new ACL(ZooDefs.Perms.ALL, new Id("digest", "wangsaichao:G2RdrM8e0u0f1vNCj/TI99ebRMw="))))//ACL
.forPath("/node12","123".getBytes());
//=========================获取节点=============================
/**
* 获取节点 /node10/child_01 数据 和stat信息
*/
Stat node10Stat = new Stat();
byte[] node10 = curatorFramework.getData()
.storingStatIn(node10Stat)//获取stat信息存储到stat对象
.forPath("/node10/child_01");
System.out.println("=====>该节点信息为:" + new String(node10));
System.out.println("=====>该节点的数据版本号为:" + node10Stat.getVersion());
/**
* 获取节点信息 并且留下 Watcher事件 该Watcher只能触发一次
*/
byte[] bytes = curatorFramework.getData()
.usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("=====>wathcer触发了。。。。");
System.out.println(event);
}
})
.forPath("/node10/child_01");
System.out.println("=====>获取到的节点数据为:"+new String(bytes));
Thread.sleep(1000000);
//=========================设置节点=============================
Stat stat = curatorFramework.setData()
.withVersion(-1)
.forPath("/node10/child_01", "I love you".getBytes());
System.out.println("=====>修改之后的版本为:" + stat.getVersion());
//=========================删除节点=============================
/**
* 删除node节点 不递归 如果有子节点,将报异常
*/
Void aVoid = curatorFramework.delete()
.forPath("/node10");
System.out.println("=====>" + aVoid);
/**
* 递归删除,如果有子节点 先删除子节点
*/
curatorFramework.delete()
.deletingChildrenIfNeeded()
.forPath("/node10");
curatorFramework.close();
/**
* 删除
*/
curatorFramework.delete()
.guaranteed()
.forPath("/node11");
//=========================判断节点是否存在=============================
/**
* CuratorFramework类有一个判断节点是否存在的接口checkExists(),该接口返回一个org.apache.zookeeper.data.Stat对象,对象中有一个ephemeralOwner属性。
* 如果该节点是持久化节点,ephemeralOwner的值为0
* 如果该节点是临时节点,ephemeralOwner的值大于0
*/
Stat existsNodeStat = curatorFramework.checkExists().forPath("/node10");
if(existsNodeStat == null){
System.out.println("=====>节点不存在");
}
if(existsNodeStat.getEphemeralOwner() > 0){
System.out.println("=====>临时节点");
}else{
System.out.println("=====>持久节点");
}
}
}
更多推荐
所有评论(0)