微服务项目中,很多资源需要互斥使用,比如一些分布式任务,比如下单的处理,退货的处理等等。这些都需要用到借助分布式锁来保证处理的唯一性。 一开始我们也手工实现了分布式锁,但是随着业务的发展,我们对锁的特性也要求越来越完善,最后选用了Redis官方推荐的Redisson。

一、Spring Boot中使用Redisson

Spring Boot使用Redisson特别简单,只要引入一个依赖就可以,redis的配置跟其他的redis客户端可以兼容,可以不用再额外配置

二、引入依赖

 

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.13.2</version>
</dependency>

三、属性文件Redis配置

 

# Redis服务器地址
spring.redis.host=127.0.0.1
# Redis服务器连接端口
spring.redis.port=6379

四、快速入门

4.1 改造RedisDistributedLockApplication启动类

使用锁RedissonClient,并实现业务逻辑在ApplicationRunner#run()方法。

 

package com.erbadagang.springboot.redisdistributedlock;

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.annotation.Resource;

@Slf4j
@SpringBootApplication
public class RedisDistributedLockApplication implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(RedisDistributedLockApplication.class, args);
    }

    /**
     * 直接注入RedissonClient就可以直接使用.
     */
    @Resource
    private RedissonClient redissonClient;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("spring boot run");

        //创建锁
        RLock helloLock = redissonClient.getLock("hello");

        //加锁
        helloLock.lock();
        try {
            log.info("locked");
            Thread.sleep(1000 * 10);
        } finally {
            //释放锁
            helloLock.unlock();
        }
        log.info("finished");
    }
}

4.2 测试

启动Redis和RedisDistributedLockApplication,控制台输出:

 

2020-08-02 22:51:17.169  INFO 8876 --- [main] c.e.s.r.RedisDistributedLockApplication  : spring boot run
2020-08-02 22:51:36.486  INFO 8876 --- [main] c.e.s.r.RedisDistributedLockApplication  : locked
2020-08-02 22:51:46.493  INFO 8876 --- [main] c.e.s.r.RedisDistributedLockApplication  : finished

4.3 Rlock 常用的方法

 

