如何实现自己的crd controller
本文简单地实现了一个 podipcontroller,并无实际用途,仅用于学习 controller 的实现。
·
本文简单地实现了一个 podipcontroller,并无实际用途,仅用于学习 controller 的实现。
package main
import (
"flag"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/tools/cache"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/util/workqueue"
"time"
"k8s.io/apimachinery/pkg/fields"
"fmt"
"k8s.io/apimachinery/pkg/util/wait"
"log"
)
// kubeconfig holds the value of the -kubeconfig command-line flag. It lives at
// package scope so that main can dereference it after flag.Parse has run.
var kubeconfig *string

// init registers the -kubeconfig flag with its default path "./kubeconfig".
func init() {
	kubeconfig = flag.String("kubeconfig", "./kubeconfig", "Path to a kube config. Only required if out-of-cluster.")
}
// main parses the command-line flags, builds a Podipcontroller from the
// kubeconfig path and runs it with two workers.
func main() {
	flag.Parse()
	// A nil stop channel never becomes ready, so Run keeps going until the
	// process itself is terminated.
	newPodipcontroller(*kubeconfig).Run(2, nil)
}
type Podipcontroller struct {
#用于watch 资源变化
kubeClient *kubernetes.Clientset
#用于crd的crud操作
crdclient *Podipclient
#实现了一个类似队列的存储对象,通过反射判断对象类型
podStore cache.Store
podController cache.Controller
podtoip *PodToIp
podsQueue workqueue.RateLimitingInterface
}
// Run starts the pod informer, waits for its cache to sync, launches
// `workers` goroutines that each run podWorker, and then blocks until stopCh
// is closed.
//
// The work queue is shut down on every return path so that workers blocked on
// the queue are released, and an aborted cache sync is reported instead of
// returning silently.
func (slm *Podipcontroller) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	// Deferred so the queue is also shut down when the cache sync is aborted.
	defer slm.podsQueue.ShutDown()

	fmt.Println("Starting serviceLookupController Manager")
	go slm.podController.Run(stopCh)

	// Wait for the controller to List. This helps avoid churn during start up.
	if !cache.WaitForCacheSync(stopCh, slm.podController.HasSynced) {
		fmt.Println("Stopped before the pod cache finished syncing")
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(slm.podWorker, time.Second, stopCh)
	}

	<-stopCh
	fmt.Println("Shutting down Service Lookup Controller")
}
// podWorker drains podsQueue: for every key it looks the pod up in podStore
// and logs the pod's name and IP. It returns once the queue reports that it
// has been shut down.
func (slm *Podipcontroller) podWorker() {
	// processNext handles a single queue item and reports whether the worker
	// should stop.
	processNext := func() bool {
		key, quit := slm.podsQueue.Get()
		if quit {
			return true
		}
		// Always mark the key as done so the queue can hand it out again.
		defer slm.podsQueue.Done(key)

		obj, exists, err := slm.podStore.GetByKey(key.(string))
		// The error must be checked before exists: after a failed lookup
		// exists carries no information, and testing it first would report
		// the failure as a deletion and drop the error.
		if err != nil {
			fmt.Printf("cannot get pod %v: %v\n", key, err)
			return false
		}
		if !exists {
			fmt.Printf("Pod has been deleted %v\n", key)
			return false
		}

		pod, ok := obj.(*v1.Pod)
		if !ok {
			// Skip unexpected entries instead of panicking the worker.
			fmt.Printf("unexpected object type %T for key %v\n", obj, key)
			return false
		}
		log.Println(pod.Name, pod.Status.PodIP)
		return false
	}

	for {
		if quit := processNext(); quit {
			fmt.Println("pod worker shutting down")
			return
		}
	}
}
// newPodipcontroller builds a Podipcontroller from the kubeconfig at the given
// path: it creates both clients and the "pods" work queue, then wires a pod
// informer over all namespaces whose add and update events feed the queue.
func newPodipcontroller(kubeconfig string) *Podipcontroller {
	controller := &Podipcontroller{
		kubeClient: getClientsetOrDie(kubeconfig),
		crdclient:  getCRDClientOrDie(kubeconfig),
		podsQueue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pods"),
	}

	// List/watch every pod in every namespace, without field filtering.
	podListWatch := cache.NewListWatchFromClient(
		controller.kubeClient.CoreV1().RESTClient(),
		"pods",
		v1.NamespaceAll,
		fields.Everything(),
	)
	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    controller.enqueuePod,
		UpdateFunc: controller.updatePod,
	}

	// Resync period handed to the informer.
	const resync = 30 * time.Second
	store, informer := cache.NewInformer(podListWatch, &v1.Pod{}, resync, handlers)
	controller.podStore = store
	controller.podController = informer
	return controller
}
// enqueuePod derives the cache key for obj and adds it to podsQueue. If no
// key can be derived, the failure is printed and nothing is queued.
func (slm *Podipcontroller) enqueuePod(obj interface{}) {
	key, keyErr := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if keyErr == nil {
		slm.podsQueue.Add(key)
		return
	}
	fmt.Printf("Couldn't get key for object %+v: %v", obj, keyErr)
}
// updatePod is the informer's update handler. It enqueues the new pod only
// when its IP differs from the old pod's IP; other updates are ignored.
func (slm *Podipcontroller) updatePod(oldObj, newObj interface{}) {
	previous, current := oldObj.(*v1.Pod), newObj.(*v1.Pod)
	if previous.Status.PodIP != current.Status.PodIP {
		slm.enqueuePod(newObj)
	}
}
// Podipclient is a thin client for the "podtoips" resource; its Create,
// Update and Get methods issue requests through the wrapped REST client.
type Podipclient struct {
// rest is the underlying REST client used to build every request.
rest *rest.RESTClient
}
// PodToIp is the object sent to and decoded from the "podtoips" resource by
// the Podipclient methods. Judging by its field names it pairs a pod name
// with that pod's address.
type PodToIp struct {
metav1.TypeMeta `json:",inline"`
// NOTE(review): this uses ObjectMeta from the core v1 API package while
// PodToIpList uses metav1.ListMeta — confirm that v1.ObjectMeta exists in the
// vendored client-go version, or whether metav1.ObjectMeta is intended.
Metadata v1.ObjectMeta `json:"metadata"`
// PodName is serialized under the JSON key "podName".
PodName string `json:"podName"`
// PodAddress is serialized under the JSON key "podAddress".
PodAddress string `json:"podAddress"`
}
// getClientsetOrDie returns a Kubernetes clientset built from the kubeconfig
// file at the given path. It panics if the configuration cannot be loaded or
// the clientset cannot be created.
func getClientsetOrDie(kubeconfig string) *kubernetes.Clientset {
	// No master URL override is passed; with an empty kubeconfig path as
	// well, client-go assumes it is running in-cluster.
	restConfig, cfgErr := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if cfgErr != nil {
		panic(cfgErr)
	}

	cs, csErr := kubernetes.NewForConfig(restConfig)
	if csErr != nil {
		panic(csErr)
	}
	return cs
}
// Create POSTs body to the "podtoips" resource in the default namespace and
// returns the object decoded from the server's response together with any
// request or decoding error.
func (c *Podipclient) Create(body PodToIp) (*PodToIp, error) {
	var ret PodToIp
	err := c.rest.Post().
		Resource("podtoips").
		Namespace(metav1.NamespaceDefault).
		// Pass a pointer: the embedded TypeMeta implements GetObjectKind on
		// its pointer receiver, so only *PodToIp is a runtime.Object. A plain
		// PodToIp value is rejected by the request builder as an unknown
		// body type, which made every Create call fail.
		Body(&body).
		Do().Into(&ret)
	return &ret, err
}
// Update PUTs body to the "podtoips" resource in the default namespace,
// addressing the object by body.Metadata.Name, and returns the object decoded
// from the response together with any request or decoding error.
func (c *Podipclient) Update(body *PodToIp) (*PodToIp, error) {
	updated := new(PodToIp)
	request := c.rest.Put().
		Resource("podtoips").
		Namespace(metav1.NamespaceDefault).
		Name(body.Metadata.Name).
		Body(body)
	err := request.Do().Into(updated)
	return updated, err
}
// Get fetches the "podtoips" object with the given name from the default
// namespace and returns it together with any request or decoding error.
func (c *Podipclient) Get(name string) (*PodToIp, error) {
	fetched := new(PodToIp)
	request := c.rest.Get().
		Resource("podtoips").
		Namespace(metav1.NamespaceDefault).
		Name(name)
	err := request.Do().Into(fetched)
	return fetched, err
}
// getCRDClientOrDie returns a Podipclient whose REST client is built from the
// kubeconfig at the given path after configureClient has adjusted the config.
// It panics if the configuration cannot be loaded or the REST client cannot
// be created.
func getCRDClientOrDie(kubeconfig string) *Podipclient {
	restConfig, cfgErr := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if cfgErr != nil {
		panic(cfgErr)
	}

	// Point the config at the custom API group before building the client.
	configureClient(restConfig)

	restClient, clientErr := rest.RESTClientFor(restConfig)
	if clientErr != nil {
		panic(clientErr)
	}
	return &Podipclient{rest: restClient}
}
// configureClient points config at the "example.com/v1" API group under
// "/apis", selects JSON as the content type with a direct codec factory, and
// registers the custom resource types for that group version in api.Scheme.
// It panics if the registration reports an error, matching the panic-on-
// failure behaviour of the "OrDie" constructor that calls it.
func configureClient(config *rest.Config) {
	groupversion := schema.GroupVersion{
		Group:   "example.com",
		Version: "v1",
	}
	config.GroupVersion = &groupversion
	config.APIPath = "/apis"
	// Currently TPR only supports JSON
	config.ContentType = runtime.ContentTypeJSON
	config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs}

	schemeBuilder := runtime.NewSchemeBuilder(
		func(scheme *runtime.Scheme) error {
			scheme.AddKnownTypes(
				groupversion,
				&PodToIp{},
				&PodToIpList{},
				&v1.ListOptions{},
				&v1.DeleteOptions{},
			)
			return nil
		})
	// The returned error used to be discarded; surface it so a failed
	// registration cannot go unnoticed.
	if err := schemeBuilder.AddToScheme(api.Scheme); err != nil {
		panic(err)
	}
}
// PodToIpList is the list form of PodToIp; configureClient registers it in
// the scheme alongside PodToIp.
type PodToIpList struct {
metav1.TypeMeta `json:",inline"`
// Metadata is the list metadata, serialized under the JSON key "metadata".
Metadata metav1.ListMeta `json:"metadata"`
// Items holds the PodToIp entries, serialized under the JSON key "items".
Items []PodToIp `json:"items"`
}
更多推荐
已为社区贡献28条内容
所有评论(0)