限流尽可能在满足需求的情况下越简单越好!

分布式限流是指在分布式系统中对请求进行限制,以防止系统过载或滥用资源。以下是常见的分布式限流策略及其实现方式:

Redis实现限流的几种方案

1、基于 Redis 的固定窗口限流

原理

  • 设定一个时间窗口(如 1 秒)
  • 使用 Redis 维护一个计数器,存储当前窗口的请求数
  • 当请求到来时,INCR 计数器,如果超过阈值则拒绝
  • 过期后自动删除键,进入下一个窗口

优缺点: ✅ 简单易实现
❌ 在窗口交界处可能会出现短时间的突发流量("临界突增")

public class RedisRateLimiter {



    private final StringRedisTemplate redisTemplate;
    // 命令前缀
    private final String key;

    private final int rate;

    private final int window;

    public RedisRateLimiter(StringRedisTemplate redisTemplate, String key, int rate, int window) {
        this.redisTemplate = redisTemplate;
        this.key = key;
        this.rate = rate;
        this.window = window;
    }

    // 检查并获取令牌
    public boolean acquire() {
        String currentKey = key + "_" + (getCurrentSeconds() / window);

        Long currentCount = redisTemplate.opsForValue().increment(currentKey);

        redisTemplate.expire(currentKey, window, TimeUnit.SECONDS);
        log.info("当前获取到的令牌数 key {}  count {} result {} ",currentKey,currentCount,currentCount > rate);

        if (currentCount > rate){
            return false;
        }
        return true;
    }

    private long getCurrentSeconds() {
        return System.currentTimeMillis()/1000;
    }


    public void acquireSleep()  {
        int count = 0;
        while (!acquire()){
            sleep(1);
            count++;
        }
    }

