经过前面的实战,读者大概已经知道ingress是怎样使用的,下面接着深入讲解它的内部结构。
(图:nginx ingress controller部署架构图)
上图展示了一个nginx ingress controller的部署架构图,ingress controller通过list-watch的方式监听apiserver来获取ingress资源的变化,将ingress资源存储到本地缓存,并通知nginx修改并重新加载相应的配置。
ingress controller监控了ingress、service、endpoint、secret、node、configmap一系列资源,一旦资源发生了变化(包括增加、删除和修改),会立即通知backend,例如nginx等。
为了减少对apiserver的请求次数,nginx controller会将每次请求的结果在本地进行缓存,该缓存使用了kubernetes提供的"k8s.io/kubernetes/pkg/client/cache"包。
下面进入代码先看服务启动
看代码:controllers/nginx/pkg/cmd/controller/main.go
那么细看怎么样更新配置的。
先看ingress controller里面的listwatch

// Ingress informer: built from a list/watch on "ingresses" in ic.cfg.Namespace.
// Events go to ingEventHandler and the resulting store is kept on ic.ingLister.
// ic.cfg.ResyncPeriod is passed as the resync period to every informer below.
ic.ingLister.Store, ic.ingController = cache.NewInformer(
        cache.NewListWatchFromClient(ic.cfg.Client.Extensions().RESTClient(), "ingresses", ic.cfg.Namespace, fields.Everything()),
        &extensions.Ingress{}, ic.cfg.ResyncPeriod, ingEventHandler)

    // Endpoints informer, scoped to ic.cfg.Namespace; events go to the generic eventHandler.
    ic.endpLister.Store, ic.endpController = cache.NewInformer(
        cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "endpoints", ic.cfg.Namespace, fields.Everything()),
        &api.Endpoints{}, ic.cfg.ResyncPeriod, eventHandler)

    // Secrets informer; uses api.NamespaceAll rather than ic.cfg.Namespace.
    // Events go to secrEventHandler.
    ic.secrLister.Store, ic.secrController = cache.NewInformer(
        cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "secrets", api.NamespaceAll, fields.Everything()),
        &api.Secret{}, ic.cfg.ResyncPeriod, secrEventHandler)

    // ConfigMaps informer; uses api.NamespaceAll. Events go to mapEventHandler.
    ic.mapLister.Store, ic.mapController = cache.NewInformer(
        cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "configmaps", api.NamespaceAll, fields.Everything()),
        &api.ConfigMap{}, ic.cfg.ResyncPeriod, mapEventHandler)

    // Services informer, scoped to ic.cfg.Namespace. It is registered with an
    // empty ResourceEventHandlerFuncs, so no callback is attached here, and the
    // store is an indexer keyed by cache.NamespaceIndex.
    ic.svcLister.Indexer, ic.svcController = cache.NewIndexerInformer(
        cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "services", ic.cfg.Namespace, fields.Everything()),
        &api.Service{},
        ic.cfg.ResyncPeriod,
        cache.ResourceEventHandlerFuncs{},
        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

    // Nodes informer; uses api.NamespaceAll. Events go to the generic eventHandler.
    ic.nodeLister.Store, ic.nodeController = cache.NewInformer(
        cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "nodes", api.NamespaceAll, fields.Everything()),
        &api.Node{}, ic.cfg.ResyncPeriod, eventHandler)

可以看到,每种资源的监听都有自己的eventhandler,譬如:

    // eventHandler enqueues every added or deleted object on ic.syncQueue, and
    // enqueues an updated object only when it differs from its previous version.
    eventHandler := cache.ResourceEventHandlerFuncs{
        AddFunc: func(added interface{}) {
            ic.syncQueue.Enqueue(added)
        },
        DeleteFunc: func(removed interface{}) {
            ic.syncQueue.Enqueue(removed)
        },
        UpdateFunc: func(prev, next interface{}) {
            // An update that leaves the object deeply equal is not enqueued.
            if reflect.DeepEqual(prev, next) {
                return
            }
            ic.syncQueue.Enqueue(next)
        },
    }

把监听的消息放到syncQueue中,这个syncQueue的实现在core/pkg/task/queue.go中,会使用里面的sync方法处理队列里面的元素key,这个和controller manager里面的设计一脉相承。

// worker loops over the queue, handing each item to t.sync. When the queue
// reports that it is quitting, worker closes t.workerDone and returns.
func (t *Queue) worker() {
    for {
        item, shutdown := t.queue.Get()
        if shutdown {
            close(t.workerDone)
            return
        }

        glog.V(3).Infof("syncing %v", item)
        err := t.sync(item)
        if err == nil {
            // Successful sync: Forget is called for the item.
            t.queue.Forget(item)
        } else {
            // Failed sync: log it and re-add the item with rate limiting.
            glog.Warningf("requeuing %v, err %v", item, err)
            t.queue.AddRateLimited(item)
        }

        // Done is called for every item obtained from Get, whatever the outcome.
        t.queue.Done(item)
    }
}

