This code analysis is based on Kubernetes v1.5.0.
Overview
kube-proxy is arguably the simplest of all the Kubernetes components. Its job is narrow: it manages services, including load balancing across their backends. Every service has a cluster IP and selects its backend Pods through a label selector; turning that selection into concrete endpoints is done by the endpoint controller in kube-controller-manager, which generates an Endpoints object matching each service.

kube-proxy runs on every node. Its main work is to watch services and endpoints and then program the corresponding iptables entries on each node, so that a service can be reached from any node. When a service has multiple endpoints, kube-proxy also acts as a load balancer, spreading traffic evenly across the endpoints.

I. Entry point

Like the other components, kube-proxy's entry point lives in kubernetes/cmd/kube-proxy. options.NewProxyConfig() first builds a configuration object, app.NewProxyServerDefault(config) then creates the server from that configuration, and finally the server is run.

kubernetes/cmd/kube-proxy/proxy.go +55

func init() {
    healthz.DefaultHealthz()
}

func main() {
    config := options.NewProxyConfig()
    config.AddFlags(pflag.CommandLine)

    flag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()

    verflag.PrintAndExitIfRequested()

    s, err := app.NewProxyServerDefault(config)
    if err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }

    if err = s.Run(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}

II. Creating the ProxyServer

1. Creating the proxier and endpointsHandler

Most of kube-proxy's interesting code is in server creation, so let's walk through app.NewProxyServerDefault(config).

func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, error) {
    if c, err := configz.New("componentconfig"); err == nil {
        c.Set(config.KubeProxyConfiguration)
    } else {
        glog.Errorf("unable to register configz: %s", err)
    }
    protocol := utiliptables.ProtocolIpv4
    if net.ParseIP(config.BindAddress).To4() == nil {
        protocol = utiliptables.ProtocolIpv6
    }

    var netshInterface utilnetsh.Interface
    var iptInterface utiliptables.Interface
    var dbus utildbus.Interface

    // Create a iptables utils.
    execer := exec.New()

    if runtime.GOOS == "windows" {
        netshInterface = utilnetsh.New(execer)
    } else {
        dbus = utildbus.New()
        iptInterface = utiliptables.New(execer, dbus, protocol)
    }

    // We omit creation of pretty much everything if we run in cleanup mode
    if config.CleanupAndExit {
        return &ProxyServer{
            Config:       config,
            IptInterface: iptInterface,
        }, nil
    }

    // TODO(vmarmol): Use container config for this.
    var oomAdjuster *oom.OOMAdjuster
    if config.OOMScoreAdj != nil {
        oomAdjuster = oom.NewOOMAdjuster()
        if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*config.OOMScoreAdj)); err != nil {
            glog.V(2).Info(err)
        }
    }

    if config.ResourceContainer != "" {
        // Run in its own container.
        if err := resourcecontainer.RunInResourceContainer(config.ResourceContainer); err != nil {
            glog.Warningf("Failed to start in resource-only container %q: %v", config.ResourceContainer, err)
        } else {
            glog.V(2).Infof("Running in resource-only container %q", config.ResourceContainer)
        }
    }

    // Create a Kube Client
    // define api config source
    if config.Kubeconfig == "" && config.Master == "" {
        glog.Warningf("Neither --kubeconfig nor --master was specified.  Using default API client.  This might not work.")
    }
    // This creates a client, first loading any specified kubeconfig
    // file, and then overriding the Master flag, if non-empty.
    kubeconfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
        &clientcmd.ClientConfigLoadingRules{ExplicitPath: config.Kubeconfig},
        &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: config.Master}}).ClientConfig()
    if err != nil {
        return nil, err
    }

    kubeconfig.ContentType = config.ContentType
    // Override kubeconfig qps/burst settings from flags
    kubeconfig.QPS = config.KubeAPIQPS
    kubeconfig.Burst = int(config.KubeAPIBurst)

    client, err := clientset.NewForConfig(kubeconfig)
    if err != nil {
        glog.Fatalf("Invalid API configuration: %v", err)
    }

    // Create event recorder
    hostname := nodeutil.GetHostname(config.HostnameOverride)
    eventBroadcaster := record.NewBroadcaster()
    recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "kube-proxy", Host: hostname})

    var proxier proxy.ProxyProvider
    var endpointsHandler proxyconfig.EndpointsConfigHandler

    proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{})
    if proxyMode == proxyModeIPTables {
        glog.V(0).Info("Using iptables Proxier.")
        if config.IPTablesMasqueradeBit == nil {
            // IPTablesMasqueradeBit must be specified or defaulted.
            return nil, fmt.Errorf("Unable to read IPTablesMasqueradeBit from config")
        }
        proxierIPTables, err := iptables.NewProxier(iptInterface, utilsysctl.New(), execer, config.IPTablesSyncPeriod.Duration, config.IPTablesMinSyncPeriod.Duration, config.MasqueradeAll, int(*config.IPTablesMasqueradeBit), config.ClusterCIDR, hostname, getNodeIP(client, hostname))
        if err != nil {
            glog.Fatalf("Unable to create proxier: %v", err)
        }
        proxier = proxierIPTables
        endpointsHandler = proxierIPTables
        // No turning back. Remove artifacts that might still exist from the userspace Proxier.
        glog.V(0).Info("Tearing down userspace rules.")
        userspace.CleanupLeftovers(iptInterface)
    } else {
        glog.V(0).Info("Using userspace Proxier.")
        // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
        // our config.EndpointsConfigHandler.
        loadBalancer := userspace.NewLoadBalancerRR()
        // set EndpointsConfigHandler to our loadBalancer
        endpointsHandler = loadBalancer

        var proxierUserspace proxy.ProxyProvider

        if runtime.GOOS == "windows" {
            proxierUserspace, err = winuserspace.NewProxier(
                loadBalancer,
                net.ParseIP(config.BindAddress),
                netshInterface,
                *utilnet.ParsePortRangeOrDie(config.PortRange),
                // TODO @pires replace below with default values, if applicable
                config.IPTablesSyncPeriod.Duration,
                config.UDPIdleTimeout.Duration,
            )
        } else {
            proxierUserspace, err = userspace.NewProxier(
                loadBalancer,
                net.ParseIP(config.BindAddress),
                iptInterface,
                *utilnet.ParsePortRangeOrDie(config.PortRange),
                config.IPTablesSyncPeriod.Duration,
                config.IPTablesMinSyncPeriod.Duration,
                config.UDPIdleTimeout.Duration,
            )
        }
        if err != nil {
            glog.Fatalf("Unable to create proxier: %v", err)
        }
        proxier = proxierUserspace
        // Remove artifacts from the pure-iptables Proxier, if not on Windows.
        if runtime.GOOS != "windows" {
            glog.V(0).Info("Tearing down pure-iptables proxy rules.")
            iptables.CleanupLeftovers(iptInterface)
        }
    }

    // Add iptables reload function, if not on Windows.
    if runtime.GOOS != "windows" {
        iptInterface.AddReloadFunc(proxier.Sync)
    }

    // Create configs (i.e. Watches for Services and Endpoints)
    // Note: RegisterHandler() calls need to happen before creation of Sources because sources
    // only notify on changes, and the initial update (on process start) may be lost if no handlers
    // are registered yet.
    serviceConfig := proxyconfig.NewServiceConfig()
    serviceConfig.RegisterHandler(proxier)

    endpointsConfig := proxyconfig.NewEndpointsConfig()
    endpointsConfig.RegisterHandler(endpointsHandler)

    proxyconfig.NewSourceAPI(
        client.Core().RESTClient(),
        config.ConfigSyncPeriod,
        serviceConfig.Channel("api"),
        endpointsConfig.Channel("api"),
    )

    config.NodeRef = &api.ObjectReference{
        Kind:      "Node",
        Name:      hostname,
        UID:       types.UID(hostname),
        Namespace: "",
    }

    conntracker := realConntracker{}

    return NewProxyServer(client, config, iptInterface, proxier, eventBroadcaster, recorder, conntracker, proxyMode)
}

kube-proxy has two modes of operation: a userspace proxy and a kernel-space iptables proxy. The iptables mode is more efficient, and it is the one we focus on here.

In the if proxyMode == proxyModeIPTables branch, two objects are set up, proxier and endpointsHandler, and both refer to the same proxierIPTables instance; the sketch below shows why a single object can fill both roles.
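
proxy.ProxyProvider and proxyconfig.EndpointsConfigHandler are both small interfaces, and iptables.Proxier implements all of their methods, which is why one value can be assigned to both variables. Below is a minimal, self-contained sketch of that shape; the type and method signatures are simplified stand-ins, not the exact definitions from pkg/proxy.

package main

import "fmt"

// Simplified stand-in for proxy.ProxyProvider.
type proxyProvider interface {
    OnServiceUpdate(services []string)
    Sync()
    SyncLoop()
}

// Simplified stand-in for proxyconfig.EndpointsConfigHandler.
type endpointsConfigHandler interface {
    OnEndpointsUpdate(endpoints []string)
}

// iptablesProxier plays the role of iptables.Proxier, which satisfies both interfaces.
type iptablesProxier struct{}

func (p *iptablesProxier) OnServiceUpdate(services []string)    { fmt.Println("services:", services) }
func (p *iptablesProxier) OnEndpointsUpdate(endpoints []string) { fmt.Println("endpoints:", endpoints) }
func (p *iptablesProxier) Sync()                                { fmt.Println("sync iptables rules") }
func (p *iptablesProxier) SyncLoop()                            { /* periodic Sync, omitted */ }

func main() {
    p := &iptablesProxier{}
    var proxier proxyProvider = p                   // registered with serviceConfig
    var endpointsHandler endpointsConfigHandler = p // registered with endpointsConfig
    proxier.OnServiceUpdate([]string{"default/nginx"})
    endpointsHandler.OnEndpointsUpdate([]string{"10.244.1.5:80"})
}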

2. Creating the serviceConfig

NewServiceConfig first creates a channel (updates) and a serviceStore, which holds the services watched from the apiserver (after some filtering). It then creates a mux and a bcaster and starts a goroutine running watchForUpdates: whenever updates becomes readable, it immediately calls bcaster.Notify(accessor.MergedState()). This is the broadcaster framework: listeners are registered on the broadcaster, and when there is an update, Notify pushes it to every listener. A minimal sketch of this pattern follows the code excerpt below.

serviceConfig.RegisterHandler(proxier) simply registers such a listener; when notified, it calls handler.OnServiceUpdate(instance.([]api.Service)).

bcaster.Notify, and in turn handler.OnServiceUpdate(instance.([]api.Service)), fires only when updates becomes readable. So who writes to the updates channel?

kubernetes/cmd/kube-proxy/app/server.go +280

    serviceConfig := proxyconfig.NewServiceConfig()
    serviceConfig.RegisterHandler(proxier)

kubernetes/pkg/proxy/config/config.go +193

func NewServiceConfig() *ServiceConfig {
    updates := make(chan struct{}, 1)
    store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]api.Service)}
    mux := config.NewMux(store)
    bcaster := config.NewBroadcaster()
    go watchForUpdates(bcaster, store, updates)
    return &ServiceConfig{mux, bcaster, store}
}

func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
    c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
        glog.V(3).Infof("Calling handler.OnServiceUpdate()")
        handler.OnServiceUpdate(instance.([]api.Service))
    }))
}
...
func watchForUpdates(bcaster *config.Broadcaster, accessor config.Accessor, updates <-chan struct{}) {
    for true {
        <-updates
        bcaster.Notify(accessor.MergedState())
    }
}
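
To make the broadcaster pattern concrete, here is a minimal, self-contained sketch of its Add/Notify mechanics. It is a simplified illustration, not the real implementation (which lives in kubernetes/pkg/util/config).

package main

import (
    "fmt"
    "sync"
)

type listenerFunc func(instance interface{})

type broadcaster struct {
    mu        sync.Mutex
    listeners []listenerFunc
}

// Add registers a listener; ServiceConfig.RegisterHandler does essentially this.
func (b *broadcaster) Add(l listenerFunc) {
    b.mu.Lock()
    defer b.mu.Unlock()
    b.listeners = append(b.listeners, l)
}

// Notify calls every registered listener with the current merged state.
func (b *broadcaster) Notify(instance interface{}) {
    b.mu.Lock()
    defer b.mu.Unlock()
    for _, l := range b.listeners {
        l(instance)
    }
}

func main() {
    b := &broadcaster{}
    b.Add(func(instance interface{}) {
        fmt.Println("OnServiceUpdate:", instance) // what the registered handler would do
    })

    updates := make(chan struct{}, 1)
    done := make(chan struct{})

    // watchForUpdates: block until something signals updates, then notify the listeners.
    go func() {
        <-updates
        b.Notify("merged service snapshot")
        close(done)
    }()

    updates <- struct{}{} // serviceStore.Merge() does this after recording an update
    <-done
}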

3. Creating the endpointsConfig

The logic is exactly the same as for serviceConfig, with services replaced by endpoints.

kubernetes/cmd/kube-proxy/app/server.go +283

    endpointsConfig := proxyconfig.NewEndpointsConfig()
    endpointsConfig.RegisterHandler(endpointsHandler)

kubernetes/pkg/proxy/config/config.go +84

func NewEndpointsConfig() *EndpointsConfig {
    updates := make(chan struct{}, 1)
    store := &endpointsStore{updates: updates, endpoints: make(map[string]map[types.NamespacedName]api.Endpoints)}
    mux := config.NewMux(store)
    bcaster := config.NewBroadcaster()
    go watchForUpdates(bcaster, store, updates)
    return &EndpointsConfig{mux, bcaster, store}
}

func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
    c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
        glog.V(3).Infof("Calling handler.OnEndpointsUpdate()")
        handler.OnEndpointsUpdate(instance.([]api.Endpoints))
    }))
}

4. Where services and endpoints really come from

Services and endpoints ultimately come from the apiserver, so how does kube-proxy obtain them? The key is the proxyconfig.NewSourceAPI call. Look at its arguments, in particular the last two, serviceConfig.Channel("api") and endpointsConfig.Channel("api"); we use the service side as the example.

kubernetes/cmd/kube-proxy/app/server.go +286

    proxyconfig.NewSourceAPI(
        client.Core().RESTClient(),
        config.ConfigSyncPeriod,
        serviceConfig.Channel("api"),
        endpointsConfig.Channel("api"),
    )

Channel first calls c.mux.Channel to build ch, which is essentially a channel, and then creates a second channel, serviceCh. It starts a goroutine whose only job is to read from serviceCh and forward everything into ch, and finally it returns serviceCh.

kubernetes/pkg/proxy/config/config.go +213

func (c *ServiceConfig) Channel(source string) chan ServiceUpdate {
    ch := c.mux.Channel(source)
    serviceCh := make(chan ServiceUpdate)
    go func() {
        for update := range serviceCh {
            ch <- update
        }
        close(ch)
    }()
    return serviceCh
}

The returned serviceCh is passed into proxyconfig.NewSourceAPI. That function mainly creates two reflectors that sync service and endpoint information from kube-apiserver and write it into serviceCh and endpointsCh. (Each reflector's store is a cache.UndeltaStore, whose PushFunc pushes the complete current object list on every change, which is why every update below carries Op: SET.) From there the data flows into ch. So who reads from ch? The key is ch := c.mux.Channel(source).

kubernetes/pkg/proxy/config/api.go +28

func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) {
    servicesLW := cache.NewListWatchFromClient(c, "services", api.NamespaceAll, fields.Everything())
    cache.NewReflector(servicesLW, &api.Service{}, NewServiceStore(nil, servicesChan), period).Run()

    endpointsLW := cache.NewListWatchFromClient(c, "endpoints", api.NamespaceAll, fields.Everything())
    cache.NewReflector(endpointsLW, &api.Endpoints{}, NewEndpointsStore(nil, endpointsChan), period).Run()
}

func NewServiceStore(store cache.Store, ch chan<- ServiceUpdate) cache.Store {
    fn := func(objs []interface{}) {
        var services []api.Service
        for _, o := range objs {
            services = append(services, *(o.(*api.Service)))
        }
        ch <- ServiceUpdate{Op: SET, Services: services}
    }
    if store == nil {
        store = cache.NewStore(cache.MetaNamespaceKeyFunc)
    }
    return &cache.UndeltaStore{
        Store:    store,
        PushFunc: fn,
    }
}

func NewEndpointsStore(store cache.Store, ch chan<- EndpointsUpdate) cache.Store {
    fn := func(objs []interface{}) {
        var endpoints []api.Endpoints
        for _, o := range objs {
            endpoints = append(endpoints, *(o.(*api.Endpoints)))
        }
        ch <- EndpointsUpdate{Op: SET, Endpoints: endpoints}
    }
    if store == nil {
        store = cache.NewStore(cache.MetaNamespaceKeyFunc)
    }
    return &cache.UndeltaStore{
        Store:    store,
        PushFunc: fn,
    }
}

5. The Mux framework

The Mux code lives in kubernetes/pkg/util/config/config.go. Its job is to merge multiple data sources: each call to Channel(source) registers one data source and returns a channel, and every write to that channel triggers the Merge function. The ch channel above is exactly what Channel returns. A minimal sketch of the idea follows; after that we look at the service Merge function (the endpoints version is analogous).
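
Here is a minimal, self-contained sketch of that idea, using simplified, hypothetical types rather than the real ones from pkg/util/config:

package main

import "fmt"

// merger is the simplified counterpart of the interface serviceStore implements.
type merger interface {
    Merge(source string, change interface{}) error
}

type mux struct {
    merger merger
}

// Channel registers a source and returns a channel; every write to that
// channel is handed to the merger's Merge method by a dedicated goroutine.
func (m *mux) Channel(source string) chan interface{} {
    ch := make(chan interface{})
    go func() {
        for change := range ch {
            m.merger.Merge(source, change)
        }
    }()
    return ch
}

// printStore is a toy stand-in for serviceStore.
type printStore struct {
    merged chan string
}

func (s printStore) Merge(source string, change interface{}) error {
    s.merged <- fmt.Sprintf("merge from %q: %v", source, change)
    return nil
}

func main() {
    store := printStore{merged: make(chan string, 1)}
    m := &mux{merger: store}
    apiCh := m.Channel("api") // serviceConfig.Channel("api") wraps something like this
    apiCh <- "ServiceUpdate{Op: SET, ...}"
    fmt.Println(<-store.merged) // wait for the Merge triggered by the write above
}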

kubernetes/pkg/proxy/config/config.go +235

func (s *serviceStore) Merge(source string, change interface{}) error {
    s.serviceLock.Lock()
    services := s.services[source]
    if services == nil {
        services = make(map[types.NamespacedName]api.Service)
    }
    update := change.(ServiceUpdate)
    switch update.Op {
    case ADD:
        glog.V(5).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Services))
        for _, value := range update.Services {
            name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
            services[name] = value
        }
    case REMOVE:
        glog.V(5).Infof("Removing a service %s", spew.Sdump(update))
        for _, value := range update.Services {
            name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
            delete(services, name)
        }
    case SET:
        glog.V(5).Infof("Setting services %s", spew.Sdump(update))
        // Clear the old map entries by just creating a new map
        services = make(map[types.NamespacedName]api.Service)
        for _, value := range update.Services {
            name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
            services[name] = value
        }
    default:
        glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update))
    }
    s.services[source] = services
    s.serviceLock.Unlock()
    if s.updates != nil {
        // Since we record the snapshot before sending this signal, it's
        // possible that the consumer ends up performing an extra update.
        select {
        case s.updates <- struct{}{}:
        default:
            glog.V(4).Infof("Service handler already has a pending interrupt.")
        }
    }
    return nil
}

This function writes the received services into serviceStore.services and then pushes an empty struct into updates, making it readable, which ties together everything described above. Note the select with a default branch: because updates has a buffer of size 1, repeated Merge calls while the consumer is still busy collapse into a single pending wake-up.
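
A minimal illustration of that non-blocking signal pattern:

package main

import "fmt"

// signal mirrors the select at the end of Merge: send if there is room,
// otherwise note that the consumer already has a pending interrupt.
func signal(updates chan struct{}) {
    select {
    case updates <- struct{}{}:
        fmt.Println("signalled the consumer")
    default:
        fmt.Println("consumer already has a pending interrupt")
    }
}

func main() {
    updates := make(chan struct{}, 1)
    signal(updates) // first call succeeds
    signal(updates) // second call hits the default branch: no blocking, no second wake-up
    <-updates       // the consumer (watchForUpdates) sees a single pending signal
}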

6. OnServiceUpdate and OnEndpointsUpdate

Take OnServiceUpdate as the example.

func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
    start := time.Now()
    defer func() {
        glog.V(4).Infof("OnServiceUpdate took %v for %d services", time.Since(start), len(allServices))
    }()
    proxier.mu.Lock()
    defer proxier.mu.Unlock()
    proxier.haveReceivedServiceUpdate = true

    activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set

    for i := range allServices {
        service := &allServices[i]
        svcName := types.NamespacedName{
            Namespace: service.Namespace,
            Name:      service.Name,
        }

        // if ClusterIP is "None" or empty, skip proxying
        if !api.IsServiceIPSet(service) {
            glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
            continue
        }

        for i := range service.Spec.Ports {
            servicePort := &service.Spec.Ports[i]

            serviceName := proxy.ServicePortName{
                NamespacedName: svcName,
                Port:           servicePort.Name,
            }
            activeServices[serviceName] = true
            info, exists := proxier.serviceMap[serviceName]
            if exists && proxier.sameConfig(info, service, servicePort) {
                // Nothing changed.
                continue
            }
            if exists {
                // Something changed.
                glog.V(3).Infof("Something changed for service %q: removing it", serviceName)
                delete(proxier.serviceMap, serviceName)
            }
            serviceIP := net.ParseIP(service.Spec.ClusterIP)
            glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
            info = newServiceInfo(serviceName)
            info.clusterIP = serviceIP
            info.port = int(servicePort.Port)
            info.protocol = servicePort.Protocol
            info.nodePort = int(servicePort.NodePort)
            info.externalIPs = service.Spec.ExternalIPs
            // Deep-copy in case the service instance changes
            info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
            info.sessionAffinityType = service.Spec.SessionAffinity
            info.loadBalancerSourceRanges = service.Spec.LoadBalancerSourceRanges
            proxier.serviceMap[serviceName] = info

            glog.V(4).Infof("added serviceInfo(%s): %s", serviceName, spew.Sdump(info))
        }
    }

    staleUDPServices := sets.NewString()
    // Remove services missing from the update.
    for name := range proxier.serviceMap {
        if !activeServices[name] {
            glog.V(1).Infof("Removing service %q", name)
            if proxier.serviceMap[name].protocol == api.ProtocolUDP {
                staleUDPServices.Insert(proxier.serviceMap[name].clusterIP.String())
            }
            delete(proxier.serviceMap, name)
        }
    }

    proxier.syncProxyRules()
    proxier.deleteServiceConnections(staleUDPServices.List())

}

The core of this code is to rebuild proxier.serviceMap from the received service information and then call proxier.syncProxyRules() to sync the iptables rules; a small sketch of the serviceMap shape follows below.

From the analysis above, whenever services or endpoints change, the corresponding iptables rules are updated.
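
To make the shape of proxier.serviceMap concrete, here is a simplified sketch; the type and field names below are stand-ins, not the exact k8s structs. The map is keyed by ServicePortName, i.e. namespace/name plus the port name, so a Service with two ports contributes two entries.

package main

import "fmt"

type namespacedName struct {
    Namespace, Name string
}

// servicePortName is the simplified counterpart of proxy.ServicePortName.
type servicePortName struct {
    namespacedName
    Port string // the port name, e.g. "http" or "https"
}

// serviceInfo keeps only a few of the fields the real serviceInfo carries.
type serviceInfo struct {
    clusterIP string
    port      int
    protocol  string
}

func main() {
    serviceMap := map[servicePortName]serviceInfo{}

    svc := namespacedName{Namespace: "default", Name: "nginx"}
    serviceMap[servicePortName{svc, "http"}] = serviceInfo{"10.96.0.10", 80, "TCP"}
    serviceMap[servicePortName{svc, "https"}] = serviceInfo{"10.96.0.10", 443, "TCP"}

    for name, info := range serviceMap {
        fmt.Printf("%s/%s:%s -> %s:%d/%s\n",
            name.Namespace, name.Name, name.Port, info.clusterIP, info.port, info.protocol)
    }
}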

III. Running the ProxyServer

kubernetes/cmd/kube-proxy/app/server.go +306

func (s *ProxyServer) Run() error {
    // remove iptables rules and exit
    if s.Config.CleanupAndExit {
        encounteredError := userspace.CleanupLeftovers(s.IptInterface)
        encounteredError = iptables.CleanupLeftovers(s.IptInterface) || encounteredError
        if encounteredError {
            return errors.New("Encountered an error while tearing down rules.")
        }
        return nil
    }

    s.Broadcaster.StartRecordingToSink(s.Client.Events(""))

    // Start up a webserver if requested
    if s.Config.HealthzPort > 0 {
        http.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
            fmt.Fprintf(w, "%s", s.ProxyMode)
        })
        configz.InstallHandler(http.DefaultServeMux)
        go wait.Until(func() {
            err := http.ListenAndServe(s.Config.HealthzBindAddress+":"+strconv.Itoa(int(s.Config.HealthzPort)), nil)
            if err != nil {
                glog.Errorf("Starting health server failed: %v", err)
            }
        }, 5*time.Second, wait.NeverStop)
    }

    // Tune conntrack, if requested
    if s.Conntracker != nil {
        max, err := getConntrackMax(s.Config)
        if err != nil {
            return err
        }
        if max > 0 {
            err := s.Conntracker.SetMax(max)
            if err != nil {
                if err != readOnlySysFSError {
                    return err
                }
                const message = "DOCKER RESTART NEEDED (docker issue #24000): /sys is read-only: can't raise conntrack limits, problems may arise later."
                s.Recorder.Eventf(s.Config.NodeRef, api.EventTypeWarning, err.Error(), message)
            }
        }
        if s.Config.ConntrackTCPEstablishedTimeout.Duration > 0 {
            if err := s.Conntracker.SetTCPEstablishedTimeout(int(s.Config.ConntrackTCPEstablishedTimeout.Duration / time.Second)); err != nil {
                return err
            }
        }
    }

    // Birth Cry after the birth is successful
    s.birthCry()

    // Just loop forever for now...
    s.Proxier.SyncLoop()
    return nil
}

kubernetes/pkg/proxy/iptables/proxier.go +385

func (proxier *Proxier) SyncLoop() {
    t := time.NewTicker(proxier.syncPeriod)
    defer t.Stop()
    for {
        <-t.C
        glog.V(6).Infof("Periodic sync")
        proxier.Sync()
    }
}

func (proxier *Proxier) Sync() {
    proxier.mu.Lock()
    defer proxier.mu.Unlock()
    proxier.syncProxyRules()
}

Source: k8s.io/kubernetes/pkg/proxy/iptables/proxier.go

Run() ends by entering Proxier.SyncLoop(), which calls proxier.Sync(), and hence proxier.syncProxyRules(), once every proxier.syncPeriod; the default sync period is 30 seconds.

As for how the iptables rules are actually updated, syncProxyRules is the place to look.

func (proxier *Proxier) syncProxyRules() {
    if proxier.throttle != nil {
        proxier.throttle.Accept()
    }
    start := time.Now()
    defer func() {
        glog.V(4).Infof("syncProxyRules took %v", time.Since(start))
    }()
    // don't sync rules till we've received services and endpoints
    if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate {
        glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
        return
    }
    glog.V(3).Infof("Syncing iptables rules")

    // Create and link the kube services chain.
    {
        tablesNeedServicesChain := []utiliptables.Table{utiliptables.TableFilter, utiliptables.TableNAT}
        for _, table := range tablesNeedServicesChain {
            if _, err := proxier.iptables.EnsureChain(table, kubeServicesChain); err != nil {
                glog.Errorf("Failed to ensure that %s chain %s exists: %v", table, kubeServicesChain, err)
                return
            }
        }

        tableChainsNeedJumpServices := []struct {
            table utiliptables.Table
            chain utiliptables.Chain
        }{
            {utiliptables.TableFilter, utiliptables.ChainOutput},
            {utiliptables.TableNAT, utiliptables.ChainOutput},
            {utiliptables.TableNAT, utiliptables.ChainPrerouting},
        }
        comment := "kubernetes service portals"
        args := []string{"-m", "comment", "--comment", comment, "-j", string(kubeServicesChain)}
        for _, tc := range tableChainsNeedJumpServices {
            if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil {
                glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeServicesChain, err)
                return
            }
        }
    }

    // Create and link the kube postrouting chain.
    {
        if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, kubePostroutingChain); err != nil {
            glog.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubePostroutingChain, err)
            return
        }

        comment := "kubernetes postrouting rules"
        args := []string{"-m", "comment", "--comment", comment, "-j", string(kubePostroutingChain)}
        if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
            glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, err)
            return
        }
    }

    // Get iptables-save output so we can check for existing chains and rules.
    // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
    existingFilterChains := make(map[utiliptables.Chain]string)
    iptablesSaveRaw, err := proxier.iptables.Save(utiliptables.TableFilter)
    if err != nil { // if we failed to get any rules
        glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
    } else { // otherwise parse the output
        existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, iptablesSaveRaw)
    }

    existingNATChains := make(map[utiliptables.Chain]string)
    iptablesSaveRaw, err = proxier.iptables.Save(utiliptables.TableNAT)
    if err != nil { // if we failed to get any rules
        glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
    } else { // otherwise parse the output
        existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, iptablesSaveRaw)
    }

    filterChains := bytes.NewBuffer(nil)
    filterRules := bytes.NewBuffer(nil)
    natChains := bytes.NewBuffer(nil)
    natRules := bytes.NewBuffer(nil)

    // Write table headers.
    writeLine(filterChains, "*filter")
    writeLine(natChains, "*nat")

    // Make sure we keep stats for the top-level chains, if they existed
    // (which most should have because we created them above).
    if chain, ok := existingFilterChains[kubeServicesChain]; ok {
        writeLine(filterChains, chain)
    } else {
        writeLine(filterChains, utiliptables.MakeChainLine(kubeServicesChain))
    }
    if chain, ok := existingNATChains[kubeServicesChain]; ok {
        writeLine(natChains, chain)
    } else {
        writeLine(natChains, utiliptables.MakeChainLine(kubeServicesChain))
    }
    if chain, ok := existingNATChains[kubeNodePortsChain]; ok {
        writeLine(natChains, chain)
    } else {
        writeLine(natChains, utiliptables.MakeChainLine(kubeNodePortsChain))
    }
    if chain, ok := existingNATChains[kubePostroutingChain]; ok {
        writeLine(natChains, chain)
    } else {
        writeLine(natChains, utiliptables.MakeChainLine(kubePostroutingChain))
    }
    if chain, ok := existingNATChains[KubeMarkMasqChain]; ok {
        writeLine(natChains, chain)
    } else {
        writeLine(natChains, utiliptables.MakeChainLine(KubeMarkMasqChain))
    }

    // Install the kubernetes-specific postrouting rules. We use a whole chain for
    // this so that it is easier to flush and change, for example if the mark
    // value should ever change.
    writeLine(natRules, []string{
        "-A", string(kubePostroutingChain),
        "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
        "-m", "mark", "--mark", proxier.masqueradeMark,
        "-j", "MASQUERADE",
    }...)

    // Install the kubernetes-specific masquerade mark rule. We use a whole chain for
    // this so that it is easier to flush and change, for example if the mark
    // value should ever change.
    writeLine(natRules, []string{
        "-A", string(KubeMarkMasqChain),
        "-j", "MARK", "--set-xmark", proxier.masqueradeMark,
    }...)

    // Accumulate NAT chains to keep.
    activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set

    // Accumulate the set of local ports that we will be holding open once this update is complete
    replacementPortsMap := map[localPort]closeable{}

    // Build rules for each service.
    for svcName, svcInfo := range proxier.serviceMap {
        protocol := strings.ToLower(string(svcInfo.protocol))

        // Create the per-service chain, retaining counters if possible.
        svcChain := servicePortChainName(svcName, protocol)
        if chain, ok := existingNATChains[svcChain]; ok {
            writeLine(natChains, chain)
        } else {
            writeLine(natChains, utiliptables.MakeChainLine(svcChain))
        }
        activeNATChains[svcChain] = true

        svcXlbChain := serviceLBChainName(svcName, protocol)
        if svcInfo.onlyNodeLocalEndpoints {
            // Only for services with the externalTraffic annotation set to OnlyLocal
            // create the per-service LB chain, retaining counters if possible.
            if lbChain, ok := existingNATChains[svcXlbChain]; ok {
                writeLine(natChains, lbChain)
            } else {
                writeLine(natChains, utiliptables.MakeChainLine(svcXlbChain))
            }
            activeNATChains[svcXlbChain] = true
        } else if activeNATChains[svcXlbChain] {
            // Cleanup the previously created XLB chain for this service
            delete(activeNATChains, svcXlbChain)
        }

        // Capture the clusterIP.
        args := []string{
            "-A", string(kubeServicesChain),
            "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcName.String()),
            "-m", protocol, "-p", protocol,
            "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
            "--dport", fmt.Sprintf("%d", svcInfo.port),
        }
        if proxier.masqueradeAll {
            writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
        }
        if len(proxier.clusterCIDR) > 0 {
            writeLine(natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
        }
        writeLine(natRules, append(args, "-j", string(svcChain))...)

        // Capture externalIPs.
        for _, externalIP := range svcInfo.externalIPs {
            // If the "external" IP happens to be an IP that is local to this
            // machine, hold the local port open so no other process can open it
            // (because the socket might open but it would never work).
            if local, err := isLocalIP(externalIP); err != nil {
                glog.Errorf("can't determine if IP is local, assuming not: %v", err)
            } else if local {
                lp := localPort{
                    desc:     "externalIP for " + svcName.String(),
                    ip:       externalIP,
                    port:     svcInfo.port,
                    protocol: protocol,
                }
                if proxier.portsMap[lp] != nil {
                    glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
                    replacementPortsMap[lp] = proxier.portsMap[lp]
                } else {
                    socket, err := proxier.portMapper.OpenLocalPort(&lp)
                    if err != nil {
                        glog.Errorf("can't open %s, skipping this externalIP: %v", lp.String(), err)
                        continue
                    }
                    replacementPortsMap[lp] = socket
                }
            } // We're holding the port, so it's OK to install iptables rules.
            args := []string{
                "-A", string(kubeServicesChain),
                "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcName.String()),
                "-m", protocol, "-p", protocol,
                "-d", fmt.Sprintf("%s/32", externalIP),
                "--dport", fmt.Sprintf("%d", svcInfo.port),
            }
            // We have to SNAT packets to external IPs.
            writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)

            // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
            // nor from a local process to be forwarded to the service.
            // This rule roughly translates to "all traffic from off-machine".
            // This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later.
            externalTrafficOnlyArgs := append(args,
                "-m", "physdev", "!", "--physdev-is-in",
                "-m", "addrtype", "!", "--src-type", "LOCAL")
            writeLine(natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...)
            dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
            // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local.
            // This covers cases like GCE load-balancers which get added to the local routing table.
            writeLine(natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...)
        }

        // Capture load-balancer ingress.
        for _, ingress := range svcInfo.loadBalancerStatus.Ingress {
            if ingress.IP != "" {
                // create service firewall chain
                fwChain := serviceFirewallChainName(svcName, protocol)
                if chain, ok := existingNATChains[fwChain]; ok {
                    writeLine(natChains, chain)
                } else {
                    writeLine(natChains, utiliptables.MakeChainLine(fwChain))
                }
                activeNATChains[fwChain] = true
                // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
                // This currently works for loadbalancers that preserves source ips.
                // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.

                args := []string{
                    "-A", string(kubeServicesChain),
                    "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcName.String()),
                    "-m", protocol, "-p", protocol,
                    "-d", fmt.Sprintf("%s/32", ingress.IP),
                    "--dport", fmt.Sprintf("%d", svcInfo.port),
                }
                // jump to service firewall chain
                writeLine(natRules, append(args, "-j", string(fwChain))...)

                args = []string{
                    "-A", string(fwChain),
                    "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcName.String()),
                }

                // Each source match rule in the FW chain may jump to either the SVC or the XLB chain
                chosenChain := svcXlbChain
                // If we are proxying globally, we need to masquerade in case we cross nodes.
                // If we are proxying only locally, we can retain the source IP.
                if !svcInfo.onlyNodeLocalEndpoints {
                    writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
                    chosenChain = svcChain
                }

                if len(svcInfo.loadBalancerSourceRanges) == 0 {
                    // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain
                    writeLine(natRules, append(args, "-j", string(chosenChain))...)
                } else {
                    // firewall filter based on each source range
                    allowFromNode := false
                    for _, src := range svcInfo.loadBalancerSourceRanges {
                        writeLine(natRules, append(args, "-s", src, "-j", string(chosenChain))...)
                        // ignore error because it has been validated
                        _, cidr, _ := net.ParseCIDR(src)
                        if cidr.Contains(proxier.nodeIP) {
                            allowFromNode = true
                        }
                    }
                    // generally, ip route rule was added to intercept request to loadbalancer vip from the
                    // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
                    // Need to add the following rule to allow request on host.
                    if allowFromNode {
                        writeLine(natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", string(chosenChain))...)
                    }
                }

                // If the packet was able to reach the end of firewall chain, then it did not get DNATed.
                // It means the packet cannot go thru the firewall, then mark it for DROP
                writeLine(natRules, append(args, "-j", string(KubeMarkDropChain))...)
            }
        }

        // Capture nodeports.  If we had more than 2 rules it might be
        // worthwhile to make a new per-service chain for nodeport rules, but
        // with just 2 rules it ends up being a waste and a cognitive burden.
        if svcInfo.nodePort != 0 {
            // Hold the local port open so no other process can open it
            // (because the socket might open but it would never work).
            lp := localPort{
                desc:     "nodePort for " + svcName.String(),
                ip:       "",
                port:     svcInfo.nodePort,
                protocol: protocol,
            }
            if proxier.portsMap[lp] != nil {
                glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
                replacementPortsMap[lp] = proxier.portsMap[lp]
            } else {
                socket, err := proxier.portMapper.OpenLocalPort(&lp)
                if err != nil {
                    glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
                    continue
                }
                replacementPortsMap[lp] = socket
            } // We're holding the port, so it's OK to install iptables rules.

            args := []string{
                "-A", string(kubeNodePortsChain),
                "-m", "comment", "--comment", svcName.String(),
                "-m", protocol, "-p", protocol,
                "--dport", fmt.Sprintf("%d", svcInfo.nodePort),
            }
            if !svcInfo.onlyNodeLocalEndpoints {
                // Nodeports need SNAT, unless they're local.
                writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
                // Jump to the service chain.
                writeLine(natRules, append(args, "-j", string(svcChain))...)
            } else {
                // TODO: Make all nodePorts jump to the firewall chain.
                // Currently we only create it for loadbalancers (#33586).
                writeLine(natRules, append(args, "-j", string(svcXlbChain))...)
            }
        }

        // If the service has no endpoints then reject packets.
        if len(proxier.endpointsMap[svcName]) == 0 {
            writeLine(filterRules,
                "-A", string(kubeServicesChain),
                "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcName.String()),
                "-m", protocol, "-p", protocol,
                "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
                "--dport", fmt.Sprintf("%d", svcInfo.port),
                "-j", "REJECT",
            )
            continue
        }

        // Generate the per-endpoint chains.  We do this in multiple passes so we
        // can group rules together.
        // These two slices parallel each other - keep in sync
        endpoints := make([]*endpointsInfo, 0)
        endpointChains := make([]utiliptables.Chain, 0)
        for _, ep := range proxier.endpointsMap[svcName] {
            endpoints = append(endpoints, ep)
            endpointChain := servicePortEndpointChainName(svcName, protocol, ep.ip)
            endpointChains = append(endpointChains, endpointChain)

            // Create the endpoint chain, retaining counters if possible.
            if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok {
                writeLine(natChains, chain)
            } else {
                writeLine(natChains, utiliptables.MakeChainLine(endpointChain))
            }
            activeNATChains[endpointChain] = true
        }

        // First write session affinity rules, if applicable.
        if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
            for _, endpointChain := range endpointChains {
                writeLine(natRules,
                    "-A", string(svcChain),
                    "-m", "comment", "--comment", svcName.String(),
                    "-m", "recent", "--name", string(endpointChain),
                    "--rcheck", "--seconds", fmt.Sprintf("%d", svcInfo.stickyMaxAgeMinutes*60), "--reap",
                    "-j", string(endpointChain))
            }
        }

        // Now write loadbalancing & DNAT rules.
        n := len(endpointChains)
        for i, endpointChain := range endpointChains {
            // Balancing rules in the per-service chain.
            args := []string{
                "-A", string(svcChain),
                "-m", "comment", "--comment", svcName.String(),
            }
            if i < (n - 1) {
                // Each rule is a probabilistic match.
                args = append(args,
                    "-m", "statistic",
                    "--mode", "random",
                    "--probability", fmt.Sprintf("%0.5f", 1.0/float64(n-i)))
            }
            // The final (or only if n == 1) rule is a guaranteed match.
            args = append(args, "-j", string(endpointChain))
            writeLine(natRules, args...)

            // Rules in the per-endpoint chain.
            args = []string{
                "-A", string(endpointChain),
                "-m", "comment", "--comment", svcName.String(),
            }
            // Handle traffic that loops back to the originator with SNAT.
            writeLine(natRules, append(args,
                "-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i].ip, ":")[0]),
                "-j", string(KubeMarkMasqChain))...)
            // Update client-affinity lists.
            if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
                args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
            }
            // DNAT to final destination.
            args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].ip)
            writeLine(natRules, args...)
        }

        // The logic below this applies only if this service is marked as OnlyLocal
        if !svcInfo.onlyNodeLocalEndpoints {
            continue
        }

        // Now write ingress loadbalancing & DNAT rules only for services that have a localOnly annotation
        // TODO - This logic may be combinable with the block above that creates the svc balancer chain
        localEndpoints := make([]*endpointsInfo, 0)
        localEndpointChains := make([]utiliptables.Chain, 0)
        for i := range endpointChains {
            if endpoints[i].localEndpoint {
                // These slices parallel each other; must be kept in sync
                localEndpoints = append(localEndpoints, endpoints[i])
                localEndpointChains = append(localEndpointChains, endpointChains[i])
            }
        }
        // First rule in the chain redirects all pod -> external vip traffic to the
        // Service's ClusterIP instead. This happens whether or not we have local
        // endpoints; only if clusterCIDR is specified
        if len(proxier.clusterCIDR) > 0 {
            args = []string{
                "-A", string(svcXlbChain),
                "-m", "comment", "--comment",
                fmt.Sprintf(`"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`),
                "-s", proxier.clusterCIDR,
                "-j", string(svcChain),
            }
            writeLine(natRules, args...)
        }

        numLocalEndpoints := len(localEndpointChains)
        if numLocalEndpoints == 0 {
            // Blackhole all traffic since there are no local endpoints
            args := []string{
                "-A", string(svcXlbChain),
                "-m", "comment", "--comment",
                fmt.Sprintf(`"%s has no local endpoints"`, svcName.String()),
                "-j",
                string(KubeMarkDropChain),
            }
            writeLine(natRules, args...)
        } else {
            // Setup probability filter rules only over local endpoints
            for i, endpointChain := range localEndpointChains {
                // Balancing rules in the per-service chain.
                args := []string{
                    "-A", string(svcXlbChain),
                    "-m", "comment", "--comment",
                    fmt.Sprintf(`"Balancing rule %d for %s"`, i, svcName.String()),
                }
                if i < (numLocalEndpoints - 1) {
                    // Each rule is a probabilistic match.
                    args = append(args,
                        "-m", "statistic",
                        "--mode", "random",
                        "--probability", fmt.Sprintf("%0.5f", 1.0/float64(numLocalEndpoints-i)))
                }
                // The final (or only if n == 1) rule is a guaranteed match.
                args = append(args, "-j", string(endpointChain))
                writeLine(natRules, args...)
            }
        }
    }

    // Delete chains no longer in use.
    for chain := range existingNATChains {
        if !activeNATChains[chain] {
            chainString := string(chain)
            if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") {
                // Ignore chains that aren't ours.
                continue
            }
            // We must (as per iptables) write a chain-line for it, which has
            // the nice effect of flushing the chain.  Then we can remove the
            // chain.
            writeLine(natChains, existingNATChains[chain])
            writeLine(natRules, "-X", chainString)
        }
    }

    // Finally, tail-call to the nodeports chain.  This needs to be after all
    // other service portal rules.
    writeLine(natRules,
        "-A", string(kubeServicesChain),
        "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
        "-m", "addrtype", "--dst-type", "LOCAL",
        "-j", string(kubeNodePortsChain))

    // Write the end-of-table markers.
    writeLine(filterRules, "COMMIT")
    writeLine(natRules, "COMMIT")

    // Sync rules.
    // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
    filterLines := append(filterChains.Bytes(), filterRules.Bytes()...)
    natLines := append(natChains.Bytes(), natRules.Bytes()...)
    lines := append(filterLines, natLines...)

    glog.V(3).Infof("Restoring iptables rules: %s", lines)
    err = proxier.iptables.RestoreAll(lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
    if err != nil {
        glog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, lines)
        // Revert new local ports.
        revertPorts(replacementPortsMap, proxier.portsMap)
        return
    }

    // Close old local ports and save new ones.
    for k, v := range proxier.portsMap {
        if replacementPortsMap[k] == nil {
            v.Close()
        }
    }
    proxier.portsMap = replacementPortsMap
}
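
One detail of the load-balancing rules above is worth spelling out: rule i (counting from 0) in a per-service chain matches with probability 1/(n-i) of the traffic that reaches it, and the last rule matches unconditionally, which works out to an equal 1/n share per endpoint. A quick numeric check (illustrative numbers only, not from the source):

package main

import "fmt"

func main() {
    n := 3
    remaining := 1.0 // fraction of traffic still unmatched when a rule is evaluated
    for i := 0; i < n; i++ {
        p := 1.0 / float64(n-i) // the --probability written into rule i (last rule: p == 1)
        fmt.Printf("endpoint %d gets %.4f of total traffic\n", i, remaining*p)
        remaining *= 1 - p
    }
    // Each endpoint ends up with 0.3333 of the traffic.
}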

IV. Summary

kube-proxy's functionality is fairly narrow: its core idea is to sync service and endpoint information from kube-apiserver and translate it into iptables rules.
