背景

在多线程环境下,通常会使用锁来保证同一时刻有且只有一个线程操作共享资源;而在分布式环境下,多个进程(节点)之间的互斥无法依靠单机锁完成,需要借助分布式锁。

分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于Redis的分布式锁;3. 基于ZooKeeper的分布式锁。本篇博客将介绍第二种方式,基于Redis实现分布式锁。虽然网上已经有各种介绍Redis分布式锁实现的博客,然而他们的实现却有着各种各样的问题,为了避免误人子弟,本篇博客将详细介绍如何正确地实现Redis分布式锁。

可靠性
首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

1. 互斥性。在任意时刻,只有一个客户端能持有锁。

2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。

3. 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。

4. 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。


Redis连接工具类,其中加锁时只用到了最后两个方法(setLock 和 releaseLock),接口类就不列举了:

package com.ganinfo.redis;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.ganinfo.utils.CommonUtil;
import com.ganinfo.utils.UUIDUtil;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Service;

import redis.clients.jedis.Jedis;

import com.ganinfo.utils.SerializeUtil;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;


/**
 * Jedis-backed implementation of {@link RedisService}.
 *
 * <p>The connection pool is created lazily on first use. Every operation borrows
 * one connection with try-with-resources, so the connection is handed back to the
 * pool exactly once (via {@code Jedis.close()}) whether the command succeeded or
 * failed. Redis failures are logged and reported to the caller through the
 * method's "empty" result (null / false / 0) instead of being rethrown.
 *
 * @author shuyu.wang
 * @version V1.0
 */
@Service
@RefreshScope
public class RedisServiceImpl implements RedisService {
    private static final Logger logger = Logger.getLogger(RedisServiceImpl.class);

    @Value("${spring.redis.host}")
    private String ip;
    @Value("${spring.redis.port}")
    private Integer port;
    @Value("${spring.redis.password}")
    private String password;
    @Value("${spring.redis.maxActive}")
    private Integer maxActive;
    @Value("${spring.redis.maxIdle}")
    private Integer maxIdle;
    @Value("${spring.redis.maxWait}")
    private Long maxWait;
    @Value("${spring.redis.testOnBorrow}")
    private Boolean testOnBorrow;
    @Value("${spring.redis.testOnReturn}")
    private Boolean testOnReturn;
    @Value("${spring.redis.timeOut}")
    private Integer timeOut;
    @Value("${spring.redis.redisExpire}")
    private int redisExpire;

    /** SET option: only set the key when it does not exist yet. */
    private static final String SET_IF_NOT_EXIST = "NX";
    /** SET option: the expiry argument is given in milliseconds. */
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    private static final String VIRTUAL_COURSE_PREX = "lc_vc_";

    /** Message logged whenever a Redis operation fails. */
    private static final String CONNECTION_ERROR = "redis连接异常";

    /**
     * Deletes the lock key only when it still holds the caller's request id, so a
     * client can never release a lock owned by somebody else. Running it as one
     * script makes the compare-and-delete atomic.
     */
    private static final String RELEASE_LOCK_SCRIPT =
            "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

    /**
     * Non-sharded connection pool; volatile so the unsynchronized null check in
     * {@link #getJedis()} observes a fully constructed pool.
     */
    private volatile JedisPool jedisPool;

    /**
     * Creates the connection pool once. Synchronized and re-checked so concurrent
     * first callers cannot build (and leak) several pools.
     */
    private synchronized void poolInit() {
        if (jedisPool != null) {
            return;
        }
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(maxActive);
        config.setMaxIdle(maxIdle);
        config.setMaxWaitMillis(maxWait);
        config.setTestOnBorrow(testOnBorrow);
        config.setTestOnReturn(testOnReturn);
        jedisPool = new JedisPool(config, ip, port, timeOut, password);
        logger.info("初始化redis连接池");
    }

