Java实现Redis分布式锁
背景:在多线程环境下,通常会使用锁来保证有且只有一个线程来操作共享资源。分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于Redis的分布式锁;3. 基于ZooKeeper的分布式锁。本篇博客将介绍第二种方式,基于Redis实现分布式锁。虽然网上已经有各种介绍Redis分布式锁实现的博客,然而它们的实现却有着各种各样的问题,为了避免误人子弟,本篇博客将详细介绍如何正确地实现Redis分布式锁。
背景
在多线程环境下,通常会使用锁来保证有且只有一个线程来操作共享资源;而在分布式环境下,多个进程(节点)之间无法共享本地锁,因此需要借助外部组件来实现分布式锁。
分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于Redis的分布式锁;3. 基于ZooKeeper的分布式锁。本篇博客将介绍第二种方式,基于Redis实现分布式锁。虽然网上已经有各种介绍Redis分布式锁实现的博客,然而他们的实现却有着各种各样的问题,为了避免误人子弟,本篇博客将详细介绍如何正确地实现Redis分布式锁。
可靠性:首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
1. 互斥性。在任意时刻,只有一个客户端能持有锁。
2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
3. 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
4. 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
Redis连接工具类,其中加锁时只用到了最后两个方法(setLock 和 releaseLock),接口类就不列举了:
package com.ganinfo.redis;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.ganinfo.utils.CommonUtil;
import com.ganinfo.utils.UUIDUtil;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import com.ganinfo.utils.SerializeUtil;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * Jedis-backed implementation of {@link RedisService}.
 *
 * <p>Every public method borrows one connection from a lazily created
 * {@link JedisPool}, performs its commands and hands the connection back exactly
 * once. Redis failures are logged and swallowed (best effort); the method then
 * returns its "nothing found" value ({@code null}, {@code false} or {@code 0}).
 *
 * <p>Cache-style methods ({@code set}, {@code get}, {@code setBean}, ...) prefix the
 * key with {@link #VIRTUAL_COURSE_PREX}; the token and lock methods use the key
 * exactly as given.
 *
 * @author shuyu.wang
 * @version V1.0
 * @date 2017-10-19 14:54:39
 */
@Service
@RefreshScope
public class RedisServiceImpl implements RedisService {

    private static final Logger logger = Logger.getLogger(RedisServiceImpl.class);

    @Value("${spring.redis.host}")
    private String ip;

    @Value("${spring.redis.port}")
    private Integer port;

    @Value("${spring.redis.password}")
    private String password;

    @Value("${spring.redis.maxActive}")
    private Integer maxActive;

    @Value("${spring.redis.maxIdle}")
    private Integer maxIdle;

    @Value("${spring.redis.maxWait}")
    private Long maxWait;

    @Value("${spring.redis.testOnBorrow}")
    private Boolean testOnBorrow;

    @Value("${spring.redis.testOnReturn}")
    private Boolean testOnReturn;

    @Value("${spring.redis.timeOut}")
    private Integer timeOut;

    /** Default time-to-live, in seconds, passed to SETEX / EXPIRE. */
    @Value("${spring.redis.redisExpire}")
    private int redisExpire;

    /** SET option: only set the key if it does not already exist. */
    private static final String SET_IF_NOT_EXIST = "NX";

    /** SET option: the expire time is given in milliseconds. */
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    /** Prefix applied to keys of the cache-style methods. */
    private static final String VIRTUAL_COURSE_PREX = "lc_vc_";

    /** Non-sharded connection pool; created on first use by {@link #poolInit()}. */
    private volatile JedisPool jedisPool;

    /**
     * Creates the connection pool if it does not exist yet.
     *
     * <p>Synchronized so that concurrent first callers cannot each build (and leak)
     * their own pool.
     */
    private synchronized void poolInit() {
        if (jedisPool != null) {
            return;
        }
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(maxActive);
        config.setMaxIdle(maxIdle);
        config.setMaxWaitMillis(maxWait);
        config.setTestOnBorrow(testOnBorrow);
        config.setTestOnReturn(testOnReturn);
        jedisPool = new JedisPool(config, ip, port, timeOut, password);
        logger.info("初始化redis连接池");
    }

    /**
     * Borrows a connection from the pool, creating the pool on first use.
     *
     * <p>The caller owns the returned connection and must pass it to
     * {@link #close(Jedis)} in a {@code finally} block.
     *
     * @return a borrowed Jedis connection
     */
    private Jedis getJedis() {
        if (jedisPool == null) {
            poolInit();
        }
        return jedisPool.getResource();
    }

    /**
     * Hands a borrowed connection back to the pool exactly once.
     *
     * <p>{@code Jedis.close()} is used instead of the deprecated
     * {@code returnResource}/{@code returnBrokenResource} pair, so a connection is
     * never returned twice after a failure.
     *
     * @param jedis the connection to release; {@code null} is ignored
     */
    private void close(Jedis jedis) {
        if (jedis == null) {
            return;
        }
        try {
            jedis.close();
        } catch (Exception e) {
            logger.error("redis连接异常", e);
        }
    }

    private String buildKey(String key) {
        return VIRTUAL_COURSE_PREX + key;
    }

    /**
     * Builds the prefixed key as bytes.
     *
     * <p>NOTE(review): uses the platform default charset, as the original code did;
     * switching to an explicit charset could change keys already stored for
     * non-ASCII names, so it is left untouched.
     */
    private byte[] buildKeyBytes(String key) {
        return buildKey(key).getBytes();
    }

    /**
     * Stores a serialized string under the prefixed key, without expiry.
     *
     * @param key   logical key (prefix is added internally)
     * @param param value to serialize and store
     */
    @Override
    public void set(String key, String param) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            jedis.set(buildKeyBytes(key), SerializeUtil.serialize(param));
        } catch (Exception e) {
            logger.error("redis连接异常", e);
        } finally {
            close(jedis);
        }
    }

    /**
     * Stores a plain string under the prefixed key with an explicit TTL.
     *
     * @param key        logical key (prefix is added internally)
     * @param value      value to store
     * @param expireTime time-to-live in seconds
     */
    @Override
    public void setWithExpireTime(String key, String value, int expireTime) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            jedis.setex(buildKey(key), expireTime, value);
        } catch (Exception e) {
            logger.error("redis连接异常", e);
        } finally {
            close(jedis);
        }
    }

    /**
     * Stores a plain string under the prefixed key with the configured default TTL.
     *
     * @param key   logical key (prefix is added internally)
     * @param value value to store
     */
    @Override
    public void setWithExpireTime(String key, String value) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            logger.info("获取redis连接");
            jedis.setex(buildKey(key), redisExpire, value);
        } catch (Exception e) {
            logger.error("redis连接异常", e);
        } finally {
            close(jedis);
        }
    }

    /**
     * Reads and deserializes the value stored under the prefixed key.
     *
     * @param key logical key (prefix is added internally)
     * @return the value's string form, or {@code null} when the key is missing or
     *         Redis could not be reached
     */
    @Override
    public String get(String key) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            logger.info("获取redis连接");
            // A single GET avoids the exists-then-get race with key expiry.
            byte[] in = jedis.get(buildKeyBytes(key));
            if (in == null) {
                return null;
            }
            Object value = SerializeUtil.unserialize(in);
            return value == null ? null : value.toString();
        } catch (Exception e) {
            logger.error("redis连接异常", e);
            return null;
        } finally {
            close(jedis);
        }
    }

    /**
     * Deletes the prefixed key; deleting a missing key is a no-op.
     *
     * @param key logical key (prefix is added internally)
     */
    @Override
    public void del(String key) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            jedis.del(buildKeyBytes(key));
        } catch (Exception e) {
            logger.error("redis连接异常", e);
        } finally {
            close(jedis);
        }
    }

    /**
     * Tells whether the prefixed key exists.
     *
     * @param key logical key (prefix is added internally)
     * @return {@code true} if the key exists; {@code false} if it does not or Redis
     *         could not be reached
     */
    @Override
    public Boolean exists(String key) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            return Boolean.TRUE.equals(jedis.exists(buildKeyBytes(key)));
        } catch (Exception e) {
            logger.error("redis连接异常", e);
            return false;
        } finally {
            close(jedis);
        }
    }

    /**
     * Serializes and stores an arbitrary object under the prefixed key.
     *
     * @param key  logical key (prefix is added internally)
     * @param bean object to serialize and store
     */
    @Override
    public <T> void setBean(String key, Object bean) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            jedis.set(buildKeyBytes(key), SerializeUtil.serialize(bean));
        } catch (Exception e) {
            logger.error("redis连接异常", e);
        } finally {
            close(jedis);
        }
    }

    /**
     * Reads and deserializes the object stored under the prefixed key.
     *
     * @param key logical key (prefix is added internally)
     * @return the stored object, or {@code null} when the key is missing or Redis
     *         could not be reached
     */
    @Override
    @SuppressWarnings("unchecked") // the caller chooses T; the stored type is not checked here
    public <T> T getBean(String key) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            byte[] in = jedis.get(buildKeyBytes(key));
            if (in == null) {
                return null;
            }
            return (T) SerializeUtil.unserialize(in);
        } catch (Exception e) {
            logger.error("redis连接异常", e);
            return null;
        } finally {
            close(jedis);
        }
    }

    /**
     * Serializes and stores a list under the prefixed key.
     *
     * @param key  logical key (prefix is added internally)
     * @param list list to serialize and store
     */
    @Override
    public <T> void setList(String key, List<T> list) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            jedis.set(buildKeyBytes(key), SerializeUtil.serialize(list));
        } catch (Exception e) {
            logger.error("redis连接异常", e);
        } finally {
            close(jedis);
        }
    }

    /**
     * Reads and deserializes the list stored under the prefixed key.
     *
     * @param key logical key (prefix is added internally)
     * @return the stored list, or {@code null} when the key is missing or Redis
     *         could not be reached
     */
    @Override
    @SuppressWarnings("unchecked") // the caller chooses T; the stored type is not checked here
    public <T> List<T> getList(String key) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            byte[] in = jedis.get(buildKeyBytes(key));
            if (in == null) {
                return null;
            }
            return (List<T>) SerializeUtil.unserialize(in);
        } catch (Exception e) {
            logger.error("redis连接异常", e);
            return null;
        } finally {
            close(jedis);
        }
    }

    /**
     * Issues a fresh access token for the user and invalidates the previous one.
     *
     * <p>Two entries are written with the default TTL: {@code token -> userId} and
     * {@code userId -> token}. The generated token is returned even when Redis
     * could not be reached, exactly as before; the failure is only logged.
     *
     * @param userId id of the user logging in (used as a key without prefix)
     * @return the newly generated access token
     */
    @Override
    public String login(String userId) {
        logger.info("用户登录");
        String accessToken = UUIDUtil.creatUUID();
        Jedis jedis = null;
        try {
            jedis = getJedis();
            // Destroy the token issued by a previous login, if any.
            String previousToken = jedis.get(userId);
            if (CommonUtil.isNotNullEmpty(previousToken)) {
                jedis.del(previousToken);
            }
            // SETEX overwrites the old userId entry, so no separate DEL is needed.
            jedis.setex(accessToken, redisExpire, userId);
            jedis.setex(userId, redisExpire, accessToken);
        } catch (Exception e) {
            logger.error("redis连接异常", e);
        } finally {
            close(jedis);
        }
        return accessToken;
    }

    /**
     * Refreshes the TTL of a token and of its user entry; unknown tokens are ignored.
     *
     * @param token access token to refresh
     */
    @Override
    public void validate(String token) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            // Look the user up on this connection instead of borrowing a second one.
            String userId = jedis.get(token);
            if (userId != null) {
                jedis.expire(token, redisExpire);
                jedis.expire(userId, redisExpire);
            }
        } catch (Exception e) {
            logger.error("redis连接异常", e);
        } finally {
            close(jedis);
        }
    }

    /**
     * Removes a token and the matching user entry; unknown tokens are ignored.
     *
     * @param token access token to invalidate
     */
    @Override
    public void logout(String token) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            // Look the user up on this connection instead of borrowing a second one.
            String userId = jedis.get(token);
            if (userId != null) {
                if (CommonUtil.isNotNullEmpty(userId)) {
                    jedis.del(userId);
                }
                jedis.del(token);
            }
        } catch (Exception e) {
            logger.error("redis连接异常", e);
        } finally {
            close(jedis);
        }
    }

    /**
     * Resolves the user id stored for a token.
     *
     * @param token access token
     * @return the user id, or {@code null} when the token is unknown or Redis could
     *         not be reached
     */
    @Override
    public String getUserId(String token) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            return jedis.get(token);
        } catch (Exception e) {
            logger.error("redis连接异常", e);
            return null;
        } finally {
            close(jedis);
        }
    }

    @Override
    public String getSupportType() {
        return "redis";
    }

    /**
     * Reads a numeric value stored as a string under the given key (no prefix).
     *
     * @param key key to read
     * @return the parsed value, or {@code 0} when the key is missing or Redis could
     *         not be reached
     * @throws NumberFormatException if the stored value is not a valid long
     */
    @Override
    public long getValue(String key) {
        Jedis jedis = null;
        String count = null;
        try {
            jedis = getJedis();
            count = jedis.get(key);
        } catch (Exception e) {
            logger.error("redis连接异常", e);
        } finally {
            close(jedis);
        }
        // count stays null after a Redis failure; previously that ended in Long.valueOf(null).
        return count == null ? 0 : Long.parseLong(count);
    }

    /**
     * Tries to take the lock with a single {@code SET key value NX PX ttl} command,
     * so that creating the key and setting its expiry happen atomically.
     *
     * @param lockKey    lock name (used as the key without prefix)
     * @param requestId  unique id of the requester, stored as the value
     * @param expireTime lock time-to-live in milliseconds
     * @return the reply of the SET command, or {@code null} when Redis could not be
     *         reached; callers in this file treat {@code "OK"} as success
     */
    @Override
    public String setLock(String lockKey, String requestId, int expireTime) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            return jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
        } catch (Exception e) {
            logger.error("redis连接异常", e);
            return null;
        } finally {
            close(jedis);
        }
    }

    /**
     * Releases the lock only if it is still held by {@code requestId}.
     *
     * <p>The compare-and-delete runs as one Lua script so that another client's
     * lock can never be deleted between the check and the delete.
     *
     * @param lockKey   lock name (used as the key without prefix)
     * @param requestId unique id that was used to take the lock
     * @return the script result ({@code 1} when the key was deleted, {@code 0}
     *         otherwise), or {@code null} when Redis could not be reached
     */
    @Override
    public Object releaseLock(String lockKey, String requestId) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            return jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
        } catch (Exception e) {
            logger.error("redis连接异常", e);
            return null;
        } finally {
            close(jedis);
        }
    }
}
redis加锁工具类:
@Component public class RedisDisLock { private static final Logger logger = LoggerFactory.getLogger(RedisDisLock.class); private static final String LOCK_SUCCESS = "OK"; private static final Long RELEASE_SUCCESS = 1L; @Autowired private RedisFactory redisFactory; private static final long expired = 1000;//1秒超时 /** * @param lockKey 锁名唯一 * @param requestId UUID生成 * @param expireTime key的过期时间 * @author:shuyu.wang * @description:上锁 * @date: 2018/5/11 15:01 */ public boolean tryGetDistributedLock(String lockKey, String requestId, int expireTime) { boolean success = false; long now = System.currentTimeMillis(); long timeOutAt = now + expireTime; RedisService redisService = redisFactory.getRedis(); while (true){ String result = redisService.setLock(lockKey, requestId, expireTime); if (LOCK_SUCCESS.equals(result)) { success = true; return success; } else { //超出请求时间返回false if (System.currentTimeMillis() > timeOutAt){ success = false; System.err.println("未获取到资源"); return success; }else { // 休眠一段时间,继续获取锁 try { System.err.println("睡眠1秒继续获取"); Thread.sleep((long)1000); }catch (InterruptedException e){ e.printStackTrace(); } } } } } /** * @param lockKey 锁名唯一 * @param requestId UUID生成 * @author:shuyu.wang * @description:释放锁 * @date: 2018/5/11 15:03 */ public boolean releaseLock(String lockKey, String requestId) { RedisService redisService = redisFactory.getRedis(); Object result = redisService.releaseLock(lockKey, requestId); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; } }
测试类,测试的接口类也不列举了:
/**
 * Demo service: takes the distributed lock {@code "test"}, simulates ten seconds
 * of work and releases the lock again.
 */
@Service
public class LockImpl implements Lock {