    private void sleep(int second) {
        try {
            TimeUnit.SECONDS.sleep(second);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public boolean acquireSleep(int waitSecond) {
        int count = 0;
        while (!acquire()){
            if (count >= waitSecond){
                return false;
            }
            sleep(1);
            count++;
            log.info("RedisRateLimiter[{}] try acquire sleep {}",key,count);
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {

        ch.qos.logback.classic.Logger logger=(ch.qos.logback.classic.Logger)LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);

        logger.setLevel(Level.OFF);

        StringRedisTemplate stringRedisTemplate=getStringRedisTemplate();

        RedisRateLimiter redisRateLimiter = new RedisRateLimiter(stringRedisTemplate,"request_interface",16,10);

        // 模拟 50 个并发线程,每个线程尝试获取 10 次令牌
        final int threadCount = 50;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            executor.submit(() -> {
                // 每个线程尝试多次调用限流方法
                for (int j = 0; j < 10; j++) {
                    redisRateLimiter.acquireSleep();
                    System.out.println("当前线程:"+Thread.currentThread().getName()+",获取到令牌,时间"+ DateFormatUtils.format(new Date(),"yyyy-MM-dd HH:mm:ss"));
                    // 模拟每次请求间隔 100 毫秒
                    redisRateLimiter.milliseconds(100);
                }
                latch.countDown();
            });
        }
        latch.await();
        executor.shutdown();


    }

    private static StringRedisTemplate getStringRedisTemplate() {

        // 1. 创建单机模式的配置
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
        redisStandaloneConfiguration.setHostName("127.0.0.1");
        redisStandaloneConfiguration.setPort(6379);

        // 2. 构造 LettuceConnectionFactory,并初始化
        LettuceConnectionFactory factory = new LettuceConnectionFactory(redisStandaloneConfiguration);
        factory.afterPropertiesSet();  // 初始化连接工厂

        // 3. 创建 StringRedisTemplate 并设置连接工厂
        StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
        stringRedisTemplate.setConnectionFactory(factory);
        stringRedisTemplate.afterPropertiesSet();  // 初始化模板
        return stringRedisTemplate;
    }

    private void milliseconds(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


}

  当然这种做法的弊端是很多的,比如当统计1-10秒的时候,无法统计2-11秒之内,如果需要统计N秒内的M个请求,那么我们的Redis中需要保持N个key等等问题。

当前线程:pool-1-thread-30,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-6,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-18,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-35,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-38,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-37,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-33,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-44,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-3,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-45,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-5,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-20,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-11,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-43,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-15,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-29,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-4,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-16,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-12,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-24,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-26,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-8,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-14,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-49,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-42,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-21,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-1,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-10,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-31,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-50,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-36,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-48,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-9,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-19,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-47,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-2,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-34,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-46,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-41,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-22,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-17,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-27,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-28,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-32,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-25,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-13,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-40,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-23,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-39,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-7,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-34,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-13,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-2,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-22,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-28,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-46,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-25,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-19,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-9,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-32,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-39,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-17,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-47,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-41,获取到令牌,时间2025-03-15 00:17:40

2. 基于 Redis 的滑动窗口限流

统计N秒内的M个请求,那么我们的Redis中需要保持N个key问题。滑动窗口就能解决。

原理

  • 维护一个基于时间的列表(ZSET,有序集合)
  • 每次请求时,记录当前时间戳到 ZSET
  • 删除超出窗口时间范围的请求
  • 统计 ZSET 中当前窗口内的请求数,超出阈值则拒绝

优缺点: ✅ 解决了固定窗口的临界突增问题
❌ 存储和计算成本比固定窗口稍高

Lua脚本逻辑
1. 根据传入的当前时间和窗口大小计算窗口起始时间: windowStart = now - window*1000
2. 移除 ZSet 中 score 小于 windowStart 的记录
3. 统计当前窗口内的请求数量(ZCOUNT)
4. 如果当前请求数 >= rate,则返回 0 表示限流拒绝
5. 否则,添加当前请求记录(member 由当前时间和 uuid 拼接组成),设置 key 过期时间,并返回 1 表示允许请求
@Slf4j
public class RedisSlidingWindowRateLimiter {

    private final StringRedisTemplate redisTemplate;
    private final String key;
    private final int rate;
    private final int window; // 窗口长度,单位秒

    public RedisSlidingWindowRateLimiter(StringRedisTemplate redisTemplate, String key, int rate, int window) {
        this.redisTemplate = redisTemplate;
        this.key = key;
        this.rate = rate;
        this.window = window;
    }

    /**
     * 使用 Lua 脚本实现滑动窗口限流,保证原子性操作
     */
    public boolean allowRequestWithLua(){
        long now = System.currentTimeMillis();
        String uuid = UUID.randomUUID().toString();

        // Lua 脚本逻辑:
        // 1. 根据传入的当前时间和窗口大小计算窗口起始时间: windowStart = now - window*1000
        // 2. 移除 ZSet 中 score 小于 windowStart 的记录
        // 3. 统计当前窗口内的请求数量(ZCOUNT)
        // 4. 如果当前请求数 >= rate,则返回 0 表示限流拒绝
        // 5. 否则,添加当前请求记录(member 由当前时间和 uuid 拼接组成),设置 key 过期时间,并返回 1 表示允许请求
        String luaScript =
                "local now = tonumber(ARGV[1])\n" +
                        "local window = tonumber(ARGV[2])\n" +
                        "local rate = tonumber(ARGV[3])\n" +
                        "local uuid = ARGV[4]\n" +
                        "local windowStart = now - window * 1000\n" +
                        "\n" +
                        "-- 移除窗口外的请求记录\n" +
                        "redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, windowStart)\n" +
                        "\n" +
                        "-- 统计当前窗口内的请求数量\n" +
                        "local count = redis.call('ZCOUNT', KEYS[1], windowStart, now)\n" +
                        "if count >= rate then\n" +
                        "    return 0\n" +
                        "end\n" +
                        "\n" +
                        "-- 添加当前请求记录,member 由当前时间和 uuid 组成\n" +
                        "local member = tostring(now) .. '_' .. uuid\n" +
                        "redis.call('ZADD', KEYS[1], now, member)\n" +
                        "\n" +
                        "-- 设置 key 的过期时间\n" +
                        "redis.call('EXPIRE', KEYS[1], window)\n" +
                        "return 1";

        byte[][] keys = new byte[][] { key.getBytes() };
        byte[][] args = new byte[][] {
                String.valueOf(now).getBytes(),
                String.valueOf(window).getBytes(),
                String.valueOf(rate).getBytes(),
                uuid.getBytes()
        };

        // 合并 keys 和 args 到一个新的数组中
        byte[][] keysAndArgs = new byte[keys.length + args.length][];
        System.arraycopy(keys, 0, keysAndArgs, 0, keys.length);
        System.arraycopy(args, 0, keysAndArgs, keys.length, args.length);

        Long result = redisTemplate.execute((RedisCallback<Long>) connection ->
                connection.eval(luaScript.getBytes(), ReturnType.INTEGER, keys.length, keysAndArgs)
        );

        return result != null && result == 1;
    }

    // 采用轮询方式等待获取令牌
    public void acquireSleep() {
        int count = 0;
        while (!allowRequestWithLua()){
            ThreadUtil.sleep(100, TimeUnit.MILLISECONDS);
            count++;
        }
    }

    public boolean acquireSleep(int waitSecond) {
        int count = 0;
        while (!allowRequestWithLua()){
            if (count >= waitSecond){
                return false;
            }
            ThreadUtil.sleep(200, TimeUnit.MILLISECONDS);
            count++;
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        ch.qos.logback.classic.Logger logger=(ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);

        logger.setLevel(Level.OFF);

        StringRedisTemplate stringRedisTemplate=getStringRedisTemplate();

        RedisSlidingWindowRateLimiter redisSlidingWindowRateLimiter = new RedisSlidingWindowRateLimiter(stringRedisTemplate,"redis_sliding_window_key",16,10);

        // 模拟 50 个并发线程,每个线程尝试获取 10 次令牌
        final int threadCount = 50;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            executor.submit(() -> {
                // 每个线程尝试多次调用限流方法
                for (int j = 0; j < 10; j++) {
                    redisSlidingWindowRateLimiter.acquireSleep();
                    System.out.println("当前线程:"+Thread.currentThread().getName()+",获取到令牌,时间"+ DateFormatUtils.format(new Date(),"yyyy-MM-dd HH:mm:ss"));
                    // 模拟每次请求间隔 100 毫秒
                    ThreadUtil.sleep(200,TimeUnit.MILLISECONDS);
                }
                latch.countDown();
            });
        }
        latch.await();
        executor.shutdown();
    }

    private static StringRedisTemplate getStringRedisTemplate() {

        // 1. 创建单机模式的配置
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
        redisStandaloneConfiguration.setHostName("127.0.0.1");
        redisStandaloneConfiguration.setPort(6379);

        // 2. 构造 LettuceConnectionFactory,并初始化
        LettuceConnectionFactory factory = new LettuceConnectionFactory(redisStandaloneConfiguration);
        factory.afterPropertiesSet();  // 初始化连接工厂

        // 3. 创建 StringRedisTemplate 并设置连接工厂
        StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
        stringRedisTemplate.setConnectionFactory(factory);
        stringRedisTemplate.afterPropertiesSet();  // 初始化模板
        return stringRedisTemplate;
    }
}

3. 基于 Redis 的令牌桶限流

原理

  • 设定一个容量为 max_tokens 的令牌桶,初始装满
  • 以固定速率向桶中添加令牌(如每秒 10 个)
  • 每次请求需要消耗一个令牌,没有令牌时拒绝请求
  • 通常使用 Redis 的 Lua 脚本实现原子操作

优缺点: ✅ 更加平滑,支持突发流量
❌ 需要额外的定时任务或后台线程补充令牌

原理说明

  • 令牌桶算法中,设定一个桶最大容量 capacity,同时以一定速率 refillRate 补充令牌。
  • 每次请求需要消耗一个令牌,若当前桶内令牌不足,则拒绝请求。
  • 为保证原子性,利用 Redis 的 Lua 脚本将令牌获取和补充过程封装为原子操作。
@Slf4j
public class RedisTokenBucketRateLimiter {

    private final StringRedisTemplate redisTemplate;
    private final String key;
    // 桶的容量(最大令牌数)
    private final int capacity;
    // 令牌补充速率,单位:个/秒
    private final double refillRate;

    // Lua 脚本,用于原子化处理令牌桶逻辑
    private static final String LUA_SCRIPT =
            "local tokens_key = KEYS[1] .. ':tokens' \n" +
                    "local timestamp_key = KEYS[1] .. ':ts' \n" +
                    "local capacity = tonumber(ARGV[1]) \n" +
                    "local refill_rate = tonumber(ARGV[2]) \n" +
                    "local current_time = tonumber(ARGV[3]) \n" +
                    "local requested = tonumber(ARGV[4]) \n" +
                    "local tokens = tonumber(redis.call('get', tokens_key) or capacity) \n" +
                    "local last_refill = tonumber(redis.call('get', timestamp_key) or current_time) \n" +
                    "local delta = current_time - last_refill \n" +
                    "local tokens_to_add = delta * refill_rate \n" +
                    "tokens = math.min(capacity, tokens + tokens_to_add) \n" +
                    "if tokens < requested then \n" +
                    "   return 0 \n" +
                    "else \n" +
                    "   tokens = tokens - requested \n" +
                    "   redis.call('set', tokens_key, tokens) \n" +
                    "   redis.call('set', timestamp_key, current_time) \n" +
                    "   return 1 \n" +
                    "end";

    public RedisTokenBucketRateLimiter(StringRedisTemplate redisTemplate, String key, int capacity, double refillRate) {
        this.redisTemplate = redisTemplate;
        this.key = key;
        this.capacity = capacity;
        this.refillRate = refillRate;
    }

    // 检查并获取令牌
    public boolean acquire() {
        // 当前时间(单位秒)
        long currentTime = System.currentTimeMillis() / 1000;


        // 请求消耗 1 个令牌
        Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {
            byte[][] keys = new byte[][] { key.getBytes() };
            byte[][] args = new byte[][] {
                    String.valueOf(capacity).getBytes(),
                    String.valueOf(refillRate).getBytes(),
                    String.valueOf(currentTime).getBytes(),
                    "1".getBytes()
            };

            // 合并 keys 和 args 到一个新的数组中
            byte[][] keysAndArgs = new byte[keys.length + args.length][];
            System.arraycopy(keys, 0, keysAndArgs, 0, keys.length);
            System.arraycopy(args, 0, keysAndArgs, keys.length, args.length);
            return connection.eval(LUA_SCRIPT.getBytes(), ReturnType.INTEGER, keys.length,keysAndArgs);
        });
        return result != null && result == 1;
    }

    // 采用轮询方式等待获取令牌
    public void acquireSleep() {
        int count = 0;
        while (!acquire()){
            ThreadUtil.sleep(100, TimeUnit.MILLISECONDS);
            count++;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ch.qos.logback.classic.Logger logger=(ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);

        logger.setLevel(Level.OFF);

        StringRedisTemplate stringRedisTemplate=getStringRedisTemplate();

        RedisTokenBucketRateLimiter redisTokenBucketRateLimiter = new RedisTokenBucketRateLimiter(stringRedisTemplate,"redisTokenBucketRateLimiter",100,100.0/60.0);

        // 模拟 50 个并发线程,每个线程尝试获取 10 次令牌
        final int threadCount = 50;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            executor.submit(() -> {
                // 每个线程尝试多次调用限流方法
                for (int j = 0; j < 10; j++) {
                    redisTokenBucketRateLimiter.acquireSleep();
                    System.out.println("当前线程:"+Thread.currentThread().getName()+",获取到令牌,时间"+ DateFormatUtils.format(new Date(),"yyyy-MM-dd HH:mm:ss"));
                    // 模拟每次请求间隔 100 毫秒
                    ThreadUtil.sleep(200,TimeUnit.MILLISECONDS);
                }
                latch.countDown();
            });
        }
        latch.await();
        executor.shutdown();
    }

    private static StringRedisTemplate getStringRedisTemplate() {

        // 1. 创建单机模式的配置
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
        redisStandaloneConfiguration.setHostName("127.0.0.1");
        redisStandaloneConfiguration.setPort(6379);

        // 2. 构造 LettuceConnectionFactory,并初始化
        LettuceConnectionFactory factory = new LettuceConnectionFactory(redisStandaloneConfiguration);
        factory.afterPropertiesSet();  // 初始化连接工厂

        // 3. 创建 StringRedisTemplate 并设置连接工厂
        StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
        stringRedisTemplate.setConnectionFactory(factory);
        stringRedisTemplate.afterPropertiesSet();  // 初始化模板
        return stringRedisTemplate;
    }
}

lua代码说明

local tokens_key = KEYS[1] .. ':tokens' 
local timestamp_key = KEYS[1] .. ':ts' 
local capacity = tonumber(ARGV[1]) 
local refill_rate = tonumber(ARGV[2]) 
local current_time = tonumber(ARGV[3]) 
local requested = tonumber(ARGV[4]) 
local tokens = tonumber(redis.call('get', tokens_key) or capacity) 
local last_refill = tonumber(redis.call('get', timestamp_key) or current_time) 
local delta = current_time - last_refill 
local tokens_to_add = delta * refill_rate 
tokens = math.min(capacity, tokens + tokens_to_add) 
if tokens < requested then 
   return 0 
else 
   tokens = tokens - requested 
   redis.call('set', tokens_key, tokens) 
   redis.call('set', timestamp_key, current_time) 
   return 1 
end
  1. 键的构造

    • tokens_key:通过传入的 key 加上后缀 :tokens,用于存储当前桶中剩余的令牌数。
    • timestamp_key:同样拼接上 :ts,记录上一次令牌补充的时间。
  2. 参数转换

    • capacity:令牌桶的最大容量,代表桶中最多可以存储多少令牌。
    • refill_rate:令牌补充速率,通常表示每秒钟补充的令牌数。
    • current_time:当前时间(需要与 refill_rate 配合)。
    • requested:本次请求需要消耗的令牌数量(通常为 1)。
  3. 获取当前状态

    • 从 Redis 中获取当前令牌数 tokens,如果不存在则初始化为桶的最大容量。
    • 获取上一次令牌补充的时间 last_refill,若不存在则设置为当前时间。
  4. 计算令牌补充

    • delta:计算自上次令牌补充以来经过的时间。
    • tokens_to_add:根据时间差和补充速率计算应补充的令牌数。
    • 更新令牌数:将当前令牌数加上补充的令牌数,但不会超过桶的最大容量(利用 math.min(capacity, tokens + tokens_to_add))。
  5. 判断是否有足够令牌

    • 如果当前令牌数小于本次请求所需的令牌数,则返回 0 表示请求被拒绝。
  6. 允许请求并更新状态

    • 如果有足够的令牌,则从令牌数中扣除本次请求需要的令牌数量,并更新 Redis 中 tokens_keytimestamp_key 为最新的令牌数和当前时间,返回 1 表示允许请求。

这种令牌桶算法的核心在于:令牌以固定速率补充到桶中,当请求到达时,从桶中扣除一定数量的令牌。如果桶中令牌不足,则拒绝请求,从而达到平滑限流的效果。

  • Lua 脚本总结
    • 获取当前桶中剩余令牌数和上次补充时间,若不存在则默认初始化为满桶状态。
    • 根据当前时间与上次更新时间的差值计算应补充的令牌数,并更新桶内令牌。
    • 判断是否有足够令牌供本次请求(默认请求 1 个令牌),若不足返回 0,否则扣减令牌并更新上次补充时间,返回 1。
  • 原子执行:通过 redisTemplate 的 eval 方法保证 Lua 脚本的原子性,避免并发问题。

4. 基于 Redis 的漏桶限流

原理

  • 设定一个队列模拟漏桶
  • 按固定速率从队列取出请求执行
  • 请求过多时,超出队列长度的请求被丢弃

优缺点: ✅ 输出速率稳定,不受突发流量影响
❌ 可能会丢弃部分流量

原理说明

  • 漏桶算法中,将请求看作向桶中注入的“水”,桶以固定速率漏水(处理请求)。
  • 当桶中水量超过预设容量时,则拒绝新请求。
  • 同样利用 Lua 脚本保证原子操作。
@Slf4j
public class RedisLeakyBucketRateLimiter {

    private final StringRedisTemplate redisTemplate;
    private final String key;
    // 桶的容量(允许的最大突发请求数)
    private final int capacity;
    // 漏水速率,单位:个/秒,表示每秒可处理的请求数
    private final double leakRate;

    // Lua 脚本,用于原子化处理漏桶逻辑
    private static final String LUA_SCRIPT =
            "local level_key = KEYS[1] .. ':level' \n" +
                    "local timestamp_key = KEYS[1] .. ':ts' \n" +
                    "local capacity = tonumber(ARGV[1]) \n" +
                    "local leak_rate = tonumber(ARGV[2]) \n" +
                    "local current_time = tonumber(ARGV[3]) \n" +
                    "local level = tonumber(redis.call('get', level_key) or '0') \n" +
                    "local last_time = tonumber(redis.call('get', timestamp_key) or current_time) \n" +
                    "local delta = current_time - last_time \n" +
                    "local leaked = delta * leak_rate \n" +
                    // 计算漏水后桶内水量,不能低于 0
                    "level = math.max(0, level - leaked) \n" +
                    "if level + 1 > capacity then \n" +
                    "   return 0 \n" +
                    "else \n" +
                    "   level = level + 1 \n" +
                    "   redis.call('set', level_key, level) \n" +
                    "   redis.call('set', timestamp_key, current_time) \n" +
                    "   return 1 \n" +
                    "end";

    public RedisLeakyBucketRateLimiter(StringRedisTemplate redisTemplate, String key, int capacity, double leakRate) {
        this.redisTemplate = redisTemplate;
        this.key = key;
        this.capacity = capacity;
        this.leakRate = leakRate;
    }

    // 检查并获取请求处理资格
    public boolean acquire() {
        // 当前时间(单位秒)
        long currentTime = System.currentTimeMillis() / 1000;
        Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {

            byte[][] keys = new byte[][] { key.getBytes() };
            byte[][] args = new byte[][] {
                    String.valueOf(capacity).getBytes(),
                    String.valueOf(leakRate).getBytes(),
                    String.valueOf(currentTime).getBytes(),
            };

            // 合并 keys 和 args 到一个新的数组中
            byte[][] keysAndArgs = new byte[keys.length + args.length][];
            System.arraycopy(keys, 0, keysAndArgs, 0, keys.length);
            System.arraycopy(args, 0, keysAndArgs, keys.length, args.length);

            return connection.eval(LUA_SCRIPT.getBytes(), ReturnType.INTEGER, keys.length,keysAndArgs);
        });
        return result != null && result == 1;
    }


    // 采用轮询方式等待获取令牌
    public void acquireSleep() {
        int count = 0;
        while (!acquire()){
            ThreadUtil.sleep(100, TimeUnit.MILLISECONDS);
            count++;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ch.qos.logback.classic.Logger logger=(ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);

        logger.setLevel(Level.OFF);

        StringRedisTemplate stringRedisTemplate=getStringRedisTemplate();

        RedisLeakyBucketRateLimiter redisLeakyBucketRateLimiter = new RedisLeakyBucketRateLimiter(stringRedisTemplate,"redisTokenBucketRateLimiter",100,100.0/60.0);

        // 模拟 50 个并发线程,每个线程尝试获取 10 次令牌
        final int threadCount = 50;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            executor.submit(() -> {
                // 每个线程尝试多次调用限流方法
                for (int j = 0; j < 10; j++) {
                    redisLeakyBucketRateLimiter.acquireSleep();
                    System.out.println("当前线程:"+Thread.currentThread().getName()+",获取到令牌,时间"+ DateFormatUtils.format(new Date(),"yyyy-MM-dd HH:mm:ss"));
                    // 模拟每次请求间隔 100 毫秒
                    ThreadUtil.sleep(200,TimeUnit.MILLISECONDS);
                }
                latch.countDown();
            });
        }
        latch.await();
        executor.shutdown();
    }

    private static StringRedisTemplate getStringRedisTemplate() {

        // 1. 创建单机模式的配置
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
        redisStandaloneConfiguration.setHostName("127.0.0.1");
        redisStandaloneConfiguration.setPort(6379);

        // 2. 构造 LettuceConnectionFactory,并初始化
        LettuceConnectionFactory factory = new LettuceConnectionFactory(redisStandaloneConfiguration);
        factory.afterPropertiesSet();  // 初始化连接工厂

        // 3. 创建 StringRedisTemplate 并设置连接工厂
        StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
        stringRedisTemplate.setConnectionFactory(factory);
        stringRedisTemplate.afterPropertiesSet();  // 初始化模板
        return stringRedisTemplate;
    }

}

代码说明

local level_key = KEYS[1] .. ':level' 
local timestamp_key = KEYS[1] .. ':ts' 
local capacity = tonumber(ARGV[1]) 
local leak_rate = tonumber(ARGV[2]) 
local current_time = tonumber(ARGV[3]) 
local level = tonumber(redis.call('get', level_key) or '0') 
local last_time = tonumber(redis.call('get', timestamp_key) or current_time) 
local delta = current_time - last_time 
local leaked = delta * leak_rate 
-- 计算漏水后桶内水量,不能低于 0
level = math.max(0, level - leaked) 
if level + 1 > capacity then 
   return 0 
else 
   level = level + 1 
   redis.call('set', level_key, level) 
   redis.call('set', timestamp_key, current_time) 
   return 1 
end

详细说明

