k8s client-go workqueue
1 基础队列1.1 基础队列接口type Interface interface {Add(item interface{})// 向队列中添加一个元素,interface{}类型,说明可以添加任何类型的元素Len() int// 队列长度,就是元素的个数Get() (item interface{}, shutdown bool) // 从队列中获取一个元素,双返回值,这个
1 基础队列
1.1 基础队列接口
type Interface interface {
Add(item interface{}) // 向队列中添加一个元素,interface{}类型,说明可以添加任何类型的元素
Len() int // 队列长度,就是元素的个数
Get() (item interface{}, shutdown bool) // 从队列中获取一个元素,双返回值,这个和chan的<-很像,第二个返回值告知队列是否已经关闭了
Done(item interface{}) // 告知队列该元素已经处理完了
ShutDown() // 关闭队列
ShuttingDown() bool // 查询队列是否正在关闭
}
1.2 基础队列实现类
type Type struct {
queue []t // 排序队列
dirty set // 等待处理,还未加入排序队列的数据(可以这么理解:在内存(dirty)中但未写入硬盘(queue)的数据)
processing set // 正在处理的元素集合
cond *sync.Cond // 与pthread_cond_t相同,条件同步
shuttingDown bool // 关闭标记
metrics queueMetrics // 这个metrics和prometheus的metrics概念相同,此处不做过多说明,知道功能就行
}
1.2.1 Add
可以理解为dirty为内存中存放的指向某个资源的地址,资源会时时更新。当将dirty存储到硬盘时(queue),它可以被按序读取到CPU cache(processing)中由CPU处理。在内存中已经有资源地址(资源的key)的情况下,资源内容更新,不需要去更新资源地址,直接返回。当内存中不存在的情况下,则将资源地址存放入内存中,如果该资源地址没有被CPU的处理,则将其保存到硬盘中(queue),等待后续处理,如果该资源地址正在被cpu处理,则不将其加入到硬盘中(多线程情况下,可能会出现cpu cache中用不同版本的相同key资源,这样就会出现错误了)
// 代码源自client-go/util/workqueue/queue.go
func (q *Type) Add(item interface{}) {
// 和pthread_cond_t不同的是golang的cond自带了互斥锁
q.cond.L.Lock()
defer q.cond.L.Unlock()
// 队列正在关闭,直接返回
if q.shuttingDown {
return
}
// dirty中已经存在直接返回
if q.dirty.has(item) {
return
}
// 告知metrics添加了元素
q.metrics.add(item)
// 添加到dirty中
q.dirty.insert(item)
// 元素刚被拿走处理,那就直接返回
if q.processing.has(item) {
return
}
// 追加到元素数组的尾部
q.queue = append(q.queue, item)
// 通知有新元素到了,此时有协程阻塞就会被唤醒
q.cond.Signal()
}
1.2.2 Get
从queue中取出队首元素,process中插入新取出的队首元素,dirty中移除队首元素
// 代码源自client-go/util/workqueue/queue.go
func (q *Type) Get() (item interface{}, shutdown bool) {
// 加锁解锁不解释
q.cond.L.Lock()
defer q.cond.L.Unlock()
// 没有数据,阻塞协程
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
// 协程被激活但还没有数据,说明队列被关闭了,这个和chan一样
if len(q.queue) == 0 {
return nil, true
}
// 弹出第一个元素,我一直感觉golang的slice[1:]这种操作性能不太高~以后有时间看看代码实现
item, q.queue = q.queue[0], q.queue[1:]
// 通知metrics元素被取走了
q.metrics.get(item)
// 从dirty集合中移除,加入到processing集合,经过前面的分析这里就很好理解了
q.processing.insert(item)
q.dirty.delete(item)
return item, false
}
1.2.3 Done
从processing中移除处理完毕的元素,同时如果dirty中有该元素,则将其加入到queue中
// 代码源自client-go/util/workqueue/queue.go
func (q *Type) Done(item interface{}) {
// 加锁解锁不解释
q.cond.L.Lock()
defer q.cond.L.Unlock()
// 通知metrics元素处理完了
q.metrics.done(item)
// 从processing集合中删除
q.processing.delete(item)
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
}
}
2 延时队列
2.1 延时队列接口
// 代码源自client-go/util/workqueue/delaying_queue.go
type DelayingInterface interface {
Interface // 继承了基础队列所有接口
AddAfter(item interface{}, duration time.Duration) // 增加了延迟添加的接口
}
2.2 延时队列接口实现
// 代码源自client-go/util/workqueue/delaying_queue.go
type delayingType struct {
Interface // 基础队列
clock clock.Clock // 时钟,用于获取时间
stopCh chan struct{} // 延时就意味着异步,就要有另一个协程处理,所以需要退出信号
heartbeat clock.Ticker // 定时器,在没有任何数据操作时可以定时的唤醒处理协程,定义为心跳没毛病
waitingForAddCh chan *waitFor // 所有延迟添加的元素封装成waitFor放到chan中
metrics retryMetrics // 和通用队列中的metrics功能类似
}
//
type waitFor struct {
data t // 元素数据,这个t就是在通用队列中定义的类型interface{}
readyAt time.Time // 在什么时间添加到队列中
index int // 这是个索引,后面会详细说明
}
2.2.1 waitForPriorityQueue
waitForPriorityQueue用于管理所有延时添加的元素,所有的元素在waitForPriorityQueue中按照延时添加时间从小到大排序
// 代码源自client-go/util/workqueue/delaying_queue.go
// waitFor的定义上面有,是需要延时添加的元素都要封装成这个类型
// waitForPriorityQueue就把需要延迟的元素形成了一个队列,队列按照元素的延时添加的时间(readyAt)从小到大排序
// 实现的策略就是实现了go/src/container/heap/heap.go中的Interface类型,读者可以自行了解heap
// 这里只需要知道waitForPriorityQueue这个数组是有序的,排序方式是按照时间从小到大
type waitForPriorityQueue []*waitFor
// heap需要实现的接口,告知队列长度
func (pq waitForPriorityQueue) Len() int {
return len(pq)
}
// heap需要实现的接口,告知第i个元素是否比第j个元素小
func (pq waitForPriorityQueue) Less(i, j int) bool {
return pq[i].readyAt.Before(pq[j].readyAt) // 此处对比的就是时间,所以排序按照时间排序
}
// heap需要实现的接口,实现第i和第j个元素换
func (pq waitForPriorityQueue) Swap(i, j int) {
// 这种语法好牛逼,有没有,C/C++程序猿没法理解~
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i // 因为heap没有所以,所以需要自己记录索引,这也是为什么waitFor定义索引参数的原因
pq[j].index = j
}
// heap需要实现的接口,用于向队列中添加数据
func (pq *waitForPriorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*waitFor)
item.index = n // 记录索引值
*pq = append(*pq, item) // 放到了数组尾部
}
// heap需要实现的接口,用于从队列中弹出最后一个数据
func (pq *waitForPriorityQueue) Pop() interface{} {
n := len(*pq)
item := (*pq)[n-1]
item.index = -1
*pq = (*pq)[0:(n - 1)] // 缩小数组,去掉了最后一个元素
return item
}
// 返回第一个元素
func (pq waitForPriorityQueue) Peek() interface{} {
return pq[0]
}
2.2.2 AddAfter
负责将需要延时添加的元素封装成waitFor,然后添加到waitingForAddCh chan中(当waitingForAddCh满的时候会阻塞,这里支持的元素数量应该是1000)
// 代码源自client-go/util/workqueue/delaying_queue.go
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
// 如果队列关闭就直接退出
if q.ShuttingDown() {
return
}
// 记录metrics
q.metrics.retry()
// 不需要延迟,那就直接像通用队列一样添加
if duration <= 0 {
q.Add(item)
return
}
// 把元素封装成waitFor传入chan,切记select没有default,所以可能会被阻塞
// 这里面用到了stopChan,因为有阻塞的可能,所以用stopChan可以保证退出
select {
case <-q.stopCh:
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}
2.2.3 waitingLoop()
从waitingForAddCh中读取数据放入到waitingForQueue中(如果延时添加时间点超过当前时间则直接放入queue中)。循环处理waitingForQueue,每次取其延时添加时间最小的数据,如果其延时添加时间点迟于当前时间点,则添加nextReadyAt信号为其延时添加时间点。如果大于当前时间则直接放入到queue中,nextReadyAt信号为never。
重新开始循环处理的信号(1)心跳时间(2)nextReadyAt(3)waitingForAddCh中添加新数据。
// 代码源自client-go/util/workqueue/delaying_queue.go
// 这部分就是演示队列的核心代码
func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()
// 这个变量后面会用到,当没有元素需要延时添加的时候利用这个变量实现长时间等待
never := make(<-chan time.Time)
// 构造我们上面提到的有序队列了,并且初始化
waitingForQueue := &waitForPriorityQueue{}
heap.Init(waitingForQueue)
// 这个map是用来避免对象重复添加的,如果重复添加就只更新时间
waitingEntryByData := map[t]*waitFor{}
// 开始无限循环
for {
// 队列关闭了,就可以返回了
if q.Interface.ShuttingDown() {
return
}
// 获取当前时间
now := q.clock.Now()
// 有序队列中是否有元素,有人肯定会问还没向有序队列里添加呢判断啥啊?后面会有添加哈
for waitingForQueue.Len() > 0 {
// Peek函数我们前面注释了,获取第一个元素,注意:不会从队列中取出哦
entry := waitingForQueue.Peek().(*waitFor)
// 元素指定添加的时间过了么?如果没有过那就跳出循环
if entry.readyAt.After(now) {
break
}
// 既然时间已经过了,那就把它从有序队列拿出来放入通用队列中,这里面需要注意几点:
// 1.heap.Pop()弹出的是第一个元素,waitingForQueue.Pop()弹出的是最后一个元素
// 2.从有序队列把元素弹出,同时要把元素从上面提到的map删除,因为不用再判断重复添加了
// 3.此处是唯一一个地方把元素从有序队列移到通用队列,后面主要是等待时间到过程
entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(waitingEntryByData, entry.data)
}
// 如果有序队列中没有元素,那就不用等一段时间了,也就是永久等下去
// 如果有序队列中有元素,那就用第一个元素指定的时间减去当前时间作为等待时间,逻辑挺简单
// 有序队列是用时间排序的,后面的元素需要等待的时间更长,所以先处理排序靠前面的元素
nextReadyAt := never
if waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
nextReadyAt = q.clock.After(entry.readyAt.Sub(now))
}
// 进入各种等待
select {
// 有退出信号么?
case <-q.stopCh:
return
// 定时器,没过一段时间没有任何数据,那就再执行一次大循环,从理论上讲这个没用,但是这个具备容错能力,避免BUG死等
case <-q.heartbeat.C():
// 这个就是有序队列里面需要等待时间信号了,时间到就会有信号
case <-nextReadyAt:
// 这里是从chan中获取元素的,AddAfter()放入chan中的元素
case waitEntry := <-q.waitingForAddCh:
// 如果时间已经过了就直接放入通用队列,没过就插入到有序队列
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
// 下面的代码看似有点多,目的就是把chan中的元素一口气全部取干净,注意用了default意味着chan中没有数据就会立刻停止
drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
}
}
// 下面的代码是把元素插入有序队列的实现
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
// 看看元素是不是被添加过?如果添加过看谁的时间靠后就用谁的时间
existing, exists := knownEntries[entry.data]
if exists {
if existing.readyAt.After(entry.readyAt) {
existing.readyAt = entry.readyAt
heap.Fix(q, existing.index)
}
return
}
// 把元素放入有序队列中,并记录在map里面,这个map就是上面那个用于判断对象是否重复添加的map
// 注意,这里面调用的是heap.Push,不是waitForPriorityQueue.Push
heap.Push(q, entry)
knownEntries[entry.data] = entry
}
3 限速队列
3.1 RateLimiter接口
// 代码源自client-go/util/workqueue/default_rate_limiter.go
type RateLimiter interface {
When(item interface{}) time.Duration // 返回元素需要等待多长时间
Forget(item interface{}) // 抛弃该元素,意味着该元素已经被处理了,下次添加的时候元素重排次数从0开始
NumRequeues(item interface{}) int // 元素放入队列多少次了
}
3.2 RateLimiter接口实现
它们的差异主要在于when方法中采用不同的算法来计算延时添加时间
3.2.1 ItemExponentialFailureRateLimiter
基于排队指数算法的限速器
// 代码源自client-go/util/workqueue/default_rate_limiters.go
// 限速器的定义
type ItemExponentialFailureRateLimiter struct {
failuresLock sync.Mutex // 互斥锁
failures map[interface{}]int // 记录每个元素错误次数,每调用一次When累加一次
baseDelay time.Duration // 元素延迟基数,算法后面会有说明
maxDelay time.Duration // 元素最大的延迟时间
}
// 实现限速器的When接口
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
// 累加错误计数,比较好理解
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1
// 通过错误次数计算延迟时间,公式是2^i * baseDelay,按指数递增,符合Exponential名字
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}
// 计算后的延迟值和最大延迟值二者取最小值
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}
return calculated
}
// 实现限速器的NumRequeues接口,很简单,没什么好说的
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
// 实现限速器的Forget接口,也很简单,没什么好说的
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
3.2.2 ItemFastSlowRateLimiter
基于计数器算法的限速器
// 代码源自client-go/util/workqueue/default_rate_limiters.go
// 限速器定义
type ItemFastSlowRateLimiter struct {
failuresLock sync.Mutex // 互斥锁
failures map[interface{}]int // 错误次数计数
maxFastAttempts int // 错误尝试阈值
fastDelay time.Duration // 短延迟时间
slowDelay time.Duration // 长延迟时间
}
// 限速器实现When接口
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
// 累加错误计数
r.failures[item] = r.failures[item] + 1
// 错误次数超过阈值用长延迟,否则用短延迟
if r.failures[item] <= r.maxFastAttempts {
return r.fastDelay
}
return r.slowDelay
}
// 限速器实现NumRequeues接口,比较简单不多解释
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
// 限速器实现Forget接口,比较简单不多解释
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
3.2.3 BucketRateLimiter
基于令牌桶的限速器,利用golang.org.x.time.rate.Limiter实现固定速率(qps)的限速器。延时时间与元素令牌桶添加token的速率r和桶大小b,以及元素添加的顺位有关。举例,r为10(10 token/s =1 token / 100 ms),b为100,添加1000个元素,则前100个元素会被立刻处理,而101、102、103个元素处理延迟为101/100 ms、102/200 ms、103/100 ms …
// 代码源自client-go/util/workqueue/default_rate_limiters.go
type BucketRateLimiter struct {
*rate.Limiter // 这个就是golang.org.x.time.rate.Limiter
}
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
return r.Limiter.Reserve().Delay() // 获取延迟,这个延迟会是个相对固定的周期
}
func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
return 0 // 因为固定频率的,也就不存在重试什么的了
}
func (r *BucketRateLimiter) Forget(item interface{}) {
}
3.2.4 MaxOfRateLimiter
基于混合模式的限速器,采用了多种限速算法,使用这些算法得到的最大延时时间
// 代码源自client-go/util/workqueue/default_rate_limiters.go
type MaxOfRateLimiter struct {
limiters []RateLimiter // 限速器数组,创建该限速器需要提供一个限速器数组
}
// 限速器实现When接口
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
ret := time.Duration(0)
// 这里在获取所有限速里面时间最大的
for _, limiter := range r.limiters {
curr := limiter.When(item)
if curr > ret {
ret = curr
}
}
return ret
}
// 限速器实现NumRequeues接口
func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
ret := 0
// Requeues也是取最大值
for _, limiter := range r.limiters {
curr := limiter.NumRequeues(item)
if curr > ret {
ret = curr
}
}
return ret
}
// 限速器实现Forget接口
func (r *MaxOfRateLimiter) Forget(item interface{}) {
// 逐一遍历Forget就行了,比较简单
for _, limiter := range r.limiters {
limiter.Forget(item)
}
}
3.3 限速队列接口
// 代码源自client-go/util/workqueue/rate_limiting_queue.go
type RateLimitingInterface interface {
DelayingInterface // 继承了延时队列
AddRateLimited(item interface{}) // 按照限速方式添加元素的接口
Forget(item interface{}) // 丢弃指定元素
NumRequeues(item interface{}) int // 查询元素放入队列的次数
}
3.4 限速队列接口实现
// 这个是限速队列的实现
type rateLimitingType struct {
DelayingInterface // 同样要继承延迟队列
rateLimiter RateLimiter // 限速器
}
// 代码源自client-go/util/workqueue/rate_limitting_queue.go
func (q *rateLimitingType) AddRateLimited(item interface{}) {
// 通过限速器获取延迟时间,然后加入到延时队列
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
func (q *rateLimitingType) NumRequeues(item interface{}) int {
return q.rateLimiter.NumRequeues(item)
}
func (q *rateLimitingType) Forget(item interface{}) {
q.rateLimiter.Forget(item)
}
4 总结
各类型队列关系:
Interface<——DelayingInterface<——RateLimitingInterface
其中Interface实现类Type提供了队列的基础数据结构和基础的Add、Get、Done(从processing中移除已处理元素,对于存在于dirty中的元素,将其添加到queue中)方法。而DelayInterface则在Interface基础上添加了AddAfter方法,支持在指定延时时间后调用Add方法加元素添加到queue中。而RateLimitingInterface在DelayInterface基础上添加了限速器和Forget方法(从重排计数map中删除元素),基于重排次数,计算延迟时间,调用AddAfter方法,在指定延时时间后将元素添加到queue中
更多推荐
所有评论(0)