前言

Redis除了能用作缓存外,还有很多其他用途,比如分布式锁,分布式限流,分布式唯一主键等,本文将和大家分享下基于Redis分布式限流的各种实现方案。


一、为什么需要限流

用最简单的话来说:外部请求是不可控的,而我们系统的负载是有限的,如果没有限流机制,一旦外部请求超过系统承载的压力,就会出现系统宕机等严重问题。加入限流正是为了保证系统负载在可以承受的范围内

比如春节的秒杀环节。我们在上线前预估了能应对的秒杀 qps 是 1kw/s,但是实际可能达到了1亿/s,这种情况下这多出来的9kw请求很可能压垮我们的数据库,进而影响到接下来所有的用户正常访问。
在这里插入图片描述

补充:
微服务保证稳定性的几个利器:缓存、熔断、降级、限流

  • 缓存的目的是为了降低系统的访问延迟,提高系统能力,给用户更好的体验
  • 熔断的目的是为了在发现某个服务故障熔断对下游依赖的请求,减少不必要的损耗
  • 降级的目的是为了在系统在某个环节故障(比如某个下游故障)不影响整体核心链路,比如返回作者列表,关注服务故障了获取不了关注真实的关注情况,这种情况可以考虑降级关注按钮,全部显示为未关注
  • 限流的目的是为了保证系统处理的请求量在可以承受的范围内,防止突发流量压垮系统,保证系统稳定性。

二、单机限流和分布式限流

1、单机限流的瓶颈

单机限流有个主要的缺陷就是不够精确,我们可能有1000个实例,但是下游存储只有一套,即多对一的关系,如果单纯的以单机限流作为衡量指标,很可能把下游打挂。如果以下游(数据库)的总请求量均衡到每台机器上,由于每个机器请求数据库量级可能不同,导致部分机器被限流严重,而部分机器压根没什么请求,造成误伤。
即不好根据下游数据库服务的负载压力,来评估上游应用服务器每台的负载压力是多少。

采用根据总负载来均摊负载的方式显然并不精确,并且不能充分发挥上游服务器的处理性能,极大的限制了系统的负载能力。

比如数据库负载为limit=300,由于采用上游采用负载均摊的方式每台应用服务器限制为limit=100,如果来个固定IP或固定用户的请求一直落在服务器Server1上,那么系统的实际并发就变成了100,显然这样处理并不科学,也降低了单台应用服务器的处理能力

在这里插入图片描述
单机限流也有好处:可以根据服务器配置的不同进行不同权重的限流配置
比如:4核CPU,16G内存的服务器上限流为1000;2核 8G内存的服务器上限流为500.

2、分布式限流

为了解决单机限流的瓶颈,需要引入分布式限流,即我们不应为每个单机设置限流阈值,而是根据下游的负载情况设置一个全局的阈值。目前主流的做法主要是通过 redis 或者 zookeeper 来实现。zookeeper 使用、运维都比较复杂,所以大部分是使用 redis + lua 脚本来实现

分布式限流器和单机限流的实现类似,只是计数存储换成了一些分布式存储而不是在单机内存中。
我们可以使用 redis + lua 脚本实现令牌桶的算法,因为 lua 的执行可以做到事务,要么全部成功要么全部失败。所以可以很简单地实现分布式的令牌桶逻辑,并且可以实现精确的限流。但是这种实现的缺陷是如果请求 qps很高,所有的请求都要和redis交互,redis 不一定能承载得住。
在这里插入图片描述

3、小结

  • 单机限流更适合对单机精确限流,比如针对单个mysql数据库的请求限流,某低配的应用服务器只能处理100并发的限流。
  • 分布式限流更适合分布式场景的总体流控,比如知道请求链路中下游的负载能力出现瓶颈,那么上游就需要根据下游瓶颈进行整体流量控制,如数据库的负载是limit=500,这时就可以采用分布式限流器来控制落在应用服务器上整体的请求数。

三、4种主流的限流算法

1、固定时间窗口

固定窗口算法又叫计数器算法,是一种简单方便的限流算法。主要通过一个支持原子操作的计数器来累计 1 秒内的请求次数,当 1 秒内计数达到限流阈值时触发拒绝策略。每过 1 秒,计数器重置为 0 开始重新计数。
在这里插入图片描述

  • 优点:时间窗口固定,实现简单,性能较高
  • 缺点:无法应对两个时间窗口临界时间内的突发流量,如上图所示,我们虽然通过限流器限制了单个时间窗口内只能有2个请求(即QPS=2),但是在两个1秒时间窗口中间1秒的时间窗口内却发生了4次请求。

2、滑动时间窗口

我们已经知道固定窗口算法的实现方式以及它所存在的问题,而滑动窗口算法是对固定窗口算法的改进。既然固定窗口算法在遇到时间窗口的临界突变时会有问题,那么我们在遇到下一个时间窗口前也调整时间窗口不就可以了吗?

