《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》点击传送门,即可获取!

20.198239

30.196083

40.200609

可以看出限流器创建之后,初始会有一个令牌,然后每隔 200 毫秒生成一个令牌,所以第一次请求直接返回 0,后面的请求都会阻塞大约 200 毫秒。另外,SmoothBursty 还具有应对突发的能力,而且 还允许消费未来的令牌,比如下面的例子:

1RateLimiter limiter = RateLimiter.create(5);

2System.out.println(limiter.acquire(10));

3System.out.println(limiter.acquire(1));

4System.out.println(limiter.acquire(1));

会得到类似下面的输出:

10.0

21.997428

30.192273

40.200616

限流器创建之后,初始令牌只有一个,但是我们请求 10 个令牌竟然也通过了,只不过看后面请求发现,第二次请求花了 2 秒左右的时间把前面的透支的令牌给补上了。

Guava 支持的另一种限流方式是平滑预热限流器(SmoothWarmingUp),可以通过下面的方法创建:

1RateLimiter limiter = RateLimiter.create(2, 3, TimeUnit.SECONDS);

2System.out.println(limiter.acquire(1));

3System.out.println(limiter.acquire(1));

4System.out.println(limiter.acquire(1));

5System.out.println(limiter.acquire(1));

6System.out.println(limiter.acquire(1));

第一个参数还是每秒创建的令牌数量,这里是每秒 2 个,也就是每 500 毫秒生成一个,后面的参数表示从冷启动速率过渡到平均速率的时间间隔,也就是所谓的热身时间间隔(warm up period)。我们看下输出结果:

10.0

21.329289

30.994375

40.662888

50.501287

第一个请求还是立即得到令牌,但是后面的请求和上面平滑突发限流就完全不一样了,按理来说 500 毫秒就会生成一个令牌,但是我们发现第二个请求却等了 1.3s,而不是 0.5s,后面第三个和第四个请求也等了一段时间。不过可以看出,等待时间在慢慢的接近 0.5s,直到第五个请求等待时间才开始变得正常。从第一个请求到第五个请求,这中间的时间间隔就是热身阶段,可以算出热身的时间就是我们设置的 3 秒。

3.2 Bucket4j

Bucket4j是一个基于令牌桶算法实现的强大的限流库,它不仅支持单机限流,还支持通过诸如 Hazelcast、Ignite、Coherence、Infinispan 或其他兼容 JCache API (JSR 107) 规范的分布式缓存实现分布式限流。

在使用 Bucket4j 之前,我们有必要先了解 Bucket4j 中的几个核心概念:

  • Bucket

  • Bandwidth

  • Refill

Bucket 接口代表了令牌桶的具体实现,也是我们操作的入口。它提供了诸如 tryConsume 和 tryConsumeAndReturnRemaining 这样的方法供我们消费令牌。可以通过下面的构造方法来创建Bucket:

1Bucket bucket = Bucket4j.builder().addLimit(limit).build();

2if(bucket.tryConsume(1)) {

3 System.out.println(“ok”);

4} else {

5 System.out.println(“error”);

6}

Bandwidth 的意思是带宽, 可以理解为限流的规则。Bucket4j 提供了两种方法来创建 Bandwidth:simple 和 classic。下面是 simple 方式创建的 Bandwidth,表示桶大小为 10,填充速度为每分钟 10 个令牌:

1Bandwidth limit = Bandwidth.simple(10, Duration.ofMinutes(1));

simple方式桶大小和填充速度是一样的,classic 方式更灵活一点,可以自定义填充速度,下面的例子表示桶大小为 10,填充速度为每分钟 5 个令牌:

1Refill filler = Refill.greedy(5, Duration.ofMinutes(1));

2Bandwidth limit = Bandwidth.classic(10, filler);

其中,Refill 用于填充令牌桶,可以通过它定义填充速度,Bucket4j 有两种填充令牌的策略:间隔策略(intervally) 和 贪婪策略(greedy)。在上面的例子中我们使用的是贪婪策略,如果使用间隔策略可以像下面这样创建 Refill:

1Refill filler = Refill.intervally(5, Duration.ofMinutes(1));

所谓间隔策略指的是每隔一段时间,一次性的填充所有令牌,比如上面的例子,会每隔一分钟,填充 5 个令牌,如下所示:

