分布式锁(一)__基于Zookeeper实现可重入分布式锁
1 重入的实现对于锁的重入,我们来想这样一个场景。当一个递归方法被sychronized关键字修饰时,在调用方法时显然没有发生问题,执行线程获取了锁之后仍能连续多次地获得该锁,也就是说sychronized关键字支持锁的重入。对于ReentrantLock,虽然没有像sychronized那样隐式地支持重入,但在调用lock()方法时,已经获取到锁的线程,能够再次调用lock()方法获取锁而..
1 重入的实现
对于锁的重入,我们来想这样一个场景。当一个递归方法被sychronized关键字修饰时,在调用方法时显然没有发生问题,执行线程获取了锁之后仍能连续多次地获得该锁,也就是说sychronized关键字支持锁的重入。对于ReentrantLock,虽然没有像sychronized那样隐式地支持重入,但在调用lock()方法时,已经获取到锁的线程,能够再次调用lock()方法获取锁而不被阻塞。
如果想要实现锁的重入,至少要解决一下两个问题
- 线程再次获取锁:锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
- 锁的最终释放:线程重复n次获取了锁,随后在n次释放该锁后,其他线程能够获取该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于0时表示锁已经释放
2、zookeeper分布式锁的实现:
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
ZooKeeper的架构通过冗余服务实现高可用性。因此,如果第一次无应答,客户端就可以询问另一台ZooKeeper主机。ZooKeeper节点将它们的数据存储于一个分层的命名空间,非常类似于一个文件系统或一个前缀树结构。客户端可以在节点读写,从而以这种方式拥有一个共享的配置服务。更新是全序的。
基于ZooKeeper分布式锁的流程
- 在zookeeper指定节点(locks)下创建临时顺序节点node_n
- 获取locks下所有子节点children
- 对子节点按节点自增序号从小到大排序
- 判断本节点是不是第一个子节点,若是,则获取锁;若不是,则监听比该节点小的那个节点的删除事件
- 若监听事件生效,则回到第二步重新进行判断,直到获取到锁
引入依赖:
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
抽象的OrderService接口:
public interface OrderService {
void createOrder();
}
订单号生成类(模拟公共资源ps:需要用锁的地方):
package com.th.order;
import java.text.SimpleDateFormat;
import java.util.Date;
public class OrderCodeGenerator {
private static int i = 0;
public String getOrderCode() {
Date now = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-");
return sdf.format(now) + ++i;
}
}
MyZkSerializer 继承了ZkSerializer:
package com.th.order;
import java.io.UnsupportedEncodingException;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
public class MyZkSerializer implements ZkSerializer {
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
return new String(bytes, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
throw new ZkMarshallingError(e);
}
}
@Override
public byte[] serialize(Object obj) throws ZkMarshallingError {
try {
return String.valueOf(obj).getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
throw new ZkMarshallingError(e);
}
}
}
自己实现的可重入锁ZookeeperReAbleDisLock:
package com.th.order;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
public class ZookeeperReAbleDisLock implements Lock {
private ZkClient client;
private String lockPath;
private String currentPath;
private String beforePath;
// 线程获取锁的次数
private static volatile int state;
// 当前获取锁的线程
private static volatile Thread thread;
public ZookeeperReAbleDisLock(String lockPath) {
super();
this.lockPath = lockPath;
client = new ZkClient("192.168.1.117:2181");
client.setZkSerializer(new MyZkSerializer());
if (!client.exists(lockPath)) {
try {
client.createPersistent(lockPath);
} catch (Exception e) {
}
}
}
@Override
public boolean tryLock() {
return tryLock(1);
}
public boolean tryLock(int acquires) {
// 获取当前线程
final Thread currntThread = Thread.currentThread();
// 获取当前锁的次数
int state = getState();
// state == 0 表示没有线程获取锁 进来的线程肯定能获取锁
if (state == 0) {
if (compareAndSetState(0, acquires, currntThread)) {
return true;
}
// state != 0 表示线程被获取了 判断是否是当前线程 如果是则 state+1 ,否则返回false
} else if (currntThread == getThread()) {
int nextS = getState() + acquires;
if (nextS < 0)
throw new Error("Maximum lock count exceeded");
// 这里不需要cas 原因不解释
setState(nextS);
System.out.println(Thread.currentThread().getName() + ":获取重入锁成功,当前获取锁次数: " + getState());
return true;
}
// 获取锁的不是当前线程
return false;
}
public boolean compareAndSetState(int expect, int update, Thread t) {
if (this.currentPath == null) {
currentPath = this.client.createEphemeralSequential(lockPath + "/", "1");
}
// 获取所有节点
List<String> children = this.client.getChildren(lockPath);
// 排序
Collections.sort(children);
// 判断当前节点是否为最小节点
if (getState() == expect && currentPath.equals(lockPath + "/" + children.get(0))) {
setState(update);
thread = t;
System.out.println(Thread.currentThread().getName() + ":获取锁成功,当前获取锁次数: " + getState());
return true;
}
// 取得前一个
// 得到字节的索引号
int curIndex = children.indexOf(currentPath.substring(lockPath.length() + 1));
beforePath = lockPath + "/" + children.get(curIndex - 1);
return false;
}
public final boolean tryRelease(int releases) {
// 可以判断是否自己获得锁 自己获得锁才能删除
final Thread currentThread = Thread.currentThread();
// 获取锁的不是当前线程
if (currentThread != getThread()) {
// throw new IllegalMonitorStateException();
return false;
}
// 释放锁的次数
int nextS = getState() - releases;
boolean free = false;
if (nextS == 0) {
free = true;
setThread(null);
// 删除zk节点
client.delete(currentPath);
System.out.println(Thread.currentThread().getName() + ": 所有锁释放成功:删除zk节点...");
}
setState(nextS);
if (!free)
System.out.println(Thread.currentThread().getName() + ": 释放重入锁成功: 剩余锁次数:" + getState());
return free;
}
@Override
public void lock() {
if (!tryLock()) {
// 没有获得锁,阻塞自己
waitForLock();
// 再次尝试加锁
lock();
}
}
private void waitForLock() {
CountDownLatch cdl = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataChange(String arg0, Object arg1) throws Exception {
}
@Override
public void handleDataDeleted(String arg0) throws Exception {
System.out.println("节点被删除了,开始抢锁...");
cdl.countDown();
}
};
// 完成watcher注册
this.client.subscribeDataChanges(beforePath, listener);
// 阻塞自己
if (this.client.exists(beforePath)) {
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 取消注册
this.client.unsubscribeDataChanges(beforePath, listener);
}
@Override
public void unlock() {
// 可以判断是否自己获得锁 自己获得锁才能删除
// client.delete(currentPath);
tryRelease(1);
}
public static int getState() {
return state;
}
public static void setState(int state) {
ZookeeperReAbleDisLock.state = state;
}
public static Thread getThread() {
return thread;
}
public static void setThread(Thread thread) {
ZookeeperReAbleDisLock.thread = thread;
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
return false;
}
@Override
public Condition newCondition() {
// TODO Auto-generated method stub
return null;
}
}
OrderServiceImplWithZkDis 实现了 OrderServiceI接口,并且测试方法也写在了里面:
package com.th.order;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.Lock;
public class OrderServiceImplWithZkDis implements OrderService {
private static OrderCodeGenerator org = new OrderCodeGenerator();
//private Lock lock = new ZookeeperDisLock("/LOCK_TEST");
private Lock lock = new ZookeeperReAbleDisLock("/LOCK_TEST");
@Override
public void createOrder() {
String orderCode = null;
try {
lock.lock();
orderCode = org.getOrderCode();
TestReLock();
System.out.println(Thread.currentThread().getName() + "生成订单:" + orderCode);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void TestReLock() {
lock.lock();
System.out.println(Thread.currentThread().getName() + "测试重入锁成功...");
lock.unlock();
}
public static void main(String[] args) {
int num = 20;
CyclicBarrier cyclicBarrier = new CyclicBarrier(num);
for (int i = 0; i < num; i++) {
new Thread(new Runnable() {
@Override
public void run() {
OrderService orderService = new OrderServiceImplWithZkDis();
System.out.println(Thread.currentThread().getName() + ": 我准备好了");
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
orderService.createOrder();
}
}).start();
}
}
}
测试结果:
更多推荐
所有评论(0)