What is a distributed lock? How do ZooKeeper and Redis implement one?
To keep the processes in a distributed system from interfering with one another, we need a distributed coordination mechanism to schedule them, and the core of that coordination is the distributed lock. For the full definition, implementations, and usage scenarios, see the article "What is a distributed lock" (strongly recommended; it covers the topic very thoroughly). Here the focus is on the concrete requirements of a distributed lock and the key implementation logic. In a distributed environment, a method should be executed by only one thread on one machine at a time. Typical scenarios include a scheduled task deployed on multiple service instances, and the same user operating on the same ...
What is a distributed lock?
In distributed services, every service is deployed as multiple instances. Under highly concurrent access, this can easily lead to inconsistent data. Take placing an order as an example: if a user clicks the order button several times in a row, the resulting order requests may be routed to different order-service instances at almost the same moment, and the user may end up with multiple orders. The flow is as follows:
In that case, a distributed lock is needed to synchronize the services at that point of execution, so that only one service instance can successfully create the order at that moment. The flow is as follows:
What conditions should a distributed lock satisfy?
- In a distributed environment, a method can be executed by only one thread of one service at any given time.
- It is reentrant: the same thread can acquire the lock it already holds again.
- It has an expiration mechanism: even if the service holding the lock crashes or becomes unavailable, the lock expires automatically, so other services are not blocked forever waiting for it (no deadlock). The flow is shown in the figure.
- It supports non-blocking acquisition: if the lock cannot be obtained, the call returns failure immediately instead of waiting. (A minimal interface sketch illustrating these properties follows.)
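To make these properties concrete, here is a minimal, hypothetical interface sketch; the names are illustrative and do not come from any specific library:

import java.util.concurrent.TimeUnit;

// Hypothetical sketch only: the lease time covers crashes of the lock holder,
// tryLock never blocks forever, and the same owner may re-acquire a lock it already holds.
public interface DistributedLock {
    /** Try to acquire within waitTime; the lock auto-expires after leaseTime if never released. */
    boolean tryLock(String key, String owner, long waitTime, long leaseTime, TimeUnit unit);

    /** Release one level of reentrancy; the lock is actually freed when the count reaches zero. */
    void unlock(String key, String owner);
}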
Common distributed lock implementations
A ZooKeeper-based distributed lock with Curator
Curator is an open-source ZooKeeper client framework from Netflix. It provides the distributed lock interface InterProcessLock, which is defined as follows:
public interface InterProcessLock
{
/**
 * Acquire the lock, blocking until it is available.
 */
public void acquire() throws Exception;
/**
 * Try to acquire the lock within the given time.
 */
public boolean acquire(long time, TimeUnit unit) throws Exception;
/**
 * Release the lock.
 */
public void release() throws Exception;
/**
 * Whether the lock is held by a thread in this process.
 */
boolean isAcquiredInThisProcess();
}
Its implementations include InterProcessMutex, a reentrant mutex; InterProcessMultiLock, a container that acquires and releases a group of locks as a single entity; InterProcessReadWriteLock, a read-write lock; and InterProcessSemaphoreMutex, a non-reentrant mutex. InterProcessMutex (the reentrant mutex) covers the distributed lock scenario described above; Curator implements it on top of ZooKeeper's ephemeral sequential nodes and watcher events.
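As a quick, hedged usage sketch (the connection string and lock path below are placeholders, not from this article), acquiring and releasing an InterProcessMutex typically looks like this:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

public class CuratorLockExample {
    public static void main(String[] args) throws Exception {
        // Placeholder connection string and retry policy
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        InterProcessMutex lock = new InterProcessMutex(client, "/locks/order");
        // Try to acquire within 3 seconds instead of blocking forever
        if (lock.acquire(3, TimeUnit.SECONDS)) {
            try {
                // critical section: e.g. create the order exactly once
            } finally {
                lock.release(); // always release in finally
            }
        }
        client.close();
    }
}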
In InterProcessMutex, in line with lock properties 1 and 2 above, a single thread within one service can acquire the mutex repeatedly. The acquisition logic is as follows:
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
{
/**
 * Acquire the lock.
 */
public void acquire() throws Exception
{
if ( !internalLock(-1, null) )
{
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
// The current thread within this service instance
Thread currentThread = Thread.currentThread();
// Look up the lock data held by the current thread
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// Reentrant acquisition: just increment the hold count
lockData.lockCount.incrementAndGet();
return true;
}
// Create an ephemeral sequential node within the allowed time
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
// Our ephemeral node did not become the one with the smallest sequence number in time, so acquisition failed
return false;
}
}
Curator creates the node at the given path using ZooKeeper's ephemeral sequential nodes (if the ZK client crashes or becomes unreachable, the ZK server deletes the ephemeral node as soon as it detects that the client has disconnected). Source code:
// LockInternals.class
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes)
throws Exception {
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null)
? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while (!isDone) {
isDone = true;
try {
// Create the ephemeral sequential node
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
// Wait until our node is the one with the smallest sequence number
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
} catch (KeeperException.NoNodeException e) {
// Node creation failed; decide via the retry policy whether to try again
if (client.getZookeeperClient().getRetryPolicy()
.allowRetry(retryCount++,
System.currentTimeMillis() - startMillis,
RetryLoop.getDefaultRetrySleeper())) {
isDone = false;
} else {
throw e;
}
}
}
// If our ephemeral node has the smallest sequence number, the lock was acquired
if (hasTheLock) {
return ourPath;
}
return null;
}
// StandardLockInternalsDriver.class
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String ourPath;
// Create the node, attaching data only when lockNodeBytes is not null
// CreateMode.EPHEMERAL_SEQUENTIAL creates an ephemeral sequential node
if ( lockNodeBytes != null )
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}
else
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
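For intuition, suppose the lock's base path is /locks/order (a placeholder). After three clients call acquire concurrently, the children of that path look roughly like the listing below: withProtection() prepends a _c_<UUID>- prefix, EPHEMERAL_SEQUENTIAL appends a zero-padded sequence number, the client whose node has the smallest sequence number holds the lock, and every other client watches the node immediately before its own (UUIDs abbreviated, values illustrative):

_c_6f2d...-lock-0000000000   <- lock holder
_c_9a41...-lock-0000000001   <- watches ...0000000000
_c_b37c...-lock-0000000002   <- watches ...0000000001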
// LockInternals.class: loop until the lock is acquired
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
// Get all sequential child nodes, sorted by sequence number
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
// Check whether we hold the lock, i.e. whether our node has the smallest sequence number
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
// Not the holder yet: watch the previous node; when it changes, the watcher fires and wakes up the blocked thread
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
// Watch the previousSequencePath node
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
// LockInternals.class: the Watcher that wakes up waiting threads
private final Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
client.postSafeNotify(LockInternals.this);
}
};
// CuratorFramework.class: the actual wake-up logic
default CompletableFuture<Void> postSafeNotify(Object monitorHolder)
{
return runSafe(() -> {
synchronized(monitorHolder) {
monitorHolder.notifyAll();
}
});
}
When Curator releases the distributed lock, it first checks whether the thread performing the release is the thread that currently holds the lock, and then checks whether all reentrant acquisitions have been released; only then is the distributed lock actually released. Source code:
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>{
@Override
public void release() throws Exception {
// The thread that is releasing the lock
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
// If the current thread does not hold the lock, throw an exception
if (lockData == null) {
throw new IllegalMonitorStateException("You do not own the lock: " +
basePath);
}
int newLockCount = lockData.lockCount.decrementAndGet();
// Check the reentrancy count
if (newLockCount > 0) {
return;
}
if (newLockCount < 0) {
throw new IllegalMonitorStateException(
"Lock count has gone negative for lock: " + basePath);
}
// Only when the reentrancy count reaches 0 is the lock actually released
try {
internals.releaseLock(lockData.lockPath);
} finally {
threadData.remove(currentThread);
}
}
}
Implementing a distributed lock with Redis and StringRedisTemplate
With Redis, the distributed lock logic has to be written in your own code. In my implementation, I define one method that acquires the lock within a time limit and one that releases it. The interface is defined as follows:
/**
 * Try to acquire the lock within a time limit.
 *
 * @param condition the acquisition parameters
 */
boolean tryLock(TryLockPrams condition);
/**
 * Release the lock.
 *
 * @param key the lock key
 * @param owner the owner of the lock
 */
boolean unlock(String key, String owner);
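As a hedged usage sketch of this interface: the RedisLock constructor is shown later in the article, but the TryLockPrams setters below are assumptions inferred from the getters used in tryLock, and the key and owner values are placeholders.

import java.util.UUID;
import java.util.concurrent.TimeUnit;

public class RedisLockUsageExample {
    public static void main(String[] args) {
        RedisLock redisLock = new RedisLock();         // constructor shown later in the article

        TryLockPrams prams = new TryLockPrams();       // setters assumed from the getters used in tryLock
        prams.setKey("lock:order:1001");               // business key, e.g. per user or per order
        prams.setOwner(UUID.randomUUID().toString());  // unique owner token, needed for a safe unlock
        prams.setExpireTime(30L);                      // lock lease
        prams.setExpireTimeUnit(TimeUnit.SECONDS);
        prams.setTryLockMaxTime(3L);                   // max retry window, in the same unit as expireTimeUnit

        if (redisLock.tryLock(prams)) {
            try {
                // critical section
            } finally {
                redisLock.unlock(prams.getKey(), prams.getOwner());
            }
        }
    }
}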
When competing for the lock, Redis's set method is used: it writes the key, the value, the expiration time, and the set option in a single atomic operation, so acquiring the lock is atomic and the built-in expiration prevents deadlock. This approach is exclusive but not reentrant. The set method signature is:
Boolean set(byte[] key, byte[] value, Expiration expiration, SetOption option);
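For comparison, when reentrancy and lease renewal are not needed, the same atomic "SET key value NX with expiration" semantics are also exposed by the high-level API in Spring Data Redis 2.1+. A minimal sketch, assuming an injected StringRedisTemplate named stringRedisTemplate and placeholder key/value:

// Atomically sets the key only if it does not already exist, together with its expiration.
Boolean acquired = stringRedisTemplate.opsForValue()
        .setIfAbsent("lock:order:1001", "owner-uuid", 30, TimeUnit.SECONDS);
// Boolean.TRUE means the lock was acquired; it will auto-expire after 30 seconds.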
When trying to acquire the lock, the caller keeps retrying within the time limit configured by the user. The code is as follows:
@Override
public boolean tryLock(TryLockPrams prams) {
String key;// the lock key
String owner;// identifies the owner of the lock
long expireTime;// expiration time of the lock
TimeUnit expireTimeUnit;// unit of the expiration time
if (StringUtils.isBlank(key = prams.getKey())
|| StringUtils.isBlank(owner = prams.getOwner())
|| (expireTime = prams.getExpireTime()) <= 0
|| Objects.isNull(expireTimeUnit = prams.getExpireTimeUnit())) {
log.warn("prams [{}] is invalid!", JSON.toJSONString(prams));
return false;
}
long startTime = System.currentTimeMillis();
long tryLockMaxTime = prams.getTryLockMaxTime();
long executeTime;
do {
try {
RedisSerializer<String> keySerializer = (RedisSerializer<String>) redisTemplate.getKeySerializer();
RedisSerializer<String> valueSerializer = (RedisSerializer<String>) redisTemplate.getValueSerializer();
boolean success = redisTemplate.execute((RedisCallback<Boolean>)
// Call set: atomically writes the value together with its expiration
connection -> connection.set(
keySerializer.serialize(key),
valueSerializer.serialize(owner),
Expiration.from(expireTime, expireTimeUnit),
RedisStringCommands.SetOption.ifAbsent()));
// The key was set, i.e. the lock was acquired
if (success) {
// A background task watches this key's expiration; if the lock is still held when it is about to expire, the expiration is extended
prams.setDeadLine(System.currentTimeMillis() + expireTimeUnit.toMillis(expireTime));
extendExpiredTimeMap.putIfAbsent(key, prams);
return success;
}
} catch (Exception e) {
log.warn("set if absent error!key {},value {},expireTime {}", key, owner, expireTime);
}
}
while ((executeTime = System.currentTimeMillis() - startTime) < expireTimeUnit.toMillis(tryLockMaxTime));
log.info("try lock error!prams {},executeTime {},expireTimeUnit {}",
JSON.toJSONString(prams), executeTime, expireTimeUnit.name());
return false;
}
The lock is released with a Lua script so that the whole operation is atomic. The logic is: read the value stored under the key and compare it with the value passed in; delete the key only if they match, otherwise the operation fails. In typical business scenarios the key identifies the business data (for example, the user id when preventing duplicate operations by a single user) and the value identifies the thread that currently holds the lock. The code is as follows:
/**
 * Lua script used to release the lock
 */
private static final String LUA_UNLOCK = "if redis.pcall('get', KEYS[1]) == ARGV[1] then return redis.pcall('del', KEYS[1]) else return 0 end";
@Override
public boolean unlock(String key, String owner) {
Long result = redisTemplate.execute((RedisCallback<Long>) connection ->
connection.eval(LUA_UNLOCK.getBytes(), ReturnType.INTEGER, 1, key.getBytes(), owner.getBytes()));
boolean success = Objects.nonNull(result) && result > 0;
if (success) {
// Remove the entry from the expiration-extension map
extendExpiredTimeMap.remove(key);
} else {
log.warn("unlock error!key {},owner {}", key, owner);
}
return success;
}
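The same script can also be run through RedisTemplate's script support instead of the low-level connection.eval. A minimal sketch, assuming an injected StringRedisTemplate named stringRedisTemplate and placeholder key/owner values:

import java.util.Collections;
import org.springframework.data.redis.core.script.DefaultRedisScript;

// DefaultRedisScript wraps the Lua source and declares the expected return type.
DefaultRedisScript<Long> script = new DefaultRedisScript<>(LUA_UNLOCK, Long.class);
Long deleted = stringRedisTemplate.execute(script,
        Collections.singletonList("lock:order:1001"), "owner-uuid");
boolean released = deleted != null && deleted > 0;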
Because the Redis lock relies on a fixed expiration time for failure handling (if the holder crashes or becomes unavailable, the lock is released once the expiration time is reached), another problem arises: what if the business logic has not finished when the expiration time is up and the lock is released automatically? This can be avoided by using a ScheduledExecutorService that periodically re-sets the expiration time, so the lock is not released while its holder is still running. The code is as follows:
public RedisLock() {
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
extendExpiredTimeMap = new ConcurrentHashMap<>();
init();
}
private void init() {
scheduledExecutorService.scheduleAtFixedRate(this, 0, executingTimePeriod, TimeUnit.SECONDS);
}
/**
 * Scheduled task: re-set the expiration time for keys that are about to expire
 */
@Override
public void run() {
if (log.isDebugEnabled()) {
log.debug("execute extend expired time task!extend expired time map size {}", extendExpiredTimeMap.size());
}
if (extendExpiredTimeMap.size() <= 0) {
return;
}
List<TryLockPrams> tryLockPrams = new LinkedList<>();
for (ConcurrentHashMap.Entry<String, TryLockPrams> entry : extendExpiredTimeMap.entrySet()) {
// Validate the parameters
if (!isValid(entry) || !isExpiringTime(entry.getValue())) {
continue;
}
tryLockPrams.add(entry.getValue());
if (tryLockPrams.size() > 0 && 0 == tryLockPrams.size() % PIPELINE_LIMIT) {
setIfPresentPipeline(tryLockPrams);
tryLockPrams.clear();
}
}
if (tryLockPrams.size() > 0) {
setIfPresentPipeline(tryLockPrams);
tryLockPrams.clear();
}
}
/**
 * Only entries that are about to expire get their expiration re-set, to reduce the number of Redis operations
 *
 * @param tryLockPrams the lock parameters
 * @return true if the entry is about to expire and its deadline was extended
 */
private boolean isExpiringTime(TryLockPrams tryLockPrams) {
long deadLine = tryLockPrams.getDeadLine();
long current = System.currentTimeMillis();
long validTime = deadLine - current;
// Use an explicit rounding mode: BigDecimal.divide without one throws ArithmeticException for non-terminating results
if (BigDecimal.valueOf(validTime).divide(BigDecimal.valueOf(TimeUnit.SECONDS.toMillis(executingTimePeriod)), RoundingMode.DOWN).intValue() <= 2) {
if (log.isDebugEnabled()) {
log.debug("tryLockPrams [{}] is expiring!current is {}", JSON.toJSONString(tryLockPrams), current);
}
tryLockPrams.setDeadLine(current + tryLockPrams.getExpireTimeUnit().toMillis(tryLockPrams.getExpireTime()));
return true;
} else {
return false;
}
}
private boolean isValid(ConcurrentHashMap.Entry<String, TryLockPrams> entry) {
TryLockPrams prams;
if (Objects.isNull(prams = entry.getValue())
|| StringUtils.isBlank(prams.getKey())
|| StringUtils.isBlank(prams.getOwner())
|| (prams.getExpireTime()) <= 0
|| Objects.isNull(prams.getExpireTimeUnit())) {
log.warn("prams [{}] is invalid!", JSON.toJSONString(prams));
return false;
}
return true;
}
private void setIfPresentPipeline(List<TryLockPrams> tryLockPrams) {
redisTemplate.execute((RedisCallback<Object>) connection -> {
// Open the pipeline
connection.openPipeline();
RedisSerializer<String> keySerializer = (RedisSerializer<String>) redisTemplate.getKeySerializer();
RedisSerializer<String> valueSerializer = (RedisSerializer<String>) redisTemplate.getValueSerializer();
for (TryLockPrams tryLockPram : tryLockPrams) {
    // Issue the command on the pipelined connection; going back through redisTemplate here
    // would check out a separate connection and bypass the open pipeline
    connection.set(keySerializer.serialize(tryLockPram.getKey()),
            valueSerializer.serialize(tryLockPram.getOwner()),
            Expiration.from(tryLockPram.getExpireTime(), tryLockPram.getExpireTimeUnit()),
            RedisStringCommands.SetOption.ifPresent());
}
// Close the pipeline, flushing the queued commands
connection.closePipeline();
return null;
});
}