redis 实现 curator 分布式锁_个人渣记录仅为自己搜索用的博客-CSDN博客

经过线上实战的redis 分布式锁代码。

难点: redis的锁释放, 可以利用 数据库事务 + 内存重试 + 定时任务重试来操作.

    能用,但是性能较差。

已考虑:

    1.只能被拥有锁的线程解锁

    2. 设置节点和超时时间用同一个key

 未考虑:

 1. 不能重入

 2. 没有本地锁,并发性能会比较差,不使用用在并发争锁较多的场景下。本地锁非自旋

 3. 未考虑锁等待排序. 这个是redis很难实现的.

     可以通过redis的list实现,但缺点是list下每个子节点无超时时间. redis也无法进行模糊查询 key*.

     故还是通过zookeeper实现比较好. 但zookeeper 会遇到性能瓶颈,我们线下的就出现了,经常注册不上的情况.

    zookeeper原理是临时节点

   

使用方式:

GlobalLockRedisImpl globalLockRedis = new GlobalLockRedisImpl(casRedis, maxLockSeconds, sleepTimeMillis,
                redisKey);
            //获取分布式加锁
            globalLockRedis.lock();

try{

   // do something

}finnaly{

 // 分布式锁释放
                globalLockRedis.unlock();
}

    private boolean setIfAbsent(String key, String value, int expireMilliSeconds) {
            String result = casRedis.set(key, value, "nx", "px", expireMilliSeconds);
            if (result != null && result.equalsIgnoreCase("OK")) {
                return true;
            }
            return false;

   
 

代码:

基于阿里云redis实现的分布式锁,可以实现本地加锁机制,增大并发能力.

/**
 * 基于redis实现的全局锁,不能当做单例使用.
 *
 * @author loufei
 * <p>
 * 2015-5-28
 * implements GlobalLock
 */
public class GlobalLockRedisImpl implements GlobalLock {
    private static Logger LOGGER = LoggerFactory.getLogger(GlobalLockRedisImpl.class);

    private final TairStringCluster casRedis;
    private final int redisExpiredSeconds;
    private final long sleepTimeMillis;
    private Thread exclusiveOwnerThread;
    private final String key;

    private String randonNum = null;
    private static ConcurrentMap<String, ReentrantLock> locks = Maps.newConcurrentMap();

    public GlobalLockRedisImpl(TairStringCluster casRedis, int redisExpiredSeconds, long avgCost, String key) {
        this.casRedis = casRedis;
        if (redisExpiredSeconds > 3) {
            this.redisExpiredSeconds = 3;
        } else {
            this.redisExpiredSeconds = redisExpiredSeconds;
        }
        if (avgCost > 100) {
            this.sleepTimeMillis = 100L;
        } else {
            this.sleepTimeMillis = avgCost;
        }
        this.key = key;
    }

    public boolean acquireDistributedLock(TairStringCluster jedis, String resourceKey, String randomValue, int expireTime) {
        ExsetParams setParams = new ExsetParams();
        setParams.nx().ex(expireTime);
        String result = jedis.exset(resourceKey, randomValue, setParams);
        return "OK".equals(result);
    }


    public boolean releaseDistributedLock(TairStringCluster jedis, String resourceKey, String randomValue) {
        Long ret = jedis.cad(resourceKey, randomValue);
        return 1 == ret;
    }

    public boolean renewDistributedLock(TairStringCluster jedis, String resourceKey, String randomValue, int expireTime) {
        CasParams setParams = new CasParams();
        setParams.ex(expireTime);
        Long ret = jedis.cas(resourceKey, randomValue, randomValue, setParams);
        return 1 == ret;
    }

    public void lock() throws ConfServiceException {
        long startTime = System.currentTimeMillis();
        int tryCount = 0;
        while (true) {
            tryCount++;
            //尝试加锁,失败的话自旋重试
            Boolean result = tryLock();
            if (result != null && result) {
                break;
            }
            //检查是否超时
            checkExpired(startTime, tryCount);

            LOGGER.info(" spin ,key=" + key + ",tryCount=" + tryCount);

            try {
                Thread.sleep(sleepTimeMillis);
            } catch (InterruptedException e) {
                // nothing need to be done
            }
        }
        long endTime = System.currentTimeMillis();
        long costTime = endTime - startTime;
        LOGGER.info("lock success,key={},cost={},retryCount={}" +
                        ",sleepTimeMillis={},costTimePerTime= {}" +
                        ",startTime={},endTime={}",
                key, costTime, tryCount,
                sleepTimeMillis,
                costTime / ((double) tryCount), startTime, endTime);
    }

    public void unlock() {
        long startTime = System.currentTimeMillis();
        if (exclusiveOwnerThread != null && exclusiveOwnerThread == Thread.currentThread()) {
            ReentrantLock reentrantLock = locks.get(key);
            if (reentrantLock != null) {
                reentrantLock.lock();
            } else {
                LOGGER.error("reentrantLock is null when lock {}", key);
            }
            boolean success = releaseDistributedLock(casRedis, key, randonNum);
            //最后释放,避免无法再次释放
            exclusiveOwnerThread = null;

            long endTime = System.currentTimeMillis();

            LOGGER.info(" global unlock,del count=" + success + ".key=" + key + " ,costTime millis="
                    + (endTime - startTime) + ",startTime=" + startTime + ",endTime=" + endTime);
        } else {
            LOGGER.error(" thread do not get lock ,can not unlock. key={},exclusiveOwnerThread={},current thrad={}"
                    , key, exclusiveOwnerThread, Thread.currentThread());
        }

    }

    private void checkExpired(long startTime, int tryCount) throws ConfServiceException {
        long end = System.currentTimeMillis();

        //超时
        long costTime = end - startTime;
        // 分布式自旋等待时间已经已经超过了某个时间(暂定位为3秒),说明分布式竞争失败或者key没有正确的被设置超时时间.
        long sleepMillisWaterMark;
        if (tryCount > 2) {
            String message = "get redis global lock error .1. compete failed 2. key do not set  timeOut ,exist for ever  ,maxLockSeconds="
                    + redisExpiredSeconds
                    + ",costTimeMillis="
                    + costTime
                    + ",key="
                    + key
                    + ",retryCount="
                    + tryCount
                    + ",sleepTimeMillis="
                    + sleepTimeMillis
                    + ",costTimePerTime="
                    + (costTime / ((double) tryCount));
            // 抛运行期异常,幂等重复执行不会总是抛错.
            throw new ConfServiceException(CodeEnum.INNER_SYS_ERROR, KV.instance().kv("msg", message));
        }
    }

    private boolean tryLock() {
        ReentrantLock reentrantLock = locks.get(key);
        synchronized (locks) {
            reentrantLock = locks.get(key);

            if (reentrantLock == null) {
                locks.put(key, new ReentrantLock());
            }
        }
        if (reentrantLock.isLocked() && !Thread.currentThread().equals(exclusiveOwnerThread)) {
            return false;
        }
        // setIfAbsent tps对于并发锁控制够用了.
        String randomValue = UUID.randomUUID().toString();
        Boolean success = acquireDistributedLock(casRedis, key, randomValue, redisExpiredSeconds * 1000);
        // 处理锁的自动释放,前三次尝试加锁都会进行超时设置,保证分布式锁有timeOut.避免主加锁进程被无故停止,导致key无失效时间,锁永远不被释放.
        if (success) {
            reentrantLock.lock();
            exclusiveOwnerThread = Thread.currentThread();
            randonNum = randomValue;
        }
        return success;
    }


    public int getRedisExpiredSeconds() {
        return redisExpiredSeconds;
    }


}

版本2 , 未经线上验证.

/**
 * 基于redis实现的全局锁,不能当做单例使用.
 * 
 * @author loufei
 * 
 *         2015-5-28
 */
public class GlobalLockSeqRedisImpl implements GlobalLock {
    private static long maxWaitSeconds = 2;
    private static final String split = "____";
    private static AtomicInteger seqCount = new AtomicInteger();

    private static String host = null;
    private static ILog LOGGER = LogFactory.getLog(GlobalLockRedisImpl.class);

    private final static ExecutorService threadPoolExecutor = TaxiExecutors
            .newCachedThreadPool(new ThreadFactoryBuilder("GlobalLockRedisImpl"));
    private final didikuaidi.redis.clients.jedis.JedisCommands casRedis;
    private final int maxLockSeconds;
    private final long sleepTimeMillis;
    private Thread exclusiveOwnerThread;
    private final String key;
    private final Lock lock = new ReentrantLock();
    private String value = null;

    static {
        InetAddress localHost = null;
        try {
            localHost = InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
        host = (UUID.randomUUID() + localHost.getHostAddress()).replace(split, "");
    }

    public GlobalLockSeqRedisImpl(JedisCommands casRedis, int maxLockSeconds, long sleepTimeMillis, String key) {
        this.casRedis = casRedis;
        this.maxLockSeconds = maxLockSeconds;
        if (maxLockSeconds > 30) {
            maxLockSeconds = 30;
        }
        this.sleepTimeMillis = sleepTimeMillis;
        if (sleepTimeMillis > 1000) {
            sleepTimeMillis = 1000;
        }
        this.key = key;
    }

    private boolean setIfAbsent(String key, String value, int expireMilliSeconds) {
        String result = casRedis.set(key, value, "nx", "px", expireMilliSeconds);
        if (result != null && result.equalsIgnoreCase("OK")) {
            return true;
        }
        return false;

    }

    private int lockRedisSeq(long startMillis) {
        int seqNo = seqCount.incrementAndGet();

        value = host + split + new Date().getTime() + split + seqNo;
        Long listSize = casRedis.lpush(key, value);
        int count = 1;
        LOGGER.info("listSize=" + listSize);
        if (listSize.longValue() == 1l) {
            // 长度是1,说明只有自己拿到锁.
            return count;
        } else {
            // 如果锁过多,打印error报警
            if (listSize > DynamicConfig.getInt(DynamicConfigKeys.KUAIPAY_LOCK_LIMIT, 21)) {
                LOGGER.error("too_much_lock_node listSize=" + listSize);
            } else {
                LOGGER.info("lock_queue_size=" + listSize + ",key=" + key);
            }
            // 说明没有锁住
            while (true) {
                try {
                    Thread.sleep(sleepTimeMillis);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // -1代表从尾部获取.FIFO原则
                String tailNodeValue = casRedis.lindex(key, -1);
                if (tailNodeValue == null) {
                    // 自己的节点莫名其妙被人删除了
                    String message = "tailNodeValue is null ,all node is delete inclued itself key=" + key;
                    LOGGER.error(message);
                    throw new GlobalLockTimeOutException(message);
                } else if (tailNodeValue.equals(value)) {
                    // 拿到锁了
                    return count;
                } else {
                    String[] split = tailNodeValue.split(GlobalLockSeqRedisImpl.split);
                    String host = split[0];
                    // 保持数据兼容性避免报错
                    Date nodeCreateTime = new Date(Long.valueOf(split[1]));
                    long diffInMillis = new Date().getTime() - nodeCreateTime.getTime();
                    long maxLockTime = TimeUnit.MINUTES.toMillis(1);
                    if (host.equals(this.host) && diffInMillis > TimeUnit.SECONDS.toMillis(3)) {
                        // 发现当前节点的机器是本机(不用担心不同机器的时间不一致问题),且超过3秒,立即删除.
                        lrem(tailNodeValue);
                    }
                    if (diffInMillis > TimeUnit.SECONDS.toMillis(30) && diffInMillis <= maxLockTime) {
                        // 大量error报警,发现问题.请检查对应ip机器和日志所在机器的时间是否一致.并且java默认时区是否一致
                        LOGGER.error("Lock Node donot unLock ,请检查对应机器的时间是否一致,key=" + key + " unlockNodeIp= " + split[0]
                                + ",nodeCreateTime=" + nodeCreateTime);
                    } else if (diffInMillis > maxLockTime) {
                        // 判断下别人节点的时间是否超时
                        LOGGER.error("Lock Node donot unLock ,请检查对应机器的时间是否一致,key=" + key + " unlockNodeIp= " + split[0]
                                + ",nodeCreateTime=" + nodeCreateTime);
                        lrem(tailNodeValue);

                    }
                }

                long now = System.currentTimeMillis();
                long costTime = now - startMillis;
                count++;

                if (costTime > TimeUnit.SECONDS.toMillis(maxLockSeconds)) {
                    String errorString = getErrorString(count, costTime);
                    throw new GlobalLockTimeOutException(errorString);
                }

            }
        }
        //
    }

    private void lrem(String tailNodeValue) {
        // -1代表从尾部获取.FIFO原则.采用rem时间复杂度可能增加,这种情况毕竟少见
        long remCount = casRedis.lrem(key, -2, tailNodeValue);
        if (remCount != 1) {
            LOGGER.error("del count dot not  euqal 1,remCount=" + remCount);
        }
    }

    @Override
    public void lock() {
        long startTime = System.currentTimeMillis();

        // 抢占本地锁,会自动唤醒
        try {
            boolean success = lock.tryLock(maxWaitSeconds, TimeUnit.SECONDS);
            if (!success) {
                long end = System.currentTimeMillis();
                // 超时
                long costTime = end - startTime;
                String message = getErrorString(1, costTime);
                LOGGER.error("lock timeOut " + message);
                throw new GlobalLockTimeOutException(message);
            }
        } catch (Exception e) {
            long end = System.currentTimeMillis();
            // 被其他人中断
            long costTime = end - startTime;
            String message = getErrorString(1, costTime);
            LOGGER.error("lock meet exception " + message, e);
            throw new GlobalLockTimeOutException(message);
        }
        int tryCount = lockRedisSeq(startTime);
        exclusiveOwnerThread = Thread.currentThread();
        long endTime = System.currentTimeMillis();
        long costTime = endTime - startTime;
        LOGGER.info("get redis global lock success,maxLockSeconds=" + maxLockSeconds + ",costTimeMillis=" + costTime
                + ",key=" + key + ",retryCount=" + tryCount + ",sleepTimeMillis=" + sleepTimeMillis
                + ",costTimePerTime=" + costTime / ((double) tryCount) + ",startTime=" + startTime + ",endTime="
                + endTime);
    }

    private int lockRedis(long startTime) {
        // 抢占远程分布式锁,无解锁通知,故采用自旋等待
        int tryCount = 0;
        while (true) {
            tryCount++;
            // setIfAbsent tps对于并发锁控制够用了.
            Boolean result = this.setIfAbsent(key, "1", maxLockSeconds * 1000);
            // 处理锁的自动释放,前三次尝试加锁都会进行超时设置,保证分布式锁有timeOut.避免主加锁进程被无故停止,导致key无失效时间,锁永远不被释放.

            if (result == null || !result) {// 加锁失败,阻塞调用线程
                LOGGER.info(" spin ,key=" + key + ",tryCount=" + tryCount);
                try {
                    Thread.sleep(sleepTimeMillis);
                } catch (InterruptedException e) {
                    // nothing need to be done
                }
                long end = System.currentTimeMillis();

                // 超时
                long costTime = end - startTime;
                // 分布式自旋等待时间已经已经超过了某个时间(暂定位为3秒),说明分布式竞争失败或者key没有正确的被设置超时时间.
                long sleepMillisWaterMark = TimeUnit.SECONDS.toMillis(maxWaitSeconds);
                if (costTime > sleepMillisWaterMark) {
                    String message = getErrorString(tryCount, costTime);
                    // 抛运行期异常,幂等重复执行不会总是抛错.
                    throw new GlobalLockTimeOutException(message);
                }
                continue;
            }
            exclusiveOwnerThread = Thread.currentThread();

            break;
        }
        return tryCount;
    }

    private String getErrorString(int tryCount, long costTime) {
        return "get redis global lock error .1. compete failed 2. key do not set  timeOut ,exist for ever  ,maxLockSeconds="
                + maxLockSeconds + ",costTimeMillis=" + costTime + ",key=" + key + ",retryCount=" + tryCount
                + ",sleepTimeMillis=" + sleepTimeMillis + ",costTimePerTime=" + (costTime / ((double) tryCount));
    }

    @Override
    public void unlock() {
        long startTime = System.currentTimeMillis();
        try {
            lock.unlock();
        } catch (IllegalMonitorStateException e) {
            LOGGER.error("IllegalMonitorStateException", e);
        }
        if (exclusiveOwnerThread == Thread.currentThread()) {

            // 解锁,如果解锁失败,重试解锁三次
            Long delCount = unlockSeqRedisAndRetryIfError();
            long endTime = System.currentTimeMillis();
            exclusiveOwnerThread = null;
            LOGGER.info(" global unlock,del count=" + delCount + ".key=" + key + " ,costTime millis="
                    + (endTime - startTime) + ",startTime=" + startTime + ",endTime=" + endTime);
        } else {
            LOGGER.info(" thread do not get lock ,can not unlock. key=" + key + ",exclusiveOwnerThread="
                    + exclusiveOwnerThread + ",current thrad=" + Thread.currentThread());
        }

    }

    private Long unlockSeqRedisAndRetryIfError() {
        Long delCount = 0l;
        try {
            delCount = unlockSeqRedisLock();
        } catch (Exception e) {
            LOGGER.error("unlockSeqRedisLock error", e);
            threadPoolExecutor.execute(getRetryUnlockTask());
        }
        return delCount;
    }

    private Runnable getRetryUnlockTask() {
        return new Runnable() {
            @Override
            public void run() {
                sleepSlience();
                for (int i = 0; i < 3; i++) {
                    try {
                        unlockSeqRedisLock();
                    } catch (Exception e) {
                        // 出现网络超时等异常情况时,重试
                        LOGGER.error("unlockSeqRedisLock error", e);
                        sleepSlience();
                        continue;
                    }
                    break;
                }
            }

            private void sleepSlience() {
                try {
                    Thread.sleep(1000l);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        };
    }

    @Deprecated
    // 请使用unlockSeqRedisAndRetryIfError
    private Long unlockSeqRedisLock() {
        // -1代表从尾部删除.FIFO原则
        String lastNodeValue = casRedis.rpop(key);
        if (!value.equals(lastNodeValue)) {
            LOGGER.error("del result do not match expect value,expect=" + value + ",acutal value=" + lastNodeValue);
            // 不能用Rpush,否则将导致锁节点被随意变更,造成混乱.
            casRedis.lpush(key, lastNodeValue);
            Long lrem = casRedis.lrem(key, -2, value);
            if (lrem != 1) {
                LOGGER.error("del resutl do not match expect value,expect" + value);
            }
            return lrem;
        }
        return 1l;

    }

    private Long unlockRedisLock() {
        return casRedis.del(key);
    }

    public int getMaxLockSeconds() {
        return maxLockSeconds;
    }
}


   

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