而贪婪策略会尽可能贪婪的填充令牌,同样是上面的例子,会将一分钟划分成 5 个更小的时间单元,每隔 12 秒,填充 1 个令牌,如下所示:

在了解了 Bucket4j 中的几个核心概念之后,我们再来看看官网介绍的一些特性:

  • 基于令牌桶算法

  • 高性能,无锁实现

  • 不存在精度问题,所有计算都是基于整型的

  • 支持通过符合 JCache API 规范的分布式缓存系统实现分布式限流

  • 支持为每个 Bucket 设置多个 Bandwidth

  • 支持同步和异步 API

  • 支持可插拔的监听 API,用于集成监控和日志

  • 不仅可以用于限流,还可以用于简单的调度

Bucket4j 提供了丰富的文档,推荐在使用 Bucket4j 之前,先把官方文档中的 基本用法 和 高级特性 仔细阅读一遍。另外,关于 Bucket4j 的使用,推荐这篇文章 Rate limiting Spring MVC endpoints with bucket4j,这篇文章详细的讲解了如何在 Spring MVC 中使用拦截器和 Bucket4j 打造业务无侵入的限流方案,另外还讲解了如何使用 Hazelcast 实现分布式限流;另外,Rate Limiting a Spring API Using Bucket4j 这篇文章也是一份很好的入门教程,介绍了 Bucket4j 的基础知识,在文章的最后还提供了 Spring Boot Starter 的集成方式,结合 Spring Boot Actuator 很容易将限流指标集成到监控系统中。

和 Guava 的限流器相比,Bucket4j 的功能显然要更胜一筹,毕竟 Guava 的目的只是用作通用工具类,而不是用于限流的。使用 Bucket4j 基本上可以满足我们的大多数要求,不仅支持单机限流和分布式限流,而且可以很好的集成监控,搭配 Prometheus 和 Grafana 简直完美。值得一提的是,有很多开源项目譬如 JHipster API Gateway 就是使用 Bucket4j 来实现限流的。

Bucket4j 唯一不足的地方是它只支持请求频率限流,不支持并发量限流,另外还有一点,虽然 Bucket4j 支持分布式限流,但它是基于 Hazelcast 这样的分布式缓存系统实现的,不能使用 Redis,这在很多使用 Redis 作缓存的项目中就很不爽,所以我们还需要在开源的世界里继续探索。

3.3 Resilience4j

Resilience4j 是一款轻量级、易使用的高可用框架。用过 Spring Cloud 早期版本的同学肯定都听过 Netflix Hystrix,Resilience4j 的设计灵感就来自于它。自从 Hystrix 停止维护之后,官方也推荐大家使用 Resilience4j 来代替 Hystrix。

Resilience4j 的底层采用 Vavr,这是一个非常轻量级的 Java 函数式库,使得 Resilience4j 非常适合函数式编程。Resilience4j 以装饰器模式提供对函数式接口或 lambda 表达式的封装,提供了一波高可用机制:重试(Retry)熔断(Circuit Breaker)限流(Rate Limiter)限时(Timer Limiter)隔离(Bulkhead)缓存(Caceh)降级(Fallback)。我们重点关注这里的两个功能:限流(Rate Limiter) 和 隔离(Bulkhead),Rate Limiter 是请求频率限流,Bulkhead 是并发量限流。

Resilience4j 提供了两种限流的实现:SemaphoreBasedRateLimiterAtomicRateLimiterSemaphoreBasedRateLimiter 基于信号量实现,用户的每次请求都会申请一个信号量,并记录申请的时间,申请通过则允许请求,申请失败则限流,另外有一个内部线程会定期扫描过期的信号量并释放,很显然这是令牌桶的算法。AtomicRateLimiter 和上面的经典实现类似,不需要额外的线程,在处理每次请求时,根据距离上次请求的时间和生成令牌的速度自动填充。关于这二者的区别可以参考文章 Rate Limiter Internals in Resilience4j。

Resilience4j 也提供了两种隔离的实现:SemaphoreBulkheadThreadPoolBulkhead,通过信号量或线程池控制请求的并发数,具体的用法参考官方文档,这里不再赘述。

下面是一个同时使用限流和隔离的例子:

1// 创建一个 Bulkhead,最大并发量为 150

2BulkheadConfig bulkheadConfig = BulkheadConfig.custom()

3 .maxConcurrentCalls(150)

