本代码片段展示如何通过 informer 监听 Service 及其对应 Endpoints 的变化。

package main

import (
	"flag"
	"fmt"
	"k8s.io/client-go/util/homedir"
	"path/filepath"
	"time"

	"k8s.io/client-go/informers"
	coreinformers "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/klog"
	//"k8s.io/client-go/pkg/api/v1"
	"k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/component-base/logs"
)

// ServiceEndPointLoggingController bundles a shared informer factory with the
// Service and Endpoints informers obtained from it.
type ServiceEndPointLoggingController struct {
	// informerFactory is the factory that Run starts.
	informerFactory informers.SharedInformerFactory
	// svcInformer is the informer for Service objects.
	svcInformer     coreinformers.ServiceInformer
	// epInformer is the informer for Endpoints objects.
	epInformer      coreinformers.EndpointsInformer
}

// Run launches every informer the factory has handed out so far and then
// blocks until the Service and Endpoints caches report their initial sync.
// It returns an error when cache.WaitForCacheSync reports that the sync did
// not complete.
func (c *ServiceEndPointLoggingController) Run(stopCh chan struct{}) error {
	// Kick off the informers that were requested from the factory.
	c.informerFactory.Start(stopCh)

	// Look up the sync predicates only after Start, then wait on both caches.
	svcSynced := c.svcInformer.Informer().HasSynced
	epSynced := c.epInformer.Informer().HasSynced
	if synced := cache.WaitForCacheSync(stopCh, svcSynced, epSynced); synced {
		return nil
	}
	return fmt.Errorf("failed to sync")
}

func NewSvcEpLoggingController(informerFactory informers.SharedInformerFactory) *ServiceEndPointLoggingController {
	svcInformer := informerFactory.Core().V1().Services()
	epInformer := informerFactory.Core().V1().Endpoints()

	c := &ServiceEndPointLoggingController{
		informerFactory: informerFactory,
		svcInformer:     svcInformer,
		epInformer:      epInformer,
	}

	svcInformer.Informer().AddEventHandler(
		// Your custom resource event handlers.
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				newSvc := obj.(*v1.Service)
				if newSvc.Namespace != "default" {
					return false
				}
				klog.Infof("filter: svc [%s/%s]\n", newSvc.Namespace, newSvc.Name)
				return true
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc: func(obj interface{}) {
					newSvc := obj.(*v1.Service)
					klog.Infof("controller: add svc, svc [%s/%s]\n", newSvc.Namespace, newSvc.Name)
				},

				UpdateFunc: func(oldObj, newObj interface{}) {
					newSvc := newObj.(*v1.Service)
					klog.Infof("controller: Update svc, pod [%s/%s]\n", newSvc.Namespace, newSvc.Name)
				},

				DeleteFunc: func(obj interface{}) {
					delSvc := obj.(*v1.Service)
					klog.Infof("controller: Delete svc, pod [%s/%s]\n", delSvc.Namespace, delSvc.Name)
				},
			},
		},
	)

	epInformer.Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				ep := obj.(*v1.Endpoints)
				if ep.Namespace != "default" {
					return false
				}
				return true
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc: func(cur interface{}) {
					endpoint := cur.(*v1.Endpoints)
					klog.Infof("AddEp:%v", endpoint.Name)
				},

				UpdateFunc: func(objA, objB interface{}) {
					ep1 := objA.(*v1.Endpoints)
					ep2 := objB.(*v1.Endpoints)
					klog.Infof("UpdateEp, name:%s, oldEp:%v, newEp:%v", ep1.Name, ep1.Subsets, ep2.Subsets)

				},

				DeleteFunc: func(cur interface{}) {
					endpoint := cur.(*v1.Endpoints)
					klog.Infof("DelEp [%s/%s]\n", endpoint.Namespace, endpoint.Name)
				},
			},
		},
	)

	return c
}

func main() {
	flag.Parse()
	logs.InitLogs()
	defer logs.FlushLogs()
	var kubeconfigTemp *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfigPath := filepath.Join(home, ".kube", "config")
		kubeconfigTemp = flag.String("kubeconfig1", kubeconfigPath, "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfigTemp = flag.String("kubeconfig1", "", "absolute path to the kubeconfig file")
	}
	flag.Parse()

	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfigTemp)
	if err != nil {
		panic(err.Error())
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		klog.Fatal(err)
	}

	factory := informers.NewSharedInformerFactory(clientset, time.Hour*24)
	controller := NewSvcEpLoggingController(factory)

	stop := make(chan struct{})
	defer close(stop)

	err = controller.Run(stop)
	if err != nil {
		klog.Fatal(err)
	}
	select {}
}
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