client-go源码结构及说明

源码目录结构及说明:

源码目录说明
discovery提供DiscoveryClient发现客户端
dynamic提供DynamicClient动态客户端
informers每种kubernetes资源的动态实现
kubernetes提供ClientSet客户端
listers为每一个kubernetes资源提供Lister功能,该功能对Get和List请求提供只读的缓存数据
plugin提供OpenStack、GCP和Azure等云服务商授权插件
rest提供RESTClient客户端,对Kuberntes API Server执行RESTful操作
scale提供ScaleClient客户端,用于扩容或缩容Deployment、ReplicaSet、Replication Controller等资源对象
tools提供常用工具,例如Sharedinformer、Reflector、DealtFIFO及Indexers。提供Client查询和缓存机制,以减少向kube-apiserver发起的请求数等
transport提供安全的TCP连接,支持Http Stream,某些操作需要在客户端和容器之间传输二进制流,例如exec、attach等操作。该功能由内部的spdy包提供支持
util提供常用方法,例如WorkQueue工作队列、Certificate证书管理

kubeconfig配置管理

使用 kubeconfig 文件组织集群访问

kubeconfig可用于管理访问kube-apiserver的配置信息,也支持多集群的配置管理,可在不同环境下管理不同kube-apiserver集群配置。kubernetes的其他组件都是用kubeconfig配置信息来连接kube-apiserver组件。kubeconfig存储了集群、用户、命名空间和身份验证等信息,默认kubeconfig存放在$HOME/.kube/config路径下。kubeconfig的配置信息如下:

简单点的

apiVersion: v1
kind: Config

clusters:
- cluster:
    proxy-url: http://proxy.example.org:3128
    server: https://k8s.example.org/k8s/clusters/c-xxyyzz
  name: development

users:
- name: developer

contexts:
- context:
  name: development

kubeconfig配置信息分为3部分,分别为:

  • clusters:定义kubernetes集群信息,例如kube-apiserver的服务地址及集群的证书信息
  • users:定义kubernetes集群用户身份验证的客户端凭据,例如client-certificate、client-key、token及username/password等。
  • context:定义kuberntes集群用户信息和命名空间等,用于将请求发送到指定的集群

简单点说,clusters (集群),users(用户),context(上下文)

完整的kubeconfig

以minikube搭建的k8s集群为例

apiVersion: v1                                                               
clusters:                                                                    
- cluster:                                                                   
    certificate-authority: /home/zzyy/.minikube/ca.crt                       
    extensions:                                                              
    - extension:                                                             
        last-update: Wed, 22 Jun 2024 21:55:00 CST                           
        provider: minikube.sigs.k8s.io                                       
        version: v1.33.1                                                     
      name: cluster_info                                                     
    server: https://192.168.49.2:8443                                        
  name: minikube                                                             
contexts:                                                                    
- context:                                                                   
    cluster: minikube                                                        
    extensions:                                                              
    - extension:                                                             
        last-update: Wed, 22 Jun 2024 21:55:00 CST                           
        provider: minikube.sigs.k8s.io                                       
        version: v1.33.1                                                     
      name: context_info                                                     
    namespace: default                                                       
    user: minikube                                                           
  name: minikube                                                             
current-context: minikube                                                    
kind: Config                                                                 
preferences: {}                                                              
users:                                                                       
- name: minikube                                                             
  user:                                                                      
    client-certificate: /home/zzyy/.minikube/profiles/minikube/client.crt    
    client-key: /home/zzyy/.minikube/profiles/minikube/client.key            

四种Kubernetes APIServer 交互的客户端

Client-Go 共提供了 4 种与 Kubernetes APIServer 交互的客户端。分别是 RESTClient、DiscoveryClient、ClientSet、DynamicClient。
在这里插入图片描述

  • RESTClient:最基础的客户端,主要是对 HTTP 请求进行了封装,支持 Json 和 Protobuf 格式的数据。
  • DiscoveryClient:发现客户端,负责发现 APIServer 支持的资源组、资源版本和资源信息的。
  • ClientSet:负责操作 Kubernetes 内置的资源对象,例如:Pod、Service等。
  • DynamicClient:动态客户端,可以对任意的 Kubernetes 资源对象进行通用操作,包括 CRD。

