Analysis of the docker-containerd Startup Flow


When Docker starts, containerd is typically launched with a command like the following:

root      2090  0.0  0.1 292780 11008 ?        Ssl  10月22   0:12 docker-containerd -l unix:///var/run/docker/libcontainerd/docker-containerd.sock --shim docker-containerd-shim 
--metrics-interval=0 --start-timeout 2m --state-dir /var/run/docker/libcontainerd/containerd --runtime docker-runc

  

1. containerd/containerd/main.go

func daemon(context *cli.Context) error

(1) First, it calls:

sv, err := supervisor.New(
  context.String("state-dir"),
  context.String("runtime"),
  context.String("shim"),
  context.StringSlice("runtime-args"), // New expects runtimeArgs []string
  context.Duration("start-timeout"),   // New expects timeout time.Duration
  context.Int("retain-count"),
)

  

(2) Loops 10 times; each iteration calls w := supervisor.NewWorker(sv, wg) and then go w.Start().

(3) Calls sv.Start() to start the supervisor.

(4) Calls server, err := startServer(listenParts[0], listenParts[1], sv) to start the gRPC server.
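
The worker start-up in step (2) can be sketched as a small pool of goroutines draining one shared channel. This is a minimal illustration, not containerd's code: the supervisor and startTask types are trimmed-down stand-ins, and runPool and the done field are hypothetical names introduced only for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// startTask is a hypothetical stand-in for a container start request.
type startTask struct {
	id   string
	done chan string
}

// supervisor is a simplified stand-in that only owns the start queue.
type supervisor struct {
	startTasks chan *startTask
}

// worker drains the shared startTasks queue until it is closed.
type worker struct {
	wg *sync.WaitGroup
	s  *supervisor
}

func (w *worker) Start() {
	defer w.wg.Done()
	for t := range w.s.startTasks {
		t.done <- "started " + t.id
	}
}

// runPool starts n workers, submits one task per id, and collects the replies.
func runPool(n int, ids []string) []string {
	sv := &supervisor{startTasks: make(chan *startTask, n)}
	wg := &sync.WaitGroup{}
	for i := 0; i < n; i++ {
		wg.Add(1)
		w := &worker{wg: wg, s: sv}
		go w.Start()
	}
	replies := make([]string, 0, len(ids))
	for _, id := range ids {
		t := &startTask{id: id, done: make(chan string, 1)}
		sv.startTasks <- t
		replies = append(replies, <-t.done) // wait for whichever worker picked it up
	}
	close(sv.startTasks) // lets every worker's range loop end
	wg.Wait()
	return replies
}

func main() {
	for _, r := range runPool(10, []string{"c1", "c2", "c3"}) {
		fmt.Println(r)
	}
}
```

Any of the workers may serve a given task; the caller only sees the reply on the task's own channel.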

 

The Supervisor data structure is defined as follows:

// Supervisor represents a container supervisor
type Supervisor struct {
  // stateDir is the directory on the system to store container runtime state information
  stateDir string
  // name of the OCI compatible runtime used to execute containers
  runtime     string
  runtimeArgs []string
  shim        string
  containers  map[string]*containerInfo
  startTasks  chan *startTask
  // we need a lock around the subscribers map only because additions and deletions from
  // the map are via the API so we cannot really control the concurrency
  subscriberLock sync.RWMutex
  subscribers    map[chan Event]struct{}
  machine        Machine
  tasks          chan Task
  monitor        *Monitor
  eventLog       []Event
  eventLock      sync.Mutex
  timeout        time.Duration
}
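
To illustrate why subscribers is guarded by an RWMutex, here is a minimal sketch of a subscriber registry. The hub type and its subscribe, unsubscribe and notify methods are hypothetical names chosen for the example; only the two field names come from the struct above, and dropping events for slow subscribers is an assumption of this sketch rather than documented behaviour.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a trimmed-down stand-in for the supervisor's event type.
type Event struct {
	ID   string
	Type string
}

// hub holds only the subscriber-related fields of the Supervisor.
type hub struct {
	subscriberLock sync.RWMutex
	subscribers    map[chan Event]struct{}
}

func newHub() *hub {
	return &hub{subscribers: make(map[chan Event]struct{})}
}

// subscribe registers a new buffered channel under the write lock.
func (h *hub) subscribe() chan Event {
	h.subscriberLock.Lock()
	defer h.subscriberLock.Unlock()
	ch := make(chan Event, 16)
	h.subscribers[ch] = struct{}{}
	return ch
}

// unsubscribe removes and closes the channel under the write lock.
func (h *hub) unsubscribe(ch chan Event) {
	h.subscriberLock.Lock()
	defer h.subscriberLock.Unlock()
	if _, ok := h.subscribers[ch]; ok {
		delete(h.subscribers, ch)
		close(ch)
	}
}

// notify sends e to every subscriber under the read lock and returns how
// many received it; a subscriber whose buffer is full is skipped.
func (h *hub) notify(e Event) int {
	h.subscriberLock.RLock()
	defer h.subscriberLock.RUnlock()
	delivered := 0
	for ch := range h.subscribers {
		select {
		case ch <- e:
			delivered++
		default:
		}
	}
	return delivered
}

func main() {
	h := newHub()
	a := h.subscribe()
	h.notify(Event{ID: "c1", Type: "start-container"})
	fmt.Println((<-a).ID)
}
```

Because unsubscribe takes the write lock, a channel can never be closed while notify is iterating over the map.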

  

2. containerd/supervisor/supervisor.go

// New returns an initialized Process supervisor

func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, timeout time.Duration, retainCount int) (*Supervisor, error)

(1) Calls machine, err := CollectMachineInformation() to obtain the host's CPU count and total RAM.

(2) Calls monitor, err := NewMonitor() to start and return a monitor.

(3) Populates the Supervisor structure:

s := &Supervisor{
  stateDir:    stateDir,
  containers:  make(map[string]*containerInfo),
  startTasks:  startTasks,
  machine:     machine,
  subscribers: make(map[chan Event]struct{}),
  tasks:       make(chan Task, defaultBufferSize),
  monitor:     monitor,
  runtime:     runtimeName,
  runtimeArgs: runtimeArgs,
  shim:        shimName,
  timeout:     timeout,
}

  

(4) Calls setupEventLog(s, retainCount) to set up the event log.

(5) Spawns two goroutines, s.exitHandler() and s.oomHandler().

(6) Finally, calls s.restore() to load containers that already existed before this start.

 

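3. containerd/supervisor/supervisor.go

func (s *Supervisor) restore() error

(1) Iterates over the directory s.stateDir (in practice, /var/run/docker/libcontainerd/containerd).

(2) Calls id := d.Name() to obtain the container ID, then calls container, err := runtime.Load(s.stateDir, id, s.shim, s.timeout). Load reads s.stateDir/id/state.json to rebuild the container instance, then walks the pid files under s.stateDir/id/ to load the container's processes.

(3) Calls processes, err := container.Processes() to get the container's processes. If a process is in the running state, s.monitorProcess(p) is called to monitor it; processes that are no longer running are handled separately.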
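
The directory walk in steps (1) and (2) can be sketched as follows. This is an illustration under assumptions: the state struct with a single Bundle field is a hypothetical, trimmed-down shape for state.json, and restoreSample is a helper invented for the example that writes sample data into a temporary directory. The pid-file handling and process monitoring are left out.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// state is a hypothetical, trimmed-down stand-in for the contents of state.json.
type state struct {
	Bundle string `json:"bundle"`
}

// container pairs a container ID with its decoded state.
type container struct {
	ID    string
	State state
}

// restore loads <stateDir>/<id>/state.json for every subdirectory of stateDir.
func restore(stateDir string) ([]container, error) {
	entries, err := os.ReadDir(stateDir) // entries come back sorted by name
	if err != nil {
		return nil, err
	}
	var loaded []container
	for _, d := range entries {
		if !d.IsDir() {
			continue
		}
		id := d.Name()
		data, err := os.ReadFile(filepath.Join(stateDir, id, "state.json"))
		if err != nil {
			return nil, fmt.Errorf("load %s: %w", id, err)
		}
		var s state
		if err := json.Unmarshal(data, &s); err != nil {
			return nil, fmt.Errorf("decode %s: %w", id, err)
		}
		loaded = append(loaded, container{ID: id, State: s})
	}
	return loaded, nil
}

// restoreSample writes sample state files to a temp dir, restores them, and cleans up.
func restoreSample(ids ...string) ([]container, error) {
	dir, err := os.MkdirTemp("", "containerd-state")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)
	for _, id := range ids {
		if err := os.MkdirAll(filepath.Join(dir, id), 0o755); err != nil {
			return nil, err
		}
		data, err := json.Marshal(state{Bundle: "/bundles/" + id})
		if err != nil {
			return nil, err
		}
		if err := os.WriteFile(filepath.Join(dir, id, "state.json"), data, 0o644); err != nil {
			return nil, err
		}
	}
	return restore(dir)
}

func main() {
	containers, err := restoreSample("web", "db")
	if err != nil {
		fmt.Println("restore failed:", err)
		return
	}
	for _, c := range containers {
		fmt.Println(c.ID, c.State.Bundle)
	}
}
```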

 

4. containerd/supervisor/supervisor.go

// Start is a non-blocking call that runs the supervisor for monitoring container processes and executing new containers
// This event loop is the only thing that is allowed to modify state of containers and processes, therefore it is safe to do operations
// in the handlers that modify state of the system or state of the Supervisor

func (s *Supervisor) Start() error

This function does very little: it starts a goroutine that runs for i := range s.tasks and calls s.handleTask(i) for each task received.
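
A minimal sketch of this pattern follows: one goroutine owns the state and everything else talks to it through a channel. The task type, the submit and stop helpers, and the done channel are hypothetical additions for the example; only the tasks channel and the handleTask name come from the description above.

```go
package main

import "fmt"

// task is a hypothetical request to register a container ID.
type task struct {
	id    string
	errCh chan error
}

// supervisor keeps its state private to the event-loop goroutine.
type supervisor struct {
	tasks      chan *task
	containers map[string]bool // only touched inside the loop
	done       chan struct{}
}

func newSupervisor() *supervisor {
	return &supervisor{
		tasks:      make(chan *task, 16),
		containers: make(map[string]bool),
		done:       make(chan struct{}),
	}
}

// Start is non-blocking: it launches the loop and returns immediately.
func (s *supervisor) Start() {
	go func() {
		defer close(s.done)
		for t := range s.tasks {
			s.handleTask(t)
		}
	}()
}

// handleTask runs only on the loop goroutine, so no lock is needed.
func (s *supervisor) handleTask(t *task) {
	if s.containers[t.id] {
		t.errCh <- fmt.Errorf("container %s already exists", t.id)
		return
	}
	s.containers[t.id] = true
	t.errCh <- nil
}

// submit queues a task and waits for the loop to report the result.
func (s *supervisor) submit(id string) error {
	t := &task{id: id, errCh: make(chan error, 1)}
	s.tasks <- t
	return <-t.errCh
}

// stop closes the queue and waits for the loop to drain it.
func (s *supervisor) stop() {
	close(s.tasks)
	<-s.done
}

func main() {
	s := newSupervisor()
	s.Start()
	fmt.Println(s.submit("c1"))
	fmt.Println(s.submit("c1"))
	s.stop()
}
```

Serializing every mutation through one goroutine is what allows the handlers to modify supervisor state without taking locks.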

 

The Task interface is defined as follows:

// Task executes an action returning an error chan with either nil or the error from executing the task
type Task interface {
  // ErrorCh returns a channel used to report an error from an async task
  ErrorCh() chan error
}

  

5. containerd/supervisor/supervisor.go

func (s *Supervisor) handleTask(i Task)

This function dispatches on the dynamic type of i and calls the matching handler. For example, when i.(type) is *StartTask it calls s.start(t), and when i.(type) is *DeleteTask it calls s.delete(t).
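
The dispatch can be sketched with a Go type switch. This is a simplified illustration: baseTask, unknownTask, newBase, run and the three error values are hypothetical names for the example, the task structs carry only an ID, and the handler always reports its result on ErrorCh, which may differ from how the real handlers respond.

```go
package main

import (
	"errors"
	"fmt"
)

// Task matches the interface shown above.
type Task interface {
	ErrorCh() chan error
}

// baseTask is a hypothetical helper that implements ErrorCh for embedding.
type baseTask struct{ errCh chan error }

func (t *baseTask) ErrorCh() chan error { return t.errCh }

func newBase() baseTask { return baseTask{errCh: make(chan error, 1)} }

type StartTask struct {
	baseTask
	ID string
}

type DeleteTask struct {
	baseTask
	ID string
}

// unknownTask exists only to exercise the default branch.
type unknownTask struct{ baseTask }

var (
	errExists      = errors.New("container already exists")
	errNotFound    = errors.New("container not found")
	errUnknownTask = errors.New("unknown task type")
)

type Supervisor struct{ containers map[string]struct{} }

func (s *Supervisor) start(t *StartTask) error {
	if _, ok := s.containers[t.ID]; ok {
		return errExists
	}
	s.containers[t.ID] = struct{}{}
	return nil
}

func (s *Supervisor) delete(t *DeleteTask) error {
	if _, ok := s.containers[t.ID]; !ok {
		return errNotFound
	}
	delete(s.containers, t.ID)
	return nil
}

// handleTask picks a handler from the task's dynamic type.
func (s *Supervisor) handleTask(i Task) {
	var err error
	switch t := i.(type) {
	case *StartTask:
		err = s.start(t)
	case *DeleteTask:
		err = s.delete(t)
	default:
		err = errUnknownTask
	}
	i.ErrorCh() <- err
}

// run handles a task synchronously and returns the reported error.
func (s *Supervisor) run(i Task) error {
	s.handleTask(i)
	return <-i.ErrorCh()
}

func main() {
	s := &Supervisor{containers: map[string]struct{}{}}
	fmt.Println(s.run(&StartTask{baseTask: newBase(), ID: "c1"}))
	fmt.Println(s.run(&DeleteTask{baseTask: newBase(), ID: "c2"}))
}
```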

 

-----------------------------------------------------------------------  What the worker does -------------------------------------------------------------------------

The worker data structures are as follows:

// Worker is the interface returned by NewWorker
type Worker interface {
  Start()
}

type worker struct {
  wg *sync.WaitGroup
  s  *Supervisor
}

 

1. containerd/supervisor/worker.go

func NewWorker(s *Supervisor, wg *sync.WaitGroup) Worker

This function simply populates the structure: return &worker{s: s, wg: wg}

 

2. containerd/supervisor/worker.go

// Start runs a loop in charge of starting new containers

func (w *worker) Start()

(1) Iterates over w.s.startTasks and, for each task t, calls process, err := t.Container.Start(t.CheckpointPath, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr)).

(2) Calls w.s.monitor.MonitorOOM(t.Container) and w.s.monitorProcess(process) to monitor the container and its process.

(3) When a container is restored from a checkpoint, its process does not need to be started. process.Start() is therefore called only when t.CheckpointPath == "".

(4) Calls ContainerStartTimer.UpdateSince(started), where started is the timestamp recorded when handling of the task began.

(5) Finally, sends t.Err <- nil and t.StartResponse <- StartResponse{Container: t.Container}, then calls w.s.notifySubscribers(Event{Timestamp: time.Now(), ID: t.Container.ID(), Type: StateStart}) to notify subscribers.
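
One iteration of this loop can be sketched as below, focusing on the checkpoint branch in step (3) and the replies in step (5). All types here are simplified stand-ins; handleStart, runStart, the startCalls counter and the notify callback are hypothetical names for the example, the "start-container" string is an illustrative value for StateStart, and the OOM and process monitoring from step (2) is omitted.

```go
package main

import (
	"fmt"
	"time"
)

// StateStart is an illustrative event type name.
const StateStart = "start-container"

type Event struct {
	Timestamp time.Time
	ID        string
	Type      string
}

// process counts Start calls so the checkpoint branch can be observed.
type process struct{ startCalls int }

func (p *process) Start() error { p.startCalls++; return nil }

type container struct{ id string }

// Start is a hypothetical stand-in for creating the container's init process.
func (c *container) Start(checkpointPath string) (*process, error) {
	return &process{}, nil
}

type StartResponse struct{ Container *container }

type startTask struct {
	Container      *container
	CheckpointPath string
	Err            chan error
	StartResponse  chan StartResponse
}

// handleStart mirrors one iteration of the worker loop.
func handleStart(t *startTask, notify func(Event)) (*process, time.Duration) {
	started := time.Now()
	p, err := t.Container.Start(t.CheckpointPath)
	if err != nil {
		t.Err <- err
		return nil, 0
	}
	// A process restored from a checkpoint is already running, so it is
	// started explicitly only when no checkpoint path was given.
	if t.CheckpointPath == "" {
		if err := p.Start(); err != nil {
			t.Err <- err
			return nil, 0
		}
	}
	elapsed := time.Since(started) // what a start timer would record
	t.Err <- nil
	t.StartResponse <- StartResponse{Container: t.Container}
	notify(Event{Timestamp: time.Now(), ID: t.Container.id, Type: StateStart})
	return p, elapsed
}

// runStart runs one task and reports how often process.Start was called
// and which event type was sent to the subscribers.
func runStart(checkpointPath string) (int, string) {
	t := &startTask{
		Container:      &container{id: "c1"},
		CheckpointPath: checkpointPath,
		Err:            make(chan error, 1),
		StartResponse:  make(chan StartResponse, 1),
	}
	var last Event
	p, _ := handleStart(t, func(e Event) { last = e })
	if err := <-t.Err; err != nil || p == nil {
		return -1, ""
	}
	<-t.StartResponse
	return p.startCalls, last.Type
}

func main() {
	fresh, typ := runStart("")
	restored, _ := runStart("/tmp/checkpoint")
	fmt.Println(fresh, restored, typ)
}
```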

 

---------------------------------------------------------------------------- Monitor analysis -----------------------------------------------------------------------

The Monitor data structure is as follows:

// Monitor represents a runtime.Process monitor
type Monitor struct {
  m         sync.Mutex
  receivers map[int]interface{}
  exits     chan runtime.Process
  ooms      chan string
  epollFd   int
}

  

1. containerd/supervisor/monitor_linux.go

// NewMonitor starts a new process monitor and returns it

func NewMonitor() (*Monitor, error)

(1) First creates a monitor instance: m := &Monitor{receivers: make(map[int]interface{}), exits: make(chan runtime.Process, 1024), ooms: make(chan string, 1024)}

(2) Calls fd, err := archutils.EpollCreate1(0) to create an epoll fd, then assigns fd to m.epollFd.

(3) Spawns a goroutine: go m.start()

 

2. containerd/supervisor/monitor_linux.go

func (m *Monitor) start()

(1) This function processes syscall.EpollEvent values in a loop. Each iteration calls n, err := archutils.EpollWait(m.epollFd, events[:], -1) to obtain n events.

(2) For each event, fd := int(events[i].Fd) and r := m.receivers[fd] look up the corresponding runtime.Process or runtime.OOM.

(3) Finally, a type switch t := r.(type) handles runtime.Process and runtime.OOM separately.

 

3. containerd/supervisor/monitor_linux.go

// Monitor adds a process to the list of the one being monitored

func (m *Monitor) Monitor(p runtime.Process) error

(1) Calls fd := p.ExitFD() (ExitFD returns the fd of the exit pipe), then builds an event from it: event := syscall.EpollEvent{Fd: int32(fd), Events: syscall.EPOLLHUP}

(2) Calls archutils.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event).

(3) Finally, calls EpollFdCounter.Inc(1) and sets m.receivers[fd] = p.
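
The register-then-wait mechanism can be tried out on Linux with the standard syscall package in place of archutils. This sketch rests on several assumptions: an anonymous pipe stands in for the process's exit pipe, closing the write end simulates the process exiting, the receiver is a plain string rather than a runtime.Process, and waitForExit is a hypothetical helper name. The one-second timeout is there only so the example cannot hang.

```go
//go:build linux

package main

import (
	"errors"
	"fmt"
	"syscall"
)

// waitForExit registers the read end of a pipe under the given name, closes
// the write end to simulate a process exit, and returns the receiver that
// epoll reports.
func waitForExit(name string) (string, error) {
	epollFd, err := syscall.EpollCreate1(0)
	if err != nil {
		return "", err
	}
	defer syscall.Close(epollFd)

	var pipe [2]int
	if err := syscall.Pipe(pipe[:]); err != nil {
		return "", err
	}
	readFd, writeFd := pipe[0], pipe[1]
	defer syscall.Close(readFd)

	// Register the read end for hang-up events and remember who owns it.
	receivers := map[int]interface{}{}
	event := syscall.EpollEvent{Fd: int32(readFd), Events: syscall.EPOLLHUP}
	if err := syscall.EpollCtl(epollFd, syscall.EPOLL_CTL_ADD, readFd, &event); err != nil {
		syscall.Close(writeFd)
		return "", err
	}
	receivers[readFd] = name

	// Closing the last writer makes the kernel report EPOLLHUP on the reader.
	if err := syscall.Close(writeFd); err != nil {
		return "", err
	}

	var events [8]syscall.EpollEvent
	for {
		n, err := syscall.EpollWait(epollFd, events[:], 1000)
		if err == syscall.EINTR {
			continue // interrupted by a signal; retry
		}
		if err != nil {
			return "", err
		}
		if n == 0 {
			return "", errors.New("timed out waiting for EPOLLHUP")
		}
		fd := int(events[0].Fd)
		switch r := receivers[fd].(type) {
		case string:
			return r, nil
		default:
			return "", fmt.Errorf("no receiver for fd %d", fd)
		}
	}
}

func main() {
	name, err := waitForExit("process-1")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("exit detected for", name)
}
```

The map lookup by fd followed by a type switch is the same shape as steps (2) and (3) of start(), reduced to a single receiver type.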