下面看具体怎么处理每个event,

// sync rebuilds the backend configuration and asks the backend to reload it.
//
// The key argument is not used by the body: every invocation regenerates the
// whole configuration. It returns nil without doing anything when the sync
// queue is shutting down, and an error (after sleeping for
// podStoreSyncedPollPeriod) while controllersInSync reports false. Errors from
// the backend's OnUpdate and Reload are returned unchanged.
func (ic *GenericController) sync(key interface{}) error {
    ic.syncRateLimiter.Accept()

    if ic.syncQueue.IsShuttingDown() {
        return nil
    }

    if !ic.controllersInSync() {
        time.Sleep(podStoreSyncedPollPeriod)
        return fmt.Errorf("deferring sync till endpoints controller has synced")
    }

    backends, srvs := ic.getBackendServers()

    // For each SSL-passthrough server, record the backend of its first
    // location whose path is rootLocation (at most one entry per server).
    var passthrough []*ingress.SSLPassthroughBackend
    for _, srv := range srvs {
        if srv.SSLPassthrough {
            for _, location := range srv.Locations {
                if location.Path == rootLocation {
                    passthrough = append(passthrough, &ingress.SSLPassthroughBackend{
                        Backend:  location.Backend,
                        Hostname: srv.Hostname,
                    })
                    break
                }
            }
        }
    }

    conf := ingress.Configuration{
        Backends:            backends,
        Servers:             srvs,
        TCPEndpoints:        ic.getStreamServices(ic.cfg.TCPConfigMapName, api.ProtocolTCP),
        UDPEndpoints:        ic.getStreamServices(ic.cfg.UDPConfigMapName, api.ProtocolUDP),
        PassthroughBackends: passthrough,
    }

    rendered, err := ic.cfg.Backend.OnUpdate(conf)
    if err != nil {
        return err
    }

    output, reloaded, err := ic.cfg.Backend.Reload(rendered)
    if err != nil {
        incReloadErrorCount()
        glog.Errorf("unexpected failure restarting the backend: \n%v", string(output))
        return err
    }

    // The reload counter is bumped only when the backend reports a reload.
    if reloaded {
        glog.Infof("ingress backend successfully reloaded...")
        incReloadCount()
    }
    return nil
}

这个方法里面,先是组织nginx的各种配置,然后通过OnUpdate生成新的配置内容,最后通过Reload让配置生效。先看nginx对OnUpdate的实现方法(controllers/nginx/pkg/cmd/controller/nginx.go),截取关键代码:

// Pass the collected settings to n.t.Write as a config.TemplateConfig.
// NOTE(review): n.t is presumably the nginx template wrapper and content the
// rendered configuration — confirm against the full OnUpdate in nginx.go.
content, err := n.t.Write(config.TemplateConfig{
        ProxySetHeaders:     setHeaders,
        MaxOpenFiles:        maxOpenFiles,
        BacklogSize:         sysctlSomaxconn(),
        // Backends, passthrough backends, servers and TCP/UDP endpoints are
        // taken as-is from ingressCfg.
        Backends:            ingressCfg.Backends,
        PassthroughBackends: ingressCfg.PassthroughBackends,
        Servers:             ingressCfg.Servers,
        TCPBackends:         ingressCfg.TCPEndpoints,
        UDPBackends:         ingressCfg.UDPEndpoints,
        HealthzURI:          ngxHealthPath,
        // True when cfg.CustomHTTPErrors contains at least one entry.
        CustomErrors:        len(cfg.CustomHTTPErrors) > 0,
        Cfg:                 cfg,
    })

这样就把template模板中的元素替换掉,并返回渲染后的配置内容,然后通知nginx更新配置:

// Reload writes data to cfgPath and runs "<n.binary> -s reload".
//
// When isReloadRequired reports false, nothing is written or executed and the
// result is ("Reload not required", false, nil). A failed write returns
// (nil, false, err). Otherwise the combined output of the reload command is
// returned with true and the command's error; the flag is true even when the
// command itself fails.
func (n NGINXController) Reload(data []byte) ([]byte, bool, error) {
    if !n.isReloadRequired(data) {
        return []byte("Reload not required"), false, nil
    }

    if writeErr := ioutil.WriteFile(cfgPath, data, 0644); writeErr != nil {
        return nil, false, writeErr
    }

    reloadCmd := exec.Command(n.binary, "-s", "reload")
    output, runErr := reloadCmd.CombinedOutput()
    return output, true, runErr
}

isReloadRequired通过判断新旧配置是否相同来确定是否需要更新;如果需要,就用WriteFile覆盖写配置文件,再通过nginx -s reload重新加载配置。至此nginx的配置更新就讲完了,整个流程也走完了。

Logo

开源、云原生的融合云平台

更多推荐