如何初始化k8s中的client

1、kubernetes.Clientset 参考链接

  • 集群内访问创建k8s-client - 直接获取集群内的config, 通过config创建clientSet。
// creates the in-cluster config
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err.Error())
	}
	// creates the clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}
// InClusterConfig returns a config object which uses the service account
// kubernetes gives to pods. It's intended for clients that expect to be
// running inside a pod running on kubernetes. It will return ErrNotInCluster
// if called from a process not running in a kubernetes environment.
func InClusterConfig() (*Config, error) {
	const (
		tokenFile  = "/var/run/secrets/kubernetes.io/serviceaccount/token"
		rootCAFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
	)
	host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")
	if len(host) == 0 || len(port) == 0 {
		return nil, ErrNotInCluster
	}

	token, err := ioutil.ReadFile(tokenFile)
	if err != nil {
		return nil, err
	}

	tlsClientConfig := TLSClientConfig{}

	if _, err := certutil.NewPool(rootCAFile); err != nil {
		klog.Errorf("Expected to load root CA config from %s, but got err: %v", rootCAFile, err)
	} else {
		tlsClientConfig.CAFile = rootCAFile
	}

	return &Config{
		// TODO: switch to using cluster DNS.
		Host:            "https://" + net.JoinHostPort(host, port),
		TLSClientConfig: tlsClientConfig,
		BearerToken:     string(token),
		BearerTokenFile: tokenFile,
	}, nil
}
  • 集群外访问创建k8s-client,由于client是在集群外,与集群内的client相比要多考虑如何授权集群外的k8s-api访问的逻辑。我们可以使用包含集群上下文信息的kubeconfig文件来初始化客户端。 kubectl命令也使用kubeconfig文件对集群进行身份验证。
	var kubeconfig *string
	if home := homeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	flag.Parse()

	// use the current context in kubeconfig
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err.Error())
	}

	// create the clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}
  • 使用kubeconfig文件来组织关于集群,用户,名称空间和身份验证机制的信息。 kubectl命令行工具使用kubeconfig文件来查找选择群集并与群集的API服务器进行通信所需的信息。默认情况下 kubectl使用的配置文件名称是在$HOME/.kube目录下 config文件,可以通过设置环境变量KUBECONFIG或者–kubeconfig指定其他的配置文件

2、RESTClient

k8s-clientSet是各个k8s资源的客户端集合,先看下k8s-clientSet的结构体,结构体中每个成员实质上为k8s每个资源对应的restful-client。

