flannel

flannel是针对k8s设计的三层的网络解决方案。在k8s中为了使pod之间能够使用一种偏平的网络架构,从而完成跨Pod的网络通信。

官网给的原理图如下:

在这里插入图片描述

flannel 使用TUN/TAP 设备,并使用 UDP 创建覆盖网络来封装 IP 数据包。 子网分配是在 etcd 的帮助下完成的,它维护覆盖到实际 IP 的映射。从以上的原理图也可知大概的流程是是怎么操作的。

其中具体是一个什么流程呢,本文就根据flannel的0.1.0版本的源码进一步学习。

flannel流程-基于Backend为udp

首先是main流程

func main() {
	// glog will log to tmp files by default. override so all entries
	// can flow into journald (if running under systemd)
	flag.Set("logtostderr", "true")

	// now parse command line args
	flag.Parse()

	if opts.help {
		fmt.Fprintf(os.Stderr, "Usage: %s [OPTION]...\n", os.Args[0])
		flag.PrintDefaults()
		os.Exit(0)
	}

	if opts.version {
		fmt.Fprintln(os.Stderr, Version)
		os.Exit(0)
	}

	flagsFromEnv("FLANNELD", flag.CommandLine)

	be, err := newBackend()    // 生成一个代理后端
	if err != nil {
		log.Info(err)
		os.Exit(1)
	}

	// Register for SIGINT and SIGTERM and wait for one of them to arrive
	log.Info("Installing signal handlers")
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)  // 注册信号相关

	exit := make(chan int)
	go run(be, exit)     // 运行

	for {
		select {
		case <-sigs:   // 监控是否退出的信号 如果有则平滑退出
			// unregister to get default OS nuke behaviour in case we don't exit cleanly
			signal.Stop(sigs)

			log.Info("Exiting...")
			be.Stop()

		case code := <-exit:
			log.Infof("%s mode exited", be.Name())
			os.Exit(code)
		}
	}
}

在main主流程中核心的要点就是新生成一个backend,然后通过backend运行起来。

func newBackend() (backend.Backend, error) {
	sm := makeSubnetManager()   // 生成一个子网的管理实例
	config := sm.GetConfig()    // 获取配置

	var bt struct {
		Type string
	}

	if len(config.Backend) == 0 {
		bt.Type = "udp"
	} else {
		if err := json.Unmarshal(config.Backend, &bt); err != nil {
			return nil, fmt.Errorf("Error decoding Backend property of config: %v", err)
		}
	}

	switch strings.ToLower(bt.Type) {  // 根据传入的类型生成backend
	case "udp":
		return udp.New(sm, config.Backend), nil
	case "alloc":
		return alloc.New(sm), nil
	default:
		return nil, fmt.Errorf("'%v': unknown backend type", bt.Type)
	}
}

func run(be backend.Backend, exit chan int) {
	var err error
	defer func() {
		if err == nil || err == task.ErrCanceled {
			exit <- 0
		} else {
			log.Error(err)
			exit <- 1
		}
	}()

	iface, ipaddr, err := lookupIface()    // 获取网卡接口
	if err != nil {
		return
	}

	if iface.MTU == 0 {
		err = fmt.Errorf("Failed to determine MTU for %s interface", ipaddr)
		return
	}

	log.Infof("Using %s as external interface", ipaddr)

	sn, err := be.Init(iface, ipaddr, opts.ipMasq)   // 初始化后端绑定情况
	if err != nil {
		log.Error("Could not init %v backend: %v", be.Name(), err)
		return
	}

	writeSubnetFile(sn)
	daemon.SdNotify("READY=1")

	log.Infof("%s mode initialized", be.Name())
	be.Run()    		// 启动运行
}


本文通过udp的方式来进行分析。

func New(sm *subnet.SubnetManager, config json.RawMessage) backend.Backend {
	be := UdpBackend{
		sm:     sm,
		rawCfg: config,
		stop:   make(chan bool),
	}
	be.cfg.Port = defaultPort  // 监听默认的端口
	return &be
}

