在分布式系统中,存在一个让人很头痛的问题:

单机器时,控制并发相对简单,使用Java提供的synchronized关键字或者显式锁ReentrantLock
但是在分布式系统中,JVM级别的锁已经不能满足系统的需求,需要实现分布式锁,可以借助Redis或Zookeeper实现,本篇博客记录一下用Redis实现一个比较完美的分布式锁。

通过一个“商品秒杀抢购”案例来实现。

商品抢购案例

购买商品的逻辑:先查库存,库存大于0时,才可以购买,购买成功后库存减1,没有库存则返回“商品售罄”。

在实现分布式锁之前,先来看一下,只使用synchronized关键字会出现什么问题?

商品抢购的简单业务逻辑
在这里插入图片描述
使用Nginx做负载均衡,服务搭建两套。
在这里插入图片描述
对抢购服务进行并发测试,这里使用Jmeter,20个并发,压测3秒钟。

并发测试结果

8001服务
在这里插入图片描述
8002服务
在这里插入图片描述
如上图所示,10个库存,但是抢购成功了11次,库存最后为-1,很显然程序存在问题。

分布式锁实现

一个分布式锁需要满足的条件:

  • 互斥性
    在任意时刻,只有一个客户端可以获得锁。

  • 防死锁
    即使获得锁的客户端在没有释放锁之前崩溃宕机,后续客户端仍然可以竞争到锁。

  • 防止误解锁
    释放锁的客户端一定是加锁的客户端,不能错误的释放其他客户端加的锁。

1.0版本

RedisLock

@Component
public class RedisLock {
	@Autowired
	private RedisTemplate<String, Object> redisTemplate;

	public void lock(String key){
		ValueOperations<String, Object> forValue = redisTemplate.opsForValue();
		while (!forValue.setIfAbsent(key, key)) {
			//竞争锁失败,暂时让出CPU资源
			Thread.yield();
		}
		//竞争锁成功
	}

	//释放锁
	public void unlock(String key) {
		redisTemplate.delete(key);
	}
}

业务代码加锁修改

@GetMapping("buy")
public R buy(){
	//竞争锁
	redisLock.lock(LOCK_KEY);
	Integer count;
	try {
		count = (Integer) redisTemplate.opsForValue().get(GOODS_COUNT_KEY);
		if (count <= 0) {
			System.err.println("商品售罄.");
			return R.error("商品售罄.");
		}
		count = redisTemplate.opsForValue().decrement(GOODS_COUNT_KEY).intValue();
		System.err.println("秒杀成功,库存:" + count);
	}finally {
		//释放锁
		redisLock.unlock(LOCK_KEY);
	}
	return R.success(count);
}

一个最简单的分布式锁就实现了,存在很多问题,但是效果已经符合预期。

再次并发测试,结果如图:
在这里插入图片描述
在这里插入图片描述
经过多次并发测试,结果均正确,分布式锁确实生效了,尽管它存在很多问题。

1.0版本的问题

锁极度不安全,当任意客户端获得锁后,在释放锁之前该服务一旦崩溃宕机,锁将永远都不会释放。
这意味着:其他服务都将竞争不到锁,竞争不到锁的线程在不断自旋,与此同时还会不断有新的请求接入,最终导致服务宕机,Redis宕机。

2.0版本

锁超时,防止锁永远不释放

@Component
public class RedisLock {
	@Autowired
	private RedisTemplate<String, Object> redisTemplate;

	public void lock(String key){
		ValueOperations<String, Object> forValue = redisTemplate.opsForValue();
		//锁超时10s 10s后未释放锁,其他线程可以竞争
		while (!forValue.setIfAbsent(key, key, 10, TimeUnit.SECONDS)) {
			//竞争锁失败,暂时让出CPU资源
			Thread.yield();
		}
		//竞争锁成功
	}

	//释放锁
	public void unlock(String key) {
		redisTemplate.delete(key);
	}
}

给LockKey加一个过期时间,这样哪怕获得锁的服务突然宕机,锁也会在指定时间内失效,其他服务就可以正常竞争锁了。

2.0版本的问题

锁面临永久失效。
假设过期时间10S,服务A可能在10S内还没执行完业务逻辑,锁超时后服务B获得了锁,这时服务A执行完了业务逻辑,错误的释放了由服务B加的锁,后面依次类推,导致锁失效。

3.0版本

只能释放自己加的锁,解决锁失效

@Component
public class RedisLock {
	@Autowired
	private RedisTemplate<String, Object> redisTemplate;
	private ConcurrentMap<String, String> keyMap = new ConcurrentHashMap<>(16);

