主要逻辑

ingress-controller 源码分析

nginx controller 入口函数

// file:k8s.io/ingress-nginx/nginx/main.go
func main() {
    // step1: 初始化日志组件
    klog.InitFlags(nil)

      ......

    // step2:创建必要的目录
    err = file.CreateRequiredDirectories()
      ......

    // step 3 :初始化ApiserverClient
    kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile)
      ......

    // step4: 检查service配置
    if len(conf.DefaultService) > 0 {
        err := checkService(conf.DefaultService, kubeClient)
          ......

        klog.Infof("Validated %v as the default backend.", conf.DefaultService)
    }

    if len(conf.PublishService) > 0 {
        err := checkService(conf.PublishService, kubeClient)
          ......
    }

    // step5:获取namespace
    if conf.Namespace != "" {
        _, err = kubeClient.CoreV1().Namespaces().Get(context.TODO(), conf.Namespace, metav1.GetOptions{})
        if err != nil {
            klog.Fatalf("No namespace with name %v found: %v", conf.Namespace, err)
        }
    }

    // step6: 创建默认证书
    conf.FakeCertificate = ssl.GetFakeSSLCert()
    klog.Infof("SSL fake certificate created %v", conf.FakeCertificate.PemFileName)

    // step7: 检查是否支持v1beta API 、k8s 版本是否高于1.18.0
    k8s.IsNetworkingIngressAvailable, k8s.IsIngressV1Ready = k8s.NetworkingIngressAvailable(kubeClient)
    if !k8s.IsNetworkingIngressAvailable {
        klog.Warningf("Using deprecated \"k8s.io/api/extensions/v1beta1\" package because Kubernetes version is < v1.14.0")
    }

    if k8s.IsIngressV1Ready {
          ......
    }

    conf.Client = kubeClient

    // step8: 注册prometheus
    reg := prometheus.NewRegistry()

    reg.MustRegister(prometheus.NewGoCollector())
    reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{
        PidFn:        func() (int, error) { return os.Getpid(), nil },
        ReportErrors: true,
    }))
    ......

    // step9:启动profile
    if conf.EnableProfiling {
        go registerProfiler()
    }

    // step10: 实例化nginxcontroller (*)
    ngx := controller.NewNGINXController(conf, mc)

    // step11: 启动健康探测和metrics API
    mux := http.NewServeMux()
    registerHealthz(nginx.HealthPath, ngx, mux)
    registerMetrics(reg, mux)

    go startHTTPServer(conf.ListenPorts.Health, mux)

    // step12: 启动nginx master进程
    go ngx.Start()
  ......
}

nginx controller 初始化