下面是滑动窗口的示意图:
在这里插入图片描述
上图的示例中,每 500ms 滑动一次窗口,可以发现窗口滑动的间隔越短,时间窗口的临界突变问题发生的概率也就越小,不过只要有时间窗口的存在,还是有可能发生时间窗口的临界突变问题。

那么有没有什么办法更精确的统计时间窗口内的请求数呢?

答案是有的,就是记录下所有的请求时间点,新请求到来时以请求时间作为时间窗口的结尾时间统计时间窗口范围内的请求数量是否超过指定阈值,由此来确定是否达到限流,这种方式没有了时间窗口突变的问题,限流比较准确,但是因为要记录下每次请求的时间点,所以占用的内存较多
该方法又叫做滑动日志算法。不过其算法的本质还是滑动时间窗口那套,区别在于滑动时间窗口是固定时间间隔滑动时间窗口,而滑动日志算法由于保存了每个请求的时间戳,可以根据最新请求的时间戳计算出当前时间窗口内的请求数。
在这里插入图片描述

  • 优点:优化了固定时间窗口的临界问题,限流更加精准
  • 缺点:实现较为复杂,会占用较多内存,每个请求都需要重新统计最新时间窗口内的请求数,性能较低。

3、漏桶

漏桶算法中将限流器比作一个漏斗,每一个请求到来就会向桶中添加一定的水量,桶底有一个孔,以恒定速度不断的漏出水;当一个请求过来需要向加水时,如果漏桶剩余容积不足以容纳添加的水量,就会触发拒绝策略。

假设限流要求是每分钟允许30个请求,可以将漏桶的容积看作30,每次请求加水量为1,漏桶中水的流出速度为30/时间周期1分钟。

当出现最大30个并发时,漏桶会被瞬间注满水,后续请求都会被拒绝。只有随着水量以固定速度流出后,漏斗中有剩余空间容纳新的水量,系统才能接受的新的请求。

注意:
1、漏桶算法模型中并没有队列的概念,每个请求到来时向漏斗中加水并不是加入队列等待被消费,所以漏斗并不能像消息队列那样削峰填谷、缓解突发的请求压力,限流器只负责判断当前请求是被允许还是需要拒绝。只要容器中能继续加水,请求就被允许,否则拒绝请求。
2、漏桶中水的流出速度并不等于请求的并发量,往漏桶中加水的速度才是当前系统的实际并发量。水的流出可以等同于令牌桶算法中向桶中投放令牌,水流出的速度越快,漏桶中就越快腾出空余空间用来存放新的请求到来需要添加的水量。所以不要简单认为水的流出速度恒定,就能控制当前系统的并发量保持均衡,两者并不是一个概念
3、桶的容积决定了限流器允许的最大并发。当漏桶中没有水时,允许出现最大的并发流量。

很多地方对漏桶的说法都是可以缓冲请求,对此我有不同的看法,所有的限流器不管是采用何种方法实现,都仅仅只是做请求是否被允许的判定器,请求要么被允许通过,要么被直接拒绝,其本身并不提供缓冲请求的功能。漏桶模型中的桶的容量并不会像消息队列那样缓冲请求,不会存放真实的请求信息,等待被处理消费。
在这里插入图片描述

  • 优点:能够起到一定的平滑突发流量的作用。水的恒定流出速度,可以等价于固定速度的投放令牌,不会出现一下子投放大量令牌,立刻被抢空,导致突发流量的问题。

  • 缺点:
    1、资源利用率低:漏桶并不能高效地利用可用的资源。因为它只在固定的时间间隔放行请求,所以在很多情况下,流量非常低,即使不存在资源争用,也无法有效地消耗资源。
    2、饥饿问题:当短时间内有大量突发请求,即使服务器没有任何负载,由于漏桶中的水还没有流出,请求会大量被拒绝。
    3、实现复杂,性能较低,会占用较多内存

4、令牌桶

漏桶是看处理效率和生产效率来控制流速,但是这个流速是静态的,很可能无法充分利用机器的性能。比如,服务器能处理的速率是100qps,但是我们配置的恒定流速只有50qps,这个时候服务器资源还非常地冗余。

令牌桶算法能比较灵活的调整以最大化利用资源:系统每接受到一个请求时,都要求有一个令牌,如果拿到令牌就处理,否则就拒绝,处理完以后把令牌丢弃。

桶中能存放的最大令牌数决定了令牌桶算法的最大并发,当桶中放满令牌时,允许达到最大并发。

令牌桶限流算法的核心就在于如何控制令牌的发放策略。这样可以做的很动态,例如利用系统的负载、业务高峰情况等,高峰时且负载允许加快令牌发放。

