修复在 k8s runtime 为 containerd 时,端口转发不稳定问题

抛出问题

我们在开发nocalhost项目,此项目中有个功能叫做端口转发,内部使用了 k8s 的端口转发,在上面主动加了 heartbeat 心跳检查。 有用户反馈使用了 containerd 之后,端口转发不稳定,总是失败。

复现步骤

  • 使用 Containerd 作为 k8s runtime, 并部署一个 Pod
  • Pod 中监听业务端口
  • 开发端口转发
  • 请求正常,得到正确结果,愈合预期
  • Pod 中关闭业务监听的端口
  • 请求失败,得到失败结果,符合预期
  • Pod 中开启业务监听的端口
  • 请求失败,得到失败结果,不符合预期

初步分析

看起来就像是Pod关闭端口后,连接出现了问题,然后即便再次开启端口,连接也无法恢复。

排查思路

首先想到的就是我们的端口转发是用的 client-go 中的 portforward,可能是用法不当。因此,使用原生的 kubectl port-forward 后,发现也有问题。初步将矛头指向 containerd了

回想一下 k8s 实现端口转发的原理

  • 监听本地端口,本地请求此端口,产生 conn1
  • k8s apiserver 监听端口
  • 本地连接上 apiserver 的监听端口,产生 conn2
  • apiserver 和 pod 之间建立连接 conn3(后面发现这里的认知是错误的,正确的方式)
  • apiserver 和 pod 之间,使用 socat 和 nsenter 实现 stream 的copy(docker),使用 pkg/netns/netns_linux.go:211,可以在指定的进程中监听网络,端口不会冲突(
    containerd)(这都什么神仙操作,学废了:),不过这都是之后看源码发现的)

然后追踪了一下 client-go 中的 port-forward 源码,发现上面提到的本地和apiserver建立的连接 conn2 是复用的,而复用带来的问题就是不会及时的检查此连接是否有效
vendor/k8s.io/client-go/tools/portforward/portforward.go:188

func (pf *PortForwarder) ForwardPorts() error {
	defer pf.Close()

	var err error
	pf.streamConn, _, err = pf.dialer.Dial(portforward.PortForwardProtocolV1Name) // 这里建立了一个和apiserver的连接,后续就没有尝试重新建立连接了
	if err != nil {
		return fmt.Errorf("error upgrading connection: %s", err)
	}
	defer pf.streamConn.Close()

	return pf.forward()
}

vendor/k8s.io/client-go/tools/portforward/portforward.go:324

func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
	...
	// create error stream
	headers := http.Header{}
	headers.Set(v1.StreamType, v1.StreamTypeError)
	headers.Set(v1.PortHeader, fmt.Sprintf("%d", port.Remote))
	headers.Set(v1.PortForwardRequestIDHeader, strconv.Itoa(requestID))
	errorStream, err := pf.streamConn.CreateStream(headers) // 使用和apiserver的连接,建立一个错误流,这里都没有检查此连接的可用性
	if err != nil {
		runtime.HandleError(fmt.Errorf("error creating error stream for port %d -> %d: %v", port.Local, port.Remote, err))
		return
	}
	// create data stream
	headers.Set(v1.StreamType, v1.StreamTypeData)
	dataStream, err := pf.streamConn.CreateStream(headers) //使用和apiserver的连接,建立一个数据流,这里也没雨检查此链接的可用性
	if err != nil {
		runtime.HandleError(fmt.Errorf("error creating forwarding stream for port %d -> %d: %v", port.Local, port.Remote, err))
		return
	}
    ...
	go func() {
		// Copy from the remote side to the local port.
		if _, err := io.Copy(conn, dataStream); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
			runtime.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err))
		}
	}()
    ...
	go func() {
		// Copy from the local port to the remote side.
		if _, err := io.Copy(dataStream, conn); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
			runtime.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err))
			// break out of the select below without waiting for the other copy to finish
			close(localError)
		}
	}()
}

也即是说本地和 api-server 的连接可用性得不到保证,那么此连接断开了,就无法正确的转发请求了,因此解决方法也是很简单,在这里加个重试,
当旧的pf.streamConn create stream 失败了或者超时了,就认为此连接不可用了,然后重新建立连接即可,如下

func (pf *PortForwarder) tryToCreateStream(header http.Header) (httpstream.Stream, error) {
	errorChan := make(chan error)
	resultChan := make(chan httpstream.Stream)
	time.AfterFunc(time.Second*1, func() {
		errorChan <- errors.New("timeout")
	})
	go func() {
		stream, err := pf.streamConn.CreateStream(header)
		if err == nil {
			errorChan <- nil
			resultChan <- stream
		} else {
			errorChan <- err
		}
	}()
	if err := <-errorChan; err == nil {
		return <-resultChan, nil
	}
	// close old connection in case of resource leak
	_ = pf.streamConn.Close()
	var err error
	pf.streamConn, _, err = pf.dialer.Dial(portforward.PortForwardProtocolV1Name)
	if err != nil {
		runtime.HandleError(fmt.Errorf("error upgrading connection: %s", err))
		return nil, err
	}
	if stream, err := pf.streamConn.CreateStream(header); err == nil {
		return stream, nil
	}
	return nil, err
}

深入探究一下 k8s 的 port-forward 机制,为什么 docker 不会出现这个问题尼?

dockershim 的端口转发

让我们来翻看一下 kubernetes 关于 portforward 的代码
pkg/kubelet/dockershim/docker_streaming_others.go:32

