实现GlobalFilter接口,

这个接口中只有一个方法 filter (ServerwebExchange exchange,GatewatFilterChain chain);

exchange: 请求上下文 可以获取route的一系列相关信息并存入上下文中

chain:过滤器链,放行,把请求委托给下一个过滤器

 过滤器执行顺序

用jmeter测试过滤器

 

=====================================================================

gateway中专门用来做请求限流的类org/springframework/cloud/gateway/filter/factory/RequestRateLimiterGatewayFilterFactory.java

@ConfigurationProperties("spring.cloud.gateway.filter.request-rate-limiter")
public class RequestRateLimiterGatewayFilterFactory extends
		AbstractGatewayFilterFactory<RequestRateLimiterGatewayFilterFactory.Config> {}


限制客户端的访问流量,限制每个客户端单位时间内可访问的次数,避免一个客户端一次性占用大量的信号

gateway基于redis和lua实现令牌桶

令牌工厂由代码提供

桶由redis提供

计算由lua实现

(1)spring cloud gateway 默认使用redis的RateLimter限流算法来实现。所以我们要使用首先需要引入redis的依赖

<!--redis-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    <version>2.1.3.RELEASE</version>
</dependency>

在GatewayApplicatioin引导类中添加如下代码,KeyResolver用于计算某一个类型的限流的KEY也就是说,可以通过KeyResolver来指定限流的Key。

 //定义一个KeyResolver
    @Bean
    public KeyResolver ipKeyResolver() {
        return new KeyResolver() {
            @Override
//string如果是常量 1 则所有客户端 一秒只能访问一次 极限桶满访问五次
//string如果是IP 那么是每个客户端绑定令牌 每个客户端一秒访问一次 极限访问五次
            public Mono<String> resolve(ServerWebExchange exchange) {
                return Mono.just(exchange.getRequest().getRemoteAddress().getHostName());
//return Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress);
            }
        };
    }

修改application.yml中配置项,指定限制流量的配置以及REDIS的配置,修改后最终配置如下:

  • burstCapacity:令牌桶总容量。
  • replenishRate:令牌桶每秒填充平均速率。
  • key-resolver:用于限流的键的解析器的 Bean 对象的名字。它使用 SpEL 表达式根据#{@beanName}从 Spring 容器中获取 Bean 对象
  • 通过在replenishRate和中设置相同的值来实现稳定的速率burstCapacity。设置burstCapacity高于时,可以允许临时突发replenishRate。在这种情况下,需要在突发之间允许速率限制器一段时间(根据replenishRate),因为2次连续突发将导致请求被丢弃(HTTP 429 - Too Many Requests)

    key-resolver: "#{@userKeyResolver}" 用于通过SPEL表达式来指定使用哪一个KeyResolver.
     

          filters:
        - StripPrefix= 1
        - name: RequestRateLimiter #请求数限流 名字不能随便写 
          args:
            key-resolver: "#{@ipKeyResolver}"
            redis-rate-limiter.replenishRate: 1 #令牌桶每秒填充平均速率
            redis-rate-limiter.burstCapacity: 1 #令牌桶总容量

自定义路由过滤器,只在一个路由上生效,且需要配置

==================================================================

固定计数器窗口

public class CounterRateLimiter extends MyRateLimiter {
    /**
     * 每秒限制请求数
     */
    private final long permitsPerSecond;
    /**
     * 上一个窗口的开始时间
     */
    public long timestamp = System.currentTimeMillis();
    /**
     * 计数器
     */
    private int counter;
 
    public CounterRateLimiter(long permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
    }
 
    @Override
    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        // 窗口内请求数量小于阈值,更新计数放行,否则拒绝请求
        if (now - timestamp < 1000) {
            if (counter < permitsPerSecond) {
                counter++;
                return true;
            } else {
                return false;
            }
        }
        // 时间窗口过期,重置计数器和时间戳
        counter = 0;
        timestamp = now;
        return true;
    }
}
复制代码

滑动计数器窗口

 
import java.util.concurrent.atomic.AtomicInteger;
 
/**
 * 滑动窗口。该窗口同样的key,都是单线程计算。
 *
 * @author wuweifeng wrote on 2019-12-04.
 */
