简介

之前介绍过sigs.k8s.io controller-runtime系列之五 cache分析[sigs.k8s.io controller-runtime-cache] 。
本文主要介绍pkg/webhook的源码分析。

目录结构

  1. alias.go 为webhook常用到的obj定义别名
  2. server.go
    • Server结构体
    // Server 是一个准入 webhook 服务器,可以为流量提供服务, 生成相关的 k8s 资源进行部署。
    type Server struct {
    	// 用于监听的地址,默认是 ""代表全部地址.
    	Host string
    
    	// server的端口
    	Port int
    
    	// 包含服务器密钥和证书的目录。
    	CertDir string
    
    	// 服务器证书名称。默认为 tls.crt。
    	CertName string
    
    	// 服务器密钥名称。默认为 tls.key。
    	KeyName string
    
    	// 服务器用来验证远程(客户端)证书的 CA 证书名称。
    	// 默认是"", 这意味着服务器不验证客户端的证书。
    	ClientCAName string
    
    	// 处理不同 Webhook 的多路复用器
    	WebhookMux *http.ServeMux
    
    	// 踪所有已注册的 webhooks 以进行依赖注入,并在重复注册时抛异常
    	webhooks map[string]http.Handler
    
    	// 允许从外部源注入依赖项
    	setFields inject.Func
    
    	// 确保默认字段只设置一次。
    	defaultingOnce sync.Once
    
    	// 保护对 Webhook map和start、register等设置的访问
    	mu sync.Mutex
    }
    
    • setDefaults函数 设置默认属性
    func (s *Server) setDefaults() {
    	s.webhooks = map[string]http.Handler{}
    	if s.WebhookMux == nil {
    		s.WebhookMux = http.NewServeMux()
    	}
    
    	if s.Port <= 0 {
    		s.Port = DefaultPort
    	}
    
    	if len(s.CertDir) == 0 {
    		s.CertDir = filepath.Join(os.TempDir(), "k8s-webhook-server", "serving-certs")
    	}
    
    	if len(s.CertName) == 0 {
    		s.CertName = "tls.crt"
    	}
    
    	if len(s.KeyName) == 0 {
    		s.KeyName = "tls.key"
    	}
    }
    
    • Register函数
    // 将给定的 webhook 标记为在给定的路径上提供服务(相当于java中注册了过滤器).
    func (s *Server) Register(path string, hook http.Handler) {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    
        // 确保只执行一次赋值默认属性操作
    	s.defaultingOnce.Do(s.setDefaults)
    	_, found := s.webhooks[path]
    	if found {
    		panic(fmt.Errorf("can't register duplicate path: %v", path))
    	}
    	
    	s.webhooks[path] = hook
        // 分两步
        // instrumentedHook构建一个产生prometheus指标(包含Histogram、Counter、Gauge三种类型)的http.HandlerFunc
        // 将产生的http.HandlerFunc作为注册到http调用path的链路处理器
    	s.WebhookMux.Handle(path, instrumentedHook(path, hook))
    
    	regLog := log.WithValues("path", path)
    	regLog.Info("registering webhook")
    
    	if s.setFields != nil {
    		if err := s.setFields(hook); err != nil {
    			regLog.Error(err, "unable to inject fields into webhook during registration")
    		}
    
    		baseHookLog := log.WithName("webhooks")
    
    		if _, err := inject.LoggerInto(baseHookLog.WithValues("webhook", path), hook); err != nil {
    			regLog.Error(err, "unable to logger into webhook during registration")
    		}
    	}
    }
    
    // 在给定的 webhook 之上添加了一些prothemus指标。
    func instrumentedHook(path string, hookRaw http.Handler) http.Handler {
        // 创建一个map
    	lbl := prometheus.Labels{"webhook": path}
    
        //普罗米修斯中有四大类指标(Summary、Histogram、Counter、Gauge),默认实现了后三个(详情见internal/metrics/metrics.go分解)
    	// Histogram类型的指标
        lat := metrics.RequestLatency.MustCurryWith(lbl)
        // Counter类型的指标
    	cnt := metrics.RequestTotal.MustCurryWith(lbl)
        // Gauge类型的指标
    	gge := metrics.RequestInFlight.With(lbl)
    
    	// Initialize the most likely HTTP status codes.
    	cnt.WithLabelValues("200")
    	cnt.WithLabelValues("500")
    
    	return promhttp.InstrumentHandlerDuration(
    		lat,
    		promhttp.InstrumentHandlerCounter(
    			cnt,
    			promhttp.InstrumentHandlerInFlight(gge, hookRaw),
    		),
    	)
    }
    
    • Start函数 启动webhook server, 它将根据服务器配置安装 webhook 相关资源
    func (s *Server) Start(ctx context.Context) error {
    s.defaultingOnce.Do(s.setDefaults)
    
    baseHookLog := log.WithName("webhooks")
    baseHookLog.Info("starting webhook server")
    
    //加载证书
    certPath := filepath.Join(s.CertDir, s.CertName)
    keyPath := filepath.Join(s.CertDir, s.KeyName)
    
    certWatcher, err := certwatcher.New(certPath, keyPath)
    if err != nil {
    	return err
    }
    
    // for和select实现的wathcer  监听证书是否有改变
    go func() {
    	if err := certWatcher.Start(ctx); err != nil {
    		log.Error(err, "certificate watcher error")
    	}
    }()
    
    cfg := &tls.Config{
    	NextProtos:     []string{"h2"},
    	GetCertificate: certWatcher.GetCertificate,
    }
    
    // 加载 CA 以验证客户端证书
    if s.ClientCAName != "" {
    	certPool := x509.NewCertPool()
    	clientCABytes, err := ioutil.ReadFile(filepath.Join(s.CertDir, s.ClientCAName))
    	if err != nil {
    		return fmt.Errorf("failed to read client CA cert: %v", err)
    	}
    
    	ok := certPool.AppendCertsFromPEM(clientCABytes)
    	if !ok {
    		return fmt.Errorf("failed to append client CA cert to CA pool")
    	}
    
    	cfg.ClientCAs = certPool
    	cfg.ClientAuth = tls.RequireAndVerifyClientCert
    }
    
    listener, err := tls.Listen("tcp", net.JoinHostPort(s.Host, strconv.Itoa(int(s.Port))), cfg)
    if err != nil {
    	return err
    }
    
    log.Info("serving webhook server", "host", s.Host, "port", s.Port)
    
    srv := &http.Server{
    	Handler: s.WebhookMux,
    }
    
    idleConnsClosed := make(chan struct{})
    go func() {
    	<-ctx.Done()
    	log.Info("shutting down webhook server")
    
    	// TODO: use a context with reasonable timeout
    	if err := srv.Shutdown(context.Background()); err != nil {
    		// Error from closing listeners, or context timeout
    		log.Error(err, "error shutting down the HTTP server")
    	}
    	close(idleConnsClosed)
    }()
    
    if err := srv.Serve(listener); err != nil && err != http.ErrServerClosed {
    	return err
    }
    
    <-idleConnsClosed
    return nil
    }
    
  3. internal/metrics/metrics.go
    • controller-runtime 关于webhook注册普罗米修斯的metric(三种类型),后续我们也是从这三种类型对应的map(包含了指标name和labels)中获得指标数据
    var (
    	// RequestLatency 是一个普罗米修斯指标,它是处理准入请求的延迟的直方图
    	RequestLatency = prometheus.NewHistogramVec(
    		prometheus.HistogramOpts{
    			Name: "controller_runtime_webhook_latency_seconds",
    			Help: "Histogram of the latency of processing admission requests",
    		},
    		[]string{"webhook"},//labels
    	)
    
    	//RequestTotal 是一个普罗米修斯指标,它是已处理的准入请求总数的计数器
    	RequestTotal = func() *prometheus.CounterVec {
    		return prometheus.NewCounterVec(
    			prometheus.CounterOpts{
    				Name: "controller_runtime_webhook_requests_total",
    				Help: "Total number of admission requests by HTTP status code.",
    			},
    			[]string{"webhook", "code"},
    		)
    	}()
    
    	// RequestInFlight 是一个普罗米修斯指标,它是对飞行中准入请求的衡量标准。
    	RequestInFlight = func() *prometheus.GaugeVec {
    		return prometheus.NewGaugeVec(
    			prometheus.GaugeOpts{
    				Name: "controller_runtime_webhook_requests_in_flight",
    				Help: "Current number of admission requests being served.",
    			},
    			[]string{"webhook"},
    		)
    	}()
    )
    
    func init() {
        //使用metrics.Registry(全局唯一【整个controller-runtime】)
        // 注册器注册三种类型的指标
    	metrics.Registry.MustRegister(RequestLatency, RequestTotal, RequestInFlight)
    }
    
  4. admission/webhook.go
    • Request结构体 请求定义了准入处理程序的输入。它包含标识问题中的对象的信息(组、版本、种类、资源、子资源、名称、命名空间),
      以及有问题的操作(例如获取、创建等),以及对象本身。
    type Request struct {
        // k8s中的Request结构体 
    	admissionv1.AdmissionRequest
    }
    
    • Response结构体 响应是准入处理程序的输出。
      它包含一个响应,指示是否允许给定操作,以及一组补丁在发生变异准入处理程序的情况下变异对象。
    type Response struct {
    // 用于改变 webhook 的 JSON 补丁。 
    // 使用它而不是设置 Response.Patch (其实是admissionv1.AdmissionResponse.Patch)来最小化序列化和反序列化的开销。 
    // 此处设置的补丁将覆盖响应(admissionv1.AdmissionResponse)中的任何补丁, 
    // 如果您想直接设置补丁响应(admissionv1.AdmissionResponse),请将其留空。
    Patches []jsonpatch.JsonPatchOperation
    
    // AdmissionResponse 是原始的准入响应。 其中的 Patch 字段将被上面的Patches覆盖。
    admissionv1.AdmissionResponse
    }
    
    • Complete函数 填充尚未在admissionv1.AdmissionResponse 中设置的任何字段,它会改变响应。(也就是设置默认字段值)
    func (r *Response) Complete(req Request) error {
    	r.UID = req.UID
    
    	if r.Result == nil {
    		r.Result = &metav1.Status{}
    	}
    	if r.Result.Code == 0 {
    		r.Result.Code = http.StatusOK
    	}
    	
    	if len(r.Patches) == 0 {
    		return nil
    	}
    
    	var err error
    	r.Patch, err = json.Marshal(r.Patches)
    	if err != nil {
    		return err
    	}
    	patchType := admissionv1.PatchTypeJSONPatch
    	r.PatchType = &patchType
    
    	return nil
    }
    
    • Handler接口 处理AdmissionRequest
      type Handler interface {
      	// 处理request并相应response
      	Handle(context.Context, Request) Response
      }
      
    • Webhook结构体 代表每个单独的 Webhook。
    type Webhook struct {
    	// 处理程序实际上处理一个准入请求,返回它是被允许还是被拒绝,并可能修补request的obj。
    	Handler Handler
    
    	// 将允许您获取 http.Request.Context并添加任何其他信息,例如传递请求路径或header,从而允许您从处理程序中读取它们
    	WithContextFunc func(context.Context, *http.Request) context.Context
    
    	// 解码器在接收scheme时构建并传递给处理程序,用来获取request的数据
    	decoder *Decoder
    
    	log logr.Logger
    }
    
    
    • Handle函数 处理AdmissionRequest 和Handler的Handle有区别,包含了Handler的Handle,并对resp做了default处理
      如果 webhook 是 mutating 类型,它会将 AdmissionRequest 委托给每个处理程序并合并
      如果 webhook 是验证类型,它会将 AdmissionRequest 委托给每个处理程序,并且如果有一个拒绝,则拒绝该请求
    func (w *Webhook) Handle(ctx context.Context, req Request) Response {
    	resp := w.Handler.Handle(ctx, req)
    	if err := resp.Complete(req); err != nil {
    		w.log.Error(err, "unable to encode response")
    		return Errored(http.StatusInternalServerError, errUnableToEncodeResponse)
    	}
    
    	return resp
    }
    
    • InjectScheme函数 InjectScheme 向 webhook 中注入一个scheme,以构建一个解码器。
    func (w *Webhook) InjectScheme(s *runtime.Scheme) error {
    	
    	var err error
    	w.decoder, err = NewDecoder(s)
    	if err != nil {
    		return err
    	}
    
    	if w.Handler != nil {
    		if _, err := InjectDecoderInto(w.GetDecoder(), w.Handler); err != nil {
    			return err
    		}
    	}
    
    	return nil
    }
    
    • InjectFunc函数 向webhook注入一些field
    func (w *Webhook) InjectFunc(f inject.Func) error {
    	// 1.执行f  2.为w.Handler注入生成的setFields 3.为w.Handler注入decoder
    	var setFields inject.Func
    	setFields = func(target interface{}) error {
    		if err := f(target); err != nil {
    			return err
    		}
    
    		if _, err := inject.InjectorInto(setFields, target); err != nil {
    			return err
    		}
    
    		if _, err := InjectDecoderInto(w.GetDecoder(), target); err != nil {
    			return err
    		}
    
    		return nil
    	}
    
    	return setFields(w.Handler)
    }
    
  5. admission/inject.go
    • DecoderInjector结构体 ControllerManager用于把decoder注入到webhook handlers中
    type DecoderInjector interface {
    	InjectDecoder(*Decoder) error
    }
    
    • InjectDecoderInto函数 承接实现了DecoderInjector的结构体,注入特定的decoder
    func InjectDecoderInto(decoder *Decoder, i interface{}) (bool, error) {
    	if s, ok := i.(DecoderInjector); ok {
    		return true, s.InjectDecoder(decoder)
    	}
    	return false, nil
    }
    
  6. admission/decode.go
    • Decoder结构体 解码器知道如何解码admission request的内容请求,变成一个具体的对象。
    type Decoder struct {
    	codecs serializer.CodecFactory
    }
    
    • NewDecoder函数 根据scheme创建一个decoder
    func NewDecoder(scheme *runtime.Scheme) (*Decoder, error) {
    	return &Decoder{codecs: serializer.NewCodecFactory(scheme)}, nil
    }
    
    • Decode函数
      Decode将AdmissionRequest中的内联对象解码到传入的runtime.object中。如果要解码AdmissionRequest中的旧对象,请使用DecodeRaw。
    func (d *Decoder) Decode(req Request, into runtime.Object) error {
    	// 如果为空  抛异常
    	if len(req.Object.Raw) == 0 {
    		return fmt.Errorf("there is no content to decode")
    	}
    	return d.DecodeRaw(req.Object, into)
    }
    
    // 将内联对象解码到传入的runtime.object中
    func (d *Decoder) DecodeRaw(rawObj runtime.RawExtension, into runtime.Object) error {
    	// NB(directxman12): there's a bug/weird interaction between decoders and
    	// the API server where the API server doesn't send a GVK on the embedded
    	// objects, which means the unstructured decoder refuses to decode.  It
    	// also means we can't pass the unstructured directly in, since it'll try
    	// and call unstructured's special Unmarshal implementation, which calls
    	// back into that same decoder :-/
    	// See kubernetes/kubernetes#74373.
    
    	// 如果rawObj.Raw为空 抛异常
    	if len(rawObj.Raw) == 0 {
    		return fmt.Errorf("there is no content to decode")
    	}
    
        // 为了解决k8s的bug(kubernetes/kubernetes#74373) 如果是Unstructured类型,k8s源码中的解码器会返回nil,所以下面做特殊处理
        // 判断是否是Unstructured类型
    	if unstructuredInto, isUnstructured := into.(*unstructured.Unstructured); isUnstructured {
            // 解组到非结构化的底层对象以避免调用解码器
    		if err := json.Unmarshal(rawObj.Raw, &unstructuredInto.Object); err != nil {
    			return err
    		}
    
    		return nil
    	}
    
        // 获取解码器slice
    	deserializer := d.codecs.UniversalDeserializer()
        // 解码rawObj.Raw到into
    	return runtime.DecodeInto(deserializer, rawObj.Raw, into)
    }
    
  7. admission/response.go
    • Allowed函数
    // Allowed构造一个响应,指示给定的操作允许(没有任何补丁)。 
    func Allowed(reason string) Response {
    	return ValidationResponse(true, reason)
    }
    
    • Denied函数 构造一个响应,指示给定的操作不被允许
    func Denied(reason string) Response {
    	return ValidationResponse(false, reason)
    }
    
    • Patched函数 构造一个响应,指示给定的操作允许,并且目标对象将被给定的patches修改
    func Patched(reason string, patches ...jsonpatch.JsonPatchOperation) Response {
    	resp := Allowed(reason)
    	resp.Patches = patches
    
    	return resp
    }
    
    • Errored函数 为错误的处理请求创建新的响应。
    func Errored(code int32, err error) Response {
    	return Response{
    		AdmissionResponse: admissionv1.AdmissionResponse{
    			Allowed: false,
    			Result: &metav1.Status{
    				Code:    code,
    				Message: err.Error(),
    			},
    		},
    	}
    }
    
    • ValidationResponse函数 回用于接纳请求的响应。
    func ValidationResponse(allowed bool, reason string) Response {
    	code := http.StatusForbidden
    	if allowed {
    		code = http.StatusOK
    	}
    	resp := Response{
    		AdmissionResponse: admissionv1.AdmissionResponse{
    			Allowed: allowed,
    			Result: &metav1.Status{
    				Code: int32(code),
    			},
    		},
    	}
    	if len(reason) > 0 {
    		resp.Result.Reason = metav1.StatusReason(reason)
    	}
    	return resp
    }
    
    • PatchResponseFromRaw函数 根据原始和现在的数据创建patch,并相应response
    // 根据original和current来创建不同类型的patch,并响应
    func PatchResponseFromRaw(original, current []byte) Response {
        // 创建空、add、replace类型的patch
    	patches, err := jsonpatch.CreatePatch(original, current)
    	if err != nil {
    		return Errored(http.StatusInternalServerError, err)
    	}
    	return Response{
    		Patches: patches,
    		AdmissionResponse: admissionv1.AdmissionResponse{
    			Allowed: true,
    			PatchType: func() *admissionv1.PatchType {
    				if len(patches) == 0 {
    					return nil
    				}
    				pt := admissionv1.PatchTypeJSONPatch
    				return &pt
    			}(),
    		},
    	}
    }
    
    • validationResponseFromStatus函数
    func validationResponseFromStatus(allowed bool, status metav1.Status) Response {
    	resp := Response{
    		AdmissionResponse: admissionv1.AdmissionResponse{
    			Allowed: allowed,
    			Result:  &status,
    		},
    	}
    	return resp
    }
    
  8. admission/http.go
    • 定义变量和init
    var admissionScheme = runtime.NewScheme()
    var admissionCodecs = serializer.NewCodecFactory(admissionScheme)
    
    func init() {
        // v1版本
    	utilruntime.Must(v1.AddToScheme(admissionScheme))
        // v1beta1版本
    	utilruntime.Must(v1beta1.AddToScheme(admissionScheme))
    }
    // 类型约定 比如判断某个struct是否实现接口的方法 
    var _ http.Handler = &Webhook{}
    
    • ServeHTTP函数 实现了http.Handler接口
    func (wh *Webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    	var body []byte
    	var err error
        // 如果r中存在ctx则返回,否则创建新的
    	ctx := r.Context()
    	if wh.WithContextFunc != nil {
            // 执行WithContextFunc方法  一般是给ctx赋值
    		ctx = wh.WithContextFunc(ctx, r)
    	}
    
    	var reviewResponse Response
        // 验证r.Body不能为空
    	if r.Body != nil {
    		if body, err = ioutil.ReadAll(r.Body); err != nil {
    			wh.log.Error(err, "unable to read the body from the incoming request")
    			reviewResponse = Errored(http.StatusBadRequest, err)
    			wh.writeResponse(w, reviewResponse)
    			return
    		}
    	} else {
    		err = errors.New("request body is empty")
    		wh.log.Error(err, "bad request")
    		reviewResponse = Errored(http.StatusBadRequest, err)
    		wh.writeResponse(w, reviewResponse)
    		return
    	}
    
    	// 验证Content-Type是否准确
    	contentType := r.Header.Get("Content-Type")
    	if contentType != "application/json" {
    		err = fmt.Errorf("contentType=%s, expected application/json", contentType)
    		wh.log.Error(err, "unable to process a request with an unknown content type", "content type", contentType)
    		reviewResponse = Errored(http.StatusBadRequest, err)
    		wh.writeResponse(w, reviewResponse)
    		return
    	}
    
    	// v1和v1beta1 AdmissionReview类型完全相同,因此v1beta1类型可以
        // 被解码成v1类型。不过,如果未设置对象的TypeMeta,运行时编解码器的解码器会猜测要使用哪种类型,则按类型名解码。
        // 通过设置未注册的类型到v1 GVK,解码器将强制v1beta1 AdmissionReview到v1。
        // 实际的AdmissionReview GVK将被用来写一个类型化的回应,以防
        // webhook config允许多个版本,否则此响应将失败。
    	req := Request{}
    	ar := unversionedAdmissionReview{}
    	// avoid an extra copy
    	ar.Request = &req.AdmissionRequest
    	ar.SetGroupVersionKind(v1.SchemeGroupVersion.WithKind("AdmissionReview"))
    	_, actualAdmRevGVK, err := admissionCodecs.UniversalDeserializer().Decode(body, nil, &ar)
    	if err != nil {
    		wh.log.Error(err, "unable to decode the request")
    		reviewResponse = Errored(http.StatusBadRequest, err)
    		wh.writeResponse(w, reviewResponse)
    		return
    	}
    	wh.log.V(1).Info("received request", "UID", req.UID, "kind", req.Kind, "resource", req.Resource)
    
    	reviewResponse = wh.Handle(ctx, req)
    	wh.writeResponseTyped(w, reviewResponse, actualAdmRevGVK)
    }
    
    • writeAdmissionResponse函数 将ar写入到w中
    func (wh *Webhook) writeAdmissionResponse(w io.Writer, ar v1.AdmissionReview) {
        // 先根据w获取新的encoder,然后编码ar到w中
    	err := json.NewEncoder(w).Encode(ar)
        // 如果有异常,重新将异常信息写入到w中
    	if err != nil {
    		wh.log.Error(err, "unable to encode the response")
    		wh.writeResponse(w, Errored(http.StatusInternalServerError, err))
    	} else {
    		res := ar.Response
    		if log := wh.log; log.V(1).Enabled() {
    			if res.Result != nil {
    				log = log.WithValues("code", res.Result.Code, "reason", res.Result.Reason)
    			}
    			log.V(1).Info("wrote response", "UID", res.UID, "allowed", res.Allowed)
    		}
    	}
    }
    
  9. admission/multi.go
    • Handle函数
    type multiMutating []Handler
    
    func (hs multiMutating) Handle(ctx context.Context, req Request) Response {
    	patches := []jsonpatch.JsonPatchOperation{}
     // 遍历多个Handler
    	for _, handler := range hs {
         // 执行handler.Handle
    		resp := handler.Handle(ctx, req)
         // 判断resp是否允许,如果true,继续执行,否则返回response
    		if !resp.Allowed {
    			return resp
    		}
         // 如果patchType不为空切不等于JSONPatch,异常response
    		if resp.PatchType != nil && *resp.PatchType != admissionv1.PatchTypeJSONPatch {
    			return Errored(http.StatusInternalServerError,
    				fmt.Errorf("unexpected patch type returned by the handler: %v, only allow: %v",
    					resp.PatchType, admissionv1.PatchTypeJSONPatch))
    		}
         // 追加resp.Patches到patches
    		patches = append(patches, resp.Patches...)
    	}
    	var err error
     // JsonPatchOperation(patch的相关数据)数组,转化为byte[]
    	marshaledPatch, err := json.Marshal(patches)
    	if err != nil {
    		return Errored(http.StatusBadRequest, fmt.Errorf("error when marshaling the patch: %w", err))
    	}
    	return Response{
    		AdmissionResponse: admissionv1.AdmissionResponse{
    			Allowed: true,
    			Result: &metav1.Status{
    				Code: http.StatusOK,
    			},
    			Patch:     marshaledPatch,
    			PatchType: func() *admissionv1.PatchType { pt := admissionv1.PatchTypeJSONPatch; return &pt }(),
    		},
    	}
    }
    
    // 为hs中的handler注入对应的filed
    func (hs multiMutating) InjectFunc(f inject.Func) error {
    	for _, handler := range hs {
    		if err := f(handler); err != nil {
    			return err
    		}
    	}
    
    	return nil
    }
    
    // 构建多个mutating webhook handler组成的MultiMutatingHandler
    func MultiMutatingHandler(handlers ...Handler) Handler {
    	return multiMutating(handlers)
    }
    
  10. conversion包中的conversion.go
    • Webhook结构体 实现了一个CRD转换的Webhook HTTP处理程序
    type Webhook struct {
    	scheme  *runtime.Scheme
    	decoder *Decoder
    }
    
    // 注入decoder
    func (wh *Webhook) InjectScheme(s *runtime.Scheme) error {
    	var err error
    	wh.scheme = s
    	wh.decoder, err = NewDecoder(s)
    	if err != nil {
    		return err
    	}
    
    	return nil
    }
    
    • ServeHTTP函数
    func (wh *Webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) {
     // 新建一个类型转化的Review
    	convertReview := &apix.ConversionReview{}
     // 解码body体到convertReview(对于类型转换body类型固定就是ConversionReview,而对于admission就是AdmissionReview)
    	err := json.NewDecoder(r.Body).Decode(convertReview)
    	if err != nil {
    		log.Error(err, "failed to read conversion request")
    		w.WriteHeader(http.StatusBadRequest)
    		return
    	}
    
    	// 类型转化逻辑
    	resp, err := wh.handleConvertRequest(convertReview.Request)
    	if err != nil {
    		log.Error(err, "failed to convert", "request", convertReview.Request.UID)
    		convertReview.Response = errored(err)
    	} else {
    		convertReview.Response = resp
    	}
    	convertReview.Response.UID = convertReview.Request.UID
     // 注意每次处理后convertReview.Request要置为nil  
    	convertReview.Request = nil
     
     // 把convertReview写入w,用于传递给下一个webhook
    	err = json.NewEncoder(w).Encode(convertReview)
    	if err != nil {
    		log.Error(err, "failed to write response")
    		return
    	}
    }
    
    // 处理类型转化
    func (wh *Webhook) handleConvertRequest(req *apix.ConversionRequest) (*apix.ConversionResponse, error) {
    	if req == nil {
    		return nil, fmt.Errorf("conversion request is nil")
    	}
    	var objects []runtime.RawExtension
     // 遍历将要类型转化的用户资源byte[]
    	for _, obj := range req.Objects {
         // 解码用户资源的字节原始数据,返回src: 资源对象  gvk: group/version/kind
    		src, gvk, err := wh.decoder.Decode(obj.Raw)
    		if err != nil {
    			return nil, err
    		}
         
         // 给定gvk返回一个对象实体
    		dst, err := wh.allocateDstObject(req.DesiredAPIVersion, gvk.Kind)
    		if err != nil {
    			return nil, err
    		}
         // 转化资源对象到目标对象 
    		err = wh.convertObject(src, dst)
    		if err != nil {
    			return nil, err
    		}
    		objects = append(objects, runtime.RawExtension{Object: dst})
    	}
    	return &apix.ConversionResponse{
    		UID:              req.UID,
    		ConvertedObjects: objects,
    		Result: metav1.Status{
    			Status: metav1.StatusSuccess,
    		},
    	}, nil
    }
    
    // 给定gvk返回一个对象实体
    func (wh *Webhook) allocateDstObject(apiVersion, kind string) (runtime.Object, error) {
     // 获取gvk
    	gvk := schema.FromAPIVersionAndKind(apiVersion, kind)
     
     // 根据scheme和gvk新建一个对象实体
    	obj, err := wh.scheme.New(gvk)
    	if err != nil {
    		return obj, err
    	}
     
     // TypeAccessor返回一个接口,该接口允许检索和修改一种内存中API版本的内部对象。
     // TODO:此接口用于测试往返中没有ObjectMeta或ListMeta的代码(可以使用apiVersion/kind但不符合Kube api约定的对象)。
    	t, err := meta.TypeAccessor(obj)
    	if err != nil {
    		return obj, err
    	}
    
    	t.SetAPIVersion(apiVersion)
    	t.SetKind(kind)
    
    	return obj, nil
    }
    
    // 转化资源对象到目标对象
    func (wh *Webhook) convertObject(src, dst runtime.Object) error {
     // 获取源和目标对象的gvk
    	srcGVK := src.GetObjectKind().GroupVersionKind()
    	dstGVK := dst.GetObjectKind().GroupVersionKind()
    
     // 判断源和目标的gk是否相等
    	if srcGVK.GroupKind() != dstGVK.GroupKind() {
    		return fmt.Errorf("src %T and dst %T does not belong to same API Group", src, dst)
    	}
     
     // 判断源和目标的gvk是否相等
    	if srcGVK == dstGVK {
    		return fmt.Errorf("conversion is not allowed between same type %T", src)
    	}
     
     // 判断源和目标对象是否是Hub(集线)或者Convertible
    	srcIsHub, dstIsHub := isHub(src), isHub(dst)
    	srcIsConvertible, dstIsConvertible := isConvertible(src), isConvertible(dst)
    
    	switch { 
     // 如果源是Hub类型和目标对象是Convertible类型
    	case srcIsHub && dstIsConvertible:
    		return dst.(conversion.Convertible).ConvertFrom(src.(conversion.Hub))
     // 如果目标对象是Hub类型和源是Convertible类型
    	case dstIsHub && srcIsConvertible:
    		return src.(conversion.Convertible).ConvertTo(dst.(conversion.Hub))
     // 如果源和目标对象是Convertible类型
    	case srcIsConvertible && dstIsConvertible:
    		return wh.convertViaHub(src.(conversion.Convertible), dst.(conversion.Convertible))
     // 如果源和目标对象是Hub类型(为什么这里会抛异常? Hub类型的对象不能转化指定类型,只能根据自己的实现转化为特定的一种类型,
     // 想实现多种类型转化,应该实现Convertible接口)
    	default:
    		return fmt.Errorf("%T is not convertible to %T", src, dst)
    	}
    }
    
    // 源和目标对象都是Convertible类型的转化
    func (wh *Webhook) convertViaHub(src, dst conversion.Convertible) error {
     // 根据对象获取其所有的version下的Hub类型的实体对象
    	hub, err := wh.getHub(src)
    	if err != nil {
    		return err
    	}
    
    	if hub == nil {
    		return fmt.Errorf("%s does not have any Hub defined", src)
    	}
     // 源对象转化为Hub类型对象
    	err = src.ConvertTo(hub)
    	if err != nil {
    		return fmt.Errorf("%T failed to convert to hub version %T : %w", src, hub, err)
    	}
     // 将目标对象转化为Hub类型对象
    	err = dst.ConvertFrom(hub)
    	if err != nil {
    		return fmt.Errorf("%T failed to convert from hub version %T : %w", dst, hub, err)
    	}
    
    	return nil
    }
    
    // 根据对象获取其所有的version下的Hub类型的实体对象(如果存在多个实现的Hub接口,则抛异常)
    func (wh *Webhook) getHub(obj runtime.Object) (conversion.Hub, error) {
     // 根据scheme和obj获取obj的所有gvk
    	gvks, err := objectGVKs(wh.scheme, obj)
    	if err != nil {
    		return nil, err
    	}
    	if len(gvks) == 0 {
    		return nil, fmt.Errorf("error retrieving gvks for object : %v", obj)
    	}
    
    	var hub conversion.Hub
    	var hubFoundAlready bool
     // 遍历gvks获取其中最多一个实现了Hub类型的接口的gvk对象
    	for _, gvk := range gvks {
    		instance, err := wh.scheme.New(gvk)
    		if err != nil {
    			return nil, fmt.Errorf("failed to allocate an instance for gvk %v: %w", gvk, err)
    		}
    		if val, isHub := instance.(conversion.Hub); isHub {
    			if hubFoundAlready {
    				return nil, fmt.Errorf("multiple hub version defined for %T", obj)
    			}
    			hubFoundAlready = true
    			hub = val
    		}
    	}
    	return hub, nil
    }
    
    // 根据scheme和obj获取obj的所有gvk
    func objectGVKs(scheme *runtime.Scheme, obj runtime.Object) ([]schema.GroupVersionKind, error) {
    	objGVKs, _, err := scheme.ObjectKinds(obj)
    	if err != nil {
    		return nil, err
    	}
    	if len(objGVKs) != 1 {
    		return nil, fmt.Errorf("expect to get only one GVK for %v", obj)
    	}
    	objGVK := objGVKs[0]
    	knownTypes := scheme.AllKnownTypes()
    
    	var gvks []schema.GroupVersionKind
    	for gvk := range knownTypes {
    		if objGVK.GroupKind() == gvk.GroupKind() {
    			gvks = append(gvks, gvk)
    		}
    	}
    	return gvks, nil
    }
    
    • IsConvertible函数 判断给定obj在scheme中是否是可以转化的
    // 对于要进行转换的类型,需要定义一个中心类型,并且所有非中心类型必须能够转换为中心类型或从中心类型转换
    func IsConvertible(scheme *runtime.Scheme, obj runtime.Object) (bool, error) {
    	var hubs, spokes, nonSpokes []runtime.Object
     // 根据scheme和obj获取obj的所有gvk
    	gvks, err := objectGVKs(scheme, obj)
    	if err != nil {
    		return false, err
    	}
    	if len(gvks) == 0 {
    		return false, fmt.Errorf("error retrieving gvks for object : %v", obj)
    	}
    
    	for _, gvk := range gvks {
    		instance, err := scheme.New(gvk)
    		if err != nil {
    			return false, fmt.Errorf("failed to allocate an instance for gvk %v: %w", gvk, err)
    		}
         // 判断是否是Hub类型
    		if isHub(instance) {
             // 添加到hubs中
    			hubs = append(hubs, instance)
    			continue
    		}
         // 判断是否是Convertible类型
    		if !isConvertible(instance) {
             // 添加到nonSpokes(我理解的是error的)
    			nonSpokes = append(nonSpokes, instance)
    			continue
    		}
    
    		spokes = append(spokes, instance)
    	}
    
    	if len(gvks) == 1 {
    		return false, nil // single version
    	}
    
    	if len(hubs) == 0 && len(spokes) == 0 {
    		// 多个version的obj,但是没有verison convert的实现
    		return false, nil
    	}
    
    	if len(hubs) == 1 && len(nonSpokes) == 0 { // convertible
    		return true, nil
    	}
    
    	return false, PartialImplementationError{
    		hubs:      hubs,
    		nonSpokes: nonSpokes,
    		spokes:    spokes,
    	}
    }
    
    // 由于部分转换而导致的错误,实现如Hub类型,多个Hub类型。
    // PartialImplementationError represents an error due to partial conversion
    // implementation such as hub without spokes, multiple hubs or spokes without hub.
    type PartialImplementationError struct {
    	gvk       schema.GroupVersionKind
    	hubs      []runtime.Object
    	nonSpokes []runtime.Object
    	spokes    []runtime.Object
    }
    
  11. conversion包中的decoder.go
    • Decoder结构体
    // 定义个类型转化的Decoder
    type Decoder struct {
    	codecs serializer.CodecFactory
    } 
    
    • NewDecoder函数 根据scheme新建一个Decoder
    func NewDecoder(scheme *runtime.Scheme) (*Decoder, error) {
    	return &Decoder{codecs: serializer.NewCodecFactory(scheme)}, nil
    }
    
    • Decode函数 解码实体的byte[]
    func (d *Decoder) Decode(content []byte) (runtime.Object, *schema.GroupVersionKind, error) {
    	deserializer := d.codecs.UniversalDeserializer()
    	return deserializer.Decode(content, nil, nil)
    }
    
    func (d *Decoder) DecodeInto(content []byte, into runtime.Object) error {
    	deserializer := d.codecs.UniversalDeserializer()
    	return runtime.DecodeInto(deserializer, content, into)
    }
    
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