// NewNGINXController creates a new NGINX Ingress controller.
func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController {
    // 初始化 event broadcaster
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
        Interface: config.Client.CoreV1().Events(config.Namespace),
    })

    // 获取/etc/resolv.conf 中的nameserver 列表
    h, err := dns.GetSystemNameServers()
    if err != nil {
        klog.Warningf("Error reading system nameservers: %v", err)
    }

    // 实例化NGINXController
    n := &NGINXController{
        isIPV6Enabled: ing_net.IsIPv6Enabled(),

        resolver:        h,
        cfg:             config,
        syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1),

        recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
            Component: "nginx-ingress-controller",
        }),

        stopCh:   make(chan struct{}),
        updateCh: channels.NewRingChannel(1024),

        ngxErrCh: make(chan error),

        stopLock: &sync.Mutex{},

        runningConfig: new(ingress.Configuration),

        Proxy: &TCPProxy{},

        metricCollector: mc,

        command: NewNginxCommand(),
    }

    // 启动webhook 服务
    if n.cfg.ValidationWebhook != "" {
        n.validationWebhookServer = &http.Server{
            Addr:      config.ValidationWebhook,
            Handler:   adm_controller.NewAdmissionControllerServer(&adm_controller.IngressAdmission{Checker: n}),
            TLSConfig: ssl.NewTLSListener(n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath).TLSConfig(),
        }
    }

    // 获取pod runtime信息
    pod, err := k8s.GetPodDetails(config.Client)
    if err != nil {
        klog.Fatalf("unexpected error obtaining pod information: %v", err)
    }
    n.podInfo = pod

    // 实例化store(本地缓存)
    n.store = store.New(
        config.Namespace,
        config.ConfigMapName,
        config.TCPConfigMapName,
        config.UDPConfigMapName,
        config.DefaultSSLCertificate,
        config.ResyncPeriod,
        config.Client,
        n.updateCh,
        pod,
        config.DisableCatchAll)

    // 创建同步队列
    n.syncQueue = task.NewTaskQueue(n.syncIngress)

    ... ...

    // 格式化template配置模板
    onTemplateChange := func() {
        template, err := ngx_template.NewTemplate(nginx.TemplatePath)
        if err != nil {
            // this error is different from the rest because it must be clear why nginx is not working
            klog.Errorf(`
-------------------------------------------------------------------------------
Error loading new template: %v
-------------------------------------------------------------------------------
`, err)
            return
        }

        // 若模板格式化正确,则更新到nginxcontroller 对象中,并往同步队列发送一个template-change事件
        n.t = template
        klog.Info("New NGINX configuration template loaded.")
        n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
    }

    // 首次启动加载配置模板文件
    ngxTpl, err := ngx_template.NewTemplate(nginx.TemplatePath)
    ......

    n.t = ngxTpl

    // 监听模板文件变化
    // 监听 /etc/nginx/template/nginx.tmpl 模板文件是否有变化,有变化则调用onTemplateChange
    _, err = watch.NewFileWatcher(nginx.TemplatePath, onTemplateChange)
 ... ...

    // 监听/etc/nginx/geoip/ 目录下配置文件变化
    filesToWatch := []string{}
    err = filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {
        ......
        filesToWatch = append(filesToWatch, path)
        ......
    })

    ......

    for _, f := range filesToWatch {
        _, err = watch.NewFileWatcher(f, func() {
            klog.Infof("File %v changed. Reloading NGINX", f)
            // 配置文件有变化则往同步队列发送一个file-change 事件
            n.syncQueue.EnqueueTask(task.GetDummyObject("file-change"))
        })
        ......
    }

    return n
}

ingress controller 结构体

type NGINXController struct {
    // pod runtime 信息
    podInfo *k8s.PodInfo
    // 配置信息
    cfg *Configuration
    // 事件通知器
    recorder record.EventRecorder
    // 同步队列
    syncQueue *task.Queue
    // 同步状态
    syncStatus status.Syncer
    // 同步限流器
    syncRateLimiter flowcontrol.RateLimiter

    stopLock *sync.Mutex

    stopCh   chan struct{}
    // 更新环状channel
    updateCh *channels.RingChannel

    // 接受nginx 错误信息channel
    ngxErrCh chan error

    // 当前配置文件
    runningConfig *ingress.Configuration
    // nginx 配置模板文件
    t ngx_template.TemplateWriter
    // nameserver 列表
    resolver []net.IP
    // 是否启用ipv6
    isIPV6Enabled bool
    // 是否关闭
    isShuttingDown bool
    // TCP代理
    Proxy *TCPProxy
  // 本地缓存
    store store.Storer
  // metrics 收集器
    metricCollector metric.Collector
  // webhook
    validationWebhookServer *http.Server
    // nginx 二进制命令
    command NginxExecTester
}

ngx.Start()

ngx.Start() 主要做3个事情
启动store 协程
启动syncQueue协程
监听updateCh

当从updateCh 见到变化事件时,向syncQueue 发送一个task

