CNI简介

Container Networking Interface(CNI)提供了一种linux的应用容器的插件化网络解决方案。最初是由rkt Networking Proposal发展而来。他的模型只涉及两个概念:

容器(container) : 容器是拥有独立linux网络命名空间的独立单元。比如rkt/docker创建出来的容器。
网络(network): 网络指代了可以相互联系的一组实体。这些实体拥有各自独立唯一的ip。这些实体可以是容器,是物理机,或者其他网络设备(比如路由器)等。

containernetworking项目

Github地址:

https://github.com/containernetworking/

这个地址包括两个项目,一个是CNI项目,一个是plugin项目,plugin主要用来适配CNI项目。
plugin项目主要包括以下插件:

root@ubuntu:~/go/src/github.com/containernetworking/plugins/plugins/main# ls -1
bridge
host-device
ipvlan
loopback
macvlan
ptp
vlan
root@ubuntu:~/go/src/github.com/containernetworking/plugins/plugins/main# 

以常见的bridge插件开始分析

CNI Bridge插件分析

插件路径:

M:\root\go\src\github.com\containernetworking\plugins\plugins\main\bridge\bridge.go

函数入口:

func main() {
	// TODO: implement plugin version
	skel.PluginMain(cmdAdd, cmdGet, cmdDel, version.All, "TODO")
}

skel.PluginMain函数由"github.com/containernetworking/cni/pkg/skel"包导入,这个包属于CNI项目,代码路径:

M:\root\go\src\github.com\containernetworking\cni\pkg\skel\skel.go

进入skel.go文件,查看PluginMain函数:

// PluginMainWithError is the core "main" for a plugin. It accepts
// callback functions for add, check, and del CNI commands and returns an error.
//
// The caller must also specify what CNI spec versions the plugin supports.
//
// It is the responsibility of the caller to check for non-nil error return.
//
// For a plugin to comply with the CNI spec, it must print any error to stdout
// as JSON and then exit with nonzero status code.
//
// To let this package automatically handle errors and call os.Exit(1) for you,
// use PluginMain() instead.
func PluginMainWithError(cmdAdd, cmdCheck, cmdDel func(_ *CmdArgs) error, versionInfo version.PluginInfo, about string) *types.Error {
	return (&dispatcher{
		Getenv: os.Getenv,
		Stdin:  os.Stdin,
		Stdout: os.Stdout,
		Stderr: os.Stderr,
	}).pluginMain(cmdAdd, cmdCheck, cmdDel, versionInfo, about)
}

// PluginMain is the core "main" for a plugin which includes automatic error handling.
//
// The caller must also specify what CNI spec versions the plugin supports.
//
// The caller can specify an "about" string, which is printed on stderr
// when no CNI_COMMAND is specified. The reccomended output is "CNI plugin <foo> v<version>"
//
// When an error occurs in either cmdAdd, cmdCheck, or cmdDel, PluginMain will print the error
// as JSON to stdout and call os.Exit(1).
//
// To have more control over error handling, use PluginMainWithError() instead.
func PluginMain(cmdAdd, cmdCheck, cmdDel func(_ *CmdArgs) error, versionInfo version.PluginInfo, about string) {
	if e := PluginMainWithError(cmdAdd, cmdCheck, cmdDel, versionInfo, about); e != nil {
		if err := e.Print(); err != nil {
			log.Print("Error writing error JSON to stdout: ", err)
		}
		os.Exit(1)
	}
}

PluginMain函数把传入的变量再传入PluginMainWithError函数,PluginMainWithError函数再调用dispatcher变量的pluginMain方法,pluginMain方法定义如下:

