k8s的kube-proxy分析

kube-proxy通常与kubelet进程一起部署在每个node节点上,其主要功能是实现k8s集群内部的通信,同时也负责将集群外部的流量转发到集群内部。从功能上来说,它完成了更高层次的网络封装,让用户能够忽略网络层的部分细节,从而专注于业务层的功能。

在早期的k8s实现中,使用了最简单快速的方式来实现流量转发,即在用户态接收数据后直接转发到后端的目标地址。

首先查看proxy的main函数:

// Command-line configuration for the proxy.
var (
	// etcdServerList holds the etcd servers to watch; it is registered as the
	// -etcd_servers flag in init.
	etcdServerList util.StringList
	// etcdConfigFile is the path of an etcd client config file (-etcd_config).
	etcdConfigFile = flag.String("etcd_config", "", "The config file for the etcd client. Mutually exclusive with -etcd_servers")
	// bindAddress is the local IP address the proxy serves on (an address, not
	// a port); 0.0.0.0 means all interfaces.
	bindAddress    = util.IP(net.ParseIP("0.0.0.0"))
	// clientConfig is the API client configuration; its flags are bound in init.
	clientConfig   = &client.Config{}
)

func init() {
	client.BindClientConfigFlags(flag.CommandLine, clientConfig)
	flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated (optional). Mutually exclusive with -etcd_config")
	flag.Var(&bindAddress, "bind_address", "The address for the proxy server to serve on (set to 0.0.0.0 for all interfaces)")
}

// main parses flags, selects a configuration source (the API server when a
// host is configured, otherwise etcd), wires the proxier and load balancer to
// the service/endpoints configuration, and then blocks forever.
func main() {
	flag.Parse()
	util.InitLogs()
	defer util.FlushLogs()

	verflag.PrintAndExitIfRequested()

	serviceConfig := config.NewServiceConfig()
	endpointsConfig := config.NewEndpointsConfig()

	if clientConfig.Host == "" {
		// No API host configured: try to build an etcd client from either the
		// server list or the config file.
		var etcdClient *etcd.Client

		switch {
		case len(etcdServerList) > 0:
			// Set up logger for etcd client
			etcd.SetLogger(util.NewLogger("etcd "))
			etcdClient = etcd.NewClient(etcdServerList)
		case *etcdConfigFile != "":
			// Set up logger for etcd client
			etcd.SetLogger(util.NewLogger("etcd "))
			var err error
			etcdClient, err = etcd.NewClientFromFile(*etcdConfigFile)
			if err != nil {
				glog.Fatalf("Error with etcd config file: %v", err)
			}
		}

		// Create a configuration source that handles configuration from etcd.
		if etcdClient != nil {
			glog.Infof("Using etcd servers %v", etcdClient.GetCluster())

			config.NewConfigSourceEtcd(
				etcdClient,
				serviceConfig.Channel("etcd"),
				endpointsConfig.Channel("etcd"),
			)
		}
	} else {
		// Use the API server as the configuration source.
		glog.Infof("Using api calls to get config %v", clientConfig.Host)
		apiClient, err := client.New(clientConfig)
		if err != nil {
			glog.Fatalf("Invalid API configuration: %v", err)
		}
		config.NewSourceAPI(
			apiClient,
			30*time.Second,
			serviceConfig.Channel("api"),
			endpointsConfig.Channel("api"),
		)
	}

	loadBalancer := proxy.NewLoadBalancerRR()
	proxier := proxy.NewProxier(loadBalancer, net.IP(bindAddress))
	// Service changes are delivered to the proxier ...
	serviceConfig.RegisterHandler(proxier)
	// ... and endpoint changes are delivered to the load balancer.
	endpointsConfig.RegisterHandler(loadBalancer)

	// Just loop forever for now...
	select {}
}

从执行流程上可以看出,proxy通过service和endpoint两个概念,将service与其对应的endpoint绑定起来,即endpoint记录了该服务后端所有pod的地址。如果采用etcd来监控信息变化,就会根据etcd中监控到的增删改事件来控制当前proxy端口的开启或关闭。