public class SlidingWindow {
    /**
     * 循环队列,就是装多个窗口用,该数量是windowSize的2倍
     */
    private AtomicInteger[] timeSlices;
    /**
     * 队列的总长度
     */
    private int timeSliceSize;
    /**
     * 每个时间片的时长,以毫秒为单位
     */
    private int timeMillisPerSlice;
    /**
     * 共有多少个时间片(即窗口长度)
     */
    private int windowSize;
    /**
     * 在一个完整窗口期内允许通过的最大阈值
     */
    private int threshold;
    /**
     * 该滑窗的起始创建时间,也就是第一个数据
     */
    private long beginTimestamp;
    /**
     * 最后一个数据的时间戳
     */
    private long lastAddTimestamp;
 
    public static void main(String[] args) {
        //1秒一个时间片,窗口共5个
        SlidingWindow window = new SlidingWindow(100, 4, 8);
        for (int i = 0; i < 100; i++) {
            System.out.println(window.addCount(2));
 
            window.print();
            System.out.println("--------------------------");
            try {
                Thread.sleep(102);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
 
    public SlidingWindow(int duration, int threshold) {
        //超过10分钟的按10分钟
        if (duration > 600) {
            duration = 600;
        }
        //要求5秒内探测出来的,
        if (duration <= 5) {
            this.windowSize = 5;
            this.timeMillisPerSlice = duration * 200;
        } else {
            this.windowSize = 10;
            this.timeMillisPerSlice = duration * 100;
        }
        this.threshold = threshold;
        // 保证存储在至少两个window
        this.timeSliceSize = windowSize * 2;
 
        reset();
    }
 
    public SlidingWindow(int timeMillisPerSlice, int windowSize, int threshold) {
        this.timeMillisPerSlice = timeMillisPerSlice;
        this.windowSize = windowSize;
        this.threshold = threshold;
        // 保证存储在至少两个window
        this.timeSliceSize = windowSize * 2;
 
        reset();
    }
 
    /**
     * 初始化
     */
    private void reset() {
        beginTimestamp = SystemClock.now();
        //窗口个数
        AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize];
        for (int i = 0; i < timeSliceSize; i++) {
            localTimeSlices[i] = new AtomicInteger(0);
        }
        timeSlices = localTimeSlices;
    }
 
    private void print() {
        for (AtomicInteger integer : timeSlices) {
            System.out.print(integer + "-");
        }
    }
 
    /**
     * 计算当前所在的时间片的位置
     */
    private int locationIndex() {
        long now = SystemClock.now();
        //如果当前的key已经超出一整个时间片了,那么就直接初始化就行了,不用去计算了
        if (now - lastAddTimestamp > timeMillisPerSlice * windowSize) {
            reset();
        }
 
        return (int) (((now - beginTimestamp) / timeMillisPerSlice) % timeSliceSize);
    }
 
    /**
     * 增加count个数量
     */
    public boolean addCount(int count) {
        //当前自己所在的位置,是哪个小时间窗
        int index = locationIndex();
//        System.out.println("index:" + index);
        //然后清空自己前面windowSize到2*windowSize之间的数据格的数据
        //譬如1秒分4个窗口,那么数组共计8个窗口
        //当前index为5时,就清空6、7、8、1。然后把2、3、4、5的加起来就是该窗口内的总和
        clearFromIndex(index);
 
        int sum = 0;
        // 在当前时间片里继续+1
        sum += timeSlices[index].addAndGet(count);
        //加上前面几个时间片
        for (int i = 1; i < windowSize; i++) {
            sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
        }
        System.out.println(sum + "---" + threshold);
 
        lastAddTimestamp = SystemClock.now();
 
        return sum >= threshold;
    }
 
    private void clearFromIndex(int index) {
        for (int i = 1; i <= windowSize; i++) {
            int j = index + i;
            if (j >= windowSize * 2) {
                j -= windowSize * 2;
            }
            timeSlices[j].set(0);
        }
    }
 
}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
 
/**
 * 用于解决高并发下System.currentTimeMillis卡顿
 * @author lry
 */
public class SystemClock {
 
    private final int period;
 
    private final AtomicLong now;
 
    private static class InstanceHolder {
        private static final SystemClock INSTANCE = new SystemClock(1);
    }
 
    private SystemClock(int period) {
        this.period = period;
        this.now = new AtomicLong(System.currentTimeMillis());
        scheduleClockUpdating();
    }
 
    private static SystemClock instance() {
        return InstanceHolder.INSTANCE;
    }
 
    private void scheduleClockUpdating() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable, "System Clock");
                thread.setDaemon(true);
                return thread;
            }
        });
        scheduler.scheduleAtFixedRate(() -> now.set(System.currentTimeMillis()), period, period, TimeUnit.MILLISECONDS);
    }
 
    private long currentTimeMillis() {
        return now.get();
    }
 
    /**
     * 用来替换原来的System.currentTimeMillis()
     */
    public static long now() {
        return instance().currentTimeMillis();
    }
}
public class SlidingWindowRateLimiter extends MyRateLimiter {
    /**
     * 每分钟限制请求数
     */
    private final long permitsPerMinute;
    /**
     * 计数器, k-为当前窗口的开始时间值秒,value为当前窗口的计数
     */
    private final TreeMap<Long, Integer> counters;
 
