kubernetes代碼版本:v1.20
看這篇文章的前提:
-
有 golang 的基礎
-
對於 kubernetes 有基本的了解
kube-apiserver的啟動過程可以分為以下幾個步驟:
- 1.資源注冊
- 2.解析命令行參數
- 3.創建 apiserver 通用配置
- 4.創建 APIExtensionsServer
- 5.創建 KubeAPIServer
- 6.創建 AggregatorServer
- 7.啟動服務
並不會把每一步都仔細分析。我只挑主要的步驟進行研究一下。
1.資源注冊
首先要把 apiserver 支持的資源注冊到資源表中,資源注冊這一過程並不是通過函數調用實現的,而是使用 golang 的 import 導包機制實現資源注冊(golang 的導入機制我就不贅述了)。
整個組件的入口函數是這個文件:
k8s.io/kubernetes/cmd/kube-apiserver/apiserver.go
首先就是初始化 command 對象,執行了 app.NewAPIServerCommand()
函數。函數在 k8s.io/kubernetes/cmd/kube-apiserver/app/server.go
文件中。
可以發現在 import 的時候導入了這個包
k8s.io/kubernetes/pkg/api/legacyscheme
在 k8s.io/kubernetes/vendor/k8s.io/kube-aggregator/pkg/apiserver/scheme/scheme.go
文件中,在 scheme 包里面,定義了三個全局變量
var (
// 資源注冊表
Scheme = runtime.NewScheme()
// 編、解碼器
Codecs = serializer.NewCodecFactory(Scheme)
// 參數編、解碼器
ParameterCodec = runtime.NewParameterCodec(Scheme)
)
這三個變量可以在 apiserver 組件的代碼中任何地方使用。
kube-apiserver 啟動時還導入了 controlplane 包,包中的 import_known_versions.go 文件調用了 kubernetes 支持資源的 install 包,代碼如下:
k8s.io/kubernetes/pkg/controlplane/import_known_versions.go
import (
// These imports are the API groups the API server will support.
_ "k8s.io/kubernetes/pkg/apis/admission/install"
_ "k8s.io/kubernetes/pkg/apis/admissionregistration/install"
_ "k8s.io/kubernetes/pkg/apis/apiserverinternal/install"
_ "k8s.io/kubernetes/pkg/apis/apps/install"
_ "k8s.io/kubernetes/pkg/apis/authentication/install"
_ "k8s.io/kubernetes/pkg/apis/authorization/install"
_ "k8s.io/kubernetes/pkg/apis/autoscaling/install"
_ "k8s.io/kubernetes/pkg/apis/batch/install"
_ "k8s.io/kubernetes/pkg/apis/certificates/install"
_ "k8s.io/kubernetes/pkg/apis/coordination/install"
_ "k8s.io/kubernetes/pkg/apis/core/install"
_ "k8s.io/kubernetes/pkg/apis/discovery/install"
_ "k8s.io/kubernetes/pkg/apis/events/install"
_ "k8s.io/kubernetes/pkg/apis/extensions/install"
_ "k8s.io/kubernetes/pkg/apis/flowcontrol/install"
_ "k8s.io/kubernetes/pkg/apis/imagepolicy/install"
_ "k8s.io/kubernetes/pkg/apis/networking/install"
_ "k8s.io/kubernetes/pkg/apis/node/install"
_ "k8s.io/kubernetes/pkg/apis/policy/install"
_ "k8s.io/kubernetes/pkg/apis/rbac/install"
_ "k8s.io/kubernetes/pkg/apis/scheduling/install"
_ "k8s.io/kubernetes/pkg/apis/storage/install"
)
可以看到里面注冊的所有資源都是在 k8s.io/kubernetes/pkg/apis
文件下,都在本資源下面的 install 包里面 定義了自己的 init 方法。
以 admission 為例:
func Install(scheme *runtime.Scheme) {
// 首先注冊 admission 的內部版本
utilruntime.Must(admission.AddToScheme(scheme))
// 其他外部版本
utilruntime.Must(v1beta1.AddToScheme(scheme))
utilruntime.Must(v1.AddToScheme(scheme))
// 版本順序
utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion, v1beta1.SchemeGroupVersion))
}
***
簡單說一下什么叫做內部版本,什么叫做外部版本:
kubernetes 中每個資源都有自己的不同版本,例如 v1, v1/beat1, v1/alpha1 等等,當資源需要在不同版本之間轉換時,我們不能為每兩個不同的版本之間都寫一個轉換方法,如果后續有新的版本添加進來,就意味着需要為新的版本對應已經存在的每個版本都編寫轉換方法,復雜度會指數級上升。在kubernetes中則通過一個內部版本的設計來進行解決,內部版本是一個穩定的版本,所有的版本都只針對目標版本來進行轉換的實現,而不關注其他版本。可以理解為,這個所謂的’內部版本‘包含其他所有版本的所有字段,我們在進行轉換的時候先把對象轉換成內部版本,然后再從內部版本轉換為目標版本即可。
***
2.解析命令行參數
kubernetes 使用的是開源的命令行庫 Cobra,所有組件均使用這個解析庫,此庫在 github 上面是開源的,有興趣的可以自行了解一下。
我只簡單敘述一下:
Cobra既是一個用來創建強大的現代CLI命令行的golang庫,也是一個生成程序應用和命令行文件的程序。使用方法如下:
創建 Cmd 主命令對象,並在對象中定義各種 Run 方法(此處只是定義,並不是執行),執行順序是 PersistentPreRun --> PerRun --> Run --> PostRun --> PersistentPostRun.然后添加命令行參數(Flag),比如我們使用的 kubectl get pod -n kube-system
后面的 -n 就是 Flag,最后執行 command 對象的 Execute 方法回調我們此前定義的各種函數。
現在,讓我們在回到最初的 NewAPIServerCommand 函數中,看一看 kubernetes 的命令行解析代碼(不太重要的代碼我會省略):
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go
func NewAPIServerCommand() *cobra.Command {
// 初始化各個模塊的默認配置
s := options.NewServerRunOptions()
// 生成 cmd 對象
cmd := &cobra.Command{
...
// 定義方法
RunE: func(cmd *cobra.Command, args []string) error {
...
// 填充成完整的參數對象
completedOptions, err := Complete(s)
...
// 驗證參數的合法性
if errs := completedOptions.Validate(); len(errs) != 0 {
return utilerrors.NewAggregate(errs)
}
// 將完全的參數對象傳入 Run 函數。Run 里面完成了 apiserver 組件的啟動邏輯,這是一個常駐進程。
return Run(completedOptions, genericapiserver.SetupSignalHandler())
},
...
}
...
}
順便回到 apiserver 的入口文件中,看看 main 文件中的啟動代碼:
func main() {
rand.Seed(time.Now().UnixNano())
command := app.NewAPIServerCommand()
logs.InitLogs()
defer logs.FlushLogs()
if err := command.Execute(); err != nil {
os.Exit(1)
}
}
就是按照我上面說的順序定義的,kubernetes 中所有組件的啟動流程大體上都是這個樣子。
進入上面說的 Run 函數里面我們接着看,接下來的所有服務項的配置和創建都是在 CreateServerChain 函數中完成的。
CreateServerChain 是完成 server 初始化的方法,里面包含 APIExtensionsServer、KubeAPIServer、AggregatorServer 初始化的所有流程,最終返回 aggregatorapiserver.APIAggregator 實例,初始化流程主要有:http filter chain 的配置、API Group 的注冊、http path 與 handler 的關聯以及 handler 后端存儲 etcd 的配置。
3.創建 apiserver 通用配置
在 CreateServerChain 函數中,基本可以從函數名猜出來每一步在干什么,通用配置的創建就是在函數 CreateKubeAPIServerConfig() 中完成的。進入到以下函數中看一下詳細實現:CreateKubeAPIServerConfig() --> buildGenericConfig()
。
創建通用配置流程主要有以下幾步:
- 1.GenericConfig 實例化
代碼如下:
// 為 genericConfig 設置默認值。
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
// 啟動/禁止 GV 及 resource
genericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
...
// openAPI 規范
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
...
genericConfig.Version = &kubeVersion
- 2.StorageFactoryConfig
apiserver 組件使用 etcd 作為集群的存儲,系統中所使用的所有資源、集群狀態、配置等都在這上面保存。代碼部分如下:
// 初始化 storageFactoryConfig 配置對象。
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
// 初始化 etcd 相關的配置信息,補全配置對象,返回 completedStorageFactoryConfig
completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
if err != nil {
lastErr = err
return
}
// 根據上面補完的配置信息,創建 storageFactory 對象
storageFactory, lastErr = completedStorageFactoryConfig.New()
if lastErr != nil {
return
}
- 3.Authorizer認證、授權配置
作為整個系統的存儲對象交互入口,每個系統的請求都需要經過認證、授權、准入控制器這些階段,准入控制器下面再說,涉及到的代碼如下:
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
這個有必要進入到函數內部在看一看具體做了些什么,按照下面的路徑點進去:
BuildAuthorizer() --> authorizationConfig.New()
New 函數主要就是在 for 循環中根據 config.AuthorizationModes 配置了 authorizers 和 ruleResolvers 兩個變量,這個 config.AuthorizationModes 是在最初在初始化配置對象執行 options.NewServerRunOptions() 的時候賦值的,具體路徑及代碼如下:
k8s.io/kubernetes/pkg/kubeapiserver/options/authorization.go
func NewBuiltInAuthorizationOptions() *BuiltInAuthorizationOptions {
return &BuiltInAuthorizationOptions{
// 初始賦值就一個 "AlwaysAllow" 字符串,這是默認的配置。
Modes: []string{authzmodes.ModeAlwaysAllow},
WebhookVersion: "v1beta1",
WebhookCacheAuthorizedTTL: 5 * time.Minute,
WebhookCacheUnauthorizedTTL: 30 * time.Second,
WebhookRetryBackoff: genericoptions.DefaultAuthWebhookRetryBackoff(),
}
}
在回到New()中,在函數最后返回了認證和授權對象。
return union.New(authorizers...), union.NewRuleResolvers(ruleResolvers...), nil
authorizers 是已啟用的認證器列表,union.New將它合並成一個認證器。
ruleResolvers 是已啟用的規則解析器,union.NewRuleResolvers 也是合並了一下
可以看到默認的授權是 AlwaysAllow,具體其它類型可以在啟動的時候在配置里面設置,只要配置了,就會實例化該授權對象,認證的時候會遍歷每一個授權器,有一個認證成功就ok。
- 4.Admission准入控制器配置
准入控制(Admission Control)在授權后對請求做進一步的驗證或添加默認參數,在對kubernetes api服務器的請求過程中,先經過認證、授權后,執行准入操作,再對目標對象進行操作
在對集群進行請求時,每個准入控制插件都按順序運行,只有全部插件都通過的請求才會進入系統,如果序列中的任何插件拒絕請求,則整個請求將被拒絕,並返回錯誤信息。
准入控制器是在初始化 ServerRunOptions 的時候 New 的,得再回到最開始的 NewAPIServerCommand() 函數中,
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go
s := options.NewServerRunOptions()
函數中執行的操作就是給哥哥配置賦默認值,找到 Admission
字段,他執行的賦值函數是
kubeoptions.NewAdmissionOptions()
這個函數中主要執行了兩個 RegisterAllAdmissionPlugins() 函數,來看代碼:
func NewAdmissionOptions() *AdmissionOptions {
// 在這個函數里面執行了一個 RegisterAllAdmissionPlugins 函數,進到函數里面能看到。
options := genericoptions.NewAdmissionOptions()
// 第二個 RegisterAllAdmissionPlugins 函數。
RegisterAllAdmissionPlugins(options.Plugins)
options.RecommendedPluginOrder = AllOrderedPlugins
options.DefaultOffPlugins = DefaultOffAdmissionPlugins()
return &AdmissionOptions{
GenericAdmission: options,
}
}
這兩個 RegisterAllAdmissionPlugins
所執行的注冊的插件不太一樣,但是能看到都是執行了不同組件中的 Register()
函數,其實這個操作就是把控制插件存放進配置對象中,上面的代碼最開始賦值的變量 options,是一個 AdmissionOptions 對象,
type AdmissionOptions struct {
RecommendedPluginOrder []string
DefaultOffPlugins sets.String
EnablePlugins []string
DisablePlugins []string
ConfigFile string
Plugins *admission.Plugins
Decorators admission.Decorators
}
對象中的 Plugins
字段就是存放插件的,是一個 admission.Plugins 對象:
type Factory func(config io.Reader) (Interface, error)
type Plugins struct {
// 並發保護
lock sync.Mutex
// 以鍵值對的形式存放插件,key 就是插件的名稱,value 是插件的實現函數,是上面的 Factory 對象
registry map[string]Factory
}
可以隨便點開一個插件的注冊代碼看一下,例如按照下面遞進的進入到函數里面:
NewAdmissionOptions() --> RegisterAllAdmissionPlugins() --> admit.Register() --> plugins.Register(name, func)
// 安全的把 name 和 Factory 對象保存到 registry 字段里。
func (ps *Plugins) Register(name string, plugin Factory) {
ps.lock.Lock()
defer ps.lock.Unlock()
if ps.registry != nil {
_, found := ps.registry[name]
if found {
klog.Fatalf("Admission plugin %q was registered twice", name)
}
} else {
ps.registry = map[string]Factory{}
}
klog.V(1).Infof("Registered admission plugin %q", name)
ps.registry[name] = plugin
}
4.創建 APIExtensionsServer
回到 CreateServerChain 函數中,接下來就是創建 apiExtensionsServer,先創建 config 對象,然后根據 config 創建 server:
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
if err != nil {
return nil, err
}
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
if err != nil {
return nil, err
}
直接看 createAPIExtensionsServer 函數的實現,首先用 config 對象生成了 completeConfig,然后執行 New。
- 1.創建 genericServer
涉及到的代碼部分:
genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
genericServer 是所有的 server 對象都會依賴的基礎服務。
- 2.實例化 apiResourceConfig
s := &CustomResourceDefinitions{
GenericAPIServer: genericServer,
}
- 3.實例化 apiGroupInfo
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
// 判斷對應的資源組是否開啟,如果開啟,則將 group、version 以及 resource 與資源對象進行映射並存儲到 VersionedResourcesStorageMap 字段中
if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) {
storage := map[string]rest.Storage{}
// customresourcedefinitions
customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
if err != nil {
return nil, err
}
storage["customresourcedefinitions"] = customResourceDefinitionStorage
storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = storage
}
if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {
...
}
- 4.注冊APIGroup
這一步非常重要,涉及到一個第三方庫 go-restful 的一些知識,主要就是因為該框架可定制程度最靈活,但同時也意味着使用起來更加難以理解。
接下來按照函數調用的遞進關系一層一層的簡單記錄一下:
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}
InstallAPIGroup
函數直接調用了 s.InstallAPIGroups(apiGroupInfo)
接下來做的就是遍歷 apiGroupInfos,將<資源組>/<資源對象>/<資源名稱> 映射到 HTTP PATH 中去。
for _, apiGroupInfo := range apiGroupInfos {
// 主要的流程是這個函數
if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
return fmt.Errorf("unable to install api resources: %v", err)
}
...
}
在 installAPIResources
函數中通過 InstallREST
函數將資源存儲對象作為資源的 Handlers 方法添加,最后使用 go-restful 中的方法將路徑和方法進行注冊。
InstallREST
接受一個 Container 對象,此 Container 只是 go-restful 中的概念,和 docker 並沒有任何關系,一個 Container 監聽一個端口,一個 Container 就相當於一個 HTTP Server,對外提供 HTTP 服務。每個 Container 可以包含多個 WebServer,WebServer 相當於一組不同的服務集合,例如我們設計一個圖書管理系統,和書本相關的都可以放在 /books 路徑下,包含各種增刪改查的業務,和用戶相關的都可以放在 /users 下面,這就相當於兩個服務。每個 WebServer 下面可以包含多個 Router。
Container 監聽的端口接受到請求,分發給對應的 WebServer,然后在匹到具體的 Router,調用對應的 Handlers 去處理。
簡單說明一下之后可以看看代碼:
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {
// 定義了 PATH,表現形式為 <apiPrefix>/<group>/<version>,比如 /apis/apiextensions.k8s.io/v1beta1
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
// 實例化 APIInstaller
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
}
// 創建一個 WebServer,即返回值的第三個 ws,為資源注冊對應的 handlers 方法,完成資源與方法的綁定並且注冊
apiResources, resourceInfos, ws, registrationErrors := installer.Install()
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
versionDiscoveryHandler.AddToWebService(ws)
// 將 WebServer 添加到 container 中
container.Add(ws)
return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}