void lock();
void lock(long leaseTime, TimeUnit unit);
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
  • 第一个方法void lock():第一表示lock表示去加锁,加锁成功,没有返回值,继续执行下面代码;但是如果redis已经有这个锁了,它会一直阻塞,直到锁的时间失效(默认30秒),再继续往下执行。这个方法是要保证一定要抢到锁的,它的默认过期时间也是30秒,和tryLock()不同的是,它如果没抢占到锁,会一直自旋。
  • 第二个方法void lock(long leaseTime, TimeUnit unit):和第一无参数lock逻辑一样,只是可以直接设置锁失效时间。用法:helloLock.lock(5, TimeUnit.SECONDS);
  • 第三个方法两个参数的boolean tryLock(long time, TimeUnit unit)表示尝试去加锁(第一个参数表示the maximum time to wait for the lock),加锁成功,返回true,继续执行true下面代码;但是如果redis已经有这个锁了他会等待,还拿不到锁它会返回false,执行false的代码块。为了实现waitTime,使用了redis的订阅发布功能。也就是没有抢到锁的线程订阅消息,直至waitTime过期返回false或者被通知新一轮的开始抢占锁。当然,它如果抢占到锁,锁的过期时间也是30秒,同样也会存在一个定时任务续过期时间,保证业务执行时间不会超过过期时间,抢占失败即返回false。

 

        String key ="product:001";
        RLock lock = redisson.getLock(key);
        try {
            boolean res = lock.tryLock(10,TimeUnit.SECONDS);
            if ( res){
                System.out.println("这里是你的业务代码");
            }else{
                System.out.println("系统繁忙");
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }

如果把lock.unlock();注释,第一次执行正常加锁,可以跑到业务逻辑代码,快速第二次执行发现他等待10秒,如果拿不到锁就走else的系统繁忙逻辑。

  • 三个参数的tryLock(long waitTime, long leaseTime, TimeUnit unit)表示尝试去加锁(第一个参数表示等待时间,第二个参数表示key的失效时间),加锁成功,返回true,继续执行true下面代码;如果返回false,它会等待第一个参数设置的时间,然后去执行false下面的代码。个方法的参数leaseTime如果不是-1的话,是不会有定时任务续过期时间的,也就存在业务处理时间可能超过过期时间的风险。其他的和tryLock(long waitTime, TimeUnit unit)一致。

 

 boolean res = lock.tryLock(5,3, TimeUnit.SECONDS);

这种情况,锁3秒失效,我们配置的是等待5秒,在单机刷的情况下,肯定每次都能拿到锁。

4.4 异步执行分布式锁

 

        /**
         * 异步锁
         */
        lock = redissonClient.getLock("erbadagang-lock");
        Future<Boolean> res = null;
        try {
            // lock.lockAsync();
            // lock.lockAsync(100, TimeUnit.SECONDS);
            res = lock.tryLockAsync(3, 100, TimeUnit.SECONDS);
            if (res.get()) {
                System.out.println("这里是你的Async业务代码");
            } else {
                System.out.println("系统繁忙Async");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (res.get()) {
                lock.unlock();
            }
        }
        log.info("finished");
    }

4.5 公平锁(Fair Lock)

Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。在提供了自动过期解锁功能的同时,保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。

 

    /**
     * 公平锁测试。
     */
    @Test
    public void testFairLock() {
        RLock fairLock = redissonClient.getFairLock("anyLock");
        try {
            // 最常见的使用方法
            fairLock.lock();
            // 支持过期解锁功能, 10秒钟以后自动解锁,无需调用unlock方法手动解锁
            fairLock.lock(10, TimeUnit.SECONDS);
            // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
            boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
            if (res) {
                System.out.println("这里是你的业务代码");
            } else {
                System.out.println("系统繁忙");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            fairLock.unlock();
        }
    }

Redisson同时还为分布式可重入公平锁提供了异步执行的相关方法:

 

RLock fairLock = redisson.getFairLock("anyLock");
fairLock.lockAsync();
fairLock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);

五、扩展实现

Redis单节点配置:

 

server:
  port: 8080

spring:
  application:
    name: redis-distributed-lock

  ################ Redis  ##############
  redis:
    host: 127.0.0.1
    port: 6379
    #password:
    timeout: 3000
    lettuce:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0
    redisson:
      config:
        # 单节点配置
        singleServerConfig:
          # 连接空闲超时,单位:毫秒
          idleConnectionTimeout: 10000
          pingTimeout: 1000
          # 连接超时,单位:毫秒
          connectTimeout: 10000
          # 命令等待超时,单位:毫秒
          timeout: 3000
          # 命令失败重试次数,如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。
          # 如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。
          retryAttempts: 3
          # 命令重试发送时间间隔,单位:毫秒
          retryInterval: 1500
          # 重新连接时间间隔,单位:毫秒
          reconnectionTimeout: 3000
          # 执行失败最大次数
          failedAttempts: 3
          # 密码
          password: null
          # 单个连接最大订阅数量
          subscriptionsPerConnection: 5
          # 客户端名称
          clientName: null
          # 节点地址
          address: redis://127.0.0.1:6379
          # 发布和订阅连接的最小空闲连接数
          subscriptionConnectionMinimumIdleSize: 1
          # 发布和订阅连接池大小
          subscriptionConnectionPoolSize: 50
          # 最小空闲连接数
          connectionMinimumIdleSize: 32
          # 连接池大小
          connectionPoolSize: 64
          # 数据库编号
          database: 0
          # DNS监测时间间隔,单位:毫秒
          dnsMonitoringInterval: 5000
        # 线程池数量,默认值: 当前处理核数量 * 2
        threads: 0
        # Netty线程池数量,默认值: 当前处理核数量 * 2
        nettyThreads: 0
        # 编码
        #codec: !<org.redisson.codec.JsonJacksonCodec> {}
        # 传输模式
        transportMode: "NIO"

Redis集群配置:

 

spring:
  redis:
    redisson:
      config:
        clusterServersConfig:
          idleConnectionTimeout: 10000
          connectTimeout: 10000
          timeout: 3000
          retryAttempts: 3
          retryInterval: 1500
          failedSlaveReconnectionInterval: 3000
          failedSlaveCheckInterval: 60000
          password: null
          subscriptionsPerConnection: 5
          clientName: null
          loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
          subscriptionConnectionMinimumIdleSize: 1
          subscriptionConnectionPoolSize: 50
          slaveConnectionMinimumIdleSize: 24
          slaveConnectionPoolSize: 64
          masterConnectionMinimumIdleSize: 24
          masterConnectionPoolSize: 64
          readMode: "SLAVE"
          subscriptionMode: "SLAVE"
          nodeAddresses:
          - "redis://192.168.35.142:7002"
          - "redis://192.168.35.142:7001"
          - "redis://192.168.35.142:7000"
          scanInterval: 1000
          pingConnectionInterval: 0
          keepAlive: false
          tcpNoDelay: false
        threads: 16
        nettyThreads: 32
        #codec: !<org.redisson.codec.FstCodec> {}
        transportMode: "NIO"

多线程测试:

 

package com.erbadagang.springboot.redisdistributedlock;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.concurrent.CountDownLatch;

@SpringBootTest
@Slf4j
class RedisDistributedLockApplicationTests {


    /**
     * 有锁测试共享变量
     */
    private Integer lockCount = 10;

    /**
     * 无锁测试共享变量
     */
    private Integer count = 10;

    /**
     * 模拟线程数
     */
    private static int threadNum = 10;

    /**
     * 直接注入RedissonClient就可以直接使用.
     */
    @Resource
    private RedissonClient redissonClient;

    /**
     * 模拟并发测试加锁和不加锁2个方法。
     *
     * @return
     */
    @Test
    public void lock() {
        // 计数器
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        for (int i = 0; i < threadNum; i++) {
            MyRunnable myRunnable = new MyRunnable(countDownLatch);
            Thread myThread = new Thread(myRunnable);
            myThread.start();
        }
        // 释放所有线程
        countDownLatch.countDown();
    }

    /**
     * 加锁测试
     */
    private void testLockCount() {
        String lockKey = "lock-test";
        //创建锁
        RLock helloLock = redissonClient.getLock(lockKey);

        try {
            //加锁
            helloLock.lock();
            lockCount--;
            log.info("lockCount值:" + lockCount);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            // 释放锁
            helloLock.unlock();
        }
    }

    /**
     * 无锁测试
     */
    private void testCount() {
        count--;
        log.info("count值:" + count);
    }


    public class MyRunnable implements Runnable {
        /**
         * 计数器
         */
        final CountDownLatch countDownLatch;

        public MyRunnable(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                // 阻塞当前线程,直到计时器的值为0
                countDownLatch.await();
            } catch (InterruptedException e) {
                log.error(e.getMessage(), e);
            }
            // 无锁操作
            testCount();
            // 加锁操作
            testLockCount();
        }

    }

}

