sigs.k8s.io controller-runtime系列之六 webhook分析
简介之前介绍过sigs.k8s.io controller-runtime系列之五cache分析[sigs.k8s.io controller-runtime-cache] 。本文主要介绍pkg/webhook的源码分析。目录结构alias.go为webhook常用到的obj定义别名server.goServer结构体// Server 是一个准入 webhook 服务器,可以为流量提供服务, 生
·
简介
之前介绍过sigs.k8s.io controller-runtime系列之五 cache分析[sigs.k8s.io controller-runtime-cache] 。
本文主要介绍pkg/webhook的源码分析。
目录结构
- alias.go 为webhook常用到的obj定义别名
- server.go
- Server结构体
// Server 是一个准入 webhook 服务器,可以为流量提供服务, 生成相关的 k8s 资源进行部署。 type Server struct { // 用于监听的地址,默认是 ""代表全部地址. Host string // server的端口 Port int // 包含服务器密钥和证书的目录。 CertDir string // 服务器证书名称。默认为 tls.crt。 CertName string // 服务器密钥名称。默认为 tls.key。 KeyName string // 服务器用来验证远程(客户端)证书的 CA 证书名称。 // 默认是"", 这意味着服务器不验证客户端的证书。 ClientCAName string // 处理不同 Webhook 的多路复用器 WebhookMux *http.ServeMux // 踪所有已注册的 webhooks 以进行依赖注入,并在重复注册时抛异常 webhooks map[string]http.Handler // 允许从外部源注入依赖项 setFields inject.Func // 确保默认字段只设置一次。 defaultingOnce sync.Once // 保护对 Webhook map和start、register等设置的访问 mu sync.Mutex }
- setDefaults函数 设置默认属性
func (s *Server) setDefaults() { s.webhooks = map[string]http.Handler{} if s.WebhookMux == nil { s.WebhookMux = http.NewServeMux() } if s.Port <= 0 { s.Port = DefaultPort } if len(s.CertDir) == 0 { s.CertDir = filepath.Join(os.TempDir(), "k8s-webhook-server", "serving-certs") } if len(s.CertName) == 0 { s.CertName = "tls.crt" } if len(s.KeyName) == 0 { s.KeyName = "tls.key" } }
- Register函数
// 将给定的 webhook 标记为在给定的路径上提供服务(相当于java中注册了过滤器). func (s *Server) Register(path string, hook http.Handler) { s.mu.Lock() defer s.mu.Unlock() // 确保只执行一次赋值默认属性操作 s.defaultingOnce.Do(s.setDefaults) _, found := s.webhooks[path] if found { panic(fmt.Errorf("can't register duplicate path: %v", path)) } s.webhooks[path] = hook // 分两步 // instrumentedHook构建一个产生prometheus指标(包含Histogram、Counter、Gauge三种类型)的http.HandlerFunc // 将产生的http.HandlerFunc作为注册到http调用path的链路处理器 s.WebhookMux.Handle(path, instrumentedHook(path, hook)) regLog := log.WithValues("path", path) regLog.Info("registering webhook") if s.setFields != nil { if err := s.setFields(hook); err != nil { regLog.Error(err, "unable to inject fields into webhook during registration") } baseHookLog := log.WithName("webhooks") if _, err := inject.LoggerInto(baseHookLog.WithValues("webhook", path), hook); err != nil { regLog.Error(err, "unable to logger into webhook during registration") } } } // 在给定的 webhook 之上添加了一些prothemus指标。 func instrumentedHook(path string, hookRaw http.Handler) http.Handler { // 创建一个map lbl := prometheus.Labels{"webhook": path} //普罗米修斯中有四大类指标(Summary、Histogram、Counter、Gauge),默认实现了后三个(详情见internal/metrics/metrics.go分解) // Histogram类型的指标 lat := metrics.RequestLatency.MustCurryWith(lbl) // Counter类型的指标 cnt := metrics.RequestTotal.MustCurryWith(lbl) // Gauge类型的指标 gge := metrics.RequestInFlight.With(lbl) // Initialize the most likely HTTP status codes. cnt.WithLabelValues("200") cnt.WithLabelValues("500") return promhttp.InstrumentHandlerDuration( lat, promhttp.InstrumentHandlerCounter( cnt, promhttp.InstrumentHandlerInFlight(gge, hookRaw), ), ) }
- Start函数 启动webhook server, 它将根据服务器配置安装 webhook 相关资源
func (s *Server) Start(ctx context.Context) error { s.defaultingOnce.Do(s.setDefaults) baseHookLog := log.WithName("webhooks") baseHookLog.Info("starting webhook server") //加载证书 certPath := filepath.Join(s.CertDir, s.CertName) keyPath := filepath.Join(s.CertDir, s.KeyName) certWatcher, err := certwatcher.New(certPath, keyPath) if err != nil { return err } // for和select实现的wathcer 监听证书是否有改变 go func() { if err := certWatcher.Start(ctx); err != nil { log.Error(err, "certificate watcher error") } }() cfg := &tls.Config{ NextProtos: []string{"h2"}, GetCertificate: certWatcher.GetCertificate, } // 加载 CA 以验证客户端证书 if s.ClientCAName != "" { certPool := x509.NewCertPool() clientCABytes, err := ioutil.ReadFile(filepath.Join(s.CertDir, s.ClientCAName)) if err != nil { return fmt.Errorf("failed to read client CA cert: %v", err) } ok := certPool.AppendCertsFromPEM(clientCABytes) if !ok { return fmt.Errorf("failed to append client CA cert to CA pool") } cfg.ClientCAs = certPool cfg.ClientAuth = tls.RequireAndVerifyClientCert } listener, err := tls.Listen("tcp", net.JoinHostPort(s.Host, strconv.Itoa(int(s.Port))), cfg) if err != nil { return err } log.Info("serving webhook server", "host", s.Host, "port", s.Port) srv := &http.Server{ Handler: s.WebhookMux, } idleConnsClosed := make(chan struct{}) go func() { <-ctx.Done() log.Info("shutting down webhook server") // TODO: use a context with reasonable timeout if err := srv.Shutdown(context.Background()); err != nil { // Error from closing listeners, or context timeout log.Error(err, "error shutting down the HTTP server") } close(idleConnsClosed) }() if err := srv.Serve(listener); err != nil && err != http.ErrServerClosed { return err } <-idleConnsClosed return nil }
- internal/metrics/metrics.go
- controller-runtime 关于webhook注册普罗米修斯的metric(三种类型),后续我们也是从这三种类型对应的map(包含了指标name和labels)中获得指标数据
var ( // RequestLatency 是一个普罗米修斯指标,它是处理准入请求的延迟的直方图 RequestLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: "controller_runtime_webhook_latency_seconds", Help: "Histogram of the latency of processing admission requests", }, []string{"webhook"},//labels ) //RequestTotal 是一个普罗米修斯指标,它是已处理的准入请求总数的计数器 RequestTotal = func() *prometheus.CounterVec { return prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "controller_runtime_webhook_requests_total", Help: "Total number of admission requests by HTTP status code.", }, []string{"webhook", "code"}, ) }() // RequestInFlight 是一个普罗米修斯指标,它是对飞行中准入请求的衡量标准。 RequestInFlight = func() *prometheus.GaugeVec { return prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "controller_runtime_webhook_requests_in_flight", Help: "Current number of admission requests being served.", }, []string{"webhook"}, ) }() ) func init() { //使用metrics.Registry(全局唯一【整个controller-runtime】) // 注册器注册三种类型的指标 metrics.Registry.MustRegister(RequestLatency, RequestTotal, RequestInFlight) }
- admission/webhook.go
- Request结构体 请求定义了准入处理程序的输入。它包含标识问题中的对象的信息(组、版本、种类、资源、子资源、名称、命名空间),
以及有问题的操作(例如获取、创建等),以及对象本身。
type Request struct { // k8s中的Request结构体 admissionv1.AdmissionRequest }
- Response结构体 响应是准入处理程序的输出。
它包含一个响应,指示是否允许给定操作,以及一组补丁在发生变异准入处理程序的情况下变异对象。
type Response struct { // 用于改变 webhook 的 JSON 补丁。 // 使用它而不是设置 Response.Patch (其实是admissionv1.AdmissionResponse.Patch)来最小化序列化和反序列化的开销。 // 此处设置的补丁将覆盖响应(admissionv1.AdmissionResponse)中的任何补丁, // 如果您想直接设置补丁响应(admissionv1.AdmissionResponse),请将其留空。 Patches []jsonpatch.JsonPatchOperation // AdmissionResponse 是原始的准入响应。 其中的 Patch 字段将被上面的Patches覆盖。 admissionv1.AdmissionResponse }
- Complete函数 填充尚未在admissionv1.AdmissionResponse 中设置的任何字段,它会改变响应。(也就是设置默认字段值)
func (r *Response) Complete(req Request) error { r.UID = req.UID if r.Result == nil { r.Result = &metav1.Status{} } if r.Result.Code == 0 { r.Result.Code = http.StatusOK } if len(r.Patches) == 0 { return nil } var err error r.Patch, err = json.Marshal(r.Patches) if err != nil { return err } patchType := admissionv1.PatchTypeJSONPatch r.PatchType = &patchType return nil }
- Handler接口 处理AdmissionRequest
type Handler interface { // 处理request并相应response Handle(context.Context, Request) Response }
- Webhook结构体 代表每个单独的 Webhook。
type Webhook struct { // 处理程序实际上处理一个准入请求,返回它是被允许还是被拒绝,并可能修补request的obj。 Handler Handler // 将允许您获取 http.Request.Context并添加任何其他信息,例如传递请求路径或header,从而允许您从处理程序中读取它们 WithContextFunc func(context.Context, *http.Request) context.Context // 解码器在接收scheme时构建并传递给处理程序,用来获取request的数据 decoder *Decoder log logr.Logger }
- Handle函数 处理AdmissionRequest 和Handler的Handle有区别,包含了Handler的Handle,并对resp做了default处理
如果 webhook 是 mutating 类型,它会将 AdmissionRequest 委托给每个处理程序并合并
如果 webhook 是验证类型,它会将 AdmissionRequest 委托给每个处理程序,并且如果有一个拒绝,则拒绝该请求
func (w *Webhook) Handle(ctx context.Context, req Request) Response { resp := w.Handler.Handle(ctx, req) if err := resp.Complete(req); err != nil { w.log.Error(err, "unable to encode response") return Errored(http.StatusInternalServerError, errUnableToEncodeResponse) } return resp }
- InjectScheme函数 InjectScheme 向 webhook 中注入一个scheme,以构建一个解码器。
func (w *Webhook) InjectScheme(s *runtime.Scheme) error { var err error w.decoder, err = NewDecoder(s) if err != nil { return err } if w.Handler != nil { if _, err := InjectDecoderInto(w.GetDecoder(), w.Handler); err != nil { return err } } return nil }
- InjectFunc函数 向webhook注入一些field
func (w *Webhook) InjectFunc(f inject.Func) error { // 1.执行f 2.为w.Handler注入生成的setFields 3.为w.Handler注入decoder var setFields inject.Func setFields = func(target interface{}) error { if err := f(target); err != nil { return err } if _, err := inject.InjectorInto(setFields, target); err != nil { return err } if _, err := InjectDecoderInto(w.GetDecoder(), target); err != nil { return err } return nil } return setFields(w.Handler) }
- Request结构体 请求定义了准入处理程序的输入。它包含标识问题中的对象的信息(组、版本、种类、资源、子资源、名称、命名空间),
- admission/inject.go
- DecoderInjector结构体 ControllerManager用于把decoder注入到webhook handlers中
type DecoderInjector interface { InjectDecoder(*Decoder) error }
- InjectDecoderInto函数 承接实现了DecoderInjector的结构体,注入特定的decoder
func InjectDecoderInto(decoder *Decoder, i interface{}) (bool, error) { if s, ok := i.(DecoderInjector); ok { return true, s.InjectDecoder(decoder) } return false, nil }
- admission/decode.go
- Decoder结构体 解码器知道如何解码admission request的内容请求,变成一个具体的对象。
type Decoder struct { codecs serializer.CodecFactory }
- NewDecoder函数 根据scheme创建一个decoder
func NewDecoder(scheme *runtime.Scheme) (*Decoder, error) { return &Decoder{codecs: serializer.NewCodecFactory(scheme)}, nil }
- Decode函数
Decode将AdmissionRequest中的内联对象解码到传入的runtime.object中。如果要解码AdmissionRequest中的旧对象,请使用DecodeRaw。
func (d *Decoder) Decode(req Request, into runtime.Object) error { // 如果为空 抛异常 if len(req.Object.Raw) == 0 { return fmt.Errorf("there is no content to decode") } return d.DecodeRaw(req.Object, into) } // 将内联对象解码到传入的runtime.object中 func (d *Decoder) DecodeRaw(rawObj runtime.RawExtension, into runtime.Object) error { // NB(directxman12): there's a bug/weird interaction between decoders and // the API server where the API server doesn't send a GVK on the embedded // objects, which means the unstructured decoder refuses to decode. It // also means we can't pass the unstructured directly in, since it'll try // and call unstructured's special Unmarshal implementation, which calls // back into that same decoder :-/ // See kubernetes/kubernetes#74373. // 如果rawObj.Raw为空 抛异常 if len(rawObj.Raw) == 0 { return fmt.Errorf("there is no content to decode") } // 为了解决k8s的bug(kubernetes/kubernetes#74373) 如果是Unstructured类型,k8s源码中的解码器会返回nil,所以下面做特殊处理 // 判断是否是Unstructured类型 if unstructuredInto, isUnstructured := into.(*unstructured.Unstructured); isUnstructured { // 解组到非结构化的底层对象以避免调用解码器 if err := json.Unmarshal(rawObj.Raw, &unstructuredInto.Object); err != nil { return err } return nil } // 获取解码器slice deserializer := d.codecs.UniversalDeserializer() // 解码rawObj.Raw到into return runtime.DecodeInto(deserializer, rawObj.Raw, into) }
- admission/response.go
- Allowed函数
// Allowed构造一个响应,指示给定的操作允许(没有任何补丁)。 func Allowed(reason string) Response { return ValidationResponse(true, reason) }
- Denied函数 构造一个响应,指示给定的操作不被允许
func Denied(reason string) Response { return ValidationResponse(false, reason) }
- Patched函数 构造一个响应,指示给定的操作允许,并且目标对象将被给定的patches修改
func Patched(reason string, patches ...jsonpatch.JsonPatchOperation) Response { resp := Allowed(reason) resp.Patches = patches return resp }
- Errored函数 为错误的处理请求创建新的响应。
func Errored(code int32, err error) Response { return Response{ AdmissionResponse: admissionv1.AdmissionResponse{ Allowed: false, Result: &metav1.Status{ Code: code, Message: err.Error(), }, }, } }
- ValidationResponse函数 回用于接纳请求的响应。
func ValidationResponse(allowed bool, reason string) Response { code := http.StatusForbidden if allowed { code = http.StatusOK } resp := Response{ AdmissionResponse: admissionv1.AdmissionResponse{ Allowed: allowed, Result: &metav1.Status{ Code: int32(code), }, }, } if len(reason) > 0 { resp.Result.Reason = metav1.StatusReason(reason) } return resp }
- PatchResponseFromRaw函数 根据原始和现在的数据创建patch,并相应response
// 根据original和current来创建不同类型的patch,并响应 func PatchResponseFromRaw(original, current []byte) Response { // 创建空、add、replace类型的patch patches, err := jsonpatch.CreatePatch(original, current) if err != nil { return Errored(http.StatusInternalServerError, err) } return Response{ Patches: patches, AdmissionResponse: admissionv1.AdmissionResponse{ Allowed: true, PatchType: func() *admissionv1.PatchType { if len(patches) == 0 { return nil } pt := admissionv1.PatchTypeJSONPatch return &pt }(), }, } }
- validationResponseFromStatus函数
func validationResponseFromStatus(allowed bool, status metav1.Status) Response { resp := Response{ AdmissionResponse: admissionv1.AdmissionResponse{ Allowed: allowed, Result: &status, }, } return resp }
- admission/http.go
- 定义变量和init
var admissionScheme = runtime.NewScheme() var admissionCodecs = serializer.NewCodecFactory(admissionScheme) func init() { // v1版本 utilruntime.Must(v1.AddToScheme(admissionScheme)) // v1beta1版本 utilruntime.Must(v1beta1.AddToScheme(admissionScheme)) } // 类型约定 比如判断某个struct是否实现接口的方法 var _ http.Handler = &Webhook{}
- ServeHTTP函数 实现了http.Handler接口
func (wh *Webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) { var body []byte var err error // 如果r中存在ctx则返回,否则创建新的 ctx := r.Context() if wh.WithContextFunc != nil { // 执行WithContextFunc方法 一般是给ctx赋值 ctx = wh.WithContextFunc(ctx, r) } var reviewResponse Response // 验证r.Body不能为空 if r.Body != nil { if body, err = ioutil.ReadAll(r.Body); err != nil { wh.log.Error(err, "unable to read the body from the incoming request") reviewResponse = Errored(http.StatusBadRequest, err) wh.writeResponse(w, reviewResponse) return } } else { err = errors.New("request body is empty") wh.log.Error(err, "bad request") reviewResponse = Errored(http.StatusBadRequest, err) wh.writeResponse(w, reviewResponse) return } // 验证Content-Type是否准确 contentType := r.Header.Get("Content-Type") if contentType != "application/json" { err = fmt.Errorf("contentType=%s, expected application/json", contentType) wh.log.Error(err, "unable to process a request with an unknown content type", "content type", contentType) reviewResponse = Errored(http.StatusBadRequest, err) wh.writeResponse(w, reviewResponse) return } // v1和v1beta1 AdmissionReview类型完全相同,因此v1beta1类型可以 // 被解码成v1类型。不过,如果未设置对象的TypeMeta,运行时编解码器的解码器会猜测要使用哪种类型,则按类型名解码。 // 通过设置未注册的类型到v1 GVK,解码器将强制v1beta1 AdmissionReview到v1。 // 实际的AdmissionReview GVK将被用来写一个类型化的回应,以防 // webhook config允许多个版本,否则此响应将失败。 req := Request{} ar := unversionedAdmissionReview{} // avoid an extra copy ar.Request = &req.AdmissionRequest ar.SetGroupVersionKind(v1.SchemeGroupVersion.WithKind("AdmissionReview")) _, actualAdmRevGVK, err := admissionCodecs.UniversalDeserializer().Decode(body, nil, &ar) if err != nil { wh.log.Error(err, "unable to decode the request") reviewResponse = Errored(http.StatusBadRequest, err) wh.writeResponse(w, reviewResponse) return } wh.log.V(1).Info("received request", "UID", req.UID, "kind", req.Kind, "resource", req.Resource) reviewResponse = wh.Handle(ctx, req) wh.writeResponseTyped(w, reviewResponse, actualAdmRevGVK) }
- writeAdmissionResponse函数 将ar写入到w中
func (wh *Webhook) writeAdmissionResponse(w io.Writer, ar v1.AdmissionReview) { // 先根据w获取新的encoder,然后编码ar到w中 err := json.NewEncoder(w).Encode(ar) // 如果有异常,重新将异常信息写入到w中 if err != nil { wh.log.Error(err, "unable to encode the response") wh.writeResponse(w, Errored(http.StatusInternalServerError, err)) } else { res := ar.Response if log := wh.log; log.V(1).Enabled() { if res.Result != nil { log = log.WithValues("code", res.Result.Code, "reason", res.Result.Reason) } log.V(1).Info("wrote response", "UID", res.UID, "allowed", res.Allowed) } } }
- admission/multi.go
- Handle函数
type multiMutating []Handler func (hs multiMutating) Handle(ctx context.Context, req Request) Response { patches := []jsonpatch.JsonPatchOperation{} // 遍历多个Handler for _, handler := range hs { // 执行handler.Handle resp := handler.Handle(ctx, req) // 判断resp是否允许,如果true,继续执行,否则返回response if !resp.Allowed { return resp } // 如果patchType不为空切不等于JSONPatch,异常response if resp.PatchType != nil && *resp.PatchType != admissionv1.PatchTypeJSONPatch { return Errored(http.StatusInternalServerError, fmt.Errorf("unexpected patch type returned by the handler: %v, only allow: %v", resp.PatchType, admissionv1.PatchTypeJSONPatch)) } // 追加resp.Patches到patches patches = append(patches, resp.Patches...) } var err error // JsonPatchOperation(patch的相关数据)数组,转化为byte[] marshaledPatch, err := json.Marshal(patches) if err != nil { return Errored(http.StatusBadRequest, fmt.Errorf("error when marshaling the patch: %w", err)) } return Response{ AdmissionResponse: admissionv1.AdmissionResponse{ Allowed: true, Result: &metav1.Status{ Code: http.StatusOK, }, Patch: marshaledPatch, PatchType: func() *admissionv1.PatchType { pt := admissionv1.PatchTypeJSONPatch; return &pt }(), }, } } // 为hs中的handler注入对应的filed func (hs multiMutating) InjectFunc(f inject.Func) error { for _, handler := range hs { if err := f(handler); err != nil { return err } } return nil } // 构建多个mutating webhook handler组成的MultiMutatingHandler func MultiMutatingHandler(handlers ...Handler) Handler { return multiMutating(handlers) }
- conversion包中的conversion.go
- Webhook结构体 实现了一个CRD转换的Webhook HTTP处理程序
type Webhook struct { scheme *runtime.Scheme decoder *Decoder } // 注入decoder func (wh *Webhook) InjectScheme(s *runtime.Scheme) error { var err error wh.scheme = s wh.decoder, err = NewDecoder(s) if err != nil { return err } return nil }
- ServeHTTP函数
func (wh *Webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) { // 新建一个类型转化的Review convertReview := &apix.ConversionReview{} // 解码body体到convertReview(对于类型转换body类型固定就是ConversionReview,而对于admission就是AdmissionReview) err := json.NewDecoder(r.Body).Decode(convertReview) if err != nil { log.Error(err, "failed to read conversion request") w.WriteHeader(http.StatusBadRequest) return } // 类型转化逻辑 resp, err := wh.handleConvertRequest(convertReview.Request) if err != nil { log.Error(err, "failed to convert", "request", convertReview.Request.UID) convertReview.Response = errored(err) } else { convertReview.Response = resp } convertReview.Response.UID = convertReview.Request.UID // 注意每次处理后convertReview.Request要置为nil convertReview.Request = nil // 把convertReview写入w,用于传递给下一个webhook err = json.NewEncoder(w).Encode(convertReview) if err != nil { log.Error(err, "failed to write response") return } } // 处理类型转化 func (wh *Webhook) handleConvertRequest(req *apix.ConversionRequest) (*apix.ConversionResponse, error) { if req == nil { return nil, fmt.Errorf("conversion request is nil") } var objects []runtime.RawExtension // 遍历将要类型转化的用户资源byte[] for _, obj := range req.Objects { // 解码用户资源的字节原始数据,返回src: 资源对象 gvk: group/version/kind src, gvk, err := wh.decoder.Decode(obj.Raw) if err != nil { return nil, err } // 给定gvk返回一个对象实体 dst, err := wh.allocateDstObject(req.DesiredAPIVersion, gvk.Kind) if err != nil { return nil, err } // 转化资源对象到目标对象 err = wh.convertObject(src, dst) if err != nil { return nil, err } objects = append(objects, runtime.RawExtension{Object: dst}) } return &apix.ConversionResponse{ UID: req.UID, ConvertedObjects: objects, Result: metav1.Status{ Status: metav1.StatusSuccess, }, }, nil } // 给定gvk返回一个对象实体 func (wh *Webhook) allocateDstObject(apiVersion, kind string) (runtime.Object, error) { // 获取gvk gvk := schema.FromAPIVersionAndKind(apiVersion, kind) // 根据scheme和gvk新建一个对象实体 obj, err := wh.scheme.New(gvk) if err != nil { return obj, err } // TypeAccessor返回一个接口,该接口允许检索和修改一种内存中API版本的内部对象。 // TODO:此接口用于测试往返中没有ObjectMeta或ListMeta的代码(可以使用apiVersion/kind但不符合Kube api约定的对象)。 t, err := meta.TypeAccessor(obj) if err != nil { return obj, err } t.SetAPIVersion(apiVersion) t.SetKind(kind) return obj, nil } // 转化资源对象到目标对象 func (wh *Webhook) convertObject(src, dst runtime.Object) error { // 获取源和目标对象的gvk srcGVK := src.GetObjectKind().GroupVersionKind() dstGVK := dst.GetObjectKind().GroupVersionKind() // 判断源和目标的gk是否相等 if srcGVK.GroupKind() != dstGVK.GroupKind() { return fmt.Errorf("src %T and dst %T does not belong to same API Group", src, dst) } // 判断源和目标的gvk是否相等 if srcGVK == dstGVK { return fmt.Errorf("conversion is not allowed between same type %T", src) } // 判断源和目标对象是否是Hub(集线)或者Convertible srcIsHub, dstIsHub := isHub(src), isHub(dst) srcIsConvertible, dstIsConvertible := isConvertible(src), isConvertible(dst) switch { // 如果源是Hub类型和目标对象是Convertible类型 case srcIsHub && dstIsConvertible: return dst.(conversion.Convertible).ConvertFrom(src.(conversion.Hub)) // 如果目标对象是Hub类型和源是Convertible类型 case dstIsHub && srcIsConvertible: return src.(conversion.Convertible).ConvertTo(dst.(conversion.Hub)) // 如果源和目标对象是Convertible类型 case srcIsConvertible && dstIsConvertible: return wh.convertViaHub(src.(conversion.Convertible), dst.(conversion.Convertible)) // 如果源和目标对象是Hub类型(为什么这里会抛异常? Hub类型的对象不能转化指定类型,只能根据自己的实现转化为特定的一种类型, // 想实现多种类型转化,应该实现Convertible接口) default: return fmt.Errorf("%T is not convertible to %T", src, dst) } } // 源和目标对象都是Convertible类型的转化 func (wh *Webhook) convertViaHub(src, dst conversion.Convertible) error { // 根据对象获取其所有的version下的Hub类型的实体对象 hub, err := wh.getHub(src) if err != nil { return err } if hub == nil { return fmt.Errorf("%s does not have any Hub defined", src) } // 源对象转化为Hub类型对象 err = src.ConvertTo(hub) if err != nil { return fmt.Errorf("%T failed to convert to hub version %T : %w", src, hub, err) } // 将目标对象转化为Hub类型对象 err = dst.ConvertFrom(hub) if err != nil { return fmt.Errorf("%T failed to convert from hub version %T : %w", dst, hub, err) } return nil } // 根据对象获取其所有的version下的Hub类型的实体对象(如果存在多个实现的Hub接口,则抛异常) func (wh *Webhook) getHub(obj runtime.Object) (conversion.Hub, error) { // 根据scheme和obj获取obj的所有gvk gvks, err := objectGVKs(wh.scheme, obj) if err != nil { return nil, err } if len(gvks) == 0 { return nil, fmt.Errorf("error retrieving gvks for object : %v", obj) } var hub conversion.Hub var hubFoundAlready bool // 遍历gvks获取其中最多一个实现了Hub类型的接口的gvk对象 for _, gvk := range gvks { instance, err := wh.scheme.New(gvk) if err != nil { return nil, fmt.Errorf("failed to allocate an instance for gvk %v: %w", gvk, err) } if val, isHub := instance.(conversion.Hub); isHub { if hubFoundAlready { return nil, fmt.Errorf("multiple hub version defined for %T", obj) } hubFoundAlready = true hub = val } } return hub, nil } // 根据scheme和obj获取obj的所有gvk func objectGVKs(scheme *runtime.Scheme, obj runtime.Object) ([]schema.GroupVersionKind, error) { objGVKs, _, err := scheme.ObjectKinds(obj) if err != nil { return nil, err } if len(objGVKs) != 1 { return nil, fmt.Errorf("expect to get only one GVK for %v", obj) } objGVK := objGVKs[0] knownTypes := scheme.AllKnownTypes() var gvks []schema.GroupVersionKind for gvk := range knownTypes { if objGVK.GroupKind() == gvk.GroupKind() { gvks = append(gvks, gvk) } } return gvks, nil }
- IsConvertible函数 判断给定obj在scheme中是否是可以转化的
// 对于要进行转换的类型,需要定义一个中心类型,并且所有非中心类型必须能够转换为中心类型或从中心类型转换 func IsConvertible(scheme *runtime.Scheme, obj runtime.Object) (bool, error) { var hubs, spokes, nonSpokes []runtime.Object // 根据scheme和obj获取obj的所有gvk gvks, err := objectGVKs(scheme, obj) if err != nil { return false, err } if len(gvks) == 0 { return false, fmt.Errorf("error retrieving gvks for object : %v", obj) } for _, gvk := range gvks { instance, err := scheme.New(gvk) if err != nil { return false, fmt.Errorf("failed to allocate an instance for gvk %v: %w", gvk, err) } // 判断是否是Hub类型 if isHub(instance) { // 添加到hubs中 hubs = append(hubs, instance) continue } // 判断是否是Convertible类型 if !isConvertible(instance) { // 添加到nonSpokes(我理解的是error的) nonSpokes = append(nonSpokes, instance) continue } spokes = append(spokes, instance) } if len(gvks) == 1 { return false, nil // single version } if len(hubs) == 0 && len(spokes) == 0 { // 多个version的obj,但是没有verison convert的实现 return false, nil } if len(hubs) == 1 && len(nonSpokes) == 0 { // convertible return true, nil } return false, PartialImplementationError{ hubs: hubs, nonSpokes: nonSpokes, spokes: spokes, } } // 由于部分转换而导致的错误,实现如Hub类型,多个Hub类型。 // PartialImplementationError represents an error due to partial conversion // implementation such as hub without spokes, multiple hubs or spokes without hub. type PartialImplementationError struct { gvk schema.GroupVersionKind hubs []runtime.Object nonSpokes []runtime.Object spokes []runtime.Object }
- conversion包中的decoder.go
- Decoder结构体
// 定义个类型转化的Decoder type Decoder struct { codecs serializer.CodecFactory }
- NewDecoder函数 根据scheme新建一个Decoder
func NewDecoder(scheme *runtime.Scheme) (*Decoder, error) { return &Decoder{codecs: serializer.NewCodecFactory(scheme)}, nil }
- Decode函数 解码实体的byte[]
func (d *Decoder) Decode(content []byte) (runtime.Object, *schema.GroupVersionKind, error) { deserializer := d.codecs.UniversalDeserializer() return deserializer.Decode(content, nil, nil) } func (d *Decoder) DecodeInto(content []byte, into runtime.Object) error { deserializer := d.codecs.UniversalDeserializer() return runtime.DecodeInto(deserializer, content, into) }
更多推荐
已为社区贡献15条内容
所有评论(0)