Zookeeper实现分布式锁
介绍实现分布式锁的方式很多,zookeeper是其中一种。企业生产中一般选择Redis,业务简单可以选择基于表的分布式锁,这篇文章介绍zookeeper。zookeeper实现分布式锁原理:基于临时顺序节点: 1.客户端调用create()方法创建名为“locknode/guid-lock-”的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL。 ...
·
介绍
实现分布式锁的方式很多,zookeeper是其中一种。企业生产中一般选择Redis,业务简单可以选择基于表的分布式锁,这篇文章介绍zookeeper。
zookeeper实现分布式锁原理:
基于临时顺序节点:
1.客户端调用create()方法创建节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL。
2.客户端调用getChildren(“locknode”)方法来获取所有已经创建的子节点。
3.客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点是所有节点中序号最小的,那么就认为这个客户端获得了锁。
4.如果创建的节点不是所有节点中序号最小的,那么则监视比自己创建节点的序列号小的最大的节点,进入等待。直到下次监视的子节点变更的时候,再进行子节点的获取,判断是否获取锁。
释放锁的过程相对比较简单,就是删除自己创建的那个子节点即可
简单代码实现
mian方法:
public class Main {
private static int n = 500;
private static void process() {
System.out.println(--n);
}
public static void main(String[] args) {
Runnable runnable = () -> {
DistributedLock distributedLock = null;
try {
distributedLock = new DistributedLock("127.0.0.1:2181", "test1");
distributedLock.lock();
process();
System.out.println(Thread.currentThread().getName() + " is running");
} finally {
if (distributedLock != null) {
distributedLock.unlock();
}
}
};
for (int i = 0; i < 10; i++) {
Thread t = new Thread(runnable);
t.start();
}
}
}
分布式锁实现类:
public class DistributedLock implements Lock, Watcher {
private ZooKeeper zkConn = null;
private String ROOT_LOCK = "/locks";
private String lockName;
private String WAIT_LOCK;
private String CURRENT_LOCK;
private CountDownLatch countDownLatch;
private int sessionTimeout = 3000000;
private List<Exception> exceptionList = new ArrayList<>();
DistributedLock(String url, String lockName) {
this.lockName = lockName;
try {
zkConn = new ZooKeeper(url, sessionTimeout, this);
if (zkConn.exists(ROOT_LOCK, false) == null)
zkConn.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (IOException | InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
public void lock() {
if (exceptionList.size() > 0) throw new LockException(exceptionList.get(0));
try {
if (this.tryLock()) {
System.out.println(Thread.currentThread().getName() + " " + lockName + " get the lock!");
} else {
waitForLock(WAIT_LOCK, sessionTimeout);
}
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
public boolean tryLock() {
try {
String splitStr = "_lock_";
CURRENT_LOCK = zkConn.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(CURRENT_LOCK + " is created");
List<String> subNodes = zkConn.getChildren(ROOT_LOCK, false);
List<String> lockObjects = new ArrayList<>();
for (String node : subNodes) {
String _node = node.split(splitStr)[0];
if (_node.equals(lockName)) lockObjects.add(node);
}
Collections.sort(lockObjects);
System.out.println(Thread.currentThread().getName() + "'lock is " + CURRENT_LOCK);
if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
return true;
}
String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);
int prevNodePosition = Collections.binarySearch(lockObjects, prevNode);
WAIT_LOCK = lockObjects.get(prevNodePosition - 1);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
return false;
}
private boolean waitForLock(String waitLock, long waitTime) throws KeeperException, InterruptedException {
Stat stat = zkConn.exists(ROOT_LOCK + "/" + waitLock, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (countDownLatch != null) {
countDownLatch.countDown();
}
}
});
if (stat != null) {
System.out.println(Thread.currentThread().getName() + " is waiting for " + ROOT_LOCK + "/" + waitLock);
this.countDownLatch = new CountDownLatch(1);
this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
this.countDownLatch = null;
System.out.println(Thread.currentThread().getName() + " get the lock ");
}
return true;
}
public void unlock() {
try {
System.out.println(CURRENT_LOCK+" has release the lock ");
zkConn.delete(CURRENT_LOCK, -1);
CURRENT_LOCK = null;
zkConn.close();
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
public Condition newCondition() {
return null;
}
public void lockInterruptibly() {
this.lock();
}
@Override
public void process(WatchedEvent watchedEvent) {
}
public class LockException extends RuntimeException {
private static final long serialVersionUID = 1L;
LockException(String e) {
super(e);
}
LockException(Exception e) {
super(e);
}
}
public boolean tryLock(long timeout, TimeUnit unit) {
try {
if (this.tryLock()) {
return true;
}
return waitForLock(WAIT_LOCK, timeout);
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
}
到此简单的一个分布式锁已经实现,以后在讲解Redis,表实现分布式锁。谢谢观看
更多推荐
已为社区贡献4条内容
所有评论(0)