    /**
     * Borrows a connection from the pool, creating the pool on first use.
     * The caller owns the returned connection and must close it.
     *
     * @return a borrowed Jedis connection
     */
    private Jedis getJedis() {
        if (jedisPool == null) {
            poolInit();
        }
        return jedisPool.getResource();
    }

    /** Prepends the application prefix to a logical key. */
    private String buildKey(String key) {
        return VIRTUAL_COURSE_PREX + key;
    }

    /** Prefixed key as bytes, for the binary (serialized value) commands. */
    private byte[] buildKeyBytes(String key) {
        return buildKey(key).getBytes();
    }

    /**
     * Stores a serialized string under the prefixed key, without expiry.
     *
     * @param key   logical key (the prefix is added here)
     * @param param value to serialize and store
     */
    @Override
    public void set(String key, String param) {
        try (Jedis jedis = getJedis()) {
            jedis.set(buildKeyBytes(key), SerializeUtil.serialize(param));
        } catch (Exception e) {
            logger.error(CONNECTION_ERROR, e);
        }
    }

    /**
     * Stores a plain string under the prefixed key with an explicit expiry.
     *
     * @param key        logical key (the prefix is added here)
     * @param value      value to store
     * @param expireTime time to live in seconds
     */
    @Override
    public void setWithExpireTime(String key, String value, int expireTime) {
        try (Jedis jedis = getJedis()) {
            jedis.setex(buildKey(key), expireTime, value);
        } catch (Exception e) {
            logger.error(CONNECTION_ERROR, e);
        }
    }

    /**
     * Stores a plain string under the prefixed key using the configured default expiry.
     *
     * @param key   logical key (the prefix is added here)
     * @param value value to store
     */
    @Override
    public void setWithExpireTime(String key, String value) {
        try (Jedis jedis = getJedis()) {
            logger.info("获取redis连接");
            jedis.setex(buildKey(key), redisExpire, value);
        } catch (Exception e) {
            logger.error(CONNECTION_ERROR, e);
        }
    }

    /**
     * Reads a value written by {@link #set(String, String)}.
     *
     * @param key logical key (the prefix is added here)
     * @return the stored value as a string, or null when the key is missing or Redis failed
     */
    @Override
    public String get(String key) {
        try (Jedis jedis = getJedis()) {
            logger.info("获取redis连接");
            // A single GET replaces EXISTS + GET: one round trip and no window in
            // which the key can expire between the two commands.
            byte[] in = jedis.get(buildKeyBytes(key));
            if (in == null) {
                return null;
            }
            Object value = SerializeUtil.unserialize(in);
            return value == null ? null : value.toString();
        } catch (Exception e) {
            logger.error(CONNECTION_ERROR, e);
            return null;
        }
    }

    /**
     * Deletes the prefixed key; deleting a missing key is a no-op.
     *
     * @param key logical key (the prefix is added here)
     */
    @Override
    public void del(String key) {
        try (Jedis jedis = getJedis()) {
            jedis.del(buildKeyBytes(key));
        } catch (Exception e) {
            logger.error(CONNECTION_ERROR, e);
        }
    }

    /**
     * Checks whether the prefixed key exists.
     *
     * @param key logical key (the prefix is added here)
     * @return true when the key exists; false when it does not or Redis failed
     */
    @Override
    public Boolean exists(String key) {
        try (Jedis jedis = getJedis()) {
            return Boolean.TRUE.equals(jedis.exists(buildKeyBytes(key)));
        } catch (Exception e) {
            logger.error(CONNECTION_ERROR, e);
            return false;
        }
    }

    /**
     * Serializes and stores an arbitrary object under the prefixed key.
     *
     * @param key  logical key (the prefix is added here)
     * @param bean object to serialize and store
     */
    @Override
    public <T> void setBean(String key, Object bean) {
        try (Jedis jedis = getJedis()) {
            jedis.set(buildKeyBytes(key), SerializeUtil.serialize(bean));
        } catch (Exception e) {
            logger.error(CONNECTION_ERROR, e);
        }
    }