    public SlidingWindowRateLimiter(long permitsPerMinute) {
        this.permitsPerMinute = permitsPerMinute;
        this.counters = new TreeMap<>();
    }
 
    @Override
    public synchronized boolean tryAcquire() {
        // 获取当前时间的所在的子窗口值; 10s一个窗口
        long currentWindowTime = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) / 10 * 10;
        // 获取当前窗口的请求总量
        int currentWindowCount = getCurrentWindowCount(currentWindowTime);
        if (currentWindowCount >= permitsPerMinute) {
            return false;
        }
        // 计数器 + 1
        counters.merge(currentWindowTime, 1, Integer::sum);
        return true;
    }
    /**
     * 获取当前窗口中的所有请求数(并删除所有无效的子窗口计数器)
     *
     * @param currentWindowTime 当前子窗口时间
     * @return 当前窗口中的计数
     */
    private int getCurrentWindowCount(long currentWindowTime) {
        // 计算出窗口的开始位置时间
        long startTime = currentWindowTime - 50;
        int result = 0;
 
        // 遍历当前存储的计数器,删除无效的子窗口计数器,并累加当前窗口中的所有计数器之和
        Iterator<Map.Entry<Long, Integer>> iterator = counters.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<Long, Integer> entry = iterator.next();
            if (entry.getKey() < startTime) {
                iterator.remove();
            } else {
                result += entry.getValue();
            }
        }
        return result;
    }
}
复制代码

 漏桶算法

@Slf4j
public class LeakyBucketLimiter {
    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
 
    // 桶的容量
    public int capacity = 10;
    // 当前水量
    public int water = 0;
    //水流速度/s
    public int rate = 4;
    // 最后一次加水时间
    public long lastTime = System.currentTimeMillis();
 
    public void acquire() {
        scheduledExecutorService.scheduleWithFixedDelay(() -> {
            long now = System.currentTimeMillis();
            //计算当前水量
            water = Math.max(0, (int) (water - (now - lastTime) * rate /1000));
            int permits = (int) (Math.random() * 8) + 1;
            log.info("请求数:" + permits + ",当前桶余量:" + (capacity - water));
            lastTime = now;
            if (capacity - water < permits) {
                // 若桶满,则拒绝
                log.info("限流了");
            } else {
                // 还有容量
                water += permits;
                log.info("剩余容量=" + (capacity - water));
            }
        }, 0, 500, TimeUnit.MILLISECONDS);
    }
 
    public static void main(String[] args) {
        LeakyBucketLimiter limiter = new LeakyBucketLimiter();
        limiter.acquire();
    }
}
public class LeakyBucketRateLimiter extends MyRateLimiter {
    // 桶的容量
    private final int capacity;
    // 漏出速率
    private final int permitsPerSecond;
    // 剩余水量
    private long leftWater;
    // 上次注入时间
    private long timeStamp = System.currentTimeMillis();
 
    public LeakyBucketRateLimiter(int permitsPerSecond, int capacity) {
        this.capacity = capacity;
        this.permitsPerSecond = permitsPerSecond;
    }
 
    @Override
    public synchronized boolean tryAcquire() {
        //1. 计算剩余水量
        long now = System.currentTimeMillis();
        long timeGap = (now - timeStamp) / 1000;
        leftWater = Math.max(0, leftWater - timeGap * permitsPerSecond);
        timeStamp = now;
        
        // 如果未满,则放行;否则限流
        if (leftWater < capacity) {
            leftWater += 1;
            return true;
        }
        return false;
    }
}
复制代码

令牌桶

public class TokenBucketRateLimiter extends MyRateLimiter {
    /**
     * 令牌桶的容量「限流器允许的最大突发流量」
     */
    private final long capacity;
    /**
     * 令牌发放速率
     */
    private final long generatedPerSeconds;
    /**
     * 最后一个令牌发放的时间
     */
    long lastTokenTime = System.currentTimeMillis();
    /**
     * 当前令牌数量
     */
    private long currentTokens;
 