我们继续查看config.NewConfigSourceEtcd来查看具体的运行逻辑:

// NewConfigSourceEtcd builds a ConfigSourceEtcd around the given etcd client
// and update channels, starts its Run loop in a goroutine, and returns it.
// The retry interval used while waiting for etcd is fixed at two seconds.
func NewConfigSourceEtcd(client *etcd.Client, serviceChannel chan ServiceUpdate, endpointsChannel chan EndpointsUpdate) ConfigSourceEtcd {
	var source ConfigSourceEtcd
	source.client = client
	source.serviceChannel = serviceChannel
	source.endpointsChannel = endpointsChannel
	source.interval = 2 * time.Second

	go source.Run()
	return source
}

// Run begins watching for new services and their endpoints on etcd.
//
// It first retries GetServices every s.interval until it succeeds and
// publishes that snapshot, then starts the watch loop in the background, and
// finally re-publishes a full snapshot every 30 seconds, forever.
func (s ConfigSourceEtcd) Run() {
	// publish sends full-state SET updates; empty lists are not sent.
	publish := func(services []api.Service, endpoints []api.Endpoints) {
		if len(services) > 0 {
			s.serviceChannel <- ServiceUpdate{Op: SET, Services: services}
		}
		if len(endpoints) > 0 {
			s.endpointsChannel <- EndpointsUpdate{Op: SET, Endpoints: endpoints}
		}
	}

	// Initially, just wait for the etcd to come up before doing anything more complicated.
	for {
		services, endpoints, err := s.GetServices()
		if err == nil {
			publish(services, endpoints)
			break
		}
		glog.V(1).Infof("Failed to get any services: %v", err)
		time.Sleep(s.interval)
	}

	// Ok, so we got something back from etcd. Let's set up a watch for new services, and
	// their endpoints
	go util.Forever(s.WatchForChanges, 1*time.Second)

	// Periodic full resync.
	for {
		if services, endpoints, err := s.GetServices(); err != nil {
			glog.Errorf("ConfigSourceEtcd: Failed to get services: %v", err)
		} else {
			publish(services, endpoints)
		}
		time.Sleep(30 * time.Second)
	}
}


// WatchForChanges starts a recursive etcd watch on "/registry/services/" and
// hands every received response to ProcessChange. It returns once the watch
// channel is closed.
func (s ConfigSourceEtcd) WatchForChanges() {
	glog.V(4).Info("Setting up a watch for new services")
	events := make(chan *etcd.Response)
	go s.client.Watch("/registry/services/", 0, true, events, nil)
	for event := range events {
		s.ProcessChange(event)
	}
}

// ProcessChange dispatches a single etcd watch response.
//
// Keys containing "/endpoints/" go to ProcessEndpointResponse; otherwise a
// "set" action publishes an ADD for the decoded service. A "delete" action on
// a four-segment key publishes a REMOVE for the service named by the last
// segment.
func (s ConfigSourceEtcd) ProcessChange(response *etcd.Response) {
	glog.V(4).Infof("Processing a change in service configuration... %s", *response)

	switch {
	case strings.Contains(response.Node.Key, "/endpoints/"):
		// Endpoint changes are handled separately; control then continues to
		// the delete check below.
		s.ProcessEndpointResponse(response)
	case response.Action == "set":
		service, err := etcdResponseToService(response)
		if err != nil {
			glog.Errorf("Failed to parse %s Port: %s", response, err)
			return
		}

		glog.V(4).Infof("New service added/updated: %#v", service)
		s.serviceChannel <- ServiceUpdate{Op: ADD, Services: []api.Service{*service}}
		return
	}

	if response.Action != "delete" {
		return
	}
	// Skip the leading "/" so the key splits into exactly its path segments.
	parts := strings.Split(response.Node.Key[1:], "/")
	if len(parts) != 4 {
		glog.Warningf("Unknown service delete: %#v", parts)
		return
	}
	glog.V(4).Infof("Deleting service: %s", parts[3])
	removed := api.Service{TypeMeta: api.TypeMeta{ID: parts[3]}}
	s.serviceChannel <- ServiceUpdate{Op: REMOVE, Services: []api.Service{removed}}
}