    /** Name of the lock shared by all callers of this demo. */
    private static final String LOCK_KEY = "test";

    @Autowired
    private RedisDisLock redisDisLock;

    /**
     * Runs the simulated transfer under the distributed lock.
     *
     * @param a label printed in the console messages to tell callers apart
     */
    @Override
    public void zhuanzhang(String a) {
        final int expire = 20000;
        long start = System.currentTimeMillis();
        String requestId = UUIDUtil.creatUUID();

        if (!redisDisLock.tryGetDistributedLock(LOCK_KEY, requestId, expire)) {
            System.out.println("我没有拿到" + a + "锁");
            return;
        }

        System.out.println("我已经拿到" + a + "锁");
        try {
            // Simulated business work while holding the lock.
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            // Always release, so a failure above cannot leave the lock held until it expires.
            redisDisLock.releaseLock(LOCK_KEY, requestId);
        }
        long end = System.currentTimeMillis();
        System.err.println("耗时" + (end - start));
    }
}
测试的Controller:
/**
 * Test endpoint that triggers the lock demo: {@code GET /lock/test?name=...}.
 */
@RestController
@RequestMapping(value = "/lock")
public class Controller {

    @Autowired
    private Lock lock;

    /**
     * Runs the lock demo for the given caller name.
     *
     * @param name label forwarded to the demo so its console output can be told apart
     * @return a freshly created {@code ApiResult}, whether or not the demo failed
     */
    @RequestMapping(value = "/test", method = RequestMethod.GET)
    @ResponseBody
    public ApiResult getSchoollist(@RequestParam(value = "name") String name) {
        final ApiResult response = new ApiResult();
        try {
            lock.zhuanzhang(name);
        } catch (Exception failure) {
            failure.printStackTrace();
        }
        return response;
    }
}
测试请求接口,分别请求:
http://127.0.0.1:8080/springboot/lock/test?name=aa
http://127.0.0.1:8080/springboot/lock/test?name=bb
程序日志如下:
我已经拿到aa锁
睡眠1秒继续获取
睡眠1秒继续获取
睡眠1秒继续获取
睡眠1秒继续获取
睡眠1秒继续获取
睡眠1秒继续获取
睡眠1秒继续获取
耗时10466
2018-05-11 15:10:50.794 INFO [springboot,0707be837509ae15,0707be837509ae15,false] 20608 --- [ask-scheduler-6] o.s.i.codec.kryo.CompositeKryoRegistrar : registering [40, java.io.File] with serializer org.springframework.integration.codec.kryo.FileSerializer
2018-05-11 15:10:50.810 INFO [springboot,0707be837509ae15,0707be837509ae15,false] 20608 --- [ask-scheduler-6] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [172.16.1.29:5672]
我已经拿到bb锁
2018-05-11 15:10:51.044 INFO [springboot,0707be837509ae15,0707be837509ae15,false] 20608 --- [ask-scheduler-6] o.s.a.r.c.CachingConnectionFactory : Created new connection: SpringAMQP#5deefca0:0/SimpleConnection@281be6c4 [delegate=amqp://admin@172.16.1.29:5672/, localPort= 55309]
耗时18570
更多推荐
所有评论(0)