基于zookeeper的分布式锁实现
ZooKeeper是Apache软件基金会的一个软件项目,他为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册。在 ZooKeeper 中,节点类型可以分为持久节点(PERSISTENT )、临时节点(EPHEMERAL),以及时序节点(SEQUENTIAL ),具体在节点创建过程中,一般是组合使用,可以生成 4 种节点类型:持久节点(PERSISTENT),持久顺序节点(PERSIST
ZooKeeper是Apache软件基金会的一个软件项目,他为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册。在 ZooKeeper 中,节点类型可以分为持久节点(PERSISTENT )、临时节点(EPHEMERAL),以及时序节点(SEQUENTIAL ),具体在节点创建过程中,一般是组合使用,可以生成 4 种节点类型:持久节点(PERSISTENT),持久顺序节点(PERSISTENT_SEQUENTIAL),临时节点(EPHEMERAL),临时顺序节点(EPHEMERAL_SEQUENTIAL);
实现分布式锁主要利用以下特性:
-
有序节点:假如当前有一个父节点为/lock,我们可以在这个父节点下面创建子节点;zookeeper提供了一个可选的有序特性,例如我们可以创建子节点“/lock/node-”并且指明有序,那么zookeeper在生成子节点时会根据当前的子节点数量自动添加整数序号,也就是说如果是第一个创建的子节点,那么生成的子节点为/lock/node-0000000000,下一个节点则为/lock/node-0000000001,依次类推。
-
临时节点:客户端可以建立一个临时节点,在会话结束或者会话超时后,zookeeper会自动删除该节点。
-
事件监听:在读取数据时,我们可以同时对节点设置事件监听,当节点数据或结构变化时,zookeeper会通知客户端。当前zookeeper有如下四种事件:1)节点创建;2)节点删除;3)节点数据修改;4)子节点变更。
逻辑:
实现代码:
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
public class ZKDistributeImproveLock implements Lock {
/*
* 利用zookeeper的同父子节点不可重名的特点来实现分布式锁
* 加锁:去创建指定名称的节点,如果能创建成功,则获得锁(加锁成功),如果节点已存在,就标识锁被别人获取了, 你就得阻塞,等待
* 释放锁:删除指定名称的节点
*/
private String LockPath;
private ZkClient client;
private ThreadLocal<String> currentPath = new ThreadLocal<>();
private ThreadLocal<String> beforePath = new ThreadLocal<>();
public ZKDistributeImproveLock(String lockPath) {
super();
LockPath = lockPath;
client = new ZkClient("localhost:2181");
client.setZkSerializer(new MyZkSerializer());
if (!this.client.exists(LockPath)) {
this.client.createPersistent(LockPath);
}
}
@Override
public boolean tryLock() {
if (this.currentPath.get() == null) {
currentPath.set(this.client
.createEphemeralSequential(LockPath + "/", "aaa"));
}
// 获得所有的子
List<String> children = this.client.getChildren(LockPath);
// 排序list
Collections.sort(children);
// 判断当前节点是否是最小的
if (currentPath.get().equals(LockPath + "/" + children.get(0))) {
return true;
} else {
// 取到前一个
// 得到字节的索引号
int curIndex = children.indexOf(
currentPath.get().substring(LockPath.length() + 1));
beforePath.set(LockPath + "/" + children.get(curIndex - 1));
}
return false;
}
@Override
public void lock() {
if (!tryLock()) {
// 阻塞等待
waitForLock();
// 再次尝试加锁
lock();
}
}
private void waitForLock() {
// 怎么让自己阻塞
CountDownLatch cdl = new CountDownLatch(1);
// 注册watcher
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("-----监听到节点被删除");
cdl.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data)
throws Exception {
}
};
client.subscribeDataChanges(this.beforePath.get(), listener);
if (this.client.exists(this.beforePath.get())) {
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
client.unsubscribeDataChanges(this.beforePath.get(), listener);
}
@Override
public void unlock() {
// 删除节点
this.client.delete(this.currentPath.get());
}
@Override
public void lockInterruptibly() throws InterruptedException {
// TODO Auto-generated method stub
}
@Override
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {
// TODO Auto-generated method stub
return false;
}
@Override
public Condition newCondition() {
// TODO Auto-generated method stub
return null;
}
}
使用org.I0Itec.zkclient创建ZkClient实例时,若不指定序列化类,会默认使用org.I0Itec.zkclient.serialize.SerializableSerializer,该默认序列化类使用可能有问题,如往zk节点上写data会出现乱码,或Kafka通过方法AdminUtils.createTopic(ZkClient, topic, partitions, replicationFactor, topicConfig)时,topic下的分区节点创建不了。因此最好自己实现一个序列化类,创建ZkClient实例后通过setZkSerializer()方法配置自己写的序列化实现。
public class MyZkSerializer implements ZkSerializer {
String charset = "UTF-8";
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
return new String(bytes, charset);
} catch (UnsupportedEncodingException e) {
throw new ZkMarshallingError(e);
}
}
public byte[] serialize(Object obj) throws ZkMarshallingError {
try {
return String.valueOf(obj).getBytes(charset);
} catch (UnsupportedEncodingException e) {
throw new ZkMarshallingError(e);
}
}
}
更多推荐
所有评论(0)