// ProcessEndpointResponse decodes the node value of an etcd response into an
// api.Endpoints object and publishes it as an ADD on the endpoints channel.
// Decode failures are logged and the response is dropped.
func (s ConfigSourceEtcd) ProcessEndpointResponse(response *etcd.Response) {
	glog.V(4).Infof("Processing a change in endpoint configuration... %s", *response)

	raw := response.Node.Value
	var decoded api.Endpoints
	if err := latest.Codec.DecodeInto([]byte(raw), &decoded); err != nil {
		glog.Errorf("Failed to parse service out of etcd key: %v : %+v", raw, err)
		return
	}
	s.endpointsChannel <- EndpointsUpdate{Op: ADD, Endpoints: []api.Endpoints{decoded}}
}

通过初始化的流程可知,无论是service的变更还是endpoint的变更,最终都分别发送到了service的通道和endpoint的通道中,这些通道的初始化与处理工作分别放在了ServiceConfig和EndpointsConfig中。

// ServiceConfig tracks service configuration received from one or more
// sources and notifies registered handlers.
type ServiceConfig struct {
	// mux hands out one input channel per named source (see Channel).
	mux     *config.Mux
	// watcher holds the listeners added through RegisterHandler.
	watcher *config.Watcher
	// store is the serviceStore the mux was created with.
	store   *serviceStore
}

// NewServiceConfig creates a new ServiceConfig backed by an empty service
// store, and immediately starts a goroutine that watches the store's update
// signal on behalf of the watcher.
func NewServiceConfig() *ServiceConfig {
	signal := make(chan struct{})
	store := &serviceStore{
		updates:  signal,
		services: make(map[string]map[string]api.Service),
	}
	mux := config.NewMux(store)
	watcher := config.NewWatcher()

	go watchForUpdates(watcher, store, signal)

	return &ServiceConfig{mux: mux, watcher: watcher, store: store}
}

// RegisterHandler subscribes handler to service updates: each value the
// watcher delivers is asserted to []api.Service and passed to
// handler.OnUpdate.
func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
	notify := func(instance interface{}) {
		handler.OnUpdate(instance.([]api.Service))
	}
	c.watcher.Add(config.ListenerFunc(notify))
}

// Channel returns a typed ServiceUpdate channel for the named source. A
// goroutine forwards everything sent on it to the mux channel for that
// source, and closes the mux channel once the returned channel is closed.
func (c *ServiceConfig) Channel(source string) chan ServiceUpdate {
	muxCh := c.mux.Channel(source)
	typedCh := make(chan ServiceUpdate)

	forward := func() {
		for u := range typedCh {
			muxCh <- u
		}
		close(muxCh)
	}
	go forward()

	return typedCh
}

一旦service有信息更新就会调用serviceConfig注册的proxier,endpoints有更新则调用注册的loadBalancer。这里回调的实现使用了发布订阅模式,一旦通道中有消息,就会通知所有订阅的handler进行处理。

我们查看一下proxier的流程:

// Proxier is a simple proxy for TCP connections between a localhost:lport
// and services that provide the actual implementations.
type Proxier struct {
	// loadBalancer is the load-balancing component used to pick a backend endpoint.
	loadBalancer LoadBalancer
	mu           sync.Mutex // protects serviceMap
	// serviceMap holds the proxy state of each known service, keyed by service ID.
	serviceMap   map[string]*serviceInfo
	// address is the IP address the proxy sockets listen on.
	address      net.IP
}

// NewProxier returns a new Proxier that uses loadBalancer to choose backends
// and listens on the given address. Its service map starts out empty.
func NewProxier(loadBalancer LoadBalancer, address net.IP) *Proxier {
	p := new(Proxier)
	p.loadBalancer = loadBalancer
	p.serviceMap = make(map[string]*serviceInfo)
	p.address = address
	return p
}