    public TokenBucketRateLimiter(long generatedPerSeconds, int capacity) {
        this.generatedPerSeconds = generatedPerSeconds;
        this.capacity = capacity;
    }
 
    /**
     * 尝试获取令牌
     *
     * @return true表示获取到令牌,放行;否则为限流
     */
    @Override
    public synchronized boolean tryAcquire() {
          /**
           * 计算令牌当前数量
           * 请求时间在最后令牌是产生时间相差大于等于额1s(为啥时1s?因为生成令牌的最小时间单位时s),则
           * 1. 重新计算令牌桶中的令牌数
           * 2. 将最后一个令牌发放时间重置为当前时间
           */
        long now = System.currentTimeMillis();
        if (now - lastTokenTime >= 1000) {
            long newPermits = (now - lastTokenTime) / 1000 * generatedPerSeconds;
            currentTokens = Math.min(currentTokens + newPermits, capacity);
            lastTokenTime = now;
        }
        if (currentTokens > 0) {
            currentTokens--;
            return true;
        }
        return false;
    }
}
 
复制代码

=========================================================================

令牌桶深入学习RFC

 

 

报文过来,如果没有颜色,先色盲模式添加颜色, 再执行色敏模式(色敏模式就是纯判断了,比如绿色牌子过来 C桶满了,则进E桶,红色牌子直接丢弃,黄色牌子 C桶满了 则丢弃)

 

 

 

=========================================================================

Sentinel限流规则_Leon_Jinhai_Sun的博客-CSDN博客_sentinel限流规则

流量控制(限流算法)是解决雪崩问题

簇点链路

当请求进入微服务时,首先会访问DispatcherServlet,然后进入Controller、Service、Mapper,这样的一个调用链就叫做簇点链路。簇点链路中被监控的每一个接口就是一个资源

默认情况下sentinel会监控SpringMVC的每一个端点(Endpoint,也就是controller中的方法),因此SpringMVC的每一个端点(Endpoint)就是调用链路中的一个资源。

流控、熔断等都是针对簇点链路中的资源来设置的,因此我们可以点击对应资源后面的按钮来设置规则:

流控:流量控制

降级:降级熔断

热点:热点参数限流,是限流的一种

授权:请求的权限控制

其含义是限制 /order/{orderId}这个资源的单机QPS为1,即每秒只允许1次请求,超出的请求会被拦截并报错。

jmeter测试

 注意,不要点击菜单中的执行按钮来运行

流控模式(这里是重点

在添加限流规则时,点击高级选项,可以选择三种流控模式

直接:统计当前资源的请求,触发阈值时对当前资源直接限流,也是默认的模式

关联:统计与当前资源相关的另一个资源,触发阈值时,对当前资源限流

链路:统计从指定链路访问到本资源的请求,触发阈值时,对指定链路限流

默认是直接模式

关联模式

关联模式:统计与当前资源相关的另一个资源,触发阈值时,对当前资源限流

语法说明:当/write资源访问量触发阈值时,就会对/read资源限流,避免影响/write资源。

---------------------------------------------------------------------------------------------------------------------------

例子:

使用场景:比如用户支付时需要修改订单状态,同时用户要查询订单。查询和修改操作会争抢数据库锁,产生竞争。业务需求是优先支付和更新订单的业务,因此当修改订单业务触发阈值时,需要对查询订单业务限流。

需求说明

        在OrderController新建两个端点:/order/query和/order/update,无需实现业务

        配置流控规则,当/order/ update资源被访问的QPS超过5时,对/order/query请求限流

1)定义/order/query端点,模拟订单查询

@GetMapping("/query")
public String queryOrder() {
    return "查询订单成功";
}

 2)定义/order/update端点,模拟订单更新

@GetMapping("/update")
public String updateOrder() {
    return "更新订单成功";
}

重启服务,查看sentinel控制台的簇点链路:

 

 3)配置流控规则

对哪个端点限流,就点击哪个端点后面的按钮。我们是对订单查询/order/query限流,因此点击它后面的按钮:

在表单中填写流控规则: 

4)在Jmeter测试

选择《流控模式-关联》:

可以看到1000个用户,100秒,因此QPS为10,超过了我们设定的阈值:5

查看http请求:

请求的目标是/order/update,这样这个断点就会触发阈值。

