# tree client-go/ -L 1
client-go/
  ├── discovery //提供 DiscoveryClient 发现客户端
  ├── dynamic  //提供 DynamicClient 动态客户端
  ├── informers //每种 kubernetes 资源的 Informer 实现
  ├── kubernetes //提供 ClientSet 客户端
  ├── listers //为每一个 Kubernetes 资源提供 Lister 功能,该功能对 Get 和 List 请求提供只读的缓存数据
  ├── plugin // 提供 OpenStack、GCP 和 Azure 等云服务商授权插件
  ├── rest //提供 RESTClient 客户端,对 Kubernetes API Server 执行 RESTful 操作
  ├── scale //提供 ScaleClient 客户端,用于扩容或缩容 Deployment、ReplicaSet、Replication Controller 等资源对象
  ├── tools //提供常用工具,例如 SharedInformer、Reflector、DeltaFIFO 及 Indexers。提供 Client 查询和缓存机制,以减少向 kube-apiserver 发起的请求数等
  ├── transport //提供安全的 TCP 连接,支持 Http Stream,某些操作需要在客户端和容器之间传输二进制流,例如 exec、attach 等操作。该功能由内部的 spdy 包提供支持
  ├── restmapper //提供 RESTMapper,用于 GVK 与 GVR 之间的相互映射
  └── util //常用方法,例如 WorkQueue 工作队列、Certificate 证书管理等
一、 Discovery client
import (
	"fmt"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/tools/clientcmd"
)

// newDiscoveryClient prints the name, group and version of every API resource
// advertised by the cluster described in ./kube.conf.
//
// It panics if the kubeconfig cannot be loaded, the discovery client cannot be
// built, discovery fails outright, or a returned group/version string cannot be
// parsed. A partial discovery failure (some API groups unreachable) is reported
// as a warning and the resources that were discovered are still printed.
func newDiscoveryClient() {
	config, err := clientcmd.BuildConfigFromFlags("", "./kube.conf")
	if err != nil {
		panic(err)
	}
	dClient, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		panic(err)
	}

	_, apiResourceList, err := dClient.ServerGroupsAndResources()
	if err != nil {
		// A group-discovery failure still comes with the lists that could be
		// fetched, so only abort on any other kind of error.
		if !discovery.IsGroupDiscoveryFailedError(err) {
			panic(err)
		}
		fmt.Println("warning: partial discovery result:", err)
	}
	for _, v := range apiResourceList {
		gv, err := schema.ParseGroupVersion(v.GroupVersion)
		if err != nil {
			panic(err)
		}
		for _, resource := range v.APIResources {
			fmt.Println("name:", resource.Name, "    ", "group:", gv.Group, "    ", "version:", gv.Version)
		}
	}
}

