k8s笔记——Client-go 4种客户端与Kubernetes API Server 交互
Client-Go 共提供了 4 种与 Kubernetes APIServer 交互的客户端。分别是 RESTClient、DiscoveryClient、ClientSet、DynamicClient。
文章目录
client-go源码结构及说明
源码目录结构及说明:
源码目录 | 说明 |
---|---|
discovery | 提供DiscoveryClient发现客户端 |
dynamic | 提供DynamicClient动态客户端 |
informers | 每种kubernetes资源的动态实现 |
kubernetes | 提供ClientSet客户端 |
listers | 为每一个kubernetes资源提供Lister功能,该功能对Get和List请求提供只读的缓存数据 |
plugin | 提供OpenStack、GCP和Azure等云服务商授权插件 |
rest | 提供RESTClient客户端,对Kuberntes API Server执行RESTful操作 |
scale | 提供ScaleClient客户端,用于扩容或缩容Deployment、ReplicaSet、Replication Controller等资源对象 |
tools | 提供常用工具,例如Sharedinformer、Reflector、DealtFIFO及Indexers。提供Client查询和缓存机制,以减少向kube-apiserver发起的请求数等 |
transport | 提供安全的TCP连接,支持Http Stream,某些操作需要在客户端和容器之间传输二进制流,例如exec、attach等操作。该功能由内部的spdy包提供支持 |
util | 提供常用方法,例如WorkQueue工作队列、Certificate证书管理 |
kubeconfig配置管理
kubeconfig可用于管理访问kube-apiserver的配置信息,也支持多集群的配置管理,可在不同环境下管理不同kube-apiserver集群配置。kubernetes的其他组件都是用kubeconfig配置信息来连接kube-apiserver组件。kubeconfig存储了集群、用户、命名空间和身份验证等信息,默认kubeconfig存放在$HOME/.kube/config
路径下。kubeconfig的配置信息如下:
简单点的
apiVersion: v1
kind: Config
clusters:
- cluster:
proxy-url: http://proxy.example.org:3128
server: https://k8s.example.org/k8s/clusters/c-xxyyzz
name: development
users:
- name: developer
contexts:
- context:
name: development
kubeconfig配置信息分为3部分,分别为:
- clusters:定义kubernetes集群信息,例如kube-apiserver的服务地址及集群的证书信息
- users:定义kubernetes集群用户身份验证的客户端凭据,例如client-certificate、client-key、token及username/password等。
- context:定义kuberntes集群用户信息和命名空间等,用于将请求发送到指定的集群
简单点说,clusters (集群),users(用户),context(上下文)
完整的kubeconfig
以minikube搭建的k8s集群为例
apiVersion: v1
clusters:
- cluster:
certificate-authority: /home/zzyy/.minikube/ca.crt
extensions:
- extension:
last-update: Wed, 22 Jun 2024 21:55:00 CST
provider: minikube.sigs.k8s.io
version: v1.33.1
name: cluster_info
server: https://192.168.49.2:8443
name: minikube
contexts:
- context:
cluster: minikube
extensions:
- extension:
last-update: Wed, 22 Jun 2024 21:55:00 CST
provider: minikube.sigs.k8s.io
version: v1.33.1
name: context_info
namespace: default
user: minikube
name: minikube
current-context: minikube
kind: Config
preferences: {}
users:
- name: minikube
user:
client-certificate: /home/zzyy/.minikube/profiles/minikube/client.crt
client-key: /home/zzyy/.minikube/profiles/minikube/client.key
四种Kubernetes APIServer 交互的客户端
Client-Go 共提供了 4 种与 Kubernetes APIServer 交互的客户端。分别是 RESTClient、DiscoveryClient、ClientSet、DynamicClient。
- RESTClient:最基础的客户端,主要是对 HTTP 请求进行了封装,支持 Json 和 Protobuf 格式的数据。
- DiscoveryClient:发现客户端,负责发现 APIServer 支持的资源组、资源版本和资源信息的。
- ClientSet:负责操作 Kubernetes 内置的资源对象,例如:Pod、Service等。
- DynamicClient:动态客户端,可以对任意的 Kubernetes 资源对象进行通用操作,包括 CRD。
RESTClient
RESTClient是最基础的客户端,其他的ClientSet、DynamicClient及DiscoveryClient都是基于RESTClient实现的。RESTClient对HTTP Request进行了封装,实现了RESTful风格的API。
package main
import (
"context"
"flag"
"fmt"
"path/filepath"
_ "time"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
_ "k8s.io/apimachinery/pkg/api/errors"
_ "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
func main() {
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
// 把用户传递的命令行参数解析为对应变量的值
flag.Parse()
// use the current context in kubeconfig 在kubeconfig中使用当前上下文
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err.Error())
}
// 使用 RESTClient 需要开发者自行设置资源 URL
// pod 资源没有 group,在核心组,所以前缀是 api
config.APIPath = "api"
// 设置 corev1 groupVersion
config.GroupVersion = &corev1.SchemeGroupVersion
// 设置解析器,用于用于解析 scheme
config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
// 初始化 RESTClient
restClient, err := rest.RESTClientFor(config)
if err != nil {
panic(err.Error())
}
// 调用结果用 podList 解析
result := &corev1.PodList{}
// 获取 kube-system 命名空间的 pod
namespace := "kube-system"
// 链式调用 RESTClient 方法获取,并将结果解析到 corev1.PodList{}
err = restClient.Get().Namespace(namespace).Resource("pods").Do(context.TODO()).Into(result)
if err != nil {
panic(err.Error())
}
// 打印结果
for _, pod := range result.Items {
fmt.Printf("namespace: %s, pod: %s \n", pod.Namespace, pod.Name)
//fmt.Printf(" Kind: %s, APIVersion: %s \n", pod.Kind, pod.APIVersion)
}
}
部分代码解释
restClient.Get() //Get请问方式
.Namespace(namespace) //命名空间
.Resource("pods") //指定需要查询的资源,传递资源名称
.Do(context.TODO()) //触发请求
.Into(result) //写入返回结果
执行go build -o app .
输出
namespace: kube-system, pod: coredns-6554b8b87f-f8dq6
namespace: kube-system, pod: coredns-6554b8b87f-qr4wn
namespace: kube-system, pod: etcd-minikube
namespace: kube-system, pod: kube-apiserver-minikube
namespace: kube-system, pod: kube-controller-manager-minikube
namespace: kube-system, pod: kube-proxy-tppnz
namespace: kube-system, pod: kube-scheduler-minikube
namespace: kube-system, pod: storage-provisioner
可以尝试切换namespace,查看其中的pod资源。
$ minikube kubectl -- get namespace
NAME STATUS AGE
default Active 2d
kube-node-lease Active 2d
kube-public Active 2d
kube-system Active 2d
kubernetes-dashboard Active 2d
clientSet
RESTClient是最基础的客户端,使用时需要指定Resource和Version等信息,编写代码时需要提前知道Resource所在的Group和对应的Version信息。ClientSet相比而言使用更加便捷,一般情况,对Kubernetes进行二次开发时通常使用ClientSet。 ClientSet在RESTClient的基础上封装了对Resource和Version的管理方法,每个Resource可以理解为一个客户端,而ClientSet则是多个客户端的集合,每个Resource和Version都以函数的方式暴露给开发者。
注意:ClientSet仅能访问Kubernetes自身的内置资源,不能直接访问CRD自定义资源;如果需要使用ClientSet访问CRD,则需要通过client-gen代码生成器重新生成ClientSet;DynamicClient可以访问CRD资源
package main
import (
"context"
"flag"
"fmt"
"path/filepath"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
func main() {
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
// 把用户传递的命令行参数解析为对应变量的值
flag.Parse()
// use the current context in kubeconfig 在kubeconfig中使用当前上下文
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err.Error())
}
// 获取 clientSet
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
namespace := "kube-system"
// 链式调用 ClientSet 获取 pod 列表
podList, err := clientSet.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
panic(err.Error())
}
for _, pod := range podList.Items {
fmt.Printf("namespace: %s, pod: %s \n", pod.Namespace, pod.Name)
}
}
dynamicClient
DynamicClient客户端是一种动态客户端,可以对任意的Kubernetes资源进行RESTful操作,包括CRD资源。 DynamicClient内部实现了Unstructured,用于处理非结构化数据结构(即无法提前预知的数据结构),这也是DynamicClient能够处理CRD资源的关键。
DynamicClient不是类型安全的,因此在访问CRD自定义资源是要注意,例如,在操作不当时可能会导致程序崩溃。 DynamicClient的处理过程是将Resource(如PodList)转换成Unstructured结构类型,Kubernetes的所有Resource都可以转换为该结构类型。处理完后再将Unstructured转换成PodList。过程类似于Go语言的interface{}断言转换过程。另外,Unstructured结构类型是通过map[string]interface{}转换的。
package main
import (
"context"
"flag"
"fmt"
"path/filepath"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
func main() {
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
// 把用户传递的命令行参数解析为对应变量的值
flag.Parse()
// use the current context in kubeconfig 在kubeconfig中使用当前上下文
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err.Error())
}
// 初始化 DynamicClient
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// 提供 pod 的 gvr,因为是动态调用,dynamicClient 不知道需要操作哪个资源,所以需要自己提供
gvr := schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "pods",
}
//链式调用 dynamicClient 获取数据
result, err := dynamicClient.Resource(gvr).Namespace("kube-system").List(context.TODO(), metav1.ListOptions{})
if err != nil {
panic(err.Error())
}
podList := &corev1.PodList{}
// 将结果解析到 podList scheme 中
err = runtime.DefaultUnstructuredConverter.FromUnstructured(
result.UnstructuredContent(), podList)
for _, pod := range podList.Items {
fmt.Printf("namespace: %s, pod: %s \n", pod.Namespace, pod.Name)
fmt.Printf(" Kind: %s, APIVersion: %s \n", pod.Kind, pod.APIVersion)
}
}
DiscoveryClient
DiscoveryClient是发现客户端,主要用于发现Kubenetes API Server所支持的资源组、资源版本、资源信息。 kubectl的api-versions和api-resources命令输出也是通过DiscoveryClient实现的。其同样是在RESTClient的基础上进行的封装。
DiscoveryClient还可以将资源组、资源版本、资源信息等存储在本地,用于本地缓存,减轻对kubernetes api sever的访问压力,缓存信息默认存储在:~/.kube/cache
和~/.kube/http-cache
下。
package main
import (
"fmt"
"k8s.io/client-go/discovery"
"k8s.io/client-go/tools/clientcmd"
)
func main2() {
// 1、先创建一个客户端配置config
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
panic(err.Error())
}
// 2、使用 discovery.NewDiscoveryClientForConfig(),创建一个 DiscoveryClient 对象
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
panic(err.Error())
}
// 3、使用 DiscoveryClient.ServerGroupsAndResources(),获取所有资源列表
_, resourceLists, err := discoveryClient.ServerGroupsAndResources()
if err != nil {
panic(err.Error())
}
// 4、遍历资源列表,打印出资源组和资源名称
for _, resource := range resourceLists {
fmt.Printf("resource groupVersion: %s\n", resource.GroupVersion)
for _, resource := range resource.APIResources {
fmt.Printf("resource name: %s\n", resource.Name)
}
fmt.Println("--------------------------")
}
}
输出
resource groupVersion: v1
resource name: bindings
resource name: componentstatuses
resource name: configmaps
resource name: endpoints
resource name: events
resource name: limitranges
resource name: namespaces
resource name: namespaces/finalize
resource name: namespaces/status
resource name: nodes
resource name: nodes/proxy
resource name: nodes/status
resource name: persistentvolumeclaims
resource name: persistentvolumeclaims/status
resource name: persistentvolumes
resource name: persistentvolumes/status
resource name: pods
resource name: pods/attach
resource name: pods/binding
resource name: pods/ephemeralcontainers
resource name: pods/eviction
resource name: pods/exec
resource name: pods/log
resource name: pods/portforward
resource name: pods/proxy
resource name: pods/status
resource name: podtemplates
resource name: replicationcontrollers
resource name: replicationcontrollers/scale
resource name: replicationcontrollers/status
resource name: resourcequotas
resource name: resourcequotas/status
resource name: secrets
resource name: serviceaccounts
resource name: serviceaccounts/token
resource name: services
resource name: services/proxy
resource name: services/status
--------------------------
resource groupVersion: events.k8s.io/v1
resource name: events
--------------------------
resource groupVersion: policy/v1
resource name: poddisruptionbudgets
resource name: poddisruptionbudgets/status
--------------------------
resource groupVersion: batch/v1
resource name: cronjobs
resource name: cronjobs/status
resource name: jobs
resource name: jobs/status
--------------------------
resource groupVersion: coordination.k8s.io/v1
resource name: leases
--------------------------
resource groupVersion: storage.k8s.io/v1
resource name: csidrivers
resource name: csinodes
resource name: csistoragecapacities
resource name: storageclasses
resource name: volumeattachments
resource name: volumeattachments/status
--------------------------
resource groupVersion: apiextensions.k8s.io/v1
resource name: customresourcedefinitions
resource name: customresourcedefinitions/status
--------------------------
resource groupVersion: apiregistration.k8s.io/v1
resource name: apiservices
resource name: apiservices/status
--------------------------
resource groupVersion: flowcontrol.apiserver.k8s.io/v1beta2
resource name: flowschemas
resource name: flowschemas/status
resource name: prioritylevelconfigurations
resource name: prioritylevelconfigurations/status
--------------------------
resource groupVersion: certificates.k8s.io/v1
resource name: certificatesigningrequests
resource name: certificatesigningrequests/approval
resource name: certificatesigningrequests/status
--------------------------
resource groupVersion: apps/v1
resource name: controllerrevisions
resource name: daemonsets
resource name: daemonsets/status
resource name: deployments
resource name: deployments/scale
resource name: deployments/status
resource name: replicasets
resource name: replicasets/scale
resource name: replicasets/status
resource name: statefulsets
resource name: statefulsets/scale
resource name: statefulsets/status
--------------------------
resource groupVersion: autoscaling/v2
resource name: horizontalpodautoscalers
resource name: horizontalpodautoscalers/status
--------------------------
resource groupVersion: autoscaling/v1
resource name: horizontalpodautoscalers
resource name: horizontalpodautoscalers/status
--------------------------
resource groupVersion: discovery.k8s.io/v1
resource name: endpointslices
--------------------------
resource groupVersion: authorization.k8s.io/v1
resource name: localsubjectaccessreviews
resource name: selfsubjectaccessreviews
resource name: selfsubjectrulesreviews
resource name: subjectaccessreviews
--------------------------
resource groupVersion: rbac.authorization.k8s.io/v1
resource name: clusterrolebindings
resource name: clusterroles
resource name: rolebindings
resource name: roles
--------------------------
resource groupVersion: flowcontrol.apiserver.k8s.io/v1beta3
resource name: flowschemas
resource name: flowschemas/status
resource name: prioritylevelconfigurations
resource name: prioritylevelconfigurations/status
--------------------------
resource groupVersion: authentication.k8s.io/v1
resource name: selfsubjectreviews
resource name: tokenreviews
--------------------------
resource groupVersion: scheduling.k8s.io/v1
resource name: priorityclasses
--------------------------
resource groupVersion: node.k8s.io/v1
resource name: runtimeclasses
--------------------------
resource groupVersion: networking.k8s.io/v1
resource name: ingressclasses
resource name: ingresses
resource name: ingresses/status
resource name: networkpolicies
--------------------------
resource groupVersion: admissionregistration.k8s.io/v1
resource name: mutatingwebhookconfigurations
resource name: validatingwebhookconfigurations
--------------------------
创建、更新、查询、删除Deployment
使用client-go,实现一个deployment的创建、更新和删除操作。代码依据官方的例子修改而来
package main
import (
"bufio"
"context"
"flag"
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
appsresv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"os"
"path/filepath"
)
func main() {
// 1、创建配置文件
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
// 把用户传递的命令行参数解析为对应变量的值
flag.Parse()
// 在kubeconfig中使用当前上下文
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err.Error())
}
// 2、创建clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
deployClient := clientset.AppsV1().Deployments(corev1.NamespaceDefault)
// 3、创建deployment
CreateDeploy(deployClient)
prompt()
// 4、更新deployment
UpdateDeploy(deployClient)
prompt()
// 5、查询deployment
ListDeploy(deployClient)
prompt()
// 6、删除deployment
DeleteDeploy(deployClient)
}
func CreateDeploy(client appsresv1.DeploymentInterface) {
klog.Info("CreateDeploy...........")
replicas := int32(2)
deploy := appsv1.Deployment{
TypeMeta: v1.TypeMeta{
Kind: "Deployment",
APIVersion: "apps/v1",
},
ObjectMeta: v1.ObjectMeta{
Name: "deploy-nginx-demo",
Namespace: corev1.NamespaceDefault,
},
Spec: appsv1.DeploymentSpec{
Replicas: &replicas,
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{
"app": "nginx",
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: v1.ObjectMeta{
Name: "nginx",
Labels: map[string]string{
"app": "nginx",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "web",
Image: "nginx:1.12",
Ports: []corev1.ContainerPort{
{
Protocol: corev1.ProtocolTCP,
ContainerPort: 80,
},
},
},
},
},
},
},
}
dep, err := client.Create(context.Background(), &deploy, v1.CreateOptions{})
if err != nil {
klog.Errorf("create deployment error:%v", err)
return
}
klog.Infof("create deployment success, name:%s", dep.Name)
}
func UpdateDeploy(client appsresv1.DeploymentInterface) {
klog.Info("UpdateDeploy...........")
// 当有多个客户端对同一个资源进行操作时,可能会发生错误。使用RetryOnConflict来重试,重试相关参数由DefaultRetry来提供
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// 查询要更新的deploy
deploy, err := client.Get(context.Background(), "deploy-nginx-demo", v1.GetOptions{})
if err != nil {
klog.Errorf("can't get deployment, err:%v", err)
return nil
}
// 修改参数后进行更新
replicas := int32(1)
deploy.Spec.Replicas = &replicas
deploy.Spec.Template.Spec.Containers[0].Image = "nginx:1.13"
_, err = client.Update(context.Background(), deploy, v1.UpdateOptions{})
if err != nil {
klog.Errorf("update deployment error, err:%v", err)
}
return err
})
if err != nil {
klog.Errorf("update deployment error, err:%v", err)
} else {
klog.Infof("update deployment success")
}
}
func ListDeploy(client appsresv1.DeploymentInterface) {
klog.Info("ListDeploy...........")
deplist, err := client.List(context.Background(), v1.ListOptions{})
if err != nil {
klog.Errorf("list deployment error, err:%v", err)
return
}
for _, dep := range deplist.Items {
klog.Infof("deploy name:%s, replicas:%d, container image:%s", dep.Name, *dep.Spec.Replicas, dep.Spec.Template.Spec.Containers[0].Image)
}
}
func DeleteDeploy(client appsresv1.DeploymentInterface) {
klog.Info("DeleteDeploy...........")
// 删除策略
deletePolicy := v1.DeletePropagationForeground
err := client.Delete(context.Background(), "deploy-nginx-demo", v1.DeleteOptions{PropagationPolicy: &deletePolicy})
if err != nil {
klog.Errorf("delete deployment error, err:%v", err)
} else {
klog.Info("delete deployment success")
}
}
func prompt() {
fmt.Printf("-> Press Return key to continue.")
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
break
}
if err := scanner.Err(); err != nil {
panic(err)
}
fmt.Println()
}
参考资料
k8s类型定义2之基础设施TypeMeta与ObjectMeta
k8s编程operator——(2) client-go中的informer
Client-go 客户端
k8s编程operator——(1) client-go基础部分
Kubernetes operator系列 client-go篇
client-go源码结构及Client客户端对象
k8s 之 kubeconfig 配置介绍及使用指南
k8s官网-使用 kubeconfig 文件组织集群访问
更多推荐
所有评论(0)