// Clientset contains the clients for groups. Each group has exactly one
// version included in a Clientset.
type Clientset struct {
	*discovery.DiscoveryClient
	admissionregistrationV1      *admissionregistrationv1.AdmissionregistrationV1Client
	admissionregistrationV1beta1 *admissionregistrationv1beta1.AdmissionregistrationV1beta1Client
	appsV1                       *appsv1.AppsV1Client
	appsV1beta1                  *appsv1beta1.AppsV1beta1Client
	appsV1beta2                  *appsv1beta2.AppsV1beta2Client
	auditregistrationV1alpha1    *auditregistrationv1alpha1.AuditregistrationV1alpha1Client
	authenticationV1             *authenticationv1.AuthenticationV1Client
	authenticationV1beta1        *authenticationv1beta1.AuthenticationV1beta1Client
	authorizationV1              *authorizationv1.AuthorizationV1Client
	authorizationV1beta1         *authorizationv1beta1.AuthorizationV1beta1Client
	autoscalingV1                *autoscalingv1.AutoscalingV1Client
	autoscalingV2beta1           *autoscalingv2beta1.AutoscalingV2beta1Client
	autoscalingV2beta2           *autoscalingv2beta2.AutoscalingV2beta2Client
	batchV1                      *batchv1.BatchV1Client
	batchV1beta1                 *batchv1beta1.BatchV1beta1Client
	batchV2alpha1                *batchv2alpha1.BatchV2alpha1Client
	certificatesV1beta1          *certificatesv1beta1.CertificatesV1beta1Client
	coordinationV1beta1          *coordinationv1beta1.CoordinationV1beta1Client
	coordinationV1               *coordinationv1.CoordinationV1Client
	coreV1                       *corev1.CoreV1Client
	discoveryV1alpha1            *discoveryv1alpha1.DiscoveryV1alpha1Client
	discoveryV1beta1             *discoveryv1beta1.DiscoveryV1beta1Client
	eventsV1beta1                *eventsv1beta1.EventsV1beta1Client
	extensionsV1beta1            *extensionsv1beta1.ExtensionsV1beta1Client
	flowcontrolV1alpha1          *flowcontrolv1alpha1.FlowcontrolV1alpha1Client
	networkingV1                 *networkingv1.NetworkingV1Client
	networkingV1beta1            *networkingv1beta1.NetworkingV1beta1Client
	nodeV1alpha1                 *nodev1alpha1.NodeV1alpha1Client
	nodeV1beta1                  *nodev1beta1.NodeV1beta1Client
	policyV1beta1                *policyv1beta1.PolicyV1beta1Client
	rbacV1                       *rbacv1.RbacV1Client
	rbacV1beta1                  *rbacv1beta1.RbacV1beta1Client
	rbacV1alpha1                 *rbacv1alpha1.RbacV1alpha1Client
	schedulingV1alpha1           *schedulingv1alpha1.SchedulingV1alpha1Client
	schedulingV1beta1            *schedulingv1beta1.SchedulingV1beta1Client
	schedulingV1                 *schedulingv1.SchedulingV1Client
	settingsV1alpha1             *settingsv1alpha1.SettingsV1alpha1Client
	storageV1beta1               *storagev1beta1.StorageV1beta1Client
	storageV1                    *storagev1.StorageV1Client
	storageV1alpha1              *storagev1alpha1.StorageV1alpha1Client
}

以CoreV1资源为例,获取k8s clientSet 结构体中对应资源版本的restclient:

restClient:= clientSet.CoreV1().RESTClient()

//1、CoreV1Client is used to interact with features provided by the  group. 
// CoreV1Client结构体是clientSet的成员字段的类型之一。

type CoreV1Client struct {
	restClient rest.Interface
}

// 2、CoreV1 retrieves the CoreV1Client
func (c *Clientset) CoreV1() corev1.CoreV1Interface {
	return c.coreV1  
//coreV1即CoreV1Client结构体,coreV1为clientSet的成员字段之一,
//coreV1为 *corev1.CoreV1Client
}

type CoreV1Interface interface {
	RESTClient() rest.Interface
	....
}

// 3、CoreV1Client结构体为CoreV1Interface实现了RESTClient()方法。 
//returns a RESTClient that is used to communicate
// with API server by this client implementation.
func (c *CoreV1Client) RESTClient() rest.Interface {
	if c == nil {
		return nil
	}
	return c.restClient
}


// 4、 rest.Interface: Interface captures the set of operations for generically interacting with Kubernetes REST apis.
type Interface interface {
	GetRateLimiter() flowcontrol.RateLimiter
	Verb(verb string) *Request
	Post() *Request
	Put() *Request
	Patch(pt types.PatchType) *Request
	Get() *Request
	Delete() *Request
	APIVersion() schema.GroupVersion
}

//5、Most consumers should use client.New() to get a Kubernetes API client. 
// rest.RESTClient结构体实现了rest.Interface接口。
type RESTClient struct {
	// base is the root URL for all invocations of the client
	base *url.URL
	// versionedAPIPath is a path segment connecting the base URL to the resource root
	versionedAPIPath string

	// content describes how a RESTClient encodes and decodes responses.
	content ClientContentConfig

	// creates BackoffManager that is passed to requests.
	createBackoffMgr func() BackoffManager

	// rateLimiter is shared among all requests created by this client unless specifically
	// overridden.
	rateLimiter flowcontrol.RateLimiter

	// Set specific behavior of the client.  If not set http.DefaultClient will be used.
	Client *http.Client
}