  1. 键的构造

    • level_key:使用传入的第一个 key(例如“myBucket”)加上后缀 :level,用于保存当前桶中水量(即当前请求计数)。
    • timestamp_key:同样拼接上 :ts,用于记录上一次状态更新的时间。
  2. 参数转换

    • capacity:桶的最大容量(最大允许的水量)。
    • leak_rate:漏水速率,每秒钟流出的水量。
    • current_time:当前的时间(通常以秒或毫秒为单位,需与 leak_rate 配合)。
  3. 获取当前状态

    • 从 Redis 中获取当前桶内的水量 level,如果不存在则默认是 0。
    • 获取上次更新时间 last_time,若不存在则默认为当前时间。
  4. 计算水量流失

    • delta 计算从上次更新时间到当前的时间差。
    • leaked 根据时间差和漏水速率计算出在这段时间内流失的水量。
  5. 更新桶中水量

    • 利用 math.max(0, level - leaked) 计算更新后的桶内水量,确保不会小于 0。
  6. 判断是否超过容量

    • 检查更新后的水量加上本次请求的 1 个单位是否超过桶的容量。如果超过则返回 0,表示拒绝当前请求。
  7. 允许请求并更新状态

    • 如果加上本次请求后的水量不超过容量,则将桶内水量加 1,并更新 Redis 中的 level_keytimestamp_key 为当前状态和时间,最后返回 1 表示允许请求。

这种漏桶算法的思路在于:请求像水滴一样进入桶,桶以固定速率漏水。如果请求太频繁,桶内水量会快速累积到超过容量,从而拒绝请求。

