How kubectl exec Is Handled in the Kubelet


Based on Kubernetes v1.17.

In short, a complete streaming request proceeds as follows (a client-side sketch of the first step follows the list):

  • The client runs kubectl exec -i -t ...
  • kube-apiserver sends a streaming request to the kubelet's /exec/ endpoint
  • The kubelet asks the CRI shim for an Exec URL over the CRI interface
  • The CRI shim returns the Exec URL to the kubelet
  • The kubelet returns a redirect response to kube-apiserver
  • kube-apiserver redirects the streaming request to that Exec URL; from then on the streaming server inside the CRI shim exchanges data with kube-apiserver directly, completing the exec request and response
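
The first step is essentially what client-go's remotecommand package does. Below is a minimal client-side sketch of it (not kubectl's actual code; the pod name "mypod", the container "mycontainer", and the command are placeholders):

package main

import (
	"os"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/remotecommand"
)

func main() {
	// Load a kubeconfig; error handling is kept minimal for brevity.
	config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// Build the exec subresource request against the apiserver,
	// the same kind of request kubectl exec issues.
	req := clientset.CoreV1().RESTClient().Post().
		Resource("pods").Namespace("default").Name("mypod").
		SubResource("exec").
		VersionedParams(&corev1.PodExecOptions{
			Container: "mycontainer",
			Command:   []string{"ls", "-l"},
			Stdout:    true,
			Stderr:    true,
		}, scheme.ParameterCodec)

	// The SPDY executor performs the protocol upgrade; the apiserver then proxies
	// or follows the kubelet's redirect down to the runtime's streaming server.
	exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
	if err != nil {
		panic(err)
	}
	if err := exec.Stream(remotecommand.StreamOptions{
		Stdout: os.Stdout,
		Stderr: os.Stderr,
	}); err != nil {
		panic(err)
	}
}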

In v1.10 and earlier, the container runtime had to return a URL that the API server could reach directly (usually the same address the kubelet listens on). Starting with v1.11, the kubelet gained the --redirect-container-streaming flag (default false): by default it no longer redirects but proxies streaming requests, so the runtime can return a localhost URL. The benefit of proxying through the kubelet is that the kubelet handles request authentication for the communication with the API server.

As the flow above shows, the kubelet handles an exec request in two steps: 1. obtain the streaming URL; 2. establish the streaming connection against that URL.

Obtaining the streaming URL

When the kubelet starts, it initializes an HTTP server and registers the corresponding handlers. The exec routes are registered as follows:


	ws = new(restful.WebService)
	ws.
		Path("/exec")
	ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
		To(s.getExec).
		Operation("getExec"))
	ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
		To(s.getExec).
		Operation("getExec"))
	ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
		To(s.getExec).
		Operation("getExec"))
	ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
		To(s.getExec).
		Operation("getExec"))
	s.restfulCont.Add(ws)

All of these paths are ultimately handled by getExec. The API server hits one of them with the command and stream options encoded as query parameters, for example /exec/default/mypod/mycontainer?command=ls&command=-l&input=1&output=1&tty=1:

// getExec handles requests to run a command inside a container.
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
	params := getExecRequestParams(request)
	streamOpts, err := remotecommandserver.NewOptions(request.Request)
	if err != nil {
		utilruntime.HandleError(err)
		response.WriteError(http.StatusBadRequest, err)
		return
	}
	pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
	if !ok {
		response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
		return
	}

	podFullName := kubecontainer.GetPodFullName(pod)
	url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
	if err != nil {
		streaming.WriteError(err, response.ResponseWriter)
		return
	}
	if s.redirectContainerStreaming {
		http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
		return
	}
	proxyStream(response.ResponseWriter, request.Request, url)
}
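
In the non-redirect branch, proxyStream (a small helper in the same pkg/kubelet/server/server.go) hands the connection to apimachinery's upgrade-aware reverse proxy, which pipes the upgraded SPDY/WebSocket stream between the apiserver and the streaming endpoint. A self-contained sketch of the idea (errorResponder is a stand-in for the kubelet's internal responder type):

package main

import (
	"net/http"
	"net/url"

	"k8s.io/apimachinery/pkg/util/proxy"
)

// errorResponder is a minimal stand-in for the kubelet's internal error responder.
type errorResponder struct{}

func (errorResponder) Error(w http.ResponseWriter, r *http.Request, err error) {
	http.Error(w, err.Error(), http.StatusInternalServerError)
}

// proxyStream proxies the (upgraded) streaming connection to the target URL.
func proxyStream(w http.ResponseWriter, r *http.Request, target *url.URL) {
	handler := proxy.NewUpgradeAwareHandler(target, nil /*transport*/, false /*wrapTransport*/, true /*upgradeRequired*/, errorResponder{})
	handler.ServeHTTP(w, r)
}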

Back to the flow: getExec first calls host.GetExec to obtain the URL, then checks whether redirection is enabled. If so, it redirects; otherwise it proxies the request straight to the streaming URL as shown above. The host object here is implemented by the kubelet itself, so let's look at its GetExec:

// GetExec gets the URL the exec will be served from, or nil if the Kubelet will serve it.
func (kl *Kubelet) GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error) {
	container, err := kl.findContainer(podFullName, podUID, containerName)
	if err != nil {
		return nil, err
	}
	if container == nil {
		return nil, fmt.Errorf("container not found (%q)", containerName)
	}
	return kl.streamingRuntime.GetExec(container.ID, cmd, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr, streamOpts.TTY)
}

This simply delegates to the GetExec method of streamingRuntime. streamingRuntime is an interface whose concrete implementation is kubeGenericRuntimeManager:

// GetExec gets the endpoint the runtime will serve the exec request from.
func (m *kubeGenericRuntimeManager) GetExec(id kubecontainer.ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {
	req := &runtimeapi.ExecRequest{
		ContainerId: id.ID,
		Cmd:         cmd,
		Tty:         tty,
		Stdin:       stdin,
		Stdout:      stdout,
		Stderr:      stderr,
	}
	resp, err := m.runtimeService.Exec(req)
	if err != nil {
		return nil, err
	}

	return url.Parse(resp.Url)
}

This in turn calls runtimeService.Exec. Here runtimeService is the remoteRuntimeService created for the CRI; put simply, it is the client side of the CRI server:

// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (r *RemoteRuntimeService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
	ctx, cancel := getContextWithTimeout(r.timeout)
	defer cancel()

	resp, err := r.runtimeClient.Exec(ctx, req)
	if err != nil {
		klog.Errorf("Exec %s '%s' from runtime service failed: %v", req.ContainerId, strings.Join(req.Cmd, " "), err)
		return nil, err
	}

	if resp.Url == "" {
		errorMessage := "URL is not set"
		klog.Errorf("Exec failed: %s", errorMessage)
		return nil, errors.New(errorMessage)
	}

	return resp, nil
}

The CRI client then calls the CRI server, which in this case is the dockershim (the dockerService object):

// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
	if ds.streamingServer == nil {
		return nil, streaming.NewErrorStreamingDisabled("exec")
	}
	_, err := checkContainerStatus(ds.client, req.ContainerId)
	if err != nil {
		return nil, err
	}
	return ds.streamingServer.GetExec(req)
}

This calls the GetExec method of dockerService's streamingServer; all of the streamingServer's methods are defined in pkg/kubelet/server/streaming/server.go:

func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
	if err := validateExecRequest(req); err != nil {
		return nil, err
	}
	token, err := s.cache.Insert(req)
	if err != nil {
		return nil, err
	}
	return &runtimeapi.ExecResponse{
		Url: s.buildURL("exec", token),
	}, nil
}

As you can see, this just returns a URL built from a simple token. A token is generated because the user's command may contain arbitrary characters of arbitrary length, so it is normalized into a short token. The token is cached locally, and the later exec request carries it so the original request can be looked up again.
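
The token-to-request mapping is the interesting part. Below is a minimal sketch of the idea only; the real requestCache in pkg/kubelet/server/streaming/request_cache.go additionally enforces a capacity limit and expires tokens after a TTL:

package main

import (
	"crypto/rand"
	"encoding/base64"
	"sync"
)

type requestCache struct {
	mu     sync.Mutex
	tokens map[string]interface{} // token -> cached ExecRequest/AttachRequest/...
}

func newRequestCache() *requestCache {
	return &requestCache{tokens: map[string]interface{}{}}
}

// Insert stores the request and returns an opaque, URL-safe token for it.
func (c *requestCache) Insert(req interface{}) (string, error) {
	buf := make([]byte, 8)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	token := base64.RawURLEncoding.EncodeToString(buf)

	c.mu.Lock()
	defer c.mu.Unlock()
	c.tokens[token] = req
	return token, nil
}

// Consume is one-shot: the token is removed on first use, so a streaming URL
// cannot be replayed.
func (c *requestCache) Consume(token string) (interface{}, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	req, ok := c.tokens[token]
	if ok {
		delete(c.tokens, token)
	}
	return req, ok
}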

Serving the streaming request

Once the real exec URL has been obtained, the actual data is exchanged over it. The server behind that URL usually lives inside the CRI implementation; dockershim, for example, creates a streamingServer for this purpose.

The streaming-server framework is similar across runtimes. To make it easier for runtimes to implement the CRI interface, the kubelet provides a shared package at pkg/kubelet/server/streaming/server.go; a runtime only needs to implement its streaming.Runtime interface to get a streamingServer (a toy implementation follows the interface definition):

// Runtime is the interface to execute the commands and provide the streams.
type Runtime interface {
	Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
	Attach(containerID string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
	PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error
}
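
To make the contract concrete, here is a hypothetical minimal implementation of that interface. This is a sketch only: echoRuntime is invented for illustration, the import path assumes the streaming package is vendored as k8s.io/kubernetes/pkg/kubelet/server/streaming (as out-of-tree shims did at the time), and a real shim would drive its container engine here instead of echoing:

package main

import (
	"fmt"
	"io"

	"k8s.io/client-go/tools/remotecommand"
	"k8s.io/kubernetes/pkg/kubelet/server/streaming"
)

// echoRuntime is a stand-in runtime that only demonstrates the method set.
type echoRuntime struct{}

var _ streaming.Runtime = echoRuntime{}

func (echoRuntime) Exec(containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
	// A real shim would start the process in the container and wire up the streams.
	if out != nil {
		defer out.Close()
		fmt.Fprintf(out, "would run %v in container %s\n", cmd, containerID)
	}
	return nil
}

func (echoRuntime) Attach(containerID string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
	return fmt.Errorf("attach not implemented in this sketch")
}

func (echoRuntime) PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error {
	return fmt.Errorf("port-forward not implemented in this sketch")
}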

The kubelet currently ships with a built-in Docker runtime implementation, dockershim, where streaming.Runtime is implemented by the streamingRuntime struct:

type streamingRuntime struct {
	client      libdocker.Interface
	execHandler ExecHandler
}

var _ streaming.Runtime = &streamingRuntime{}

The Docker client is kept as a member field so that later requests can be made to Docker to fetch data.


NOTE
Be careful to distinguish the various "runtime" definitions inside the kubelet; frankly, they are quite confusing, so it helps to be clear about the scope each one belongs to. streaming.Runtime is an interface in pkg/kubelet/server/streaming/server.go that defines the operations needed to serve streaming requests. streamingRuntime is dockershim's concrete implementation of streaming.Runtime, located in pkg/kubelet/dockershim/docker_streaming.go; the struct is unexported. Another easily confused name is StreamingRuntime in pkg/kubelet/container/runtime.go (commonly referred to as kubecontainer), a public interface that defines the GetExec/GetAttach/GetPortForward methods.


With a streaming.Runtime in hand, the streamingServer can be created (a wiring sketch follows the NewServer code below):


// NewServer creates a new Server for stream requests.
// TODO(tallclair): Add auth(n/z) interface & handling.
func NewServer(config Config, runtime Runtime) (Server, error) {
	s := &server{
		config:  config,
		runtime: &criAdapter{runtime},
		cache:   newRequestCache(),
	}

	if s.config.BaseURL == nil {
		s.config.BaseURL = &url.URL{
			Scheme: "http",
			Host:   s.config.Addr,
		}
		if s.config.TLSConfig != nil {
			s.config.BaseURL.Scheme = "https"
		}
	}

	ws := &restful.WebService{}
	endpoints := []struct {
		path    string
		handler restful.RouteFunction
	}{
		{"/exec/{token}", s.serveExec},
		{"/attach/{token}", s.serveAttach},
		{"/portforward/{token}", s.servePortForward},
	}
	// If serving relative to a base path, set that here.
	pathPrefix := path.Dir(s.config.BaseURL.Path)
	for _, e := range endpoints {
		for _, method := range []string{"GET", "POST"} {
			ws.Route(ws.
				Method(method).
				Path(path.Join(pathPrefix, e.path)).
				To(e.handler))
		}
	}
	handler := restful.NewContainer()
	handler.Add(ws)
	s.handler = handler
	s.server = &http.Server{
		Addr:      s.config.Addr,
		Handler:   s.handler,
		TLSConfig: s.config.TLSConfig,
	}

	return s, nil
}
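
Putting the pieces together, a shim would wire things up roughly as follows. This sketch mirrors what dockershim does, but it reuses the hypothetical echoRuntime from the earlier sketch and an arbitrary listen address:

package main

import (
	"k8s.io/kubernetes/pkg/kubelet/server/streaming"
)

func main() {
	config := streaming.DefaultConfig
	config.Addr = "127.0.0.1:0" // let the kernel pick a free port; the server fills BaseURL in with the actual address

	server, err := streaming.NewServer(config, echoRuntime{}) // echoRuntime from the sketch above
	if err != nil {
		panic(err)
	}

	// GetExec/GetAttach/GetPortForward on this server hand out the tokenized URLs;
	// Start(true) serves /exec/{token} and friends until the process exits.
	if err := server.Start(true); err != nil {
		panic(err)
	}
}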

NewServer registers handlers for the /exec/{token}-style endpoints. The exec handler is the serveExec method:

func (s *server) serveExec(req *restful.Request, resp *restful.Response) {
	token := req.PathParameter("token")
	cachedRequest, ok := s.cache.Consume(token)
	if !ok {
		http.NotFound(resp.ResponseWriter, req.Request)
		return
	}
	exec, ok := cachedRequest.(*runtimeapi.ExecRequest)
	if !ok {
		http.NotFound(resp.ResponseWriter, req.Request)
		return
	}

	streamOpts := &remotecommandserver.Options{
		Stdin:  exec.Stdin,
		Stdout: exec.Stdout,
		Stderr: exec.Stderr,
		TTY:    exec.Tty,
	}

	remotecommandserver.ServeExec(
		resp.ResponseWriter,
		req.Request,
		s.runtime,
		"", // unused: podName
		"", // unusued: podUID
		exec.ContainerId,
		exec.Cmd,
		streamOpts,
		s.config.StreamIdleTimeout,
		s.config.StreamCreationTimeout,
		s.config.SupportedRemoteCommandProtocols)
}

serveExec then calls ServeExec:

// ServeExec handles requests to execute a command in a container. After
// creating/receiving the required streams, it delegates the actual execution
// to the executor.
func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
	ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
	if !ok {
		// error is handled by createStreams
		return
	}
	defer ctx.conn.Close()

	err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0)
	if err != nil {
		if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() {
			rc := exitErr.ExitStatus()
			ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
				Status: metav1.StatusFailure,
				Reason: remotecommandconsts.NonZeroExitCodeReason,
				Details: &metav1.StatusDetails{
					Causes: []metav1.StatusCause{
						{
							Type:    remotecommandconsts.ExitCodeCauseType,
							Message: fmt.Sprintf("%d", rc),
						},
					},
				},
				Message: fmt.Sprintf("command terminated with non-zero exit code: %v", exitErr),
			}})
		} else {
			err = fmt.Errorf("error executing command in container: %v", err)
			runtime.HandleError(err)
			ctx.writeStatus(apierrors.NewInternalError(err))
		}
	} else {
		ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
			Status: metav1.StatusSuccess,
		}})
	}
}
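
The executor that serveExec passed in above is s.runtime, which NewServer wrapped in a criAdapter. Paraphrasing that adapter (the real type lives in the same pkg/kubelet/server/streaming/server.go, so details may differ slightly), it merely satisfies the Executor interface and forwards to Runtime.Exec, dropping the pod name, UID and timeout that the CRI side does not need; types.UID comes from apimachinery and remotecommand from client-go:

// criAdapter wraps a streaming.Runtime so it can be handed to ServeExec as an Executor.
type criAdapter struct {
	Runtime
}

func (a *criAdapter) ExecInContainer(podName string, podUID types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
	// podName, podUID and timeout are ignored: the CRI exec path only needs the container ID.
	return a.Runtime.Exec(container, cmd, in, out, err, tty, resize)
}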


So remotecommandserver.ServeExec calls executor.ExecInContainer, and the criAdapter shown above simply forwards that to Runtime.Exec. Runtime is the interface we saw earlier; here is the concrete dockershim implementation:

func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
	return r.exec(containerID, cmd, in, out, err, tty, resize, 0)
}

// Internal version of Exec adds a timeout.
func (r *streamingRuntime) exec(containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
	container, err := checkContainerStatus(r.client, containerID)
	if err != nil {
		return err
	}
	return r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize, timeout)
}

The execHandler here is the NativeExecHandler set when the streamingRuntime is initialized; it talks to Docker directly through the libdocker API:

// NativeExecHandler executes commands in Docker containers using Docker's exec API.
type NativeExecHandler struct{}

func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
	done := make(chan struct{})
	defer close(done)

	createOpts := dockertypes.ExecConfig{
		Cmd:          cmd,
		AttachStdin:  stdin != nil,
		AttachStdout: stdout != nil,
		AttachStderr: stderr != nil,
		Tty:          tty,
	}
	execObj, err := client.CreateExec(container.ID, createOpts)
	if err != nil {
		return fmt.Errorf("failed to exec in container - Exec setup failed - %v", err)
	}

	// Have to start this before the call to client.StartExec because client.StartExec is a blocking
	// call :-( Otherwise, resize events don't get processed and the terminal never resizes.
	//
	// We also have to delay attempting to send a terminal resize request to docker until after the
	// exec has started; otherwise, the initial resize request will fail.
	execStarted := make(chan struct{})
	go func() {
		select {
		case <-execStarted:
			// client.StartExec has started the exec, so we can start resizing
		case <-done:
			// ExecInContainer has returned, so short-circuit
			return
		}

		kubecontainer.HandleResizing(resize, func(size remotecommand.TerminalSize) {
			client.ResizeExecTTY(execObj.ID, uint(size.Height), uint(size.Width))
		})
	}()

	startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: tty}
	streamOpts := libdocker.StreamOptions{
		InputStream:  stdin,
		OutputStream: stdout,
		ErrorStream:  stderr,
		RawTerminal:  tty,
		ExecStarted:  execStarted,
	}
	err = client.StartExec(execObj.ID, startOpts, streamOpts)
	if err != nil {
		return err
	}

	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	count := 0
	for {
		inspect, err2 := client.InspectExec(execObj.ID)
		if err2 != nil {
			return err2
		}
		if !inspect.Running {
			if inspect.ExitCode != 0 {
				err = &dockerExitError{inspect}
			}
			break
		}

		count++
		if count == 5 {
			klog.Errorf("Exec session %s in container %s terminated but process still running!", execObj.ID, container.ID)
			break
		}

		<-ticker.C
	}

	return err
}


That completes the whole handling flow.

The overall flow is actually quite simple: each runtime shim implements the streaming.Runtime interface defined by the kubelet, and then uses the kubelet's shared helper package to stand up a streaming server. That server is responsible for two things: 1. GetExec: given the command in the user's request, return a tokenized URL and have the user's request redirected (or proxied) to it; 2. serveExec: actually serve the exec over that URL by calling the runtime shim's concrete implementation.

About RedirectContainerStreaming

As mentioned earlier, if RedirectContainerStreaming is enabled, the kubelet returns the streaming URL to the apiserver, which then redirects to it. The design goal is to keep every streaming request from flowing through the kubelet and putting pressure on it; the downside is that the kubelet's request authentication can no longer be used.

Let's take a closer look at what this flag really does. The kubelet's default runtime is Docker, so the subject here is dockershim:

Case one: RedirectContainerStreaming is set to true. The returned URL looks like /cri/exec/aRbQe4pn; note that it carries no hostname, so the redirect resolves against the original hostname, i.e. the address the kubelet listens on. With dockershim as the default runtime, setting RedirectContainerStreaming to true therefore makes no essential difference and does not reduce the load on the kubelet, because all the streaming still passes through the kubelet.
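
Where does that host-less /cri/... URL come from? When the kubelet builds the streaming config for dockershim (getStreamingConfig in pkg/kubelet/kubelet.go), it sets only a relative BaseURL when redirect is enabled, and only a localhost Addr otherwise. The snippet below is a paraphrased sketch, with kubeCfg and crOptions standing for the kubelet configuration and container-runtime options:

// Paraphrased sketch. With redirect enabled, BaseURL is just the path "/cri/",
// so buildURL yields /cri/exec/<token> and the 302 resolves against the
// kubelet's own address. Without redirect, only Addr is set (a localhost
// ephemeral port), and NewServer fills BaseURL in as http://127.0.0.1:<port>,
// which is exactly case two below.
config := &streaming.Config{
	StreamIdleTimeout:               kubeCfg.StreamingConnectionIdleTimeout.Duration,
	StreamCreationTimeout:           streaming.DefaultConfig.StreamCreationTimeout,
	SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols,
	SupportedPortForwardProtocols:   streaming.DefaultConfig.SupportedPortForwardProtocols,
}
if crOptions.RedirectContainerStreaming {
	config.BaseURL = &url.URL{Path: "/cri/"}
} else {
	config.Addr = net.JoinHostPort("localhost", "0")
}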

In the kubelet server, the /cri/exec/aRbQe4pn path above is handled by criHandler. At startup, the kubelet assigns dockerService to criHandler (dockerService here is the dockershim):

if crOptions.RedirectContainerStreaming {
	klet.criHandler = ds
}

The corresponding implementation in dockerService is:

func (ds *dockerService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if ds.streamingServer != nil {
		ds.streamingServer.ServeHTTP(w, r)
	} else {
		http.NotFound(w, r)
	}
}

It simply calls the streamingServer's ServeHTTP method:

func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	s.handler.ServeHTTP(w, r)
}

The streamingServer then dispatches the request to its registered handlers, which brings us back to the second step described above: serving the streaming request.

The overall workflow in this case: the apiserver sends the exec request to the kubelet, getExec returns a 302 to /cri/exec/<token>, the apiserver re-issues the request against the kubelet's own address, and criHandler hands it to the streamingServer, which serves the stream via dockershim.

Case two: RedirectContainerStreaming is set to false. The streaming URL obtained in step one now looks like http://127.0.0.1:36699/exec/8rYzmQK9, i.e. it does include a host. The kubelet does not return this URL to the apiserver; it proxies the request to that URL itself, so the communication happens directly over 127.0.0.1. This localhost port must therefore be listened on by the dockershim:

// Start initializes and starts components in dockerService.
func (ds *dockerService) Start() error {
	ds.initCleanup()

	// Initialize the legacy cleanup flag.
	if ds.startLocalStreamingServer {
		go func() {
			if err := ds.streamingServer.Start(true); err != nil {
				klog.Fatalf("Streaming server stopped unexpectedly: %v", err)
			}
		}()
	}
	return ds.containerManager.Start()
}

The dockershim calls the streamingServer's Start method to start listening:

func (s *server) Start(stayUp bool) error {
	if !stayUp {
		// TODO(tallclair): Implement this.
		return errors.New("stayUp=false is not yet implemented")
	}

	listener, err := net.Listen("tcp", s.config.Addr)
	if err != nil {
		return err
	}
	// Use the actual address as baseURL host. This handles the "0" port case.
	s.config.BaseURL.Host = listener.Addr().String()
	if s.config.TLSConfig != nil {
		return s.server.ServeTLS(listener, "", "") // Use certs from TLSConfig.
	}
	return s.server.Serve(listener)
}

By starting a server that listens on localhost, we are again back at the second step described above: serving the streaming request.

The overall workflow in this case: the apiserver sends the exec request to the kubelet, getExec obtains http://127.0.0.1:<port>/exec/<token> from dockershim's local streaming server, and the kubelet proxies the stream to that localhost endpoint instead of redirecting the apiserver.

Comparing these two workflows carefully shows that RedirectContainerStreaming has no real effect with the default dockershim. In that case it is still recommended to leave it set to false, so that streaming requests get the kubelet's authentication.

References

Kubernetes 容器運行時演進 (the evolution of the Kubernetes container runtime)
How does 'kubectl exec' work?
CRI Streaming Requests
Kubernetes PR #38742: [CRI] Don't include user data in CRI streaming redirect URLs

