k8s API限流——server级别整体限流和客户端限流
使用BasicLongRunningRequestCheck检查是否是watch或者pprof debug等长时间运行的请求,因为这些请求不受限制,位置。但是如果请求的group中含有 system:masters,则放行, 因为apiserver认为这个组是很重要的请求,不能被限流.调用入口:staging\src\k8s.io\apiserver\pkg\server\config.go。cl
·
1. 背景
为了防止突发流量影响apiserver可用性,k8s支持多种限流配置,包括:
- MaxInFlightLimit,server级别整体限流
- Client限流
- EventRateLimit, 限制event
- APF,更细力度的限制配置
1.1 MaxInFlightLimit限流
- apiserver默认可设置最大并发量(集群级别,区分只读与修改操作)
- 通过参数–max-requests-inflight代表只读请求
- –max-mutating-requests-inflight代表修改请求
- 可以简单实现限流。
1.1.1 源码解读
-
入口 GenericAPIServer.New中的添加hook
// FlowControl为nil ,代表未启用 APF,API 服务器中的整体并发量将受到 kube-apiserver 的参数 --max-requests-inflight 和 --max-mutating-requests-inflight 的限制。 if c.FlowControl != nil { const priorityAndFairnessFilterHookName = "priority-and-fairness-filter" if !s.isPostStartHookRegistered(priorityAndFairnessFilterHookName) { err := s.AddPostStartHook(priorityAndFairnessFilterHookName, func(context PostStartHookContext) error { genericfilters.StartPriorityAndFairnessWatermarkMaintenance(context.StopCh) return nil }) if err != nil { return nil, err } } } else { const maxInFlightFilterHookName = "max-in-flight-filter" if !s.isPostStartHookRegistered(maxInFlightFilterHookName) { err := s.AddPostStartHook(maxInFlightFilterHookName, func(context PostStartHookContext) error { genericfilters.StartMaxInFlightWatermarkMaintenance(context.StopCh) return nil }) if err != nil { return nil, err } } } // StartMaxInFlightWatermarkMaintenance starts the goroutines to observe and maintain watermarks for max-in-flight // requests. func StartMaxInFlightWatermarkMaintenance(stopCh <-chan struct{}) { startWatermarkMaintenance(watermark, stopCh) } // startWatermarkMaintenance starts the goroutines to observe and maintain the specified watermark. func startWatermarkMaintenance(watermark *requestWatermark, stopCh <-chan struct{}) { // 定期更新inflight使用指标 go wait.Until(func() { watermark.lock.Lock() readOnlyWatermark := watermark.readOnlyWatermark mutatingWatermark := watermark.mutatingWatermark watermark.readOnlyWatermark = 0 watermark.mutatingWatermark = 0 watermark.lock.Unlock() metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark) }, inflightUsageMetricUpdatePeriod, stopCh) // 定期观察watermarks。这样做是为了确保他们不会落后太多。当他们 //落后太多时,在响应接收到的下一个请求时会有很长的延迟,而观察者 //会赶上来。 go wait.Until(func() { watermark.readOnlyObserver.Add(0) watermark.mutatingObserver.Add(0) }, observationMaintenancePeriod, stopCh) }
-
WithMaxInFlightLimit代表限流处理函数
调用入口: staging\src\k8s.io\apiserver\pkg\server\config.go
DefaultBuildHandlerChain中,判断FlowControl为nil就开启WithMaxInFlightLimit,
if c.FlowControl != nil {
requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get)
handler = filterlatency.TrackCompleted(handler)
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
handler = filterlatency.TrackStarted(handler, "priorityandfairness")
} else {
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
}
func WithMaxInFlightLimit(
handler http.Handler,
nonMutatingLimit int,
mutatingLimit int,
longRunningRequestCheck apirequest.LongRunningRequestCheck,
) http.Handler {
// 如果limit num为0就不开启限流了
if nonMutatingLimit == 0 && mutatingLimit == 0 {
return handler
}
var nonMutatingChan chan bool
var mutatingChan chan bool
// 构造限流的chan,类型为长度=limit的 bool chan
if nonMutatingLimit != 0 {
nonMutatingChan = make(chan bool, nonMutatingLimit)
klog.V(2).InfoS("Initialized nonMutatingChan", "len", nonMutatingLimit)
} else {
klog.V(2).InfoS("Running with nil nonMutatingChan")
}
if mutatingLimit != 0 {
mutatingChan = make(chan bool, mutatingLimit)
klog.V(2).InfoS("Initialized mutatingChan", "len", mutatingLimit)
} else {
klog.V(2).InfoS("Running with nil mutatingChan")
}
initMaxInFlight(nonMutatingLimit, mutatingLimit)
// 发起请求
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
if !ok {
handleError(w, r, fmt.Errorf("no RequestInfo found in context, handler chain must be wrong"))
return
}
// 检查是否是长时间运行的请求
if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) {
handler.ServeHTTP(w, r)
return
}
。。。。。。。。
}
// LongRunningRequestCheck is a predicate which is true for long-running http requests.
type LongRunningRequestCheck func(r *http.Request, requestInfo *RequestInfo) bool
使用BasicLongRunningRequestCheck检查是否是watch或者pprof debug等长时间运行的请求,因为这些请求不受限制,位置
func BasicLongRunningRequestCheck(longRunningVerbs, longRunningSubresources sets.String) apirequest.LongRunningRequestCheck {
return func(r *http.Request, requestInfo *apirequest.RequestInfo) bool {
if longRunningVerbs.Has(requestInfo.Verb) {
return true
}
if requestInfo.IsResourceRequest && longRunningSubresources.Has(requestInfo.Subresource) {
return true
}
if !requestInfo.IsResourceRequest && strings.HasPrefix(requestInfo.Path, "/debug/pprof/") {
return true
}
return false
}
}
检查是只读操作还是修改操作,决定使用哪个chan限制
var c chan bool
isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb)
if isMutatingRequest {
c = mutatingChan
} else {
c = nonMutatingChan
}
如果队列未满,有空位置,则更新排队数字
- 使用select 向c中写入true,如果能写入到说明队列未满
- 记录下对应的指标
select {
case c <- true:
// We note the concurrency level both while the
// request is being served and after it is done being
// served, because both states contribute to the
// sampled stats on concurrency.
if isMutatingRequest {
watermark.recordMutating(len(c))
} else {
watermark.recordReadOnly(len(c))
}
// default代表队列已满
defer func() {
<-c
if isMutatingRequest {
watermark.recordMutating(len(c))
} else {
watermark.recordReadOnly(len(c))
}
}()
handler.ServeHTTP(w, r)
但是如果请求的group中含有 system:masters,则放行, 因为apiserver认为这个组是很重要的请求,不能被限流.
- group=system:masters 对应的clusterRole 为cluster-admin, 队列已满,如果请求的group中没有 system:masters,则返回http 429错误,并且丢弃请求
// at this point we're about to return a 429, BUT not all actors should be rate limited. A system:master is so powerful
// that they should always get an answer. It's a super-admin or a loopback connection.
if currUser, ok := apirequest.UserFrom(ctx); ok {
for _, group := range currUser.GetGroups() {
if group == user.SystemPrivilegedGroup {
handler.ServeHTTP(w, r)
return
}
}
}
- http 429 代表当前有太多请求了,请重试,并设置 response 的header Retry-After =1
// We need to split this data between buckets used for throttling.
metrics.RecordDroppedRequest(r, requestInfo, metrics.APIServerComponent, isMutatingRequest)
metrics.RecordRequestTermination(r, requestInfo, metrics.APIServerComponent, http.StatusTooManyRequests)
tooManyRequests(r, w)
func tooManyRequests(req *http.Request, w http.ResponseWriter) {
// Return a 429 status indicating "Too Many Requests"
w.Header().Set("Retry-After", retryAfter)
http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests)
}
1.2 Client限流
client-go默认的qps为5,但是只支持客户端限流,只能由各个发起端限制
- 集群管理员无法控制用户行为。
更多推荐
已为社区贡献36条内容
所有评论(0)