// stopProxy stops the proxy for the named service and returns any error from
// shutting it down. It acquires proxier.mu itself and delegates to
// stopProxyInternal.
//
// This assumes proxier.mu is not locked.
func (proxier *Proxier) stopProxy(service string, info *serviceInfo) error {
	proxier.mu.Lock()
	defer proxier.mu.Unlock()
	return proxier.stopProxyInternal(service, info)
}

// stopProxyInternal marks the service inactive, removes it from the service
// map and closes its socket, returning the close error. When
// info.setActive(false) reports false nothing else is done and nil is
// returned.
//
// This assumes proxier.mu is locked.
func (proxier *Proxier) stopProxyInternal(service string, info *serviceInfo) error {
	if info.setActive(false) {
		glog.V(3).Infof("Removing service: %s", service)
		delete(proxier.serviceMap, service)
		return info.socket.Close()
	}
	return nil
}

// getServiceInfo looks up the proxy state for the named service while holding
// proxier.mu. The boolean reports whether the service is present in the map.
func (proxier *Proxier) getServiceInfo(service string) (*serviceInfo, bool) {
	proxier.mu.Lock()
	defer proxier.mu.Unlock()

	entry, found := proxier.serviceMap[service]
	return entry, found
}

// setServiceInfo stores info as the proxy state for the named service,
// replacing any existing entry. It acquires proxier.mu, so the caller must
// not already hold it.
func (proxier *Proxier) setServiceInfo(service string, info *serviceInfo) {
	proxier.mu.Lock()
	defer proxier.mu.Unlock()
	proxier.serviceMap[service] = info
}

// addServiceOnUnusedPort starts listening for a new service, returning the
// port it's using.  For testing on a system with unknown ports used.  The timeout only applies to UDP
// connections, for now.
//
// The socket is opened on port 0 of the proxier's address, and the port that
// was actually assigned is parsed back out of the socket's address. If that
// parsing fails, the freshly opened socket is closed before the error is
// returned so it is not leaked.
func (proxier *Proxier) addServiceOnUnusedPort(service string, protocol api.Protocol, timeout time.Duration) (string, error) {
	sock, err := newProxySocket(protocol, proxier.address, 0)
	if err != nil {
		return "", err
	}
	_, port, err := net.SplitHostPort(sock.Addr().String())
	if err != nil {
		// Best-effort cleanup; the parse error is the one worth reporting.
		sock.Close()
		return "", err
	}
	portNum, err := strconv.Atoi(port)
	if err != nil {
		// Best-effort cleanup; the parse error is the one worth reporting.
		sock.Close()
		return "", err
	}
	proxier.setServiceInfo(service, &serviceInfo{
		port:     portNum,
		protocol: protocol,
		active:   true,
		socket:   sock,
		timeout:  timeout,
	})
	proxier.startAccepting(service, sock)
	return port, nil
}

// startAccepting logs the listening address and runs the socket's ProxyLoop
// for the service in a new goroutine, guarded by util.HandleCrash.
func (proxier *Proxier) startAccepting(service string, sock proxySocket) {
	addr := sock.Addr()
	glog.V(1).Infof("Listening for %s on %s:%s", service, addr.Network(), addr.String())

	go func() {
		defer util.HandleCrash()
		sock.ProxyLoop(service, proxier)
	}()
}

// udpIdleTimeout is how long an idle UDP connection is left open.
const udpIdleTimeout = time.Minute

