【转】基于Redis Lua脚本实现的分布式锁(Java实现)
最近项目中需要用到一个分布式的锁,考虑到基于会话节点实现的zookeeper锁性能不够,于是想使用redis来实现一个分布式的锁。看了网上的几个实现方案后,发现都不够严谨。比如这篇:用Redis实现分布式锁里面设计的锁有个最大的问题是锁的超时值TTL会一直被改写,“尽管C3没拿到锁,但它改写了C4设置的锁的超时值,不过这一点非常微小的误差带来的影响可以忽略不计”,其实在高并发的时候会导致进程“饿死
最近项目中需要用到一个分布式的锁,考虑到基于会话节点实现的zookeeper锁性能不够,于是想使用redis来实现一个分布式的锁。看了网上的几个实现方案后,发现都不够严谨。比如这篇:用Redis实现分布式锁里面设计的锁有个最大的问题是锁的超时值TTL会一直被改写,“尽管C3没拿到锁,但它改写了C4设置的锁的超时值,不过这一点非常微小的误差带来的影响可以忽略不计”,其实在高并发的时候会导致进程“饿死”(也有文章称为死锁)。还有这篇文章“两种分布式锁实现方案2”里面的v2=getset(key,时间戮+超时+1),其加1秒操作在大并发下也会触发同样的问题。网上这篇文章解决了这个“无休止的TTL”问题,我简单翻译了下。
锁是编程中非常常见的概念。在维基百科上对锁有个相当精确的定义:
在计算机科学中,锁是一种在多线程环境中用于强行限制资源访问的同步机制。锁被设计用于执行一个互斥的并发控制策略。
In computer science, a lock is a synchronization mechanism for
enforcing limits on access to a resource in an environment where there
are many threads of execution. A lock is designed to enforce a mutual
exclusion concurrency control policy.
简单的说,锁是一个单一的参考点,多个线程基于它来检查是否允许访问资源。例如,一个想写数据的线程,它必须先检查是否存在一个写锁。如果写锁存在,需要等待直到锁释放后它才能获取到属于它的锁并执行写操作。这样,通过锁就可以避免多个线程的同时写造成的数据冲突。
现代的操作系统提供了内置的函数来帮助程序员实现并发控制,例如 flock 函数。但是如果多线程的程序运行在多台机器上呢?如何在分布式系统下控制对资源的访问呢?
使用一个中心化的锁服务
首先,我们需要一个所有线程都可以访问到的地方来存储锁。这个锁只能存在于一个地方,从而保证只有一个权威的地方可以定义锁的建立和释放。
Redis是实现锁的一个理想的候选方案。作为一个轻量级的内存数据库,快速,事务性和一致性是选择redis所为锁服务的主要原因。
设计锁
锁本身是很简单的,就是redis数据库中一个简单的key。建立和释放锁,并保证绝对的安全,是这个锁的设计比较棘手的地方。有两个潜在的陷阱:
应用程序通过网络和redis交互,这意味着从应用程序发出命令到redis结果返回之间会有延迟。这段时间内,redis可能正在运行其他的命令,而redis内数据的状态可能不是你的程序所期待的。如果保证程序中获取锁的线程和其他线程不发生冲突?
如果程序在获取锁后突然crash,而无法释放它?这个锁会一直存在而导致程序进入“饿死”(原文成为“死锁”,感觉不太准确)。
建立锁
可能想到的最简单的方法是“用GET方法检查锁,如果锁不存在,就用SET方式设置一个值”。
这个方法虽然简单,但是不能保证独占锁。回顾前面所说的第1个陷阱:因为在GET和SET操作之间有延迟,我们没法知道从“发送命令”到“redis服务器返回结果”之间的这段时间内是否有其他线程也去建立锁。当然,这些都在几毫秒之内,发生的可能性相当低。但是如果在一个繁忙的环境中运行着大量的并发线程和命令,重叠的可能性并不是微不足道的。
为了解决这个问题,应该用SETNX命令。SETNX消除了GET命令需要等待返回值的问题,SETNX只有在key不存在时才返回成功。这意味着只有一个线程可以成功运行SETNX命令,而其他线程会失败,然后不断重试,直到它们能建立锁。
释放锁
一旦线程成功执行了SETNX命令,它就建立了锁并且可以基于资源进行工作。工作完成后,线程需要通过删除redis的key来释放这个锁,从而允许其他线程能尽快的获取锁。
尽管如此,也有需要小心的地方!回顾前面说的第2个陷阱:如果线程crash了,它永远都不会删除redis的key,所以这个锁会一直存在,从而导致“饿死”现象。那么如何避免这个问题呢?
锁的存活时间
我们可以给锁加一个存活时间(TTL),这样一旦TTL超时,这个锁的key会被redis自动删除。任何由于线程错误而遗留下来的锁在一个合适的时间之后都会被释放,从而避免了“饿死”。这纯粹是一个安全特性,更有效的方式仍然是确保尽量在线程里面释放锁。
可以通过PEXPIRE命令为Redis的key设置TTL,而且线程里可以通过MULTI/EXEC事务的方式在SETNX命令后立即执行,例如:
MULTI
SETNX lock-key
PEXPIRE 10000 lock-key
EXEC
尽管如此,这会产生另外一个问题。PEXPIRE命令没有判断SETNX命令的返回结果,无论如何都会设置key的TTL。如果这个地方无法获取到锁或有异常,那么多个线程每次想获取锁时,都会频繁更新key的TTL,这样会一直延长key的TTL,导致key永远都不会过期。为了解决这个问题,我们需要Redis在一个命令里面处理这个逻辑。我们可以通过Redis脚本的方式来实现。
注意-如果不采用脚本的方式来实现,可以使用Redis 2.6.12之后版本SET命令的PX和NX参数来实现。为了考虑兼容2.6.0之前的版本,我们还是采用脚本的方式来实现。
Redis脚本
由于Redis支持脚本,我们可以写一个Lua脚本在Redis服务端运行多个Redis命令。应用程序通过一条EVALSHA命令就可以调用被Redis服务端缓存的脚本。这里强大的地方在于你的程序只需要运行一条命令(脚本)就可以以事务的方式运行多个redis命令,还能避免并发冲突,因为一个redis脚本同一时刻只能运行一次。
这是Redis里面一个设置带TTL的锁的Lua脚本:
--
-- Set a lock
--
-- KEYS[1] - key
-- KEYS[2] - ttl in ms
-- KEYS[3] - lock content
local key = KEYS[1]
local ttl = KEYS[2]
local content = KEYS[3]
local lockSet = redis.call('setnx', key, content)
if lockSet == 1 then
redis.call('pexpire', key, ttl)
end
return lockSet
从这个脚本可以很清楚的看到,我们通过在锁上只运行PEXPIRE命令就解决了前面提到的“无休止的TTL”问题。
Warlock: 一个成熟的基于Redis的分布式锁
上面我们介绍了基于Redis的锁的理论,这里有一个用Node.js写的开源模块Warlock,通过npm可以获取。它使用了redis脚本来创建/释放锁,用于为缓存,数据库,任务队列和其他需要并发的地方提供分布式的锁服务,详见Github。
原文中还缺少一个释放锁的脚本,如果一直依赖TTL来释放锁,效率会很低。Redis的SET操作文档就提供了一个释放锁的脚本:
if redis.call("get", KEYS[1]) == ARGV[1]
then
return redis.call("del", KEYS[1])
else
return 0
end
应用程序中只要加锁的时候指定一个随机数或特定的value作为key的值,解锁的时候用这个value去解锁就可以了。当然,每次加锁时的value必须要保证是唯一的。
转载结束~,转自:http://ju.outofmemory.cn/entry/74765
下面是基于这篇文章,使用Java代码来实现分布式锁,redis客户端使用RedisTemplate
import java.util.Collections;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
/**
* redis分布式锁.<br>
* 思路:
* <pre>
* 用SETNX命令,SETNX只有在key不存在时才返回成功。这意味着只有一个线程可以成功运行SETNX命令,而其他线程会失败,然后不断重试,直到它们能建立锁。
* 然后使用脚本来创建锁,因为一个redis脚本同一时刻只能运行一次。
* 创建锁代码:
* <code>
-- KEYS[1] key,
-- ARGV[1] value,
-- ARGV[2] expireTimeMilliseconds
if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
else
return 0
end
* </code>
* 最后使用脚本来解锁。
* 解锁代码:
*
* <code>
-- KEYS[1] key,
-- ARGV[1] value
if redis.call("get", KEYS[1]) == ARGV[1]
then
return redis.call("del", KEYS[1])
else
return 0
end
* </code>
* </pre>
*
* @author tanghc
*/
public class RedisLockUtil {
private static final Long SUCCESS = 1L;
// 加锁脚本
private static final String SCRIPT_LOCK = "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then redis.call('pexpire', KEYS[1], ARGV[2]) return 1 else return 0 end";
// 解锁脚本
private static final String SCRIPT_UNLOCK = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 加锁脚本sha1值
private static final String SCRIPT_LOCK_SHA1 = Sha1Util.encrypt(SCRIPT_LOCK);
// 解锁脚本sha1值
private static final String SCRIPT_UNLOCK_SHA1 = Sha1Util.encrypt(SCRIPT_UNLOCK);
/**
* 尝试获取分布式锁
*
* @param redisTemplate
* Redis客户端
* @param lockKey
* 锁
* @param requestId
* 请求标识
* @param expireTimeMilliseconds
* 超期时间,多少毫秒后这把锁自动释放
* @return 返回true表示拿到锁
*/
@SuppressWarnings("unchecked")
public static boolean tryGetDistributedLock(@SuppressWarnings("rawtypes") final RedisTemplate redisTemplate,
final String lockKey, final String requestId, final int expireTimeMilliseconds) {
Object result = redisTemplate.execute(new RedisScript<Long>() {
@Override
public String getSha1() {
return SCRIPT_LOCK_SHA1;
}
@Override
public Class<Long> getResultType() {
return Long.class;
}
@Override
public String getScriptAsString() {
return SCRIPT_LOCK;
}
}, Collections.singletonList(lockKey),// KEYS[1]
requestId, // ARGV[1]
String.valueOf(expireTimeMilliseconds) // ARGV[2]
);
return SUCCESS.equals(result);
}
/**
* 释放分布式锁
*
* @param redisTemplate
* Redis客户端
* @param lockKey
* 锁
* @param requestId
* 请求标识
* @return 返回true表示释放锁成功
*/
@SuppressWarnings("unchecked")
public static boolean releaseDistributedLock(@SuppressWarnings("rawtypes") RedisTemplate redisTemplate,
String lockKey, String requestId) {
Object result = redisTemplate.execute(new RedisScript<Long>() {
@Override
public String getSha1() {
return SCRIPT_UNLOCK_SHA1;
}
@Override
public Class<Long> getResultType() {
return Long.class;
}
@Override
public String getScriptAsString() {
return SCRIPT_UNLOCK;
}
}, Collections.singletonList(lockKey), requestId);
return SUCCESS.equals(result);
}
}
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
public class Sha1Util {
private static final String ZERO = "0";
private static final String ALGORITHM = "SHA1";
/**
* sha1加密
* @param str
* @return 返回十六进制的字符串形式,全部小写
*/
public static String encrypt(String str) {
if (str == null) {
return null;
}
try {
MessageDigest messageDigest = MessageDigest.getInstance(ALGORITHM);
messageDigest.update(str.getBytes());
return byte2hex(messageDigest.digest());
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}
/**
* 二进制转十六进制字符串
*
* @param bytes
* @return
*/
public static String byte2hex(byte[] bytes) {
StringBuilder sign = new StringBuilder();
for (int i = 0; i < bytes.length; i++) {
String hex = Integer.toHexString(bytes[i] & 0xFF);
if (hex.length() == 1) {
sign.append(ZERO);
}
sign.append(hex);
}
return sign.toString();
}
}
更多推荐
所有评论(0)