  • Lua 脚本总结
    • 从 Redis 中获取当前桶内水量(即请求数量)和上次更新的时间。
    • 根据当前时间与上次更新时间的差值和设定的漏水速率计算“漏掉”的水量,并更新桶内水量(不能低于 0)。
    • 判断加入当前请求后是否超过桶的容量,超过则返回 0(拒绝),否则将水量加 1 并更新记录,返回 1 表示允许。
  • 原子执行:同样通过 eval 方法保证操作原子性,避免并发修改问题。

总结

  • 滑动窗口:使用 Redis ZSet 记录请求时间戳,动态统计窗口内请求数,平滑控制突发流量。
  • 令牌桶:通过 Lua 脚本实现令牌的自动补充和扣减,支持一定的突发请求。
  • 漏桶:用固定漏水速率保证请求以均匀的速率被处理,避免瞬间大量请求。

平滑限流

使用Guava RateLimiter实现平滑限流

这个刚好满足我的业务(请求爬虫接口1分钟平滑请求100次,不允许突发流量)

pom依赖

<dependency>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>32.0.1-jre</version>
</dependency>           
    RateLimiter limiter = RateLimiter.create(100.0 / 60);


    @Test
    public void test1() throws InterruptedException {
        final int threadCount = 50;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++){
            executor.submit(() -> {
                // 每个线程尝试多次调用限流方法
                for (int j = 0; j < 10; j++) {
                    limiter.acquire();
                    System.out.println("当前线程:"+Thread.currentThread().getName()+",获取到令牌,时间"+ DateFormatUtils.format(new Date(),"yyyy-MM-dd HH:mm:ss"));
                }
                latch.countDown();
            });
        }
        latch.await();
        executor.shutdown();
    }