// 6、CoreV1中如何创建RESTClient结构体NewForConfig creates a new CoreV1Client for the given config.
func NewForConfig(c *rest.Config) (*CoreV1Client, error) {
	config := *c
	if err := setConfigDefaults(&config); err != nil {
		return nil, err
	}
	client, err := rest.RESTClientFor(&config)
	if err != nil {
		return nil, err
	}
	return &CoreV1Client{client}, nil
}
// 

client-go中client如何创建的资源?- restful

k8s的client-go提供了对资源的增删改查的接口, 我们可以调用相关资源结构体的api,进行增删改查的操作。如下以deployment示例:

1、用户获取k8s client的 deployment的调用接口

func k8sDeployment() appv1.DeploymentInterface {
   return k8sClientset().AppsV1().Deployments(k8sNamespace)
}

2、相应的资源包已经实现了获取k8s相关资源的调用接口

// deployments implements DeploymentInterface
type deployments struct {
	client rest.Interface
	ns     string
}

// newDeployments returns a Deployments
func newDeployments(c *AppsV1Client, namespace string) *deployments {
	return &deployments{
		client: c.RESTClient(),
		ns:     namespace,
	}
}
//AppsV1包实现了获取deployment 资源的接口
func (c *AppsV1Client) Deployments(namespace string) DeploymentInterface {
	return newDeployments(c, namespace)
}

3、client-go 中 deployments结构体已经实现的调用方法如下

// DeploymentsGetter has a method to return a DeploymentInterface.
// A group's client should implement this interface.
type DeploymentsGetter interface {
	Deployments(namespace string) DeploymentInterface
}
// DeploymentInterface has methods to work with Deployment resources.
type DeploymentInterface interface {
	Create(*v1.Deployment) (*v1.Deployment, error)
	Update(*v1.Deployment) (*v1.Deployment, error)
	UpdateStatus(*v1.Deployment) (*v1.Deployment, error)
	Delete(name string, options *metav1.DeleteOptions) error
	DeleteCollection(options *metav1.DeleteOptions, listOptions metav1.ListOptions) error
	Get(name string, options metav1.GetOptions) (*v1.Deployment, error)
	List(opts metav1.ListOptions) (*v1.DeploymentList, error)
	Watch(opts metav1.ListOptions) (watch.Interface, error)
	Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1.Deployment, err error)
	GetScale(deploymentName string, options metav1.GetOptions) (*autoscalingv1.Scale, error)
	UpdateScale(deploymentName string, scale *autoscalingv1.Scale) (*autoscalingv1.Scale, error)

	DeploymentExpansion
}

4、client-go接口的底层实现是给k8s集群的apiserver发送http-restful请求。post方法的body的入参是资源obj。

}
// Create takes the representation of a deployment and creates it. 
// Returns the server's representation of the deployment, and an error, if there is any.
func (c *deployments) Create(deployment *v1.Deployment) (result *v1.Deployment, err error) {
	result = &v1.Deployment{}
	err = c.client.Post().
		Namespace(c.ns).          //资源创建的ns
		Resource("deployments").  //资源创建的kind
		Body(deployment).         //资源创建的spec
		Do().
		Into(result)
	return
}

5、对资源的不同请求restful的method也不相同 , 常见的请求如下, 参考链接

(1)create a Deployment - HTTP Request: 用户要确定创建资源的ns和kind,在url最后两个字段体现。
POST /apis/apps/v1/namespaces/{namespace}/deployments

(2)partially update the specified Deployment - HTTP Request (部分更新)
PATCH /apis/apps/v1/namespaces/{namespace}/deployments/{name}.

(3)replace the specified Deployment - HTTP Request)(完整替换当前资源,yaml的规格要求完备)
PUT /apis/apps/v1/namespaces/{namespace}/deployments/{name}