func (m *UdpBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (*backend.SubnetDef, error) {
	// Parse our configuration
	if len(m.rawCfg) > 0 {
		if err := json.Unmarshal(m.rawCfg, &m.cfg); err != nil {
			return nil, fmt.Errorf("error decoding UDP backend config: %v", err)
		}
	}

	// Acquire the lease form subnet manager
	attrs := subnet.BaseAttrs{
		PublicIP: ip.FromIP(extIP),
	}

	sn, err := m.sm.AcquireLease(attrs.PublicIP, &attrs, m.stop)
	if err != nil {
		if err == task.ErrCanceled {
			return nil, err
		} else {
			return nil, fmt.Errorf("failed to acquire lease: %v", err)
		}
	}

	// Tunnel's subnet is that of the whole overlay network (e.g. /16)
	// and not that of the individual host (e.g. /24)
	m.tunNet = ip.IP4Net{
		IP:        sn.IP,
		PrefixLen: m.sm.GetConfig().Network.PrefixLen,
	}    // 配置一个tun设备

	// TUN MTU will be smaller b/c of encap (IP+UDP hdrs)
	m.mtu = extIface.MTU - encapOverhead   // 控制包的长度免得超过长度

	if err = m.initTun(ipMasq); err != nil {   // 初始化设备
		return nil, err
	}

	m.conn, err = net.ListenUDP("udp4", &net.UDPAddr{Port: m.cfg.Port})   // 监听udp的网络端口数据
	if err != nil {
		return nil, fmt.Errorf("failed to start listening on UDP socket: %v", err)
	}


	m.ctl, m.ctl2, err = newCtlSockets()   // 生成一对读写套接字
	if err != nil {
		return nil, fmt.Errorf("failed to create control socket: %v", err)
	}

	return &backend.SubnetDef{
		Net: sn,
		MTU: m.mtu,
	}, nil
}

func (m *UdpBackend) Run() {
	// one for each goroutine below
	m.wg.Add(2)

	go func() {
		runCProxy(m.tun, m.conn, m.ctl2, m.tunNet.IP, m.mtu)   // 进行tun设备流量的代理转发
		m.wg.Done()
	}()

	go func() {
		m.sm.LeaseRenewer(m.stop)
		m.wg.Done()
	}()

	m.monitorEvents()    // 通过etcd监控网络信息的变更

	m.wg.Wait()
}

...

func runCProxy(tun *os.File, conn *net.UDPConn, ctl *os.File, tunIP ip.IP4, tunMTU int) {
	var log_errors int
	if log.V(1) {
		log_errors = 1
	}

	c, err := conn.File()   // 获取udp的连接信息
	if err != nil {
		log.Error("Converting UDPConn to File failed: ", err)
		return
	}
	defer c.Close()

	C.run_proxy(
		C.int(tun.Fd()),
		C.int(c.Fd()),
		C.int(ctl.Fd()),
		C.in_addr_t(tunIP.NetworkOrder()),
		C.size_t(tunMTU),
		C.int(log_errors),
	)   // 通过c语言的函数进行处理
}

先通过tun设备的初始化,最后就运行到来了runCProxy函数,该函数是通过c语言来实现。

void run_proxy(int tun, int sock, int ctl, in_addr_t tun_ip, size_t tun_mtu, int log_errors) {
	char *buf;
	struct pollfd fds[3] = {
		{
			.fd = tun,
			.events = POLLIN 
		},
		{
			.fd = sock,
			.events = POLLIN
		},
		{
			.fd = ctl,
			.events = POLLIN
		},
	};    // 配置三个文件描述符

	exit_flag = 0;
	tun_addr = tun_ip;
	log_enabled = log_errors;

	buf = (char *) malloc(tun_mtu);   // 获取Buf大小
	if( !buf ) {
		log_error("Failed to allocate %d byte buffer\n", tun_mtu);
		exit(1);
	}

	while( !exit_flag ) {
		int nfds = poll(fds, 3, -1);   // 监听文件描述符
		if( nfds < 0 ) {
			if( errno == EINTR )
				continue;

			log_error("Poll failed: %s\n", strerror(errno));
			exit(1);
		}

		if( fds[0].revents & POLLIN )
			tun_to_udp(tun, sock, buf, tun_mtu);  //  接受tun设备的数据通过udp发送对应的节点

		if( fds[1].revents & POLLIN )
			udp_to_tun(sock, tun, buf, tun_mtu);   // 将udp的接受的数据转发到本机的tun设备上

		if( fds[2].revents & POLLIN )  			// 监听发送的命令用来监听etcd中的网络的变化
			process_cmd(ctl);
	}

	free(buf);
}

通过将tun、udp和ctl的三个文件描述符传入,以达到监听设备的数据并将设备的数据发送,并根据etcd的变化来动态改变转发信息。

tun_to_udp
static struct sockaddr_in *find_route(in_addr_t dst) {
	size_t i;

	for( i = 0; i < routes_cnt; i++ ) {
		if( contains(routes[i].dst, dst) ) {  // 根据目的地址匹配找到远端信息
			// packets for same dest tend to come in bursts. swap to front make it faster for subsequent ones
			if( i != 0 ) {
				struct route_entry tmp = routes[i];
				routes[i] = routes[0];
				routes[0] = tmp;
			}

			return &routes[0].next_hop;
		}
	}

	return NULL;
}

...
  
static void tun_to_udp(int tun, int sock, char *buf, size_t buflen) {
	struct iphdr *iph;
	struct sockaddr_in *next_hop;

	ssize_t pktlen = tun_recv_packet(tun, buf, buflen);   // 接受tun设备发送的数据
	if( pktlen < 0 )
		return;
	
	iph = (struct iphdr *)buf;

	next_hop = find_route((in_addr_t) iph->daddr);  // 通过ip头部信息配合路由表查找转发的目的节点
	if( !next_hop ) {
		send_net_unreachable(tun, buf);
		return;
	}

	if( !decrement_ttl(iph) ) {
		/* TTL went to 0, discard.
		 * TODO: send back ICMP Time Exceeded
		 */
		return;
	}

	sock_send_packet(sock, buf, pktlen, next_hop);   // 匹配到了目标节点就发送数据
}

...
  
static void sock_send_packet(int sock, char *pkt, size_t pktlen, struct sockaddr_in *dst) {
	ssize_t nsent = sendto(sock, pkt, pktlen, 0, (struct sockaddr *)dst, sizeof(struct sockaddr_in));   // 将接受的数据发送出去

	if( nsent != pktlen ) {
		if( nsent < 0 ) {
			log_error("UDP send to %s:%hu failed: %s\n",
					inet_ntoa(dst->sin_addr), ntohs(dst->sin_port), strerror(errno));
		} else {
			log_error("Was only able to send %d out of %d bytes to %s:%hu\n",
					(int)nsent, (int)pktlen, inet_ntoa(dst->sin_addr), ntohs(dst->sin_port));
		}
	}
}

从tun_to_udp函数的处理流程可知,通过tun设备接入的数据,通过解析包数据来匹配对应的远端的节点信息,并将接受的数据发送对目的节点。

udp_to_tun
static void udp_to_tun(int sock, int tun, char *buf, size_t buflen) {
	struct iphdr *iph;

	ssize_t pktlen = sock_recv_packet(sock, buf, buflen);   // 接受udp的数据
	if( pktlen < 0 ) {
		return;
	}

	iph = (struct iphdr *)buf;

	if( !decrement_ttl(iph) ) {
		/* TTL went to 0, discard.
		 * TODO: send back ICMP Time Exceeded
		 */
		return;
	}

	tun_send_packet(tun, buf, pktlen);   //将udp的数据写入到tun设备
}

...
  
static void tun_send_packet(int tun, char *pkt, size_t pktlen) {
	ssize_t nsent = write(tun, pkt, pktlen);   // 将接受的数据写入tun

	if( nsent != pktlen ) {
		if( nsent < 0 ) {
			log_error("TUN send failed: %s\n", strerror(errno));
		} else {
			log_error("Was only able to send %d out of %d bytes to TUN\n", (int)nsent, (int)pktlen);
		}
	}
}

通过将udp接受到的数据写入tun里面,从而完成数据的转发功能。

process_cmd
static void process_cmd(int ctl) {
	struct command cmd;
	struct ip_net ipn;
	struct sockaddr_in sa = {
		.sin_family = AF_INET
	};

	ssize_t nrecv = recv(ctl, (char *) &cmd, sizeof(cmd), 0);
	if( nrecv < 0 ) {
		log_error("CTL recv failed: %s\n", strerror(errno));
		return;
	}

	if( cmd.cmd == CMD_SET_ROUTE ) {
		ipn.mask = netmask(cmd.dest_net_len);
		ipn.ip = cmd.dest_net & ipn.mask;

		sa.sin_addr.s_addr = cmd.next_hop_ip;
		sa.sin_port = htons(cmd.next_hop_port);

		set_route(ipn, &sa);

	} else if( cmd.cmd == CMD_DEL_ROUTE ) {
		ipn.mask = netmask(cmd.dest_net_len);
		ipn.ip = cmd.dest_net & ipn.mask;

		del_route(ipn);

	} else if( cmd.cmd == CMD_STOP ) {
		exit_flag = 1;
	}
}

通过监听的etcd传入的命令来维护数据转发的规则表,以提供给tun_to_udp使用。

总结

本文主要针对0.1.0版本的flannel的udp转发功能进行了简单的原理分析,从分析过程中可知,通过udp方式来实现k8s不同的pod的数据的转发的主要流程,从实现方式上来看,流量数据的来回拷贝很大程度上面会影响效率(当前的flannel提供了多种的网络转发方案,可自由选择),不过这个解决方案也是低成本实现的一种方式,利用tun设备加etcd的路由维护来完成流量转发。有关tun设备相关的操作可参考后文链接,有助于帮助大家理解。由于本人才疏学浅,如有错误请批评指正。

参考链接
Linux虚拟网络设备之tun/tap

一起动手写一个VPN

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