// file:internal/ingress/controller/nginx.go
// Start starts a new NGINX master process running in the foreground.
func (n *NGINXController) Start() {
    klog.Info("Starting NGINX Ingress controller")
    // 初始化同步informers 及secret
    n.store.Run(n.stopCh)

    // we need to use the defined ingress class to allow multiple leaders
    // in order to update information about ingress status
    // 定义节点选举ID (ingress class 用于区分不同集群)
    // 使用定义的ingress class 来允许多个leader节点更新ingress状态
    electionID := fmt.Sprintf("%v-%v", n.cfg.ElectionID, class.DefaultClass)
    if class.IngressClass != "" {
        electionID = fmt.Sprintf("%v-%v", n.cfg.ElectionID, class.IngressClass)
    }

 // leader节点选举
    setupLeaderElection(&leaderElectionConfig{
        ......
    })

    cmd := n.command.ExecCommand()

    ......

    if n.cfg.EnableSSLPassthrough {
        n.setupSSLProxy()
    }

    // 启动nginx
    klog.Info("Starting NGINX process")
    n.start(cmd)

    // 启动同步队列
    go n.syncQueue.Run(time.Second, n.stopCh)

    // force initial sync
    // 发送initial-sync 事件
    n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))

    // In case of error the temporal configuration file will
    // be available up to five minutes after the error
    // 每隔5分钟删除临时配置文件
    go func() {
        for {
            time.Sleep(5 * time.Minute)
            err := cleanTempNginxCfg()
            ......
        }
    }()

    ......

    for {
        select {
        case err := <-n.ngxErrCh:
            if n.isShuttingDown {
                return
            }

            // if the nginx master process dies, the workers continue to process requests
            // until the failure of the configured livenessProbe and restart of the pod.
            // master 进程挂掉时,workerInc进程将继续处理请求,直到配置的liveness探针探测失败
            if process.IsRespawnIfRequired(err) {
                return
            }

        // 循环从updateCh里面获取事件
        case event := <-n.updateCh.Out():
            if n.isShuttingDown {
                break
            }

            if evt, ok := event.(store.Event); ok {
                klog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj)
                if evt.Type == store.ConfigurationEvent {
                    // TODO: is this necessary? Consider removing this special case
                    n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
                    continue
                }
                // 放入可忽略的同步队列
                n.syncQueue.EnqueueSkippableTask(evt.Obj)
            } else {
                klog.Warningf("Unexpected event type received %T", event)
            }
        case <-n.stopCh:
            return
        }
    }
}

事件类型

const (
    // CreateEvent event associated with new objects in an informer
    CreateEvent EventType = "CREATE"
    // UpdateEvent event associated with an object update in an informer
    UpdateEvent EventType = "UPDATE"
    // DeleteEvent event associated when an object is removed from an informer
    DeleteEvent EventType = "DELETE"
    // ConfigurationEvent event associated when a controller configuration object is created or updated
    ConfigurationEvent EventType = "CONFIGURATION"
)

同步队列
结构体

// Queue manages a time work queue through an independent worker that invokes the
// given sync function for every work item inserted.
// The queue uses an internal timestamp that allows the removal of certain elements
// which timestamp is older than the last successful get operation.
type Queue struct {
    // queue is the work queue the worker polls
    queue workqueue.RateLimitingInterface
    // sync is called for each item in the queue
    sync func(interface{}) error
    // workerDone is closed when the worker exits
    workerDone chan bool
    // fn makes a key for an API object
    fn func(obj interface{}) (interface{}, error)
    // lastSync is the Unix epoch time of the last execution of 'sync'
    lastSync int64
}

队列类型
(1) 可忽略队列 EnqueueSkippableTask
(2) 不可忽略队列

// EnqueueTask enqueues ns/name of the given api object in the task queue.
func (t *Queue) EnqueueTask(obj interface{}) {
    t.enqueue(obj, false)
}

// EnqueueSkippableTask enqueues ns/name of the given api object in
// the task queue that can be skipped
func (t *Queue) EnqueueSkippableTask(obj interface{}) {
    t.enqueue(obj, true)
}

// 入队列
// enqueue enqueues ns/name of the given api object in the task queue.
func (t *Queue) enqueue(obj interface{}, skippable bool) {
    if t.IsShuttingDown() {
        klog.Errorf("queue has been shutdown, failed to enqueue: %v", obj)
        return
    }

    ts := time.Now().UnixNano()
    if !skippable {
        // make sure the timestamp is bigger than lastSync
        ts = time.Now().Add(24 * time.Hour).UnixNano()
    }
    klog.V(3).Infof("queuing item %v", obj)
    key, err := t.fn(obj)
    if err != nil {
        klog.Errorf("%v", err)
        return
    }
    t.queue.Add(Element{
        Key:       key,
        Timestamp: ts,
    })
}

store 协程

