Redisson之Semaphore使用记录
最近在公司一个项目需要用到限流,就想到concurrent包下的Semaphore(信号量)。不过实际场景中并不能用,一个原因是项目采用的微服务架构,分布式场景并不适用。第二个场景是限流采用令牌桶算法,每秒钟某个接口限流多少,超过丢弃。 后面实现采用的redis分布式限流方案,每分钟设置一个计数器,计数器一旦达到限流次数,后面的请求被直接挡回。 后面无意中看到有Redisson这个...
最近在公司一个项目需要用到限流,就想到concurrent包下的Semaphore(信号量)。不过实际场景中并不能用,一个原因是项目采用的微服务架构,分布式场景并不适用。第二个场景是限流采用令牌桶算法,每秒钟某个接口限流多少,超过丢弃。
后面实现采用的redis分布式限流方案,每分钟设置一个计数器,计数器一旦达到限流次数,后面的请求被直接挡回。
后面无意中看到有Redisson这个东东,发现能在分布式场景下支持java concurrent下很多工具类(Reentrant Lock、Semaphore、CountDownLatch等 )。
就去学习了一波,试用一下Semaphore功能,就记录下使用方法。
在springBoot中引入Redisson
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.4.3</version>
</dependency>
然后配置RedissonClient的相关信息
通过配置类的方式
package com.sendi.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ClassUtils;
import io.netty.channel.nio.NioEventLoopGroup;
@Configuration
public class RedissonConfig {
private String address = "redis://localhost:6379";
private int connectionMinimumIdleSize = 10;
private int idleConnectionTimeout = 10000;
private int pingTimeout = 1000;
private int connectTimeout = 10000;
private int timeout = 3000;
private int retryAttempts = 3;
private int retryInterval = 1500;
private int reconnectionTimeout = 3000;
private int failedAttempts = 3;
private String password = null;
private int subscriptionsPerConnection = 5;
private String clientName = null;
private int subscriptionConnectionMinimumIdleSize = 1;
private int subscriptionConnectionPoolSize = 50;
private int connectionPoolSize = 64;
private int database = 0;
private boolean dnsMonitoring = false;
private int dnsMonitoringInterval = 5000;
private int thread; //当前处理核数量 * 2
private String codec = "org.redisson.codec.JsonJacksonCodec";
@Bean(destroyMethod = "shutdown")
RedissonClient redisson() throws Exception {
Config config = new Config();
config.useSingleServer().setAddress(address)
.setConnectionMinimumIdleSize(connectionMinimumIdleSize)
.setConnectionPoolSize(connectionPoolSize)
.setDatabase(database)
.setDnsMonitoring(dnsMonitoring)
.setDnsMonitoringInterval(dnsMonitoringInterval)
.setSubscriptionConnectionMinimumIdleSize(subscriptionConnectionMinimumIdleSize)
.setSubscriptionConnectionPoolSize(subscriptionConnectionPoolSize)
.setSubscriptionsPerConnection(subscriptionsPerConnection)
.setClientName(clientName)
.setFailedAttempts(failedAttempts)
.setRetryAttempts(retryAttempts)
.setRetryInterval(retryInterval)
.setReconnectionTimeout(reconnectionTimeout)
.setTimeout(timeout)
.setConnectTimeout(connectTimeout)
.setIdleConnectionTimeout(idleConnectionTimeout)
.setPingTimeout(pingTimeout)
.setPassword(password);
Codec codec = (Codec) ClassUtils.forName("org.redisson.codec.JsonJacksonCodec", ClassUtils.getDefaultClassLoader()).newInstance();
config.setCodec(codec);
config.setThreads(thread);
config.setEventLoopGroup(new NioEventLoopGroup());
config.setUseLinuxNativeEpoll(false);
return Redisson.create(config);
}
}
配置完后就可以在java类中使用了
//注入
@Autowired
private RedissonClient redissonClient;
//使用
RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
semaphore.trySetPermits(1);//设置许可
semaphore.acquire();//获得一个许可
semaphore.release();//释放一个许可
使用方法可以在githup上面的api里面查看说明 https://github.com/redisson/redisson
和java有一点区别,不过基本使用方法是一样的,猜测实现原理还是根据redis键值对应计数方法实现,就去redis中看了一下
果然在redis中看到了"semaphore"这个键
消耗完许可后
查看源码发现其实现方式还是网上的分布式锁类似
public void acquire(int permits) throws InterruptedException {
if (tryAcquire(permits)) {
return;
}
RFuture<RedissonLockEntry> future = subscribe();
commandExecutor.syncSubscription(future);
try {
while (true) {
if (tryAcquire(permits)) {
return;
}
getEntry().getLatch().acquire(permits);
}
} finally {
unsubscribe(future);
}
// get(acquireAsync(permits));
}
@Override
public RFuture<Boolean> tryAcquireAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('get', KEYS[1]); " +
"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
"local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), permits);
}
第一段代码是许可获取的源码,可以看出是通过while循环实现,直到拿到许可这个方法才返回,继续往下面代码执行
第二段代码是通过lua脚本执行原子操作(信号量许可获取和释放)
总结:发现Redisson底层实现方案和之前在githup上面看到的一些分布式锁,限流之类的基本大同小异。
更多推荐
所有评论(0)