(4)delete a Deployment - HTTP Request (用户要给出资源ns\kind\name)
DELETE /apis/apps/v1/namespaces/{namespace}/deployments/{name}

(5)read the specified Deployment - HTTP Request (用户要给出资源ns\kind\name)
GET /apis/apps/v1/namespaces/{namespace}/deployments/{name}

(6)list or watch objects of kind Deployment -HTTP Request (用户要给出资源ns\kind)
GET /apis/apps/v1/namespaces/{namespace}/deployments


k8s client-go中用到的interface和struct

1、“k8s.io/apimachinery/pkg/runtime/schema” 包中k8s schema.GroupVersionKind结构体存储group、version、kind字段的值。

// GroupVersionKind unambiguously identifies a kind.  It doesn't anonymously include GroupVersion
// to avoid automatic coercion.  It doesn't use a GroupVersion to avoid custom marshalling
type GroupVersionKind struct {
	Group   string
	Version string
	Kind    string
}

2、"k8s.io/apimachinery/pkg/runtime"包中runtime.Object接口,k8s的资源类型都应该实现了该接口。在Scheme上注册的所有API类型都必须支持object对象接口。 由于预计方案中的对象将被序列化到wire,因此对象必须提供接口允许序列化程序设置对象表示的种类,版本和组。 在不希望序列化对象的情况下,对象可以选择返回无操作ObjectKindAccessor。

// Object interface must be supported by all API types registered with Scheme. Since objects in a scheme are
// expected to be serialized to the wire, the interface an Object must provide to the Scheme allows
// serializers to set the kind, version, and group the object is represented as. An Object may choose
// to return a no-op ObjectKindAccessor in cases where it is not expected to be serialized.
type Object interface {
	GetObjectKind() schema.ObjectKind
	DeepCopyObject() Object
}

3、从Scheme序列化的所有对象都对其类型信息进行编码。 序列化使用此接口,将Scheme中的类型信息设置到对象的序列化版本上。 对于无法序列化或具有独特要求的对象,此接口可能是无操作的。

// All objects that are serialized from a Scheme encode their type information. This interface is used
// by serialization to set type information from the Scheme onto the serialized version of an object.
// For objects that cannot be serialized or have unique requirements, this interface may be a no-op.
type ObjectKind interface {
	// SetGroupVersionKind sets or clears the intended serialized kind of an object. Passing kind nil
	// should clear the current setting.
	SetGroupVersionKind(kind GroupVersionKind)
	// GroupVersionKind returns the stored group, version, and kind of an object, or nil if the object does
	// not expose or provide these fields.
	GroupVersionKind() GroupVersionKind
}

4、runtime.RawExtension结构体,存储资源object的结构体和object的json字节流数据。

type RawExtension struct {
	// Raw is the underlying serialization of this object.
	//
	// TODO: Determine how to detect ContentType and ContentEncoding of 'Raw' data.
	Raw []byte `json:"-" protobuf:"bytes,1,opt,name=raw"`
	// Object can hold a representation of this extension - useful for working with versioned
	// structs.
	Object Object `json:"-"`
}

+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object 指请在生成DeepCopy时,实现Kubernetes提供的runtime.Object接口

5、“k8s.io/apimachinery/pkg/apis/meta/v1/unstructured”unstructuredJSONScheme 是k8s-client中重要的结构体。unstructuredJSONScheme结构体的decode方法反序列化object的json字节流数据为相应的runtime.Object,并返回相应的GroupVersionKind结构体。

// Codec is a Serializer that deals with the details of versioning objects. It offers the same
// interface as Serializer, so this is a marker to consumers that care about the version of the objects
// they receive.
type Codec Serializer

// UnstructuredJSONScheme is capable of converting JSON data into the Unstructured
// type, which can be used for generic access to objects without a predefined scheme.
// TODO: move into serializer/json.
var UnstructuredJSONScheme runtime.Codec = unstructuredJSONScheme{}

type unstructuredJSONScheme struct{}

