一、workQueue分类

在Informer最后将资源对象已经写入到事件回调函数中,此后我们直接处理这些数据即可,但是我们使用golang中的chanel来处理会存在处理效率低,存在数据大并发量,数据积压等其他异常情况,为此client-go单独将workqueue提出来,作为公共组件,不仅可以在Kubernetes内部使用,还可以供Client-go使用,用户侧可以通过Workqueue相关方法进行更加灵活的队列处理,如失败重试,延迟入队,限速控制等,实现非阻塞异步逻辑处理。

WorkQueue 支持 3 种队列,并提供了 3 种接口,不同队列实现可应对不同的使用场景,分别介绍如下。

  • Interface:通用队FIFO 队列接口,先进先出队列,并支持去重机制。

  • DelayingInterface:延迟队列接口,基于 Interface 接口封装,延迟一段时间后再将元素存入队列。

  • RateLimitingInterface:限速队列接口,基于 DelayingInterface 接口封装,支持元素存入队列时进行速率限制。

从图中可以看到,workqueue.RateLimitingInterface 集成了 DelayingInterface,DelayingInterface 集成了 Interface,最终由 rateLimitingType 进行实现,提供了 rateLimit 限速、delay 延时入队(由优先级队列通过小顶堆实现)、queue 队列处理 三大核心能力,另外,在代码中可看到 K8s 实现了三种 RateLimiter:BucketRateLimiter、ItemExponentialFailureRateLimiter、ItemFastSlowRateLimiter。

二、workQueue特点

WorkQueue 称为工作队列,Kubernetes 的 WorkQueue 队列与普通 FIFO(先进先出,First-In, First-Out)队列相比,实现略显复杂,它的主要功能在于标记和去重,并支持如下特性。

  • 有序:按照添加顺序处理元素(item)。

  • 去重:相同元素在同一时间不会被重复处理,例如一个元素在处理之前被添加了多次,它只会被处理一次。

  • 并发性:多生产者和多消费者。

  • 标记机制:支持标记功能,标记一个元素是否被处理,也允许元素在处理时重新排队。

  • 通知机制:ShutDown 方法通过信号量通知队列不再接收新的元素,并通知 metric goroutine 退出。

  • 延迟:支持延迟队列,延迟一段时间后再将元素存入队列。

  • 限速:支持限速队列,元素存入队列时进行速率限制。限制一个元素被重新排队(Reenqueued)的次数。

  • Metric:支持 metric 监控指标,可用于 Prometheus 监控。

三、普通FIFO队列

数据结构及方法

普通FIFO队列,先进先出,支持去重。

queue实际存储元素的地方,是 slice 结构,用于保证元素有序;

dirty用于去重机制,是map结构,保证一个元素被处理之前哪怕其被添加了多次(并发情况下),但也只会被处理一次;

processing用于标记机制,是map结构,标记一个元素是否正在被处理;

Add方法,添加一个元素到queue中,首先会判断队列是否关闭,其次会判断元素是否在dirty中,如果在则直接返回实现去重,不在则添加,然后判断元素是否在processing中,如果存在则直接返回,如果不存在则将其添加进queue中;

Get方法,从queue队列头部弹出一个元素放到processing中,并从dirty中移除;

Done方法,标记元素已处理完成,从processing中移除,并判断元素是否还在dirty中,如果在则将其重新添加至queue队尾;

FIFO队列存储过程

 通过 Add 方法往 FIFO 队列中分别插入 1、2、3 三个元素,此时队列中的 queue 和 dirty 分别存有 1、2、3 元素,processing为空;然后通过 Get 方法获取最先进入的1元素,此时1 元素被放入 processing中,queue 和 dirty 剩余有 2、3 元素,表示1元素正在被处理;最后,当我们处理完 1 元素时通过 Done 方法标记该元素已经被处理完成,此时将1元素从 processing中移除。

高并发下如何保证一个元素哪怕其被添加了多次,但也只会被处理一次?

元素在processing中正被处理:

在并发场景下,假设 goroutine A 通过 Get 方法获取 1 元素,1 元素被添加到 processing 中并从queue和dirty中移除;同一时间,goroutine B 通过 Add 方法插入另一个 1 元素,此时在 processing 中已经存在相同的元素,所以后面的 1 元素并不会被直接添加到 queue 字段中,而是仅添加到dirty中;在 goroutine A 通过 Done 方法标记1袁术被处理完成并从processing删除后,检测到dirty 字段中存有 1 元素,则将 1 元素追加到 queue 字段中的尾部。

元素在queue和dirty中还未放入processing:

在并发场景下,假设 goroutine A 通过Add方法出入1袁术到queue和dirty中;同一时间,goroutine B 通过 Add 方法插入另一个 1 元素,此时在dirty中已经存在相同的元素,会直接返回。

四、延迟队列

数据结构及方法

延迟队列,其原理是延迟一段时间后再将元素插入 FIFO 队列,防止hot-loop。继承了普通FIFO队列的通用接口,在其基础上主要新增了waitingForAddCh字段和AddAfter、waitingLoop方法。

waitingForAddCh:其默认初始大小为 1000,通过 AddAfter 方法插入元素时,是非阻塞状态的,只有当插入的元素大于或等于 1000 时,延迟队列才会处于阻塞状态。waitingForAddCh 字段中的数据通过新启 goroutine 运行的 waitingLoop 函数持久运行。

waitFor: 保存了包括数据data和该item什么时间起(readyAt)就可以进入队列了.
waitForPriorityQueue:是用于保存waitFor的优先队列, 按readyAt时间从早到晚排序,形成一个优先级队列, 先readyitem先出队列.