// file : k8s.io/ingress-nginx/internal/controller/store/store.go
// Run initiates the synchronization of the informers and the initial
// synchronization of the secrets.
func (s *k8sStore) Run(stopCh chan struct{}) {
    // start informers
    s.informers.Run(stopCh)
}

调用了informers.Run()方法
起多个协程去监听ingress、secret、endpoint、service、configmap、pod 的变化

// Run initiates the synchronization of the informers against the API server.
func (i *Informer) Run(stopCh chan struct{}) {
    // 启动secret、endpoint、service、configmap、pod 的informer
    go i.Secret.Run(stopCh)
    go i.Endpoint.Run(stopCh)
    go i.Service.Run(stopCh)
    go i.ConfigMap.Run(stopCh)
    go i.Pod.Run(stopCh)

        ......

    time.Sleep(1 * time.Second)

    go i.Ingress.Run(stopCh)
    ......
}

这里以监听 ingress 变化为例,接着分析具体实现

// New creates a new object store to be used in the ingress controller
func New(
    namespace, configmap, tcp, udp, defaultSSLCertificate string,
    resyncPeriod time.Duration,
    client clientset.Interface,
    updateCh *channels.RingChannel,
    pod *k8s.PodInfo,
    disableCatchAll bool) Storer {

    store := &k8sStore{
        informers:             &Informer{},
        listers:               &Lister{},
        sslStore:              NewSSLCertTracker(),
        updateCh:              updateCh,
        backendConfig:         ngx_config.NewDefault(),
        syncSecretMu:          &sync.Mutex{},
        backendConfigMu:       &sync.RWMutex{},
        secretIngressMap:      NewObjectRefMap(),
        defaultSSLCertificate: defaultSSLCertificate,
        pod:                   pod,
    }

    ......

    // k8sStore fulfills resolver.Resolver interface
    // 格式化annotation
    store.annotations = annotations.NewAnnotationExtractor(store)

    store.listers.IngressWithAnnotation.Store = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)

    ......

    // create informers factory, enable and assign required informers
    // informer 工厂函数
    infFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
        informers.WithNamespace(namespace),
        informers.WithTweakListOptions(tweakListOptionsFunc))

    if k8s.IsNetworkingIngressAvailable {
        store.informers.Ingress = infFactory.Networking().V1beta1().Ingresses().Informer()
    } else {
        store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()
    }

    store.listers.Ingress.Store = store.informers.Ingress.GetStore()

    store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()
    store.listers.Endpoint.Store = store.informers.Endpoint.GetStore()

    store.informers.Secret = infFactory.Core().V1().Secrets().Informer()
    store.listers.Secret.Store = store.informers.Secret.GetStore()

    store.informers.ConfigMap = infFactory.Core().V1().ConfigMaps().Informer()
    store.listers.ConfigMap.Store = store.informers.ConfigMap.GetStore()

    store.informers.Service = infFactory.Core().V1().Services().Informer()
    store.listers.Service.Store = store.informers.Service.GetStore()

    labelSelector := labels.SelectorFromSet(store.pod.Labels)

    // list and watch 机制
    store.informers.Pod = cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (k8sruntime.Object, error) {
                options.LabelSelector = labelSelector.String()
                return client.CoreV1().Pods(store.pod.Namespace).List(context.TODO(), options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                options.LabelSelector = labelSelector.String()
                return client.CoreV1().Pods(store.pod.Namespace).Watch(context.TODO(), options)
            },
        },
        &corev1.Pod{},
        resyncPeriod,
        cache.Indexers{},
    )
    store.listers.Pod.Store = store.informers.Pod.GetStore()

    ingDeleteHandler := func(obj interface{}) {
        ing, ok := toIngress(obj)
        if !ok {
            // If we reached here it means the ingress was deleted but its final state is unrecorded.
            tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
            if !ok {
                klog.Errorf("couldn't get object from tombstone %#v", obj)
                return
            }
            ing, ok = tombstone.Obj.(*networkingv1beta1.Ingress)
            if !ok {
                klog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
                return
            }
        }

        if !class.IsValid(ing) {
            klog.Infof("ignoring delete for ingress %v based on annotation %v", ing.Name, class.IngressKey)
            return
        }
        if isCatchAllIngress(ing.Spec) && disableCatchAll {
            klog.Infof("ignoring delete for catch-all ingress %v/%v because of --disable-catch-all", ing.Namespace, ing.Name)
            return
        }
        recorder.Eventf(ing, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))

        store.listers.IngressWithAnnotation.Delete(ing)

        key := k8s.MetaNamespaceKey(ing)
        store.secretIngressMap.Delete(key)

        updateCh.In() <- Event{
            Type: DeleteEvent,
            Obj:  obj,
        }
    }

    ingEventHandler := cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            ing, _ := toIngress(obj)
            if !class.IsValid(ing) {
                a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
                klog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
                return
            }
            if isCatchAllIngress(ing.Spec) && disableCatchAll {
                klog.Infof("ignoring add for catch-all ingress %v/%v because of --disable-catch-all", ing.Namespace, ing.Name)
                return
            }
            recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))

            store.syncIngress(ing)
            store.updateSecretIngressMap(ing)
            store.syncSecrets(ing)

            updateCh.In() <- Event{
                Type: CreateEvent,
                Obj:  obj,
            }
        },
        DeleteFunc: ingDeleteHandler,
        UpdateFunc: func(old, cur interface{}) {
            oldIng, _ := toIngress(old)
            curIng, _ := toIngress(cur)

            validOld := class.IsValid(oldIng)
            validCur := class.IsValid(curIng)
            if !validOld && validCur {
                if isCatchAllIngress(curIng.Spec) && disableCatchAll {
                    klog.Infof("ignoring update for catch-all ingress %v/%v because of --disable-catch-all", curIng.Namespace, curIng.Name)
                    return
                }

                klog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
                recorder.Eventf(curIng, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
            } else if validOld && !validCur {
                klog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
                ingDeleteHandler(old)
                return
            } else if validCur && !reflect.DeepEqual(old, cur) {
                if isCatchAllIngress(curIng.Spec) && disableCatchAll {
                    klog.Infof("ignoring update for catch-all ingress %v/%v and delete old one because of --disable-catch-all", curIng.Namespace, curIng.Name)
                    ingDeleteHandler(old)
                    return
                }

                recorder.Eventf(curIng, corev1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
            } else {
                klog.V(3).Infof("No changes on ingress %v/%v. Skipping update", curIng.Namespace, curIng.Name)
                return
            }

            store.syncIngress(curIng)
            store.updateSecretIngressMap(curIng)
            store.syncSecrets(curIng)

            updateCh.In() <- Event{
                Type: UpdateEvent,
                Obj:  cur,
            }
        },
    }

    secrEventHandler := cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {...},
        UpdateFunc: func(old, cur interface{}) {...},
        DeleteFunc: func(obj interface{}) {...},
    }

    epEventHandler := cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {...},
        DeleteFunc: func(obj interface{}) {...},
        UpdateFunc: func(old, cur interface{}) {...},
    }

    ......

    cmEventHandler := cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {...},
        UpdateFunc: func(old, cur interface{}) {...},
    }

    podEventHandler := cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {...},
        UpdateFunc: func(old, cur interface{}) {...},
        DeleteFunc: func(obj interface{}) {...},
    }

    serviceHandler := cache.ResourceEventHandlerFuncs{
        UpdateFunc: func(old, cur interface{}) {...},
    }

 // 注册各种类型的eventHandler
    store.informers.Ingress.AddEventHandler(ingEventHandler)
    store.informers.Endpoint.AddEventHandler(epEventHandler)
    store.informers.Secret.AddEventHandler(secrEventHandler)
    store.informers.ConfigMap.AddEventHandler(cmEventHandler)
    store.informers.Service.AddEventHandler(serviceHandler)
    store.informers.Pod.AddEventHandler(podEventHandler)

    // do not wait for informers to read the configmap configuration
    ns, name, _ := k8s.ParseNameNS(configmap)
    cm, err := client.CoreV1().ConfigMaps(ns).Get(context.TODO(), name, metav1.GetOptions{})
    if err != nil {
        klog.Warningf("Unexpected error reading configuration configmap: %v", err)
    }

    store.setConfig(cm)
    return store
}

