kubernetes kube-apiserver啟動流程分析


kubernetes代碼版本:v1.20

看這篇文章的前提:

  • 有 golang 的基礎

  • 對於 kubernetes 有基本的了解

kube-apiserver的啟動過程可以分為以下幾個步驟

  • 1.資源注冊
  • 2.解析命令行參數
  • 3.創建 apiserver 通用配置
  • 4.創建 APIExtensionsServer
  • 5.創建 KubeAPIServer
  • 6.創建 AggregatorServer
  • 7.啟動服務

並不會把每一步都仔細分析。我只挑主要的步驟進行研究一下。

1.資源注冊

首先要把 apiserver 支持的資源注冊到資源表中,資源注冊這一過程並不是通過函數調用實現的,而是使用 golang 的 import 導包機制實現資源注冊(golang 的導入機制我就不贅述了)。

整個組件的入口函數是這個文件:

k8s.io/kubernetes/cmd/kube-apiserver/apiserver.go

首先就是初始化 command 對象,執行了 app.NewAPIServerCommand() 函數。函數在 k8s.io/kubernetes/cmd/kube-apiserver/app/server.go 文件中。

可以發現在 import 的時候導入了這個包

k8s.io/kubernetes/pkg/api/legacyscheme

k8s.io/kubernetes/vendor/k8s.io/kube-aggregator/pkg/apiserver/scheme/scheme.go 文件中,在 scheme 包里面,定義了三個全局變量

var (
    // 資源注冊表
	Scheme = runtime.NewScheme()
    // 編、解碼器
	Codecs = serializer.NewCodecFactory(Scheme)
    // 參數編、解碼器
	ParameterCodec = runtime.NewParameterCodec(Scheme)
)

這三個變量可以在 apiserver 組件的代碼中任何地方使用。

kube-apiserver 啟動時還導入了 controlplane 包,包中的 import_known_versions.go 文件調用了 kubernetes 支持資源的 install 包,代碼如下:
k8s.io/kubernetes/pkg/controlplane/import_known_versions.go

import (
	// These imports are the API groups the API server will support.
	_ "k8s.io/kubernetes/pkg/apis/admission/install"
	_ "k8s.io/kubernetes/pkg/apis/admissionregistration/install"
	_ "k8s.io/kubernetes/pkg/apis/apiserverinternal/install"
	_ "k8s.io/kubernetes/pkg/apis/apps/install"
	_ "k8s.io/kubernetes/pkg/apis/authentication/install"
	_ "k8s.io/kubernetes/pkg/apis/authorization/install"
	_ "k8s.io/kubernetes/pkg/apis/autoscaling/install"
	_ "k8s.io/kubernetes/pkg/apis/batch/install"
	_ "k8s.io/kubernetes/pkg/apis/certificates/install"
	_ "k8s.io/kubernetes/pkg/apis/coordination/install"
	_ "k8s.io/kubernetes/pkg/apis/core/install"
	_ "k8s.io/kubernetes/pkg/apis/discovery/install"
	_ "k8s.io/kubernetes/pkg/apis/events/install"
	_ "k8s.io/kubernetes/pkg/apis/extensions/install"
	_ "k8s.io/kubernetes/pkg/apis/flowcontrol/install"
	_ "k8s.io/kubernetes/pkg/apis/imagepolicy/install"
	_ "k8s.io/kubernetes/pkg/apis/networking/install"
	_ "k8s.io/kubernetes/pkg/apis/node/install"
	_ "k8s.io/kubernetes/pkg/apis/policy/install"
	_ "k8s.io/kubernetes/pkg/apis/rbac/install"
	_ "k8s.io/kubernetes/pkg/apis/scheduling/install"
	_ "k8s.io/kubernetes/pkg/apis/storage/install"
)

可以看到里面注冊的所有資源都是在 k8s.io/kubernetes/pkg/apis 文件下,都在本資源下面的 install 包里面 定義了自己的 init 方法。

以 admission 為例:

func Install(scheme *runtime.Scheme) {
    // 首先注冊 admission 的內部版本
	utilruntime.Must(admission.AddToScheme(scheme))
    // 其他外部版本
	utilruntime.Must(v1beta1.AddToScheme(scheme))
	utilruntime.Must(v1.AddToScheme(scheme))
    // 版本順序
	utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion, v1beta1.SchemeGroupVersion))
}
***
簡單說一下什么叫做內部版本,什么叫做外部版本:
kubernetes 中每個資源都有自己的不同版本,例如 v1, v1/beat1, v1/alpha1 等等,當資源需要在不同版本之間轉換時,我們不能為每兩個不同的版本之間都寫一個轉換方法,如果后續有新的版本添加進來,就意味着需要為新的版本對應已經存在的每個版本都編寫轉換方法,復雜度會指數級上升。在kubernetes中則通過一個內部版本的設計來進行解決,內部版本是一個穩定的版本,所有的版本都只針對目標版本來進行轉換的實現,而不關注其他版本。可以理解為,這個所謂的’內部版本‘包含其他所有版本的所有字段,我們在進行轉換的時候先把對象轉換成內部版本,然后再從內部版本轉換為目標版本即可。
***

2.解析命令行參數

kubernetes 使用的是開源的命令行庫 Cobra,所有組件均使用這個解析庫,此庫在 github 上面是開源的,有興趣的可以自行了解一下。

我只簡單敘述一下:
Cobra既是一個用來創建強大的現代CLI命令行的golang庫,也是一個生成程序應用和命令行文件的程序。使用方法如下:

創建 Cmd 主命令對象,並在對象中定義各種 Run 方法(此處只是定義,並不是執行),執行順序是 PersistentPreRun --> PerRun --> Run --> PostRun --> PersistentPostRun.然后添加命令行參數(Flag),比如我們使用的 kubectl get pod -n kube-system 后面的 -n 就是 Flag,最后執行 command 對象的 Execute 方法回調我們此前定義的各種函數。

現在,讓我們在回到最初的 NewAPIServerCommand 函數中,看一看 kubernetes 的命令行解析代碼(不太重要的代碼我會省略):

k8s.io/kubernetes/cmd/kube-apiserver/app/server.go

func NewAPIServerCommand() *cobra.Command {
    // 初始化各個模塊的默認配置
	s := options.NewServerRunOptions()
    // 生成 cmd 對象
	cmd := &cobra.Command{
		...
        // 定義方法
		RunE: func(cmd *cobra.Command, args []string) error {
			...
            // 填充成完整的參數對象
			completedOptions, err := Complete(s)
			...
            // 驗證參數的合法性
			if errs := completedOptions.Validate(); len(errs) != 0 {
				return utilerrors.NewAggregate(errs)
			}
            // 將完全的參數對象傳入 Run 函數。Run 里面完成了 apiserver 組件的啟動邏輯,這是一個常駐進程。
			return Run(completedOptions, genericapiserver.SetupSignalHandler())
		},
        ...
	}
	...
}

順便回到 apiserver 的入口文件中,看看 main 文件中的啟動代碼:

func main() {
	rand.Seed(time.Now().UnixNano())

	command := app.NewAPIServerCommand()

	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}

就是按照我上面說的順序定義的,kubernetes 中所有組件的啟動流程大體上都是這個樣子。


進入上面說的 Run 函數里面我們接着看,接下來的所有服務項的配置和創建都是在 CreateServerChain 函數中完成的。

CreateServerChain 是完成 server 初始化的方法,里面包含 APIExtensionsServer、KubeAPIServer、AggregatorServer 初始化的所有流程,最終返回 aggregatorapiserver.APIAggregator 實例,初始化流程主要有:http filter chain 的配置、API Group 的注冊、http path 與 handler 的關聯以及 handler 后端存儲 etcd 的配置。


3.創建 apiserver 通用配置

在 CreateServerChain 函數中,基本可以從函數名猜出來每一步在干什么,通用配置的創建就是在函數 CreateKubeAPIServerConfig() 中完成的。進入到以下函數中看一下詳細實現:CreateKubeAPIServerConfig() --> buildGenericConfig()
創建通用配置流程主要有以下幾步:

  • 1.GenericConfig 實例化

代碼如下:

// 為 genericConfig 設置默認值。
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
// 啟動/禁止 GV 及 resource
genericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
...
// openAPI 規范
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
...
genericConfig.Version = &kubeVersion
  • 2.StorageFactoryConfig

apiserver 組件使用 etcd 作為集群的存儲,系統中所使用的所有資源、集群狀態、配置等都在這上面保存。代碼部分如下:

// 初始化 storageFactoryConfig 配置對象。
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
// 初始化 etcd 相關的配置信息,補全配置對象,返回 completedStorageFactoryConfig
completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
if err != nil {
    lastErr = err
    return
}
// 根據上面補完的配置信息,創建 storageFactory 對象
storageFactory, lastErr = completedStorageFactoryConfig.New()
if lastErr != nil {
    return
}
  • 3.Authorizer認證、授權配置