4 .maxWaitTime(100)

5 .build();

6Bulkhead bulkhead = Bulkhead.of(“backendName”, bulkheadConfig);

7

8// 创建一个 RateLimiter,每秒允许一次请求

9RateLimiterConfig rateLimiterConfig = RateLimiterConfig.custom()

10 .timeoutDuration(Duration.ofMillis(100))

11 .limitRefreshPeriod(Duration.ofSeconds(1))

12 .limitForPeriod(1)

13 .build();

14RateLimiter rateLimiter = RateLimiter.of(“backendName”, rateLimiterConfig);

15

16// 使用 Bulkhead 和 RateLimiter 装饰业务逻辑

17Supplier supplier = () -> backendService.doSomething();

18Supplier decoratedSupplier = Decorators.ofSupplier(supplier)

19 .withBulkhead(bulkhead)

20 .withRateLimiter(rateLimiter)

21 .decorate();

22

23// 调用业务逻辑

24Try try = Try.ofSupplier(decoratedSupplier);

25assertThat(try.isSuccess()).isTrue();

Resilience4j 在功能特性上比 Bucket4j 强大不少,而且还支持并发量限流。不过最大的遗憾是,Resilience4j 不支持分布式限流。

3.4 其他

网上还有很多限流相关的开源项目,不可能一一介绍,这里列出来的只是冰山之一角:

可以看出,限流技术在实际项目中应用非常广泛,大家对实现自己的限流算法乐此不疲,新算法和新实现层出不穷。但是找来找去,目前还没有找到一款开源项目完全满足我的需求。

我的需求其实很简单,需要同时满足两种不同的限流场景:请求频率限流和并发量限流,并且能同时满足两种不同的限流架构:单机限流和分布式限流。下面我们就开始在 Spring Cloud Gateway 中实现这几种限流,通过前面介绍的那些项目,我们取长补短,基本上都能用比较成熟的技术实现,只不过对于最后一种情况,分布式并发量限流,网上没有搜到现成的解决方案,在和同事讨论了几个晚上之后,想出一种新型的基于双窗口滑动的限流算法,我在这里抛砖引玉,欢迎大家批评指正,如果大家有更好的方法,也欢迎讨论。

四、在网关中实现限流


在文章一开始介绍 Spring Cloud Gateway 的特性时,我们注意到其中有一条 Request Rate Limiting,说明网关自带了限流的功能,但是 Spring Cloud Gateway 自带的限流有很多限制,譬如不支持单机限流,不支持并发量限流,而且它的请求频率限流也是不尽人意,这些都需要我们自己动手来解决。

4.1 实现单机请求频率限流

Spring Cloud Gateway 中定义了关于限流的一个接口 RateLimiter,如下:

1public interface RateLimiter extends StatefulConfigurable {

2 Mono<RateLimiter.Response> isAllowed(String routeId, String id);

3}

这个接口就一个方法 isAllowed,第一个参数 routeId 表示请求路由的 ID,根据 routeId 可以获取限流相关的配置,第二个参数 id 表示要限流的对象的唯一标识,可以是用户名,也可以是 IP,或者其他的可以从 ServerWebExchange 中得到的信息。我们看下 RequestRateLimiterGatewayFilterFactory 中对 isAllowed 的调用逻辑:

1@Override

2public GatewayFilter apply(Config config) {

3 // 从配置中得到 KeyResolver

4 KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);

5 // 从配置中得到 RateLimiter

6 RateLimiter limiter = getOrDefault(config.rateLimiter,

7 defaultRateLimiter);

8 boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);

9 HttpStatusHolder emptyKeyStatus = HttpStatusHolder

10 .parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));

11

12 return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY)

13 .flatMap(key -> {

14 // 通过 KeyResolver 得到 key,作为唯一标识 id 传入 isAllowed() 方法

15 if (EMPTY_KEY.equals(key)) {

16 if (denyEmpty) {

17 setResponseStatus(exchange, emptyKeyStatus);

18 return exchange.getResponse().setComplete();

19 }

20 return chain.filter(exchange);

21 }

22 // 获取当前路由 ID,作为 routeId 参数传入 isAllowed() 方法

23 String routeId = config.getRouteId();

24 if (routeId == null) {

25 Route route = exchange

26 .getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);

27 routeId = route.getId();

28 }

29 return limiter.isAllowed(routeId, key).flatMap(response -> {

30

31 for (Map.Entry<String, String> header : response.getHeaders()

32 .entrySet()) {

33 exchange.getResponse().getHeaders().add(header.getKey(),

34 header.getValue());

35 }

36 // 请求允许,直接走到下一个 filter

37 if (response.isAllowed()) {

38 return chain.filter(exchange);

39 }

40 // 请求被限流,返回设置的 HTTP 状态码(默认是 429)

41 setResponseStatus(exchange, config.getStatusCode());

42 return exchange.getResponse().setComplete();

43 });

44 });