RESTClient

RESTClient是最基础的客户端,其他的ClientSet、DynamicClient及DiscoveryClient都是基于RESTClient实现的。RESTClient对HTTP Request进行了封装,实现了RESTful风格的API。

package main

import (
	"context"
	"flag"
	"fmt"
	"path/filepath"
	_ "time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"

	_ "k8s.io/apimachinery/pkg/api/errors"
	_ "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)

func main() {
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	// 把用户传递的命令行参数解析为对应变量的值
	flag.Parse()

	// use the current context in kubeconfig   在kubeconfig中使用当前上下文
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err.Error())
	}

	// 使用 RESTClient 需要开发者自行设置资源 URL
	// pod 资源没有 group,在核心组,所以前缀是 api
	config.APIPath = "api"
	// 设置 corev1 groupVersion
	config.GroupVersion = &corev1.SchemeGroupVersion
	// 设置解析器,用于用于解析 scheme
	config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
	// 初始化 RESTClient
	restClient, err := rest.RESTClientFor(config)
	if err != nil {
		panic(err.Error())
	}
	// 调用结果用 podList 解析
	result := &corev1.PodList{}
	// 获取 kube-system 命名空间的 pod
	namespace := "kube-system"
	// 链式调用 RESTClient 方法获取,并将结果解析到 corev1.PodList{}
	err = restClient.Get().Namespace(namespace).Resource("pods").Do(context.TODO()).Into(result)
	if err != nil {
		panic(err.Error())
	}

	// 打印结果
	for _, pod := range result.Items {
		fmt.Printf("namespace: %s, pod: %s \n", pod.Namespace, pod.Name)
		//fmt.Printf(" Kind: %s, APIVersion: %s \n", pod.Kind, pod.APIVersion)
	}

}

部分代码解释

restClient.Get() //Get请问方式
.Namespace(namespace) //命名空间
.Resource("pods") //指定需要查询的资源,传递资源名称
.Do(context.TODO()) //触发请求
.Into(result) //写入返回结果

执行go build -o app .
输出

namespace: kube-system, pod: coredns-6554b8b87f-f8dq6 
namespace: kube-system, pod: coredns-6554b8b87f-qr4wn 
namespace: kube-system, pod: etcd-minikube 
namespace: kube-system, pod: kube-apiserver-minikube 
namespace: kube-system, pod: kube-controller-manager-minikube 
namespace: kube-system, pod: kube-proxy-tppnz 
namespace: kube-system, pod: kube-scheduler-minikube 
namespace: kube-system, pod: storage-provisioner 

可以尝试切换namespace,查看其中的pod资源。

$ minikube kubectl -- get namespace
NAME                   STATUS   AGE
default                Active   2d
kube-node-lease        Active   2d
kube-public            Active   2d
kube-system            Active   2d
kubernetes-dashboard   Active   2d

clientSet

RESTClient是最基础的客户端,使用时需要指定Resource和Version等信息,编写代码时需要提前知道Resource所在的Group和对应的Version信息。ClientSet相比而言使用更加便捷,一般情况,对Kubernetes进行二次开发时通常使用ClientSet。 ClientSet在RESTClient的基础上封装了对Resource和Version的管理方法,每个Resource可以理解为一个客户端,而ClientSet则是多个客户端的集合,每个Resource和Version都以函数的方式暴露给开发者。

注意:ClientSet仅能访问Kubernetes自身的内置资源,不能直接访问CRD自定义资源;如果需要使用ClientSet访问CRD,则需要通过client-gen代码生成器重新生成ClientSet;DynamicClient可以访问CRD资源

在这里插入图片描述

package main

import (
	"context"
	"flag"
	"fmt"
	"path/filepath"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)

func main() {
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	// 把用户传递的命令行参数解析为对应变量的值
	flag.Parse()

	// use the current context in kubeconfig   在kubeconfig中使用当前上下文
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err.Error())
	}

	// 获取 clientSet
	clientSet, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	namespace := "kube-system"
	// 链式调用 ClientSet 获取 pod 列表
	podList, err := clientSet.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err.Error())
	}

	for _, pod := range podList.Items {
		fmt.Printf("namespace: %s, pod: %s \n", pod.Namespace, pod.Name)
	}
}

