需求:在集群环境下,读写同一个数据库表,我们为了保证数据的最终一致性,需要让任务排队执行。分布式锁的实现方式,网上有很多种方式。
1.使用数据库表实现;
2.使用zookeeper实现;
3.使用redis实现;
这里讲用redis实现的方法,其他两种实现方式,读者可以自行百度。
redis是个很好的NoSQL数据库,多用于缓存数据的场景,但同时也可以用来制作一个分布式事务锁,其实现的原理基于几个命令:
SETNX key val;
解释:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。
expire key timeout;
为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁。
delete key;
删除key。
那么如何实现这个分布式事务锁呢?
1.添加项目中的redis的jar包依赖;

        <dependency>
            <groupId>commons-pool</groupId>
            <artifactId>commons-pool</artifactId>
            <version>1.6</version>
        </dependency>

        <dependency>  
            <groupId>org.springframework.data</groupId>  
            <artifactId>spring-data-redis</artifactId>  
            <version>${spring-data-redis.version}</version>  
        </dependency>

        <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>${redis.version}</version>
        </dependency>

2.在项目的Spring配置文件中配置redis

<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig"> 
    <!-- 最大空闲数 -->
    <property name="maxIdle" value="300" /> 
    <!-- 最大连接数 -->
    <property name="maxTotal" value="600" /> 
    <!-- 最大建立连接等待时间,单位毫秒--> 
    <property name="maxWaitMillis" value="200000" /> 
    <!-- 指明是否在从池中取出连接前进行检验,如果检验失败,则从池中去除连接并尝试取出另一个 -->
    <property name="testOnBorrow" value="true" />

  </bean> 

 <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"> 
        <property name="hostName" value="119.23.134.93" /> 
        <property name="port" value="6379"/> 
        <property name="password" value="coeexp123456"/>
        <property name="poolConfig" ref="poolConfig" /> 
        <property name="usePool" value="true"/> 
    </bean>

  <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"> 
    <property name="connectionFactory"   ref="jedisConnectionFactory" /> 
  </bean>

3.在实际开发中都是使用Spring的注解,面向切面编程,
这里需要自定义两个注解:

注解1:用于方法上
/**
 * 类说明
 * @author  ll
 * @version 创建时间:2017年10月27日上午10:10:47
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
//@Component
public @interface P4jSyn {
     /** 
     * 锁的key<br/> 
     * 如果想增加坑的个数添加非固定锁,可以在参数上添加@P4jSynKey注解,但是本参数是必写选项<br/> 
     * redis key的拼写规则为 "RedisSyn+" + synKey + @P4jSynKey<br/> 
     *  
     */  
    String synKey();  

    /** 
     * 持锁时间,超时时间,持锁超过此时间自动丢弃锁<br/> 
     * 单位毫秒,默认20秒<br/> 
     * 如果为0表示永远不释放锁,在设置为0的情况下toWait为true是没有意义的<br/> 
     * 但是没有比较强的业务要求下,不建议设置为0 
     */  
    long keepMills() default 20 * 1000;  

    /** 
     * 当获取锁失败,是继续等待还是放弃<br/> 
     * 默认为继续等待 
     */  
    boolean toWait() default true;  

    /** 
     * 没有获取到锁的情况下且toWait()为继续等待,睡眠指定毫秒数继续获取锁,也就是轮训获取锁的时间<br/> 
     * 默认为10毫秒 
     *  
     * @return 
     */  
    long sleepMills() default 10;  

    /** 
     * 锁获取超时时间:<br/> 
     * 没有获取到锁的情况下且toWait()为true继续等待,最大等待时间,如果超时抛出 
     * {@link java.util.concurrent.TimeoutException.TimeoutException} 
     * ,可捕获此异常做相应业务处理;<br/> 
     * 单位毫秒,默认一分钟,如果设置为0即为没有超时时间,一直获取下去; 
     *  
     * @return 
     */  
    long maxSleepMills() default 60 * 1000;  
}

注解2:用于参数上

/**
 * 类说明
 * @author  ll
 * @version 创建时间:2017年10月27日上午10:12:25
 */
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface P4jSynKey {
    /** 
     * key的拼接顺序 
     *  
     * @return 
     */  
    int index() default 0; 
}

4.分布式锁的切面类

