zookeeper实现分布式锁(公平锁|非公平锁)
无意中发现了一个巨牛的人工智能教程,忍不住分享一下给大家。教程不仅是零基础,通俗易懂,而且非常风趣幽默,像看小说一样!觉得太牛了,所以分享给大家。点这里以跳转到教程。zk实现分布式锁,此类资源很多,但是能经得起的推荐的却很少,同时也看了看twitter实现的分布式锁,很吊。但是twitter中封装的zookeepre工具包,内容有点多,如果我们直希望使用分布式,就显得略微冗重了。因此自己实现了..
·
zk实现分布式锁,此类资源很多,但是能经得起的推荐的却很少,同时也看了看twitter实现的分布式锁,很吊。但是twitter中封装的zookeepre工具包,内容有点多,如果我们直希望使用分布式,就显得略微冗重了。因此自己实现了一个zk实现分布式锁的工具类,其中包括公平锁和非公品锁。直接贴出代码及测试类。
配置类,读取properties中的配置属性
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
/**
* zk 配置信息
* @author zc.ding
* @since 2019/3/23
*/
@Component
@PropertySource("classpath:zookeeper.properties")
@ConfigurationProperties(prefix = "zk")
@Data
public class ZkConfig {
private String address;
private int sessionTimeout;
private long waitConnTimeout;
private int fairLock;
}
分布式锁实现类,
import com.base.framework.core.commons.Constants;
import com.base.framework.core.utils.ApplicationContextUtils;
import com.base.framework.core.utils.ThreadPoolUtil;
import com.base.framework.zk.config.ZkConfig;
import com.base.framework.zk.exceptions.ZkFrameworkExpception;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* zk实现公平/非公平分布式锁
* @author zc.ding
* @since 2019/3/23
*/
@Component
@DependsOn({"zkConfig", "applicationContextUtils"})
public class DistributedLock implements InitializingBean {
private static final Logger LOG = LoggerFactory.getLogger(DistributedLock.class);
private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();
private static final ThreadLocal<ZooKeeper> ZK_THREAD_LOCAL = new ThreadLocal<>();
/** 锁的根路径 **/
private static final String LOCK_ROOT_PATH = "/locks";
/** 锁后缀 **/
private static final String LOCK_SUFFIX = "_NO_";
/** 创建根节点同步锁 **/
private static final String CREATE_ROOT_LOCK = "LOCK";
/** 公平锁 **/
private static final int LOCK_FAIR = 1;
private final static byte[] BUF = new byte[0];
private static ZkConfig zkConfig;
/**
* 获得所有
* @param key 路径不含‘/’
* @return boolean
* @since :2019/3/23
* @author :zc.ding@foxmail.com
*/
public static boolean tryLock(String key) {
return tryLock(key, Constants.LOCK_EXPIRES, Constants.LOCK_WAITTIME);
}
/**
* 获得锁
* @param key 键/路径
* @param expire 过期时间
* @param wait 等待时间
* @return boolean
* @since :2019/3/23
* @author :zc.ding@foxmail.com
*/
public static boolean tryLock(String key, long expire, long wait) {
ZooKeeper zooKeeper = getZooKeeper();
ZK_THREAD_LOCAL.set(zooKeeper);
return tryLock(zooKeeper, key, expire, wait);
}
/**
* 获得锁
* @param zooKeeper zk连接
* @param key 路径
* @param expire 过期时间
* @param wait 等待时间
* @return boolean
* @since :2019/3/23
* @author :zc.ding@foxmail.com
*/
private static boolean tryLock(ZooKeeper zooKeeper, String key, long expire, long wait) {
expire = expire * 1000;
wait = wait * 1000;
final String currNode;
String path = LOCK_ROOT_PATH + "/" + key + LOCK_SUFFIX;
try {
currNode = zooKeeper.create(path, BUF, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//步骤一
List<String> nodes = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
//过滤掉集合中不是当前业务的临时节点
nodes = nodes.stream().filter(o -> o.startsWith(key)).collect(Collectors.toList());
nodes.sort(String::compareTo);
//如果集合为空说明当前创建节点的session在步骤一处已经断开,并且创建的节点已经被zk服务器删除, 此种情况比较极端
if (nodes.size() == 0) {
return false;
}
//最小的节点就是自己创建的节点表示拿到锁
if (currNode.endsWith(nodes.get(0))) {
runExpireThread(zooKeeper, currNode, expire);
return true;
}
//没有拿到锁
CountDownLatch countDownLatch = new CountDownLatch(1);
//非公平锁
if(zkConfig.getFairLock() == LOCK_FAIR){
for (int i = 0; i < nodes.size(); i++) {
String node = nodes.get(i);
if (currNode.endsWith(node)) {
runExpireThread(zooKeeper, currNode, expire);
return true;
}
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + node, new LockWatcher(countDownLatch));
if (stat != null) {
delPath(zooKeeper);
//等待锁超时
if(!countDownLatch.await(wait, TimeUnit.MILLISECONDS)){
return tryLock(zooKeeper, key, expire, wait);
}
}
}
}else{
for (int i = 0; i < nodes.size(); i++) {
String node = nodes.get(i);
if (currNode.endsWith(node)) {
runExpireThread(zooKeeper, currNode, expire);
return true;
}
//当前节点的前一个节点
if (currNode.endsWith(nodes.get(i + 1))) {
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + node, new LockWatcher(countDownLatch));
if (stat != null) {
// 等待锁超时,如果是公平锁,等待时间是默认等待时间的2倍,防止因为拿锁的线程处理业务时间太久
// 导致当前线程等待超时
if(!countDownLatch.await(wait * 2, TimeUnit.MILLISECONDS)){
delPath(zooKeeper);
return false;
}
return true;
}
}
}
}
} catch (KeeperException | InterruptedException e) {
LOG.error("create '{}' node fail.", key, e);
}
return false;
}
/**
* 释放锁
* @since :2019/3/23
* @author :zc.ding@foxmail.com
*/
public static void unLock() {
ZooKeeper zooKeeper = ZK_THREAD_LOCAL.get();
delPath(zooKeeper);
close(ZK_THREAD_LOCAL.get());
THREAD_LOCAL.remove();
ZK_THREAD_LOCAL.remove();
}
/**
* 创建分布式锁的根路径
* @since :2019/3/23
* @author :zc.ding@foxmail.com
*/
private static void createLockRootPath() {
ZooKeeper zooKeeper = getZooKeeper();
try {
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
if (stat == null) {
synchronized (CREATE_ROOT_LOCK) {
stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
if (stat == null) {
LOG.info("create lock root path '{}'", LOCK_ROOT_PATH);
zooKeeper.create(LOCK_ROOT_PATH, BUF, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
}
}
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
/**
* 获得zooke会话连接
* @return org.apache.zookeeper.ZooKeeper
* @since :2019/3/23
* @author :zc.ding@foxmail.com
*/
private static ZooKeeper getZooKeeper() {
final CountDownLatch countDownLatch = new CountDownLatch(1);
try {
final ZooKeeper zooKeeper = new ZooKeeper(zkConfig.getAddress(), zkConfig.getSessionTimeout(), null);
zooKeeper.register((watchedEvent) -> {
switch (watchedEvent.getState()) {
case Expired:
close(zooKeeper);
break;
case SyncConnected:
countDownLatch.countDown();
default:
}
});
if(!countDownLatch.await(zkConfig.getWaitConnTimeout(), TimeUnit.MILLISECONDS)){
close(zooKeeper);
throw new ZkFrameworkExpception("wait for creating zookeeper connection timeout, timeout is [" + zkConfig.getWaitConnTimeout() + "]");
}
return zooKeeper;
} catch (IOException | InterruptedException e) {
throw new ZkFrameworkExpception("create Zookeeper instance fail.", e);
}
}
/**
* 启动一个线程来判断锁的过期时间,方式业务假死,zk不断开导致死锁
* @param zooKeeper zk连接
* @param currNode 当前节点
* @since :2019/3/23
* @author :zc.ding@foxmail.com
*/
private static void runExpireThread(final ZooKeeper zooKeeper, String currNode, long expire){
THREAD_LOCAL.set(currNode);
ThreadPoolUtil.callFixedThreadPool(() -> {
Thread.sleep(expire * 1000);
LOG.info("等待了{}秒, 主动结束.", expire);
delPath(zooKeeper);
return null;
});
}
/**
* 删除创建的路径
* @param zooKeeper zk连接
* @since :2019/3/23
* @author :zc.ding@foxmail.com
*/
private static void delPath(ZooKeeper zooKeeper) {
try {
//无论节点是否存在,直接执行删除操作
zooKeeper.delete(THREAD_LOCAL.get(), -1);
} catch (Exception e){
LOG.error("lock expire, delete lock");
}
}
/**
* 断开连接
* @param zooKeeper zk连接
* @since :2019/3/23
* @author :zc.ding@foxmail.com
*/
private static void close(ZooKeeper zooKeeper) {
if (zooKeeper != null) {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void afterPropertiesSet() {
zkConfig = ApplicationContextUtils.getBean(ZkConfig.class);
createLockRootPath();
}
/**
*
* 监听节点删除事件
* @since :2019/3/23
* @author :zc.ding@foxmail.com
*/
static class LockWatcher implements Watcher {
private CountDownLatch latch;
public LockWatcher(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void process(WatchedEvent event) {
if(event.getType() == Event.EventType.NodeDeleted){
latch.countDown();
}
}
}
}
其中依赖了"applicationContextUtils",这个是自定义的工具类,可以通过实现“ApplicationContextAware”代替。、
测试类:
import com.base.framework.zk.utils.DistributedLock;
import org.apache.zookeeper.*;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 测试zookeeper分布式锁
*
* @author zc.ding
* @since 2019/3/23
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = AppServerApplicationTest.class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class TestDistributedLock {
@Test
public void testConcurrentGetLock() throws Exception{
int count = 50;
ExecutorService executorService = Executors.newFixedThreadPool(count);
for( int i = 0; i < count; i++) {
executorService.execute(() -> {
try {
if(DistributedLock.tryLock("lock")){
System.out.println("我拿到了锁");
if(sum == 0){
System.out.println("sum已经是0");
return;
}
System.out.println("还没有停止,sum=" + sum);
sum = sum - 1;
// System.out.println(Thread.currentThread().getName() + ": 我要等待6秒,测试其他等人超市是否有效.");
// Thread.sleep(6000);
// System.out.println("我的等待结束了,其他人可以过来拿锁了.");
}else{
System.out.println("zk等的花都谢了!");
}
} catch (Exception e) {
e.printStackTrace();
}finally{
DistributedLock.unLock();
}
});
}
executorService.shutdown();
Thread.sleep(30000);
System.out.println("finish, sum=" + sum);
}
}
以上就是zk实现等过程及单元测试。
PS:此实现包括公平锁、非公平锁。公平是在节点删除时只通知相邻的节点来获取的锁;非公平锁(惊群效应)是节点删除所有等待锁的线程重新竞争。具体实现看实现代码。
更多推荐
已为社区贡献3条内容
所有评论(0)