A Brief Analysis of the Docker containerd Architecture and Source Code
This article uses docker 1.12 to briefly describe Docker's current framework and to analyze the architecture and source code of docker containerd.

Docker has gone through many releases by now and its architecture has changed substantially. The Docker ecosystem has become fairly complete, and the architecture of Docker itself has largely stabilized: the original monolithic dockerd has evolved into a multi-layer structure made up of docker-daemon, containerd, and runc, somewhat reminiscent of a virtual-machine stack.

Starting from the docker command line, the call chain runs docker client → docker daemon → containerd → shim → runc.

The focus of this article is containerd, but for clearer exposition we briefly start from the docker client.

We use docker run busybox as the running example.
The docker client executes the command:

The docker client is the docker CLI into which the user types commands. It is the friendliest interface for the user, but the container itself does not run under this command line, and may not even run on the machine where the command was typed.

The docker client mainly performs simple argument parsing and assembly, and splits the command into sub-requests.

For example:
#> docker run busybox
The command line above is decomposed into three requests — create, attach, and start — which are sent to the docker daemon as HTTP POSTs.
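As a rough illustration (the endpoint paths below follow the general shape of the Docker Engine REST API, but their exact form here is an assumption, not taken from the docker source), the decomposition can be sketched as:

```go
package main

import "fmt"

// runEndpoints sketches how one `docker run` is split by the client into
// create/attach/start requests against the daemon. The paths are
// illustrative placeholders, not the daemon's actual versioned routes.
func runEndpoints(image string) []string {
	id := "<container-id>" // in reality, returned by the create call
	return []string{
		fmt.Sprintf("POST /containers/create (image=%s)", image),
		fmt.Sprintf("POST /containers/%s/attach", id),
		fmt.Sprintf("POST /containers/%s/start", id),
	}
}

func main() {
	for _, e := range runEndpoints("busybox") {
		fmt.Println(e)
	}
}
```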
docker daemon — the daemon and manager of docker containers

The docker daemon is Docker's long-running service process and the manager of all docker containers: every object Docker creates is registered here, and the daemon maintains and accounts for their state.

When the docker daemon receives the create request from the docker client, it creates and registers the container's metadata. Docker supports quite a few kinds of containers, and docker-daemon only handles the generic parts, so "create" at this stage only produces the descriptive information and the various config structures.

Handling of the attach request is simpler and is not covered here.

The start request is a sub-command of docker run (and the whole of docker start): starting a container means creating its execution instance, i.e. a Linux process.

The function that handles start in the daemon is (Daemon) ContainerStart(...); at its end it calls ctr.client.remote.apiClient.CreateContainer, which hands control over to containerd.
docker containerd

As noted above, the docker daemon sends a create message to containerd, where the real container is created. We analyze this next, starting with containerd's initialization.

The basic framework of docker-containerd consists of the containerd daemon process, the GRPC api-server, the supervisor, and the worker (task) group.

The containerd daemon

A few words about the containerd daemon, which is also containerd's initialization process.

Its initialization code:
func main() {
    logrus.SetFormatter(&logrus.TextFormatter{TimestampFormat: time.RFC3339Nano})
    app := cli.NewApp()
    app.Name = "containerd"
    if containerd.GitCommit != "" {
        app.Version = fmt.Sprintf("%s commit: %s", containerd.Version, containerd.GitCommit)
    } else {
        app.Version = containerd.Version
    }
    app.Usage = usage
    app.Flags = daemonFlags
    app.Before = func(context *cli.Context) error {
        setupDumpStacksTrap()
        if context.GlobalBool("debug") {
            logrus.SetLevel(logrus.DebugLevel)
            if context.GlobalDuration("metrics-interval") > 0 {
                if err := debugMetrics(context.GlobalDuration("metrics-interval"), context.GlobalString("graphite-address")); err != nil {
                    return err
                }
            }
        }
        if p := context.GlobalString("pprof-address"); len(p) > 0 {
            pprof.Enable(p)
        }
        if err := checkLimits(); err != nil {
            return err
        }
        return nil
    }
    app.Action = func(context *cli.Context) {
        if err := daemon(context); err != nil {
            logrus.Fatal(err)
        }
    }
    if err := app.Run(os.Args); err != nil {
        logrus.Fatal(err)
    }
}
After this initialization and flag setup, app.Run is executed, which in effect runs the daemon(context) function. Let us look at daemon(context) next.
func daemon(context *cli.Context) error {
    s := make(chan os.Signal, 2048)
    signal.Notify(s, syscall.SIGTERM, syscall.SIGINT)
    sv, err := supervisor.New(
        context.String("state-dir"),
        context.String("runtime"),
        context.String("shim"),
        context.StringSlice("runtime-args"),
        context.Duration("start-timeout"),
        context.Int("retain-count"))
    if err != nil {
        return err
    }
    wg := &sync.WaitGroup{}
    for i := 0; i < 10; i++ {
        wg.Add(1)
        w := supervisor.NewWorker(sv, wg)
        go w.Start()
    }
    if err := sv.Start(); err != nil {
        return err
    }
    // Split the listen string of the form proto://addr
    listenSpec := context.String("listen")
    listenParts := strings.SplitN(listenSpec, "://", 2)
    if len(listenParts) != 2 {
        return fmt.Errorf("bad listen address format %s, expected proto://address", listenSpec)
    }
    server, err := startServer(listenParts[0], listenParts[1], sv)
    if err != nil {
        return err
    }
    for ss := range s {
        switch ss {
        default:
            logrus.Infof("stopping containerd after receiving %s", ss)
            server.Stop()
            os.Exit(0)
        }
    }
    return nil
}
That is the whole of daemon(context). Its main steps are:

- s := make(chan os.Signal, 2048)
  Create an os.Signal channel in preparation for waiting on signals later.
- sv, err := supervisor.New(...)
  Construct a new supervisor.
- wg := &sync.WaitGroup{}
  for i := 0; i < 10; i++ {
      wg.Add(1)
      w := supervisor.NewWorker(sv, wg)
      go w.Start()
  }
  Add ten workers to the newly created supervisor and start them; these ten workers share a single sync.WaitGroup.
- err := sv.Start()
  Start the supervisor's main loop.
- server, err := startServer(listenParts[0], listenParts[1], sv)
  Start the GRPC api-server.
- for ss := range s { ... }
  Block waiting on signals and react to them; this is also the long-running state of containerd's main flow.
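The daemon's skeleton — a task channel drained by a fixed pool of workers tracked by a WaitGroup — can be reduced to a minimal, self-contained sketch (the names and the toy "task" below are illustrative, not containerd's):

```go
package main

import (
	"fmt"
	"sync"
)

// doubleSum feeds 0..n-1 through a task channel drained by ten workers
// and sums the doubled results — the same channel/WaitGroup skeleton
// that daemon() builds around the supervisor.
func doubleSum(n int) int {
	tasks := make(chan int, 16)
	results := make(chan int, 16)
	wg := &sync.WaitGroup{}
	// start ten workers, as daemon() does with supervisor.NewWorker
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range tasks {
				results <- t * 2 // "handle" the task
			}
		}()
	}
	for i := 0; i < n; i++ {
		tasks <- i
	}
	close(tasks) // containerd never closes its queue; we do so the workers exit
	wg.Wait()
	close(results)
	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(doubleSum(5)) // 2*(0+1+2+3+4) = 20
}
```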
A quick look at the apiserver

The apiserver here is a GRPC service on top of HTTP/2; the docker daemon's requests are sent to it.

The call server, err := startServer(listenParts[0], listenParts[1], sv) in the code above is what starts the api-server. From the code:

The api-server is a GRPC-based service; we will not introduce GRPC itself in any depth here and analyze it only from the HTTP angle.
var _API_serviceDesc = grpc.ServiceDesc{
    ServiceName: "types.API",
    HandlerType: (*APIServer)(nil),
    Methods: []grpc.MethodDesc{
        // ……
        {
            MethodName: "CreateContainer",
            Handler:    _API_CreateContainer_Handler,
        },
        // ……
    },
    Streams: []grpc.StreamDesc{
        {
            StreamName:    "Events",
            Handler:       _API_Events_Handler,
            ServerStreams: true,
        },
    },
}
The call made from the docker daemon, ctr.client.remote.apiClient.CreateContainer, is the GRPC client side; its key line is grpc.Invoke(ctx, "/types.API/CreateContainer", in, out, c.cc, opts...). Matching this against the grpc.ServiceDesc above, the corresponding server-side handler (the handler named in the GRPC descriptor) should be _API_CreateContainer_Handler. Its code:
func _API_CreateContainer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(CreateContainerRequest)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(APIServer).CreateContainer(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/types.API/CreateContainer",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(APIServer).CreateContainer(ctx, req.(*CreateContainerRequest))
    }
    return interceptor(ctx, in, info, handler)
}
The real processing function is srv.(APIServer).CreateContainer(...); its body:
func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContainerRequest) (*types.CreateContainerResponse, error) {
    if c.BundlePath == "" {
        return nil, errors.New("empty bundle path")
    }
    e := &supervisor.StartTask{}
    e.ID = c.Id
    e.BundlePath = c.BundlePath
    e.Stdin = c.Stdin
    e.Stdout = c.Stdout
    e.Stderr = c.Stderr
    e.Labels = c.Labels
    e.NoPivotRoot = c.NoPivotRoot
    e.Runtime = c.Runtime
    e.RuntimeArgs = c.RuntimeArgs
    e.StartResponse = make(chan supervisor.StartResponse, 1)
    if c.Checkpoint != "" {
        e.CheckpointDir = c.CheckpointDir
        e.Checkpoint = &runtime.Checkpoint{
            Name: c.Checkpoint,
        }
    }
    s.sv.SendTask(e)
    if err := <-e.ErrorCh(); err != nil {
        return nil, err
    }
    r := <-e.StartResponse
    apiC, err := createAPIContainer(r.Container, false)
    if err != nil {
        return nil, err
    }
    return &types.CreateContainerResponse{
        Container: apiC,
    }, nil
}
The key point of this function is that it builds a supervisor.StartTask{}, initializes its fields, calls SendTask, and finally wraps the result with createAPIContainer and returns it.

SendTask itself is tiny:
func (s *Supervisor) SendTask(evt Task) {
    TasksCounter.Inc(1)
    s.tasks <- evt
}
It merely puts the task into the supervisor's task queue.
supervisor

Continuing from the daemon flow above, let us look at the Supervisor code.

Creating the supervisor:
func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, timeout time.Duration, retainCount int) (*Supervisor, error) {
    startTasks := make(chan *startTask, 10)
    if err := os.MkdirAll(stateDir, 0755); err != nil {
        return nil, err
    }
    machine, err := CollectMachineInformation()
    if err != nil {
        return nil, err
    }
    monitor, err := NewMonitor()
    if err != nil {
        return nil, err
    }
    s := &Supervisor{
        stateDir:    stateDir,
        containers:  make(map[string]*containerInfo),
        startTasks:  startTasks,
        machine:     machine,
        subscribers: make(map[chan Event]struct{}),
        tasks:       make(chan Task, defaultBufferSize),
        monitor:     monitor,
        runtime:     runtimeName,
        runtimeArgs: runtimeArgs,
        shim:        shimName,
        timeout:     timeout,
    }
    if err := setupEventLog(s, retainCount); err != nil {
        return nil, err
    }
    go s.exitHandler()
    go s.oomHandler()
    if err := s.restore(); err != nil {
        return nil, err
    }
    return s, nil
}
Starting the supervisor:
func (s *Supervisor) Start() error {
    logrus.WithFields(logrus.Fields{
        "stateDir":    s.stateDir,
        "runtime":     s.runtime,
        "runtimeArgs": s.runtimeArgs,
        "memory":      s.machine.Memory,
        "cpus":        s.machine.Cpus,
    }).Debug("containerd: supervisor running")
    go func() {
        for i := range s.tasks {
            s.handleTask(i)
        }
    }()
    return nil
}
The Supervisor's main loop simply keeps pulling tasks from tasks and dispatching them to handleTask. Continuing into handleTask:
func (s *Supervisor) handleTask(i Task) {
    var err error
    switch t := i.(type) {
    case *AddProcessTask:
        err = s.addProcess(t)
    case *CreateCheckpointTask:
        err = s.createCheckpoint(t)
    case *DeleteCheckpointTask:
        err = s.deleteCheckpoint(t)
    case *StartTask:
        err = s.start(t)
    case *DeleteTask:
        err = s.delete(t)
    case *ExitTask:
        err = s.exit(t)
    case *GetContainersTask:
        err = s.getContainers(t)
    case *SignalTask:
        err = s.signal(t)
    case *StatsTask:
        err = s.stats(t)
    case *UpdateTask:
        err = s.updateContainer(t)
    case *UpdateProcessTask:
        err = s.updateProcess(t)
    case *OOMTask:
        err = s.oom(t)
    default:
        err = ErrUnknownTask
    }
    if err != errDeferredResponse {
        i.ErrorCh() <- err
        close(i.ErrorCh())
    }
}
Taking docker run/start as our example, i.(type) here is *StartTask, so we continue into s.start(t):
func (s *Supervisor) start(t *StartTask) error {
    start := time.Now()
    rt := s.runtime
    rtArgs := s.runtimeArgs
    if t.Runtime != "" {
        rt = t.Runtime
        rtArgs = t.RuntimeArgs
    }
    container, err := runtime.New(runtime.ContainerOpts{
        Root:        s.stateDir,
        ID:          t.ID,
        Bundle:      t.BundlePath,
        Runtime:     rt,
        RuntimeArgs: rtArgs,
        Shim:        s.shim,
        Labels:      t.Labels,
        NoPivotRoot: t.NoPivotRoot,
        Timeout:     s.timeout,
    })
    if err != nil {
        return err
    }
    s.containers[t.ID] = &containerInfo{
        container: container,
    }
    ContainersCounter.Inc(1)
    task := &startTask{
        Err:           t.ErrorCh(),
        Container:     container,
        StartResponse: t.StartResponse,
        Stdin:         t.Stdin,
        Stdout:        t.Stdout,
        Stderr:        t.Stderr,
    }
    if t.Checkpoint != nil {
        task.CheckpointPath = filepath.Join(t.CheckpointDir, t.Checkpoint.Name)
    }
    s.startTasks <- task
    ContainerCreateTimer.UpdateSince(start)
    return errDeferredResponse
}
After some timing bookkeeping and a data-structure conversion, the task is placed into startTasks; tasks in startTasks are processed by the workers.

worker group

First, how a worker is created:
type worker struct {
    wg *sync.WaitGroup
    s  *Supervisor
}

// NewWorker return a new initialized worker
func NewWorker(s *Supervisor, wg *sync.WaitGroup) Worker {
    return &worker{
        s:  s,
        wg: wg,
    }
}
Next, the worker's main loop:
// Start runs a loop in charge of starting new containers
func (w *worker) Start() {
    defer w.wg.Done()
    for t := range w.s.startTasks {
        started := time.Now()
        // Maxx start D:\study\go\containerd-docker-v1.12.x\runtime\container.go, also call the start below
        process, err := t.Container.Start(t.CheckpointPath, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
        if err != nil {
            logrus.WithFields(logrus.Fields{
                "error": err,
                "id":    t.Container.ID(),
            }).Error("containerd: start container")
            t.Err <- err
            evt := &DeleteTask{
                ID:      t.Container.ID(),
                NoEvent: true,
                Process: process,
            }
            w.s.SendTask(evt)
            continue
        }
        if err := w.s.monitor.MonitorOOM(t.Container); err != nil && err != runtime.ErrContainerExited {
            if process.State() != runtime.Stopped {
                logrus.WithField("error", err).Error("containerd: notify OOM events")
            }
        }
        if err := w.s.monitorProcess(process); err != nil {
            logrus.WithField("error", err).Error("containerd: add process to monitor")
            t.Err <- err
            evt := &DeleteTask{
                ID:      t.Container.ID(),
                NoEvent: true,
                Process: process,
            }
            w.s.SendTask(evt)
            continue
        }
        // only call process start if we aren't restoring from a checkpoint
        // if we have restored from a checkpoint then the process is already started
        if t.CheckpointPath == "" {
            // Maxx call exec.cmd(docker-runc start $CID)
            // D:\study\go\containerd-docker-v1.12.x\runtime\process.go
            if err := process.Start(); err != nil {
                logrus.WithField("error", err).Error("containerd: start init process")
                t.Err <- err
                evt := &DeleteTask{
                    ID:      t.Container.ID(),
                    NoEvent: true,
                    Process: process,
                }
                w.s.SendTask(evt)
                continue
            }
        }
        ContainerStartTimer.UpdateSince(started)
        t.Err <- nil
        t.StartResponse <- StartResponse{
            Container: t.Container,
        }
        w.s.notifySubscribers(Event{
            Timestamp: time.Now(),
            ID:        t.Container.ID(),
            Type:      StateStart,
        })
    }
}
The most important calls here are process, err := t.Container.Start(t.CheckpointPath, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr)) and process.Start().

First, t.Container.Start lives in */containerd/runtime/container.go:
func (c *container) Start(checkpointPath string, s Stdio) (Process, error) {
    processRoot := filepath.Join(c.root, c.id, InitProcessID)
    if err := os.Mkdir(processRoot, 0755); err != nil {
        return nil, err
    }
    // Maxx start shim process cmd
    /*
        docker-containerd-shim 817c43b3f5794d0e5dfdb92acf60fe7653b3efc33a4388733d357d00a8d8ae1a /var/run/docker/libcontainerd/817c43b3f5794d0e5dfdb92acf60fe7653b3efc33a4388733d357d00a8d8ae1a docker-runc
    */
    cmd := exec.Command(c.shim,
        c.id, c.bundle, c.runtime,
    )
    cmd.Dir = processRoot
    cmd.SysProcAttr = &syscall.SysProcAttr{
        Setpgid: true,
    }
    spec, err := c.readSpec()
    if err != nil {
        return nil, err
    }
    config := &processConfig{
        checkpoint:  checkpointPath,
        root:        processRoot,
        id:          InitProcessID,
        c:           c,
        stdio:       s,
        spec:        spec,
        processSpec: specs.ProcessSpec(spec.Process),
    }
    p, err := newProcess(config)
    if err != nil {
        return nil, err
    }
    if err := c.createCmd(InitProcessID, cmd, p); err != nil {
        return nil, err
    }
    return p, nil
}
This function goes on to call newProcess and createCmd. newProcess only assembles parameter information; createCmd looks like this:
func (c *container) createCmd(pid string, cmd *exec.Cmd, p *process) error {
    p.cmd = cmd
    if err := cmd.Start(); err != nil {
        close(p.cmdDoneCh)
        if exErr, ok := err.(*exec.Error); ok {
            if exErr.Err == exec.ErrNotFound || exErr.Err == os.ErrNotExist {
                return fmt.Errorf("%s not installed on system", c.shim)
            }
        }
        return err
    }
    // We need the pid file to have been written to run
    defer func() {
        go func() {
            err := p.cmd.Wait()
            if err == nil {
                p.cmdSuccess = true
            }
            if same, err := p.isSameProcess(); same && p.pid > 0 {
                // The process changed its PR_SET_PDEATHSIG, so force
                // kill it
                logrus.Infof("containerd: %s:%s (pid %v) has become an orphan, killing it", p.container.id, p.id, p.pid)
                err = unix.Kill(p.pid, syscall.SIGKILL)
                if err != nil && err != syscall.ESRCH {
                    logrus.Errorf("containerd: unable to SIGKILL %s:%s (pid %v): %v", p.container.id, p.id, p.pid, err)
                } else {
                    for {
                        err = unix.Kill(p.pid, 0)
                        if err != nil {
                            break
                        }
                        time.Sleep(5 * time.Millisecond)
                    }
                }
            }
            close(p.cmdDoneCh)
        }()
    }()
    if err := c.waitForCreate(p, cmd); err != nil {
        return err
    }
    c.processes[pid] = p
    return nil
}
The cmd.Start() call above actually executes the command, whose content is essentially "docker-containerd-shim 817c43b3f5794d0e5dfdb92acf60fe7653b3efc33a4388733d357d00a8d8ae1a /var/run/docker/libcontainerd/817c43b3f5794d0e5dfdb92acf60fe7653b3efc33a4388733d357d00a8d8ae1a docker-runc". This takes us into the shim's command-line handling:
func main() {
    flag.Parse()
    cwd, err := os.Getwd()
    if err != nil {
        panic(err)
    }
    f, err := os.OpenFile(filepath.Join(cwd, "shim-log.json"), os.O_CREATE|os.O_WRONLY|os.O_APPEND|os.O_SYNC, 0666)
    if err != nil {
        panic(err)
    }
    if err := start(f); err != nil {
        // this means that the runtime failed starting the container and will have the
        // proper error messages in the runtime log so we should not treat this as a
        // shim failure because the shim executed properly
        if err == errRuntime {
            f.Close()
            return
        }
        // log the error instead of writing to stderr because the shim will have
        // /dev/null as its stdio because it is supposed to be reparented to system
        // init and will not have anyone to read from it
        writeMessage(f, "error", err)
        f.Close()
        os.Exit(1)
    }
}
func start(log *os.File) error {
    // start handling signals as soon as possible so that things are properly reaped
    // or if runtime exits before we hit the handler
    signals := make(chan os.Signal, 2048)
    signal.Notify(signals)
    // set the shim as the subreaper for all orphaned processes created by the container
    if err := osutils.SetSubreaper(1); err != nil {
        return err
    }
    // open the exit pipe
    f, err := os.OpenFile("exit", syscall.O_WRONLY, 0)
    if err != nil {
        return err
    }
    defer f.Close()
    control, err := os.OpenFile("control", syscall.O_RDWR, 0)
    if err != nil {
        return err
    }
    defer control.Close()
    p, err := newProcess(flag.Arg(0), flag.Arg(1), flag.Arg(2))
    if err != nil {
        return err
    }
    defer func() {
        if err := p.Close(); err != nil {
            writeMessage(log, "warn", err)
        }
    }()
    if err := p.create(); err != nil {
        p.delete()
        return err
    }
    msgC := make(chan controlMessage, 32)
    go func() {
        for {
            var m controlMessage
            if _, err := fmt.Fscanf(control, "%d %d %d\n", &m.Type, &m.Width, &m.Height); err != nil {
                continue
            }
            msgC <- m
        }
    }()
    var exitShim bool
    for {
        select {
        case s := <-signals:
            switch s {
            case syscall.SIGCHLD:
                exits, _ := osutils.Reap(false)
                for _, e := range exits {
                    // check to see if runtime is one of the processes that has exited
                    if e.Pid == p.pid() {
                        exitShim = true
                        writeInt("exitStatus", e.Status)
                    }
                }
            }
            // runtime has exited so the shim can also exit
            if exitShim {
                // Let containerd take care of calling the runtime
                // delete.
                // This is needed to be done first in order to ensure
                // that the call to Reap does not block until all
                // children of the container have died if init was not
                // started in its own PID namespace.
                f.Close()
                // Wait for all the children this process may have
                // created (needed for exec and init processes when
                // they join another pid namespace)
                osutils.Reap(true)
                p.Wait()
                return nil
            }
        case msg := <-msgC:
            switch msg.Type {
            case 0:
                // close stdin
                if p.stdinCloser != nil {
                    p.stdinCloser.Close()
                }
            case 1:
                if p.console == nil {
                    continue
                }
                ws := term.Winsize{
                    Width:  uint16(msg.Width),
                    Height: uint16(msg.Height),
                }
                term.SetWinsize(p.console.Fd(), &ws)
            }
        }
    }
    return nil
}
This function in turn calls p.create. Its source:
func (p *process) create() error {
    cwd, err := os.Getwd()
    if err != nil {
        return err
    }
    logPath := filepath.Join(cwd, "log.json")
    args := append([]string{
        "--log", logPath,
        "--log-format", "json",
    }, p.state.RuntimeArgs...)
    if p.state.Exec {
        args = append(args, "exec",
            "-d",
            "--process", filepath.Join(cwd, "process.json"),
            "--console", p.consolePath,
        )
    } else if p.checkpoint != nil {
        args = append(args, "restore",
            "-d",
            "--image-path", p.checkpointPath,
            "--work-path", filepath.Join(p.checkpointPath, "criu.work", "restore-"+time.Now().Format(time.RFC3339)),
        )
        add := func(flags ...string) {
            args = append(args, flags...)
        }
        if p.checkpoint.Shell {
            add("--shell-job")
        }
        if p.checkpoint.TCP {
            add("--tcp-established")
        }
        if p.checkpoint.UnixSockets {
            add("--ext-unix-sk")
        }
        if p.state.NoPivotRoot {
            add("--no-pivot")
        }
        for _, ns := range p.checkpoint.EmptyNS {
            add("--empty-ns", ns)
        }
    } else {
        args = append(args, "create",
            "--bundle", p.bundle,
            "--console", p.consolePath,
        )
        if p.state.NoPivotRoot {
            args = append(args, "--no-pivot")
        }
    }
    args = append(args,
        "--pid-file", filepath.Join(cwd, "pid"),
        p.id,
    )
    cmd := exec.Command(p.runtime, args...)
    cmd.Dir = p.bundle
    cmd.Stdin = p.stdio.stdin
    cmd.Stdout = p.stdio.stdout
    cmd.Stderr = p.stdio.stderr
    // Call out to setPDeathSig to set SysProcAttr as elements are platform specific
    cmd.SysProcAttr = setPDeathSig()
    if err := cmd.Start(); err != nil {
        if exErr, ok := err.(*exec.Error); ok {
            if exErr.Err == exec.ErrNotFound || exErr.Err == os.ErrNotExist {
                return fmt.Errorf("%s not installed on system", p.runtime)
            }
        }
        return err
    }
    p.stdio.stdout.Close()
    p.stdio.stderr.Close()
    if err := cmd.Wait(); err != nil {
        if _, ok := err.(*exec.ExitError); ok {
            return errRuntime
        }
        return err
    }
    data, err := ioutil.ReadFile("pid")
    if err != nil {
        return err
    }
    pid, err := strconv.Atoi(string(data))
    if err != nil {
        return err
    }
    p.containerPid = pid
    return nil
}
After a series of argument handling, this builds cmd := exec.Command(p.runtime, args...) and finally calls cmd.Start().

At this point the shim's main task is essentially done: since p.runtime is docker-runc, execution now passes into the runc command-line process.
The process.Start() function, in */containerd/runtime/process.go:
func (p *process) Start() error {
    if p.ID() == InitProcessID {
        var (
            errC = make(chan error, 1)
            args = append(p.container.runtimeArgs, "start", p.container.id)
            cmd  = exec.Command(p.container.runtime, args...)
        )
        go func() {
            out, err := cmd.CombinedOutput()
            if err != nil {
                errC <- fmt.Errorf("%s: %q", err.Error(), out)
            }
            errC <- nil
        }()
        select {
        case err := <-errC:
            if err != nil {
                return err
            }
        case <-p.cmdDoneCh:
            if !p.cmdSuccess {
                if cmd.Process != nil {
                    cmd.Process.Kill()
                }
                cmd.Wait()
                return ErrShimExited
            }
            err := <-errC
            if err != nil {
                return err
            }
        }
    }
    return nil
}
This function is called by the supervisor worker: after the container has been created via docker-containerd-shim, it performs the follow-up step of invoking the runtime's start.

The create container flow

The code walk above shows the create-container flow quite clearly: the docker client sends the request to the docker daemon; the daemon prepares the directories, processes the various files and parameters, and sends a GRPC message; containerd's GRPC server receives the message, produces a supervisor StartTask, and appends it to the supervisor's task queue; the supervisor's main loop gives it some light processing and moves it into the startTasks queue, which is drained by the supervisor's workers; a worker ultimately produces the docker-containerd-shim command line and runs it, the shim in turn generates the runc command, and finally runc starts the container's init process and binds it to its cgroups.

docker runc

runc finishes the remaining work of producing the container: it creates the container's internal init process and binds the various resources to it.

The user's actual workload runs as the process started by the runc command. The runc code will be analyzed in the next article.