dynamicClient

DynamicClient客户端是一种动态客户端,可以对任意的Kubernetes资源进行RESTful操作,包括CRD资源。 DynamicClient内部实现了Unstructured,用于处理非结构化数据结构(即无法提前预知的数据结构),这也是DynamicClient能够处理CRD资源的关键。

DynamicClient不是类型安全的,因此在访问CRD自定义资源是要注意,例如,在操作不当时可能会导致程序崩溃。 DynamicClient的处理过程是将Resource(如PodList)转换成Unstructured结构类型,Kubernetes的所有Resource都可以转换为该结构类型。处理完后再将Unstructured转换成PodList。过程类似于Go语言的interface{}断言转换过程。另外,Unstructured结构类型是通过map[string]interface{}转换的。

package main

import (
	"context"
	"flag"
	"fmt"
	"path/filepath"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)

func main() {
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	// 把用户传递的命令行参数解析为对应变量的值
	flag.Parse()

	// use the current context in kubeconfig   在kubeconfig中使用当前上下文
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err.Error())
	}

	// 初始化 DynamicClient
	dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	// 提供 pod 的 gvr,因为是动态调用,dynamicClient 不知道需要操作哪个资源,所以需要自己提供
	gvr := schema.GroupVersionResource{
		Group:    "",
		Version:  "v1",
		Resource: "pods",
	}
	//链式调用 dynamicClient 获取数据
	result, err := dynamicClient.Resource(gvr).Namespace("kube-system").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err.Error())
	}
	podList := &corev1.PodList{}
	// 将结果解析到 podList scheme 中
	err = runtime.DefaultUnstructuredConverter.FromUnstructured(
		result.UnstructuredContent(), podList)

	for _, pod := range podList.Items {
		fmt.Printf("namespace: %s, pod: %s \n", pod.Namespace, pod.Name)
		fmt.Printf(" Kind: %s, APIVersion: %s \n", pod.Kind, pod.APIVersion)
	}
}

DiscoveryClient

DiscoveryClient是发现客户端,主要用于发现Kubenetes API Server所支持的资源组、资源版本、资源信息。 kubectl的api-versions和api-resources命令输出也是通过DiscoveryClient实现的。其同样是在RESTClient的基础上进行的封装。
DiscoveryClient还可以将资源组、资源版本、资源信息等存储在本地,用于本地缓存,减轻对kubernetes api sever的访问压力,缓存信息默认存储在:~/.kube/cache~/.kube/http-cache下。

package main

import (
	"fmt"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/tools/clientcmd"
)

func main2() {
	// 1、先创建一个客户端配置config
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err.Error())
	}

	// 2、使用 discovery.NewDiscoveryClientForConfig(),创建一个 DiscoveryClient 对象
	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	// 3、使用 DiscoveryClient.ServerGroupsAndResources(),获取所有资源列表
	_, resourceLists, err := discoveryClient.ServerGroupsAndResources()
	if err != nil {
		panic(err.Error())
	}

	// 4、遍历资源列表,打印出资源组和资源名称
	for _, resource := range resourceLists {
		fmt.Printf("resource groupVersion: %s\n", resource.GroupVersion)
		for _, resource := range resource.APIResources {
			fmt.Printf("resource name: %s\n", resource.Name)
		}
		fmt.Println("--------------------------")
	}

}


输出

