client-go 各个包的详细使用demo
使用 client-go 操作 k8s 资源对象的时候,除了常用的 CRUD 接口之外,还可以使用下面这些包提供的功能:
·
# tree client-go/ -L 1
client-go/
├── discovery //提供 DiscoveryClient 发现客户端
├── dynamic //提供 DynamicClient 动态客户端
├── informers //每种 kubernetes 资源的 Informer 实现
├── kubernetes //提供 ClientSet 客户端
├── listers //为每一个 Kubernetes 资源提供 Lister 功能,该功能对 Get 和 List 请求提供只读的缓存数据
├── plugin // 提供 OpenStack、GCP 和 Azure 等云服务商授权插件
├── rest //提供 RESTClient 客户端,对 Kubernetes API Server 执行 RESTful 操作
├── scale //提供 ScaleClient 客户端,用于扩容或缩容 Deployment、ReplicaSet、Replication Controller 等资源对象
├── tools //提供常用工具,例如 SharedInformer、Reflector、DeltaFIFO 及 Indexers。提供 Client 查询和缓存机制,以减少向 kube-apiserver 发起的请求数等
├── transport //提供安全的 TCP 连接,支持 Http Stream,某些操作需要在客户端和容器之间传输二进制流,例如 exec、attach 等操作。该功能由内部的 spdy 包提供支持
├── restmapper //提供 RESTMapper 相关工具,用于 GVK 与 GVR 之间的映射
└── util //常用方法,例如 WorkQueue 功能队列、Certificate 证书管理等
一、 Discovery client
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/tools/clientcmd"
)
// newDiscoveryClient builds a discovery client from ./kube.conf and prints
// the name, group and version of every API resource the server advertises.
//
// Setup failures panic. A partial discovery failure (some groups could not be
// queried) is reported and the resources that were discovered are still
// printed; any other discovery error panics.
func newDiscoveryClient() {
	config, err := clientcmd.BuildConfigFromFlags("", "./kube.conf")
	if err != nil {
		panic(err)
	}
	dClient, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		panic(err)
	}
	_, apiResourceList, err := dClient.ServerGroupsAndResources()
	if err != nil {
		// A group-discovery failure still comes with the resources of the
		// groups that answered, so only unrelated errors are fatal.
		if !discovery.IsGroupDiscoveryFailedError(err) {
			panic(err)
		}
		fmt.Println("partial discovery failure:", err)
	}
	for _, v := range apiResourceList {
		// GroupVersion is a "group/version" string (or just "version" for
		// the core group); split it so both parts can be printed.
		gv, err := schema.ParseGroupVersion(v.GroupVersion)
		if err != nil {
			panic(err)
		}
		for _, resource := range v.APIResources {
			fmt.Println("name:", resource.Name, " ", "group:", gv.Group, " ", "version:", gv.Version)
		}
	}
}
二、Dynamic Client
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
)
// newDynamicClient shows two uses of the dynamic client built from
// ./kube.conf:
//
//  1. list the pods of the "default" namespace as unstructured data, convert
//     them to a typed PodList and print namespace and name of each pod;
//  2. fetch the CloneSet "nginx-cloneset", print spec.replicas when it is
//     present, set it to 1 and send the update back.
//
// Every failing API call or conversion panics.
func newDynamicClient() {
	cfg, err := clientcmd.BuildConfigFromFlags("", "./kube.conf")
	if err != nil {
		panic(err)
	}
	dyn, err := dynamic.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}
	ctx := context.Background()

	// Built-in resource: pods, addressed by group "" and version "v1".
	podGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
	rawPods, err := dyn.Resource(podGVR).Namespace("default").List(ctx, v1.ListOptions{})
	if err != nil {
		panic(err)
	}
	var typedPods corev1.PodList
	if err = runtime.DefaultUnstructuredConverter.FromUnstructured(rawPods.UnstructuredContent(), &typedPods); err != nil {
		panic(err)
	}
	for i := range typedPods.Items {
		pod := &typedPods.Items[i]
		fmt.Println(pod.Namespace, pod.Name)
	}

	// Custom resource: OpenKruise CloneSets, handled purely as unstructured data.
	cloneSets := dyn.Resource(schema.GroupVersionResource{
		Group:    "apps.kruise.io",
		Version:  "v1alpha1",
		Resource: "clonesets",
	}).Namespace("default")
	cs, err := cloneSets.Get(ctx, "nginx-cloneset", v1.GetOptions{})
	if err != nil {
		panic(err)
	}
	// The current replica count is printed only when the field exists and
	// holds an int64; a lookup problem is not treated as fatal.
	if replicas, found, lookupErr := unstructured.NestedInt64(cs.Object, "spec", "replicas"); found && lookupErr == nil {
		fmt.Println(replicas)
	}
	if err = unstructured.SetNestedField(cs.Object, int64(1), "spec", "replicas"); err != nil {
		panic(err)
	}
	if _, err = cloneSets.Update(ctx, cs, v1.UpdateOptions{}); err != nil {
		panic(err)
	}
}
三、Rest Client
import (
"context"
"fmt"
kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
func newRestfulClient() {
config, err := clientcmd.BuildConfigFromFlags("", "./kube.conf")
if err != nil {
panic(err)
}
config.APIPath = "apis"
config.GroupVersion = &kruiseappsv1alpha1.SchemeGroupVersion
config.NegotiatedSerializer = scheme.Codecs
clientFor, err := rest.RESTClientFor(config)
if err != nil {
panic(err)
}
list := &kruiseappsv1alpha1.CloneSetList{}
err = clientFor.Get().Namespace("default").
Resource("clonesets").
Do(context.Background()).
Into(list)
if err != nil {
panic(err)
}
for _, item := range list.Items {
fmt.Println(item.Namespace, item.Name)
}
四、GVK & GVR
import (
"fmt"
"github.com/openkruise/kruise-api/apps/v1alpha1"
appv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
"reflect"
)
// G demonstrates working with GroupVersionKind (GVK) and
// GroupVersionResource (GVR):
//
//   - registers a handful of core, apps and OpenKruise types in a fresh
//     runtime.Scheme and prints every GVK the scheme then knows;
//   - maps the CloneSet GVK to its resource through a discovery-backed
//     RESTMapper and prints that resource;
//   - maps the "clonesets" GVR back to its kind and prints it.
//
// The cluster is reached through ./kube.conf. Any failure panics.
func G() {
	// register gvk
	scheme := runtime.NewScheme()
	scheme.AddKnownTypes(v1.SchemeGroupVersion, &v1.Pod{}, &v1.Namespace{}, &v1.Node{}, &v1.ConfigMap{})
	scheme.AddKnownTypes(appv1.SchemeGroupVersion, &appv1.Deployment{}, &appv1.StatefulSet{})
	scheme.AddKnownTypes(v1alpha1.SchemeGroupVersion, &v1alpha1.CloneSet{}, &v1alpha1.StatefulSet{})
	for groupVersionKind := range scheme.AllKnownTypes() {
		fmt.Println(groupVersionKind.String())
	}

	// GVK to GVR
	groupVersionKind := schema.GroupVersionKind{
		Group:   v1alpha1.GroupVersion.Group,
		Version: v1alpha1.GroupVersion.Version,
		// The kind is the Go type name of the CloneSet struct.
		Kind: reflect.TypeOf(&v1alpha1.CloneSet{}).Elem().Name(),
	}
	config, err := clientcmd.BuildConfigFromFlags("", "./kube.conf")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	apiGroupResources, err := restmapper.GetAPIGroupResources(clientset.DiscoveryClient)
	if err != nil {
		panic(err)
	}
	restMapper := restmapper.NewDiscoveryRESTMapper(apiGroupResources)
	mapping, err := restMapper.RESTMapping(groupVersionKind.GroupKind())
	if err != nil {
		// Without this check a missing mapping would surface as a nil
		// pointer dereference on the next line.
		panic(err)
	}
	fmt.Println(mapping.Resource.String())

	// GVR to GVK
	groupVersionResource := schema.GroupVersionResource{
		Group:    v1alpha1.GroupVersion.Group,
		Version:  v1alpha1.GroupVersion.Version,
		Resource: "clonesets",
	}
	kind, err := restMapper.KindFor(groupVersionResource)
	if err != nil {
		panic(err)
	}
	fmt.Println(kind)
}
五、Informers & Listers
import (
"context"
"fmt"
v12 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"time"
)
// Informers builds a shared informer factory from ./kube.conf, restricted to
// the "default" namespace and to objects matching a label and a field
// selector, registers eventFunc on the pod informer, waits for the caches to
// sync and then prints the names of the cached nodes and pods.
//
// The function never returns: after printing it blocks so the informers keep
// delivering events. Any setup or listing failure panics.
func Informers() {
	config, err := clientcmd.BuildConfigFromFlags("", "./kube.conf")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	// c is never written to or closed; receiving from it at the end keeps
	// the function (and with it the informers) alive.
	c := make(chan struct{})
	informerFactory := informers.NewSharedInformerFactoryWithOptions(clientset, 0,
		informers.WithNamespace("default"),
		informers.WithTweakListOptions(func(options *v1.ListOptions) {
			options.LabelSelector = labels.FormatLabels(map[string]string{"app": "nginx"})
			// FormatLabels renders "key=value", which is also the syntax
			// of a single-term field selector.
			options.FieldSelector = labels.FormatLabels(map[string]string{"metadata.name": "nginx-cloneset-tv6nc"})
		}),
		informers.WithTransform(func(i interface{}) (interface{}, error) {
			fmt.Println(i)
			// The factory-level transform also sees objects of the node
			// informer requested below, so anything that is not a pod is
			// passed through unchanged instead of failing the assertion.
			pod, ok := i.(*v12.Pod)
			if !ok {
				return i, nil
			}
			pod.Name = "xxx"
			return pod, nil
		}),
		informers.WithCustomResyncConfig(map[v1.Object]time.Duration{}),
	)
	// Requesting listers and informers before Start registers them with the
	// factory so Start actually launches them.
	nodes := informerFactory.Core().V1().Nodes().Lister()
	pods := informerFactory.Core().V1().Pods().Lister()
	podInformer := informerFactory.Core().V1().Pods().Informer()
	podInformer.AddEventHandler(eventFunc)

	// The background context is never cancelled, so the informers run and
	// the sync wait lasts for as long as needed.
	stopCh := context.Background().Done()
	informerFactory.Start(stopCh)
	informerFactory.WaitForCacheSync(stopCh)

	ret, err := nodes.List(labels.Everything())
	if err != nil {
		panic(err)
	}
	for _, node := range ret {
		fmt.Println(node.Name)
	}
	list, err := pods.List(labels.Everything())
	if err != nil {
		panic(err)
	}
	for _, pod := range list {
		fmt.Println(pod.Name)
	}
	<-c
}
// EventFunc is a stateless informer event handler that prints which kind of
// notification it received.
type EventFunc struct{}

// eventFunc is the handler instance registered on the pod informer. Declaring
// it with the interface type makes the compiler verify that EventFunc
// implements cache.ResourceEventHandler.
var eventFunc cache.ResourceEventHandler = EventFunc{}

// OnAdd prints "add" followed by the isInInitialList flag; the added object
// itself is not inspected.
func (EventFunc) OnAdd(_ interface{}, isInInitialList bool) {
	fmt.Println("add", isInInitialList)
}

// OnUpdate prints "update"; neither the old nor the new object is inspected.
func (EventFunc) OnUpdate(_, _ interface{}) {
	fmt.Println("update")
}

// OnDelete prints "delete"; the removed object is not inspected.
func (EventFunc) OnDelete(_ interface{}) {
	fmt.Println("delete")
}
六、Up & Download
import (
"archive/tar"
"bytes"
"context"
"fmt"
"io"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
"os"
"path/filepath"
)
// UpDown downloads the /bin directory of a pod by running "tar cf - /bin"
// through the exec subresource and unpacking the resulting tar stream into
// the local directory "output_directory".
//
// Only directories and regular files are extracted; other entry types are
// reported and skipped, as are entries whose name would place them outside
// the destination directory. Any failure, including a failure of the remote
// command stream, panics.
func UpDown() {
	config, err := clientcmd.BuildConfigFromFlags("", "./kube.conf")
	if err != nil {
		panic(err)
	}
	// Pods belong to the core group, which is served under "api".
	config.APIPath = "api"
	config.GroupVersion = &v1.SchemeGroupVersion
	config.NegotiatedSerializer = scheme.Codecs
	clientFor, err := rest.RESTClientFor(config)
	if err != nil {
		panic(err)
	}

	cmds := []string{"tar", "cf", "-", "/bin"}
	executor := exec(clientFor, cmds, config)

	var stderr bytes.Buffer
	reader, writer := io.Pipe()
	go func() {
		streamErr := executor.StreamWithContext(context.Background(), remotecommand.StreamOptions{
			Stdin:  os.Stdin,
			Stdout: writer,
			Stderr: &stderr,
			Tty:    false,
		})
		if streamErr != nil {
			streamErr = fmt.Errorf("streaming tar from pod: %w (stderr: %q)", streamErr, stderr.String())
		}
		// Hand the outcome to the reading side through the pipe instead of
		// a shared variable: a nil error reads as io.EOF, anything else is
		// returned by the next read.
		writer.CloseWithError(streamErr)
	}()

	tarReader := tar.NewReader(reader)
	dest := "output_directory"
	for {
		header, err := tarReader.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
		mode := header.FileInfo().Mode()
		if !mode.IsRegular() && !mode.IsDir() {
			fmt.Println("only support file download", mode.Type())
			continue
		}
		// Full local path of the entry.
		path := filepath.Join(dest, header.Name)
		// Entry names come from the remote side; refuse any that resolve
		// to a location outside dest (for example via "../").
		rel, err := filepath.Rel(dest, path)
		if err != nil || rel == ".." || (len(rel) > 2 && rel[:3] == ".."+string(filepath.Separator)) {
			fmt.Println("skip entry outside destination", header.Name)
			continue
		}
		switch header.Typeflag {
		case tar.TypeDir:
			if err := os.MkdirAll(path, os.FileMode(header.Mode)); err != nil {
				panic(err)
			}
		default:
			// The archive is not required to list a directory before the
			// files inside it.
			if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
				panic(err)
			}
			// Truncate so a shorter download does not leave stale bytes
			// from an earlier, longer file.
			we, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.FileMode(header.Mode))
			if err != nil {
				panic(err)
			}
			// Copy the entry and close the file right away; deferring the
			// close would keep every extracted file open until the whole
			// archive has been processed.
			_, copyErr := io.Copy(we, tarReader)
			closeErr := we.Close()
			if copyErr != nil {
				panic(copyErr)
			}
			if closeErr != nil {
				panic(closeErr)
			}
		}
	}
	// Read whatever follows the end of the archive so the streaming
	// goroutine is not left blocked on the pipe, and surface a stream error
	// that was reported only after the last entry.
	if _, err := io.Copy(io.Discard, reader); err != nil {
		panic(err)
	}
}
// exec prepares a remote command executor that runs cmds in the container
// named "container" of pod "nginx-cloneset-tv6nc" in the "default" namespace.
//
// clientFor is used only to build the request URL of the pod's exec
// subresource; config supplies the connection settings for the SPDY
// executor. The request asks for stdin, stdout and stderr without a TTY.
// The function panics when the executor cannot be created.
func exec(clientFor *rest.RESTClient, cmds []string, config *rest.Config) remotecommand.Executor {
	execOptions := &v1.PodExecOptions{
		Container: "container",
		Command:   cmds,
		Stdin:     true,
		Stdout:    true,
		Stderr:    true,
		TTY:       false,
	}

	req := clientFor.Post()
	req = req.Namespace("default").Resource("pods").Name("nginx-cloneset-tv6nc")
	req = req.SubResource("exec").VersionedParams(execOptions, scheme.ParameterCodec)

	spdyExecutor, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
	if err != nil {
		panic(err)
	}
	return spdyExecutor
}
七、Scale
import (
"context"
"fmt"
"github.com/openkruise/kruise-api/apps/v1alpha1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/scale"
"k8s.io/client-go/tools/clientcmd"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)
// scales reads the scale subresource of the CloneSet "nginx-cloneset" in the
// "default" namespace and prints its spec.replicas.
//
// The kubeconfig is read from ./kube.conf. A discovery-backed RESTMapper
// resolves the CloneSet kind to its resource name, which the scale client
// then uses to address the object. Any failure panics.
func scales() {
	config, err := clientcmd.BuildConfigFromFlags("", "./kube.conf")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	httpClient, err := rest.HTTPClientFor(config)
	if err != nil {
		panic(err)
	}
	restMapper, err := apiutil.NewDiscoveryRESTMapper(config, httpClient)
	if err != nil {
		panic(err)
	}
	scalesGetter, err := scale.NewForConfig(config, restMapper, dynamic.LegacyAPIPathResolverFunc, scale.NewDiscoveryScaleKindResolver(clientset.Discovery()))
	if err != nil {
		panic(err)
	}
	resource, err := restMapper.RESTMapping(schema.GroupKind{
		Group: v1alpha1.GroupVersion.Group,
		// The kind is the Go type name of the CloneSet struct.
		Kind: reflect.TypeOf(&v1alpha1.CloneSet{}).Elem().Name(),
	})
	if err != nil {
		panic(err)
	}
	get, err := scalesGetter.Scales("default").Get(context.Background(), schema.GroupResource{
		Group:    v1alpha1.GroupVersion.Group,
		Resource: resource.Resource.Resource,
	}, "nginx-cloneset", v1.GetOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println(get.Spec.Replicas)
}
更多推荐
已为社区贡献2条内容
所有评论(0)