可以看到,每种类型的informer 基本都有相关的回调方法,包括:
AddFunc: func(obj interface{}) {...},
UpdateFunc: func(old, cur interface{}) {...},
DeleteFunc: func(obj interface{}) {...},

每个方法里面都会往updateCh 写入不同类型的事件(CreateEvent、DeleteEvent、UpdateEvent)
这一步跟store 协程协同工作,informer 通过list&watch 方法监听资源变化,一旦资源有变化则向updateCh 里面写入事件,store 协程循环监听updateCh变化,一旦收到事件则往syncQueue 写入一个task

队列消费

// file : k8s.io/ingress-controller/internal/ingress/controller/nginx.go
// 初始化Queue
n.syncQueue = task.NewTaskQueue(n.syncIngress)

// NewTaskQueue creates a new task queue with the given sync function.
// The sync function is called for every element inserted into the queue.
// 对于每个插入进来的项目都会调用sync function
func NewTaskQueue(syncFn func(interface{}) error) *Queue {
    return NewCustomTaskQueue(syncFn, nil)
}

// NewCustomTaskQueue  
func NewCustomTaskQueue(syncFn func(interface{}) error, fn func(interface{}) (interface{}, error)) *Queue {
    // syncFn(也就是syncIngress)被赋值到Queue.sync 
    q := &Queue{
        queue:      workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
        sync:       syncFn,
        workerDone: make(chan bool),
        fn:         fn,
    }

    if fn == nil {
        q.fn = q.defaultKeyFunc
    }

    return q
}

