限流-滑动窗口
1. 背景1.1 为什么需要限流大量正常用户高频访问导致服务器宕机恶意用户高频访问导致服务器宕机网页爬虫 ,对于这些情况我们需要对用户的访问进行限流访问2. 介绍目前主流的限流算法:令牌、漏桶、滑动窗口。Nginx都实现了漏桶算法,Springcloud Gateway和Guava Ratelimiter实现了令牌桶,阿里的 Sentinel实现了滑动窗口。2.1 滑动窗口2.1.1 为什么引入滑
1. 介绍
目前主流的限流算法:令牌、漏桶、滑动窗口。Nginx都实现了漏桶算法,Springcloud Gateway和Guava Ratelimiter实现了令牌桶,阿里的 Sentinel实现了滑动窗口。
1.1 为什么需要限流
- 大量正常用户高频访问导致服务器宕机
- 恶意用户高频访问导致服务器宕机
- 网页爬虫 ,对于这些情况我们需要对用户的访问进行限流访问
1.2 为什么引入滑动窗口
固定窗口可能遇到的问题
-
限流不均匀
-
两倍的配置速率问题
假如限流设置为:
1秒钟1000个请求,在 第一秒的最后100ms,以及 第二秒 最开始100ms,都收到1000次请求。就会出现在这个 200ms 的周期中收到 2000次请求,并且限流通过,这就是 两倍的配置速率问题。
1.3 定义
滑动窗口为固定窗口的改良版,解决了固定窗口在窗口切换时会受到两倍于阈值数量的请求,滑动窗口在固定窗口的基础上,将一个窗口分为若干个等份的小窗口,每个小窗口对应不同的时间点,拥有独立的计数器,当请求的时间点大于当前窗口的最大时间点时,则将窗口向前平移一个小窗口(将第一个小窗口的数据舍弃,第二个小窗口变成第一个小窗口,当前请求放在最后一个小窗口),整个窗口的所有请求数相加不能大于阀值。
2. 源码
2.1 阿里Sentinel
2.1.1 系统级别限流SystemSlot
系统自适应限流 —— 过载保护。定义自适应限流规则需要提供多个参数
- 系统的负载水平线,超过这个值时触发过载保护功能
- 允许的最大线程数、最长响应时间和最大 QPS,可以不设置
List<SystemRule> rules = new ArrayList<SystemRule>();
SystemRule rule = new SystemRule();
rule.setHighestSystemLoad(3.0);
rule.setAvgRt(10);
rule.setQps(20);
rule.setMaxThread(10);
rules.add(rule);
SystemRuleManager.loadRules(Collections.singletonList(rule));
从代码中也可以看出系统自适应限流规则不需要定义资源名称,因为它是全局的规则,会自动应用到所有的临界区。如果当负载超标时,所有临界区资源将一起勒紧裤腰带渡过难关。
(1)检查系统指标,qps、thread、rt、load、cpu
定时任务会每秒检查自身系统状况。
public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
if (resourceWrapper == null) {
return;
}
// Ensure the checking switch is on.
// 只有配置了系统自适应限流规则才能进入,SystemRuleManager.loadRules去加载
if (!checkSystemStatus.get()) {
return;
}
// for inbound traffic only
// 只有入口流量才能进入
if (resourceWrapper.getEntryType() != EntryType.IN) {
return;
}
// total qps
double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
if (currentQps > qps) {
throw new SystemBlockException(resourceWrapper.getName(), "qps");
}
// total thread
int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
if (currentThread > maxThread) {
throw new SystemBlockException(resourceWrapper.getName(), "thread");
}
double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
if (rt > maxRt) {
throw new SystemBlockException(resourceWrapper.getName(), "rt");
}
// load. BBR algorithm.
if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
if (!checkBbr(currentThread)) {
throw new SystemBlockException(resourceWrapper.getName(), "load");
}
}
// cpu usage
if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
throw new SystemBlockException(resourceWrapper.getName(), "cpu");
}
}
(2) 如何加载系统规则
List<SystemRule> rules = new ArrayList<SystemRule>();
SystemRule rule = new SystemRule();
rule.setHighestSystemLoad(3.0);
rule.setAvgRt(10);
rule.setQps(20);
rule.setMaxThread(10);
rules.add(rule);
SystemRuleManager.loadRules(Collections.singletonList(rule));
2.1.2 流控规则FlowSlot
FlowSlot使用的核心思路是滑动窗口。
(1)定义窗口
public class StatisticNode implements Node {
/**
* Holds statistics of the recent {@code INTERVAL} seconds. The {@code INTERVAL} is divided into time spans
* by given {@code sampleCount}.
*/
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
/**
* 滚动计数器:保存最近60秒的统计信息。windowLengthInMs故意设置为1000毫秒,意思是每一个桶每秒,这样我们就可以得到每一秒的准确统计。
*/
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
/**
* 线程数
*/
private LongAdder curThreadNum = new LongAdder();
/**
* 获取度量时的最后一个时间戳
*/
private long lastFetchTime = -1;
// ...
}
public class ArrayMetric implements Metric {
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
// ...
}
- sampleCount
要知道有多少个小窗口,在sentinel中也就是sampleCount,比如说我们有60个窗口。
- intervalInMs(间隔-毫秒)
intervalInMs是用来计算这个窗口长度的,intervalInMs/窗口数量= 窗口长度。也就是我给你1分钟,你给我分成60个窗口,这个时候窗口长度就是1s了,那如果我给你1s,你给我分2个窗口,这个时候窗口长度就是500毫秒了,这个1分钟,就是intervalInMs。
- enableOccupy
是否允许抢占,即当前时间戳已经达到限制后,是否可以占用下一个时间窗口的容量,这里对应 LeapArray 的两个实现类,如果允许抢占,则为 OccupiableBucketLeapArray,否则为 BucketLeapArray。
(2)判断是否触发限流标准
以qps为基准,qps的定义是每秒的流量,sentinel以滑动窗口作为核心,中qps的计算公式:每秒通过数量/每秒的间隔。本质上rollingCounterInSecond.pass()就代表了qps指标,因为默认的间隔为1s,只是窗口数量为2。
public double passQps() {
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}
判断qps是否超标
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
int curCount = avgUsedTokens(node);
if (curCount + acquireCount > count) {
// 流量等级,VIP通道问题
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
// 流量超标,直接拒绝
return false;
}
return true;
}
(3)滑动窗口的设计
计算每秒的通过数量
public long pass() {
// 锁定当前时间所在的滑动窗口,为什么没有返回是因为滑动窗口的新建或者更新是需要锁资源的,当无法竞争到资源时需要等待,等待是会释放CPU控制权,而不释放锁
data.currentWindow();
long pass = 0;
// 根据当前时间筛选未过期的滑动窗口,如何判定:当前时间-窗口开始时间>间隔时间
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
pass += window.pass();
}
return pass;
}
如何根据当前时间去锁定滑动窗口(CAS + 可重入锁)
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 根据当前时间计算滑动窗口下标
int idx = calculateTimeIdx(timeMillis);
// 根据当前时间计算滑动窗口的开始时间
long windowStart = calculateWindowStart(timeMillis);
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
// CAS
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
return old;
} else if (windowStart > old.windowStart()) {
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 基本不可能到这里
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
前面分析了如何统计qps,现在分析qps如何往上加
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 计数
node.increaseThreadNum();
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
// ...
} catch (Throwable e) {
// ...
}
}
3. 实战
4. FAQ
4.1 如何判断系统已经到达崩溃边缘,qps、thread、rt、load、cpu?
4.2 LongAdder和AtomicLong有什么区别?
并发场景下的number操作,都会选用java.util.concurrent.atomic包下的数据结构,比如典型的计数场景,而AtomicLong是其中一个,但是jdk1.8后Doug Lea大神又添加LongAdder和DoubleAdder,在大并发场景下性能会远高于AtomicLong。AtomicLong是以CAS保证原子性;而LongAdder是拆分锁的粒度,比如原先是多个线程争抢修改一个value的,变成多个线程争抢修改多个value。
4.3 根据当前时间筛选未过期的滑动窗口为什么不是:当前时间-窗口开始时间>窗口长度时间,而是大于间隔时间。按照样本数量为2计算,窗口长度时间为500ms,而间隔时间为1000ms?
A:pass数量是以整个窗口(包含N个滑动窗口)的长度为测量单位的,滑动窗口越多只能说明更精确。
4.4 固定窗口的两倍速率问题,滑动窗口也存在的吧,只是滑动窗口将大窗口拆细,来减少可能额外超出的量不至于太多。
5. 参考资料
更多推荐
所有评论(0)