但限流的目标是/order/query,我们在浏览器访问,可以发现:

 5)总结

---------------------------------------------------------------------------------------------------------------------------

 =========================================================================

链路模式

链路模式只针对从指定链路访问到本资源的请求做统计,判断是否超过阈值。

配置示例

例如有两条请求链路:

/test1 --> /common

/test2 --> /common

 如果只希望统计从/test2进入到/common的请求,则可以这样配置:

---------------------------------------------------------------------------------------------------------------------------

例子:

实战案例

需求:有查询订单和创建订单业务,两者都需要查询商品。针对从查询订单进入到查询商品的请求统计,并设置限流。

步骤:

在OrderService中添加一个queryGoods方法,不用实现业务

在OrderController中,改造/order/query端点,调用OrderService中的queryGoods方法

在OrderController中添加一个/order/save的端点,调用OrderService的queryGoods方法

给queryGoods设置限流规则,从/order/query进入queryGoods的方法限制QPS必须小于2

实现:

1)添加查询商品方法

在order-service服务中,给OrderService类添加一个queryGoods方法:

public void queryGoods(){
    System.err.println("查询商品");
}

2)查询订单时,查询商品

在order-service的OrderController中,修改/order/query端点的业务逻辑:

@GetMapping("/query")
public String queryOrder() {
    // 查询商品
    orderService.queryGoods();
    // 查询订单
    System.out.println("查询订单");
    return "查询订单成功";
}

3)新增订单,查询商品

在order-service的OrderController中,修改/order/save端点,模拟新增订单:

@GetMapping("/save")
public String saveOrder() {
    // 查询商品
    orderService.queryGoods();
    // 查询订单
    System.err.println("新增订单");
    return "新增订单成功";
}

4)给查询商品添加资源标记

默认情况下,OrderService中的方法是不被Sentinel监控的,需要我们自己通过注解来标记要监控的方法。

给OrderService的queryGoods方法添加@SentinelResource注解:

@SentinelResource("goods")
public void queryGoods(){
    System.err.println("查询商品");
}

链路模式中,是对不同来源的两个链路做监控。但是sentinel默认会给进入SpringMVC的所有请求设置同一个root资源,会导致链路模式失效。

我们需要关闭这种对SpringMVC的资源聚合,修改order-service服务的application.yml文件

spring:
  cloud:
    sentinel:
      web-context-unify: false # 关闭context整合

重启服务,访问/order/query和/order/save,可以查看到sentinel的簇点链路规则中,出现了新的资源:

5)添加流控规则

点击goods资源后面的流控按钮,在弹出的表单中填写下面信息:

 只统计从/order/query进入/goods的资源,QPS阈值为2,超出则被限流。

6)Jmeter测试

选择《流控模式-链路》:

可以看到这里200个用户,50秒内发完,QPS为4,超过了我们设定的阈值2

一个http请求是访问/order/save:

 运行的结果:

完全不受影响。

另一个是访问/order/query:

 运行结果:  

 

 每次只有2个通过。

总结

流控模式有哪些?

•直接:对当前资源限流

•关联:高优先级资源触发阈值,对低优先级资源限流。

•链路:阈值统计时,只统计从指定资源进入当前资源的请求,是对请求来源的限流

---------------------------------------------------------------------------------------------------------------------------

 =========================================================================

流控效果

在流控的高级选项中,还有一个流控效果选项:

流控效果是指请求达到流控阈值时应该采取的措施,包括三种:

1 快速失败:达到阈值后,新的请求会被立即拒绝并抛出FlowException异常。是默认的处理方式。

2 warm up:预热模式,对超出阈值的请求同样是拒绝并抛出异常。但这种模式阈值会动态变化,从一个较小值逐渐增加到最大阈值。

3 排队等待:让所有的请求按照先后次序排队执行,两个请求的间隔不能小于指定时长

=========================================================================

warm up

阈值一般是一个微服务能承担的最大QPS,但是一个服务刚刚启动时,一切资源尚未初始化(冷启动),如果直接将QPS跑到最大值,可能导致服务瞬间宕机。

warm up也叫预热模式,是应对服务冷启动的一种方案。请求阈值初始值是 maxThreshold / coldFactor,持续指定时长后,逐渐提高到maxThreshold值。而coldFactor的默认值是3.


例如,我设置QPS的maxThreshold为10,预热时间为5秒,那么初始阈值就是 10 / 3 ,也就是3,然后在5秒后逐渐增长到10

