概述
channel
是go
协程间通信的主要方式,但使用channel时通常需要预设channel
的buffer大小。在实际业务中,通常难以准确的预知需要的channel
大小,若预设较大buffer的channel可能会造成内存浪费,若预设的buffer较小则可能造成处理任务的阻塞,而channel
不支持动态扩容。因此需要一种动态调节缓冲大小的数据结构来支持协程间协作,k8s
的client-go
中提供了基于切片的支持线程安全的并发队列,解耦生产者与消费者,并且提供了去重、限速、重试加入队列等功能,代码简洁设计巧妙,可作为除channel
外另一种便捷的协程通信手段。
基本用法
k8s/client-go代码仓中以一个controller处理任务为例子,提供了work queue
队列的官方example,work queue
部分主要代码如下
生产者部分
1 2 3 4
| queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
queue.Add(key)
|
消费者部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| for i := 0; i < workers; i++ { go wait.Until(c.runWorker, time.Second, stopCh) }
func (c *Controller) runWorker() { for c.processNextItem() { } }
func (c *Controller) processNextItem() bool { key, quit := c.queue.Get() if quit { return false } defer c.queue.Done(key) err := c.syncToStdout(key.(string)) c.handleErr(err, key) return true }
func (c *Controller) handleErr(err error, key interface{}) { if err == nil { c.queue.Forget(key) return } if c.queue.NumRequeues(key) < 5 { klog.Infof("Error syncing pod %v: %v", key, err) c.queue.AddRateLimited(key) return } c.queue.Forget(key) runtime.HandleError(err) }
|
可以看到,work queue
的使用比较简单,且官方已经给了使用模板,调用者不需要处理元素去重、重入队列、并发安全、错误处理等逻辑,只需关注自己的业务即可。
限速队列由多个队列一起完成,下文将逐一介绍其具体实现。
基本队列work queue
接口
该Interface
定义了一个最基本的队列的基本方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| type Interface interface { Add(item interface{}) Len() int Get() (item interface{}, shutdown bool) Done(item interface{}) ShutDown() ShuttingDown() bool }
|
实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| type Type struct { queue []t dirty set processing set cond *sync.Cond shuttingDown bool metrics queueMetrics unfinishedWorkUpdatePeriod time.Duration clock clock.Clock }
|
这里重点介绍下shutdown
标记,该标记主要为了通知其他goroutine
如消费者、监控组件等队列是否关闭状态,其他goroutine
处理是检查到队列为关闭状态时停止工作,避免发生未知错误。
主要方法
Add
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| func (q *Type) Add(item interface{}) { q.cond.L.Lock() defer q.cond.L.Unlock() if q.shuttingDown { return } if q.dirty.has(item) { return } q.metrics.add(item) q.dirty.insert(item) if q.processing.has(item) { return } q.queue = append(q.queue, item) q.cond.Signal() }
|
Add
方法会先key放入dirty
和queue中,加入时都会对这两个字段存储的元素进行去重操作,同时若该key正在被处理,则不会加入,防止同一时刻有同一个key被多个worker处理,从而导致业务逻辑错误。所以dirty集合的设计既有去重功能,又保证一个元素至多被处理一次。
Get
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| func (q *Type) Get() (item interface{}, shutdown bool) { q.cond.L.Lock() defer q.cond.L.Unlock() for len(q.queue) == 0 && !q.shuttingDown { q.cond.Wait() } if len(q.queue) == 0 { return nil, true } item, q.queue = q.queue[0], q.queue[1:] q.metrics.get(item) q.processing.insert(item) q.dirty.delete(item) return item, false }
|
Get
操作也比较简单,需要注意的是Get时key也会从dirty集合中移除
Done
& ShutDown
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| func (q *Type) Done(item interface{}) { q.cond.L.Lock() defer q.cond.L.Unlock() q.metrics.done(item) q.processing.delete(item) if q.dirty.has(item) { q.queue = append(q.queue, item) q.cond.Signal() } }
func (q *Type) ShutDown() { q.cond.L.Lock() defer q.cond.L.Unlock() q.shuttingDown = true q.cond.Broadcast() }
|
以上就是work queue
的基本逻辑,可以看到通过设置dirty和processing两个集合,work queue
实现了去重功能,并防止了相同key被同时处理的错误。接下来将介绍work queue
如何实现延迟已经限速。
延迟队列DelayingQueue
接口
1 2 3 4 5
| type DelayingInterface interface { Interface AddAfter(item interface{}, duration time.Duration) }
|
延迟队列在上文介绍的work queue
基础上实现,继承了Interface
接口,多了一个AddAfter
方法,通过设置指定的duration来达到限速的目的。
实现
1 2 3 4 5 6 7 8 9 10 11 12 13
| func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType { ret := &delayingType{ Interface: q, clock: clock, heartbeat: clock.NewTicker(maxWait), stopCh: make(chan struct{}), waitingForAddCh: make(chan *waitFor, 1000), metrics: newRetryMetrics(name), } go ret.waitingLoop() return ret }
|
newDelayingQueue
返回一个delayingType
类型的限速队列,同时会启动一个waitingLoop
协程处理被添加的key,接下来详细分析AddAfter
和waitingLoop
的实现。
主要方法
AddAfter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| func (q *delayingType) AddAfter(item interface{}, duration time.Duration) { if q.ShuttingDown() { return } q.metrics.retry() if duration <= 0 { q.Add(item) return } select { case <-q.stopCh: case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: } }
|
AddAfter
的主要逻辑就是将key和key的延迟时间封装成一个waitFor
struct
,其中readyAt
即为key应该加入到队列的时间,然后将该struct
放入到waitingForAddCh
中,waitingLoop
协程会异步进行处理。默认waitingForAddCh
的大小为1000,当channel
满时添加key会被block。
waitingLoop
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| func (q *delayingType) waitingLoop() { defer utilruntime.HandleCrash() never := make(<-chan time.Time) var nextReadyAtTimer clock.Timer waitingForQueue := &waitForPriorityQueue{} heap.Init(waitingForQueue) waitingEntryByData := map[t]*waitFor{} for { if q.Interface.ShuttingDown() { return } now := q.clock.Now() for waitingForQueue.Len() > 0 { entry := waitingForQueue.Peek().(*waitFor) if entry.readyAt.After(now) { break } entry = heap.Pop(waitingForQueue).(*waitFor) q.Add(entry.data) delete(waitingEntryByData, entry.data) } nextReadyAt := never if waitingForQueue.Len() > 0 { if nextReadyAtTimer != nil { nextReadyAtTimer.Stop() } entry := waitingForQueue.Peek().(*waitFor) nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) nextReadyAt = nextReadyAtTimer.C() } select { case <-q.stopCh: return case <-q.heartbeat.C(): case <-nextReadyAt: case waitEntry := <-q.waitingForAddCh: if waitEntry.readyAt.After(q.clock.Now()) { insert(waitingForQueue, waitingEntryByData, waitEntry) } else { q.Add(waitEntry.data) } drained := false for !drained { select { case waitEntry := <-q.waitingForAddCh: if waitEntry.readyAt.After(q.clock.Now()) { insert(waitingForQueue, waitingEntryByData, waitEntry) } else { q.Add(waitEntry.data) } default: drained = true } } } } }
|
waitingLoop
监听waitingForAddCh
channel
,从中取出待添加到队列的key,如果已经到期则直接加入,否则将key放入到堆中,然后每次从堆中取出最先过期的key进行判断处理。
限速队列RateLimitingQueue
接口
1 2 3 4 5 6 7 8 9 10 11 12
| type RateLimitingInterface interface { DelayingInterface AddRateLimited(item interface{}) Forget(item interface{}) NumRequeues(item interface{}) int }
|
RateLimitingInterface
同样是在·的基础上多了三个方法,使用限速队列时需要调用NewNamedRateLimitingQueue
方法传入RateLimiter
,调用时可以传入不同的限速器ratelimiter
实现,官方提供了四种rate Limiter实现,分别是BucketRateLimiter
、ItemExponentialFailureRateLimiter
、ItemFastSlowRateLimiter
和MaxOfRateLimiter。
RateLimiter
需要实现三个方法
1 2 3 4 5 6 7 8
| type RateLimiter interface { When(item interface{}) time.Duration Forget(item interface{}) NumRequeues(item interface{}) int }
|
BucketRateLimiter
BucketRateLimiter
是基于token令牌桶的限速方法,通过三方库golang.org/x/time/rate
实现,令牌桶的算法原理是将固定数目的token放入桶中,桶满时则不再添加,然后元素需要拿到token才能被处理,后续元素需要等待有空闲的token被释放。使用时通过如下代码初始化一个令牌桶。
1
| rate.NewLimiter(rate.Limit(10), 100)
|
10代表每秒往桶中放入token的数量,100代表token数量,加入有102个元素在,则前100个元素直接通过,而对于第100个元素,由于每秒放入10个token,因此处理一个token需要100ms
,所以第101个元素需要等待100ms
,同理第102个元素需要等待200ms
。
ItemExponentialFailureRateLimiter
ItemExponentialFailureRateLimiter
就是指数退避算法,有两个主要参数baseDelay
,maxDelay
,baseDelay
代表需要推迟的基数,每次添加相同的key其对应的延迟加入时间会指数递增,但也不能无限递增,因此maxDelay
规定了延迟时间的上限。指数退避部分主要代码如下。
1 2 3 4 5 6 7 8
| exp := r.failures[item] r.failures[item] = r.failures[item] + 1
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp)) if backoff > math.MaxInt64 { return r.maxDelay }
|
利润baseDelay
为10,maxDelay
为1000,同一个key第一次进行需要等待的时间为10*2^1,第二次为10*2^2,以此类推。
ItemFastSlowRateLimiter
ItemFastSlowRateLimiter
定义了两个时间fastDelay
、lowDelay
以及达到fastDelay
的阈值maxFastAttempts。
1 2 3 4 5 6
| r.failures[item] = r.failures[item] + 1
if r.failures[item] <= r.maxFastAttempts { return r.fastDelay } return r.slowDelay
|
当重新加入队列的次数小于阈值maxFastAttempts
,需要等待的时间为fastDelay
,超过阈值则需要等待更长的时间slowDelay。
MaxOfRateLimiter
MaxOfRateLimiter
则是多个RateLimiter
的组合,需要延迟的时间为各个RateLimiter
的时间最大值。
总结
client-go
的work queue
首先实现了一个最基础的队列,包含最基本的Add
、Get
等方法,然后在该队列基础上实现了DelayingQueue
,实现了延迟队列的功能,最后在延迟队列的基础上实现了RateLimitingQueue
,层层嵌套,最终实现了功能完善的限速队列,使得使用者只需关注业务逻辑,不需要自己实现底层逻辑。