func (s unstructuredJSONScheme) Decode(data []byte, _ *schema.GroupVersionKind, obj runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
	var err error
	if obj != nil {
		err = s.decodeInto(data, obj)
	} else {
		obj, err = s.decode(data)
	}

	if err != nil {
		return nil, nil, err
	}

	gvk := obj.GetObjectKind().GroupVersionKind()
	if len(gvk.Kind) == 0 {
		return nil, &gvk, runtime.NewMissingKindErr(string(data))
	}

	return obj, &gvk, nil
}

6、“k8s.io/apimachinery/pkg/util/yaml”, k8syaml文件解析包,一般使用decoder := yaml.NewYAMLOrJSONDecoder(r, 4096),来解析原始yaml字节流请求为json字节流的形式。

// NewYAMLToJSONDecoder decodes YAML documents from the provided
// stream in chunks by converting each document (as defined by
// the YAML spec) into its own chunk, converting it to JSON via
// yaml.YAMLToJSON, and then passing it to json.Decoder.
func NewYAMLToJSONDecoder(r io.Reader) *YAMLToJSONDecoder {
	reader := bufio.NewReader(r)
	return &YAMLToJSONDecoder{
		reader: NewYAMLReader(reader),
	}
}

// Decode reads a YAML document as JSON from the stream or returns
// an error. The decoding rules match json.Unmarshal, not
// yaml.Unmarshal.
func (d *YAMLToJSONDecoder) Decode(into interface{}) error {
	bytes, err := d.reader.Read()
	if err != nil && err != io.EOF {
		return err
	}

	if len(bytes) != 0 {
		err := yaml.Unmarshal(bytes, into)
		if err != nil {
			return YAMLSyntaxError{err}
		}
	}
	return err
}

如何解析yaml创建资源

1、把传入的yaml字节流请求进行解析,一个yaml字节流可能包含多个需要创建更新的资源object, 把yaml字节流中每个资源放到相应runtime.RawExtension{}中进行存储。

func (h *handler) decode(r io.Reader) error {
	decoder := yaml.NewYAMLOrJSONDecoder(r, 4096)
	for {
//提供object序列化后的存储
		ext := runtime.RawExtension{}
//每次序列化一个object到ext中,把接收的yaml文件流转为json流到ext中
		if err := decoder.Decode(&ext); err != nil {
			if err == io.EOF {
				return nil
			}
			return fmt.Errorf("error parsing: %v", err)
		}
		ext.Raw = bytes.TrimSpace(ext.Raw)
		if len(ext.Raw) == 0 || bytes.Equal(ext.Raw, []byte("null")) {
			continue
		}
//关键步骤反序列化json数据为相应的runtime.Object,并返回相应的gkv结构
		obj, gkv, err := unstructured.UnstructuredJSONScheme.Decode(ext.Raw, nil, nil)
		if err != nil {
			return err
		}
		ext.Object = obj
		h.groupVersionKinds = append(h.groupVersionKinds, gkv)
		h.exts = append(h.exts, &ext)
	}
}

2、调用rest-client创建解析后的yaml资源