---------------------------------------------------------------------------------------------------------------------------

例子:

案例

需求:给/order/{orderId}这个资源设置限流,最大QPS为10,利用warm up效果,预热时长为5秒

1)配置流控规则:

2)Jmeter测试

选择《流控效果,warm up》:

 

QPS为10.

刚刚启动时,大部分请求失败,成功的只有3个,说明QPS被限定在3:

 随着时间推移,成功比例越来越高:

 到Sentinel控制台查看实时监控:

一段时间后: 

---------------------------------------------------------------------------------------------------------------------------

 =========================================================================

排队等待

当请求超过QPS阈值时,快速失败和warm up 会拒绝新的请求并抛出异常。

排队等待则是让所有请求进入一个队列中,然后按照阈值允许的时间间隔依次执行。后来的请求必须等待前面执行完成,如果请求预期的等待时间超出最大时长,则会被拒绝。

工作原理

例如:QPS = 5,意味着每200ms处理一个队列中的请求;timeout = 2000,意味着预期等待时长超过2000ms的请求会被拒绝并抛出异常。

那什么叫做预期等待时长呢?

比如现在一下子来了12 个请求,因为每200ms执行一个请求,那么:

第6个请求的预期等待时长 = 200 * (6 - 1) = 1000ms

第12个请求的预期等待时长 = 200 * (12-1) = 2200ms

现在,第1秒同时接收到10个请求,但第2秒只有1个请求,此时QPS的曲线这样的:

如果使用队列模式做流控,所有进入的请求都要排队,以固定的200ms的间隔执行(漏桶算法),QPS会变的很平滑:

 平滑的QPS曲线,对于服务器来说是更友好的。

---------------------------------------------------------------------------------------------------------------------------

例子:

案例

需求:给/order/{orderId}这个资源设置限流,最大QPS为10,利用排队的流控效果,超时时长设置为5s

1)添加流控规则

 2)Jmeter测试

QPS为15,已经超过了我们设定的10。

如果是之前的 快速失败、warmup模式,超出的请求应该会直接报错。

但是我们看看队列模式的运行结果:

 全部都通过了。

再去sentinel查看实时监控的QPS曲线:

QPS非常平滑,一致保持在10,但是超出的请求没有被拒绝,而是放入队列。因此响应时间(等待时间)会越来越长。

当队列满了以后,才会有部分请求失败:

总结
流控效果有哪些?

1 快速失败:QPS超过阈值时,拒绝新的请求

2 warm up: QPS超过阈值时,拒绝新的请求;QPS阈值是逐渐提升的,可以避免冷启动时高并发导致服务宕机。

3 排队等待:请求会进入队列,按照阈值允许的时间间隔依次执行请求;如果请求预期等待时长大于超时时间,直接拒绝

---------------------------------------------------------------------------------------------------------------------------

 =========================================================================

热点参数限流

之前的限流是统计访问某个资源的所有请求,判断是否超过QPS阈值。而热点参数限流是分别统计参数值相同的请求,判断是否超过QPS阈值。

全局参数限流

例如,一个根据id查询商品的接口:

访问/goods/{id}的请求中,id参数值会有变化,热点参数限流会根据参数值分别统计QPS,统计结果:

当id=1的请求触发阈值被限流时,id值不为1的请求不受影响。

配置示例:

 

代表的含义是:对hot这个资源的0号参数(第一个参数)做统计,每1秒相同参数值的请求数不能超过5

热点参数限流

刚才的配置中,对查询商品这个接口的所有商品一视同仁,QPS都限定为5.

而在实际开发中,可能部分商品是热点商品,例如秒杀商品,我们希望这部分商品的QPS限制与其它商品不一样,高一些。那就需要配置热点参数限流的高级选项了:

结合上一个配置,这里的含义是对0号的long类型参数限流,每1秒相同参数的QPS不能超过5,有两个例外:

 如果参数值是100,则每1秒允许的QPS为10

 如果参数值是101,则每1秒允许的QPS为15

---------------------------------------------------------------------------------------------------------------------------

例子:

案例

案例需求:给/order/{orderId}这个资源添加热点参数限流,规则如下:

默认的热点参数规则是每1秒请求量不超过2

给102这个参数设置例外:每1秒请求量不超过4

给103这个参数设置例外:每1秒请求量不超过10

注意事项:热点参数限流对默认的SpringMVC资源无效,需要利用@SentinelResource注解标记资源

1)标记资源

给order-service中的OrderController中的/order/{orderId}资源添加注解:

