Spring Cloud Gateway 自定义限流
在使用Spring Cloud Gateway限流功能时官网提供的限流中的流速以及桶容量是针对所有策略的,意思是只要配置上那么所有的都是一样的,不能根据不同的类型配置不同的参数,例如:A渠道、B渠道,若配置上replenishRate(流速)和burstCapacity(令牌桶容量),那么不管是A渠道还是B渠道都是这个值,如果修改那么对应的其他渠道也会修改,如何能做到分为不同渠道进行限流呢,A渠道
在使用Spring Cloud Gateway限流功能时官网提供的限流中的流速以及桶容量是针对所有策略的,意思是只要配置上那么所有的都是一样的,不能根据不同的类型配置不同的参数,例如:A渠道、B渠道,若配置上replenishRate(流速)和burstCapacity(令牌桶容量),那么不管是A渠道还是B渠道都是这个值,如果修改那么对应的其他渠道也会修改,如何能做到分为不同渠道进行限流呢,A渠道replenishRate:10,burstCapacity:100,B渠道:replenishRate:20,burstCapacity:1000,下面开始分析:
限流方式采用的redis,底层使用的redis的lua脚本实现的,具体可以自行查阅,不做讲解,默认限流类:RedisRateLimiter,本文也是仿照这个进行重写的。
本文是参考“Spring Cloud Gateway 结合配置中心限流”进行简化改造,通过配置的方式进行实现。
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
自定义限流类
参照RedisRateLimiter进行自定义限流类SystemRedisRateLimiter用于渠道限流方式,实现代码如下:
/**
* @author : Erick
* @version : 1.0
* @Description :
* @time :2018-12-1
*/
public class SystemRedisRateLimiter extends AbstractRateLimiter<SystemRedisRateLimiter.Config> implements ApplicationContextAware {
//这些变量全部从RedisRateLimiter复制的,都会用到。
public static final String REPLENISH_RATE_KEY = "replenishRate";
public static final String BURST_CAPACITY_KEY = "burstCapacity";
public static final String CONFIGURATION_PROPERTY_NAME = "sys-redis-rate-limiter";
public static final String REDIS_SCRIPT_NAME = "redisRequestRateLimiterScript";
public static final String REMAINING_HEADER = "X-RateLimit-Remaining";
public static final String REPLENISH_RATE_HEADER = "X-RateLimit-Replenish-Rate";
public static final String BURST_CAPACITY_HEADER = "X-RateLimit-Burst-Capacity";
//处理速度
private static final String DEFAULT_REPLENISHRATE="default.replenishRate";
//容量
private static final String DEFAULT_BURSTCAPACITY="default.burstCapacity";
private ReactiveRedisTemplate<String, String> redisTemplate;
private RedisScript<List<Long>> script;
private AtomicBoolean initialized = new AtomicBoolean(false);
private String remainingHeader = REMAINING_HEADER;
/** The name of the header that returns the replenish rate configuration. */
private String replenishRateHeader = REPLENISH_RATE_HEADER;
/** The name of the header that returns the burst capacity configuration. */
private String burstCapacityHeader = BURST_CAPACITY_HEADER;
private Config defaultConfig;
public SystemRedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate,
RedisScript<List<Long>> script, Validator validator) {
super(Config.class , CONFIGURATION_PROPERTY_NAME , validator);
this.redisTemplate = redisTemplate;
this.script = script;
initialized.compareAndSet(false,true);
}
public SystemRedisRateLimiter(int defaultReplenishRate, int defaultBurstCapacity){
super(Config.class , CONFIGURATION_PROPERTY_NAME , null);
defaultConfig = new Config()
.setReplenishRate(defaultReplenishRate)
.setBurstCapacity(defaultBurstCapacity);
}
//具体限流实现,此处调用的是lua脚本
@Override
public Mono<RateLimiter.Response> isAllowed(String routeId, String id) {
if (!this.initialized.get()) {
throw new IllegalStateException("RedisRateLimiter is not initialized");
}
if (ObjectUtils.isEmpty(rateLimiterConf) ){
throw new IllegalArgumentException("No Configuration found for route " + routeId);
}
//获取的是自定义的map
Map<String , Integer> rateLimitMap = rateLimiterConf.getRateLimitMap();
//缓存的key
String replenishRateKey = routeId + "." + id + "." + REPLENISH_RATE_KEY;
//若map中不存在则采用默认值,存在则取值。
int replenishRate = ObjectUtils.isEmpty(rateLimitMap.get(replenishRateKey)) ? rateLimitMap.get(DEFAULT_REPLENISHRATE) : rateLimitMap.get(replenishRateKey);
//容量key
String burstCapacityKey = routeId + "." + id + "." + BURST_CAPACITY_KEY;
//若map中不存在则采用默认值,存在则取值。
int burstCapacity = ObjectUtils.isEmpty(rateLimitMap.get(burstCapacityKey)) ? rateLimitMap.get(DEFAULT_BURSTCAPACITY) : rateLimitMap.get(burstCapacityKey);
try {
List<String> keys = getKeys(id);
List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
Instant.now().getEpochSecond() + "", "1");
Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
.reduce(new ArrayList<Long>(), (longs, l) -> {
longs.addAll(l);
return longs;
}) .map(results -> {
boolean allowed = results.get(0) == 1L;
Long tokensLeft = results.get(1);
RateLimiter.Response response = new RateLimiter.Response(allowed, getHeaders(replenishRate , burstCapacity , tokensLeft));
return response;
});
} catch (Exception e) {
e.printStackTrace();
}
return Mono.just(new RateLimiter.Response(true, getHeaders(replenishRate , burstCapacity , -1L)));
}
private RateLimiterConf rateLimiterConf;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.rateLimiterConf = applicationContext.getBean(RateLimiterConf.class);
}
public HashMap<String, String> getHeaders(Integer replenishRate, Integer burstCapacity , Long tokensLeft) {
HashMap<String, String> headers = new HashMap<>();
headers.put(this.remainingHeader, tokensLeft.toString());
headers.put(this.replenishRateHeader, String.valueOf(replenishRate));
headers.put(this.burstCapacityHeader, String.valueOf(burstCapacity));
return headers;
}
static List<String> getKeys(String id) {
// use `{}` around keys to use Redis Key hash tags
// this allows for using redis cluster
// Make a unique key per user.
//此处可以自定义redis前缀信息
String prefix = "request_sys_rate_limiter.{" + id;
// You need two Redis keys for Token Bucket.
String tokenKey = prefix + "}.tokens";
String timestampKey = prefix + "}.timestamp";
return Arrays.asList(tokenKey, timestampKey);
}
@Validated
public static class Config{
@Min(1)
private int replenishRate;
@Min(1)
private int burstCapacity = 1;
public int getReplenishRate() {
return replenishRate;
}
public Config setReplenishRate(int replenishRate) {
this.replenishRate = replenishRate;
return this;
}
public int getBurstCapacity() {
return burstCapacity;
}
public Config setBurstCapacity(int burstCapacity) {
this.burstCapacity = burstCapacity;
return this;
}
@Override
public String toString() {
return "Config{" +
"replenishRate=" + replenishRate +
", burstCapacity=" + burstCapacity +
'}';
}
}
}
在继承AbstractRateLimiter泛型使用的是自定义类中的config:SystemRedisRateLimiter.Config
配置类
配置类主要用于初始化map参数。
/**
* @author : Erick
* @version : 1.0
* @Description :
* @time :2018-12-1
*/
@Component
//使用配置文件的方式进行初始化
@ConfigurationProperties(prefix = "ratelimiter-conf")
public class RateLimiterConf {
//处理速度
private static final String DEFAULT_REPLENISHRATE="default.replenishRate";
//容量
private static final String DEFAULT_BURSTCAPACITY="default.burstCapacity";
private Map<String , Integer> rateLimitMap = new ConcurrentHashMap<String , Integer>(){
{
put(DEFAULT_REPLENISHRATE , 10);
put(DEFAULT_BURSTCAPACITY , 100);
}
};
public Map<String, Integer> getRateLimitMap() {
return rateLimitMap;
}
public void setRateLimitMap(Map<String, Integer> rateLimitMap) {
this.rateLimitMap = rateLimitMap;
}
}
配置文件主要采用配置文件的方式进行初始化,若配置则进行添加,若没有配置则采用默认值。
配置文件
主要用于配置各个渠道的限流阀值,例如文章开头举例的A渠道和B渠道,配置如下:
//与配置类RateLimiterConf保持一致
ratelimiter-conf:
#配置限流参数与RateLimiterConf类映射
rateLimitMap:
#格式为:routeid(gateway配置routes时指定的).系统名称.replenishRate(流速)/burstCapacity令牌桶大小
service.A.replenishRate: 10
service.A.burstCapacity: 100
service.B.replenishRate: 20
service.B.burstCapacity: 1000
到此配置限流相关的代码已经完成,需要在启动类和bootstrap.yml中进行配置才能够真正的使用。
启动类中声名
在启动类中声明使用的策略,指定自定义限流类。
@Bean
KeyResolver sysKeyResolver(){
//从请求地址中截取sys值,进行限流。
return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("sys"));
}
@Bean
@Primary
//使用自己定义的限流类
SystemRedisRateLimiter systemRedisRateLimiter(
ReactiveRedisTemplate<String, String> redisTemplate,
@Qualifier(SystemRedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> script,
Validator validator){
return new SystemRedisRateLimiter(redisTemplate , script , validator);
}
采用的是从请求地址中获取sys参数对应的值,当然也可以设置为其他值,再声名自定义的限流类。
配置限流
spring:
cloud:
gateway:
routes:
- id: service //id名称需要与配置限流的保持一致
uri: lb://serviceA
predicates:
- Path=/service/**/ //最后的/可以去掉,在本文中特意添加的如果去掉会把filters当成注释内容。
filters:
- name: RequestRateLimiter
args:
//需要与上边的方法名保持一致
rate-limiter: "#{@systemRedisRateLimiter}"
//需要与策略类的方法名保持一致。
key-resolver: "#{@sysKeyResolver}"
至此自定义限流代码全部完成,如有什么不妥支持请指正,同时也希望对其他人有帮助。实例代码已上传到码云可以点击查看。
更多推荐
所有评论(0)