延迟队列存储过程

将元素 1 放入 waitingForAddCh 字段中,通过 waitingLoop 函数消费元素1数据。当元素1的延迟时间还没到则添加到优先级队列,如果延迟时间到了则添加到 FIFO 队列中。同时会不断遍历优先队列中的元素,判断延迟时间是否到达,到达则从优先队列删除并添加到普通FIFO队列中进行正常处理。其中优先级队列实现利用了golang底层库heap实现,堆顶元素为延迟时间最近的元素。

五、限速队列

数据结构及方法

限速队列,继承了延迟队列的通用接口,并结合限速器利用延迟队列的特性延迟某个元素的插入时间,从而实现对元素入队有一定速率限制,新增有AddRateLimited(获取到限速器延迟时间,然后加入到延迟队列)、Forget、NumRequeues 3个方法。

限速器主要包括三个函数When获取某个元素应该等待的时间,Forget释放某个元素不再监测,NumRequeues返回元素入队列的次数

限速队列存储过程

将元素 1 通过限速器计算出延迟时间,然后放入延迟队列中

限速器的实现

RateLimiter主要有四种类型,主要行为表现在当某一事件元素失败后,等待时间的计算规则不一致:

1、BucketRateLimiter令牌桶算法

令牌桶算法是通过 Go 语言的第三方库 golang.org/x/time/rate 实现的,令牌桶算法内部实现了一个存放 token(令牌)的“桶”,初始时“桶”是空的,token 会以固定速率往“桶”里填充,直到将其填满为止,多余的 token 会被丢弃。每个元素都会从令牌桶得到一个 token,只有得到 token 的元素才允许通过(accept),而没有得到 token 的元素处于等待状态,从而通过控制发放 token 来达到限速目的

在实例化 rate.NewLimiter 后,传入 r 和 b 两个参数,其中 r 参数表示每秒往“桶”里填充的 token 数量,b 参数表示令牌桶的大小(即令牌桶最多存放的 token 数量)。我们假定 r 为 10,b 为 100。假设在一个限速周期内插入了 1000 个元素,通过 r.Limiter.Reserve().Delay 函数返回指定元素应该等待的时间,那么前 b(即 100)个元素会被立刻处理,而后面元素的延迟时间分别为 item100/100ms、item101/200ms、item102/300ms、item103/400ms,以此类推。

2、ItemExponentialFailureRateLimiter排队指数算法

将相同元素的排队数作为指数,排队数增大,速率限制呈指数级增长,但其最大值不会超过 maxDelay。元素的排队数统计是有限速周期的,一个限速周期是指从执行 AddRateLimited 方法到执行完 Forget 方法之间的时间。如果该元素被 Forget 方法处理完,则清空排队数

failures 字段用于统计元素排队数,每当 AddRateLimited 方法插入新元素时,会为该字段加 1;另外,baseDelay 字段是最初的限速单位(默认为 5ms),maxDelay 字段是最大限速单位(默认为 1000s)

限速队列利用延迟队列的特性,延迟多个相同元素的插入时间,达到限速目的。

我们假定 baseDelay 是 1 * time.Millisecond,maxDelay 是 1000 * time.Second。假设在一个限速周期内通过 AddRateLimited 方法插入 10 个相同元素,那么第 1 个元素会通过延迟队列的 AddAfter 方法插入并设置延迟时间为 1ms(即 baseDelay),第 2 个相同元素的延迟时间为 2ms,第 3 个相同元素的延迟时间为 4ms,第 4 个相同元素的延迟时间为 8ms,第 5 个相同元素的延迟时间为 16ms……第 10 个相同元素的延迟时间为 512ms,最长延迟时间不超过 1000s(即 maxDelay)。

3、ItemFastSlowRateLimiter计数器算法

计数器算法是限速算法中最简单的一种,其原理是限制一段时间内允许通过的元素数量,例如在 1 分钟内只允许通过50个元素,每插入一个元素计数器会自增 1,当计数器数到50的阈值且还在限速周期内时,则不允许有元素再通过。WorkQueue 在此基础上扩展了 fast 和 slow 速率。

计数器算法提供了 4 个主要字段:failures、fastDelay、slowDelay 及 maxFastAttempts。其中,failures 字段用于统计元素排队次数,每当 AddRateLimited 方法插入新元素时,会为该字段加 1;而 fastDelay 和 slowDelay 字段是用于定义 fast、slow 速率的;另外,maxFastAttempts 字段用于控制从 fast 速率转换到 slow 速率。

假设 fastDelay 是 5 * time.Millisecond,slowDelay 是 10 * time.Second,maxFastAttempts 是 3。在一个限速周期内通过 AddRateLimited 方法插入 4 个相同的元素,那么前 3 个元素使用 fastDelay 定义的 fast 速率,当触发 maxFastAttempts 字段时,第 4 个元素使用 slowDelay 定义的 slow 速率。

4、MaxOfRateLimiter混合模式

包含多个限流器,并返回所有RateLimiter的最坏值。

默认的DefaultControllerRateLimiter是MaxOfRateLimiter取最坏值,其中并包括两个限流器,

BucketRateLimiter限流器是一个令牌桶算法处理尖峰流量,实现平滑限流,令牌桶大小是100,生成令牌速度是10QPS,拿令牌是没有速度限制的;

ItemExponentialFailureRateLimiter:等待时间=min(1000s,5ms*2^n),与失败次数n呈指数关系; 

备注:k8s controller的重试等待时间是5ms*2的n次方,n是重试次数,但是不超过1000s,并且还有一个令牌桶算法处理尖峰流量实现平滑限流,所以流量暴增时等待时间可能比1000s更长。

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