作為整個系統的存儲對象交互入口,每個系統的請求都需要經過認證、授權、准入控制器這些階段,准入控制器下面再說,涉及到的代碼如下:

genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)

這個有必要進入到函數內部在看一看具體做了些什么,按照下面的路徑點進去:

BuildAuthorizer() --> authorizationConfig.New()

New 函數主要就是在 for 循環中根據 config.AuthorizationModes 配置了 authorizers 和 ruleResolvers 兩個變量,這個 config.AuthorizationModes 是在最初在初始化配置對象執行 options.NewServerRunOptions() 的時候賦值的,具體路徑及代碼如下:
k8s.io/kubernetes/pkg/kubeapiserver/options/authorization.go

func NewBuiltInAuthorizationOptions() *BuiltInAuthorizationOptions {
	return &BuiltInAuthorizationOptions{
        // 初始賦值就一個 "AlwaysAllow" 字符串,這是默認的配置。
		Modes:                       []string{authzmodes.ModeAlwaysAllow},
		WebhookVersion:              "v1beta1",
		WebhookCacheAuthorizedTTL:   5 * time.Minute,
		WebhookCacheUnauthorizedTTL: 30 * time.Second,
		WebhookRetryBackoff:         genericoptions.DefaultAuthWebhookRetryBackoff(),
	}
}

在回到New()中,在函數最后返回了認證和授權對象。

return union.New(authorizers...), union.NewRuleResolvers(ruleResolvers...), nil

authorizers 是已啟用的認證器列表,union.New將它合並成一個認證器。

ruleResolvers 是已啟用的規則解析器,union.NewRuleResolvers 也是合並了一下

可以看到默認的授權是 AlwaysAllow,具體其它類型可以在啟動的時候在配置里面設置,只要配置了,就會實例化該授權對象,認證的時候會遍歷每一個授權器,有一個認證成功就ok。

  • 4.Admission准入控制器配置

准入控制(Admission Control)在授權后對請求做進一步的驗證或添加默認參數,在對kubernetes api服務器的請求過程中,先經過認證、授權后,執行准入操作,再對目標對象進行操作

在對集群進行請求時,每個准入控制插件都按順序運行,只有全部插件都通過的請求才會進入系統,如果序列中的任何插件拒絕請求,則整個請求將被拒絕,並返回錯誤信息。

准入控制器是在初始化 ServerRunOptions 的時候 New 的,得再回到最開始的 NewAPIServerCommand() 函數中,
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go

s := options.NewServerRunOptions()

函數中執行的操作就是給哥哥配置賦默認值,找到 Admission 字段,他執行的賦值函數是

kubeoptions.NewAdmissionOptions()

這個函數中主要執行了兩個 RegisterAllAdmissionPlugins() 函數,來看代碼:

func NewAdmissionOptions() *AdmissionOptions {
	// 在這個函數里面執行了一個 RegisterAllAdmissionPlugins 函數,進到函數里面能看到。
	options := genericoptions.NewAdmissionOptions()
	// 第二個 RegisterAllAdmissionPlugins 函數。
	RegisterAllAdmissionPlugins(options.Plugins)
	options.RecommendedPluginOrder = AllOrderedPlugins
	options.DefaultOffPlugins = DefaultOffAdmissionPlugins()

	return &AdmissionOptions{
		GenericAdmission: options,
	}
}

這兩個 RegisterAllAdmissionPlugins 所執行的注冊的插件不太一樣,但是能看到都是執行了不同組件中的 Register() 函數,其實這個操作就是把控制插件存放進配置對象中,上面的代碼最開始賦值的變量 options,是一個 AdmissionOptions 對象,

type AdmissionOptions struct {
	RecommendedPluginOrder []string
	DefaultOffPlugins sets.String

	EnablePlugins []string
	DisablePlugins []string
	ConfigFile string
	Plugins *admission.Plugins
	Decorators admission.Decorators
}

對象中的 Plugins 字段就是存放插件的,是一個 admission.Plugins 對象:

type Factory func(config io.Reader) (Interface, error)

type Plugins struct {
	// 並發保護
	lock     sync.Mutex
	// 以鍵值對的形式存放插件,key 就是插件的名稱,value 是插件的實現函數,是上面的 Factory 對象
	registry map[string]Factory
}