从本质上来说,令牌桶中以恒定速度生成令牌和漏桶算法中以恒定速度控制水的流出速度是等同的,都可以实现相同的限流效果
在这里插入图片描述
漏桶 vs 令牌桶
网上的大多数说法:
令牌桶相比漏桶会更加灵活,可以根据业务诉求配置灵活的发放策略。比如可以任意时间放入令牌,或者依据机器负载放入多个令牌,但是漏桶的整形效果会更好,对下游服务会更加友好,因为不容易出现突刺。

个人理解:
两者本质上只是模型不同,但都能实现相同的限流效果。令牌桶能调整令牌的发放策略,漏桶也可以改进水流出的速率。只是我们一般的默认理解是令牌桶是定时投放一定数量的令牌,而漏桶中的水是恒定速度流出,这样的前提下,漏桶相较之下更能起到平滑突发流量的作用,不会像令牌桶那样出现令牌投放后立刻被抢空的情况,而令牌桶则是能更好的适应突发流量。

  • 优点:可以适应流量突发,N 个请求到来只需要从桶中获取 N 个令牌就可以继续处理。当然在漏桶算法中只要桶中剩余空间够大,也能够应付突发的流量。

  • 缺点:实现复杂,性能较低,会占用较多内存

四、基于Redis的Lua分布式限流实战

基于Redis实现分布式限流一般都会采用Lua实现,以保证操作的原子性。
在这里插入图片描述

1、固定时间窗口

fixedWindow-rateLimit.lua脚本内容:

--KEYS[1]: 限流 key
--ARGV[1]: 阈值
--ARGV[2]: 时间窗口,计数器的过期时间
local rateLimitKey = KEYS[1];
local rate = tonumber(ARGV[1]);
local rateInterval = tonumber(ARGV[2]);

local allowed = 1;
-- 每次调用,计数器rateLimitKey的值都会加1
local currValue = redis.call('incr', rateLimitKey);

if (currValue == 1) then
--  初次调用时,通过给计数器rateLimitKey设置过期时间rateInterval达到固定时间窗口的目的
    redis.call('expire', rateLimitKey, rateInterval);
    allowed = 1;
else
--  当计数器的值(固定时间窗口内) 大于频度rate时,返回0,不允许访问
    if (currValue > rate) then
        allowed = 0;
    end
end
return allowed

固定时间窗口限流器实现FixedWindowRateLimiter:

public class FixedWindowRateLimiter implements RateLimiter{
    private final static  String PREFIX = "fixedWindowRateLimiter";

    private final RScript rScript;

    public FixedWindowRateLimiter(RedissonClient client) {
        //获取RScript时一定要指定:LongCodec.INSTANCE,不然Lua中获取参数值是可能会失败
        this.rScript = client.getScript(LongCodec.INSTANCE);
    }


    @Override
    public boolean isAllowed(Rule rule) {
        String keys = String.format("%s.{%s}",PREFIX,rule.getKey());
        String script = LuaScript.getFixedWindowRateLimiterScript();
        long timeoutInMillis = -1L;
        if (rule.getRateInterval() >= 0L) {
            //由于通过expire命令设置过期时长的单位是秒s,这里需要将时间转为秒
            timeoutInMillis = rule.getTimeUnit().toSeconds(rule.getRateInterval());
        }
        Long result = rScript.eval(RScript.Mode.READ_WRITE, script, RScript.ReturnType.VALUE, Collections.singletonList(keys), rule.getRate(), timeoutInMillis);
        return result == 1L;
    }
}

单元测试:

    @Test
    void fixedWindowRateLimitServiceTest() throws InterruptedException, ExecutionException {
        //创建名称为laowan.fixedWindow的固定窗口限流器,每60s允许30个请求
        Rule rule = new Rule("laowan.fixedWindow",30,60,TimeUnit.SECONDS, Mode.FIXED_WINDOW);
        while (true){
            boolean result = rateLimiterService.isAllowed(rule);
            Thread.sleep(1000L);
            if(result){
                log.info("请求正常");
            }else{
                log.info("请求太多频繁,请稍后再试");
            }
        }
  }  

2、滑动时间窗口

slidingWindow-rateLimit.lua内容:

--KEYS[1]: 限流器的 key
--ARGV[1]: 当前时间窗口的开始时间
--ARGV[2]: 请求的时间戳(也作为score)
--ARGV[3]: 阈值
-- 1. 移除时间窗口之前的数据
redis.call('zremrangeByScore', KEYS[1], 0, ARGV[1])
-- 2. 统计当前元素数量
local res = redis.call('zcard', KEYS[1])
-- 3. 是否超过阈值
if (res == nil) or (res < tonumber(ARGV[3])) then
    -- 4、保存每个请求的时间搓
    redis.call('zadd', KEYS[1], ARGV[2], ARGV[2])
    return 1
else
    return 0
end

滑动窗口限流器实现SlidingWindowRateLimiter:

public class SlidingWindowRateLimiter implements RateLimiter{
    private final RScript rScript;

    private final static  String PREFIX = "slidingWindowRateLimiter";