func (t *dispatcher) pluginMain(cmdAdd, cmdCheck, cmdDel func(_ *CmdArgs) error, versionInfo version.PluginInfo, about string) *types.Error {
	//获取由环境变量导入的参数
	cmd, cmdArgs, err := t.getCmdArgsFromEnv()
	if err != nil {
		// Print the about string to stderr when no command is set
		if _, ok := err.(missingEnvError); ok && t.Getenv("CNI_COMMAND") == "" && about != "" {
			fmt.Fprintln(t.Stderr, about)
			return nil
		}
		return createTypedError(err.Error())
	}

	if cmd != "VERSION" {
		err = validateConfig(cmdArgs.StdinData)
		if err != nil {
			return createTypedError(err.Error())
		}
	}

	switch cmd {
	case "ADD":
		err = t.checkVersionAndCall(cmdArgs, versionInfo, cmdAdd)       //检查版本并调用传入函数
	case "CHECK":
		configVersion, err := t.ConfVersionDecoder.Decode(cmdArgs.StdinData)
		if err != nil {
			return createTypedError(err.Error())
		}
		if gtet, err := version.GreaterThanOrEqualTo(configVersion, "0.4.0"); err != nil {
			return createTypedError(err.Error())
		} else if !gtet {
			return &types.Error{
				Code: types.ErrIncompatibleCNIVersion,
				Msg:  "config version does not allow CHECK",
			}
		}
		for _, pluginVersion := range versionInfo.SupportedVersions() {
			gtet, err := version.GreaterThanOrEqualTo(pluginVersion, configVersion)
			if err != nil {
				return createTypedError(err.Error())
			} else if gtet {
				if err := t.checkVersionAndCall(cmdArgs, versionInfo, cmdCheck); err != nil {
					return createTypedError(err.Error())
				}
				return nil
			}
		}
		return &types.Error{
			Code: types.ErrIncompatibleCNIVersion,
			Msg:  "plugin version does not allow CHECK",
		}
	case "DEL":
		err = t.checkVersionAndCall(cmdArgs, versionInfo, cmdDel)
	case "VERSION":
		err = versionInfo.Encode(t.Stdout)
	default:
		return createTypedError("unknown CNI_COMMAND: %v", cmd)
	}

	if err != nil {
		if e, ok := err.(*types.Error); ok {
			// don't wrap Error in Error
			return e
		}
		return createTypedError(err.Error())
	}
	return nil
}

pluginMain方法首先根据环境变量获取当前请求的动作是ADD或者DEL还是CHECK,然后进入对应的分支,接着使用checkVersionAndCall方法检查传入的版本号然后调用传入的参数,这里被调用的函数就是插件本身传入的函数,如bridge.go文件里面定义的“cmdAdd, cmdGet, cmdDel”这些函数,过程如下:

func (t *dispatcher) checkVersionAndCall(cmdArgs *CmdArgs, pluginVersionInfo version.PluginInfo, toCall func(*CmdArgs) error) error {
	configVersion, err := t.ConfVersionDecoder.Decode(cmdArgs.StdinData)
	if err != nil {
		return err
	}
	verErr := t.VersionReconciler.Check(configVersion, pluginVersionInfo)
	if verErr != nil {
		return &types.Error{
			Code:    types.ErrIncompatibleCNIVersion,
			Msg:     "incompatible CNI versions",
			Details: verErr.Details(),
		}
	}

	return toCall(cmdArgs)
}

最终使用toCall调用到了bridge.go文件对应的函数。

bridge cmdAdd函数分析

bridge插件的cmdAdd方法用于添加一个容器到指定的网桥,核心过程是首先在宿主机创建网桥然后在容器内部通过系统调用创建VETH接口对,然后把属于host的接口设备为host命名空间,最后再配置容器接口IP地址:

1、加载输入参数

func cmdAdd(args *skel.CmdArgs) error {
	n, cniVersion, err := loadNetConf(args.StdinData)
	if err != nil {
		return err
	}

2、创建网桥

br, brInterface, err := setupBridge(n)
	if err != nil {
		return err
	}

3、获取容器命名空间

netns, err := ns.GetNS(args.Netns)
	if err != nil {
		return fmt.Errorf("failed to open netns %q: %v", args.Netns, err)
	}
	defer netns.Close()

4、创建VETH接口对

--------------------------
hostInterface, containerInterface, err := setupVeth(netns, br, args.IfName, n.MTU, n.HairpinMode)
	if err != nil {
		return err
	}
--------------------------
setupVeth函数如下,核心调用为:netns.Do:

func setupVeth(netns ns.NetNS, br *netlink.Bridge, ifName string, mtu int, hairpinMode bool) (*current.Interface, *current.Interface, error) {
	contIface := &current.Interface{}
	hostIface := &current.Interface{}

	err := netns.Do(func(hostNS ns.NetNS) error {
		// create the veth pair in the container and move host end into host netns
		hostVeth, containerVeth, err := ip.SetupVeth(ifName, mtu, hostNS)
		if err != nil {
			return err
		}
		contIface.Name = containerVeth.Name
		contIface.Mac = containerVeth.HardwareAddr.String()
		contIface.Sandbox = netns.Path()
		hostIface.Name = hostVeth.Name
		return nil
	})
	if err != nil {
		return nil, nil, err
	}

	// need to lookup hostVeth again as its index has changed during ns move
	hostVeth, err := netlink.LinkByName(hostIface.Name)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to lookup %q: %v", hostIface.Name, err)
	}
	hostIface.Mac = hostVeth.Attrs().HardwareAddr.String()

	// connect host veth end to the bridge
	if err := netlink.LinkSetMaster(hostVeth, br); err != nil {
		return nil, nil, fmt.Errorf("failed to connect %q to bridge %v: %v", hostVeth.Attrs().Name, br.Attrs().Name, err)
	}

	// set hairpin mode
	if err = netlink.LinkSetHairpin(hostVeth, hairpinMode); err != nil {
		return nil, nil, fmt.Errorf("failed to setup hairpin mode for %v: %v", hostVeth.Attrs().Name, err)
	}

	return hostIface, contIface, nil
}
--------------------------
netns.Do函数如下,核心调用为:hostNS, err := GetCurrentNS(),进入容器命名空间,然后使用协程开始创建接口,最后使用defer关闭容器空间:

func (ns *netNS) Do(toRun func(NetNS) error) error {
	if err := ns.errorIfClosed(); err != nil {
		return err
	}

	containedCall := func(hostNS NetNS) error {
		threadNS, err := GetCurrentNS()
		if err != nil {
			return fmt.Errorf("failed to open current netns: %v", err)
		}
		defer threadNS.Close()

		// switch to target namespace
		if err = ns.Set(); err != nil {
			return fmt.Errorf("error switching to ns %v: %v", ns.file.Name(), err)
		}
		defer threadNS.Set() // switch back

		return toRun(hostNS)
	}

	// save a handle to current network namespace
	hostNS, err := GetCurrentNS()
	if err != nil {
		return fmt.Errorf("Failed to open current namespace: %v", err)
	}
	defer hostNS.Close()

	var wg sync.WaitGroup
	wg.Add(1)

	var innerError error
	go func() {
		defer wg.Done()
		runtime.LockOSThread()
		innerError = containedCall(hostNS)
	}()
	wg.Wait()

	return innerError
}

5、使用ipam获取IP地址并计算网关地址

// run the IPAM plugin and get back the config to apply
	r, err := ipam.ExecAdd(n.IPAM.Type, args.StdinData)
	if err != nil {
		return err
	}

	// Convert whatever the IPAM result was into the current Result type
	result, err := current.NewResultFromResult(r)
	if err != nil {
		return err
	}

	if len(result.IPs) == 0 {
		return errors.New("IPAM plugin returned missing IP config")
	}

	result.Interfaces = []*current.Interface{brInterface, hostInterface, containerInterface}

	// Gather gateway information for each IP family
	gwsV4, gwsV6, err := calcGateways(result, n)
	if err != nil {
		return err
	}

6、配置容器IP地址

// Configure the container hardware address and IP address(es)
	if err := netns.Do(func(_ ns.NetNS) error {
		contVeth, err := net.InterfaceByName(args.IfName)
		if err != nil {
			return err
		}

		// Disable IPv6 DAD just in case hairpin mode is enabled on the
		// bridge. Hairpin mode causes echos of neighbor solicitation
		// packets, which causes DAD failures.
		for _, ipc := range result.IPs {
			if ipc.Version == "6" && (n.HairpinMode || n.PromiscMode) {
				if err := disableIPV6DAD(args.IfName); err != nil {
					return err
				}
				break
			}
		}

		// Add the IP to the interface
		if err := ipam.ConfigureIface(args.IfName, result); err != nil {
			return err
		}

		// Send a gratuitous arp
		for _, ipc := range result.IPs {
			if ipc.Version == "4" {
				_ = arping.GratuitousArpOverIface(ipc.Address.IP, *contVeth)
			}
		}
		return nil
	}); err != nil {
		return err
	}

7、网关地址设置为网桥IP

if n.IsGW {
		var firstV4Addr net.IP
		// Set the IP address(es) on the bridge and enable forwarding
		for _, gws := range []*gwInfo{gwsV4, gwsV6} {
			for _, gw := range gws.gws {
				if gw.IP.To4() != nil && firstV4Addr == nil {
					firstV4Addr = gw.IP
				}

				err = ensureBridgeAddr(br, gws.family, &gw, n.ForceAddress)
				if err != nil {
					return fmt.Errorf("failed to set bridge addr: %v", err)
				}
			}

			if gws.gws != nil {
				if err = enableIPForward(gws.family); err != nil {
					return fmt.Errorf("failed to enable forwarding: %v", err)
				}
			}
		}
	}
Logo

更多推荐