45}

从上面的的逻辑可以看出,通过实现 KeyResolver 接口的 resolve 方法就可以自定义要限流的对象了。

1public interface KeyResolver {

2 Mono resolve(ServerWebExchange exchange);

3}

比如下面的 HostAddrKeyResolver 可以根据 IP 来限流:

1public interface KeyResolver {

2 Mono resolve(ServerWebExchange exchange);

3}

4比如下面的 HostAddrKeyResolver 可以根据 IP 来限流:

5public class HostAddrKeyResolver implements KeyResolver {

6 @Override

7 public Mono resolve(ServerWebExchange exchange) {

8 return Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress());

9 }

10}

我们继续看 Spring Cloud Gateway 的代码发现,RateLimiter 接口只提供了一个实现类 RedisRateLimiter:

很显然是基于 Redis 实现的限流,虽说通过 Redis 也可以实现单机限流,但是总感觉有些大材小用,而且对于那些没有 Redis 的环境很不友好。所以,我们要实现真正的本地限流。

我们从 Spring Cloud Gateway 的 pull request 中发现了一个新特性 Feature/local-rate-limiter,而且看提交记录,这个新特性很有可能会合并到 3.0.0 版本中。我们不妨来看下这个 local-rate-limiter 的实现:LocalRateLimiter.java,可以看出它是基于 Resilience4有意思的是,这个类 还有一个早期版本,是基于 Bucket4j 实现的:

1public Mono isAllowed(String routeId, String id) {

2 Config routeConfig = loadConfiguration(routeId);

3

4 // How many requests per second do you want a user to be allowed to do?

5 int replenishRate = routeConfig.getReplenishRate();

6

7 // How many seconds for a token refresh?

8 int refreshPeriod = routeConfig.getRefreshPeriod();

9

10 // How many tokens are requested per request?

11 int requestedTokens = routeConfig.getRequestedTokens();

12

13 final io.github.resilience4j.ratelimiter.RateLimiter rateLimiter = RateLimiterRegistry

14 .ofDefaults()

15 .rateLimiter(id, createRateLimiterConfig(refreshPeriod, replenishRate));

16

17 final boolean allowed = rateLimiter.acquirePermission(requestedTokens);

18 final Long tokensLeft = (long) rateLimiter.getMetrics().getAvailablePermissions();

19

20 Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));

21 return Mono.just(response);

22}

有意思的是,这个类 还有一个早期版本,是基于 Bucket4j 实现的:

1public Mono isAllowed(String routeId, String id) {

2

3 Config routeConfig = loadConfiguration(routeId);

4

5 // How many requests per second do you want a user to be allowed to do?

6 int replenishRate = routeConfig.getReplenishRate();

7

8 // How much bursting do you want to allow?

9 int burstCapacity = routeConfig.getBurstCapacity();

10

11 // How many tokens are requested per request?

12 int requestedTokens = routeConfig.getRequestedTokens();

13

14 final Bucket bucket = bucketMap.computeIfAbsent(id,

15 (key) -> createBucket(replenishRate, burstCapacity));

16

17 final boolean allowed = bucket.tryConsume(requestedTokens);

18

19 Response response = new Response(allowed,

20 getHeaders(routeConfig, bucket.getAvailableTokens()));

21 return Mono.just(response);

22}

实现方式都是类似的,在上面对 Bucket4j 和 Resilience4j 已经作了比较详细的介绍,这里不再赘述。不过从这里也可以看出 Spring 生态圈对 Resilience4j 是比较看好的,我们也可以将其引入到我们的项目中。

4.2 实现分布式请求频率限流

