Kubernetes source code analysis: Ingress (part 3)
Having gone through the hands-on walkthrough, readers should now have a rough idea of how ingress is used. Let's dig deeper into the internals.
The diagram above shows the deployment architecture of an nginx ingress controller. The ingress controller watches the apiserver for changes to ingress resources, stores those resources in a local cache, and notifies nginx to load the corresponding configuration changes.
The ingress controller watches a whole series of resources: ingress, service, endpoint, secret, node, and configmap. As soon as a resource changes (add, delete, or update), it immediately notifies the backend, e.g. nginx.
To reduce the number of requests to the apiserver, the nginx controller caches each result locally; this cache imports Kubernetes' "k8s.io/kubernetes/pkg/client/cache" package.
Now let's dive into the code, starting with service startup: see controllers/nginx/pkg/cmd/controller/main.go.
Next, let's look closely at how the configuration gets updated. First, the list-watch setup inside the ingress controller:
ic.ingLister.Store, ic.ingController = cache.NewInformer(
	cache.NewListWatchFromClient(ic.cfg.Client.Extensions().RESTClient(), "ingresses", ic.cfg.Namespace, fields.Everything()),
	&extensions.Ingress{}, ic.cfg.ResyncPeriod, ingEventHandler)

ic.endpLister.Store, ic.endpController = cache.NewInformer(
	cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "endpoints", ic.cfg.Namespace, fields.Everything()),
	&api.Endpoints{}, ic.cfg.ResyncPeriod, eventHandler)

ic.secrLister.Store, ic.secrController = cache.NewInformer(
	cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "secrets", api.NamespaceAll, fields.Everything()),
	&api.Secret{}, ic.cfg.ResyncPeriod, secrEventHandler)

ic.mapLister.Store, ic.mapController = cache.NewInformer(
	cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "configmaps", api.NamespaceAll, fields.Everything()),
	&api.ConfigMap{}, ic.cfg.ResyncPeriod, mapEventHandler)

ic.svcLister.Indexer, ic.svcController = cache.NewIndexerInformer(
	cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "services", ic.cfg.Namespace, fields.Everything()),
	&api.Service{},
	ic.cfg.ResyncPeriod,
	cache.ResourceEventHandlerFuncs{},
	cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

ic.nodeLister.Store, ic.nodeController = cache.NewInformer(
	cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "nodes", api.NamespaceAll, fields.Everything()),
	&api.Node{}, ic.cfg.ResyncPeriod, eventHandler)
Each watched resource has its own event handler, for example:
eventHandler := cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		ic.syncQueue.Enqueue(obj)
	},
	DeleteFunc: func(obj interface{}) {
		ic.syncQueue.Enqueue(obj)
	},
	UpdateFunc: func(old, cur interface{}) {
		if !reflect.DeepEqual(old, cur) {
			ic.syncQueue.Enqueue(cur)
		}
	},
}
The handlers push the watched events onto syncQueue, which is implemented in core/pkg/task/queue.go. A worker there processes each key in the queue with the sync method, a design that follows the same lineage as the controller manager.
// worker processes work in the queue through sync.
func (t *Queue) worker() {
	for {
		key, quit := t.queue.Get()
		if quit {
			close(t.workerDone)
			return
		}
		glog.V(3).Infof("syncing %v", key)
		if err := t.sync(key); err != nil {
			glog.Warningf("requeuing %v, err %v", key, err)
			t.queue.AddRateLimited(key)
		} else {
			t.queue.Forget(key)
		}
		t.queue.Done(key)
	}
}
下面看具体怎么处理每个event,
func (ic *GenericController) sync(key interface{}) error {
	ic.syncRateLimiter.Accept()
	if ic.syncQueue.IsShuttingDown() {
		return nil
	}
	if !ic.controllersInSync() {
		time.Sleep(podStoreSyncedPollPeriod)
		return fmt.Errorf("deferring sync till endpoints controller has synced")
	}
	upstreams, servers := ic.getBackendServers()
	var passUpstreams []*ingress.SSLPassthroughBackend
	for _, server := range servers {
		if !server.SSLPassthrough {
			continue
		}
		for _, loc := range server.Locations {
			if loc.Path != rootLocation {
				continue
			}
			passUpstreams = append(passUpstreams, &ingress.SSLPassthroughBackend{
				Backend:  loc.Backend,
				Hostname: server.Hostname,
			})
			break
		}
	}
	data, err := ic.cfg.Backend.OnUpdate(ingress.Configuration{
		Backends:            upstreams,
		Servers:             servers,
		TCPEndpoints:        ic.getStreamServices(ic.cfg.TCPConfigMapName, api.ProtocolTCP),
		UDPEndpoints:        ic.getStreamServices(ic.cfg.UDPConfigMapName, api.ProtocolUDP),
		PassthroughBackends: passUpstreams,
	})
	if err != nil {
		return err
	}
	out, reloaded, err := ic.cfg.Backend.Reload(data)
	if err != nil {
		incReloadErrorCount()
		glog.Errorf("unexpected failure restarting the backend: \n%v", string(out))
		return err
	}
	if reloaded {
		glog.Infof("ingress backend successfully reloaded...")
		incReloadCount()
	}
	return nil
}
This method first assembles the various nginx configuration pieces, then generates the updated configuration via OnUpdate, and finally applies it via Reload. Let's first look at nginx's implementation of OnUpdate (controllers/nginx/pkg/cmd/controller/nginx.go); the key excerpt:
content, err := n.t.Write(config.TemplateConfig{
	ProxySetHeaders:     setHeaders,
	MaxOpenFiles:        maxOpenFiles,
	BacklogSize:         sysctlSomaxconn(),
	Backends:            ingressCfg.Backends,
	PassthroughBackends: ingressCfg.PassthroughBackends,
	Servers:             ingressCfg.Servers,
	TCPBackends:         ingressCfg.TCPEndpoints,
	UDPBackends:         ingressCfg.UDPEndpoints,
	HealthzURI:          ngxHealthPath,
	CustomErrors:        len(cfg.CustomHTTPErrors) > 0,
	Cfg:                 cfg,
})
This substitutes the template's placeholders and returns the rendered configuration, which is then handed to nginx to apply:
func (n NGINXController) Reload(data []byte) ([]byte, bool, error) {
	if !n.isReloadRequired(data) {
		return []byte("Reload not required"), false, nil
	}
	err := ioutil.WriteFile(cfgPath, data, 0644)
	if err != nil {
		return nil, false, err
	}
	o, e := exec.Command(n.binary, "-s", "reload").CombinedOutput()
	return o, true, e
}
isReloadRequired compares the new data against the current file to decide whether an update is needed. If it is, WriteFile overwrites the configuration file, and nginx -s reload applies the new configuration. That wraps up nginx's configuration update, and with it the whole flow.