消费Queue队列
核心方法:
t.queue.Get() -> t.sync()

// file: k8s.io/ingress-nginx/internal/ingress/controller/nginx.go
func (n *NGINXController) Start() {
    ......
    go n.syncQueue.Run(time.Second, n.stopCh)
    ......
}

// file: k8s.io/ingress-nginx/internal/task/queue.go
// Run starts processing elements in the queue
func (t *Queue) Run(period time.Duration, stopCh <-chan struct{}) {
   wait.Until(t.worker, period, stopCh)
}

// worker processes work in the queue through sync.
// 消费Queue队列
func (t *Queue) worker() {
   for {
      key, quit := t.queue.Get()
      ......
      ts := time.Now().UnixNano()

      item := key.(Element)
      // 比对最后一次同步的时间戳与Queue中取出item里面带的时间戳,如果小于最后一次同步时间戳则忽略改变更
      if t.lastSync > item.Timestamp {
         klog.V(3).Infof("skipping %v sync (%v > %v)", item.Key, t.lastSync, item.Timestamp)
         t.queue.Forget(key)
         t.queue.Done(key)
         continue
      }

      klog.V(3).Infof("syncing %v", item.Key)
      // 调用syncIngress
      if err := t.sync(key); err != nil {
         klog.Warningf("requeuing %v, err %v", item.Key, err)
         t.queue.AddRateLimited(Element{
            Key:       item.Key,
            Timestamp: time.Now().UnixNano(),
         })
      } else {
         t.queue.Forget(key)
         t.lastSync = ts
      }

      t.queue.Done(key)
   }
}

syncIngress 工作原理

比对线上在跑的配置跟新生成的配置是否相同,并判断是否能够动态重载配置(仅更新endpoint),减少nginx频繁reload带来性能损耗.
pcfg :当前格式化出来的配置
n.runningConfig : 当前线上环境运行的配置

比对pcfg 和 n.runningConfig 配置,判断是否可以动态更新配置(仅endpoint列表变化)
(1)支持动态更新配置:调用n.configureDynamically(pcfg)
将backend 列表以json格式post 到/configuration/backends 这个LUA Handler,动态更新endpoint 列表

(2)不支持动态更新配置,调用 n.OnUpdate(*pcfg)
生成临时配置文件
检测临时配置文件语法
diff 临时配置文件与当前线上配置文件
删除临时配置文件
将新生成的配置写入线上配置文件
执行nginx -s reload 重载配置

