Redis 实现分布式锁和Zookeeper实现分布式锁
使用 Redis 实现分布式锁在 JUC 包中除了阻塞锁外还有一种叫 CAS 的无阻塞锁(具体可以参考:Java 并发编程之美:并发编程基础晋级篇),CAS 操作本身是原子性的,多个线程操作同一个变量的 CAS 时候只有一个线程能进行 CAS 成功,失败的线程接下来那么使用乐观锁机制直接失败要么使用自旋方式使用 CPU 资源重复进行 CAS 尝试。那么在分布式锁的实现中我们也可以使用类似的方式..
使用 Redis 实现分布式锁
在 JUC 包中除了阻塞锁外还有一种叫 CAS 的无阻塞锁(具体可以参考:Java 并发编程之美:并发编程基础晋级篇),CAS 操作本身是原子性的,多个线程操作同一个变量的 CAS 时候只有一个线程能进行 CAS 成功,失败的线程接下来那么使用乐观锁机制直接失败要么使用自旋方式使用 CPU 资源重复进行 CAS 尝试。
那么在分布式锁的实现中我们也可以使用类似的方式,比如 Redis 提供了一个保证原子性的 setnx 函数,多个线程调用该函数操作同一个 key 的时候,只有一个线程会返回 OK,其他线程返回 null,那么多个 JVM 中的线程同时设置同一个 key 时候只有一个 JVM 里面的一个线程可以返回 OK,返回 OK 的线程就相当于获取了全局锁,返回 null 的线程则可以选择自旋重试。获取到锁的线程使用完毕后调用 del 函数删除对应的 key,然后自旋的线程就会有一个返回 OK…
函数讲解:
1)String set(final String key, final String value, final String nxxx, final String expx,final int time)
- 在 Redis 中支持 kv 存储,这里 key 就是 kv 中的 key,value 就是 kv 中的 value。
- 从 Redis 2.6.12 版本开始, SET 命令的行为可以通过一系列参数来修改;其中 nxxx 的枚举值为 NX 和 XX,模式 NX 意思是说如果 key 不存在则插入该 key 对应的 value 并返回 OK,否者什么都不做返回 null;XX 意思是只在 key 已经存在时,才对 key 进行设置操作, 否者 null, 如果已经存在 key 并且进行了多次设置,则最终 key 对应的值为最后一次设置的值。
- 其中 expx 的枚举值为 EX 和 PX,当为 EX 时候标示设置超时时间为 time 秒,当为 PX 时候标示设置超时时间为 time 毫秒。
为了实现 CAS 的效果,本文选用 nxxx 为 NX 模式,因为这种模式下当多个线程设置同一个 key 时只有一个线程会返回 OK,其他线程则会返回 null,返回 OK 的线程标示获取到了分布式锁,返回 null 的则视为获取锁失败,则通过自旋来不断尝试获取。
然后 value 值使用请求 id 来标示,多个线程设置同一个 key 的时候对应的 value 值要不一样,这是为了保证只有获取到锁的线程才应该释放锁,下面会具体讲解。
2) String get(final String key)
- 获取 key 对应的 value 值,key 不存在则返回 null
3)Long del(String key)
- 删除 key 对应的 value 值, 如果 key 存在则返回 1,否者返回 0
由于需要保证只有获取锁的线程才能释放锁,所以需要在获取锁时候调用 set 方法传递一个唯一的 value 值,上面说了,可以传递请求 id; 然后在释放锁的时候需要调用 get 方法获取 key 对应的 value,如果 value 值等于当前线程的请求 id 则说明是当前线程获取的锁,则调用 del 方法删除该 key 对应的 value,这就相当于当前线程释放了锁;如果 value 不等于当前线程的请求 id 则不做删除操作。
可见释放锁的操作需要调用 get 方法,然后 if 语句进行判断,判断 OK 然后调用 del 删除,而这三步并不是原子性的,如果不是原子性的会存在什么问题那?
假设线程 A 调用 set 方法设置 key 对应的 value 为 AA 成功, 则线程 A 获取到了锁,然后在执行完业务逻辑后,首先通过 get 方法获取 key 对应的 value,然后通过 if 语句判断为 true,假设在执行 del 方法前对应的 key 已经超时了,并且线程 B 调用 set 方法设置 key 对应的 value 为 BB 成功了,也就是线程 B 获取到了锁,但是这时候线程 A 开始执行 del 方法了,则会把线程 B 对应的 key 的值删除了(不同线程调用 set 的时候 key 一样),也就是释放了锁,这时候其他线程就会竞争到该锁。这明显是错误的。
Redis 有一个叫做 eval 的函数,支持 Lua 脚本执行,并且能够保证脚本执行的原子性,也就是在执行脚本期间,其它执行 redis 命令的线程都会被阻塞。这里解锁时候使用下面脚本:
if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end
其中 keys[1] 为 unLock 方法传递的 key,argv[1] 为 unLock 方法传递的 requestId;脚本 redis.call(‘get’, KEYS[1]) 的作用是获取 key 对应的 value 值,这里会返回通过 Lock 方法传递的 requetId,然后看当前传递的 RequestId 是否等于 key 对应的值,等于则说明当前要释放锁的线程就是获取锁的线程,则继续执行 redis.call(‘del’, KEYS[1]) 脚本,删除 key 对应的值。
具体代码:
package com.jiaduo.DistributedLock;
import java.util.Collections;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class DistributedLock {
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
private static final Long RELEASE_SUCCESS = 1L;
private static void validParam(JedisPool jedisPool, String lockKey, String requestId, int expireTime) {
if (null == jedisPool) {
throw new IllegalArgumentException("jedisPool obj is null");
}
if (null == lockKey || "".equals(lockKey)) {
throw new IllegalArgumentException("lock key is blank");
}
if (null == requestId || "".equals(requestId)) {
throw new IllegalArgumentException("requestId is blank");
}
if (expireTime < 0) {
throw new IllegalArgumentException("expireTime is not allowed less zero");
}
}
/**
*
* @param jedis
* @param lockKey
* @param requestId
* @param expireTime
* @return
*/
public static boolean tryLock(JedisPool jedisPool, String lockKey, String requestId, int expireTime) {
validParam(jedisPool, lockKey, requestId, expireTime);
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
} catch (Exception e) {
throw e;
} finally {
if (null != jedis) {
jedis.close();
}
}
return false;
}
/**
*
* @param jedis
* @param lockKey
* @param requestId
* @param expireTime
*/
public static void lock(JedisPool jedisPool, String lockKey, String requestId, int expireTime) {
validParam(jedisPool, lockKey, requestId, expireTime);
while (true) {
if (tryLock(jedisPool, lockKey, requestId, expireTime)) {
return;
}
}
}
/**
*
* @param jedis
* @param lockKey
* @param requestId
* @return
*/
public static void unLock(JedisPool jedisPool, String lockKey, String requestId) {
validParam(jedisPool, lockKey, requestId, 1);
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
Object result = jedis.eval(script, Collections.singletonList(lockKey),
Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
System.out.println("relese lock ok ");
}
} catch (Exception e) {
throw e;
} finally {
if (null != jedis) {
jedis.close();
}
}
}
}
-
通过 tryLock 方法尝试获取锁,内部是具体调用 Redis 的 set 方法,多个线程同时调用 tryLock 时候, 会同时调用 set 方法,但是 set 方法本身是保证原子性的,对应同一个 key 来说,多个线程调用 set 方法时候只有一个线程返回 OK,其它线程因为 key 已经存在会返回 null,返回 OK 的线程就相当与获取到了锁,其它返回 null 的线程则相当于获取锁失败。
-
通过 lock 方法让使用 tryLock 获取锁失败的线程本地自旋转重试获取锁,这类似 JUC 里面的 CAS。
-
通过 unLock 方法使用 redis 的 eval 函数传递 lua 脚本来保证操作的原子性。
使用 Zookeeper 来实现分布式锁
在 ZK 中是使用文件目录的格式存放节点内容,其中节点类型分为:
- 持久节点(PERSISTENT ):节点创建后,一直存在,直到主动删除了该节点。
- 临时节点(EPHEMERAL):生命周期和客户端会话绑定,一旦客户端会话失效,这个节点就会自动删除。
- 序列节点(SEQUENTIAL ):多个线程创建同一个顺序节点时候,每个线程会得到一个带有编号的节点,节点编号是递增不重复的,如下图:如上图,三个线程分别创建路径为 /locks/lockOne 的节点,可知在 ZK 服务器端会在根路径 locks下创建三个 lockOne 节点,并且器编号是唯一递增的。
具体在节点创建过程中,可以混合使用上面三种模式,比如创建临时顺序节点(EPHEMERAL_SEQUENTIAL),这里我们就使用临时顺序节点来实现分布式锁。
分布式锁实现步骤,每个想要获取锁的线程都要执行下面步骤:
- 创建临时顺序节点,比如 /locks/lockOne,假设返回结果为 /locks/lockOne000000000*。
- 获取 /locks下所有孩子节点,用自己创建的节点 /locks/lockOne000000000* 的序号 lockOne000000000* 与所有子节点比较,看看自己是不是编号最小的。如果是最小的则就相当于获取到了锁;如果自己不是最小的,则从所有子节点里面获取比自己次小的一个节点,然后设置监听该节点的事件,然后挂起当前线程。
- 当最小编号的线程获取锁,处理完业务后删除自己对应的节点,删除后会激活比自己大一号的节点的线程从阻塞变为运行态,被激活的线程应该就是当前 node 序列号最小的了,然后就会获取到锁。
整个过程是一个类似循环监听的模式:
如上图当三个线程启动时候分别执行步骤(1)(2)(3),分别在 zk 服务器上创建自己的顺序节点。
-
由于线程1创建的节点的序列最小,所以线程1获取到了锁;线程 2 发现自己不是最小的所以首先注册监听线程1创建的 LockOne001 节点的事件,然后挂起自己;线程 3 发现自己不是最小的所以首先注册监听线程 2 创建的 LockOne002 节点的事件,然后挂起自己。
-
当线程 1 获取锁后,执行完了业务逻辑后,会执行步骤 6 删除创建的 LockOne001 节点,删除后线程 2 由于设置了对 LockOne1 的监听,所以 zk 服务器会给线程 2 所在机器发送事件,接受事件后发现是 LockOne1 的删除事件,则会激活线程 2,这时候线程 2 就获取到了锁。
-
当线程 2 获取锁后,执行完了业务逻辑后,会执行步骤 8 删除创建的 LockOne002 节点,删除后线程3由于设置了对 LockOne2 的监听,所以 zk 服务器会给线程 3 所在机器发送事件,接受事件后发现是 LockOne2 的删除事件,则会激活线程 3,这时候线程 3 就获取到了锁。
使用 Redis 来实现分布式锁优点是实现简单,并且获取锁的 setnx 方法使用 cas 算法来判断获取锁是否成功,吞吐量不错;另外 setnx 方法自带了超时参数,这可以有效避免当一个线程获取到锁后,在释放锁前机器挂了后,其他线程一直阻塞到获取锁的情况,等超时时间过了,锁会被自动释放;缺点也很明显,本文例子获取锁时候是类似 CAS 自旋重试的,在高并发情况下会造成大量线程共同竞争锁时候的本地自旋,这很像 JUC 中的 AtomicLong 一样,在高并发下多个线程竞争同一个资源时候造成大量线程占用 cpu 进行重试操作。这时候其实可以随机生成一个等待时间,等时间到后在进行重试,以减少潜在的同时对一个资源进行竞争的并发量;另外本文使用的是最简单的 Redis 单实例实现,如果单实例挂了,也会存在问题。
使用 Zookeeper 实现分布式锁优点是可以对节点进行监听,多个线程获取锁时候没有获取到锁的线程不需要本地自旋重试,而是挂起自己,等待获取锁的线程释放锁后发送事件激活自己;由于线程阻塞自己使用的是 JUC 包的 CountDownLatch,在调用 await 的时候是可以添加超时时间的(本文并没有加这个参数),所以 zk 方式也可以在实现获取锁时候超时候自动返回;缺点是使用 zk 实现比较重,实现起来不是那么简单,其实 Apache Curator 对 zk 进行了封装,大家下去可以研究下:https://curator.apache.org/
更多推荐
所有评论(0)