Spring Boot 使用 Curator 操作 ZooKeeper
本文主要介绍使用 Curator 访问 ZooKeeper 的一些基本方法,而不仅仅限于指定的 Recipes,你可以使用 Curator API 任意的访问 ZooKeeper。Curator 框架提供了一套高级的 API,简化了 ZooKeeper 的操作。它增加了很多使用 ZooKeeper 开发的特性,可以处理 ZooKeeper 集群复杂的连接管理和重试机制。
本文主要介绍使用 Curator 访问 ZooKeeper 的一些基本方法,而不仅仅限于指定的 Recipes,你可以使用 Curator API 任意的访问 ZooKeeper。Curator 框架提供了一套高级的 API,简化了 ZooKeeper 的操作。它增加了很多使用 ZooKeeper 开发的特性,可以处理 ZooKeeper 集群复杂的连接管理和重试机制。
三种 zk 客户端对比:
原生api | zkclient | apache curator | |
---|---|---|---|
优点 | 1、session会话超时重连; 2、解决watcher反复注册; 3、简化api开发; | 1、解决watch注册一次就会失效的问题; 2、api更加简单易用; 3、提供了更多解决方案并且实现简单,比如分布式锁; 4、提供了常用的zk工具类; | |
缺点 | 1、watch注册一次后会失效; 2、session超时之后没有实现重连机制; 3、异常处理繁琐; 4、只提供了简单的byte[]数组的接口,没有提供针对对象级别的序列化; 5、创建节点时如果节点存在抛出异常,需要自行检查节点是否存在; 6、删除节点无法实现级联删除; | 1、异常处理简化,抛出RuntimeException; 2、重试机制比较难用; 3、没有提供各种使用场景的实现; |
使用 Curator 需要添加 Maven 依赖(两个模块按需添加,ZooKeeper 版本需要与服务器实例中 ZooKeeper 版本一致):
<!-- zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- curator-framework -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- curator-recipes -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</exclusion>
</exclusions>
</dependency>
可以看出,Curator 功能分两大类,一类是对 ZooKeeper 的一些基本命令的封装,比如增删改查,即 Framework 模块;另一类是他的高级特性,即 Recipes 模块。下面将分两部分分别进行阐述。
1.Curator Framework的使用
Curator 框架通过 CuratorFrameworkFactory 以工厂模式和 builder 模式创建 CuratorFramework 实例。CuratorFramework 实例都是线程安全的,你应该在你的应用中共享同一个 CuratorFramework 实例。工厂方法 newClient() 提供了一个简单方式创建实例。而 Builder 提供了更多的参数控制。一旦你创建了一个 CuratorFramework 实例,你必须调用它的 start() 启动,在应用退出时调用 close() 方法关闭。
下面的例子演示了两种创建Curator的方法:
创建方式一(以 builder 模式创建,Fluent 风格,建议使用这种):
// 多个地址逗号隔开
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("localhost:2181")
.sessionTimeoutMs(1000) // 连接超时时间
.connectionTimeoutMs(1000) // 会话超时时间
// 刚开始重试间隔为1秒,之后重试间隔逐渐增加,最多重试不超过三次
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
//...
CloseableUtils.closeQuietly(client);//建议放在finally块中
创建方式二(以工厂模式创建):
// 多个地址逗号隔开
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",
// 刚开始重试间隔为1秒,之后重试间隔逐渐增加,最多重试不超过三次
new ExponentialBackoffRetry(1000, 3));
client.start();
//...
CloseableUtils.closeQuietly(client); // 建议放在finally块中
下面看下 CuratorFramework 提供的方法:
方法名 | 描述 |
---|---|
create() | 开始创建操作,可以调用额外的方法(比如方式mode或者后台执行background)并在最后调用forPath()指定要操作的ZNode |
delete() | 开始删除操作,可以调用额外的方法(版本或者后台处理version or background)并在最后调用forPath()指定要操作的ZNode |
checkExists() | 开始检查ZNode是否存在的操作,可以调用额外的方法(监控或者后台处理)并在最后调用forPath()指定要操作的ZNode |
getData() | 开始获得ZNode节点数据的操作,可以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat)并在最后调用forPath()指定要操作的ZNode |
setData() | 开始设置ZNode节点数据的操作,可以调用额外的方法(版本或者后台处理)并在最后调用forPath()指定要操作的ZNode |
getChildren() | 开始获得ZNode的子节点列表,以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat)并在最后调用forPath()指定要操作的ZNode |
inTransaction() | 开始是原子ZooKeeper事务,可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交 |
后台操作的通知和监控可以通过 ClientListener 接口发布,你可以在 CuratorFramework 实例上通过 addListener() 注册 listener,Listener 实现了下面的方法:
- eventReceived() 一个后台操作完成或者一个监控被触发
事件类型以及事件的方法如下:
Event Type | Event Methods |
---|---|
CREATE | getResultCode()、getPath() |
DELETE | getResultCode()、getPath() |
EXISTS | getResultCode()、getPath()、getStat() |
GETDATA | getResultCode()、getPath()、getStat()、getData() |
SETDATA | getResultCode()、getPath()、getStat() |
CHILDREN | getResultCode()、getPath()、getStat()、getChildren() |
WATCHED | getWatchedEvent() |
还可以通过 ConnectionStateListener 接口监控连接的状态,强烈推荐你增加这个监控器。你可以使用命名空间 Namespace 避免多个应用的节点的名称冲突。CuratorFramework 提供了命名空间的概念,这样 CuratorFramework 会为它的 API 调用的 path 加上命名空间:
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("localhost:2181")
.namespace("configCenter") // 命名空间
...
.build();
client.start();
client.create().forPath("/search/business/test", data);
//node was actually written to: "/configCenter/search/business/test"
...
CloseableUtils.closeQuietly(client); // 建议放在finally块中
Curator 还提供了临时的 CuratorFramework: CuratorTempFramework,3分钟不活动连接就被关闭,你也可以指定不活动的时间。创建 builder 时不是调用 build() 而是调用 buildTemp()。CuratorTempFramework 只提供了 close()、inTransaction()、getData() 这三个方法。
下面我们看下 Curator 增删改查 ZooKeeper 的具体写法:
client.start();
// 判断是否存在,Stat就是对znode所有属性的一个映射,stat=null表示节点不存在
Stat stat = client.checkExists().forPath("/search/business/test");
// 查询子节点
List<String> childNodes = client.getChildren()
.forPath("/search/business/test"));
// 创建节点
client.create().creatingParentsIfNeeded() // 若创建节点的父节点不存在则先创建父节点再创建子节点
.withMode(CreateMode.PERSISTENT) // 创建的是持久节点
.withACL(Ids.OPEN_ACL_UNSAFE) // 默认匿名权限,权限scheme id:'world,'anyone,:cdrwa
.forPath("/search/business/test","1".getBytes());
// 更新节点数据
client.setData()
.withVersion(2) // 乐观锁
.forPath("/search/business/test","0".getBytes());
// 读取节点数据
Stat stat = new Stat(); // Stat就是对znode所有属性的一个映射,stat=null表示节点不存在
String re = new String(client.getData()
.storingStatIn(stat) // 在获取节点内容的同时把状态信息存入Stat对象,如果不写的话只会读取节点数据
.forPath("/search/business/test"));
// 删除节点
client.delete()
.guaranteed() // 保障机制,若未删除成功,只要会话有效会在后台一直尝试删除
.deletingChildrenIfNeeded() // 若当前节点包含子节点,子节点也删除
.withVersion(2) // 指定版本号
.forPath("/search/business/test");
// watcher事件,使用usingWatcher的时候,监听只会触发一次,监听完毕后就销毁
client.getData()
.usingWatcher(new CuratorWatcher() {
@Override
public void process(WatchedEvent event) throws Exception {
logger.info("触发watcher, path:{}", event.getPath());
}
})
.forPath("/search/business/test");
// watcher事件, NodeCache一次注册N次监, 缺点:没法监听当前节点增删改操作,所以引出了PathChildrenCache
final NodeCache nodeCache = new NodeCache(client, "/search/business/test");
nodeCache.start(true); // 当zkServer与客户端链接的时候, NodeCache会把zk数据缓存到我们本地
if (nodeCache.getCurrentData() != null) {
logger.info("节点初始化数据为:{}", new String(nodeCache.getCurrentData().getData()));
} else {
logger.info("节点初始化数据为空");
}
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
if (nodeCache.getCurrentData() != null) {
String re = new String(nodeCache.getCurrentData().getData());
logger.info("节点路径:{}, 节点数据:{}", nodeCache.getCurrentData().getPath(), re);
} else {
logger.info("当前节点被删除了");
}
}
});
// 监听父节点以下所有的子节点, 当子节点发生变化的时候(增删改)都会监听到
// 为子节点添加watcher事件
// PathChildrenCache监听数据节点的增删改
final PathChildrenCache childrenCache = new PathChildrenCache(client, "/search", true);
// NORMAL:异步初始化, BUILD_INITIAL_CACHE:同步初始化, POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
List<ChildData> childDataList = childrenCache.getCurrentData(); // 当前数据节点的子节点数据列表
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
logger.info("子节点初始化ok..");
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
logger.info("添加子节点, path:{}, data:{}", event.getData().getPath(), event.getData().getData());
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
logger.info("删除子节点, path:{}", event.getData().getPath());
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
logger.info("修改子节点, path:{}, data:{}", event.getData().getPath(), event.getData().getData());
}
}
});
CloseableUtils.closeQuietly(client);//建议放在finally块中
所有这些方法都以 forpath() 结尾,辅以 watch (监听),withMode (指定模式),和 inBackground (后台运行) 等方法来使用。
此外,Curator 还支持事务,一组crud操作要么都完成,要么都不完成:
client.start();
// 开启事务
CuratorTransaction transaction = client.inTransaction();
Collection<CuratorTransactionResult> results = transaction.create().forPath("/curator/1", "0".getBytes())
.and().setData().forPath("/curator/2", "-1".getBytes())
.and().delete().forPath("/curator/3")
.and().commit();
CloseableUtils.closeQuietly(client);//建议放在finally块中
2.Curator Recipes的使用
Recipes 模块主要有 Elections (选举)、Locks (锁)、Barriers (关卡)、Atomic (原子量)、Caches、Queues 等。
1.Elections
选举主要依赖于 LeaderSelector 和 LeaderLatch 两个类。前者是所有存活的客户端不间断的轮流做 Leader。后者是一旦选举出 Leader,除非有客户端挂掉重新触发选举,否则不会交出领导权。这两者在实现上是可以切换的。
下面看一下 LeaderSelector 的选举:
/**
* 本类基于leaderSelector实现,所有存活的client会公平的轮流做leader
* 如果不想频繁的变化Leader,需要在takeLeadership方法里阻塞leader的变更! 或者使用 {@link}
* LeaderLatchClient
*/
public class LeaderSelectorClient extends LeaderSelectorListenerAdapter implements Closeable {
private static final Log log = LogFactory.getLog(LeaderSelectorClient.class);
private final String name;
private final LeaderSelector leaderSelector;
private final String PATH = "/leaderselector";
public LeaderSelectorClient(CuratorFramework client, String name) {
this.name = name;
leaderSelector = new LeaderSelector(client, PATH, this);
leaderSelector.autoRequeue();
}
public void start() throws IOException {
leaderSelector.start();
}
@Override
public void close() throws IOException {
leaderSelector.close();
}
/**
* client成为leader后,会调用此方法
*/
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
int waitSeconds = (int) (5 * Math.random()) + 1;
log.info(name + "是当前的leader");
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
log.info(name + "让出领导权\n");
}
}
}
2.Locks
Curator Lock 相关的实现在 recipes.locks 包里。顶级接口都是 InterProcessLock。我们直接看最有代表性的 InterProcessReadWriteLock 进程内部读写锁(可重入读写锁)。什么叫可重入,什么叫读写锁。不清楚的先查好资料吧。总之读写锁一定是成对出现的。 我们先定义两个任务,可并行的执行的,和互斥执行的。
参考:
http://www.cnblogs.com/hzhuxin/archive/2012/11/01/2749341.html
http://macrochen.iteye.com/blog/1366136
更多推荐
所有评论(0)