// file: k8s.io/ingress-nginx/internal/ingress/controller/controller.go
// syncIngress collects all the pieces required to assemble the NGINX
// configuration file and passes the resulting data structures to the backend
// (OnUpdate) when a reload is deemed necessary.
// 组装nginx 配置文件
// 需要reload 时,调用OnUpdate
func (n *NGINXController) syncIngress(interface{}) error {
    ......
   ings := n.store.ListIngresses(nil)
   // 格式化新配置
   hosts, servers, pcfg := n.getConfiguration(ings)
   ......

   // 判断配置是否有变化
   if n.runningConfig.Equal(pcfg) {
      klog.V(3).Infof("No configuration change detected, skipping backend reload.")
      return nil
   }

   ......

   // 配置有变化,则判断是否需要reload nginx
   if !n.IsDynamicConfigurationEnough(pcfg) {
      klog.Infof("Configuration changes detected, backend reload required.")

      // 生成checksum hash值
      hash, _ := hashstructure.Hash(pcfg, &hashstructure.HashOptions{
         TagName: "json",
      })

      pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash)

      //调用onUpdate 方法
      err := n.OnUpdate(*pcfg)
      ......

      klog.Infof("Backend successfully reloaded.")
      ......
   }

   // 是否首次同步(ingress.Configuration 结构体是否为空)
   isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
   if isFirstSync {
      // For the initial sync it always takes some time for NGINX to start listening
      // For large configurations it might take a while so we loop and back off
      // 首次初始化需要耗费一定的时间,睡眠1秒
      klog.Info("Initial sync, sleeping for 1 second.")
      time.Sleep(1 * time.Second)
   }

   // 重试机制
   retry := wait.Backoff{
      Steps:    15,
      Duration: 1 * time.Second,
      Factor:   0.8,
      Jitter:   0.1,
   }

   err := wait.ExponentialBackoff(retry, func() (bool, error) {
      // 动态更新nginx 配置
      err := n.configureDynamically(pcfg)
      if err == nil {
         klog.V(2).Infof("Dynamic reconfiguration succeeded.")
         return true, nil
      }

      klog.Warningf("Dynamic reconfiguration failed: %v", err)
      return false, err
   })
   ......

   n.runningConfig = pcfg

   return nil
}

判断是否可以动态更新配置
不需要reload的场景

  1. endpoint 变化

需要reload的场景

  1. 新增ingress
  2. 新增证书配置
  3. ingress 增加/删除 PATH
  4. 删除ingress、service、secret
  5. Secret 更新
  6. 部分annotation变更,造成上述状态更新
// file: k8s.io/ingress-contoller/internal/ingress/controller/nginx.go
// IsDynamicConfigurationEnough returns whether a Configuration can be
// dynamically applied, without reloading the backend.
// 判断是否nginx 可以动态重载,不需要执行reload
func (n *NGINXController) IsDynamicConfigurationEnough(pcfg *ingress.Configuration) bool {
   copyOfRunningConfig := *n.runningConfig
   copyOfPcfg := *pcfg

   copyOfRunningConfig.Backends = []*ingress.Backend{}
   copyOfPcfg.Backends = []*ingress.Backend{}

   clearL4serviceEndpoints(©OfRunningConfig)
   clearL4serviceEndpoints(©OfPcfg)

   copyOfRunningConfig.ControllerPodsCount = 0
   copyOfPcfg.ControllerPodsCount = 0

   clearCertificates(©OfRunningConfig)
   clearCertificates(©OfPcfg)

   return copyOfRunningConfig.Equal(©OfPcfg)
}

不能动态更新,调用nginx reload 重载配置

