ingress-controller 源码分析
主要逻辑nginx controller 入口函数// file:k8s.io/ingress-nginx/nginx/main.gofunc main() {// step1: 初始化日志组件klog.InitFlags(nil)......// step2:创建必要的目录err = file.CreateRequiredDirectories()......// step 3 :初始化Apis
主要逻辑
nginx controller 入口函数
// file:k8s.io/ingress-nginx/nginx/main.go
func main() {
// step1: 初始化日志组件
klog.InitFlags(nil)
......
// step2:创建必要的目录
err = file.CreateRequiredDirectories()
......
// step 3 :初始化ApiserverClient
kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile)
......
// step4: 检查service配置
if len(conf.DefaultService) > 0 {
err := checkService(conf.DefaultService, kubeClient)
......
klog.Infof("Validated %v as the default backend.", conf.DefaultService)
}
if len(conf.PublishService) > 0 {
err := checkService(conf.PublishService, kubeClient)
......
}
// step5:获取namespace
if conf.Namespace != "" {
_, err = kubeClient.CoreV1().Namespaces().Get(context.TODO(), conf.Namespace, metav1.GetOptions{})
if err != nil {
klog.Fatalf("No namespace with name %v found: %v", conf.Namespace, err)
}
}
// step6: 创建默认证书
conf.FakeCertificate = ssl.GetFakeSSLCert()
klog.Infof("SSL fake certificate created %v", conf.FakeCertificate.PemFileName)
// step7: 检查是否支持v1beta API 、k8s 版本是否高于1.18.0
k8s.IsNetworkingIngressAvailable, k8s.IsIngressV1Ready = k8s.NetworkingIngressAvailable(kubeClient)
if !k8s.IsNetworkingIngressAvailable {
klog.Warningf("Using deprecated \"k8s.io/api/extensions/v1beta1\" package because Kubernetes version is < v1.14.0")
}
if k8s.IsIngressV1Ready {
......
}
conf.Client = kubeClient
// step8: 注册prometheus
reg := prometheus.NewRegistry()
reg.MustRegister(prometheus.NewGoCollector())
reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{
PidFn: func() (int, error) { return os.Getpid(), nil },
ReportErrors: true,
}))
......
// step9:启动profile
if conf.EnableProfiling {
go registerProfiler()
}
// step10: 实例化nginxcontroller (*)
ngx := controller.NewNGINXController(conf, mc)
// step11: 启动健康探测和metrics API
mux := http.NewServeMux()
registerHealthz(nginx.HealthPath, ngx, mux)
registerMetrics(reg, mux)
go startHTTPServer(conf.ListenPorts.Health, mux)
// step12: 启动nginx master进程
go ngx.Start()
......
}
nginx controller 初始化
// NewNGINXController creates a new NGINX Ingress controller.
func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController {
// 初始化 event broadcaster
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
Interface: config.Client.CoreV1().Events(config.Namespace),
})
// 获取/etc/resolv.conf 中的nameserver 列表
h, err := dns.GetSystemNameServers()
if err != nil {
klog.Warningf("Error reading system nameservers: %v", err)
}
// 实例化NGINXController
n := &NGINXController{
isIPV6Enabled: ing_net.IsIPv6Enabled(),
resolver: h,
cfg: config,
syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
Component: "nginx-ingress-controller",
}),
stopCh: make(chan struct{}),
updateCh: channels.NewRingChannel(1024),
ngxErrCh: make(chan error),
stopLock: &sync.Mutex{},
runningConfig: new(ingress.Configuration),
Proxy: &TCPProxy{},
metricCollector: mc,
command: NewNginxCommand(),
}
// 启动webhook 服务
if n.cfg.ValidationWebhook != "" {
n.validationWebhookServer = &http.Server{
Addr: config.ValidationWebhook,
Handler: adm_controller.NewAdmissionControllerServer(&adm_controller.IngressAdmission{Checker: n}),
TLSConfig: ssl.NewTLSListener(n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath).TLSConfig(),
}
}
// 获取pod runtime信息
pod, err := k8s.GetPodDetails(config.Client)
if err != nil {
klog.Fatalf("unexpected error obtaining pod information: %v", err)
}
n.podInfo = pod
// 实例化store(本地缓存)
n.store = store.New(
config.Namespace,
config.ConfigMapName,
config.TCPConfigMapName,
config.UDPConfigMapName,
config.DefaultSSLCertificate,
config.ResyncPeriod,
config.Client,
n.updateCh,
pod,
config.DisableCatchAll)
// 创建同步队列
n.syncQueue = task.NewTaskQueue(n.syncIngress)
... ...
// 格式化template配置模板
onTemplateChange := func() {
template, err := ngx_template.NewTemplate(nginx.TemplatePath)
if err != nil {
// this error is different from the rest because it must be clear why nginx is not working
klog.Errorf(`
-------------------------------------------------------------------------------
Error loading new template: %v
-------------------------------------------------------------------------------
`, err)
return
}
// 若模板格式化正确,则更新到nginxcontroller 对象中,并往同步队列发送一个template-change事件
n.t = template
klog.Info("New NGINX configuration template loaded.")
n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
}
// 首次启动加载配置模板文件
ngxTpl, err := ngx_template.NewTemplate(nginx.TemplatePath)
......
n.t = ngxTpl
// 监听模板文件变化
// 监听 /etc/nginx/template/nginx.tmpl 模板文件是否有变化,有变化则调用onTemplateChange
_, err = watch.NewFileWatcher(nginx.TemplatePath, onTemplateChange)
... ...
// 监听/etc/nginx/geoip/ 目录下配置文件变化
filesToWatch := []string{}
err = filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {
......
filesToWatch = append(filesToWatch, path)
......
})
......
for _, f := range filesToWatch {
_, err = watch.NewFileWatcher(f, func() {
klog.Infof("File %v changed. Reloading NGINX", f)
// 配置文件有变化则往同步队列发送一个file-change 事件
n.syncQueue.EnqueueTask(task.GetDummyObject("file-change"))
})
......
}
return n
}
ingress controller 结构体
type NGINXController struct {
// pod runtime 信息
podInfo *k8s.PodInfo
// 配置信息
cfg *Configuration
// 事件通知器
recorder record.EventRecorder
// 同步队列
syncQueue *task.Queue
// 同步状态
syncStatus status.Syncer
// 同步限流器
syncRateLimiter flowcontrol.RateLimiter
stopLock *sync.Mutex
stopCh chan struct{}
// 更新环状channel
updateCh *channels.RingChannel
// 接受nginx 错误信息channel
ngxErrCh chan error
// 当前配置文件
runningConfig *ingress.Configuration
// nginx 配置模板文件
t ngx_template.TemplateWriter
// nameserver 列表
resolver []net.IP
// 是否启用ipv6
isIPV6Enabled bool
// 是否关闭
isShuttingDown bool
// TCP代理
Proxy *TCPProxy
// 本地缓存
store store.Storer
// metrics 收集器
metricCollector metric.Collector
// webhook
validationWebhookServer *http.Server
// nginx 二进制命令
command NginxExecTester
}
ngx.Start()
ngx.Start() 主要做3个事情
启动store 协程
启动syncQueue协程
监听updateCh
当从updateCh 见到变化事件时,向syncQueue 发送一个task
// file:internal/ingress/controller/nginx.go
// Start starts a new NGINX master process running in the foreground.
func (n *NGINXController) Start() {
klog.Info("Starting NGINX Ingress controller")
// 初始化同步informers 及secret
n.store.Run(n.stopCh)
// we need to use the defined ingress class to allow multiple leaders
// in order to update information about ingress status
// 定义节点选举ID (ingress class 用于区分不同集群)
// 使用定义的ingress class 来允许多个leader节点更新ingress状态
electionID := fmt.Sprintf("%v-%v", n.cfg.ElectionID, class.DefaultClass)
if class.IngressClass != "" {
electionID = fmt.Sprintf("%v-%v", n.cfg.ElectionID, class.IngressClass)
}
// leader节点选举
setupLeaderElection(&leaderElectionConfig{
......
})
cmd := n.command.ExecCommand()
......
if n.cfg.EnableSSLPassthrough {
n.setupSSLProxy()
}
// 启动nginx
klog.Info("Starting NGINX process")
n.start(cmd)
// 启动同步队列
go n.syncQueue.Run(time.Second, n.stopCh)
// force initial sync
// 发送initial-sync 事件
n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))
// In case of error the temporal configuration file will
// be available up to five minutes after the error
// 每隔5分钟删除临时配置文件
go func() {
for {
time.Sleep(5 * time.Minute)
err := cleanTempNginxCfg()
......
}
}()
......
for {
select {
case err := <-n.ngxErrCh:
if n.isShuttingDown {
return
}
// if the nginx master process dies, the workers continue to process requests
// until the failure of the configured livenessProbe and restart of the pod.
// master 进程挂掉时,workerInc进程将继续处理请求,直到配置的liveness探针探测失败
if process.IsRespawnIfRequired(err) {
return
}
// 循环从updateCh里面获取事件
case event := <-n.updateCh.Out():
if n.isShuttingDown {
break
}
if evt, ok := event.(store.Event); ok {
klog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj)
if evt.Type == store.ConfigurationEvent {
// TODO: is this necessary? Consider removing this special case
n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
continue
}
// 放入可忽略的同步队列
n.syncQueue.EnqueueSkippableTask(evt.Obj)
} else {
klog.Warningf("Unexpected event type received %T", event)
}
case <-n.stopCh:
return
}
}
}
事件类型
const (
// CreateEvent event associated with new objects in an informer
CreateEvent EventType = "CREATE"
// UpdateEvent event associated with an object update in an informer
UpdateEvent EventType = "UPDATE"
// DeleteEvent event associated when an object is removed from an informer
DeleteEvent EventType = "DELETE"
// ConfigurationEvent event associated when a controller configuration object is created or updated
ConfigurationEvent EventType = "CONFIGURATION"
)
同步队列
结构体
// Queue manages a time work queue through an independent worker that invokes the
// given sync function for every work item inserted.
// The queue uses an internal timestamp that allows the removal of certain elements
// which timestamp is older than the last successful get operation.
type Queue struct {
// queue is the work queue the worker polls
queue workqueue.RateLimitingInterface
// sync is called for each item in the queue
sync func(interface{}) error
// workerDone is closed when the worker exits
workerDone chan bool
// fn makes a key for an API object
fn func(obj interface{}) (interface{}, error)
// lastSync is the Unix epoch time of the last execution of 'sync'
lastSync int64
}
队列类型
(1) 可忽略队列 EnqueueSkippableTask
(2) 不可忽略队列
// EnqueueTask enqueues ns/name of the given api object in the task queue.
func (t *Queue) EnqueueTask(obj interface{}) {
t.enqueue(obj, false)
}
// EnqueueSkippableTask enqueues ns/name of the given api object in
// the task queue that can be skipped
func (t *Queue) EnqueueSkippableTask(obj interface{}) {
t.enqueue(obj, true)
}
// 入队列
// enqueue enqueues ns/name of the given api object in the task queue.
func (t *Queue) enqueue(obj interface{}, skippable bool) {
if t.IsShuttingDown() {
klog.Errorf("queue has been shutdown, failed to enqueue: %v", obj)
return
}
ts := time.Now().UnixNano()
if !skippable {
// make sure the timestamp is bigger than lastSync
ts = time.Now().Add(24 * time.Hour).UnixNano()
}
klog.V(3).Infof("queuing item %v", obj)
key, err := t.fn(obj)
if err != nil {
klog.Errorf("%v", err)
return
}
t.queue.Add(Element{
Key: key,
Timestamp: ts,
})
}
store 协程
// file : k8s.io/ingress-nginx/internal/controller/store/store.go
// Run initiates the synchronization of the informers and the initial
// synchronization of the secrets.
func (s *k8sStore) Run(stopCh chan struct{}) {
// start informers
s.informers.Run(stopCh)
}
调用了informers.Run()方法
起多个协程去监听ingress、secret、endpoint、service、configmap、pod 的变化
// Run initiates the synchronization of the informers against the API server.
func (i *Informer) Run(stopCh chan struct{}) {
// 启动secret、endpoint、service、configmap、pod 的informer
go i.Secret.Run(stopCh)
go i.Endpoint.Run(stopCh)
go i.Service.Run(stopCh)
go i.ConfigMap.Run(stopCh)
go i.Pod.Run(stopCh)
......
time.Sleep(1 * time.Second)
go i.Ingress.Run(stopCh)
......
}
这里以监听 ingress 变化为例,接着分析具体实现
// New creates a new object store to be used in the ingress controller
func New(
namespace, configmap, tcp, udp, defaultSSLCertificate string,
resyncPeriod time.Duration,
client clientset.Interface,
updateCh *channels.RingChannel,
pod *k8s.PodInfo,
disableCatchAll bool) Storer {
store := &k8sStore{
informers: &Informer{},
listers: &Lister{},
sslStore: NewSSLCertTracker(),
updateCh: updateCh,
backendConfig: ngx_config.NewDefault(),
syncSecretMu: &sync.Mutex{},
backendConfigMu: &sync.RWMutex{},
secretIngressMap: NewObjectRefMap(),
defaultSSLCertificate: defaultSSLCertificate,
pod: pod,
}
......
// k8sStore fulfills resolver.Resolver interface
// 格式化annotation
store.annotations = annotations.NewAnnotationExtractor(store)
store.listers.IngressWithAnnotation.Store = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
......
// create informers factory, enable and assign required informers
// informer 工厂函数
infFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
informers.WithNamespace(namespace),
informers.WithTweakListOptions(tweakListOptionsFunc))
if k8s.IsNetworkingIngressAvailable {
store.informers.Ingress = infFactory.Networking().V1beta1().Ingresses().Informer()
} else {
store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()
}
store.listers.Ingress.Store = store.informers.Ingress.GetStore()
store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()
store.listers.Endpoint.Store = store.informers.Endpoint.GetStore()
store.informers.Secret = infFactory.Core().V1().Secrets().Informer()
store.listers.Secret.Store = store.informers.Secret.GetStore()
store.informers.ConfigMap = infFactory.Core().V1().ConfigMaps().Informer()
store.listers.ConfigMap.Store = store.informers.ConfigMap.GetStore()
store.informers.Service = infFactory.Core().V1().Services().Informer()
store.listers.Service.Store = store.informers.Service.GetStore()
labelSelector := labels.SelectorFromSet(store.pod.Labels)
// list and watch 机制
store.informers.Pod = cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (k8sruntime.Object, error) {
options.LabelSelector = labelSelector.String()
return client.CoreV1().Pods(store.pod.Namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = labelSelector.String()
return client.CoreV1().Pods(store.pod.Namespace).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
resyncPeriod,
cache.Indexers{},
)
store.listers.Pod.Store = store.informers.Pod.GetStore()
ingDeleteHandler := func(obj interface{}) {
ing, ok := toIngress(obj)
if !ok {
// If we reached here it means the ingress was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.Errorf("couldn't get object from tombstone %#v", obj)
return
}
ing, ok = tombstone.Obj.(*networkingv1beta1.Ingress)
if !ok {
klog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
return
}
}
if !class.IsValid(ing) {
klog.Infof("ignoring delete for ingress %v based on annotation %v", ing.Name, class.IngressKey)
return
}
if isCatchAllIngress(ing.Spec) && disableCatchAll {
klog.Infof("ignoring delete for catch-all ingress %v/%v because of --disable-catch-all", ing.Namespace, ing.Name)
return
}
recorder.Eventf(ing, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))
store.listers.IngressWithAnnotation.Delete(ing)
key := k8s.MetaNamespaceKey(ing)
store.secretIngressMap.Delete(key)
updateCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
}
}
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ing, _ := toIngress(obj)
if !class.IsValid(ing) {
a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
klog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
return
}
if isCatchAllIngress(ing.Spec) && disableCatchAll {
klog.Infof("ignoring add for catch-all ingress %v/%v because of --disable-catch-all", ing.Namespace, ing.Name)
return
}
recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))
store.syncIngress(ing)
store.updateSecretIngressMap(ing)
store.syncSecrets(ing)
updateCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
}
},
DeleteFunc: ingDeleteHandler,
UpdateFunc: func(old, cur interface{}) {
oldIng, _ := toIngress(old)
curIng, _ := toIngress(cur)
validOld := class.IsValid(oldIng)
validCur := class.IsValid(curIng)
if !validOld && validCur {
if isCatchAllIngress(curIng.Spec) && disableCatchAll {
klog.Infof("ignoring update for catch-all ingress %v/%v because of --disable-catch-all", curIng.Namespace, curIng.Name)
return
}
klog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
recorder.Eventf(curIng, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validOld && !validCur {
klog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
ingDeleteHandler(old)
return
} else if validCur && !reflect.DeepEqual(old, cur) {
if isCatchAllIngress(curIng.Spec) && disableCatchAll {
klog.Infof("ignoring update for catch-all ingress %v/%v and delete old one because of --disable-catch-all", curIng.Namespace, curIng.Name)
ingDeleteHandler(old)
return
}
recorder.Eventf(curIng, corev1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else {
klog.V(3).Infof("No changes on ingress %v/%v. Skipping update", curIng.Namespace, curIng.Name)
return
}
store.syncIngress(curIng)
store.updateSecretIngressMap(curIng)
store.syncSecrets(curIng)
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
}
},
}
secrEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {...},
UpdateFunc: func(old, cur interface{}) {...},
DeleteFunc: func(obj interface{}) {...},
}
epEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {...},
DeleteFunc: func(obj interface{}) {...},
UpdateFunc: func(old, cur interface{}) {...},
}
......
cmEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {...},
UpdateFunc: func(old, cur interface{}) {...},
}
podEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {...},
UpdateFunc: func(old, cur interface{}) {...},
DeleteFunc: func(obj interface{}) {...},
}
serviceHandler := cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, cur interface{}) {...},
}
// 注册各种类型的eventHandler
store.informers.Ingress.AddEventHandler(ingEventHandler)
store.informers.Endpoint.AddEventHandler(epEventHandler)
store.informers.Secret.AddEventHandler(secrEventHandler)
store.informers.ConfigMap.AddEventHandler(cmEventHandler)
store.informers.Service.AddEventHandler(serviceHandler)
store.informers.Pod.AddEventHandler(podEventHandler)
// do not wait for informers to read the configmap configuration
ns, name, _ := k8s.ParseNameNS(configmap)
cm, err := client.CoreV1().ConfigMaps(ns).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
klog.Warningf("Unexpected error reading configuration configmap: %v", err)
}
store.setConfig(cm)
return store
}
可以看到,每种类型的informer 基本都有相关的回调方法,包括:
AddFunc: func(obj interface{}) {...},
UpdateFunc: func(old, cur interface{}) {...},
DeleteFunc: func(obj interface{}) {...},
每个方法里面都会往updateCh 写入不同类型的事件(CreateEvent、DeleteEvent、UpdateEvent)
这一步跟store 协程协同工作,informer 通过list&watch 方法监听资源变化,一旦资源有变化则向updateCh 里面写入事件,store 协程循环监听updateCh变化,一旦收到事件则往syncQueue 写入一个task
队列消费
// file : k8s.io/ingress-controller/internal/ingress/controller/nginx.go
// 初始化Queue
n.syncQueue = task.NewTaskQueue(n.syncIngress)
// NewTaskQueue creates a new task queue with the given sync function.
// The sync function is called for every element inserted into the queue.
// 对于每个插入进来的项目都会调用sync function
func NewTaskQueue(syncFn func(interface{}) error) *Queue {
return NewCustomTaskQueue(syncFn, nil)
}
// NewCustomTaskQueue
func NewCustomTaskQueue(syncFn func(interface{}) error, fn func(interface{}) (interface{}, error)) *Queue {
// syncFn(也就是syncIngress)被赋值到Queue.sync
q := &Queue{
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
sync: syncFn,
workerDone: make(chan bool),
fn: fn,
}
if fn == nil {
q.fn = q.defaultKeyFunc
}
return q
}
消费Queue队列
核心方法:
t.queue.Get() -> t.sync()
// file: k8s.io/ingress-nginx/internal/ingress/controller/nginx.go
func (n *NGINXController) Start() {
......
go n.syncQueue.Run(time.Second, n.stopCh)
......
}
// file: k8s.io/ingress-nginx/internal/task/queue.go
// Run starts processing elements in the queue
func (t *Queue) Run(period time.Duration, stopCh <-chan struct{}) {
wait.Until(t.worker, period, stopCh)
}
// worker processes work in the queue through sync.
// 消费Queue队列
func (t *Queue) worker() {
for {
key, quit := t.queue.Get()
......
ts := time.Now().UnixNano()
item := key.(Element)
// 比对最后一次同步的时间戳与Queue中取出item里面带的时间戳,如果小于最后一次同步时间戳则忽略改变更
if t.lastSync > item.Timestamp {
klog.V(3).Infof("skipping %v sync (%v > %v)", item.Key, t.lastSync, item.Timestamp)
t.queue.Forget(key)
t.queue.Done(key)
continue
}
klog.V(3).Infof("syncing %v", item.Key)
// 调用syncIngress
if err := t.sync(key); err != nil {
klog.Warningf("requeuing %v, err %v", item.Key, err)
t.queue.AddRateLimited(Element{
Key: item.Key,
Timestamp: time.Now().UnixNano(),
})
} else {
t.queue.Forget(key)
t.lastSync = ts
}
t.queue.Done(key)
}
}
syncIngress 工作原理
比对线上在跑的配置跟新生成的配置是否相同,并判断是否能够动态重载配置(仅更新endpoint),减少nginx频繁reload带来性能损耗.
pcfg :当前格式化出来的配置
n.runningConfig : 当前线上环境运行的配置
比对pcfg 和 n.runningConfig 配置,判断是否可以动态更新配置(仅endpoint列表变化)
(1)支持动态更新配置:调用n.configureDynamically(pcfg)
将backend 列表以json格式post 到/configuration/backends 这个LUA Handler,动态更新endpoint 列表
(2)不支持动态更新配置,调用 n.OnUpdate(*pcfg)
生成临时配置文件
检测临时配置文件语法
diff 临时配置文件与当前线上配置文件
删除临时配置文件
将新生成的配置写入线上配置文件
执行nginx -s reload 重载配置
// file: k8s.io/ingress-nginx/internal/ingress/controller/controller.go
// syncIngress collects all the pieces required to assemble the NGINX
// configuration file and passes the resulting data structures to the backend
// (OnUpdate) when a reload is deemed necessary.
// 组装nginx 配置文件
// 需要reload 时,调用OnUpdate
func (n *NGINXController) syncIngress(interface{}) error {
......
ings := n.store.ListIngresses(nil)
// 格式化新配置
hosts, servers, pcfg := n.getConfiguration(ings)
......
// 判断配置是否有变化
if n.runningConfig.Equal(pcfg) {
klog.V(3).Infof("No configuration change detected, skipping backend reload.")
return nil
}
......
// 配置有变化,则判断是否需要reload nginx
if !n.IsDynamicConfigurationEnough(pcfg) {
klog.Infof("Configuration changes detected, backend reload required.")
// 生成checksum hash值
hash, _ := hashstructure.Hash(pcfg, &hashstructure.HashOptions{
TagName: "json",
})
pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash)
//调用onUpdate 方法
err := n.OnUpdate(*pcfg)
......
klog.Infof("Backend successfully reloaded.")
......
}
// 是否首次同步(ingress.Configuration 结构体是否为空)
isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
if isFirstSync {
// For the initial sync it always takes some time for NGINX to start listening
// For large configurations it might take a while so we loop and back off
// 首次初始化需要耗费一定的时间,睡眠1秒
klog.Info("Initial sync, sleeping for 1 second.")
time.Sleep(1 * time.Second)
}
// 重试机制
retry := wait.Backoff{
Steps: 15,
Duration: 1 * time.Second,
Factor: 0.8,
Jitter: 0.1,
}
err := wait.ExponentialBackoff(retry, func() (bool, error) {
// 动态更新nginx 配置
err := n.configureDynamically(pcfg)
if err == nil {
klog.V(2).Infof("Dynamic reconfiguration succeeded.")
return true, nil
}
klog.Warningf("Dynamic reconfiguration failed: %v", err)
return false, err
})
......
n.runningConfig = pcfg
return nil
}
判断是否可以动态更新配置
不需要reload的场景
- endpoint 变化
需要reload的场景
- 新增ingress
- 新增证书配置
- ingress 增加/删除 PATH
- 删除ingress、service、secret
- Secret 更新
- 部分annotation变更,造成上述状态更新
// file: k8s.io/ingress-contoller/internal/ingress/controller/nginx.go
// IsDynamicConfigurationEnough returns whether a Configuration can be
// dynamically applied, without reloading the backend.
// 判断是否nginx 可以动态重载,不需要执行reload
func (n *NGINXController) IsDynamicConfigurationEnough(pcfg *ingress.Configuration) bool {
copyOfRunningConfig := *n.runningConfig
copyOfPcfg := *pcfg
copyOfRunningConfig.Backends = []*ingress.Backend{}
copyOfPcfg.Backends = []*ingress.Backend{}
clearL4serviceEndpoints(©OfRunningConfig)
clearL4serviceEndpoints(©OfPcfg)
copyOfRunningConfig.ControllerPodsCount = 0
copyOfPcfg.ControllerPodsCount = 0
clearCertificates(©OfRunningConfig)
clearCertificates(©OfPcfg)
return copyOfRunningConfig.Equal(©OfPcfg)
}
不能动态更新,调用nginx reload 重载配置
// OnUpdate is called by the synchronization loop whenever configuration
// changes were detected. The received backend Configuration is merged with the
// configuration ConfigMap before generating the final configuration file.
// Returns nil in case the backend was successfully reloaded.
// 当监听到配置发生变化,同步循环将调用OnUdate
// 接收到的backend 配置会跟当前配置的configmap 进行合并
func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
cfg := n.store.GetBackendConfiguration()
cfg.Resolver = n.resolver
// 生成临时配置
content, err := n.generateTemplate(cfg, ingressCfg)
......
// 检查配置是否正确
err = n.testTemplate(content)
......
if klog.V(2) {
src, _ := ioutil.ReadFile(cfgPath)
if !bytes.Equal(src, content) {
tmpfile, err := ioutil.TempFile("", "new-nginx-cfg")
if err != nil {
return err
}
defer tmpfile.Close()
// 创建临时配置文件
err = ioutil.WriteFile(tmpfile.Name(), content, file.ReadWriteByUser)
......
// diff 比对生成的临时配置跟当前生效配置
diffOutput, err := exec.Command("diff", "-I", "'# Configuration.*'", "-u", cfgPath, tmpfile.Name()).CombinedOutput()
......
klog.Infof("NGINX configuration diff:\n%v", string(diffOutput))
// 删除临时配置文件
os.Remove(tmpfile.Name())
}
}
// 将新配置写入cfgPath
err = ioutil.WriteFile(cfgPath, content, file.ReadWriteByUser)
......
// reload nginx
o, err := n.command.ExecCommand("-s", "reload").CombinedOutput()
......
return nil
}
动态更新
// file: k8s.io/ingress-contoller/internal/ingress/controller/nginx.go
// configureDynamically encodes new Backends in JSON format and POSTs the
// payload to an internal HTTP endpoint handled by Lua.
// 以json 的格式封装backend 列表并post 到lua API
func (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) error {
backendsChanged := !reflect.DeepEqual(n.runningConfig.Backends, pcfg.Backends)
if backendsChanged {
// 更新endpoint 列表
err := configureBackends(pcfg.Backends)
......
}
// 比对TCP/UDP endpoint 列表
streamConfigurationChanged := !reflect.DeepEqual(n.runningConfig.TCPEndpoints, pcfg.TCPEndpoints) || !reflect.DeepEqual(n.runningConfig.UDPEndpoints, pcfg.UDPEndpoints)
if streamConfigurationChanged {
err := updateStreamConfiguration(pcfg.TCPEndpoints, pcfg.UDPEndpoints)
......
}
if n.runningConfig.ControllerPodsCount != pcfg.ControllerPodsCount {
// post pod 数目
statusCode, _, err := nginx.NewPostStatusRequest("/configuration/general", "application/json", ingress.GeneralConfig{
ControllerPodsCount: pcfg.ControllerPodsCount,
})
......
}
// 比对servers 变化
serversChanged := !reflect.DeepEqual(n.runningConfig.Servers, pcfg.Servers)
if serversChanged {
err := configureCertificates(pcfg.Servers)
......
}
return nil
}
以JSON 格式 POST 调用LUA Handler /configuration/backends
// file: k8s.io/ingress-nginx/internal/controller/nginx.go
func configureBackends(rawBackends []*ingress.Backend) error {
backends := make([]*ingress.Backend, len(rawBackends))
for i, backend := range rawBackends {
var service *apiv1.Service
if backend.Service != nil {
service = &apiv1.Service{Spec: backend.Service.Spec}
}
luaBackend := &ingress.Backend{
Name: backend.Name,
Port: backend.Port,
SSLPassthrough: backend.SSLPassthrough,
SessionAffinity: backend.SessionAffinity,
UpstreamHashBy: backend.UpstreamHashBy,
LoadBalancing: backend.LoadBalancing,
Service: service,
NoServer: backend.NoServer,
TrafficShapingPolicy: backend.TrafficShapingPolicy,
AlternativeBackends: backend.AlternativeBackends,
}
var endpoints []ingress.Endpoint
for _, endpoint := range backend.Endpoints {
endpoints = append(endpoints, ingress.Endpoint{
Address: endpoint.Address,
Port: endpoint.Port,
})
}
luaBackend.Endpoints = endpoints
backends[i] = luaBackend
}
// 更新endpoint 列表
statusCode, _, err := nginx.NewPostStatusRequest("/configuration/backends", "application/json", backends)
if err != nil {
return err
}
if statusCode != http.StatusCreated {
return fmt.Errorf("unexpected error code: %d", statusCode)
}
return nil
}
更多推荐
所有评论(0)