    /**
     * Reads and deserializes an object written by {@link #setBean(String, Object)}.
     *
     * @param key logical key (the prefix is added here)
     * @return the stored object, or null when the key is missing or Redis failed
     */
    @Override
    @SuppressWarnings("unchecked") // the caller chooses T; the stored type cannot be checked here
    public <T> T getBean(String key) {
        try (Jedis jedis = getJedis()) {
            byte[] in = jedis.get(buildKeyBytes(key));
            return in == null ? null : (T) SerializeUtil.unserialize(in);
        } catch (Exception e) {
            logger.error(CONNECTION_ERROR, e);
            return null;
        }
    }

    /**
     * Serializes and stores a list under the prefixed key.
     *
     * @param key  logical key (the prefix is added here)
     * @param list list to serialize and store
     */
    @Override
    public <T> void setList(String key, List<T> list) {
        try (Jedis jedis = getJedis()) {
            jedis.set(buildKeyBytes(key), SerializeUtil.serialize(list));
        } catch (Exception e) {
            logger.error(CONNECTION_ERROR, e);
        }
    }

    /**
     * Reads and deserializes a list written by {@link #setList(String, List)}.
     *
     * @param key logical key (the prefix is added here)
     * @return the stored list, or null when the key is missing or Redis failed
     */
    @Override
    @SuppressWarnings("unchecked") // the caller chooses T; the stored type cannot be checked here
    public <T> List<T> getList(String key) {
        try (Jedis jedis = getJedis()) {
            byte[] in = jedis.get(buildKeyBytes(key));
            return in == null ? null : (List<T>) SerializeUtil.unserialize(in);
        } catch (Exception e) {
            logger.error(CONNECTION_ERROR, e);
            return null;
        }
    }

    /**
     * Issues a fresh access token for the user and invalidates the previous one.
     * Two unprefixed entries are written with the default expiry:
     * token -> userId and userId -> token.
     *
     * <p>Note: the generated token is returned even when Redis could not store it
     * (the failure is only logged), which matches the previous behaviour.
     *
     * @param userId id of the user logging in
     * @return the newly generated access token
     */
    @Override
    public String login(String userId) {
        logger.info("用户登录");
        String accessToken = UUIDUtil.creatUUID();
        try (Jedis jedis = getJedis()) {
            // Drop the token of an earlier session, if there is one.
            String previousToken = jedis.get(userId);
            if (CommonUtil.isNotNullEmpty(previousToken)) {
                jedis.del(previousToken);
            }
            // SETEX overwrites, so the old userId entry needs no explicit delete.
            jedis.setex(accessToken, redisExpire, userId);
            jedis.setex(userId, redisExpire, accessToken);
        } catch (Exception e) {
            logger.error(CONNECTION_ERROR, e);
        }
        return accessToken;
    }

    /**
     * Extends the lifetime of a session: when the token is known, both the token
     * entry and the matching userId entry get the default expiry again.
     *
     * @param token access token to refresh
     */
    @Override
    public void validate(String token) {
        try (Jedis jedis = getJedis()) {
            // Read on the connection already held instead of borrowing a second
            // one, which could stall when the pool is exhausted.
            String userId = jedis.get(token);
            if (userId != null) {
                jedis.expire(token, redisExpire);
                jedis.expire(userId, redisExpire);
            }
        } catch (Exception e) {
            logger.error(CONNECTION_ERROR, e);
        }
    }

    /**
     * Ends a session by removing the token entry and the matching userId entry.
     * Unknown tokens are ignored.
     *
     * @param token access token to invalidate
     */
    @Override
    public void logout(String token) {
        try (Jedis jedis = getJedis()) {
            String userId = jedis.get(token);
            if (userId == null) {
                return;
            }
            if (CommonUtil.isNotNullEmpty(userId)) {
                jedis.del(userId);
            }
            jedis.del(token);
        } catch (Exception e) {
            logger.error(CONNECTION_ERROR, e);
        }
    }