// OnUpdate manages the active set of service proxies.
//
// For every service in the update: an active proxy already on the requested
// port is left alone; a proxy on a different port is stopped; then a new
// socket is opened and starts accepting traffic. Finally, every proxy whose
// service is absent from the update is shut down.
func (proxier *Proxier) OnUpdate(services []api.Service) {
	glog.V(4).Infof("Received update notice: %+v", services)
	wanted := util.StringSet{}
	for _, svc := range services {
		wanted.Insert(svc.ID)
		current, known := proxier.getServiceInfo(svc.ID)
		// TODO: check health of the socket?  What if ProxyLoop exited?
		if known {
			if current.isActive() && current.port == svc.Port {
				// Already proxying this service on the right port.
				continue
			}
			if current.port != svc.Port {
				// The port changed: tear down the old proxy first.
				if err := proxier.stopProxy(svc.ID, current); err != nil {
					glog.Errorf("error stopping %s: %v", svc.ID, err)
				}
			}
		}

		glog.V(3).Infof("Adding a new service %s on %s port %d", svc.ID, svc.Protocol, svc.Port)
		sock, err := newProxySocket(svc.Protocol, proxier.address, svc.Port)
		if err != nil {
			glog.Errorf("Failed to get a socket for %s: %+v", svc.ID, err)
			continue
		}
		entry := &serviceInfo{
			port:     svc.Port,
			protocol: svc.Protocol,
			active:   true,
			socket:   sock,
			timeout:  udpIdleTimeout,
		}
		proxier.setServiceInfo(svc.ID, entry)
		proxier.startAccepting(svc.ID, sock)
	}

	// Stop every proxy whose service no longer appears in the update.
	proxier.mu.Lock()
	defer proxier.mu.Unlock()
	for name, info := range proxier.serviceMap {
		if wanted.Has(name) {
			continue
		}
		if err := proxier.stopProxyInternal(name, info); err != nil {
			glog.Errorf("error stopping %s: %v", name, err)
		}
	}
}

在新增代理的过程中,利用newProxySocket来生成对应协议的代理,当前的代理方式分为tcp和udp两种。

// newProxySocket opens a listening socket on ip:port for the given protocol.
// The protocol is matched case-insensitively: "TCP" yields a tcpProxySocket,
// "UDP" a udpProxySocket, and anything else an error.
func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, error) {
	hostPort := net.JoinHostPort(ip.String(), strconv.Itoa(port))

	switch strings.ToUpper(string(protocol)) {
	case "TCP":
		listener, err := net.Listen("tcp", hostPort)
		if err != nil {
			return nil, err
		}
		return &tcpProxySocket{listener}, nil
	case "UDP":
		udpAddr, err := net.ResolveUDPAddr("udp", hostPort)
		if err != nil {
			return nil, err
		}
		conn, err := net.ListenUDP("udp", udpAddr)
		if err != nil {
			return nil, err
		}
		return &udpProxySocket{conn}, nil
	default:
		return nil, fmt.Errorf("Unknown protocol %q", protocol)
	}
}

以udp为例,介绍一下流程:

// udpProxySocket implements proxySocket.  Close() is implemented by net.UDPConn.  When Close() is called,
// no new connections are allowed and existing connections are broken.
// TODO: We could lame-duck this ourselves, if it becomes important.
type udpProxySocket struct {
	// The embedded connection is the listening UDP socket; its ReadFrom and
	// WriteTo methods carry the client-side traffic.
	*net.UDPConn
}

// Addr returns the local address of the underlying UDP connection.
func (udp *udpProxySocket) Addr() net.Addr {
	return udp.UDPConn.LocalAddr()
}

// clientCache holds all the known UDP clients that have not timed out.
type clientCache struct {
	// mu guards clients; it is held for lookups, inserts and deletes.
	mu      sync.Mutex
	clients map[string]net.Conn // addr string -> connection
}

// newClientCache returns an empty clientCache whose clients map is ready for use.
func newClientCache() *clientCache {
	cache := new(clientCache)
	cache.clients = make(map[string]net.Conn)
	return cache
}