上面介绍了如何实现单机请求频率限流,接下来再看下分布式请求频率限流。这个就比较简单了,因为上面说了,Spring Cloud Gateway 自带了一个限流实现,就是 RedisRateLimiter,可以用于分布式限流。它的实现原理依然是基于令牌桶算法的,不过实现逻辑是放在一段 lua 脚本中的,我们可以在 src/main/resources/META-INF/scripts 目录下找到该脚本文件 request_rate_limiter.lua:

1local tokens_key = KEYS[1]

2local timestamp_key = KEYS[2]

3

4local rate = tonumber(ARGV[1])

5local capacity = tonumber(ARGV[2])

6local now = tonumber(ARGV[3])

7local requested = tonumber(ARGV[4])

8

9local fill_time = capacity/rate

10local ttl = math.floor(fill_time*2)

11

12local last_tokens = tonumber(redis.call(“get”, tokens_key))

13if last_tokens == nil then

14 last_tokens = capacity

15end

16

17local last_refreshed = tonumber(redis.call(“get”, timestamp_key))

18if last_refreshed == nil then

19 last_refreshed = 0

20end

21

22local delta = math.max(0, now-last_refreshed)

23local filled_tokens = math.min(capacity, last_tokens+(delta*rate))

24local allowed = filled_tokens >= requested

25local new_tokens = filled_tokens

26local allowed_num = 0

27if allowed then

28 new_tokens = filled_tokens - requested

29 allowed_num = 1

30end

31

32if ttl > 0 then

33 redis.call(“setex”, tokens_key, ttl, new_tokens)

34 redis.call(“setex”, timestamp_key, ttl, now)

35end

36

37return { allowed_num, new_tokens }

这段代码和上面介绍令牌桶算法时用 Java 实现的那段经典代码几乎是一样的。这里使用 lua 脚本,主要是利用了 Redis 的单线程特性,以及执行 lua 脚本的原子性,避免了并发访问时可能出现请求量超出上限的现象。想象目前令牌桶中还剩 1 个令牌,此时有两个请求同时到来,判断令牌是否足够也是同时的,两个请求都认为还剩 1 个令牌,于是两个请求都被允许了。

有两种方式来配置 Spring Cloud Gateway 自带的限流。第一种方式是通过配置文件,比如下面所示的代码,可以对某个 route 进行限流:

1spring:

2 cloud:

3 gateway:

4 routes:

5 - id: test

6 uri: http://httpbin.org:80/get

7 filters:

8 - name: RequestRateLimiter

9 args:

10 key-resolver: ‘#{@hostAddrKeyResolver}’

11 redis-rate-limiter.replenishRate: 1

12 redis-rate-limiter.burstCapacity: 3

其中,key-resolver 使用 SpEL 表达式 #{@beanName} 从 Spring 容器中获取 hostAddrKeyResolver 对象,burstCapacity 表示令牌桶的大小,replenishRate 表示每秒往桶中填充多少个令牌,也就是填充速度。

第二种方式是通过下面的代码来配置:

1@Bean