/**
* 类说明
* @author   ll
* @version 创建时间:2017年10月27日上午10:14:06
*/
@Order(1)
@Aspect  
@Component("redisLockAspect")
public class RedisLockAspect {
private static final Logger logger = LoggerFactory.getLogger(RedisLockAspect.class);
    @Autowired  
   @Qualifier("redisTemplate")  
   private RedisTemplate<String, Long> redisTemplate;  

   @Around("execution(* com.coe..*SycLock(..))")  
   public Object lock(ProceedingJoinPoint  pjp) throws Throwable {  
    //获取P4jSyn注解
       P4jSyn lockInfo = getLockInfo(pjp);  
       if (lockInfo == null) {  
           throw new IllegalArgumentException("配置参数错误");  
       }  
       String synKey = getSynKey(pjp, lockInfo.synKey());  
       if (synKey == null || "".equals(synKey)) {  
           throw new IllegalArgumentException("配置参数synKey错误");  
       }  
       boolean lock = false;  //标志物,true表示获取了到了该锁
       Object obj = null;  
       try {  
           //超时时间 (60秒),系统当前时间再往后加60秒
           long maxSleepMills = System.currentTimeMillis() + lockInfo.maxSleepMills();  
           while (!lock) {
            //持锁时间,系统当前时间再往后加20秒
               long keepMills = System.currentTimeMillis() + lockInfo.keepMills();  
               //为key“synKey”设置值keepMills,如果设置成功,则返回true
               lock = setIfAbsent(synKey, keepMills);  
               //lock为true表示得到了锁,没有人加过相同的锁  
               if(lock){  
                //如果获得了该锁,则调用目标方法,执行业务逻辑任务
                   obj = pjp.proceed();  
               }  
               // 锁设置了没有超时时间  
               /**如果没有通过setIfAbsent拿到数据,然后判断是否对锁设置了超时机制
               ,没有设置则判断是否需要继续等待*/
               else if(lockInfo.keepMills() <= 0){  
                   // 继续等待获取锁  
                   if (lockInfo.toWait()) {  
                       // 如果超过最大等待时间抛出异常  
                       if(lockInfo.maxSleepMills()>0&&System.currentTimeMillis()> maxSleepMills){  
                           throw new TimeoutException("获取锁资源等待超时");  
                       }
                       //只要当前时间没有大于超时时间,则继续等待10毫秒,以便继续尝试去获取锁
                       TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills());  
                   }else{ 
                    //如果注解上的“toWait()”为false,表示如果当前没有获取到锁,则放弃获取该锁,
                    //即放弃执行此任务
                       break;  
                   }  
               }  
               // 已过期,并且getAndSet后旧的时间戳依然是过期的,可以认为获取到了锁  
              /**
               * 1.如果当前线程2进入的时候,
               * 系统时间已经大于了上个任务的持锁时间(由于上次任务大导致其执行时间过长),
               * 则表示需要强制让上个任务释放锁,让本任务获得锁,以执行本次任务;
               * 2.如果线程1释放了锁,刚好线程2过了 if(lock){ //to do something}的判断,
               * 而进入了此处判断,需要对线程2任务加锁,保证事务不冲突
               */
               else if(System.currentTimeMillis()>getLock(synKey)&&(System.currentTimeMillis()> getSet(synKey, keepMills))) {  
                   lock = true;             //lock一定要设置成true,不然释放不了锁
                   obj = pjp.proceed();  
               }  
               // 没有得到任何锁  
               else {  
                   // 继续等待获取锁  
                   if (lockInfo.toWait()) {  
                       // 如果超过最大等待时间抛出异常  
                       if (lockInfo.maxSleepMills()>0&&System.currentTimeMillis() maxSleepMills) {  
                           throw new TimeoutException("获取锁资源等待超时");  
                       }  
                       //只要当前时间没有大于超时时间,则继续等待10毫秒,以便继续尝试去获取锁
                       TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills());  
                   }else {  
                    // 放弃等待,放弃获取锁(放弃本任务的执行) 
                       break;  
                   }  
               }  
           }  
       } catch (Exception e) {  
           e.printStackTrace(); 
           throw e;  
       } finally {  
           // 如果获取到了锁,释放锁  
           if (lock) {  
               releaseLock(synKey);  
           }  
       }  
       return obj;  
   }  

   /** 
    * 获取包括方法参数上的key<br/> 
    * redis key的拼写规则为 "RedisSyn+" + synKey + @P4jSynKey 
    *  
    */  
   private String getSynKey(JoinPoint pjp, String synKey) {  
       try {  
           synKey = "RedisSyn+" + synKey;  //指定synKey的值固定为RedisSyn+synKey
           Object[] args = pjp.getArgs();  //获取切点上的所有参数
           if (args != null && args.length > 0) {  
               MethodSignature methodSignature = (MethodSignature) pjp.getSignature();  
               Annotation[][] paramAnnotationArrays = methodSignature.getMethod().getParameterAnnotations();  
               SortedMap<Integer, String> keys = new TreeMap<Integer, String>();  
               for (int ix = 0; ix < paramAnnotationArrays.length; ix++) {  
                   P4jSynKey p4jSynKey = getAnnotation(P4jSynKey.class, paramAnnotationArrays[ix]);  
                   if (p4jSynKey != null) {  
                       Object arg = args[ix];  
                       if (arg != null) {  
                           keys.put(p4jSynKey.index(), arg.toString());  
                       }  
                   }  
               }  
               if (keys != null && keys.size() > 0) {  
                   for (String key : keys.values()) {  
                       synKey = synKey + key;  
                   }  
               }  
           }  
           return synKey;  
       } catch (Exception e) {  
           e.printStackTrace();  
       }  
       return null;  
   }  

   @SuppressWarnings("unchecked")  
   private static <T extends Annotation> T getAnnotation(final Class<T> annotationClass, final Annotation[] annotations) {  
       if (annotations != null && annotations.length > 0) {  
           for (final Annotation annotation : annotations) {  
               if (annotationClass.equals(annotation.annotationType())) {  
                   return (T) annotation;  
               }  
           }  
       }  
       return null;  
   }  

   /** 
    * 获取RedisLock注解信息 
    */  
   private P4jSyn getLockInfo(ProceedingJoinPoint  pjp) {  
       try {  
           MethodSignature methodSignature = (MethodSignature) pjp.getSignature();  
           Method method = methodSignature.getMethod();  
           P4jSyn lockInfo = method.getAnnotation(P4jSyn.class);  
           return lockInfo;  
       } catch (Exception e) {  
           e.printStackTrace(); 
       }  
       return null;  
   }  

   public BoundValueOperations<String, Long> getOperations(String key) {  
       return redisTemplate.boundValueOps(key);  
   }  

   /** 
    * Set {@code value} for {@code key}, only if {@code key} does not exist. 
    * <p> 
    * See http://redis.io/commands/setnx 
    *  
    * @param key 
    *            must not be {@literal null}. 
    * @param value 
    *            must not be {@literal null}. 
    * @return 
    */  
   /**
    * 如果key不存在,则为key设置值value,并且返回true,否则返回false
    * @param key
    * @param value
    * @return
    */
   public boolean setIfAbsent(String key, Long value) {  
       return getOperations(key).setIfAbsent(value);   
   }  

   /**
    * 获取key上的值
    * @param key
    * @return
    */
   public long getLock(String key) {  
       Long time = getOperations(key).get();  
       if (time == null) {  
           return 0;  
       }  
       return time;  
   }  

   /**
    * 获取key上的旧值,并且为该key设置新值value,如果旧值不存在则返回0
    * @param key
    * @param value
    * @return
    */
   public long getSet(String key, Long value) {  
       Long time = getOperations(key).getAndSet(value);  
       if (time == null) {  
           return 0;  
       }  
       return time;  
   }  

   /**
    * 删除key
    * @param key
    */
   public void releaseLock(String key) { 
       redisTemplate.delete(key);  
   } 
}

锁写好之后,编写测试代码:
定义一个成员变量i,启动100个线程同时访问这个方法,让i++;

public class LockInfo {
private int i = 0;

    @P4jSyn(synKey="getTrackno")
    public void addSycLock(@P4jSynKey(index=1)String flag,@P4jSynKey(index=2) String channelCode){
        i++;
        System.out.println("i =====================" + i);
    }
}

开启100个线程同时执行这段代码

@Component("lockTest")
public class LockTest {
    @Autowired
    private LockInfo lockInfo ;

    @Scheduled(fixedRate=3600000,initialDelay = 10000)
    public void run(){
        for (int i = 0; i < 100; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        Thread.currentThread().sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    lockInfo .addSycLock("11111", "222222");
                }
            }).start();
        }
    }
}

测试结果:虽然同时开启100个线程来调用这个方法,但是i的值始终是依次递增,大家可以试试,去掉注解之后再同时开启100个线程来调用这个方法,看是不是得到不同的结果
这里写图片描述

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