那么有个思考题,这个满足单机执行是没问题的。如果是集群呢

案例:当前有一个下载服务 6台机器部署 ,其中下载视频 有一个需要先拿到链接解析的请求。这个请求是请求第三方的 第三方要求1分钟不超过800请求。

给出以下解决办法可以尝试下。(我感觉2是最简单并且满足要求的,尽可能把业务满足要求的情况下做简单点)

  1. 全局限流

    利用 Redis 或 Redisson 实现一个全局令牌桶,使整个集群每分钟只发放800个令牌。所有节点都从这个共享的令牌桶中申请令牌。

  2. 本地平滑限流

    假设你有6台机器,那么理论上每台机器分摊到大约800/6 ≈ 133个令牌。如果各节点请求量较为均衡,你可以在每台机器上再用一个本地限流器(例如 Guava RateLimiter),设置速率为 133 次/分钟(或约 2.22 次/秒),这样即使令牌获取是全局共享的,每台机器也能平滑地发出自己的请求。

  3. 使用 Redisson 的 RRateLimiter

    Redisson 提供了分布式限流器 RRateLimiter,支持阻塞式获取令牌。你可以全局配置为 800 次/分钟,所有节点调用 acquire() 方法时,共享的令牌桶会根据设定速率发放令牌。但需要注意,默认情况下,令牌的分配并不会自动保证各节点的均匀性——如果某台机器调用更频繁,它可能会获得更多令牌。

点击阅读全文
Logo

一起探索未来云端世界的核心,云原生技术专区带您领略创新、高效和可扩展的云计算解决方案,引领您在数字化时代的成功之路。

更多推荐