Containerd容器管理机制
简介containerd 是一个来自 Docker 的高级容器运行时,并实现了 CRI 规范。它是从 Docker 项目中分离出来,之后 containerd 被捐赠给云原生计算基金会(CNCF)为容器社区提供创建新容器解决方案的基础。当前k8s在创建容器的时候,是通过containerd来创建一个容器,containerd 收到请求后,会创建一个叫做containerd-shim的进程,...
简介
containerd 是一个来自 Docker 的高级容器运行时,并实现了 CRI 规范。它是从 Docker 项目中分离出来,之后 containerd 被捐赠给云原生计算基金会(CNCF)为容器社区提供创建新容器解决方案的基础。
当前k8s在创建容器的时候,是通过 containerd
来创建一个容器,containerd 收到请求后,会创建一个叫做 containerd-shim
的进程,让这个进程去操作容器,
为什么需要这样一个进程呢?
容器进程是需要一个父进程来做状态收集、维持 stdin 等 fd 打开等工作的,假如这个父进程就是 containerd,那如果 containerd 挂掉的话,整个宿主机上所有的容器都得退出了,而引入 containerd-shim
这个垫片就可以来规避这个问题了。
然后创建容器需要做一些 namespaces 和 cgroups 的配置,以及挂载 root 文件系统等操作,这些操作其实已经有了标准的规范,那就是 OCI(开放容器标准), runc
就是它的一个参考实现。
所以真正启动容器是通过 containerd-shim
去调用 runc
来启动容器的, runc
启动完容器后本身会直接退出, containerd-shim
则会成为容器进程的父进程, 负责收集容器进程的状态, 上报给 containerd, 并在容器中 pid 为 1 的进程退出后接管容器中的子进程进行清理, 确保不会出现僵尸进程。
源码地址:https://github.com/containerd/containerd
初始化容器处理
创建容器的代码是放在这个方法里面的:
// NewContainer returns a new runc container
func NewContainer(ctx context.Context, platform stdio.Platform, r *task.CreateTaskRequest) (_ *Container, retErr error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, fmt.Errorf("create namespace: %w", err)
}
opts := &options.Options{}
if r.Options.GetValue() != nil {
v, err := typeurl.UnmarshalAny(r.Options)
if err != nil {
return nil, err
}
if v != nil {
opts = v.(*options.Options)
}
}
var pmounts []process.Mount
for _, m := range r.Rootfs {
pmounts = append(pmounts, process.Mount{
Type: m.Type,
Source: m.Source,
Target: m.Target,
Options: m.Options,
})
}
rootfs := ""
if len(pmounts) > 0 {
rootfs = filepath.Join(r.Bundle, "rootfs")
if err := os.Mkdir(rootfs, 0711); err != nil && !os.IsExist(err) {
return nil, err
}
}
config := &process.CreateConfig{
ID: r.ID,
Bundle: r.Bundle,
Runtime: opts.BinaryName,
Rootfs: pmounts,
Terminal: r.Terminal,
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Checkpoint: r.Checkpoint,
ParentCheckpoint: r.ParentCheckpoint,
Options: r.Options,
}
if err := WriteOptions(r.Bundle, opts); err != nil {
return nil, err
}
// For historical reason, we write opts.BinaryName as well as the entire opts
if err := WriteRuntime(r.Bundle, opts.BinaryName); err != nil {
return nil, err
}
var mounts []mount.Mount
for _, pm := range pmounts {
mounts = append(mounts, mount.Mount{
Type: pm.Type,
Source: pm.Source,
Target: pm.Target,
Options: pm.Options,
})
}
defer func() {
if retErr != nil {
if err := mount.UnmountMounts(mounts, rootfs, 0); err != nil {
logrus.WithError(err).Warn("failed to cleanup rootfs mount")
}
}
}()
if err := mount.All(mounts, rootfs); err != nil {
return nil, fmt.Errorf("failed to mount rootfs component: %w", err)
}
p, err := newInit(
ctx,
r.Bundle,
filepath.Join(r.Bundle, "work"),
ns,
platform,
config,
opts,
rootfs,
)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
if err := p.Create(ctx, config); err != nil {
return nil, errdefs.ToGRPC(err)
}
container := &Container{
ID: r.ID,
Bundle: r.Bundle,
process: p,
processes: make(map[string]process.Process),
reservedProcess: make(map[string]struct{}),
}
pid := p.Pid()
if pid > 0 {
var cg interface{}
if cgroups.Mode() == cgroups.Unified {
g, err := cgroupsv2.PidGroupPath(pid)
if err != nil {
logrus.WithError(err).Errorf("loading cgroup2 for %d", pid)
return container, nil
}
cg, err = cgroupsv2.Load(g)
if err != nil {
logrus.WithError(err).Errorf("loading cgroup2 for %d", pid)
}
} else {
cg, err = cgroup1.Load(cgroup1.PidPath(pid))
if err != nil {
logrus.WithError(err).Errorf("loading cgroup for %d", pid)
}
}
container.cgroup = cg
}
return container, nil
}
上面代码逻辑大致如下:
创建文件系统根目录
挂载磁盘映射
根据指定的runc工具路径初始化runc对象(默认是
/run/containerd/runc
)初始化容器管理处理器(关联 runc对象,后面对容器的所有操作都是在这个处理器中完成)
通过runc命令在宿主机上创建容器
初始化容器对象(持有容器处理器对象)
创建容器
具体的创建容器方法如下:
// Create the process with the provided config
func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
var (
err error
socket *runc.Socket
pio *processIO
pidFile = newPidFile(p.Bundle)
)
if r.Terminal {
if socket, err = runc.NewTempConsoleSocket(); err != nil {
return fmt.Errorf("failed to create OCI runtime console socket: %w", err)
}
defer socket.Close()
} else {
if pio, err = createIO(ctx, p.id, p.IoUID, p.IoGID, p.stdio); err != nil {
return fmt.Errorf("failed to create init process I/O: %w", err)
}
p.io = pio
}
if r.Checkpoint != "" {
return p.createCheckpointedState(r, pidFile)
}
opts := &runc.CreateOpts{
PidFile: pidFile.Path(),
NoPivot: p.NoPivotRoot,
NoNewKeyring: p.NoNewKeyring,
}
if p.io != nil {
opts.IO = p.io.IO()
}
if socket != nil {
opts.ConsoleSocket = socket
}
if err := p.runtime.Create(ctx, r.ID, r.Bundle, opts); err != nil {
return p.runtimeError(err, "OCI runtime create failed")
}
if r.Stdin != "" {
if err := p.openStdin(r.Stdin); err != nil {
return err
}
}
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if socket != nil {
console, err := socket.ReceiveMaster()
if err != nil {
return fmt.Errorf("failed to retrieve console master: %w", err)
}
console, err = p.Platform.CopyConsole(ctx, console, p.id, r.Stdin, r.Stdout, r.Stderr, &p.wg)
if err != nil {
return fmt.Errorf("failed to start console copy: %w", err)
}
p.console = console
} else {
if err := pio.Copy(ctx, &p.wg); err != nil {
return fmt.Errorf("failed to start io pipe copy: %w", err)
}
}
pid, err := pidFile.Read()
if err != nil {
return fmt.Errorf("failed to retrieve OCI runtime container pid: %w", err)
}
p.pid = pid
return nil
}
上面的代码逻辑大致如下:
创建IO或者socket通信
通过runc工具命令创建容器
获取容器的pid并绑定到容器处理器中
启动容器
通过上面初始化容器我们知道,容器的所有操作管理都是在容器对象里面处理的,比如启动容器:
// Start a container process
func (c *Container) Start(ctx context.Context, r *task.StartRequest) (process.Process, error) {
p, err := c.Process(r.ExecID)
if err != nil {
return nil, err
}
if err := p.Start(ctx); err != nil {
return nil, err
}
if c.Cgroup() == nil && p.Pid() > 0 {
var cg interface{}
if cgroups.Mode() == cgroups.Unified {
g, err := cgroupsv2.PidGroupPath(p.Pid())
if err != nil {
logrus.WithError(err).Errorf("loading cgroup2 for %d", p.Pid())
}
cg, err = cgroupsv2.Load(g)
if err != nil {
logrus.WithError(err).Errorf("loading cgroup2 for %d", p.Pid())
}
} else {
cg, err = cgroup1.Load(cgroup1.PidPath(p.Pid()))
if err != nil {
logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid())
}
}
c.cgroup = cg
}
return p, nil
}
上面的代码逻辑大致如下:
-
获取容器处理器(处理器是一个接口,容器不同的状态下绑定的实现对象不一样)
通过处理器执行runc命令启动容器
func (s *createdState) Start(ctx context.Context) error
{
if err := s.p.start(ctx); err != nil {
return err
}
return s.transition("running")
}
func (p *Init) start(ctx context.Context) error {
err := p.runtime.Start(ctx, p.id)
return p.runtimeError(err, "OCI runtime start failed")
}
// Start will start an already created container
func (r *Runc) Start(context context.Context, id string) error {
return r.runOrError(r.command(context, "start", id))
}
https://www.qikqiak.com/post/containerd-usage/
https://colstuwjx.github.io/2021/08/%E6%BA%90%E7%A0%81%E8%A7%A3%E8%AF%BB%E4%BB%8E%E4%BB%A3%E7%A0%81%E5%AE%9E%E7%8E%B0%E5%B1%82%E9%9D%A2%E6%80%9D%E8%80%83-kubernetes-%E4%B8%BA%E4%BB%80%E4%B9%88%E4%BC%9A%E5%BC%83%E7%94%A8%E5%AF%B9-docker-%E7%9A%84%E6%94%AF%E6%8C%81/
更多推荐
所有评论(0)