    public SlidingWindowRateLimiter(RedissonClient client) {
        this.rScript = client.getScript(LongCodec.INSTANCE);
    }

    @Override
    public boolean isAllowed(Rule rule) {
        String keys = String.format("%s.{%s}",PREFIX,rule.getKey());
        String script = LuaScript.getSlidingWindowRateLimiterScript();

        long interval = -1L;
        if (rule.getRateInterval() >= 0L) {
            interval = rule.getTimeUnit().toMillis(rule.getRateInterval());
        }
        //新窗口的结束时间
        long now = System.currentTimeMillis();
        //新窗口的起始时间
        long windowStartTime = now - interval;
        Long result = rScript.eval(RScript.Mode.READ_WRITE, script, RScript.ReturnType.INTEGER,
                Collections.singletonList(keys),
                windowStartTime,now, rule.getRate());
        return result == 1L;
    }
}

3、漏斗

leakyBucket-rateLimit.lua内容:

--参数说明:
--KEYS[1]: 限流器的 key
--ARGV[1]: 容量,决定最大的并发量
--ARGV[2]: 漏水速率,决定平均的并发量
--ARGV[3]: 一次请求的加水量
--ARGV[4]: 时间戳
local limitInfo = redis.call('hmget', KEYS[1], 'capacity', 'passRate', 'addWater','water', 'lastTs')
local capacity = limitInfo[1]
local passRate = limitInfo[2]
local addWater= limitInfo[3]
local water = limitInfo[4]
local lastTs = limitInfo[5]

--初始化漏斗
if capacity == false then
    capacity = tonumber(ARGV[1])
    passRate = tonumber(ARGV[2])
    --请求一次所要加的水量,一定不能大于容量值的
    addWater=tonumber(ARGV[3])
    --当前储水量,初始水位一般为0
    water = tonumber(ARGV[1])
    lastTs = tonumber(ARGV[4])
    redis.call('hmset', KEYS[1], 'capacity', capacity, 'passRate', passRate,'addWater',addWater,'water', water, 'lastTs', lastTs)
    return 1
else
    local nowTs = tonumber(ARGV[4])
    --计算距离上一次请求到现在的漏水量 =  流水速度 *  (nowTs - lastTs)
    local waterPass = tonumber(ARGV[2] *  (nowTs - lastTs))
    --计算当前剩余水量   =  上次水量  - 时间间隔中流失的水量
    water = math.max(tonumber(0), tonumber(water - waterPass))
    --设置本次请求的时间
    lastTs = nowTs
	--判断是否可以加水   (容量 - 当前水量 >= 增加水量,判断剩余容量是否能够容纳增加的水量)
   if capacity - water >= addWater then
        -- 加水
        water = water + addWater
        -- 更新增加后的当前水量和时间戳
        redis.call('hmset', KEYS[1], 'water', water, 'lastTs', lastTs)
        return 1
    end
    -- 请求失败
    return 0
end

漏桶实现LeakyBucketRateLimiter:

public class LeakyBucketRateLimiter implements RateLimiter{
    private final static  String PREFIX = "leakyBucketRateLimiter";

    private final RScript rScript;

    public LeakyBucketRateLimiter(RedissonClient client) {
        this.rScript = client.getScript(LongCodec.INSTANCE);
    }

    @Override
    public boolean isAllowed(Rule rule) {
        String keys = String.format("%s.{%s}",PREFIX,rule.getKey());
        long rateInterval = -1L;
        if (rule.getRateInterval() >= 0L) {
            rateInterval = rule.getTimeUnit().toMillis(rule.getRateInterval());
        }
        //容量,决定最大并发数
        long capacity = rule.getRate();
        //水流的速度,即容器中的水量在周期时间流完的速度
        double passRate = (double)rule.getRate()/rateInterval;
        //每次请求增加的水量,相当于每次请求获取的令牌数
        long addWater = 1L;
        //请求时间戳,使用毫秒
        long lastTs   = System.currentTimeMillis();

        String script = LuaScript.getLeakyBucketRateLimiterScript();
        Long result = rScript.eval(RScript.Mode.READ_WRITE, script, RScript.ReturnType.INTEGER,
                Collections.singletonList(keys),
                capacity,passRate,addWater, lastTs);
        return result == 1L;
    }
}

4、令牌桶

令牌桶的lua脚本是参考Redisson的RRateLimiter中的lua脚本实现的,并没有开启单独的线程去添加令牌,而是每次收到新的请求时判断是否需要添加令牌。

tokenBucket-rateLimit.lua内容如下:

-- 本质上和滑动窗口类似,都是通过有序集合zset来保存缓存信息,时间戳作为score,通过时间戳维护时间窗口或令牌的有效性
-- 区别在于滑动窗口是保存每次请求的时间搓来计分,而令牌桶保存的是许可证生成的时间戳。
--参数说明:
--KEYS[1]: 限流器 key
--KEYS[2]: 桶中剩余的许可证  key
--KEYS[3]: 记录所有许可证发出的时间戳的   key