resource groupVersion: v1
resource name: bindings
resource name: componentstatuses
resource name: configmaps
resource name: endpoints
resource name: events
resource name: limitranges
resource name: namespaces
resource name: namespaces/finalize
resource name: namespaces/status
resource name: nodes
resource name: nodes/proxy
resource name: nodes/status
resource name: persistentvolumeclaims
resource name: persistentvolumeclaims/status
resource name: persistentvolumes
resource name: persistentvolumes/status
resource name: pods
resource name: pods/attach
resource name: pods/binding
resource name: pods/ephemeralcontainers
resource name: pods/eviction
resource name: pods/exec
resource name: pods/log
resource name: pods/portforward
resource name: pods/proxy
resource name: pods/status
resource name: podtemplates
resource name: replicationcontrollers
resource name: replicationcontrollers/scale
resource name: replicationcontrollers/status
resource name: resourcequotas
resource name: resourcequotas/status
resource name: secrets
resource name: serviceaccounts
resource name: serviceaccounts/token
resource name: services
resource name: services/proxy
resource name: services/status
--------------------------
resource groupVersion: events.k8s.io/v1
resource name: events
--------------------------
resource groupVersion: policy/v1
resource name: poddisruptionbudgets
resource name: poddisruptionbudgets/status
--------------------------
resource groupVersion: batch/v1
resource name: cronjobs
resource name: cronjobs/status
resource name: jobs
resource name: jobs/status
--------------------------
resource groupVersion: coordination.k8s.io/v1
resource name: leases
--------------------------
resource groupVersion: storage.k8s.io/v1
resource name: csidrivers
resource name: csinodes
resource name: csistoragecapacities
resource name: storageclasses
resource name: volumeattachments
resource name: volumeattachments/status
--------------------------
resource groupVersion: apiextensions.k8s.io/v1
resource name: customresourcedefinitions
resource name: customresourcedefinitions/status
--------------------------
resource groupVersion: apiregistration.k8s.io/v1
resource name: apiservices
resource name: apiservices/status
--------------------------
resource groupVersion: flowcontrol.apiserver.k8s.io/v1beta2
resource name: flowschemas
resource name: flowschemas/status
resource name: prioritylevelconfigurations
resource name: prioritylevelconfigurations/status
--------------------------
resource groupVersion: certificates.k8s.io/v1
resource name: certificatesigningrequests
resource name: certificatesigningrequests/approval
resource name: certificatesigningrequests/status
--------------------------
resource groupVersion: apps/v1
resource name: controllerrevisions
resource name: daemonsets
resource name: daemonsets/status
resource name: deployments
resource name: deployments/scale
resource name: deployments/status
resource name: replicasets
resource name: replicasets/scale
resource name: replicasets/status
resource name: statefulsets
resource name: statefulsets/scale
resource name: statefulsets/status
--------------------------
resource groupVersion: autoscaling/v2
resource name: horizontalpodautoscalers
resource name: horizontalpodautoscalers/status
--------------------------
resource groupVersion: autoscaling/v1
resource name: horizontalpodautoscalers
resource name: horizontalpodautoscalers/status
--------------------------
resource groupVersion: discovery.k8s.io/v1
resource name: endpointslices
--------------------------
resource groupVersion: authorization.k8s.io/v1
resource name: localsubjectaccessreviews
resource name: selfsubjectaccessreviews
resource name: selfsubjectrulesreviews
resource name: subjectaccessreviews
--------------------------
resource groupVersion: rbac.authorization.k8s.io/v1
resource name: clusterrolebindings
resource name: clusterroles
resource name: rolebindings
resource name: roles
--------------------------
resource groupVersion: flowcontrol.apiserver.k8s.io/v1beta3
resource name: flowschemas
resource name: flowschemas/status
resource name: prioritylevelconfigurations
resource name: prioritylevelconfigurations/status
--------------------------
resource groupVersion: authentication.k8s.io/v1
resource name: selfsubjectreviews
resource name: tokenreviews
--------------------------
resource groupVersion: scheduling.k8s.io/v1
resource name: priorityclasses
--------------------------
resource groupVersion: node.k8s.io/v1
resource name: runtimeclasses
--------------------------
resource groupVersion: networking.k8s.io/v1
resource name: ingressclasses
resource name: ingresses
resource name: ingresses/status
resource name: networkpolicies
--------------------------
resource groupVersion: admissionregistration.k8s.io/v1
resource name: mutatingwebhookconfigurations
resource name: validatingwebhookconfigurations
--------------------------

创建、更新、查询、删除Deployment

使用client-go,实现一个deployment的创建、更新和删除操作。代码依据官方的例子修改而来

package main

import (
	"bufio"
	"context"
	"flag"
	"fmt"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	appsresv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	"k8s.io/client-go/util/retry"
	"k8s.io/klog/v2"
	"os"
	"path/filepath"
)