	//加锁成功的同时,会得到一个锁签名,根据keySign来释放锁
	public String lock(String key){
		ValueOperations<String, Object> forValue = redisTemplate.opsForValue();
		String keySign = UUID.randomUUID().toString(true);
		//锁超时10s 10s后未释放锁,其他线程可以竞争
		while (!forValue.setIfAbsent(key, keySign, 10, TimeUnit.SECONDS)) {
			//竞争锁失败,暂时让出CPU资源
			Thread.yield();
		}
		//竞争锁成功
		keyMap.put(key, keySign);
		return keySign;
	}

	//释放锁
	public void unlock(String key, String keySign) {
		String s = keyMap.get(key);
		if (!keySign.equals(s)) {
			//不是我加的锁,不能释放
			return;
		}
		//是我加的锁,可以释放
		redisTemplate.delete(key);
	}
}

业务代码稍作修改,根据锁签名来释放锁,防止误释放

@GetMapping("buy")
public R buy(){
	//竞争锁,获得锁签名
	String lockSign = redisLock.lock(LOCK_KEY);
	Integer count;
	try {
		count = (Integer) redisTemplate.opsForValue().get(GOODS_COUNT_KEY);
		if (count <= 0) {
			System.err.println("商品售罄.");
			return R.error("商品售罄.");
		}
		count = redisTemplate.opsForValue().decrement(GOODS_COUNT_KEY).intValue();
		System.err.println("秒杀成功,库存:" + count);
	}finally {
		//根据锁签名来释放锁,防止误释放
		redisLock.unlock(LOCK_KEY, lockSign);
	}
	return R.success(count);
}
3.0版本的问题

3.0版本其实已经比较完美了,但还有一些瑕疵。
锁超时-时间不好确定,太长的话,某台服务宕机后,其他服务需要等待很久才能获得锁,太短的话,可能大部分服务还没有执行完业务逻辑,锁还来不及释放就过期了,产生并发安全问题。

4.0版本

守护线程续命,避免锁来不及释放而失效。

@Component
public class RedisLock {
	@Autowired
	private RedisTemplate<String, Object> redisTemplate;
	private ConcurrentMap<String, String> keyMap = new ConcurrentHashMap<>(16);
	private ConcurrentMap<String, ContinueThread> threadMap = new ConcurrentHashMap<>(16);

	//加锁成功的同时,会得到一个锁签名,根据keySign来释放锁
	public String lock(String key){
		ValueOperations<String, Object> forValue = redisTemplate.opsForValue();
		String keySign = UUID.randomUUID().toString(true);
		//锁超时10s 10s后未释放锁,其他线程可以竞争
		while (!forValue.setIfAbsent(key, keySign, 10, TimeUnit.SECONDS)) {
			//竞争锁失败,暂时让出CPU资源
			Thread.yield();
		}
		//竞争锁成功
		keyMap.put(key, keySign);

		//守护线程续命
		ContinueThread continueThread = new ContinueThread(key, keySign);
		continueThread.setDaemon(true);
		threadMap.put(key, continueThread);
		continueThread.start();
		return keySign;
	}

	//释放锁
	public void unlock(String key, String keySign) {
		String s = keyMap.get(key);
		if (!keySign.equals(s)) {
			//不是我加的锁,不能释放
			return;
		}
		//是我加的锁,可以释放
		redisTemplate.delete(key);
		//续命线程停止
		ContinueThread thread = threadMap.get(key);
		if (thread != null) {
			thread.stopThread();
		}
	}

	//续命线程内部类
	private class ContinueThread extends Thread {
		private boolean stop = false;
		private String key;
		private String keySign;

		public ContinueThread(String key,String keySign) {
			super("Thread-"+UUID.randomUUID().toString(true));
			this.key = key;
			this.keySign = keySign;
		}

		public void stopThread(){
			this.stop = true;
		}

		@Override
		public void run() {
			while (!stop) {
				System.err.println(!isInterrupted());
				try {
					//3s续一次命
					Thread.sleep(10000/3);
				} catch (InterruptedException e) {}
				System.err.println("续命...");
				redisTemplate.opsForValue().set(key, keySign, 10, TimeUnit.SECONDS);
			}
		}
	}
}

如果服务没有宕机,只是业务代码执行耗费的时间比较长,那么守护线程会每3s续一次命,保证锁不会失效。
如果服务宕机了,那么10S后锁失效,其他服务可以竞争到锁,解决了3.0版本存在的问题。

总结

基于Redis实现简单的分布式锁很简单,但是要写出稳定、安全、健壮性强的锁就需要考虑很多因素了,例如:服务宕机、锁失效等。

除了自己实现外,也有一些优秀的开源框架提供了分布式锁实现,例如:Redisson,官网:https://redisson.org/

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