flannel原理初探针对0.1.0版本
flannelflannel是针对k8s设计的三层的网络解决方案。在k8s中为了使pod之间能够使用一种偏平的网络架构,从而完成跨Pod的网络通信。官网给的原理图如下:flannel 使用TUN/TAP 设备,并使用 UDP 创建覆盖网络来封装 IP 数据包。 子网分配是在 etcd 的帮助下完成的,它维护覆盖到实际 IP 的映射。从以上的原理图也可知大概的流程是是怎么操作的。其中具体是一个什么流
flannel
flannel是针对k8s设计的三层的网络解决方案。在k8s中为了使pod之间能够使用一种偏平的网络架构,从而完成跨Pod的网络通信。
官网给的原理图如下:
flannel 使用TUN/TAP 设备,并使用 UDP 创建覆盖网络来封装 IP 数据包。 子网分配是在 etcd 的帮助下完成的,它维护覆盖到实际 IP 的映射。从以上的原理图也可知大概的流程是是怎么操作的。
其中具体是一个什么流程呢,本文就根据flannel的0.1.0版本的源码进一步学习。
flannel流程-基于Backend为udp
首先是main流程
func main() {
// glog will log to tmp files by default. override so all entries
// can flow into journald (if running under systemd)
flag.Set("logtostderr", "true")
// now parse command line args
flag.Parse()
if opts.help {
fmt.Fprintf(os.Stderr, "Usage: %s [OPTION]...\n", os.Args[0])
flag.PrintDefaults()
os.Exit(0)
}
if opts.version {
fmt.Fprintln(os.Stderr, Version)
os.Exit(0)
}
flagsFromEnv("FLANNELD", flag.CommandLine)
be, err := newBackend() // 生成一个代理后端
if err != nil {
log.Info(err)
os.Exit(1)
}
// Register for SIGINT and SIGTERM and wait for one of them to arrive
log.Info("Installing signal handlers")
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt, syscall.SIGTERM) // 注册信号相关
exit := make(chan int)
go run(be, exit) // 运行
for {
select {
case <-sigs: // 监控是否退出的信号 如果有则平滑退出
// unregister to get default OS nuke behaviour in case we don't exit cleanly
signal.Stop(sigs)
log.Info("Exiting...")
be.Stop()
case code := <-exit:
log.Infof("%s mode exited", be.Name())
os.Exit(code)
}
}
}
在main主流程中核心的要点就是新生成一个backend,然后通过backend运行起来。
func newBackend() (backend.Backend, error) {
sm := makeSubnetManager() // 生成一个子网的管理实例
config := sm.GetConfig() // 获取配置
var bt struct {
Type string
}
if len(config.Backend) == 0 {
bt.Type = "udp"
} else {
if err := json.Unmarshal(config.Backend, &bt); err != nil {
return nil, fmt.Errorf("Error decoding Backend property of config: %v", err)
}
}
switch strings.ToLower(bt.Type) { // 根据传入的类型生成backend
case "udp":
return udp.New(sm, config.Backend), nil
case "alloc":
return alloc.New(sm), nil
default:
return nil, fmt.Errorf("'%v': unknown backend type", bt.Type)
}
}
func run(be backend.Backend, exit chan int) {
var err error
defer func() {
if err == nil || err == task.ErrCanceled {
exit <- 0
} else {
log.Error(err)
exit <- 1
}
}()
iface, ipaddr, err := lookupIface() // 获取网卡接口
if err != nil {
return
}
if iface.MTU == 0 {
err = fmt.Errorf("Failed to determine MTU for %s interface", ipaddr)
return
}
log.Infof("Using %s as external interface", ipaddr)
sn, err := be.Init(iface, ipaddr, opts.ipMasq) // 初始化后端绑定情况
if err != nil {
log.Error("Could not init %v backend: %v", be.Name(), err)
return
}
writeSubnetFile(sn)
daemon.SdNotify("READY=1")
log.Infof("%s mode initialized", be.Name())
be.Run() // 启动运行
}
本文通过udp的方式来进行分析。
func New(sm *subnet.SubnetManager, config json.RawMessage) backend.Backend {
be := UdpBackend{
sm: sm,
rawCfg: config,
stop: make(chan bool),
}
be.cfg.Port = defaultPort // 监听默认的端口
return &be
}
func (m *UdpBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (*backend.SubnetDef, error) {
// Parse our configuration
if len(m.rawCfg) > 0 {
if err := json.Unmarshal(m.rawCfg, &m.cfg); err != nil {
return nil, fmt.Errorf("error decoding UDP backend config: %v", err)
}
}
// Acquire the lease form subnet manager
attrs := subnet.BaseAttrs{
PublicIP: ip.FromIP(extIP),
}
sn, err := m.sm.AcquireLease(attrs.PublicIP, &attrs, m.stop)
if err != nil {
if err == task.ErrCanceled {
return nil, err
} else {
return nil, fmt.Errorf("failed to acquire lease: %v", err)
}
}
// Tunnel's subnet is that of the whole overlay network (e.g. /16)
// and not that of the individual host (e.g. /24)
m.tunNet = ip.IP4Net{
IP: sn.IP,
PrefixLen: m.sm.GetConfig().Network.PrefixLen,
} // 配置一个tun设备
// TUN MTU will be smaller b/c of encap (IP+UDP hdrs)
m.mtu = extIface.MTU - encapOverhead // 控制包的长度免得超过长度
if err = m.initTun(ipMasq); err != nil { // 初始化设备
return nil, err
}
m.conn, err = net.ListenUDP("udp4", &net.UDPAddr{Port: m.cfg.Port}) // 监听udp的网络端口数据
if err != nil {
return nil, fmt.Errorf("failed to start listening on UDP socket: %v", err)
}
m.ctl, m.ctl2, err = newCtlSockets() // 生成一对读写套接字
if err != nil {
return nil, fmt.Errorf("failed to create control socket: %v", err)
}
return &backend.SubnetDef{
Net: sn,
MTU: m.mtu,
}, nil
}
func (m *UdpBackend) Run() {
// one for each goroutine below
m.wg.Add(2)
go func() {
runCProxy(m.tun, m.conn, m.ctl2, m.tunNet.IP, m.mtu) // 进行tun设备流量的代理转发
m.wg.Done()
}()
go func() {
m.sm.LeaseRenewer(m.stop)
m.wg.Done()
}()
m.monitorEvents() // 通过etcd监控网络信息的变更
m.wg.Wait()
}
...
func runCProxy(tun *os.File, conn *net.UDPConn, ctl *os.File, tunIP ip.IP4, tunMTU int) {
var log_errors int
if log.V(1) {
log_errors = 1
}
c, err := conn.File() // 获取udp的连接信息
if err != nil {
log.Error("Converting UDPConn to File failed: ", err)
return
}
defer c.Close()
C.run_proxy(
C.int(tun.Fd()),
C.int(c.Fd()),
C.int(ctl.Fd()),
C.in_addr_t(tunIP.NetworkOrder()),
C.size_t(tunMTU),
C.int(log_errors),
) // 通过c语言的函数进行处理
}
先通过tun设备的初始化,最后就运行到来了runCProxy函数,该函数是通过c语言来实现。
void run_proxy(int tun, int sock, int ctl, in_addr_t tun_ip, size_t tun_mtu, int log_errors) {
char *buf;
struct pollfd fds[3] = {
{
.fd = tun,
.events = POLLIN
},
{
.fd = sock,
.events = POLLIN
},
{
.fd = ctl,
.events = POLLIN
},
}; // 配置三个文件描述符
exit_flag = 0;
tun_addr = tun_ip;
log_enabled = log_errors;
buf = (char *) malloc(tun_mtu); // 获取Buf大小
if( !buf ) {
log_error("Failed to allocate %d byte buffer\n", tun_mtu);
exit(1);
}
while( !exit_flag ) {
int nfds = poll(fds, 3, -1); // 监听文件描述符
if( nfds < 0 ) {
if( errno == EINTR )
continue;
log_error("Poll failed: %s\n", strerror(errno));
exit(1);
}
if( fds[0].revents & POLLIN )
tun_to_udp(tun, sock, buf, tun_mtu); // 接受tun设备的数据通过udp发送对应的节点
if( fds[1].revents & POLLIN )
udp_to_tun(sock, tun, buf, tun_mtu); // 将udp的接受的数据转发到本机的tun设备上
if( fds[2].revents & POLLIN ) // 监听发送的命令用来监听etcd中的网络的变化
process_cmd(ctl);
}
free(buf);
}
通过将tun、udp和ctl的三个文件描述符传入,以达到监听设备的数据并将设备的数据发送,并根据etcd的变化来动态改变转发信息。
tun_to_udp
static struct sockaddr_in *find_route(in_addr_t dst) {
size_t i;
for( i = 0; i < routes_cnt; i++ ) {
if( contains(routes[i].dst, dst) ) { // 根据目的地址匹配找到远端信息
// packets for same dest tend to come in bursts. swap to front make it faster for subsequent ones
if( i != 0 ) {
struct route_entry tmp = routes[i];
routes[i] = routes[0];
routes[0] = tmp;
}
return &routes[0].next_hop;
}
}
return NULL;
}
...
static void tun_to_udp(int tun, int sock, char *buf, size_t buflen) {
struct iphdr *iph;
struct sockaddr_in *next_hop;
ssize_t pktlen = tun_recv_packet(tun, buf, buflen); // 接受tun设备发送的数据
if( pktlen < 0 )
return;
iph = (struct iphdr *)buf;
next_hop = find_route((in_addr_t) iph->daddr); // 通过ip头部信息配合路由表查找转发的目的节点
if( !next_hop ) {
send_net_unreachable(tun, buf);
return;
}
if( !decrement_ttl(iph) ) {
/* TTL went to 0, discard.
* TODO: send back ICMP Time Exceeded
*/
return;
}
sock_send_packet(sock, buf, pktlen, next_hop); // 匹配到了目标节点就发送数据
}
...
static void sock_send_packet(int sock, char *pkt, size_t pktlen, struct sockaddr_in *dst) {
ssize_t nsent = sendto(sock, pkt, pktlen, 0, (struct sockaddr *)dst, sizeof(struct sockaddr_in)); // 将接受的数据发送出去
if( nsent != pktlen ) {
if( nsent < 0 ) {
log_error("UDP send to %s:%hu failed: %s\n",
inet_ntoa(dst->sin_addr), ntohs(dst->sin_port), strerror(errno));
} else {
log_error("Was only able to send %d out of %d bytes to %s:%hu\n",
(int)nsent, (int)pktlen, inet_ntoa(dst->sin_addr), ntohs(dst->sin_port));
}
}
}
从tun_to_udp函数的处理流程可知,通过tun设备接入的数据,通过解析包数据来匹配对应的远端的节点信息,并将接受的数据发送对目的节点。
udp_to_tun
static void udp_to_tun(int sock, int tun, char *buf, size_t buflen) {
struct iphdr *iph;
ssize_t pktlen = sock_recv_packet(sock, buf, buflen); // 接受udp的数据
if( pktlen < 0 ) {
return;
}
iph = (struct iphdr *)buf;
if( !decrement_ttl(iph) ) {
/* TTL went to 0, discard.
* TODO: send back ICMP Time Exceeded
*/
return;
}
tun_send_packet(tun, buf, pktlen); //将udp的数据写入到tun设备
}
...
static void tun_send_packet(int tun, char *pkt, size_t pktlen) {
ssize_t nsent = write(tun, pkt, pktlen); // 将接受的数据写入tun
if( nsent != pktlen ) {
if( nsent < 0 ) {
log_error("TUN send failed: %s\n", strerror(errno));
} else {
log_error("Was only able to send %d out of %d bytes to TUN\n", (int)nsent, (int)pktlen);
}
}
}
通过将udp接受到的数据写入tun里面,从而完成数据的转发功能。
process_cmd
static void process_cmd(int ctl) {
struct command cmd;
struct ip_net ipn;
struct sockaddr_in sa = {
.sin_family = AF_INET
};
ssize_t nrecv = recv(ctl, (char *) &cmd, sizeof(cmd), 0);
if( nrecv < 0 ) {
log_error("CTL recv failed: %s\n", strerror(errno));
return;
}
if( cmd.cmd == CMD_SET_ROUTE ) {
ipn.mask = netmask(cmd.dest_net_len);
ipn.ip = cmd.dest_net & ipn.mask;
sa.sin_addr.s_addr = cmd.next_hop_ip;
sa.sin_port = htons(cmd.next_hop_port);
set_route(ipn, &sa);
} else if( cmd.cmd == CMD_DEL_ROUTE ) {
ipn.mask = netmask(cmd.dest_net_len);
ipn.ip = cmd.dest_net & ipn.mask;
del_route(ipn);
} else if( cmd.cmd == CMD_STOP ) {
exit_flag = 1;
}
}
通过监听的etcd传入的命令来维护数据转发的规则表,以提供给tun_to_udp使用。
总结
本文主要针对0.1.0版本的flannel的udp转发功能进行了简单的原理分析,从分析过程中可知,通过udp方式来实现k8s不同的pod的数据的转发的主要流程,从实现方式上来看,流量数据的来回拷贝很大程度上面会影响效率(当前的flannel提供了多种的网络转发方案,可自由选择),不过这个解决方案也是低成本实现的一种方式,利用tun设备加etcd的路由维护来完成流量转发。有关tun设备相关的操作可参考后文链接,有助于帮助大家理解。由于本人才疏学浅,如有错误请批评指正。
参考链接
Linux虚拟网络设备之tun/tap
更多推荐
所有评论(0)