最近在公司一个项目需要用到限流,就想到concurrent包下的Semaphore(信号量)。不过实际场景中并不能用,一个原因是项目采用的微服务架构,分布式场景并不适用。第二个场景是限流采用令牌桶算法,每秒钟某个接口限流多少,超过丢弃。

 

后面实现采用的redis分布式限流方案,每分钟设置一个计数器,计数器一旦达到限流次数,后面的请求被直接挡回。

 

后面无意中看到有Redisson这个东东,发现能在分布式场景下支持java concurrent下很多工具类(Reentrant Lock、Semaphore、CountDownLatch等 )。

 

就去学习了一波,试用一下Semaphore功能,就记录下使用方法。

在springBoot中引入Redisson

                <dependency>
			<groupId>org.redisson</groupId>
			<artifactId>redisson</artifactId>
			<version>3.4.3</version>
		</dependency>

然后配置RedissonClient的相关信息

通过配置类的方式

package com.sendi.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ClassUtils;

import io.netty.channel.nio.NioEventLoopGroup;

@Configuration
public class RedissonConfig {
	private String address = "redis://localhost:6379";
    private int connectionMinimumIdleSize = 10;
    private int idleConnectionTimeout = 10000;
    private int pingTimeout = 1000;
    private int connectTimeout = 10000;
    private int timeout = 3000;
    private int retryAttempts = 3;
    private int retryInterval = 1500;
    private int reconnectionTimeout = 3000;
    private int failedAttempts = 3;
    private String password = null;
    private int subscriptionsPerConnection = 5;
    private String clientName = null;
    private int subscriptionConnectionMinimumIdleSize = 1;
    private int subscriptionConnectionPoolSize = 50;
    private int connectionPoolSize = 64;
    private int database = 0;
    private boolean dnsMonitoring = false;
    private int dnsMonitoringInterval = 5000;

    private int thread; //当前处理核数量 * 2

    private String codec = "org.redisson.codec.JsonJacksonCodec";

    @Bean(destroyMethod = "shutdown")
    RedissonClient redisson() throws Exception {
        Config config = new Config();
        config.useSingleServer().setAddress(address)
                .setConnectionMinimumIdleSize(connectionMinimumIdleSize)
                .setConnectionPoolSize(connectionPoolSize)
                .setDatabase(database)
                .setDnsMonitoring(dnsMonitoring)
                .setDnsMonitoringInterval(dnsMonitoringInterval)
                .setSubscriptionConnectionMinimumIdleSize(subscriptionConnectionMinimumIdleSize)
                .setSubscriptionConnectionPoolSize(subscriptionConnectionPoolSize)
                .setSubscriptionsPerConnection(subscriptionsPerConnection)
                .setClientName(clientName)
                .setFailedAttempts(failedAttempts)
                .setRetryAttempts(retryAttempts)
                .setRetryInterval(retryInterval)
                .setReconnectionTimeout(reconnectionTimeout)
                .setTimeout(timeout)
                .setConnectTimeout(connectTimeout)
                .setIdleConnectionTimeout(idleConnectionTimeout)
                .setPingTimeout(pingTimeout)
                .setPassword(password);
        Codec codec = (Codec) ClassUtils.forName("org.redisson.codec.JsonJacksonCodec", ClassUtils.getDefaultClassLoader()).newInstance();
        config.setCodec(codec);
        config.setThreads(thread);
        config.setEventLoopGroup(new NioEventLoopGroup());
        config.setUseLinuxNativeEpoll(false);
        return Redisson.create(config);
    }
}

配置完后就可以在java类中使用了

//注入
@Autowired
private RedissonClient redissonClient;




//使用
RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
			semaphore.trySetPermits(1);//设置许可
			semaphore.acquire();//获得一个许可
			semaphore.release();//释放一个许可

使用方法可以在githup上面的api里面查看说明  https://github.com/redisson/redisson

和java有一点区别,不过基本使用方法是一样的,猜测实现原理还是根据redis键值对应计数方法实现,就去redis中看了一下

果然在redis中看到了"semaphore"这个键

消耗完许可后

查看源码发现其实现方式还是网上的分布式锁类似

 public void acquire(int permits) throws InterruptedException {
        if (tryAcquire(permits)) {
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe();
        commandExecutor.syncSubscription(future);
        try {
            while (true) {
                if (tryAcquire(permits)) {
                    return;
                }

                getEntry().getLatch().acquire(permits);
            }
        } finally {
            unsubscribe(future);
        }
//        get(acquireAsync(permits));
    }
 @Override
    public RFuture<Boolean> tryAcquireAsync(int permits) {
        if (permits < 0) {
            throw new IllegalArgumentException("Permits amount can't be negative");
        }

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                  "local value = redis.call('get', KEYS[1]); " +
                  "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
                      "local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
                      "return 1; " +
                  "end; " +
                  "return 0;",
                  Collections.<Object>singletonList(getName()), permits);
    }

第一段代码是许可获取的源码,可以看出是通过while循环实现,直到拿到许可这个方法才返回,继续往下面代码执行

第二段代码是通过lua脚本执行原子操作(信号量许可获取和释放)

 

总结:发现Redisson底层实现方案和之前在githup上面看到的一些分布式锁,限流之类的基本大同小异。

 

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