    /**
     * Resolves the user id a token belongs to.
     *
     * @param token access token
     * @return the user id, or null when the token is unknown or Redis failed
     */
    @Override
    public String getUserId(String token) {
        try (Jedis jedis = getJedis()) {
            return jedis.get(token);
        } catch (Exception e) {
            logger.error(CONNECTION_ERROR, e);
            return null;
        }
    }

    @Override
    public String getSupportType() {
        return "redis";
    }

    /**
     * Reads a numeric value stored under the (unprefixed) key.
     *
     * @param key exact Redis key
     * @return the stored number, or 0 when the key is missing or Redis failed
     * @throws NumberFormatException when the stored value is not a valid long
     */
    @Override
    public long getValue(String key) {
        String count = null;
        try (Jedis jedis = getJedis()) {
            count = jedis.get(key);
        } catch (Exception e) {
            logger.error(CONNECTION_ERROR, e);
        }
        // Guard the null case: previously a Redis failure ended in Long.valueOf(null).
        return count == null ? 0L : Long.parseLong(count);
    }

    /**
     * Tries to take the lock with a single atomic {@code SET key value NX PX ttl}.
     *
     * @param lockKey    exact Redis key of the lock
     * @param requestId  unique id of the requester, needed again to release the lock
     * @param expireTime lock time to live in milliseconds
     * @return the raw reply of the SET command, or null when Redis failed
     */
    @Override
    public String setLock(String lockKey, String requestId, int expireTime) {
        try (Jedis jedis = getJedis()) {
            return jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
        } catch (Exception e) {
            logger.error(CONNECTION_ERROR, e);
            return null;
        }
    }

    /**
     * Releases the lock, but only when it is still held by {@code requestId}.
     *
     * @param lockKey   exact Redis key of the lock
     * @param requestId id that was used to acquire the lock
     * @return the script result (the DEL reply when the id matched, 0 otherwise),
     *         or null when Redis failed
     */
    @Override
    public Object  releaseLock(String lockKey, String requestId) {
        try (Jedis jedis = getJedis()) {
            return jedis.eval(RELEASE_LOCK_SCRIPT,
                    Collections.singletonList(lockKey), Collections.singletonList(requestId));
        } catch (Exception e) {
            logger.error(CONNECTION_ERROR, e);
            return null;
        }
    }
}

redis加锁工具类:

/**
 * Distributed lock helper built on {@code RedisService.setLock} / {@code releaseLock}.
 */
@Component
public class RedisDisLock {
    private static final Logger logger = LoggerFactory.getLogger(RedisDisLock.class);

    private static final String LOCK_SUCCESS = "OK";
    private static final Long RELEASE_SUCCESS = 1L;
    /** Pause between two acquisition attempts, in milliseconds. */
    private static final long RETRY_INTERVAL_MILLIS = 1000;

    @Autowired
    private RedisFactory redisFactory;