func main() {
	// 1、创建配置文件
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	// 把用户传递的命令行参数解析为对应变量的值
	flag.Parse()

	// 在kubeconfig中使用当前上下文
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err.Error())
	}

	// 2、创建clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	deployClient := clientset.AppsV1().Deployments(corev1.NamespaceDefault)

	// 3、创建deployment
	CreateDeploy(deployClient)

	prompt()
	// 4、更新deployment
	UpdateDeploy(deployClient)

	prompt()
	// 5、查询deployment
	ListDeploy(deployClient)

	prompt()
	// 6、删除deployment
	DeleteDeploy(deployClient)
}

func CreateDeploy(client appsresv1.DeploymentInterface) {
	klog.Info("CreateDeploy...........")
	replicas := int32(2)
	deploy := appsv1.Deployment{
		TypeMeta: v1.TypeMeta{
			Kind:       "Deployment",
			APIVersion: "apps/v1",
		},
		ObjectMeta: v1.ObjectMeta{
			Name:      "deploy-nginx-demo",
			Namespace: corev1.NamespaceDefault,
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: &replicas,
			Selector: &v1.LabelSelector{
				MatchLabels: map[string]string{
					"app": "nginx",
				},
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: v1.ObjectMeta{
					Name: "nginx",
					Labels: map[string]string{
						"app": "nginx",
					},
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  "web",
							Image: "nginx:1.12",
							Ports: []corev1.ContainerPort{
								{
									Protocol:      corev1.ProtocolTCP,
									ContainerPort: 80,
								},
							},
						},
					},
				},
			},
		},
	}

	dep, err := client.Create(context.Background(), &deploy, v1.CreateOptions{})
	if err != nil {
		klog.Errorf("create deployment error:%v", err)
		return
	}
	klog.Infof("create deployment success, name:%s", dep.Name)
}

func UpdateDeploy(client appsresv1.DeploymentInterface) {
	klog.Info("UpdateDeploy...........")
	// 当有多个客户端对同一个资源进行操作时,可能会发生错误。使用RetryOnConflict来重试,重试相关参数由DefaultRetry来提供
	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// 查询要更新的deploy
		deploy, err := client.Get(context.Background(), "deploy-nginx-demo", v1.GetOptions{})
		if err != nil {
			klog.Errorf("can't get deployment, err:%v", err)
			return nil
		}

		// 修改参数后进行更新
		replicas := int32(1)
		deploy.Spec.Replicas = &replicas
		deploy.Spec.Template.Spec.Containers[0].Image = "nginx:1.13"

		_, err = client.Update(context.Background(), deploy, v1.UpdateOptions{})
		if err != nil {
			klog.Errorf("update deployment error, err:%v", err)
		}
		return err
	})

	if err != nil {
		klog.Errorf("update deployment error, err:%v", err)
	} else {
		klog.Infof("update deployment success")
	}

}

func ListDeploy(client appsresv1.DeploymentInterface) {
	klog.Info("ListDeploy...........")
	deplist, err := client.List(context.Background(), v1.ListOptions{})
	if err != nil {
		klog.Errorf("list deployment error, err:%v", err)
		return
	}

	for _, dep := range deplist.Items {
		klog.Infof("deploy name:%s, replicas:%d, container image:%s", dep.Name, *dep.Spec.Replicas, dep.Spec.Template.Spec.Containers[0].Image)
	}
}

func DeleteDeploy(client appsresv1.DeploymentInterface) {
	klog.Info("DeleteDeploy...........")
	// 删除策略
	deletePolicy := v1.DeletePropagationForeground
	err := client.Delete(context.Background(), "deploy-nginx-demo", v1.DeleteOptions{PropagationPolicy: &deletePolicy})
	if err != nil {
		klog.Errorf("delete deployment error, err:%v", err)
	} else {
		klog.Info("delete deployment success")
	}
}

func prompt() {
	fmt.Printf("-> Press Return key to continue.")
	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		break
	}
	if err := scanner.Err(); err != nil {
		panic(err)
	}
	fmt.Println()
}

参考资料

k8s类型定义2之基础设施TypeMeta与ObjectMeta
k8s编程operator——(2) client-go中的informer
Client-go 客户端
k8s编程operator——(1) client-go基础部分
Kubernetes operator系列 client-go篇
client-go源码结构及Client客户端对象
k8s 之 kubeconfig 配置介绍及使用指南
k8s官网-使用 kubeconfig 文件组织集群访问

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