func (r *streamingRuntime) portForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error {
	container, err := r.client.InspectContainer(podSandboxID)
	if err != nil {
		return err
	}

	if !container.State.Running {
		return fmt.Errorf("container not running (%s)", container.ID)
	}

	containerPid := container.State.Pid
	socatPath, lookupErr := exec.LookPath("socat")
	if lookupErr != nil {
		return fmt.Errorf("unable to do port forwarding: socat not found")
	}

	args := []string{"-t", fmt.Sprintf("%d", containerPid), "-n", socatPath, "-", fmt.Sprintf("TCP4:localhost:%d", port)}

	nsenterPath, lookupErr := exec.LookPath("nsenter")
	if lookupErr != nil {
		return fmt.Errorf("unable to do port forwarding: nsenter not found")
	}

	commandString := fmt.Sprintf("%s %s", nsenterPath, strings.Join(args, " "))
	klog.V(4).InfoS("Executing port forwarding command", "command", commandString)

	command := exec.Command(nsenterPath, args...)
	command.Stdout = stream

	stderr := new(bytes.Buffer)
	command.Stderr = stderr

	inPipe, err := command.StdinPipe()
	if err != nil {
		return fmt.Errorf("unable to do port forwarding: error creating stdin pipe: %v", err)
	}
	go func() {
		io.Copy(inPipe, stream)
		inPipe.Close()
	}()

	if err := command.Run(); err != nil {
		return fmt.Errorf("%v: %s", err, stderr.String())
	}

	return nil
}

可以看到,这里的端口转发的原理是通过 socat和 nsenter 来实现的,也即是 nsenter 可以进入 pod 所在进程的namespace(此 namespace 是 linux 中的 namespace 概念,不是 k8s 中的)
然后使用socat做端口映射。然后将本地和 apiserver 建立的连接 conn2,使用 io.copy 到 exec.command 的 stdinPipeline 中,只要命令不返回,这个端口转发总是有效的

containerd 的端口转发

pkg/cri/server/sandbox_portforward_linux.go:35

func (c *criService) portForward(ctx context.Context, id string, port int32, stream io.ReadWriteCloser) error {
	s, err := c.sandboxStore.Get(id)
	if err != nil {
		return errors.Wrapf(err, "failed to find sandbox %q in store", id)
	}

	var netNSDo func(func(ns.NetNS) error) error
	// netNSPath is the network namespace path for logging.
	var netNSPath string
	securityContext := s.Config.GetLinux().GetSecurityContext()
	hostNet := securityContext.GetNamespaceOptions().GetNetwork() == runtime.NamespaceMode_NODE
	if !hostNet {
		if closed, err := s.NetNS.Closed(); err != nil {
			return errors.Wrapf(err, "failed to check netwok namespace closed for sandbox %q", id)
		} else if closed {
			return errors.Errorf("network namespace for sandbox %q is closed", id)
		}
		netNSDo = s.NetNS.Do
		netNSPath = s.NetNS.GetPath()
	} else {
		// Run the function directly for host network.
		netNSDo = func(do func(_ ns.NetNS) error) error {
			return do(nil)
		}
		netNSPath = "host"
	}

	log.G(ctx).Infof("Executing port forwarding in network namespace %q", netNSPath)
	err = netNSDo(func(_ ns.NetNS) error {
		defer stream.Close()
		var conn net.Conn
		conn, err := net.Dial("tcp4", fmt.Sprintf("localhost:%d", port))
		if err != nil {
			var errV6 error
			conn, errV6 = net.Dial("tcp6", fmt.Sprintf("localhost:%d", port))
			if errV6 != nil {
				return fmt.Errorf("failed to connect to localhost:%d inside namespace %q, IPv4: %v IPv6 %v ", port, id, err, errV6)
			}
		}
		defer conn.Close()

		errCh := make(chan error, 2)
		// Copy from the the namespace port connection to the client stream
		go func() {
			log.G(ctx).Debugf("PortForward copying data from namespace %q port %d to the client stream", id, port)
			_, err := io.Copy(stream, conn)
			errCh <- err
		}()

		// Copy from the client stream to the namespace port connection
		go func() {
			log.G(ctx).Debugf("PortForward copying data from client stream to namespace %q port %d", id, port)
			_, err := io.Copy(conn, stream)
			errCh <- err
		}()

		// Wait until the first error is returned by one of the connections
		// we use errFwd to store the result of the port forwarding operation
		// if the context is cancelled close everything and return
		var errFwd error
		select {
		case errFwd = <-errCh:
			log.G(ctx).Debugf("PortForward stop forwarding in one direction in network namespace %q port %d: %v", id, port, errFwd)
		case <-ctx.Done():
			log.G(ctx).Debugf("PortForward cancelled in network namespace %q port %d: %v", id, port, ctx.Err())
			return ctx.Err()
		}
        ...
		return errFwd
	})

	if err != nil {
		return errors.Wrapf(err, "failed to execute portforward in network namespace %q", netNSPath)
	}
	log.G(ctx).Infof("Finish port forwarding for %q port %d", id, port)

	return nil
}

可以看到,这里没有使用到 socat 和 nsenter,而是在进程所在的 namespace( linux 中的 namespace 概念) 中直接拨号的目标端口,产生conn3, 然后使用 io.copy(conn2, conn3)和io.copy(conn3, conn2),实现socat的功能
,而进入进程所在的 namespace( linux 的 namespace )使用的是 NetNS.do()方法,vendor/github.com/containernetworking/plugins/pkg/ns/ns_linux.go:79
而 containerd 这里使用的是原生的 io.copy,遇到 io.EOF 就退出了。

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