--ARGV[1]: 本次请求申请的令牌数
--ARGV[2]: 请求时间搓
--ARGV[3]: 随机数
--ARGV[4]: 令牌桶容量
--ARGV[5]: 时间周期

local limitInfo = redis.call('hmget', KEYS[1], 'rate', 'interval');
local rate = limitInfo[1];
local interval = limitInfo[2];
--初始化限流信息
if rate == false then
    -- rate表示令牌桶的最大容量
    rate = tonumber(ARGV[4]);
    -- 生成令牌的时间周期
    interval = tonumber(ARGV[5]);
    redis.call('hmset', KEYS[1], 'rate', rate, 'interval', interval);
end;
-- assert(rate ~= false and interval ~= false , 'RateLimiter is not initialized')


-- 限流器名称
local valueName = KEYS[2];
local permitsName = KEYS[3];
-- 单机时才需要
-- if type == '1' then
--    valueName = KEYS[3];
--    permitsName = KEYS[5];
-- end;
assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate');

-- 获取限流器中的剩余许可数量
local currentValue = redis.call('get', valueName);
local res;
if currentValue ~= false then
-- 如果有令牌过期,就会在下次请求时,重新发放令牌,重新发放的令牌数量 released
   local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);
   local released = 0;
   for i, v in ipairs(expiredValues) do
      -- permits表示单次请求发放的令牌个数
      local random, permits = struct.unpack('Bc0I', v);
      released = released + permits;
   end;

-- 更新剩余许可数量currentValue
   if released > 0 then
      --  清除过期的许可
       redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);
           -- 如果  剩余许可数   +  发放的许可数   >  桶的大小tonumber(rate),桶已经满了
           if tonumber(currentValue) + released > tonumber(rate) then
             --   剩余许可  = 请求许可数  -     已发放的未过期的许可数
              currentValue = tonumber(rate) - redis.call('zcard', permitsName);
           else
              -- 桶未满,发放新的许可数量,更新剩余可用许可:剩余许可  = 当前剩余许可 + 新发放的许可
              currentValue = tonumber(currentValue) + released;
           end;
       redis.call('set', valueName, currentValue);
   end;

    --  如果剩余许可不够,需要在res中返回下个许可需要等待多长时间
    if tonumber(currentValue) < tonumber(ARGV[1]) then
       -- zrange获取有序集合下标区间 00 的成员,firstValue表示第一个许可生成的时间
       local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores');
       -- 计算下次许可生成的时间
       -- res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));
       res = 0;
    else
      -- 保存发放的许可:生成许可保存到permitsName中,
      redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1]));
      -- 减去valueName中的许可数量
      redis.call('decrby', valueName, ARGV[1]);
      res = 1;
    end;
else
  -- 初始化限流器,struct.pack用于数据打包
  redis.call('set', valueName, rate);
  -- ARGV[2]表示分数,即时间戳,   ARGV[1]为申请的令牌数
  redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1]));
  redis.call('decrby', valueName, ARGV[1]);
  res = 1;
end;

-- 刷新过期时间
-- Pttl命令以毫秒为单位返回 key 的剩余过期时间 ,如果存在过期时间,需要更新valueName和permitsName对象的过期时间
local ttl = redis.call('pttl', KEYS[1]);
if ttl > 0 then
  redis.call('pexpire', valueName, ttl);
  redis.call('pexpire', permitsName, ttl);
end;
return res;

令牌桶限流器实现TokenBucketRateLimiter:

public class TokenBucketRateLimiter implements RateLimiter{
    private final static  String PREFIX = "tokenBucketRateLimiter";

    private final RScript rScript;

    public TokenBucketRateLimiter(RedissonClient client) {
        this.rScript = client.getScript(LongCodec.INSTANCE);
    }


    @Override
    public boolean isAllowed(Rule rule) {
        //keys参数组装
        String keyName = String.format("%s.%s",PREFIX,rule.getKey());
        String valueName = String.format("{%s}.value",keyName);
        String permitsName = String.format("{%s}.permits",keyName);

        long rateInterval = -1L;
        if (rule.getRateInterval() >= 0L) {
            rateInterval = rule.getTimeUnit().toMillis(rule.getRateInterval());
        }
        byte[] random = new byte[8];
        ThreadLocalRandom.current().nextBytes(random);

        String script = LuaScript.getTokenBucketRateLimiterScript();
        Long result = rScript.eval(RScript.Mode.READ_WRITE, script, RScript.ReturnType.INTEGER,
                Arrays.asList(keyName, valueName, permitsName),
                new Object[]{1, System.currentTimeMillis(), random,rule.getRate(),rateInterval}
        );
        return result == 1L;
    }
}

五、怎么兼容单机限流