可以隨便點開一個插件的注冊代碼看一下,例如按照下面遞進的進入到函數里面:
NewAdmissionOptions() --> RegisterAllAdmissionPlugins() --> admit.Register() --> plugins.Register(name, func)

// 安全的把 name 和 Factory 對象保存到 registry 字段里。
func (ps *Plugins) Register(name string, plugin Factory) {
	ps.lock.Lock()
	defer ps.lock.Unlock()
	if ps.registry != nil {
		_, found := ps.registry[name]
		if found {
			klog.Fatalf("Admission plugin %q was registered twice", name)
		}
	} else {
		ps.registry = map[string]Factory{}
	}

	klog.V(1).Infof("Registered admission plugin %q", name)
	ps.registry[name] = plugin
}

4.創建 APIExtensionsServer

回到 CreateServerChain 函數中,接下來就是創建 apiExtensionsServer,先創建 config 對象,然后根據 config 創建 server:

apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
	serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
if err != nil {
	return nil, err
}
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
if err != nil {
	return nil, err
}

直接看 createAPIExtensionsServer 函數的實現,首先用 config 對象生成了 completeConfig,然后執行 New。

  • 1.創建 genericServer

涉及到的代碼部分:

genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)

genericServer 是所有的 server 對象都會依賴的基礎服務。

  • 2.實例化 apiResourceConfig
s := &CustomResourceDefinitions{
		GenericAPIServer: genericServer,
	}
  • 3.實例化 apiGroupInfo
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
// 判斷對應的資源組是否開啟,如果開啟,則將 group、version 以及 resource 與資源對象進行映射並存儲到 VersionedResourcesStorageMap 字段中
if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) {
	storage := map[string]rest.Storage{}
	// customresourcedefinitions
	customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
	if err != nil {
		return nil, err
	}
	storage["customresourcedefinitions"] = customResourceDefinitionStorage
	storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)

	apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = storage
}
if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {
	...
}
  • 4.注冊APIGroup

這一步非常重要,涉及到一個第三方庫 go-restful 的一些知識,主要就是因為該框架可定制程度最靈活,但同時也意味着使用起來更加難以理解。

接下來按照函數調用的遞進關系一層一層的簡單記錄一下:

if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
		return nil, err
	}

InstallAPIGroup 函數直接調用了 s.InstallAPIGroups(apiGroupInfo)

接下來做的就是遍歷 apiGroupInfos,將<資源組>/<資源對象>/<資源名稱> 映射到 HTTP PATH 中去。

for _, apiGroupInfo := range apiGroupInfos {
	// 主要的流程是這個函數
	if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
		return fmt.Errorf("unable to install api resources: %v", err)
	}
	...
}

installAPIResources 函數中通過 InstallREST 函數將資源存儲對象作為資源的 Handlers 方法添加,最后使用 go-restful 中的方法將路徑和方法進行注冊。

InstallREST 接受一個 Container 對象,此 Container 只是 go-restful 中的概念,和 docker 並沒有任何關系,一個 Container 監聽一個端口,一個 Container 就相當於一個 HTTP Server,對外提供 HTTP 服務。每個 Container 可以包含多個 WebServer,WebServer 相當於一組不同的服務集合,例如我們設計一個圖書管理系統,和書本相關的都可以放在 /books 路徑下,包含各種增刪改查的業務,和用戶相關的都可以放在 /users 下面,這就相當於兩個服務。每個 WebServer 下面可以包含多個 Router。

Container 監聽的端口接受到請求,分發給對應的 WebServer,然后在匹到具體的 Router,調用對應的 Handlers 去處理。

簡單說明一下之后可以看看代碼:

func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {
	// 定義了 PATH,表現形式為 <apiPrefix>/<group>/<version>,比如 /apis/apiextensions.k8s.io/v1beta1
	prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
	// 實例化 APIInstaller
	installer := &APIInstaller{
		group:             g,
		prefix:            prefix,
		minRequestTimeout: g.MinRequestTimeout,
	}
	// 創建一個 WebServer,即返回值的第三個 ws,為資源注冊對應的 handlers 方法,完成資源與方法的綁定並且注冊
	apiResources, resourceInfos, ws, registrationErrors := installer.Install()
	versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
	versionDiscoveryHandler.AddToWebService(ws)
	// 將 WebServer 添加到 container 中
	container.Add(ws)
	return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}

5.創建 KubeAPIServer

6.創建 AggregatorServer

7.啟動 服務


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM