Storm 实时计算分布式锁 Curator的使用
Curator是Netflix开源的一套ZooKeeper客户端框架. Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层, 应用方在使用的时候需要自己处理很多事情,包括:1、封装ZooKeeper client与ZooKeeper server之间的连接处理;2、提供了一套Fluent风格的操作API;3、提供ZooKeeper各种应用场景(rec
Curator是Netflix开源的一套ZooKeeper客户端框架. Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层, 应用方在使用的时候需要自己处理很多事情,包括:
1、封装ZooKeeper client与ZooKeeper server之间的连接处理;
2、提供了一套Fluent风格的操作API;
3、提供ZooKeeper各种应用场景(recipe, 比如共享锁服务, 集群领导选举机制)的抽象封装.
关于Zookeeper开源客户端框架Curator详细介绍请参考别人写的这篇博客,上面的话都是引用他的,相当不错
http://macrochen.iteye.com/blog/1366136
因为Storm是多线程并行计算,因此在数据处理过程中,可能会产生数据一致性问题,比如数据落地到数据库,更新旧数据,因为各种异常就会造成数据不一致问题。
因此我们需要在这种情况的时候,类似数据更新这样的操作能够单线程操作,以保证数据一致性。而Curator提供ZooKeeper的应用场景中正好有共享锁服务这个服务,因此我们需要拿来用。
下面是定义Curator 分布式锁服务的客户端源码
public class LockCuratorSrc {
private static CuratorFramework client = null;
public synchronized static CuratorFramework getCF(){
if(client==null){
try {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
System.out.println("create client--------");
client = CuratorFrameworkFactory.newClient(Constant.ZK_HOST_PORT, retryPolicy);
client.start();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return client;
}
}
1、首先需要定义CuratorFramework的客户端,然后对客户端进行初始化
2、retryPolicy表示重试指定的次数, 且每一次重试之间停顿的时间逐渐增加。
3、CuratorFrameworkFactory类提供方法newClient可以创建一个默认的实例。 当4、CuratorFramework实例构建完成, 紧接着调用start()方法, 在应用结束的时候, 需要调用close()方法. CuratorFramework是线程安全的. 在一个应用中可以共享同一个zk集群的CuratorFramework。下面引入Curator的全局锁类InterProcessMutex
他的构造函数是
public InterProcessMutex(CuratorFramework client, String path)
client是前面定义的客户端,而path则是zookeeper集群里的锁目录(需要事先创建)
让定义zookeeper分布式锁
InterProcessMutex lock = new InterProcessMutex(LockCuratorSrc.getCF(), Constant.LOCKS_ETLLOG);
通过acquire获得锁,并提供超时机制.
使用的代码则是
while (lock.acquire(10, TimeUnit.MINUTES) ){
数据更新等操作
}
通过release()方法释放锁。
完毕!更多推荐
所有评论(0)