基于ZooKeeper Curator实现分布式锁
基于ZooKeeper分布式锁的流程1. 客户端连接上zookeeper,并在指定节点(locks)下创建临时顺序节点node_n2. 客户端获取locks目录下所有children节点3. 客户端对子节点按节点自增序号从小到大排序,并判断自己创建的节点是不是序号最小的,若是则获取锁;若不是,则监听比该节点小的那个节点的删除事件4. 获得子节点变更通知后重复此步骤直至获得锁;5. 执...
基于ZooKeeper分布式锁的流程
1. 客户端连接上zookeeper,并在指定节点(locks)下创建临时顺序节点node_n
2. 客户端获取locks目录下所有children节点
3. 客户端对子节点按节点自增序号从小到大排序,并判断自己创建的节点是不是序号最小的,若是则获取锁;若不是,则监听比该节点小的那个节点的删除事件
4. 获得子节点变更通知后重复此步骤直至获得锁;
5. 执行业务代码,完成业务流程后,删除对应的子节点释放锁。
Q&A
步骤1中为什么创建临时节点?
zk临时节点的特性是,当客户端与zk服务器的连接中断时,客户端创建的临时节点将自动删除;所以创建临时节点是为了保证在发生故障的情况下锁也能被释放,比如场景1:假如客户端a获得锁之后客户端所在机器宕机了,客户端没有主动删除子节点;如果创建的是永久的节点,那么这个锁永远不会释放,导致死锁;而如果创建的是临时节点,客户端宕机后,心跳检测时zookeeper没有收到客户端的心跳包就会判断该会话已失效,并且将临时节点删除从而释放锁。
步骤3中为什么不是监听locks目录,而仅监听比自己小的那一个节点?
如果每个客户端都监听locks目录,那么当某个客户端释放锁删除子节点时,其他所有的客户端都会收到监听事件,产生羊群效应,并且zookeeper在通知所有客户端时会阻塞其他的操作,最好的情况应该只唤醒新的最小节点对应的客户端。应该怎么做呢?在设置事件监听时,每个客户端应该对刚好在它之前的子节点设置事件监听,例如子节点列表为/lock/lock-0000000000、/lock/lock-0000000001、/lock/lock-0000000002,序号为1的客户端监听序号为0的子节点删除消息,序号为2的监听序号为1的子节点删除消息。
两种实现方式
zk分布式锁的实现有两种方式,一是原生方式,使用java和zk api书写以上逻辑实现分布式锁,操作zookeeper使用的是apache提供的zookeeper的包。通过实现Watch接口,实现process(WatchedEvent event)方法来实施监控,使CountDownLatch来完成监控,在等待锁的时候使用CountDownLatch来计数,等到后进行countDown,停止等待,继续运行。
另一种方式是使用Curator框架来实现分布式锁,Curator是Netflix公司一个开源的zookeeper客户端,在原生API接口上进行了包装,解决了很多ZooKeeper客户端非常底层的细节开发。同时内部实现了诸如Session超时重连,Watcher反复注册等功能,实现了Fluent风格的API接口,是使用最广泛的zookeeper客户端之一。
两种方式对比来说,原生方式自己实现逻辑比较灵活,个性化高但是开发量比较大,使用Curator实现分布式锁非常简单,几行代码就可以搞定,隐藏了很多实现细节。
Curator实现分布式锁
/**
* Created by ErNiu on 2018/1/29.
*/
public class testlock {
private static final String ZK_ADDRESS = "10.2.1.1:2181";
private static final String ZK_LOCK_PATH = "/zktest/lock0";
/**
* 下面的程序会启动几个线程去争夺锁,拿到锁的线程会占用5秒
*/
public static void main(String[] args) throws InterruptedException {
// 1.Connect to zk
final CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new RetryNTimes(10, 5000));
client.start();
System.out.println(client.getState());
System.out.println("zk client start successfully!");
final InterProcessMutex mutex = new InterProcessMutex(client, ZK_LOCK_PATH);
for (int i = 0; i < 3; i++) {
Runnable myRunnable = new Runnable() {
public void run() {
doWithLock(client, mutex);
}
};
Thread thread = new Thread(myRunnable, "Thread-" + i);
thread.start();
}
}
private static void doWithLock(CuratorFramework client, InterProcessMutex mutex) {
try {
String name = Thread.currentThread().getName();
if (mutex.acquire(1, TimeUnit.SECONDS)) {
System.out.println(name + " hold lock");
System.out.println(client.getChildren().forPath(ZK_LOCK_PATH));
Thread.sleep(5000L);
System.out.println(name + " release lock");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
mutex.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
可以看到通过Curator实现分布式锁,只需要两行代码mutex.acquire()和mutex.release(),上面的代码同时测试了锁超时时间的作用,启动三个线程去获取锁,线程2获取到锁sleep5秒,超时时间设置为1s,线程0和线程1阻塞等待1s后,便会抛出异常。这里仍然存在一个问题,假如线程2获取到锁,并且线程2因为自身原因,一直不释放锁。这就会导致其他线程无法正常运行,所以需要对获取到锁的线程设置一个超时时间,超过规定时间仍未执行完,则强制释放锁,并抛出异常,来保证程序不会阻塞。当然这需要对Curator再次进行封装。
小结
本篇文章对分布式锁的实现逻辑进行了简单介绍,并且讲述两种实现zk分布式锁的方式,原生方式还是需要手动实现一下的,但是工作中为了效率还是建议使用Curator。
额,Curator源码分析下篇进行吧…
更多推荐
所有评论(0)