1. 背景

为了防止突发流量影响apiserver可用性,k8s支持多种限流配置,包括:

  • MaxInFlightLimit,server级别整体限流
  • Client限流
  • EventRateLimit, 限制event
  • APF,更细力度的限制配置

1.1 MaxInFlightLimit限流

  • apiserver默认可设置最大并发量(集群级别,区分只读与修改操作)
  • 通过参数–max-requests-inflight代表只读请求
  • –max-mutating-requests-inflight代表修改请求
  • 可以简单实现限流。

1.1.1 源码解读

  • 入口 GenericAPIServer.New中的添加hook

     	// FlowControl为nil ,代表未启用 APF,API 服务器中的整体并发量将受到 kube-apiserver 的参数 --max-requests-inflight 和 --max-mutating-requests-inflight 的限制。
    	if c.FlowControl != nil {
    		const priorityAndFairnessFilterHookName = "priority-and-fairness-filter"
    		if !s.isPostStartHookRegistered(priorityAndFairnessFilterHookName) {
    			err := s.AddPostStartHook(priorityAndFairnessFilterHookName, func(context PostStartHookContext) error {
    				genericfilters.StartPriorityAndFairnessWatermarkMaintenance(context.StopCh)
    				return nil
    			})
    			if err != nil {
    				return nil, err
    			}
    		}
    	} else {
    		const maxInFlightFilterHookName = "max-in-flight-filter"
    		if !s.isPostStartHookRegistered(maxInFlightFilterHookName) {
    			err := s.AddPostStartHook(maxInFlightFilterHookName, func(context PostStartHookContext) error {
    				genericfilters.StartMaxInFlightWatermarkMaintenance(context.StopCh)
    				return nil
    			})
    			if err != nil {
    				return nil, err
    			}
    		}
    	}
    
    
    // StartMaxInFlightWatermarkMaintenance starts the goroutines to observe and maintain watermarks for max-in-flight
    // requests.
    func StartMaxInFlightWatermarkMaintenance(stopCh <-chan struct{}) {
    	startWatermarkMaintenance(watermark, stopCh)
    }
    
    
    // startWatermarkMaintenance starts the goroutines to observe and maintain the specified watermark.
    func startWatermarkMaintenance(watermark *requestWatermark, stopCh <-chan struct{}) {
    	// 定期更新inflight使用指标
    	go wait.Until(func() {
    		watermark.lock.Lock()
    		readOnlyWatermark := watermark.readOnlyWatermark
    		mutatingWatermark := watermark.mutatingWatermark
    		watermark.readOnlyWatermark = 0
    		watermark.mutatingWatermark = 0
    		watermark.lock.Unlock()
    
    		metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark)
    	}, inflightUsageMetricUpdatePeriod, stopCh)
      // 定期观察watermarks。这样做是为了确保他们不会落后太多。当他们
      //落后太多时,在响应接收到的下一个请求时会有很长的延迟,而观察者
      //会赶上来。
       go wait.Until(func() {
          watermark.readOnlyObserver.Add(0)
          watermark.mutatingObserver.Add(0)
      }, observationMaintenancePeriod, stopCh)
    }
    
    
  • WithMaxInFlightLimit代表限流处理函数

调用入口: staging\src\k8s.io\apiserver\pkg\server\config.go

DefaultBuildHandlerChain中,判断FlowControl为nil就开启WithMaxInFlightLimit,

if c.FlowControl != nil {
        requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get)
        handler = filterlatency.TrackCompleted(handler)
        handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
        handler = filterlatency.TrackStarted(handler, "priorityandfairness")
    } else {
        handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
    }



func WithMaxInFlightLimit(
	handler http.Handler,
	nonMutatingLimit int,
	mutatingLimit int,
	longRunningRequestCheck apirequest.LongRunningRequestCheck,
) http.Handler {
	// 如果limit num为0就不开启限流了
	if nonMutatingLimit == 0 && mutatingLimit == 0 {
		return handler
	}
	var nonMutatingChan chan bool
	var mutatingChan chan bool
	// 构造限流的chan,类型为长度=limit的 bool chan
	if nonMutatingLimit != 0 {
		nonMutatingChan = make(chan bool, nonMutatingLimit)
		klog.V(2).InfoS("Initialized nonMutatingChan", "len", nonMutatingLimit)
	} else {
		klog.V(2).InfoS("Running with nil nonMutatingChan")
	}
	if mutatingLimit != 0 {
		mutatingChan = make(chan bool, mutatingLimit)
		klog.V(2).InfoS("Initialized mutatingChan", "len", mutatingLimit)
	} else {
		klog.V(2).InfoS("Running with nil mutatingChan")
	}
	initMaxInFlight(nonMutatingLimit, mutatingLimit)
	// 发起请求
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctx := r.Context()
		requestInfo, ok := apirequest.RequestInfoFrom(ctx)
		if !ok {
			handleError(w, r, fmt.Errorf("no RequestInfo found in context, handler chain must be wrong"))
			return
		}

		// 检查是否是长时间运行的请求
		if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) {
			handler.ServeHTTP(w, r)
			return
		}

	 。。。。。。。。
}



