k8s API限流
参考文档:https://kubernetes.io/zh-cn/docs/reference/access-authn-authz/admission-controllers/#eventratelimit。但是如果请求的group中含有 system:masters,则放行, 因为apiserver认为这个组是很重要的请求,不能被限流.client-go默认的qps为5,但是只支持客户端限流,
1. 背景
为了防止突发流量影响apiserver可用性,k8s支持多种限流配置,包括:
- MaxInFlightLimit,server级别整体限流
- Client限流
- EventRateLimit, 限制event
- APF,更细力度的限制配置
1.1 MaxInFlightLimit限流
- apiserver默认可设置最大并发量(集群级别,区分只读与修改操作)
- 通过参数–max-requests-inflight代表只读请求
- –max-mutating-requests-inflight代表修改请求
- 可以简单实现限流。
1.1.1 源码解读
-
入口 GenericAPIServer.New中的添加hook
// FlowControl为nil ,代表未启用 APF,API 服务器中的整体并发量将受到 kube-apiserver 的参数 --max-requests-inflight 和 --max-mutating-requests-inflight 的限制。 if c.FlowControl != nil { const priorityAndFairnessFilterHookName = "priority-and-fairness-filter" if !s.isPostStartHookRegistered(priorityAndFairnessFilterHookName) { err := s.AddPostStartHook(priorityAndFairnessFilterHookName, func(context PostStartHookContext) error { genericfilters.StartPriorityAndFairnessWatermarkMaintenance(context.StopCh) return nil }) if err != nil { return nil, err } } } else { const maxInFlightFilterHookName = "max-in-flight-filter" if !s.isPostStartHookRegistered(maxInFlightFilterHookName) { err := s.AddPostStartHook(maxInFlightFilterHookName, func(context PostStartHookContext) error { genericfilters.StartMaxInFlightWatermarkMaintenance(context.StopCh) return nil }) if err != nil { return nil, err } } } // StartMaxInFlightWatermarkMaintenance starts the goroutines to observe and maintain watermarks for max-in-flight // requests. func StartMaxInFlightWatermarkMaintenance(stopCh <-chan struct{}) { startWatermarkMaintenance(watermark, stopCh) } // startWatermarkMaintenance starts the goroutines to observe and maintain the specified watermark. func startWatermarkMaintenance(watermark *requestWatermark, stopCh <-chan struct{}) { // 定期更新inflight使用指标 go wait.Until(func() { watermark.lock.Lock() readOnlyWatermark := watermark.readOnlyWatermark mutatingWatermark := watermark.mutatingWatermark watermark.readOnlyWatermark = 0 watermark.mutatingWatermark = 0 watermark.lock.Unlock() metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark) }, inflightUsageMetricUpdatePeriod, stopCh) // 定期观察watermarks。这样做是为了确保他们不会落后太多。当他们 //落后太多时,在响应接收到的下一个请求时会有很长的延迟,而观察者 //会赶上来。 go wait.Until(func() { watermark.readOnlyObserver.Add(0) watermark.mutatingObserver.Add(0) }, observationMaintenancePeriod, stopCh) }
-
WithMaxInFlightLimit代表限流处理函数
调用入口: staging\src\k8s.io\apiserver\pkg\server\config.go
DefaultBuildHandlerChain中,判断FlowControl为nil就开启WithMaxInFlightLimit,
if c.FlowControl != nil {
requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get)
handler = filterlatency.TrackCompleted(handler)
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
handler = filterlatency.TrackStarted(handler, "priorityandfairness")
} else {
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
}
func WithMaxInFlightLimit(
handler http.Handler,
nonMutatingLimit int,
mutatingLimit int,
longRunningRequestCheck apirequest.LongRunningRequestCheck,
) http.Handler {
// 如果limit num为0就不开启限流了
if nonMutatingLimit == 0 && mutatingLimit == 0 {
return handler
}
var nonMutatingChan chan bool
var mutatingChan chan bool
// 构造限流的chan,类型为长度=limit的 bool chan
if nonMutatingLimit != 0 {
nonMutatingChan = make(chan bool, nonMutatingLimit)
klog.V(2).InfoS("Initialized nonMutatingChan", "len", nonMutatingLimit)
} else {
klog.V(2).InfoS("Running with nil nonMutatingChan")
}
if mutatingLimit != 0 {
mutatingChan = make(chan bool, mutatingLimit)
klog.V(2).InfoS("Initialized mutatingChan", "len", mutatingLimit)
} else {
klog.V(2).InfoS("Running with nil mutatingChan")
}
initMaxInFlight(nonMutatingLimit, mutatingLimit)
// 发起请求
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
if !ok {
handleError(w, r, fmt.Errorf("no RequestInfo found in context, handler chain must be wrong"))
return
}
// 检查是否是长时间运行的请求
if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) {
handler.ServeHTTP(w, r)
return
}
。。。。。。。。
}
// LongRunningRequestCheck is a predicate which is true for long-running http requests.
type LongRunningRequestCheck func(r *http.Request, requestInfo *RequestInfo) bool
使用BasicLongRunningRequestCheck检查是否是watch或者pprof debug等长时间运行的请求,因为这些请求不受限制,位置
func BasicLongRunningRequestCheck(longRunningVerbs, longRunningSubresources sets.String) apirequest.LongRunningRequestCheck {
return func(r *http.Request, requestInfo *apirequest.RequestInfo) bool {
if longRunningVerbs.Has(requestInfo.Verb) {
return true
}
if requestInfo.IsResourceRequest && longRunningSubresources.Has(requestInfo.Subresource) {
return true
}
if !requestInfo.IsResourceRequest && strings.HasPrefix(requestInfo.Path, "/debug/pprof/") {
return true
}
return false
}
}
检查是只读操作还是修改操作,决定使用哪个chan限制
var c chan bool
isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb)
if isMutatingRequest {
c = mutatingChan
} else {
c = nonMutatingChan
}
如果队列未满,有空位置,则更新排队数字
- 使用select 向c中写入true,如果能写入到说明队列未满
- 记录下对应的指标
select {
case c <- true:
// We note the concurrency level both while the
// request is being served and after it is done being
// served, because both states contribute to the
// sampled stats on concurrency.
if isMutatingRequest {
watermark.recordMutating(len(c))
} else {
watermark.recordReadOnly(len(c))
}
// default代表队列已满
defer func() {
<-c
if isMutatingRequest {
watermark.recordMutating(len(c))
} else {
watermark.recordReadOnly(len(c))
}
}()
handler.ServeHTTP(w, r)
但是如果请求的group中含有 system:masters,则放行, 因为apiserver认为这个组是很重要的请求,不能被限流.
- group=system:masters 对应的clusterRole 为cluster-admin, 队列已满,如果请求的group中没有 system:masters,则返回http 429错误,并且丢弃请求
// at this point we're about to return a 429, BUT not all actors should be rate limited. A system:master is so powerful
// that they should always get an answer. It's a super-admin or a loopback connection.
if currUser, ok := apirequest.UserFrom(ctx); ok {
for _, group := range currUser.GetGroups() {
if group == user.SystemPrivilegedGroup {
handler.ServeHTTP(w, r)
return
}
}
}
- http 429 代表当前有太多请求了,请重试,并设置 response 的header Retry-After =1
// We need to split this data between buckets used for throttling.
metrics.RecordDroppedRequest(r, requestInfo, metrics.APIServerComponent, isMutatingRequest)
metrics.RecordRequestTermination(r, requestInfo, metrics.APIServerComponent, http.StatusTooManyRequests)
tooManyRequests(r, w)
func tooManyRequests(req *http.Request, w http.ResponseWriter) {
// Return a 429 status indicating "Too Many Requests"
w.Header().Set("Retry-After", retryAfter)
http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests)
}
1.2 Client限流
client-go默认的qps为5,但是只支持客户端限流,只能由各个发起端限制
- 集群管理员无法控制用户行为。
1.3 EventRateLimit
- EventRateLimit在1.13之后支持,只限制event请求
- 集成在apiserver内部webhoook中
- 可配置某个用户、namespace、server等event操作限制,通过webhook形式实现。
集群管理员可以通过以下方式指定事件速率限制:
-
启用
EventRateLimit
准入控制器; -
在通过 API 服务器的命令行标志
--admission-control-config-file
设置的文件中, 引用EventRateLimit
配置文件:apiVersion: apiserver.config.k8s.io/v1 kind: AdmissionConfiguration plugins: - name: EventRateLimit path: eventconfig.yaml ...
可以在配置中指定的限制有四种类型:
Server
:API 服务器收到的所有(创建或修改)Event 请求共享一个桶。Namespace
:每个名字空间都对应一个专用的桶。User
:为每个用户分配一个桶。SourceAndObject
:根据事件的来源和涉及对象的各种组合分配桶。
eventconfig.yaml
示例apiVersion: eventratelimit.admission.k8s.io/v1alpha1 kind: Configuration limits: - type: Namespace qps: 50 burst: 100 cacheSize: 2000 - type: User qps: 10 burst: 50
原理
- 具体原理可以参考提案,每个eventratelimit 配置使用一个单独的令牌桶限速器
- 每次event操作,遍历每个匹配的限速器检查是否能获取令牌,如果可以允许请求,否则返回429。
优点
- 实现简单,允许一定量的并发
- 可支持server/namespace/user等级别的限流
缺点
-
仅支持event,通过webhook实现只能拦截修改类请求
-
所有namespace的限流相同,没有优先级
参考文档:https://kubernetes.io/zh-cn/docs/reference/access-authn-authz/admission-controllers/#eventratelimit
更多推荐
所有评论(0)