上面采用Redis的Lua脚本实现了经典的4种限流算法,并且都是分布式限流。那么基于Redis的限流器,可以兼容单机限流吗?

其中参考Redisson中的RRateLimiter实现,我们很容易发现,针对单机限流的实现只需要在每个限流器的Key中添加客户端标识信息即可,比如IP信息。

这样每种限流器实现就既可以支持分布式限流,也可以支持单机限流。

这里只提供思路,感兴趣的朋友可以自己实现。

六、其他限流方案

1、Guava的RateLimiter

Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法实现流量限制,使用十分方便,而且十分高效。

其提供了2种令牌桶算法实现:平滑突发限流(SmoothBursty)和平滑预热限流(SmoothWarmingUp)实现,但是由于限流信息都保存在本地JVM中,因此RateLimiter只能用于单机限流。

<dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>23.0</version>
</dependency>

使用样例:

@Service
public class GuavaRateLimiterService {
    /*每秒控制5个许可*/
    RateLimiter rateLimiter = RateLimiter.create(5.0);
 
    /**
     * 获取令牌
     *
     * @return
     */
    public boolean tryAcquire() {
        return rateLimiter.tryAcquire();
    }
}

    @Autowired
    private GuavaRateLimiterService rateLimiterService;
    
    @ResponseBody
    @RequestMapping("/ratelimiter")
    public Result testRateLimiter(){
        if(rateLimiterService.tryAcquire()){
            return ResultUtil.success1(1001,"成功获取许可");
        }
        return ResultUtil.success1(1002,"未获取到许可");
    }

思考🤔?

Google Guava 工具包中的 RateLimiter 的实现是令牌桶限流,那么是怎么添加令牌的呢?
如果按照指定间隔添加令牌,那么需要开一个线程去定时添加,如果有很多个接口很多个 RateLimiter 实例,线程数会随之增加,这显然不是一个好的办法。显然 Google 也考虑到了这个问题,在 RateLimiter 中,是在每次令牌获取时才进行计算令牌是否足够的。它通过存储的下一个令牌生成的时间,和当前获取令牌的时间差,再结合阈值,去计算令牌是否足够,同时再记录下一个令牌的生成时间以便下一次调用。

2、Redisson的RRateLimiter

Redisson的RRateLimiter也是通过Lua实现的令牌桶限流,并且支持兼容分布式和单机限流,整个框架比较成熟,也是我比较推荐的在项目中使用的方案。

添加依赖:

 <dependency>
     <groupId>org.redisson</groupId>
     <artifactId>redisson-spring-boot-starter</artifactId>
     <version>3.20.1</version>
 </dependency>

使用RRateLimiter限流:

@Autowired
RedissonClient redissonClient;
    
 @Test
 void limitTest() throws InterruptedException {
     RRateLimiter rateLimiter = redissonClient.getRateLimiter("laowan.limiter");
     //RateType.OVERALL 全局,RateType.PER_CLIENT每个客户端
     rateLimiter.trySetRate(RateType.OVERALL, 2, 10, RateIntervalUnit.SECONDS);
     while (true){
         //rateLimiter.acquire(1); // 申请1份许可,直到成功
         Thread.sleep(1000L);
         boolean res = rateLimiter.tryAcquire(1, 2, TimeUnit.SECONDS);
         if(res){
             log.info("获取许可");
         }else{
             log.info("失败");
         }
     }
 }

3、Redis的redis-cell限流模块

Git地址:redis-cell

Redis 4.0提供了一个限流Redis模块,名称为redis-cell,并提供原子的限流指令。
该模块只有一条指令cl.throttle,其参数和返回值比较复杂。
在这里插入图片描述
根据官网说明,redis-cell的好处主要是两点:
1、性能很好,非常快
2、封装了底层实现,避免自定义Lua出现幼稚写法。

但是由于作者声称redis-cell模块目前已经达到很好的性能和实现,后续已经不再积极维护。
不知道是不是我在Redis7.X版本安装该模块始终失败的原因。

感兴趣的朋友可以自己研究下。

4、Nginx 限流

在Nginx中可以通过limit_req_zone配置限流策略。

limit_req_zone 用来限制单位时间内的请求数,采用的漏桶算法 “leaky bucket”。

limit_req_zone $binary_remote_addr zone=test:10m rate=10r/s; #定义限流策略
  1. $binary_remote_addr 指定按 ip 限流统计。
  2. zone=test:10m 表示生成一个大小为 10M,名字为 one 的内存区域,用来存储访问的频次信息。
  3. rate=10r/s 表示允许同一个客户端的访问频次是每秒 10 次。

5、Gateway网关限流

Spring Cloud Gateway 的代码发现,RateLimiter 接口只提供了一个实现类 RedisRateLimiter:

在这里插入图片描述
RedisRateLimiter可以用于分布式限流,它的实现原理依然是基于令牌桶算法的,不过实现逻辑是放在一段 lua 脚本中的,我们可以在 src/main/resources/META-INF/scripts 目录下找到该脚本文件 request_rate_limiter.lua:

local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]

local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)

local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
  last_tokens = capacity
end

local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
  last_refreshed = 0
end

local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
  new_tokens = filled_tokens - requested
  allowed_num = 1
end

if ttl > 0 then
  redis.call("setex", tokens_key, ttl, new_tokens)
  redis.call("setex", timestamp_key, ttl, now)
end

return { allowed_num, new_tokens }

这段代码和上面介绍令牌桶算法时用 Java 实现的那段经典代码几乎是一样的。这里使用 lua 脚本,主要是利用了 Redis 的单线程特性,以及执行 lua 脚本的原子性,避免了并发访问时可能出现请求量超出上限的现象。想象目前令牌桶中还剩 1 个令牌,此时有两个请求同时到来,判断令牌是否足够也是同时的,两个请求都认为还剩 1 个令牌,于是两个请求都被允许了。

有两种方式来配置 Spring Cloud Gateway 自带的限流。第一种方式是通过配置文件,比如下面所示的代码,可以对某个 route 进行限流:

 spring:
   cloud:
     gateway:
       routes:
       - id: test
         uri: http://httpbin.org:80/get
         filters:
         - name: RequestRateLimiter
           args:
            key-resolver: '#{@hostAddrKeyResolver}'
            redis-rate-limiter.replenishRate: 1
            redis-rate-limiter.burstCapacity: 3

其中,key-resolver 使用 SpEL 表达式 #{@beanName} 从 Spring 容器中获取 hostAddrKeyResolver 对象,burstCapacity 表示令牌桶的大小,replenishRate 表示每秒往桶中填充多少个令牌,也就是填充速度。

第二种方式是通过下面的代码来配置:

 @Bean
 public RouteLocator myRoutes(RouteLocatorBuilder builder) {
   return builder.routes()
     .route(p -> p
       .path("/get")
       .filters(filter -> filter.requestRateLimiter()
         .rateLimiter(RedisRateLimiter.class, rl -> rl.setBurstCapacity(3).setReplenishRate(1)).and())
       .uri("http://httpbin.org:80"))
     .build();
}

6、阿里的sentinel限流

官网:sentinel

随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点,从流量控制、流量路由、熔断降级、系统自适应过载保护、热点流量防护等多个维度保护服务的稳定性。

Sentinel 的主要特性:
在这里插入图片描述
在这里插入图片描述
可以看到Sentinel的功能十分强大,并且完美兼容目前的微服务体系,并且提供了Web控制台对限流规则实现可视化配置。

7、ratelimiter-spring-boot-starter

Git地址:ratelimiter-spring-boot-starter

介绍该方案主要是作者对基于Redis限流的封装方式比较优雅,其基于注解的限流控制使得项目中对接口进行限流十分优雅便捷。想自己封装 限流-spring-boot-starter的同学可以好好借鉴下。

<dependency>
  <groupId>com.github.taptap</groupId>
  <artifactId>ratelimiter-spring-boot-starter</artifactId>
  <version>1.3</version>
</dependency>

8、信号量 Semaphore

信号量Semaphore也可以用来进行单机环境下的并发控制。

实际应用中,可以用来限制访问某种资源的数量,比如在Hystrix中就有基于Semaphore的资源隔离策略。

public class SemaphoreTest {

    private static ExecutorService threadPool = Executors.newFixedThreadPool(100);
    private static Semaphore semaphore = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        System.out.println("Request processing ...");
                        semaphore.release();
                    } catch (InterruptedException e) {
                        e.printStack();
                    }
                }
            });
        }
        threadPool.shutdown();
    }
}

9、Bucket4j

官方文档:Bucket4j

Bucket4j是一个基于令牌桶算法实现的强大的限流库,它不仅支持单机限流,还支持通过诸如 Hazelcast、Ignite、Coherence、Infinispan 或其他兼容 JCache API (JSR 107) 规范的分布式缓存实现分布式限流。

和 Guava 的限流器相比,Bucket4j 的功能显然要更胜一筹,毕竟 Guava 的目的只是用作通用工具类,而不是用于限流的。使用 Bucket4j 基本上可以满足我们的大多数要求,不仅支持单机限流和分布式限流,而且可以很好的集成监控,搭配 Prometheus 和 Grafana 简直完美。值得一提的是,有很多开源项目譬如 JHipster API Gateway 就是使用 Bucket4j 来实现限流的。

Bucket4j 唯一不足的地方是它只支持请求频率限流,不支持并发量限流,另外还有一点,虽然 Bucket4j 支持分布式限流,但它是基于 Hazelcast 这样的分布式缓存系统实现的,不能使用 Redis,这在很多使用 Redis 作缓存的项目中就很不爽,所以我们还需要在开源的世界里继续探索。