二、Dynamic Client
import (
	"context"
	"fmt"
	corev1 "k8s.io/api/core/v1"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

// newDynamicClient demonstrates the dynamic client against the cluster
// described in ./kube.conf. It first lists the pods of the "default" namespace
// and prints their namespace and name, then fetches the CloneSet
// "nginx-cloneset", prints its spec.replicas when that field can be read, sets
// spec.replicas to 1 and sends the update back to the API server.
//
// Every failure except the best-effort read of spec.replicas panics.
func newDynamicClient() {
	restConfig, err := clientcmd.BuildConfigFromFlags("", "./kube.conf")
	if err != nil {
		panic(err)
	}
	dyn, err := dynamic.NewForConfig(restConfig)
	if err != nil {
		panic(err)
	}
	ctx := context.Background()

	// Core-group pods: list them as unstructured data, then convert the
	// result into the typed PodList.
	podsGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
	rawPods, err := dyn.Resource(podsGVR).Namespace("default").List(ctx, v1.ListOptions{})
	if err != nil {
		panic(err)
	}
	typedPods := &corev1.PodList{}
	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(rawPods.UnstructuredContent(), typedPods); err != nil {
		panic(err)
	}
	for _, pod := range typedPods.Items {
		fmt.Println(pod.Namespace, pod.Name)
	}

	// CloneSet custom resource: read, tweak and write back without a typed client.
	cloneSetGVR := schema.GroupVersionResource{
		Group:    "apps.kruise.io",
		Version:  "v1alpha1",
		Resource: "clonesets",
	}
	cloneSet, err := dyn.Resource(cloneSetGVR).Namespace("default").Get(ctx, "nginx-cloneset", v1.GetOptions{})
	if err != nil {
		panic(err)
	}
	// Printing the current replica count is best-effort: a missing or
	// mistyped field is skipped silently.
	if replicas, found, err := unstructured.NestedInt64(cloneSet.Object, "spec", "replicas"); found && err == nil {
		fmt.Println(replicas)
	}
	if err := unstructured.SetNestedField(cloneSet.Object, int64(1), "spec", "replicas"); err != nil {
		panic(err)
	}
	if _, err := dyn.Resource(cloneSetGVR).Namespace("default").Update(ctx, cloneSet, v1.UpdateOptions{}); err != nil {
		panic(err)
	}
}
三、Rest Client
import (
	"context"
	"fmt"
	kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func newRestfulClient() {
	config, err := clientcmd.BuildConfigFromFlags("", "./kube.conf")
	if err != nil {
		panic(err)
	}
	config.APIPath = "apis"
	config.GroupVersion = &kruiseappsv1alpha1.SchemeGroupVersion
	config.NegotiatedSerializer = scheme.Codecs
	clientFor, err := rest.RESTClientFor(config)
	if err != nil {
		panic(err)
	}
	list := &kruiseappsv1alpha1.CloneSetList{}
	err = clientFor.Get().Namespace("default").
		Resource("clonesets").
		Do(context.Background()).
		Into(list)

	if err != nil {
		panic(err)
	}
	for _, item := range list.Items {
		fmt.Println(item.Namespace, item.Name)
	}
四、GVK & GVR
import (
	"fmt"
	"github.com/openkruise/kruise-api/apps/v1alpha1"
	appv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/restmapper"
	"k8s.io/client-go/tools/clientcmd"
	"reflect"
)

// G demonstrates GroupVersionKind (GVK) and GroupVersionResource (GVR)
// handling in three steps:
//
//  1. register a handful of core, apps and Kruise types in a fresh scheme and
//     print every GVK the scheme knows;
//  2. map the CloneSet GVK to its GVR with a discovery-backed REST mapper and
//     print the resource;
//  3. map the "clonesets" GVR back to its GVK and print it.
//
// It panics if the kubeconfig cannot be loaded, the clientset cannot be built,
// discovery fails, or either mapping cannot be resolved.
func G() {
	// Register GVKs.
	scheme := runtime.NewScheme()
	scheme.AddKnownTypes(v1.SchemeGroupVersion, &v1.Pod{}, &v1.Namespace{}, &v1.Node{}, &v1.ConfigMap{})
	scheme.AddKnownTypes(appv1.SchemeGroupVersion, &appv1.Deployment{}, &appv1.StatefulSet{})
	scheme.AddKnownTypes(v1alpha1.SchemeGroupVersion, &v1alpha1.CloneSet{}, &v1alpha1.StatefulSet{})
	for gvk := range scheme.AllKnownTypes() {
		fmt.Println(gvk.String())
	}

	// GVK to GVR.
	cloneSetGVK := schema.GroupVersionKind{
		Group:   v1alpha1.GroupVersion.Group,
		Version: v1alpha1.GroupVersion.Version,
		Kind:    reflect.TypeOf(&v1alpha1.CloneSet{}).Elem().Name(),
	}
	config, err := clientcmd.BuildConfigFromFlags("", "./kube.conf")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	apiGroupResources, err := restmapper.GetAPIGroupResources(clientset.DiscoveryClient)
	if err != nil {
		panic(err)
	}
	restMapper := restmapper.NewDiscoveryRESTMapper(apiGroupResources)
	// Pass the version too, so the mapping is resolved for exactly the GVK
	// built above rather than for whichever version the mapper prefers.
	mapping, err := restMapper.RESTMapping(cloneSetGVK.GroupKind(), cloneSetGVK.Version)
	if err != nil {
		panic(err)
	}
	fmt.Println(mapping.Resource.String())

	// GVR to GVK.
	cloneSetGVR := schema.GroupVersionResource{
		Group:    v1alpha1.GroupVersion.Group,
		Version:  v1alpha1.GroupVersion.Version,
		Resource: "clonesets",
	}
	mappedGVK, err := restMapper.KindFor(cloneSetGVR)
	if err != nil {
		panic(err)
	}
	fmt.Println(mappedGVK)
}

五、Informers & Listers
import (
	"context"
	"fmt"
	v12 "k8s.io/api/core/v1"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"time"
)

// Informers builds a shared informer factory restricted to the "default"
// namespace, registers eventFunc on the pod informer, starts the factory,
// waits for the caches to sync, prints the names held by the node and pod
// listers and then blocks forever so that event callbacks keep firing.
//
// The factory is configured with:
//   - list options that select label app=nginx and field
//     metadata.name=nginx-cloneset-tv6nc;
//   - a transform that prints every object and renames pods to "xxx";
//   - an empty custom resync configuration.
//
// It panics if the kubeconfig cannot be loaded, the clientset cannot be built,
// the event handler cannot be registered, a cache fails to sync, or a lister
// returns an error.
func Informers() {
	config, err := clientcmd.BuildConfigFromFlags("", "./kube.conf")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	// The background context is never cancelled, so its Done channel keeps
	// the informers running for the life of the process.
	ctx := context.Background()

	informerFactory := informers.NewSharedInformerFactoryWithOptions(clientset, 0,
		informers.WithNamespace("default"),
		informers.WithTweakListOptions(func(options *v1.ListOptions) {
			options.LabelSelector = labels.FormatLabels(map[string]string{"app": "nginx"})
			options.FieldSelector = labels.FormatLabels(map[string]string{"metadata.name": "nginx-cloneset-tv6nc"})
		}),
		informers.WithTransform(func(i interface{}) (interface{}, error) {
			fmt.Println(i)
			// The transform is installed on every informer of the factory
			// (nodes included), so anything that is not a pod is passed
			// through untouched instead of panicking on the assertion.
			pod, ok := i.(*v12.Pod)
			if !ok {
				return i, nil
			}
			pod.Name = "xxx"
			return pod, nil
		}),
		informers.WithCustomResyncConfig(map[v1.Object]time.Duration{}),
	)
	nodes := informerFactory.Core().V1().Nodes().Lister()
	pods := informerFactory.Core().V1().Pods().Lister()

	podInformer := informerFactory.Core().V1().Pods().Informer()
	if _, err := podInformer.AddEventHandler(eventFunc); err != nil {
		panic(err)
	}

	informerFactory.Start(ctx.Done())

	for informerType, synced := range informerFactory.WaitForCacheSync(ctx.Done()) {
		if !synced {
			panic(fmt.Sprintf("cache for %v did not sync", informerType))
		}
	}

	nodeList, err := nodes.List(labels.Everything())
	if err != nil {
		panic(err)
	}
	for _, node := range nodeList {
		fmt.Println(node.Name)
	}
	podList, err := pods.List(labels.Everything())
	if err != nil {
		panic(err)
	}
	for _, pod := range podList {
		fmt.Println(pod.Name)
	}

	// Block forever; the Done channel of the background context never fires.
	<-ctx.Done()
}

// EventFunc is a stateless event handler that logs each informer
// notification to standard output.
type EventFunc struct{}

// OnAdd prints "add" followed by whether the object was part of the
// informer's initial list.
func (EventFunc) OnAdd(obj interface{}, isInInitialList bool) {
	fmt.Println("add", isInInitialList)
}

// OnUpdate prints "update"; both object versions are ignored.
func (EventFunc) OnUpdate(oldObj, newObj interface{}) {
	fmt.Println("update")
}

// OnDelete prints "delete"; the removed object is ignored.
func (EventFunc) OnDelete(obj interface{}) {
	fmt.Println("delete")
}

// eventFunc is the EventFunc handler exposed as a cache.ResourceEventHandler.
// The typed declaration also makes the compiler verify that EventFunc
// implements that interface.
var eventFunc cache.ResourceEventHandler = EventFunc{}
六、Up & Download
import (
	"archive/tar"
	"bytes"
	"context"
	"fmt"
	"io"
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/remotecommand"
	"os"
	"path/filepath"
)

// UpDown downloads the /bin directory of a pod by running "tar cf - /bin"
// inside it through the exec helper and unpacking the resulting tar stream
// into the local directory "output_directory".
//
// It panics if the kubeconfig cannot be loaded, the REST client cannot be
// built, the remote command fails, or the archive cannot be unpacked.
func UpDown() {
	config, err := clientcmd.BuildConfigFromFlags("", "./kube.conf")
	if err != nil {
		panic(err)
	}
	config.APIPath = "api"
	config.GroupVersion = &v1.SchemeGroupVersion
	config.NegotiatedSerializer = scheme.Codecs
	clientFor, err := rest.RESTClientFor(config)
	if err != nil {
		panic(err)
	}
	cmds := []string{"tar", "cf", "-", "/bin"}
	executor := exec(clientFor, cmds, config)

	reader, writer := io.Pipe()
	go func() {
		var stderr bytes.Buffer
		streamErr := executor.StreamWithContext(context.Background(), remotecommand.StreamOptions{
			Stdin:  os.Stdin,
			Stdout: writer,
			Stderr: &stderr,
			Tty:    false,
		})
		if streamErr != nil {
			streamErr = fmt.Errorf("streaming tar from pod: %w (stderr: %q)", streamErr, stderr.String())
		}
		// Hand the outcome to the reading side through the pipe instead of a
		// shared variable: a nil error makes the reader see a plain EOF.
		writer.CloseWithError(streamErr)
	}()

	if err := extractTar(reader, "output_directory"); err != nil {
		panic(err)
	}
}

// extractTar unpacks the tar stream src below the directory dest. Only
// directories and regular files are written; other entry types are reported
// on standard output and skipped. Entries whose path would end up outside
// dest are rejected with an error.
func extractTar(src io.Reader, dest string) error {
	tarReader := tar.NewReader(src)
	for {
		header, err := tarReader.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		mode := header.FileInfo().Mode()
		if !mode.IsRegular() && !mode.IsDir() {
			fmt.Println("only support file download", mode.Type())
			continue
		}
		// Resolve the full local path and refuse names such as "../../x"
		// that would escape the destination directory.
		path := filepath.Join(dest, header.Name)
		rel, err := filepath.Rel(dest, path)
		if err != nil || !filepath.IsLocal(rel) {
			return fmt.Errorf("tar entry %q escapes destination %q", header.Name, dest)
		}
		switch header.Typeflag {
		case tar.TypeDir:
			// Keep the directory usable by the owner so that the entries
			// following it in the archive can still be created inside it.
			if err := os.MkdirAll(path, mode.Perm()|0o700); err != nil {
				return err
			}
		default:
			if err := writeTarFile(path, mode.Perm(), tarReader); err != nil {
				return err
			}
		}
	}
	// Consume whatever follows the end-of-archive marker so the producer is
	// never left blocked on a write, and surface any error it reported.
	_, err := io.Copy(io.Discard, src)
	return err
}

// writeTarFile creates (or truncates) the file at path with permission perm
// and fills it with content. The file is closed before returning, so
// extracting many entries does not accumulate open handles.
func writeTarFile(path string, perm os.FileMode, content io.Reader) error {
	// The archive is not guaranteed to list a directory before its files.
	if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
		return err
	}
	file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, perm)
	if err != nil {
		return err
	}
	if _, err := io.Copy(file, content); err != nil {
		file.Close()
		return err
	}
	return file.Close()
}

// exec returns a SPDY executor that runs cmds in the container named
// "container" of pod "nginx-cloneset-tv6nc" in the "default" namespace, with
// stdin, stdout and stderr attached and no TTY. It panics if the executor
// cannot be created.
func exec(clientFor *rest.RESTClient, cmds []string, config *rest.Config) remotecommand.Executor {
	execOptions := &v1.PodExecOptions{
		Container: "container",
		Command:   cmds,
		Stdin:     true,
		Stdout:    true,
		Stderr:    true,
		TTY:       false,
	}

	// Address the pod's "exec" subresource, then encode the options as
	// query parameters.
	req := clientFor.Post()
	req = req.Namespace("default").Resource("pods").Name("nginx-cloneset-tv6nc")
	req = req.SubResource("exec").VersionedParams(execOptions, scheme.ParameterCodec)

	spdyExecutor, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
	if err != nil {
		panic(err)
	}
	return spdyExecutor
}
七、Scale
import (
	"context"
	"fmt"
	"github.com/openkruise/kruise-api/apps/v1alpha1"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/scale"
	"k8s.io/client-go/tools/clientcmd"
	"reflect"
	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

// scales reads the scale subresource of the CloneSet "nginx-cloneset" in the
// "default" namespace and prints its desired replica count.
//
// It panics if the kubeconfig cannot be loaded, any of the clients or the REST
// mapper cannot be built, the CloneSet kind cannot be mapped to a resource, or
// the scale request fails.
func scales() {
	config, err := clientcmd.BuildConfigFromFlags("", "./kube.conf")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	httpClient, err := rest.HTTPClientFor(config)
	if err != nil {
		panic(err)
	}
	restMapper, err := apiutil.NewDiscoveryRESTMapper(config, httpClient)
	if err != nil {
		panic(err)
	}

	scalesGetter, err := scale.NewForConfig(config, restMapper, dynamic.LegacyAPIPathResolverFunc, scale.NewDiscoveryScaleKindResolver(clientset.Discovery()))
	if err != nil {
		panic(err)
	}

	// Resolve the resource name for the CloneSet kind.
	mapping, err := restMapper.RESTMapping(schema.GroupKind{
		Group: v1alpha1.GroupVersion.Group,
		Kind:  reflect.TypeOf(&v1alpha1.CloneSet{}).Elem().Name(),
	})
	if err != nil {
		panic(err)
	}
	// The mapping already carries group and resource, so derive the
	// GroupResource from it rather than assembling it by hand.
	cloneSetScale, err := scalesGetter.Scales("default").Get(context.Background(), mapping.Resource.GroupResource(), "nginx-cloneset", v1.GetOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println(cloneSetScale.Spec.Replicas)
}

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