如何实现一个比较完美的分布式锁?
在分布式系统中,存在一个让人很头痛的问题:锁。单机器时,控制并发相对简单,使用Java提供的synchronized关键字或者显式锁ReentrantLock。但是在分布式系统中,JVM级别的锁已经不能满足系统的需求,需要实现分布式锁,可以借助Redis或Zookeeper实现,本篇博客记录一下用Redis实现一个比较完美的分布式锁。通过一个“商品秒杀抢购”案例来实现。商品抢购案例购买商...
在分布式系统中,存在一个让人很头痛的问题:锁。
单机器时,控制并发相对简单,使用Java提供的synchronized关键字或者显式锁ReentrantLock。
但是在分布式系统中,JVM级别的锁已经不能满足系统的需求,需要实现分布式锁,可以借助Redis或Zookeeper实现,本篇博客记录一下用Redis实现一个比较完美的分布式锁。
通过一个“商品秒杀抢购”案例来实现。
商品抢购案例
购买商品的逻辑:先查库存,库存大于0时,才可以购买,购买成功后库存减1,没有库存则返回“商品售罄”。
在实现分布式锁之前,先来看一下,只使用synchronized关键字会出现什么问题?
商品抢购的简单业务逻辑
使用Nginx做负载均衡,服务搭建两套。
对抢购服务进行并发测试,这里使用Jmeter,20个并发,压测3秒钟。
并发测试结果
8001服务
8002服务
如上图所示,10个库存,但是抢购成功了11次,库存最后为-1,很显然程序存在问题。
分布式锁实现
一个分布式锁需要满足的条件:
-
互斥性
在任意时刻,只有一个客户端可以获得锁。 -
防死锁
即使获得锁的客户端在没有释放锁之前崩溃宕机,后续客户端仍然可以竞争到锁。 -
防止误解锁
释放锁的客户端一定是加锁的客户端,不能错误的释放其他客户端加的锁。
1.0版本
RedisLock
@Component
public class RedisLock {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void lock(String key){
ValueOperations<String, Object> forValue = redisTemplate.opsForValue();
while (!forValue.setIfAbsent(key, key)) {
//竞争锁失败,暂时让出CPU资源
Thread.yield();
}
//竞争锁成功
}
//释放锁
public void unlock(String key) {
redisTemplate.delete(key);
}
}
业务代码加锁修改
@GetMapping("buy")
public R buy(){
//竞争锁
redisLock.lock(LOCK_KEY);
Integer count;
try {
count = (Integer) redisTemplate.opsForValue().get(GOODS_COUNT_KEY);
if (count <= 0) {
System.err.println("商品售罄.");
return R.error("商品售罄.");
}
count = redisTemplate.opsForValue().decrement(GOODS_COUNT_KEY).intValue();
System.err.println("秒杀成功,库存:" + count);
}finally {
//释放锁
redisLock.unlock(LOCK_KEY);
}
return R.success(count);
}
一个最简单的分布式锁就实现了,存在很多问题,但是效果已经符合预期。
再次并发测试,结果如图:
经过多次并发测试,结果均正确,分布式锁确实生效了,尽管它存在很多问题。
1.0版本的问题
锁极度不安全,当任意客户端获得锁后,在释放锁之前该服务一旦崩溃宕机,锁将永远都不会释放。
这意味着:其他服务都将竞争不到锁,竞争不到锁的线程在不断自旋,与此同时还会不断有新的请求接入,最终导致服务宕机,Redis宕机。
2.0版本
锁超时,防止锁永远不释放
@Component
public class RedisLock {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void lock(String key){
ValueOperations<String, Object> forValue = redisTemplate.opsForValue();
//锁超时10s 10s后未释放锁,其他线程可以竞争
while (!forValue.setIfAbsent(key, key, 10, TimeUnit.SECONDS)) {
//竞争锁失败,暂时让出CPU资源
Thread.yield();
}
//竞争锁成功
}
//释放锁
public void unlock(String key) {
redisTemplate.delete(key);
}
}
给LockKey加一个过期时间,这样哪怕获得锁的服务突然宕机,锁也会在指定时间内失效,其他服务就可以正常竞争锁了。
2.0版本的问题
锁面临永久失效。
假设过期时间10S,服务A可能在10S内还没执行完业务逻辑,锁超时后服务B获得了锁,这时服务A执行完了业务逻辑,错误的释放了由服务B加的锁,后面依次类推,导致锁失效。
3.0版本
只能释放自己加的锁,解决锁失效
@Component
public class RedisLock {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private ConcurrentMap<String, String> keyMap = new ConcurrentHashMap<>(16);
//加锁成功的同时,会得到一个锁签名,根据keySign来释放锁
public String lock(String key){
ValueOperations<String, Object> forValue = redisTemplate.opsForValue();
String keySign = UUID.randomUUID().toString(true);
//锁超时10s 10s后未释放锁,其他线程可以竞争
while (!forValue.setIfAbsent(key, keySign, 10, TimeUnit.SECONDS)) {
//竞争锁失败,暂时让出CPU资源
Thread.yield();
}
//竞争锁成功
keyMap.put(key, keySign);
return keySign;
}
//释放锁
public void unlock(String key, String keySign) {
String s = keyMap.get(key);
if (!keySign.equals(s)) {
//不是我加的锁,不能释放
return;
}
//是我加的锁,可以释放
redisTemplate.delete(key);
}
}
业务代码稍作修改,根据锁签名来释放锁,防止误释放
@GetMapping("buy")
public R buy(){
//竞争锁,获得锁签名
String lockSign = redisLock.lock(LOCK_KEY);
Integer count;
try {
count = (Integer) redisTemplate.opsForValue().get(GOODS_COUNT_KEY);
if (count <= 0) {
System.err.println("商品售罄.");
return R.error("商品售罄.");
}
count = redisTemplate.opsForValue().decrement(GOODS_COUNT_KEY).intValue();
System.err.println("秒杀成功,库存:" + count);
}finally {
//根据锁签名来释放锁,防止误释放
redisLock.unlock(LOCK_KEY, lockSign);
}
return R.success(count);
}
3.0版本的问题
3.0版本其实已经比较完美了,但还有一些瑕疵。
锁超时-时间不好确定,太长的话,某台服务宕机后,其他服务需要等待很久才能获得锁,太短的话,可能大部分服务还没有执行完业务逻辑,锁还来不及释放就过期了,产生并发安全问题。
4.0版本
守护线程续命,避免锁来不及释放而失效。
@Component
public class RedisLock {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private ConcurrentMap<String, String> keyMap = new ConcurrentHashMap<>(16);
private ConcurrentMap<String, ContinueThread> threadMap = new ConcurrentHashMap<>(16);
//加锁成功的同时,会得到一个锁签名,根据keySign来释放锁
public String lock(String key){
ValueOperations<String, Object> forValue = redisTemplate.opsForValue();
String keySign = UUID.randomUUID().toString(true);
//锁超时10s 10s后未释放锁,其他线程可以竞争
while (!forValue.setIfAbsent(key, keySign, 10, TimeUnit.SECONDS)) {
//竞争锁失败,暂时让出CPU资源
Thread.yield();
}
//竞争锁成功
keyMap.put(key, keySign);
//守护线程续命
ContinueThread continueThread = new ContinueThread(key, keySign);
continueThread.setDaemon(true);
threadMap.put(key, continueThread);
continueThread.start();
return keySign;
}
//释放锁
public void unlock(String key, String keySign) {
String s = keyMap.get(key);
if (!keySign.equals(s)) {
//不是我加的锁,不能释放
return;
}
//是我加的锁,可以释放
redisTemplate.delete(key);
//续命线程停止
ContinueThread thread = threadMap.get(key);
if (thread != null) {
thread.stopThread();
}
}
//续命线程内部类
private class ContinueThread extends Thread {
private boolean stop = false;
private String key;
private String keySign;
public ContinueThread(String key,String keySign) {
super("Thread-"+UUID.randomUUID().toString(true));
this.key = key;
this.keySign = keySign;
}
public void stopThread(){
this.stop = true;
}
@Override
public void run() {
while (!stop) {
System.err.println(!isInterrupted());
try {
//3s续一次命
Thread.sleep(10000/3);
} catch (InterruptedException e) {}
System.err.println("续命...");
redisTemplate.opsForValue().set(key, keySign, 10, TimeUnit.SECONDS);
}
}
}
}
如果服务没有宕机,只是业务代码执行耗费的时间比较长,那么守护线程会每3s续一次命,保证锁不会失效。
如果服务宕机了,那么10S后锁失效,其他服务可以竞争到锁,解决了3.0版本存在的问题。
总结
基于Redis实现简单的分布式锁很简单,但是要写出稳定、安全、健壮性强的锁就需要考虑很多因素了,例如:服务宕机、锁失效等。
除了自己实现外,也有一些优秀的开源框架提供了分布式锁实现,例如:Redisson,官网:https://redisson.org/。
更多推荐
所有评论(0)