<dependency>
    <groupId>com.bucket4j</groupId>
    <artifactId>bucket4j-core</artifactId>
    <version>8.1.1</version>
</dependency>
// define the limit 1 time per 10 minute
Bandwidth limit = Bandwidth.simple(1, Duration.ofMinutes(10));
// construct the bucket
Bucket bucket = Bucket.builder().addLimit(limit).build();
...

try {
   executor.execute(anyRunnable);
} catch (RejectedExecutionException e) {
    // print stacktraces only if limit is not exceeded
    if (bucket.tryConsume(1)) {
        ThreadInfo[] stackTraces = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
        StacktraceUtils.print(stackTraces);
    }
    throw e;
}

10、Resilience4j

官网:Resilience4j

Resilience4j 是一款轻量级、易使用的高可用框架。用过 Spring Cloud 早期版本的同学肯定都听过 Netflix Hystrix,Resilience4j 的设计灵感就来自于它。自从 Hystrix 停止维护之后,官方也推荐大家使用 Resilience4j 来代替 Hystrix

Resilience4j 的底层采用 Vavr,这是一个非常轻量级的 Java 函数式库,使得 Resilience4j 非常适合函数式编程。Resilience4j 以装饰器模式提供对函数式接口或 lambda 表达式的封装,提供了一波高可用机制:重试(Retry)、熔断(Circuit Breaker)、限流(Rate Limiter)、限时(Timer Limiter)、隔离(Bulkhead)、缓存(Caceh) 和 降级(Fallback)。我们重点关注这里的两个功能:限流(Rate Limiter) 和 隔离(Bulkhead),Rate Limiter 是请求频率限流,Bulkhead 是并发量限流。

Resilience4j 提供了两种限流的实现:SemaphoreBasedRateLimiter 和 AtomicRateLimiter。SemaphoreBasedRateLimiter 基于信号量实现,用户的每次请求都会申请一个信号量,并记录申请的时间,申请通过则允许请求,申请失败则限流,另外有一个内部线程会定期扫描过期的信号量并释放,很显然这是令牌桶的算法。AtomicRateLimiter 和上面的经典实现类似,不需要额外的线程,在处理每次请求时,根据距离上次请求的时间和生成令牌的速度自动填充。

Resilience4j 在功能特性上比 Bucket4j 强大不少,而且还支持并发量限流。不过最大的遗憾是,Resilience4j 不支持分布式限流

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-ratelimiter</artifactId>
    <version>2.0.2</version>
</dependency>

通过代码配置:

//限制每秒10次调用
RateLimiterConfig config = RateLimiterConfig.custom()
  .limitRefreshPeriod(Duration.ofSeconds(1))
  .limitForPeriod(10)
  .timeoutDuration(Duration.ofMillis(25))
  .build();
// Create registry
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);

// Use registry
RateLimiter rateLimiterWithDefaultConfig = rateLimiterRegistry
  .rateLimiter("name1");
// Decorate your call to BackendService.doSomething()
CheckedRunnable restrictedCall = RateLimiter
    .decorateCheckedRunnable(rateLimiterWithDefaultConfig, backendService::doSomething);
Try.run(restrictedCall)
    .onFailure((RequestNotPermitted throwable) -> LOG.info("Wait before call it again :)"));

通过属性文件配置:

resilience4j.ratelimiter:
    instances:
        backendA:
            limitForPeriod: 10
            limitRefreshPeriod: 1s
            timeoutDuration: 0
            registerHealthIndicator: true
            eventConsumerBufferSize: 100
        backendB:
            limitForPeriod: 6
            limitRefreshPeriod: 500ms
            timeoutDuration: 3s

spring注解实现流控:

@RateLimiter(name = "backendA",fallbackMethod = "fallback" )
public Mono<String> method(String param1) {
    return Mono.error(new NumberFormatException());
}

private Mono<String> fallback(String param1, IllegalArgumentException e) {
    return Mono.just("test");
}

private Mono<String> fallback(String param1, RuntimeException e) {
    return Mono.just("test");
}

总结

本文主要介绍了基于Redis通过Lua脚本实现分布式限流的几种方案。
1、4种典型的限流算法:固定时间窗口,滑动时间窗口,漏桶,令牌牌。
2、通过Lua脚本实现4种典型的分流算法。
3、其他限流实现方案介绍:Guava的RateLimiter、Redisson的RRateLimiter、Redis的redis-cell限流模块、Nginx 限流、 Spring Cloud Gateway网关限流、阿里的sentinel限流、信号量Semaphore单机并发限流、Bucket4j和Resilience4j限流。

系统设计 | 分布式限流器
5 种限流算法,7 种限流方式,挡住突发流量?
redis (3) - 各种限流方式及源码实现
超详细的Guava RateLimiter限流原理解析
ratelimiter-spring-boot-starter
Spring Cloud Gateway 限流实战,终于有人写清楚了!

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