2)热点参数限流规则

访问该接口,可以看到我们标记的hot资源出现了:

 

这里不要点击hot后面的按钮,页面有BUG

点击左侧菜单中热点规则菜单:

 点击新增,填写表单:

3)Jmeter测试

选择《热点参数限流 QPS1》:

这里发起请求的QPS为5.

包含3个http请求:

普通参数,QPS阈值为2

 运行结果:

QPS阈值为10

 运行结果:

 =======================================================================

=======================================================================

Sentinel漏桶匀速限流_瓜农老梁的博客-CSDN博客

Sentinel漏桶匀速限流

排队等待模式

Sentinel中的排队等待由RateLimiterController实现,通过控制请求通过的时间间隔来实现达到匀速的目的。 

 

代码逻辑
@1 计算请求通过的间隔时间
假如设置的阈值为count=100即每秒允许100个请求,每次通过一个请求acquireCount=1,套入公式costTime=10。即两次请求的时间间隔为10秒
@2 计算这次请求通过的预期时间=上次请求通过的时间+时间间隔
@3 当前时间大于预期时间,则允许通过并更新上次请求时间戳
@4 当前时间小于预期时间,则需要等待;计算需要等待的时间
@5 需要等待的时间大于超时时间则拒绝,默认超时时间为500毫秒
@6 再算一遍等待时间,算法跟第4步一样,并再次判断是否超过等待时间
@7 线程sleep等待时间后允许请求通过

三、匀速模式局限 

Sentinel等待模式为什么只支持1000以内QPS?文章开头提出的问题。下面看下时间间隔计算公式,每次通过一个请求acquireCount=1。

long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
下个请求的预期通过时间为:

long expectedTime = costTime + latestPassedTime.get();
随着阈值count即一秒期望通过的请求数,下面观察随着阈值的变化,时间间隔变化情况。

 由上表看出阈值count大于1000小于2000,时间间隔一致为1毫秒,而大于2000后,时间间隔则掉为0,即后面的所有判断将失效。因此Sentinel提供的匀速器只支持QPS在1000以内的请求场景。

预热模式+排队等待

Sentinel还提供一种预热+排队等待相结合的限流模式,也就是令牌桶和漏桶相结合的模式,示意图如下:请求的通过需要从令牌桶中获取令牌,获取令牌的流量需要经过漏桶匀速通过。

 

备注
整体可以分配两个部分,上部分基于令牌桶计算部分,下部分基于漏桶计算部分。

@1 warningToken令牌桶中的一个阈值,超过该值时开启预热
@2 小于warningToken不开启预热,根据阈值计算下个请求通过时距离上个请求的时间间隔
@3 warmingQps根据斜率计算出预热时的Qps
@4 计算预热时下个请求通过时距离上个请求的时间间隔
@5 这部分与上面匀速排队逻辑一致

小结:预热模式+排队等待模式比单纯的预热模式,在请求通过是增加了请求之间时间间隔的判断;相比单纯的排队模式,在时间间隔上更加灵活,根据预热时的Qps计算时间间隔。

=========================================================================

=========================================================================

Sentinel热点参数如何限流【原理源码】_瓜农老梁的博客-CSDN博客

Sentinel热点参数如何限流【原理源码】

前言

热点参数限流通过对请求的第几个参数以及参数值的流量进行统计,超过阈值触发流控的一种方式,例如:售卖的热销产品的抢购场景。

那如果入参是对象如何限流?例如入参是Product对象

参数的不一样,那么多参数是如何统计的呢?

热点参数都支持哪些限流类型?不同限流类型原理是什么

二、热点参数流程 

1.入口代码

负责热点参数限流的插槽为ParamFlowSlot,当达到设置的阈值时抛出ParamFlowException。

 2.判断流程

小结:通过获取热点参数索引号(即第几个参数由用户设置),获取参数值;如果入参是对象,那么该对象需要实现ParamFlowArgument接口并重写paramFlowKey方法提供需要热点参数。

3.参数类型

 小结:如果热点参数类型为Collection或者Array,对其中的每个元素循环进行校验。

4.限流类型

 

小结:热点参数限流支持QPS和并发线程数两种类型。其中QPS的限流效果包括匀速限流和直接限流。

三、直接限流 

直接限流注解

 

