Zookeeper分布式锁实现
每个线程都创建一个带序号的临时节点,其中序号最小的临时节点的创建线程获得锁,其它线程挂起并监听它前一个节点的变化,如果前一个节点删除了,那么它的监听线程收到通知,然后唤醒它。package DistributedLock;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.ZooDefs;import org.apa
·
每个线程都创建一个带序号的临时节点,其中序号最小的临时节点的创建线程获得锁,其它线程挂起并监听它前一个节点的变化,如果前一个节点删除了,那么它的监听线程收到通知,然后唤醒它。
package DistributedLock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
public class DistributedLock {
private ZooKeeper zkClient;
ThreadLocal<String> threadLocal = new ThreadLocal<>();
public DistributedLock() {
try {
String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
int sessionTimeout = 2000;
zkClient = new ZooKeeper(connectString, sessionTimeout, null);
// 判断节点 /exclusive_lock 是否存在
if (zkClient.exists("/exclusive_lock", false) == null) {
// 不存在则创建节点
zkClient.create("/exclusive_lock", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void lock() {
try {
// 创建对应的临时带序号节点
String currentLockNode = zkClient.create("/exclusive_lock/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听他序号前一个节点
List<String> children = zkClient.getChildren("/exclusive_lock", false);
// 如果children 只有一个值,那就直接获取锁;如果有多个节点,需要判断谁最小
if (children.size() > 1) {
Collections.sort(children);
// 获取节点名称 seq-00000000
String thisNode = currentLockNode.substring("/exclusive_lock/".length());
// 通过 seq-00000000 获取该节点在children集合的位置
int index = children.indexOf(thisNode);
/**
* 因为在zkClient.create和zkClient.getChildren("/exclusive_lock", false);可能有其它线程也创建了节点,
* 所以并不是说只有 children.size() == 1 这个线程才是第一个创建节点的线程
*/
if (index == 0) {// 如果自己就是第一个节点,那么获得锁,
System.out.println(Thread.currentThread().getName() + "获得锁");
threadLocal.set(currentLockNode);
return;
}
//
String preNode = "/exclusive_lock/" + children.get(index - 1);
Thread thread = Thread.currentThread();
// 监听它前一个节点的变化,如果前一个节点删除了,会调用回调函数把自己唤醒
zkClient.getData(preNode, watchedEvent -> LockSupport.unpark(thread), null);
// 把自己挂起
LockSupport.park();
}
threadLocal.set(currentLockNode);
System.out.println(Thread.currentThread().getName() + "获得锁");
} catch (Exception e) {
e.printStackTrace();
}
}
public void unlock() {
try {
System.out.println(Thread.currentThread().getName() + "释放了锁");
zkClient.delete(threadLocal.get(), -1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
测试类需要new 两个 DistributedLock,因为在同一台机器里的并发线程都是用的同一个锁对象,而在在不同主机里的线程那就用的是不同的锁对象,这里模拟的是分布式锁的场景,所以需要new 两个锁对象。
public class DistributedLockTest {
public static void main(String[] args) {
DistributedLock lock01 = new DistributedLock();
Runnable task01 = () -> {
lock01.lock();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock01.unlock();
};
DistributedLock lock02 = new DistributedLock();
Runnable task02 = () -> {
lock02.lock();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock02.unlock();
};
for (int i = 0; i < 10; i++) {
new Thread(task01, "server01-thread-" + i).start();
new Thread(task02, "server02-thread-" + i).start();
}
}
}
测试结果如下:
代码参考自尚硅谷zookeeper的课程,图片也来课程配套的资料。大海哥写的分布式锁的代码有点小问题,把回调函数写在创建zookeeper客户端那里然后使用waitLatch.countDown()会唤醒所有等待的线程,并且waitLatch使用完一次就不能再二次使用了,必须重新new CountDownLatch对象,而直接把回调函数写在getData里就不用担心这些问题了,再配合上LockSupport就可以唤醒指定的线程。并且解锁时必须用ThreadLocal来删除节点,因为每个线程都会创建一个节点,节点都有自己的名字,使用实例字段的话是不行的。
更多推荐
已为社区贡献1条内容
所有评论(0)