控制台输出:

 

2020-08-02 23:55:39.832  INFO 4144 --- [     Thread-281] s.r.RedisDistributedLockApplicationTests : count值:3
2020-08-02 23:55:39.832  INFO 4144 --- [     Thread-283] s.r.RedisDistributedLockApplicationTests : count值:2
2020-08-02 23:55:39.832  INFO 4144 --- [     Thread-279] s.r.RedisDistributedLockApplicationTests : count值:5
2020-08-02 23:55:39.832  INFO 4144 --- [     Thread-282] s.r.RedisDistributedLockApplicationTests : count值:2
2020-08-02 23:55:39.832  INFO 4144 --- [     Thread-284] s.r.RedisDistributedLockApplicationTests : count值:2
2020-08-02 23:55:39.832  INFO 4144 --- [     Thread-278] s.r.RedisDistributedLockApplicationTests : count值:5
2020-08-02 23:55:39.832  INFO 4144 --- [     Thread-280] s.r.RedisDistributedLockApplicationTests : count值:4
2020-08-02 23:55:39.832  INFO 4144 --- [     Thread-275] s.r.RedisDistributedLockApplicationTests : count值:2
2020-08-02 23:55:39.832  INFO 4144 --- [     Thread-276] s.r.RedisDistributedLockApplicationTests : count值:2
2020-08-02 23:55:39.832  INFO 4144 --- [     Thread-277] s.r.RedisDistributedLockApplicationTests : count值:2
2020-08-02 23:55:39.848  INFO 4144 --- [     Thread-280] s.r.RedisDistributedLockApplicationTests : lockCount值:9
2020-08-02 23:55:39.848  INFO 4144 --- [     Thread-275] s.r.RedisDistributedLockApplicationTests : lockCount值:8

2020-08-02 23:55:39.883  INFO 4144 --- [     Thread-281] s.r.RedisDistributedLockApplicationTests : lockCount值:7
2020-08-02 23:55:39.885  INFO 4144 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
2020-08-02 23:55:39.885  INFO 4144 --- [     Thread-279] s.r.RedisDistributedLockApplicationTests : lockCount值:6
2020-08-02 23:55:39.885  INFO 4144 --- [     Thread-277] s.r.RedisDistributedLockApplicationTests : lockCount值:5
2020-08-02 23:55:39.885  INFO 4144 --- [     Thread-284] s.r.RedisDistributedLockApplicationTests : lockCount值:4
2020-08-02 23:55:39.903  INFO 4144 --- [     Thread-282] s.r.RedisDistributedLockApplicationTests : lockCount值:3

根据打印结果可以明显看到,未加锁的count--后值是乱序的,而加锁后的结果和我们预期的一样。由于条件问题没办法测试分布式的并发。只能模拟单服务的这种并发,但是原理是一样

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