小结:热点参数直接限流通过简易令牌桶算法来实现的,请求通过时通过比较剩余令牌的数量。有令牌则放行,无令牌触发热点参数限流抛出ParamFlowException。添加令牌的时机选择在两次请求的时间间隔超过时间窗口大小时,计算出这段时间需要给令牌桶添加多少令牌。热点参数的最大令牌数即用户设置的限流阈值与允许突发流量之和。

四、匀速限流

匀速限流注解

小结:热点参数的匀速限流依然使用漏桶原理,需要注意的是匀速限流最大请求QPS依然为1000。获取匀速通过漏桶时的平均时间,通过与两次请求的时间间隔以及排队等待的时间比较来决定触发热点参数限流还是放行。

=========================================================================

=========================================================================

Sentinel使用令牌桶实现预热【原理源码】_瓜农老梁的博客-CSDN博客

Sentinel使用令牌桶实现预热【原理源码】

前言

Sentinel的QPS流控效果有快速失败、预热模式、排队等待、预热+排队等待模式,本文主要分析预热模式中是如何使用令牌桶算法限流的。

一、流控效果源码结构

在FlowRule更新缓存时,根据配置的不同类型初始化不同的流控效果处理类。 

1.流控效果封装入口

 2.分发不同的控制类

 3.流控控制类图

二、快速失败

快速失败即发送流控时抛出FlowException。 

1.快速失败流程

备注
@1 获取当前已经使用的线程数或者QPS
@2 与阈值进行判断,是否允许本次通行
@3 QPS流控并且prioritized设置为true表示预占用令牌
@4 大于阈值触发流控
@5 小于阈值允许通行

2.预占用令牌

预占用令牌:当基于QPS流控时并且prioritized设置为true,表示当前时间窗口令牌不够时,预占用下个时间窗口的令牌并返回需要等待的时间。

 备注:在不考虑优先级(预占用)令牌的情况,快速失败比较简单。

令牌桶限流原理

假如系统平时流量很低,突然陡增的流量需要缓慢增加。具体到令牌桶,可以通过控制令牌的生产速率来对流量进行控制。令牌生产速率如何控制?

我们在使用sentinel设置QPS的预热流控时,需要设置阈值count和预热时长warmUpPeriodInSec,下面梳理下与下图坐标图的关系

 坐标图说明

换算关系

        count,已知由用户设置,例如每秒允许通过100个请求

        warmUpPeriodInSec,已知由用户设置,默认为10秒,时间区域上红色(2)梯形区域

        coldFactor,已知默认为3

公式一:stableInterval = 1/count
公式二:coldInterval = stableInterval * coldFactor

备注:由于coldFactor默认为3,y轴stableInterval~coldInterval的距离是0~stableInterval的距离两倍,时间区域上红色(2)梯形区域是红色1的长方形区域的两倍。

公式三:坐标时间(1)长方形区域面积 = 长(thresholdPermits(warningToken)) * 宽(stableInterval)
公式四:坐标时间(1)长方形区域面积 = 0.5 * warmUpPeriodInSec
公式五:thresholdPermits(warningToken)=0.5 * warmUpPeriodInSec/stableInterval

备注:梯形的面积 = (上低+下低)* 高 ➗ 2 推导出maxPermits(maxToken)的值。

公式六:maxPermits(maxToken) = thresholdPermits(warningToken) + 2 * warmUpPeriodInSec ➗ (stableInterval + coldInterval)

备注:由斜率公式k=(y1-y2)➗(x1-x2),得出斜率如下。

slope = (coldInterval-stableInterval)➗(maxPermits(maxToken)-thresholdPermits(warningToken))

原理概述

当令牌桶中的令牌数小于thresholdPermits(warningToken)时,令牌按照固定速率生产,请求流量稳定。当令牌数大于thresholdPermits(warningToken)时,开启预热。此段时期,生产的令牌的速率小于令牌滑落的速度,一段时间后,令牌小于等于thresholdPermits(warningToken),请求回归到稳定状态,预热结束。

四、预热源码分析

1.WarmUpController构造函数

备注:此部分主要计算了warningToken阈值、最大令牌数maxToken、斜率slope,详细推导过程见原理部分。

2.请求判断逻辑

 3.令牌的生产和滑落

 备注:从代码可以看出sentinel中桶中的令牌生产和移除是在下次请求到来时一起处理的,另外Sentinel提供的令牌生产公式与当前时间currentTime有关系,如果一个冷系统好久没有流量,瞬间来了很大流量,此时的桶中令牌数会直接到达最大值maxToken,这也是官方提供的曲线图中开始流量比较陡的原因。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