k8s client-go访问service的informer示例2之创建NewSharedIndexInformer
这个实例通过svcInformer := cache.NewSharedIndexInformer创建informer, 不是直接使用系统也有coreinformers.ServiceInformerpackage mainimport ("flag"v1 "k8s.io/api/core/v1"metav1 "k8s.io/apimachinery/pkg/apis/meta/v1""k8s.i
·
这个实例通过svcInformer := cache.NewSharedIndexInformer创建informer, 不是直接使用系统也有coreinformers.ServiceInformer
package main
import (
"flag"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/watch"
"sync"
rt "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/controller"
"math/rand"
"path/filepath"
"time"
)
// Interval of synchronizing service status from apiserver
const (
LocalServiceSyncPeriod = 30 * time.Second
)
func main() {
var wg sync.WaitGroup
wg.Add(1)
rand.Seed(time.Now().UnixNano())
klog.InitFlags(nil)
var kubeconfigTemp *string
if home := homedir.HomeDir(); home != "" {
kubeconfigPath := filepath.Join(home, ".kube", "config")
kubeconfigTemp = flag.String("kubeconfig1", kubeconfigPath, "(optional) absolute path to the kubeconfig file")
} else {
kubeconfigTemp = flag.String("kubeconfig1", "", "absolute path to the kubeconfig file")
}
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfigTemp)
klog.Infof("test02")
CheckErr(err)
clientset, err := kubernetes.NewForConfig(config)
CheckErr(err)
//sharedInformers := informers.NewSharedInformerFactory(clientset, 0)
stopCh := make(chan struct{})
defer close(stopCh)
fieldSelectorNotSys1 := fields.OneTermEqualSelector("metadata.namespace", "default")
//fieldSelectorNotSys2 := fields.OneTermNotEqualSelector("metadata.namespace", "polar")
selectorStr := fields.AndSelectors(fieldSelectorNotSys1).String()
klog.Infof("selectorStr:%s", selectorStr)
svcInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (rt.Object, error) {
//options.FieldSelector = selectorStr
return clientset.CoreV1().Services("").List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
//options.FieldSelector = selectorStr
return clientset.CoreV1().Services("").Watch(options)
},
},
&v1.Service{},
LocalServiceSyncPeriod,
cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
},
)
svcInformer.AddEventHandlerWithResyncPeriod(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
newSvc := obj.(*v1.Service)
if newSvc.Namespace != "default" {
klog.Infof("svc: filter svc, skip svc [%s/%s]\n", newSvc.Namespace, newSvc.Name)
return false
} else {
klog.Infof("svc: filter svc, svc [%s/%s]\n", newSvc.Namespace, newSvc.Name)
return true
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
newSvc := obj.(*v1.Service)
klog.Infof("controller: add svc, svc [%s/%s]\n", newSvc.Namespace, newSvc.Name)
},
UpdateFunc: func(oldObj, newObj interface{}) {
newSvc := newObj.(*v1.Service)
klog.Infof("controller: Update svc, pod [%s/%s]\n", newSvc.Namespace, newSvc.Name)
},
DeleteFunc: func(obj interface{}) {
delSvc := obj.(*v1.Service)
klog.Infof("controller: Delete svc, pod [%s/%s]\n", delSvc.Namespace, delSvc.Name)
},
},
},
LocalServiceSyncPeriod,
)
klog.Infof("svcInformer start of run")
go svcInformer.Run(stopCh)
if !controller.WaitForCacheSync("service", stopCh, svcInformer.HasSynced) {
klog.Infof("svcInformer start of run")
return
}
//time.Sleep(5 * time.Minute)
wg.Wait()
}
func initInformer(clientset *kubernetes.Clientset) coreinformers.PodInformer {
generate a shared informerFactory
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
// create pod informer and start it
podInformer := sharedInformerFactory.Core().V1().Pods()
return podInformer
}
func CheckErr(err error) {
if err != nil {
panic(err)
}
}
更多推荐
已为社区贡献9条内容
所有评论(0)