这个实例通过 svcInformer := cache.NewSharedIndexInformer(...) 自行创建 informer，而不是直接使用系统已有的 coreinformers.ServiceInformer。

package main

import (
	"flag"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/watch"
	"sync"

	rt "k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/informers"
	coreinformers "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/controller"
	"math/rand"
	"path/filepath"
	"time"
)

// LocalServiceSyncPeriod is the resync period handed to the Service informer
// and to the event handler registered on it.
const LocalServiceSyncPeriod = 30 * time.Second

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	rand.Seed(time.Now().UnixNano())
	klog.InitFlags(nil)
	var kubeconfigTemp *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfigPath := filepath.Join(home, ".kube", "config")
		kubeconfigTemp = flag.String("kubeconfig1", kubeconfigPath, "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfigTemp = flag.String("kubeconfig1", "", "absolute path to the kubeconfig file")
	}
	flag.Parse()

	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfigTemp)
	klog.Infof("test02")
	CheckErr(err)

	clientset, err := kubernetes.NewForConfig(config)
	CheckErr(err)

	//sharedInformers := informers.NewSharedInformerFactory(clientset, 0)
	stopCh := make(chan struct{})
	defer close(stopCh)

	fieldSelectorNotSys1 := fields.OneTermEqualSelector("metadata.namespace", "default")
	//fieldSelectorNotSys2 := fields.OneTermNotEqualSelector("metadata.namespace", "polar")
	selectorStr := fields.AndSelectors(fieldSelectorNotSys1).String()
	klog.Infof("selectorStr:%s", selectorStr)

	svcInformer := cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (rt.Object, error) {
				//options.FieldSelector = selectorStr
				return clientset.CoreV1().Services("").List(options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				//options.FieldSelector = selectorStr
				return clientset.CoreV1().Services("").Watch(options)
			},
		},
		&v1.Service{},
		LocalServiceSyncPeriod,
		cache.Indexers{
			cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
		},
	)

	svcInformer.AddEventHandlerWithResyncPeriod(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				newSvc := obj.(*v1.Service)
				if newSvc.Namespace != "default" {
					klog.Infof("svc: filter svc, skip svc [%s/%s]\n", newSvc.Namespace, newSvc.Name)
					return false
				} else {
					klog.Infof("svc: filter svc, svc [%s/%s]\n", newSvc.Namespace, newSvc.Name)
					return true
				}
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc: func(obj interface{}) {
					newSvc := obj.(*v1.Service)
					klog.Infof("controller: add svc, svc [%s/%s]\n", newSvc.Namespace, newSvc.Name)
				},

				UpdateFunc: func(oldObj, newObj interface{}) {
					newSvc := newObj.(*v1.Service)
					klog.Infof("controller: Update svc, pod [%s/%s]\n", newSvc.Namespace, newSvc.Name)
				},

				DeleteFunc: func(obj interface{}) {
					delSvc := obj.(*v1.Service)
					klog.Infof("controller: Delete svc, pod [%s/%s]\n", delSvc.Namespace, delSvc.Name)
				},
			},
		},
		LocalServiceSyncPeriod,
	)

	klog.Infof("svcInformer start of run")
	go svcInformer.Run(stopCh)

	if !controller.WaitForCacheSync("service", stopCh, svcInformer.HasSynced) {
		klog.Infof("svcInformer start of run")
		return
	}

	//time.Sleep(5 * time.Minute)
	wg.Wait()
}

func initInformer(clientset *kubernetes.Clientset) coreinformers.PodInformer {
	 generate a shared informerFactory
	sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)

	// create pod informer and start it
	podInformer := sharedInformerFactory.Core().V1().Pods()
	return podInformer
}

// CheckErr panics with err when err is non-nil and is a no-op otherwise.
func CheckErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