经过线上实战的redis 分布式锁与zookeeper分布式锁区别
经过线上实战的redis 分布式锁代码。能用,但是性能较差。已考虑:1.只能被拥有锁的线程解锁2. 设置节点和超时时间用同一个key未考虑:1. 不能重入2. 没有本地锁,并发性能会比较差,不使用用在并发争锁较多的场景下。本地锁非自旋3. 未考虑锁等待排序. 这个是redis很难实现的.可以通过redis
redis 实现 curator 分布式锁_个人渣记录仅为自己搜索用的博客-CSDN博客
经过线上实战的redis 分布式锁代码。
难点: redis的锁释放, 可以利用 数据库事务 + 内存重试 + 定时任务重试来操作.
能用,但是性能较差。
已考虑:
1.只能被拥有锁的线程解锁
2. 设置节点和超时时间用同一个key
未考虑:
1. 不能重入
2. 没有本地锁,并发性能会比较差,不使用用在并发争锁较多的场景下。本地锁非自旋
3. 未考虑锁等待排序. 这个是redis很难实现的.
可以通过redis的list实现,但缺点是list下每个子节点无超时时间. redis也无法进行模糊查询 key*.
故还是通过zookeeper实现比较好. 但zookeeper 会遇到性能瓶颈,我们线下的就出现了,经常注册不上的情况.
zookeeper原理是临时节点
使用方式:
GlobalLockRedisImpl globalLockRedis = new GlobalLockRedisImpl(casRedis, maxLockSeconds, sleepTimeMillis,
redisKey);
//获取分布式加锁
globalLockRedis.lock();
try{
// do something
}finnaly{
// 分布式锁释放
globalLockRedis.unlock();
}
private boolean setIfAbsent(String key, String value, int expireMilliSeconds) {
String result = casRedis.set(key, value, "nx", "px", expireMilliSeconds);
if (result != null && result.equalsIgnoreCase("OK")) {
return true;
}
return false;
代码:
基于阿里云redis实现的分布式锁,可以实现本地加锁机制,增大并发能力.
/**
* 基于redis实现的全局锁,不能当做单例使用.
*
* @author loufei
* <p>
* 2015-5-28
* implements GlobalLock
*/
public class GlobalLockRedisImpl implements GlobalLock {
private static Logger LOGGER = LoggerFactory.getLogger(GlobalLockRedisImpl.class);
private final TairStringCluster casRedis;
private final int redisExpiredSeconds;
private final long sleepTimeMillis;
private Thread exclusiveOwnerThread;
private final String key;
private String randonNum = null;
private static ConcurrentMap<String, ReentrantLock> locks = Maps.newConcurrentMap();
public GlobalLockRedisImpl(TairStringCluster casRedis, int redisExpiredSeconds, long avgCost, String key) {
this.casRedis = casRedis;
if (redisExpiredSeconds > 3) {
this.redisExpiredSeconds = 3;
} else {
this.redisExpiredSeconds = redisExpiredSeconds;
}
if (avgCost > 100) {
this.sleepTimeMillis = 100L;
} else {
this.sleepTimeMillis = avgCost;
}
this.key = key;
}
public boolean acquireDistributedLock(TairStringCluster jedis, String resourceKey, String randomValue, int expireTime) {
ExsetParams setParams = new ExsetParams();
setParams.nx().ex(expireTime);
String result = jedis.exset(resourceKey, randomValue, setParams);
return "OK".equals(result);
}
public boolean releaseDistributedLock(TairStringCluster jedis, String resourceKey, String randomValue) {
Long ret = jedis.cad(resourceKey, randomValue);
return 1 == ret;
}
public boolean renewDistributedLock(TairStringCluster jedis, String resourceKey, String randomValue, int expireTime) {
CasParams setParams = new CasParams();
setParams.ex(expireTime);
Long ret = jedis.cas(resourceKey, randomValue, randomValue, setParams);
return 1 == ret;
}
public void lock() throws ConfServiceException {
long startTime = System.currentTimeMillis();
int tryCount = 0;
while (true) {
tryCount++;
//尝试加锁,失败的话自旋重试
Boolean result = tryLock();
if (result != null && result) {
break;
}
//检查是否超时
checkExpired(startTime, tryCount);
LOGGER.info(" spin ,key=" + key + ",tryCount=" + tryCount);
try {
Thread.sleep(sleepTimeMillis);
} catch (InterruptedException e) {
// nothing need to be done
}
}
long endTime = System.currentTimeMillis();
long costTime = endTime - startTime;
LOGGER.info("lock success,key={},cost={},retryCount={}" +
",sleepTimeMillis={},costTimePerTime= {}" +
",startTime={},endTime={}",
key, costTime, tryCount,
sleepTimeMillis,
costTime / ((double) tryCount), startTime, endTime);
}
public void unlock() {
long startTime = System.currentTimeMillis();
if (exclusiveOwnerThread != null && exclusiveOwnerThread == Thread.currentThread()) {
ReentrantLock reentrantLock = locks.get(key);
if (reentrantLock != null) {
reentrantLock.lock();
} else {
LOGGER.error("reentrantLock is null when lock {}", key);
}
boolean success = releaseDistributedLock(casRedis, key, randonNum);
//最后释放,避免无法再次释放
exclusiveOwnerThread = null;
long endTime = System.currentTimeMillis();
LOGGER.info(" global unlock,del count=" + success + ".key=" + key + " ,costTime millis="
+ (endTime - startTime) + ",startTime=" + startTime + ",endTime=" + endTime);
} else {
LOGGER.error(" thread do not get lock ,can not unlock. key={},exclusiveOwnerThread={},current thrad={}"
, key, exclusiveOwnerThread, Thread.currentThread());
}
}
private void checkExpired(long startTime, int tryCount) throws ConfServiceException {
long end = System.currentTimeMillis();
//超时
long costTime = end - startTime;
// 分布式自旋等待时间已经已经超过了某个时间(暂定位为3秒),说明分布式竞争失败或者key没有正确的被设置超时时间.
long sleepMillisWaterMark;
if (tryCount > 2) {
String message = "get redis global lock error .1. compete failed 2. key do not set timeOut ,exist for ever ,maxLockSeconds="
+ redisExpiredSeconds
+ ",costTimeMillis="
+ costTime
+ ",key="
+ key
+ ",retryCount="
+ tryCount
+ ",sleepTimeMillis="
+ sleepTimeMillis
+ ",costTimePerTime="
+ (costTime / ((double) tryCount));
// 抛运行期异常,幂等重复执行不会总是抛错.
throw new ConfServiceException(CodeEnum.INNER_SYS_ERROR, KV.instance().kv("msg", message));
}
}
private boolean tryLock() {
ReentrantLock reentrantLock = locks.get(key);
synchronized (locks) {
reentrantLock = locks.get(key);
if (reentrantLock == null) {
locks.put(key, new ReentrantLock());
}
}
if (reentrantLock.isLocked() && !Thread.currentThread().equals(exclusiveOwnerThread)) {
return false;
}
// setIfAbsent tps对于并发锁控制够用了.
String randomValue = UUID.randomUUID().toString();
Boolean success = acquireDistributedLock(casRedis, key, randomValue, redisExpiredSeconds * 1000);
// 处理锁的自动释放,前三次尝试加锁都会进行超时设置,保证分布式锁有timeOut.避免主加锁进程被无故停止,导致key无失效时间,锁永远不被释放.
if (success) {
reentrantLock.lock();
exclusiveOwnerThread = Thread.currentThread();
randonNum = randomValue;
}
return success;
}
public int getRedisExpiredSeconds() {
return redisExpiredSeconds;
}
}
版本2 , 未经线上验证.
/**
* 基于redis实现的全局锁,不能当做单例使用.
*
* @author loufei
*
* 2015-5-28
*/
public class GlobalLockSeqRedisImpl implements GlobalLock {
private static long maxWaitSeconds = 2;
private static final String split = "____";
private static AtomicInteger seqCount = new AtomicInteger();
private static String host = null;
private static ILog LOGGER = LogFactory.getLog(GlobalLockRedisImpl.class);
private final static ExecutorService threadPoolExecutor = TaxiExecutors
.newCachedThreadPool(new ThreadFactoryBuilder("GlobalLockRedisImpl"));
private final didikuaidi.redis.clients.jedis.JedisCommands casRedis;
private final int maxLockSeconds;
private final long sleepTimeMillis;
private Thread exclusiveOwnerThread;
private final String key;
private final Lock lock = new ReentrantLock();
private String value = null;
static {
InetAddress localHost = null;
try {
localHost = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
host = (UUID.randomUUID() + localHost.getHostAddress()).replace(split, "");
}
public GlobalLockSeqRedisImpl(JedisCommands casRedis, int maxLockSeconds, long sleepTimeMillis, String key) {
this.casRedis = casRedis;
this.maxLockSeconds = maxLockSeconds;
if (maxLockSeconds > 30) {
maxLockSeconds = 30;
}
this.sleepTimeMillis = sleepTimeMillis;
if (sleepTimeMillis > 1000) {
sleepTimeMillis = 1000;
}
this.key = key;
}
private boolean setIfAbsent(String key, String value, int expireMilliSeconds) {
String result = casRedis.set(key, value, "nx", "px", expireMilliSeconds);
if (result != null && result.equalsIgnoreCase("OK")) {
return true;
}
return false;
}
private int lockRedisSeq(long startMillis) {
int seqNo = seqCount.incrementAndGet();
value = host + split + new Date().getTime() + split + seqNo;
Long listSize = casRedis.lpush(key, value);
int count = 1;
LOGGER.info("listSize=" + listSize);
if (listSize.longValue() == 1l) {
// 长度是1,说明只有自己拿到锁.
return count;
} else {
// 如果锁过多,打印error报警
if (listSize > DynamicConfig.getInt(DynamicConfigKeys.KUAIPAY_LOCK_LIMIT, 21)) {
LOGGER.error("too_much_lock_node listSize=" + listSize);
} else {
LOGGER.info("lock_queue_size=" + listSize + ",key=" + key);
}
// 说明没有锁住
while (true) {
try {
Thread.sleep(sleepTimeMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
// -1代表从尾部获取.FIFO原则
String tailNodeValue = casRedis.lindex(key, -1);
if (tailNodeValue == null) {
// 自己的节点莫名其妙被人删除了
String message = "tailNodeValue is null ,all node is delete inclued itself key=" + key;
LOGGER.error(message);
throw new GlobalLockTimeOutException(message);
} else if (tailNodeValue.equals(value)) {
// 拿到锁了
return count;
} else {
String[] split = tailNodeValue.split(GlobalLockSeqRedisImpl.split);
String host = split[0];
// 保持数据兼容性避免报错
Date nodeCreateTime = new Date(Long.valueOf(split[1]));
long diffInMillis = new Date().getTime() - nodeCreateTime.getTime();
long maxLockTime = TimeUnit.MINUTES.toMillis(1);
if (host.equals(this.host) && diffInMillis > TimeUnit.SECONDS.toMillis(3)) {
// 发现当前节点的机器是本机(不用担心不同机器的时间不一致问题),且超过3秒,立即删除.
lrem(tailNodeValue);
}
if (diffInMillis > TimeUnit.SECONDS.toMillis(30) && diffInMillis <= maxLockTime) {
// 大量error报警,发现问题.请检查对应ip机器和日志所在机器的时间是否一致.并且java默认时区是否一致
LOGGER.error("Lock Node donot unLock ,请检查对应机器的时间是否一致,key=" + key + " unlockNodeIp= " + split[0]
+ ",nodeCreateTime=" + nodeCreateTime);
} else if (diffInMillis > maxLockTime) {
// 判断下别人节点的时间是否超时
LOGGER.error("Lock Node donot unLock ,请检查对应机器的时间是否一致,key=" + key + " unlockNodeIp= " + split[0]
+ ",nodeCreateTime=" + nodeCreateTime);
lrem(tailNodeValue);
}
}
long now = System.currentTimeMillis();
long costTime = now - startMillis;
count++;
if (costTime > TimeUnit.SECONDS.toMillis(maxLockSeconds)) {
String errorString = getErrorString(count, costTime);
throw new GlobalLockTimeOutException(errorString);
}
}
}
//
}
private void lrem(String tailNodeValue) {
// -1代表从尾部获取.FIFO原则.采用rem时间复杂度可能增加,这种情况毕竟少见
long remCount = casRedis.lrem(key, -2, tailNodeValue);
if (remCount != 1) {
LOGGER.error("del count dot not euqal 1,remCount=" + remCount);
}
}
@Override
public void lock() {
long startTime = System.currentTimeMillis();
// 抢占本地锁,会自动唤醒
try {
boolean success = lock.tryLock(maxWaitSeconds, TimeUnit.SECONDS);
if (!success) {
long end = System.currentTimeMillis();
// 超时
long costTime = end - startTime;
String message = getErrorString(1, costTime);
LOGGER.error("lock timeOut " + message);
throw new GlobalLockTimeOutException(message);
}
} catch (Exception e) {
long end = System.currentTimeMillis();
// 被其他人中断
long costTime = end - startTime;
String message = getErrorString(1, costTime);
LOGGER.error("lock meet exception " + message, e);
throw new GlobalLockTimeOutException(message);
}
int tryCount = lockRedisSeq(startTime);
exclusiveOwnerThread = Thread.currentThread();
long endTime = System.currentTimeMillis();
long costTime = endTime - startTime;
LOGGER.info("get redis global lock success,maxLockSeconds=" + maxLockSeconds + ",costTimeMillis=" + costTime
+ ",key=" + key + ",retryCount=" + tryCount + ",sleepTimeMillis=" + sleepTimeMillis
+ ",costTimePerTime=" + costTime / ((double) tryCount) + ",startTime=" + startTime + ",endTime="
+ endTime);
}
private int lockRedis(long startTime) {
// 抢占远程分布式锁,无解锁通知,故采用自旋等待
int tryCount = 0;
while (true) {
tryCount++;
// setIfAbsent tps对于并发锁控制够用了.
Boolean result = this.setIfAbsent(key, "1", maxLockSeconds * 1000);
// 处理锁的自动释放,前三次尝试加锁都会进行超时设置,保证分布式锁有timeOut.避免主加锁进程被无故停止,导致key无失效时间,锁永远不被释放.
if (result == null || !result) {// 加锁失败,阻塞调用线程
LOGGER.info(" spin ,key=" + key + ",tryCount=" + tryCount);
try {
Thread.sleep(sleepTimeMillis);
} catch (InterruptedException e) {
// nothing need to be done
}
long end = System.currentTimeMillis();
// 超时
long costTime = end - startTime;
// 分布式自旋等待时间已经已经超过了某个时间(暂定位为3秒),说明分布式竞争失败或者key没有正确的被设置超时时间.
long sleepMillisWaterMark = TimeUnit.SECONDS.toMillis(maxWaitSeconds);
if (costTime > sleepMillisWaterMark) {
String message = getErrorString(tryCount, costTime);
// 抛运行期异常,幂等重复执行不会总是抛错.
throw new GlobalLockTimeOutException(message);
}
continue;
}
exclusiveOwnerThread = Thread.currentThread();
break;
}
return tryCount;
}
private String getErrorString(int tryCount, long costTime) {
return "get redis global lock error .1. compete failed 2. key do not set timeOut ,exist for ever ,maxLockSeconds="
+ maxLockSeconds + ",costTimeMillis=" + costTime + ",key=" + key + ",retryCount=" + tryCount
+ ",sleepTimeMillis=" + sleepTimeMillis + ",costTimePerTime=" + (costTime / ((double) tryCount));
}
@Override
public void unlock() {
long startTime = System.currentTimeMillis();
try {
lock.unlock();
} catch (IllegalMonitorStateException e) {
LOGGER.error("IllegalMonitorStateException", e);
}
if (exclusiveOwnerThread == Thread.currentThread()) {
// 解锁,如果解锁失败,重试解锁三次
Long delCount = unlockSeqRedisAndRetryIfError();
long endTime = System.currentTimeMillis();
exclusiveOwnerThread = null;
LOGGER.info(" global unlock,del count=" + delCount + ".key=" + key + " ,costTime millis="
+ (endTime - startTime) + ",startTime=" + startTime + ",endTime=" + endTime);
} else {
LOGGER.info(" thread do not get lock ,can not unlock. key=" + key + ",exclusiveOwnerThread="
+ exclusiveOwnerThread + ",current thrad=" + Thread.currentThread());
}
}
private Long unlockSeqRedisAndRetryIfError() {
Long delCount = 0l;
try {
delCount = unlockSeqRedisLock();
} catch (Exception e) {
LOGGER.error("unlockSeqRedisLock error", e);
threadPoolExecutor.execute(getRetryUnlockTask());
}
return delCount;
}
private Runnable getRetryUnlockTask() {
return new Runnable() {
@Override
public void run() {
sleepSlience();
for (int i = 0; i < 3; i++) {
try {
unlockSeqRedisLock();
} catch (Exception e) {
// 出现网络超时等异常情况时,重试
LOGGER.error("unlockSeqRedisLock error", e);
sleepSlience();
continue;
}
break;
}
}
private void sleepSlience() {
try {
Thread.sleep(1000l);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
};
}
@Deprecated
// 请使用unlockSeqRedisAndRetryIfError
private Long unlockSeqRedisLock() {
// -1代表从尾部删除.FIFO原则
String lastNodeValue = casRedis.rpop(key);
if (!value.equals(lastNodeValue)) {
LOGGER.error("del result do not match expect value,expect=" + value + ",acutal value=" + lastNodeValue);
// 不能用Rpush,否则将导致锁节点被随意变更,造成混乱.
casRedis.lpush(key, lastNodeValue);
Long lrem = casRedis.lrem(key, -2, value);
if (lrem != 1) {
LOGGER.error("del resutl do not match expect value,expect" + value);
}
return lrem;
}
return 1l;
}
private Long unlockRedisLock() {
return casRedis.del(key);
}
public int getMaxLockSeconds() {
return maxLockSeconds;
}
}
更多推荐
所有评论(0)