// OnUpdate is called by the synchronization loop whenever configuration
// changes were detected. The received backend Configuration is merged with the
// configuration ConfigMap before generating the final configuration file.
// Returns nil in case the backend was successfully reloaded.
// 当监听到配置发生变化,同步循环将调用OnUdate
// 接收到的backend 配置会跟当前配置的configmap 进行合并
func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
   cfg := n.store.GetBackendConfiguration()
   cfg.Resolver = n.resolver

   // 生成临时配置
   content, err := n.generateTemplate(cfg, ingressCfg)
   ......

   // 检查配置是否正确
   err = n.testTemplate(content)
   ......

   if klog.V(2) {
      src, _ := ioutil.ReadFile(cfgPath)
      if !bytes.Equal(src, content) {
         tmpfile, err := ioutil.TempFile("", "new-nginx-cfg")
         if err != nil {
            return err
         }
         defer tmpfile.Close()
         // 创建临时配置文件
         err = ioutil.WriteFile(tmpfile.Name(), content, file.ReadWriteByUser)
         ......
         // diff 比对生成的临时配置跟当前生效配置
         diffOutput, err := exec.Command("diff", "-I", "'# Configuration.*'", "-u", cfgPath, tmpfile.Name()).CombinedOutput()
         ......

         klog.Infof("NGINX configuration diff:\n%v", string(diffOutput))

         // 删除临时配置文件
         os.Remove(tmpfile.Name())
      }
   }
   // 将新配置写入cfgPath
   err = ioutil.WriteFile(cfgPath, content, file.ReadWriteByUser)
   ......

   // reload nginx
   o, err := n.command.ExecCommand("-s", "reload").CombinedOutput()
   ......

   return nil
}

动态更新

// file: k8s.io/ingress-contoller/internal/ingress/controller/nginx.go
// configureDynamically encodes new Backends in JSON format and POSTs the
// payload to an internal HTTP endpoint handled by Lua.
// 以json 的格式封装backend 列表并post 到lua API
func (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) error {
   backendsChanged := !reflect.DeepEqual(n.runningConfig.Backends, pcfg.Backends)
   if backendsChanged {
       // 更新endpoint 列表
      err := configureBackends(pcfg.Backends)
      ......
   }

   // 比对TCP/UDP endpoint 列表
   streamConfigurationChanged := !reflect.DeepEqual(n.runningConfig.TCPEndpoints, pcfg.TCPEndpoints) || !reflect.DeepEqual(n.runningConfig.UDPEndpoints, pcfg.UDPEndpoints)
   if streamConfigurationChanged {
      err := updateStreamConfiguration(pcfg.TCPEndpoints, pcfg.UDPEndpoints)
      ......
   }

   if n.runningConfig.ControllerPodsCount != pcfg.ControllerPodsCount {
      // post pod 数目
      statusCode, _, err := nginx.NewPostStatusRequest("/configuration/general", "application/json", ingress.GeneralConfig{
         ControllerPodsCount: pcfg.ControllerPodsCount,
      })
      ......
   }
   // 比对servers 变化
   serversChanged := !reflect.DeepEqual(n.runningConfig.Servers, pcfg.Servers)
   if serversChanged {
      err := configureCertificates(pcfg.Servers)
      ......
   }

   return nil
}

以JSON 格式 POST 调用LUA Handler /configuration/backends

// file: k8s.io/ingress-nginx/internal/controller/nginx.go
func configureBackends(rawBackends []*ingress.Backend) error {
   backends := make([]*ingress.Backend, len(rawBackends))

   for i, backend := range rawBackends {
      var service *apiv1.Service
      if backend.Service != nil {
         service = &apiv1.Service{Spec: backend.Service.Spec}
      }
      luaBackend := &ingress.Backend{
         Name:                 backend.Name,
         Port:                 backend.Port,
         SSLPassthrough:       backend.SSLPassthrough,
         SessionAffinity:      backend.SessionAffinity,
         UpstreamHashBy:       backend.UpstreamHashBy,
         LoadBalancing:        backend.LoadBalancing,
         Service:              service,
         NoServer:             backend.NoServer,
         TrafficShapingPolicy: backend.TrafficShapingPolicy,
         AlternativeBackends:  backend.AlternativeBackends,
      }

      var endpoints []ingress.Endpoint
      for _, endpoint := range backend.Endpoints {
         endpoints = append(endpoints, ingress.Endpoint{
            Address: endpoint.Address,
            Port:    endpoint.Port,
         })
      }

      luaBackend.Endpoints = endpoints
      backends[i] = luaBackend
   }

    // 更新endpoint 列表
   statusCode, _, err := nginx.NewPostStatusRequest("/configuration/backends", "application/json", backends)
   if err != nil {
      return err
   }

   if statusCode != http.StatusCreated {
      return fmt.Errorf("unexpected error code: %d", statusCode)
   }

   return nil
}
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