func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
	info, found := proxier.getServiceInfo(service)   // 首先获取service信息
	if !found {
		glog.Errorf("Failed to find service: %s", service)
		return
	}
	activeClients := newClientCache() 							// 查看缓存的客户端
	var buffer [4096]byte // 4KiB should be enough for most whole-packets  设置接收缓冲区大小
	for {
		if !info.isActive() {
			break
		}

		// Block until data arrives.
		// TODO: Accumulate a histogram of n or something, to fine tune the buffer size.
		n, cliAddr, err := udp.ReadFrom(buffer[0:])   // 读取缓冲区数据
		if err != nil {
			if e, ok := err.(net.Error); ok {  // 如果错误则关闭
				if e.Temporary() {
					glog.V(1).Infof("ReadFrom had a temporary failure: %v", err)
					continue
				}
			}
			glog.Errorf("ReadFrom failed, exiting ProxyLoop: %v", err)
			break
		}
		// If this is a client we know already, reuse the connection and goroutine.
		svrConn, err := udp.getBackendConn(activeClients, cliAddr, proxier, service, info.timeout)   // 获取反向代理的客户端信息
		if err != nil {
			continue
		}
		// TODO: It would be nice to let the goroutine handle this write, but we don't
		// really want to copy the buffer.  We could do a pool of buffers or something.
		_, err = svrConn.Write(buffer[0:n])  将读取的数据反向写入到后端代理的客户端
		if err != nil {
			if !logTimeout(err) {
				glog.Errorf("Write failed: %v", err)
				// TODO: Maybe tear down the goroutine for this client/server pair?
			}
			continue
		}
		svrConn.SetDeadline(time.Now().Add(info.timeout))  // 设置超时时间
		if err != nil {
			glog.Errorf("SetDeadline failed: %v", err)
			continue
		}
	}
}

// getBackendConn returns the backend connection for the client at cliAddr.
// A cached connection is reused when one exists; otherwise an endpoint is
// requested from the load balancer, dialed over UDP, cached, and a goroutine
// is started to carry replies back to the client. The cache lock is held for
// the whole call.
func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service string, timeout time.Duration) (net.Conn, error) {
	activeClients.mu.Lock()
	defer activeClients.mu.Unlock()

	key := cliAddr.String()
	if cached, ok := activeClients.clients[key]; ok {
		return cached, nil
	}

	// TODO: This could spin up a new goroutine to make the outbound connection,
	// and keep accepting inbound traffic.
	glog.V(2).Infof("New UDP connection from %s", cliAddr)
	endpoint, err := proxier.loadBalancer.NextEndpoint(service, cliAddr)
	if err != nil {
		glog.Errorf("Couldn't find an endpoint for %s %v", service, err)
		return nil, err
	}
	glog.V(4).Infof("Mapped service %s to endpoint %s", service, endpoint)

	backend, err := net.DialTimeout("udp", endpoint, endpointDialTimeout)
	if err != nil {
		// TODO: Try another endpoint?
		glog.Errorf("Dial failed: %v", err)
		return nil, err
	}
	activeClients.clients[key] = backend

	// Relay backend replies to this client in the background.
	go func(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) {
		defer util.HandleCrash()
		udp.proxyClient(cliAddr, svrConn, activeClients, timeout)
	}(cliAddr, backend, activeClients, timeout)

	return backend, nil
}

// proxyClient copies datagrams read from the backend connection svrConn back
// to the client at cliAddr through the listening UDP socket. It stops on the
// first read, deadline or write error, then closes svrConn and removes the
// client from activeClients.
//
// This function is expected to be called as a goroutine.
func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) {
	defer svrConn.Close()
	var buffer [4096]byte
	for {
		n, err := svrConn.Read(buffer[0:])
		if err != nil {
			if !logTimeout(err) {
				glog.Errorf("Read failed: %v", err)
			}
			break
		}
		// Push the idle deadline forward after each successful read. The
		// result must be captured here, otherwise the check below tests a
		// stale err.
		err = svrConn.SetDeadline(time.Now().Add(timeout))
		if err != nil {
			glog.Errorf("SetDeadline failed: %v", err)
			break
		}
		_, err = udp.WriteTo(buffer[0:n], cliAddr)
		if err != nil {
			if !logTimeout(err) {
				glog.Errorf("WriteTo failed: %v", err)
			}
			break
		}
	}
	// Forget this client so the next datagram from it opens a fresh backend connection.
	activeClients.mu.Lock()
	delete(activeClients.clients, cliAddr.String())
	activeClients.mu.Unlock()
}

到此为止,整个流量转发的流程就基本结束了,这就是用户态流量转发的思路。

总结

k8s早期版本0.4只实现了用户态的流量转发,并支持了两种协议tcp和udp流量转发,转发的规则是根据在etcd中监控到的信息来将对应的service的信息转发到对应的pod上去,从而完成整个流量在k8s集群中的流转。由于本人才疏学浅,如有错误请批评指正。

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