2public RouteLocator myRoutes(RouteLocatorBuilder builder) {

3 return builder.routes()

4 .route(p -> p

5 .path(“/get”)

6 .filters(filter -> filter.requestRateLimiter()

7 .rateLimiter(RedisRateLimiter.class, rl -> rl.setBurstCapacity(3).setReplenishRate(1)).and())

8 .uri(“http://httpbin.org:80”))

9 .build();

10}

这样就可以对某个 route 进行限流了。但是这里有一点要注意,Spring Cloud Gateway 自带的限流器有一个很大的坑,replenishRate 不支持设置小数,也就是说往桶中填充的 token 的速度最少为每秒 1 个,所以,如果我的限流规则是每分钟 10 个请求(按理说应该每 6 秒填充一次,或每秒填充 1/6 个 token),这种情况 Spring Cloud Gateway 就没法正确的限流。网上也有人提了 issue,support greater than a second resolution for the rate limiter,但还没有得到解决。

4.3 实现单机并发量限流

上面学习 Resilience4j 的时候,我们提到了 Resilience4j 的一个功能特性,叫 隔离(Bulkhead)。Bulkhead 这个单词的意思是船的舱壁,利用舱壁可以将不同的船舱隔离起来,这样如果一个船舱破损进水,那么只损失这一个船舱,其它船舱可以不受影响。借鉴造船行业的经验,这种模式也被引入到软件行业,我们把它叫做 舱壁模式(Bulkhead pattern)。舱壁模式一般用于服务隔离,对于一些比较重要的系统资源,如 CPU、内存、连接数等,可以为每个服务设置各自的资源限制,防止某个异常的服务把系统的所有资源都消耗掉。这种服务隔离的思想同样可以用来做并发量限流。

正如前文所述,Resilience4j 提供了两种 Bulkhead 的实现:SemaphoreBulkhead 和 ThreadPoolBulkhead,这也正是舱壁模式常见的两种实现方案:一种是带计数的信号量,一种是固定大小的线程池。考虑到多线程场景下的线程切换成本,默认推荐使用信号量。

在操作系统基础课程中,我们学习过两个名词:互斥量(Mutex)信号量(Semaphores)。互斥量用于线程的互斥,它和临界区有点相似,只有拥有互斥对象的线程才有访问资源的权限,由于互斥对象只有一个,因此任何情况下只会有一个线程在访问此共享资源,从而保证了多线程可以安全的访问和操作共享资源。而信号量是用于线程的同步,这是由荷兰科学家 E.W.Dijkstra 提出的概念,它和互斥量不同,信号允许多个线程同时使用共享资源,但是它同时设定了访问共享资源的线程最大数目,从而可以进行并发量控制。

下面是使用信号量限制并发访问的一个简单例子:

1public class SemaphoreTest {

2

3 private static ExecutorService threadPool = Executors.newFixedThreadPool(100);

4 private static Semaphore semaphore = new Semaphore(10);

5

6 public static void main(String[] args) {

7 for (int i = 0; i < 100; i++) {

8 threadPool.execute(new Runnable() {

9 @Override

10 public void run() {

11 try {

12 semaphore.acquire();

13 System.out.println(“Request processing …”);

14 semaphore.release();

15 } catch (InterruptedException e) {

16 e.printStack();

17 }

18 }

19 });

20 }

21 threadPool.shutdown();

22 }

23}

这里我们创建了 100 个线程同时执行,但是由于信号量计数为 10,所以同时只能有 10 个线程在处理请求。说到计数,实际上,在 Java 里除了 Semaphore 还有很多类也可以用作计数,比如 AtomicLong 或 LongAdder,这在并发量限流中非常常见,只是无法提供像信号量那样的阻塞能力:

1public class AtomicLongTest {

2

3 private static ExecutorService threadPool = Executors.newFixedThreadPool(100);

4 private static AtomicLong atomic = new AtomicLong();

5

6 public static void main(String[] args) {

7 for (int i = 0; i < 100; i++) {

8 threadPool.execute(new Runnable() {

9 @Override

10 public void run() {

11 try {

12 if(atomic.incrementAndGet() > 10) {

13 System.out.println(“Request rejected …”);

14 return;

15 }

16 System.out.println(“Request processing …”);

17 atomic.decrementAndGet();

18 } catch (InterruptedException e) {

19 e.printStack();

20 }

21 }

22 });

23 }

24 threadPool.shutdown();

25 }

26}

4.4 实现分布式并发量限流

通过在单机实现并发量限流,我们掌握了几种常用的手段:信号量、线程池、计数器,这些都是单机上的概念。那么稍微拓展下,如果能实现分布式信号量、分布式线程池、分布式计数器,那么实现分布式并发量限流不就易如反掌了吗?

关于分布式线程池,是我自己杜撰的词,在网上并没有找到类似的概念,比较接近的概念是资源调度和分发,但是又感觉不像,这里直接忽略吧。

关于分布式信号量,还真有这样的东西,比如 Apache Ignite 就提供了 IgniteSemaphore 用于创建分布式信号量,它的使用方式和 Semaphore 非常类似。使用 Redis 的 ZSet 也可以实现分布式信号量,比如 这篇博客介绍的方法,还有《Redis in Action》这本电子书中也提到了这样的例子,教你如何实现 Counting semaphores。另外,Redisson 也实现了基于 Redis 的分布式信号量 RSemaphore,用法也和 Semaphore 类似。使用分布式信号量可以很容易实现分布式并发量限流,实现方式和上面的单机并发量限流几乎是一样的。

最后,关于分布式计数器,实现方案也是多种多样。比如使用 Redis 的 INCR 就很容易实现,更有甚者,使用 MySQL 数据库也可以实现。只不过使用计数器要注意操作的原子性,每次请求时都要经过这三步操作:取计数器当前的值、判断是否超过阈值,超过则拒绝、将计数器的值自增。这其实和信号量的 P 操作是一样的,而释放就对应 V 操作。

所以,利用分布式信号量和计数器就可以实现并发量限流了吗?问题当然没有这么简单。实际上,上面通过信号量和计数器实现单机并发量限流的代码片段有一个严重 BUG:

1semaphore.acquire();

2System.out.println(“Request processing …”);

3semaphore.release();

想象一下如果在处理请求时出现异常了会怎么样?很显然,信号量被该线程获取了,但是却永远不会释放,如果请求异常多了,这将导致信号量被占满,最后一个请求也进不来。在单机场景下,这个问题可以很容易解决,加一个 finally 就行了:

1try {

2 semaphore.acquire();

3 System.out.println(“Request processing …”);

4} catch (InterruptedException e) {

5 e.printStack();

6} finally {

7 semaphore.release();

8}

由于无论出现何种异常,finally 中的代码一定会执行,这样就保证了信号量一定会被释放。但是在分布式系统中,就不是加一个 finally 这么简单了。这是因为在分布式系统中可能存在的异常不一定是可被捕获的代码异常,还有可能是服务崩溃或者不可预知的系统宕机,就算是正常的服务重启也可能导致分布式信号量无法释放。

那么如何才能正确的掌握Redis呢?

为了让大家能够在Redis上能够加深,所以这次给大家准备了一些Redis的学习资料,还有一些大厂的面试题,包括以下这些面试题

  • 并发编程面试题汇总

  • JVM面试题汇总

  • Netty常被问到的那些面试题汇总

  • Tomcat面试题整理汇总

  • Mysql面试题汇总

  • Spring源码深度解析

  • Mybatis常见面试题汇总

  • Nginx那些面试题汇总

  • Zookeeper面试题汇总

  • RabbitMQ常见面试题汇总

JVM常频面试:

Redis高频面试笔记:基础+缓存雪崩+哨兵+集群+Reids场景设计

Mysql面试题汇总(一)

Redis高频面试笔记:基础+缓存雪崩+哨兵+集群+Reids场景设计

Mysql面试题汇总(二)

Redis高频面试笔记:基础+缓存雪崩+哨兵+集群+Reids场景设计

Redis常见面试题汇总(300+题)

Redis高频面试笔记:基础+缓存雪崩+哨兵+集群+Reids场景设计
《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》点击传送门,即可获取!
以很容易解决,加一个 finally 就行了:

1try {

2 semaphore.acquire();

3 System.out.println(“Request processing …”);

4} catch (InterruptedException e) {

5 e.printStack();

6} finally {

7 semaphore.release();

8}

由于无论出现何种异常,finally 中的代码一定会执行,这样就保证了信号量一定会被释放。但是在分布式系统中,就不是加一个 finally 这么简单了。这是因为在分布式系统中可能存在的异常不一定是可被捕获的代码异常,还有可能是服务崩溃或者不可预知的系统宕机,就算是正常的服务重启也可能导致分布式信号量无法释放。

那么如何才能正确的掌握Redis呢?

为了让大家能够在Redis上能够加深,所以这次给大家准备了一些Redis的学习资料,还有一些大厂的面试题,包括以下这些面试题

  • 并发编程面试题汇总

  • JVM面试题汇总

  • Netty常被问到的那些面试题汇总

  • Tomcat面试题整理汇总

  • Mysql面试题汇总

  • Spring源码深度解析

  • Mybatis常见面试题汇总

  • Nginx那些面试题汇总

  • Zookeeper面试题汇总

  • RabbitMQ常见面试题汇总

JVM常频面试:

[外链图片转存中…(img-jYqFElvJ-1714748954117)]

Mysql面试题汇总(一)

[外链图片转存中…(img-wZX2xjYT-1714748954117)]

Mysql面试题汇总(二)

[外链图片转存中…(img-KRJv8KlV-1714748954117)]

Redis常见面试题汇总(300+题)

[外链图片转存中…(img-cBgXO2rv-1714748954117)]
《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》点击传送门,即可获取!

Logo

一起探索未来云端世界的核心,云原生技术专区带您领略创新、高效和可扩展的云计算解决方案,引领您在数字化时代的成功之路。

更多推荐