    /**
     * Tries to acquire the lock, retrying until it is obtained or the wait budget
     * is used up. {@code expireTime} serves both as the value passed to
     * {@code setLock} for the key's expiry and as the maximum time this call waits.
     *
     * @param lockKey    unique name of the lock
     * @param requestId  unique id of the requester (e.g. a UUID); required to release the lock
     * @param expireTime expiry of the lock key, also used as the maximum wait, in milliseconds
     * @return true when the lock was acquired; false on timeout or when the
     *         waiting thread was interrupted
     */
    public boolean tryGetDistributedLock(String lockKey, String requestId, int expireTime) {
        long timeOutAt = System.currentTimeMillis() + expireTime;
        RedisService redisService = redisFactory.getRedis();
        while (true) {
            if (LOCK_SUCCESS.equals(redisService.setLock(lockKey, requestId, expireTime))) {
                return true;
            }
            long remaining = timeOutAt - System.currentTimeMillis();
            if (remaining <= 0) {
                // Wait budget exhausted.
                System.err.println("未获取到资源");
                return false;
            }
            try {
                System.err.println("睡眠1秒继续获取");
                // Never sleep past the deadline, so the caller's timeout is honoured.
                Thread.sleep(Math.min(RETRY_INTERVAL_MILLIS, remaining));
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller and stop waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }


    /**
     * Releases the lock if it is still held by {@code requestId}.
     *
     * @param lockKey   unique name of the lock
     * @param requestId id that was used to acquire the lock
     * @return true when the release call reported exactly one deleted key
     */
    public boolean releaseLock(String lockKey, String requestId) {
        RedisService redisService = redisFactory.getRedis();
        return RELEASE_SUCCESS.equals(redisService.releaseLock(lockKey, requestId));
    }


}

测试类,测试的接口类也不列举了:

/**
 * Demo service: takes the "test" lock, simulates 10 seconds of work, then releases it.
 */
@Service
public class LockImpl implements Lock{
    @Autowired
    private RedisDisLock redisDisLock;

    /**
     * Runs the simulated work under the distributed lock and prints the outcome.
     *
     * @param a label of the caller, only used in the printed messages
     */
    @Override
    public void zhuanzhang(String a) {
        final int expire = 20000;
        long start=System.currentTimeMillis();
        String s = UUIDUtil.creatUUID();
        if (!redisDisLock.tryGetDistributedLock("test", s, expire)) {
            System.out.println("我没有拿到"+a+"锁");
            return;
        }
        System.out.println("我已经拿到"+a+"锁");
        try {
            // Simulated critical section.
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever runs this thread.
            Thread.currentThread().interrupt();
        } finally {
            // Always release, otherwise other clients wait until the key expires.
            redisDisLock.releaseLock("test",s);
        }
        long end=System.currentTimeMillis();
        System.err.println("耗时"+(end-start));
    }
}

测试的Controller:

/**
 * Test endpoint that triggers the lock demo: GET /lock/test?name=...
 */
@RestController
@RequestMapping(value = "/lock")
public class Controller {

    @Autowired
    private Lock lock;

    /**
     * Runs the lock demo for the given caller name.
     *
     * @param name label forwarded to the lock demo
     * @return a newly created ApiResult; failures of the demo are only printed
     */
    @RequestMapping(value = "/test", method = RequestMethod.GET)
    @ResponseBody
    public ApiResult getSchoollist(@RequestParam(value = "name")String name) {
        final ApiResult response = new ApiResult();
        try {
            lock.zhuanzhang(name);
        } catch (Exception failure) {
            // The demo reports errors on stderr and still answers the request.
            failure.printStackTrace();
        }
        return response;
    }
}
 


测试请求接口,分别请求:

http://127.0.0.1:8080/springboot/lock/test?name=aa

http://127.0.0.1:8080/springboot/lock/test?name=bb

程序日志如下:

我已经拿到aa锁
睡眠1秒继续获取
睡眠1秒继续获取
睡眠1秒继续获取
睡眠1秒继续获取
睡眠1秒继续获取
睡眠1秒继续获取
睡眠1秒继续获取
耗时10466
2018-05-11 15:10:50.794  INFO [springboot,0707be837509ae15,0707be837509ae15,false] 20608 --- [ask-scheduler-6] o.s.i.codec.kryo.CompositeKryoRegistrar  : registering [40, java.io.File] with serializer org.springframework.integration.codec.kryo.FileSerializer
2018-05-11 15:10:50.810  INFO [springboot,0707be837509ae15,0707be837509ae15,false] 20608 --- [ask-scheduler-6] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [172.16.1.29:5672]
我已经拿到bb锁
2018-05-11 15:10:51.044  INFO [springboot,0707be837509ae15,0707be837509ae15,false] 20608 --- [ask-scheduler-6] o.s.a.r.c.CachingConnectionFactory       : Created new connection: SpringAMQP#5deefca0:0/SimpleConnection@281be6c4 [delegate=amqp://admin@172.16.1.29:5672/, localPort= 55309]
耗时18570

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