func (h *handler) apply(ctx context.Context) *metav1.Status {
	type applyObject struct {
		obj             runtime.Object
		isCreateRequest bool
		restClient      clientrest.Interface
		namespace       string
		kind            string
		name            string
	}

	applyObjects := make([]*applyObject, len(h.exts))

	for idx, ext := range h.exts {
		obj := ext.Object
		gvk := h.groupVersionKinds[idx]
		restClient := util.RESTClientFor(h.client, gvk.Group, gvk.Version)

		namespace, err := h.metaAccessor.Namespace(obj)
		if err != nil {
			return errorInternal
		}

		name, err := h.metaAccessor.Name(obj)
		if err != nil {
			return errorInternal
		}

		genName, err := h.metaAccessor.GenerateName(obj)
		if err != nil {
			return errorInternal
		}

		if len(name) == 0 && len(genName) == 0 {
			return errorBadName
		}

		if len(name) != 0 {
			result := restClient.Get().
				Context(ctx).
				NamespaceIfScoped(parseNamespaceIfScoped(namespace, gvk.Kind)).
				Resource(util.ResourceFromKind(gvk.Kind)).
				Name(name).
				Do()
			err := result.Error()
			if err != nil && !errors.IsNotFound(err) {
				if statusError, ok := err.(*errors.StatusError); ok {
					status := statusError.Status()
					return &status
				}
				return unknownError(err)
			}
			if err == nil {
				if h.notUpdate {
					return &metav1.Status{
						TypeMeta: metav1.TypeMeta{
							Kind:       "Status",
							APIVersion: "v1",
						},
						Status: metav1.StatusFailure,
						Code:   http.StatusConflict,
						Reason: metav1.StatusReasonAlreadyExists,
						Details: &metav1.StatusDetails{
							Name:  name,
							Group: gvk.Group,
							Kind:  gvk.Kind,
						},
						Message: fmt.Sprintf("%s \"%s\" already exists", gvk.Kind, name),
					}
				}
				returnedObj, err := result.Get()
				if err != nil {
					return errorInternal
				}
				resourceVersion, err := h.metaAccessor.ResourceVersion(obj)
				if err != nil {
					return errorInternal
				}
				if resourceVersion != "" {
					return errorHasResourceVersion
				}
				savedResourceVersion, err := h.metaAccessor.ResourceVersion(returnedObj)
				if err != nil {
					return errorInternal
				}
				if err := h.metaAccessor.SetResourceVersion(obj, savedResourceVersion); err != nil {
					return errorInternal
				}
				applyObjects[idx] = &applyObject{
					obj:             obj,
					isCreateRequest: false,
					restClient:      restClient,
					namespace:       namespace,
					kind:            gvk.Kind,
					name:            name,
				}
				continue
			}
		}
		// create
		applyObjects[idx] = &applyObject{
			obj:             obj,
			isCreateRequest: true,
			restClient:      restClient,
			namespace:       namespace,
			kind:            gvk.Kind,
			name:            name,
		}
	}

	var messages []string
	for _, applyObj := range applyObjects {
		if applyObj.isCreateRequest {
			// create
			result := applyObj.restClient.Post().
				Context(ctx).
				NamespaceIfScoped(parseNamespaceIfScoped(applyObj.namespace, applyObj.kind)).
				Resource(util.ResourceFromKind(applyObj.kind)).
				Body(applyObj.obj).
				Do()
			log.Debugf("Apply cluster bucket create call: %v", applyObj)
			err := result.Error()
			if err != nil {
				if statusError, ok := err.(*errors.StatusError); ok {
					status := statusError.Status()
					return &status
				}
				return unknownError(err)
			}
			if len(applyObj.name) != 0 {
				messages = append(messages, fmt.Sprintf("%s %s created", applyObj.kind, applyObj.name))
			} else {
				messages = append(messages, fmt.Sprintf("%s generated", applyObj.kind))
			}
		} else {
			// update
			result := applyObj.restClient.Put().
				Context(ctx).
				NamespaceIfScoped(parseNamespaceIfScoped(applyObj.namespace, applyObj.kind)).
				Resource(util.ResourceFromKind(applyObj.kind)).
				Name(applyObj.name).
				Body(applyObj.obj).
				Do()
			log.Debugf("Apply cluster bucket update call: %v", applyObj)
			err := result.Error()
			if err != nil {
				if statusError, ok := err.(*errors.StatusError); ok {
					status := statusError.Status()
					return &status
				}
				return unknownError(err)
			}
			messages = append(messages, fmt.Sprintf("%s %s configured", applyObj.kind, applyObj.name))
		}
	}

	return &metav1.Status{
		Status:  metav1.StatusSuccess,
		Code:    http.StatusOK,
		Message: strings.Join(messages, "\n"),
	}
}
Logo

开源、云原生的融合云平台

更多推荐