// LongRunningRequestCheck is a predicate which is true for long-running http requests.
type LongRunningRequestCheck func(r *http.Request, requestInfo *RequestInfo) bool

使用BasicLongRunningRequestCheck检查是否是watch或者pprof debug等长时间运行的请求,因为这些请求不受限制,位置

func BasicLongRunningRequestCheck(longRunningVerbs, longRunningSubresources sets.String) apirequest.LongRunningRequestCheck {
	return func(r *http.Request, requestInfo *apirequest.RequestInfo) bool {
		if longRunningVerbs.Has(requestInfo.Verb) {
			return true
		}
		if requestInfo.IsResourceRequest && longRunningSubresources.Has(requestInfo.Subresource) {
			return true
		}
		if !requestInfo.IsResourceRequest && strings.HasPrefix(requestInfo.Path, "/debug/pprof/") {
			return true
		}
		return false
	}
}

检查是只读操作还是修改操作,决定使用哪个chan限制

		var c chan bool
		isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb)
		if isMutatingRequest {
			c = mutatingChan
		} else {
			c = nonMutatingChan
		}

如果队列未满,有空位置,则更新排队数字

  • 使用select 向c中写入true,如果能写入到说明队列未满
  • 记录下对应的指标
		select {
			case c <- true:
				// We note the concurrency level both while the
				// request is being served and after it is done being
				// served, because both states contribute to the
				// sampled stats on concurrency.
				if isMutatingRequest {
					watermark.recordMutating(len(c))
				} else {
					watermark.recordReadOnly(len(c))
				}
				// default代表队列已满
				defer func() {
					<-c
					if isMutatingRequest {
						watermark.recordMutating(len(c))
					} else {
						watermark.recordReadOnly(len(c))
					}
				}()
				handler.ServeHTTP(w, r)

但是如果请求的group中含有 system:masters,则放行, 因为apiserver认为这个组是很重要的请求,不能被限流.

  • group=system:masters 对应的clusterRole 为cluster-admin, 队列已满,如果请求的group中没有 system:masters,则返回http 429错误,并且丢弃请求
				// at this point we're about to return a 429, BUT not all actors should be rate limited.  A system:master is so powerful
				// that they should always get an answer.  It's a super-admin or a loopback connection.
				if currUser, ok := apirequest.UserFrom(ctx); ok {
					for _, group := range currUser.GetGroups() {
						if group == user.SystemPrivilegedGroup {
							handler.ServeHTTP(w, r)
							return
						}
					}
				}

  • http 429 代表当前有太多请求了,请重试,并设置 response 的header Retry-After =1
// We need to split this data between buckets used for throttling.
				metrics.RecordDroppedRequest(r, requestInfo, metrics.APIServerComponent, isMutatingRequest)
				metrics.RecordRequestTermination(r, requestInfo, metrics.APIServerComponent, http.StatusTooManyRequests)
				tooManyRequests(r, w)

func tooManyRequests(req *http.Request, w http.ResponseWriter) {
	// Return a 429 status indicating "Too Many Requests"
	w.Header().Set("Retry-After", retryAfter)
	http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests)
}

1.2 Client限流

client-go默认的qps为5,但是只支持客户端限流,只能由各个发起端限制

  • 集群管理员无法控制用户行为。

1.3 EventRateLimit

  • EventRateLimit在1.13之后支持,只限制event请求
  • 集成在apiserver内部webhoook中
  • 可配置某个用户、namespace、server等event操作限制,通过webhook形式实现。

集群管理员可以通过以下方式指定事件速率限制:

  • 启用 EventRateLimit 准入控制器;

  • 在通过 API 服务器的命令行标志 --admission-control-config-file 设置的文件中, 引用 EventRateLimit 配置文件:

    apiVersion: apiserver.config.k8s.io/v1
    kind: AdmissionConfiguration
    plugins:
      - name: EventRateLimit
        path: eventconfig.yaml
    ...
    

    可以在配置中指定的限制有四种类型:

    • Server:API 服务器收到的所有(创建或修改)Event 请求共享一个桶。
    • Namespace:每个名字空间都对应一个专用的桶。
    • User:为每个用户分配一个桶。
    • SourceAndObject:根据事件的来源和涉及对象的各种组合分配桶。

    eventconfig.yaml 示例

    apiVersion: eventratelimit.admission.k8s.io/v1alpha1
    kind: Configuration
    limits:
      - type: Namespace
        qps: 50
        burst: 100
        cacheSize: 2000
      - type: User
        qps: 10
        burst: 50
    

原理

  • 具体原理可以参考提案,每个eventratelimit 配置使用一个单独的令牌桶限速器
  • 每次event操作,遍历每个匹配的限速器检查是否能获取令牌,如果可以允许请求,否则返回429。

优点

  • 实现简单,允许一定量的并发
  • 可支持server/namespace/user等级别的限流

缺点

  • 仅支持event,通过webhook实现只能拦截修改类请求

  • 所有namespace的限流相同,没有优先级

参考文档:https://kubernetes.io/zh-cn/docs/reference/access-authn-authz/admission-controllers/#eventratelimit

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