作者:杜楊浩,來源:公眾號騰訊雲原生
前言
整個Kubernetes技術體系由聲明式API以及Controller構成,而kube-apiserver是Kubernetes的聲明式api server,並為其它組件交互提供了橋梁,因此加深對kube-apiserver的理解就顯得至關重要了。

整體組件功能
kube-apiserver作為整個Kubernetes集群操作etcd的唯一入口,負責Kubernetes各資源的認證&鑒權、校驗以及CRUD等操作,提供RESTful APIs,供其它組件調用:

kube-apiserver包含三種APIServer:
- aggregatorServer:負責處理 apiregistration.k8s.io 組下的APIService資源請求,同時將來自用戶的請求攔截轉發給aggregated server(AA)
- kubeAPIServer:負責對請求的一些通用處理,包括:認證、鑒權以及各個內建資源(pod, deployment,service and etc)的REST服務等
- apiExtensionsServer:負責 Custom Resource Definition(CRD)apiResources 以及 apiVersions 的注冊,同時處理CRD以及相應 Custom Resource(CR) 的REST請求(如果對應CR不能被處理的話則會返回404),也是apiserver Delegation的最后一環
另外還包括bootstrap-controller,主要負責Kubernetes default apiserver service的創建以及管理。
接下來將對上述組件進行概覽性總結。
bootstrap-controller
apiserver bootstrap-controller
創建&運行邏輯在k8s.io/kubernetes/pkg/master
目錄bootstrap-controller
主要用於創建以及維護內部 kubernetes default apiserver servicekubernetes default apiserver service spec.selector
為空,這是 default apiserver service 與其它正常service的最大區別,表明了這個特殊的service對應的endpoints不由endpoints controller控制,而是直接受kube-apiserver bootstrap-controller
管理(maintained by this code, not by the pod selector)bootstrap-controller
的幾個主要功能如下:- 創建 default、kube-system 和 kube-public 以及 kube-node-lease 命名空間
- 創建&維護 kubernetes default apiserver service 以及對應的 endpoint
- 提供基於 Service ClusterIP 的檢查及修復功能(--service-cluster-ip-range指定范圍)
- 提供基於 Service NodePort 的檢查及修復功能(--service-node-port-range指定范圍)
// k8s.io/kubernetes/pkg/master/controller.go:142
// Start begins the core controller loops that must exist for bootstrapping
// a cluster.
func (c *Controller) Start() {
if c.runner != nil {
return
}
// Reconcile during first run removing itself until server is ready.
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
klog.Errorf("Unable to remove old endpoints from kubernetes service: %v", err)
}
repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry, &c.SecondaryServiceClusterIPRange, c.SecondaryServiceClusterIPRegistry)
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
// run all of the controllers once prior to returning from Start.
if err := repairClusterIPs.RunOnce(); err != nil {
// If we fail to repair cluster IPs apiserver is useless. We should restart and retry.
klog.Fatalf("Unable to perform initial IP allocation check: %v", err)
}
if err := repairNodePorts.RunOnce(); err != nil {
// If we fail to repair node ports apiserver is useless. We should restart and retry.
klog.Fatalf("Unable to perform initial service nodePort check: %v", err)
}
// 定期執行bootstrap controller主要的四個功能(reconciliation)
c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil)
c.runner.Start()
更多代碼原理詳情,參考 kubernetes-reading-notes
kubeAPIServer
KubeAPIServer主要提供對內建API Resources的操作請求,為Kubernetes中各API Resources注冊路由信息,同時暴露RESTful API,使集群中以及集群外的服務都可以通過RESTful API操作Kubernetes中的資源。
另外,kubeAPIServer是整個Kubernetes apiserver的核心,下面將要講述的aggregatorServer以及apiExtensionsServer都是建立在kubeAPIServer基礎上進行擴展的(補充了Kubernetes對用戶自定義資源的能力支持)
kubeAPIServer最核心的功能是為Kubernetes內置資源添加路由,如下:
- 調用
m.InstallLegacyAPI
將核心 API Resources 添加到路由中,在 apiserver 中即是以/api
開頭的 resource - 調用
m.InstallAPIs
將擴展的 API Resources 添加到路由中,在 apiserver 中即是以/apis
開頭的 resource
/ k8s.io/kubernetes/pkg/master/master.go:332
// New returns a new instance of Master from the given config.
// Certain config fields will be set to a default value if unset.
// Certain config fields must be specified, including:
// KubeletClientConfig
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
...
// 安裝 LegacyAPI(core API)
// install legacy rest storage
if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
StorageFactory: c.ExtraConfig.StorageFactory,
ProxyTransport: c.ExtraConfig.ProxyTransport,
KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
EventTTL: c.ExtraConfig.EventTTL,
ServiceIPRange: c.ExtraConfig.ServiceIPRange,
SecondaryServiceIPRange: c.ExtraConfig.SecondaryServiceIPRange,
ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
ServiceAccountIssuer: c.ExtraConfig.ServiceAccountIssuer,
ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
APIAudiences: c.GenericConfig.Authentication.APIAudiences,
}
if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
return nil, err
}
}
...
// 安裝 APIs(named groups apis)
if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
return nil, err
}
...
return m, nil
}
整個kubeAPIServer提供了三類API Resource接口:
- core group:主要在
/api/v1
下 - named groups:其 path 為
/apis/$GROUP/$VERSION
- 系統狀態的一些 API:如
/metrics
、/version
等
而API的URL大致以 /apis/{group}/{version}/namespaces/{namespace}/resource/{name}
組成,結構如下圖所示:

kubeAPIServer會為每種API資源創建對應的RESTStorage
,RESTStorage的目的是將每種資源的訪問路徑及其后端存儲的操作對應起來:通過構造的REST Storage實現的接口判斷該資源可以執行哪些操作(如:create、update等),將其對應的操作存入到action中,每一個操作對應一個標准的REST method,如create對應REST method為POST
,而update對應REST method為PUT
。最終根據actions數組依次遍歷,對每一個操作添加一個handler(handler對應REST Storage實現的相關接口),並注冊到route,最終對外提供RESTful API,如下:
// m.GenericAPIServer.InstallLegacyAPIGroup --> s.installAPIResources --> apiGroupVersion.InstallREST --> installer.Install --> a.registerResourceHandlers
// k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go:181
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
...
// 1、判斷該 resource 實現了哪些 REST 操作接口,以此來判斷其支持的 verbs 以便為其添加路由
// what verbs are supported by the storage, used to know what verbs we support per path
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)
...
// 2、為 resource 添加對應的 actions(+根據是否支持 namespace)
// Get the list of actions for the given scope.
switch {
case !namespaceScoped:
// Handle non-namespace scoped resources like nodes.
resourcePath := resource
resourceParams := params
itemPath := resourcePath + "/{name}"
nameParams := append(params, nameParam)
proxyParams := append(nameParams, pathParam)
...
// Handler for standard REST verbs (GET, PUT, POST and DELETE).
// Add actions at the resource path: /api/apiVersion/resource
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
...
}
...
// 3、從 rest.Storage 到 restful.Route 映射
// 為每個操作添加對應的 handler
for _, action := range actions {
...
switch action.Verb {
...
case "POST": // Create a resource.
var handler restful.RouteFunction
// 4、初始化 handler
if isNamedCreater {
handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
} else {
handler = restfulCreateResource(creater, reqScope, admit)
}
handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
...
// 5、route 與 handler 進行綁定
route := ws.POST(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
// TODO: in some cases, the API may return a v1.Status instead of the versioned object
// but currently go-restful can't handle multiple different objects being returned.
Returns(http.StatusCreated, "Created", producedObject).
Returns(http.StatusAccepted, "Accepted", producedObject).
Reads(defaultVersionedObject).
Writes(producedObject)
if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
return nil, err
}
addParams(route, action.Params)
// 6、添加到路由中
routes = append(routes, route)
case "DELETE": // Delete a resource.
...
default:
return nil, fmt.Errorf("unrecognized action verb: %s", action.Verb)
}
for _, route := range routes {
route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
Group: reqScope.Kind.Group,
Version: reqScope.Kind.Version,
Kind: reqScope.Kind.Kind,
})
route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
ws.Route(route)
}
// Note: update GetAuthorizerAttributes() when adding a custom handler.
}
...
}
kubeAPIServer代碼結構整理如下:
- apiserver整體啟動邏輯:k8s.io/kubernetes/cmd/kube-apiserver
- apiserver bootstrap-controller創建&運行邏輯:k8s.io/kubernetes/pkg/master
- API Resource對應后端RESTStorage(based on genericregistry.Store)創建:k8s.io/kubernetes/pkg/registry
- aggregated-apiserver創建&處理邏輯:k8s.io/kubernetes/staging/src/k8s.io/kube-aggregator
- extensions-apiserver創建&處理邏輯:k8s.io/kubernetes/staging/src/k8s.io/apiextensions-apiserver
- apiserver創建&運行:k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server
- 注冊API Resource資源處理handler(InstallREST&Install®isterResourceHandlers):k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints
- 創建存儲后端(etcdv3):k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage
- genericregistry.Store.CompleteWithOptions初始化:k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/registry
調用鏈整理如下:

更多代碼原理詳情,參考 kubernetes-reading-notes
aggregatorServer
aggregatorServer主要用於處理擴展Kubernetes API Resources的第二種方式Aggregated APIServer(AA),將CR請求代理給AA:

這里結合 Kubernetes 官方給出的a ggregated apiserver 例子 sample-apiserver,總結原理如下:
aggregatorServer
通過 APIServices 對象關聯到某個 Service 來進行請求的轉發,其關聯的 Service 類型進一步決定了請求轉發的形式。aggregatorServer 包括一個GenericAPIServer
和維護自身狀態的 Controller。其中 GenericAPIServer 主要處理apiregistration.k8s.io
組下的 APIService 資源請求,而Controller包括:apiserviceRegistrationController
:負責根據 APIService 定義的 aggregated server service 構建代理,將CR的請求轉發給后端的 aggregated serveravailableConditionController
:維護 APIServices 的可用狀態,包括其引用 Service 是否可用等autoRegistrationController
:用於保持 API 中存在的一組特定的 APIServicescrdRegistrationController
:負責將 CRD GroupVersions 自動注冊到 APIServices 中openAPIAggregationController
:將 APIServices 資源的變化同步至提供的 OpenAPI 文檔
apiserviceRegistrationController
負責根據 APIService 定義的 aggregated server service 構建代理,將CR的請求轉發給后端的 aggregated server。apiService 有兩種類型:Local(Service為空)以及Service(Service非空)。apiserviceRegistrationController 負責對這兩種類型 apiService 設置代理:Local 類型會直接路由給kube-apiserver
進行處理;而Service類型則會設置代理並將請求轉化為對aggregated Service
的請求(proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version
),而請求的負載均衡策略則是優先本地訪問 kube-apiserver(如果service為kubernetes default apiserver service:443
) =>通過service ClusterIP:Port
訪問(默認)或者通過隨機選擇service endpoint backend
進行訪問:
func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
...
proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version
// v1. is a special case for the legacy API. It proxies to a wider set of endpoints.
if apiService.Name == legacyAPIServiceName {
proxyPath = "/api"
}
// register the proxy handler
proxyHandler := &proxyHandler{
localDelegate: s.delegateHandler,
proxyClientCert: s.proxyClientCert,
proxyClientKey: s.proxyClientKey,
proxyTransport: s.proxyTransport,
serviceResolver: s.serviceResolver,
egressSelector: s.egressSelector,
}
...
s.proxyHandlers[apiService.Name] = proxyHandler
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)
...
// it's time to register the group aggregation endpoint
groupPath := "/apis/" + apiService.Spec.Group
groupDiscoveryHandler := &apiGroupHandler{
codecs: aggregatorscheme.Codecs,
groupName: apiService.Spec.Group,
lister: s.lister,
delegate: s.delegateHandler,
}
// aggregation is protected
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(groupPath, groupDiscoveryHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle(groupPath+"/", groupDiscoveryHandler)
s.handledGroups.Insert(apiService.Spec.Group)
return nil
}
// k8s.io/kubernetes/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go:109
func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// 加載roxyHandlingInfo處理請求
value := r.handlingInfo.Load()
if value == nil {
r.localDelegate.ServeHTTP(w, req)
return
}
handlingInfo := value.(proxyHandlingInfo)
...
// 判斷APIService服務是否正常
if !handlingInfo.serviceAvailable {
proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
return
}
// 將原始請求轉化為對APIService的請求
// write a new location based on the existing request pointed at the target service
location := &url.URL{}
location.Scheme = "https"
rloc, err := r.serviceResolver.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName, handlingInfo.servicePort)
if err != nil {
klog.Errorf("error resolving %s/%s: %v", handlingInfo.serviceNamespace, handlingInfo.serviceName, err)
proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
return
}
location.Host = rloc.Host
location.Path = req.URL.Path
location.RawQuery = req.URL.Query().Encode()
newReq, cancelFn := newRequestForProxy(location, req)
defer cancelFn()
...
proxyRoundTripper = transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), proxyRoundTripper)
handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w})
handler.ServeHTTP(w, newReq)
}
$ kubectl get APIService
NAME SERVICE AVAILABLE AGE
...
v1.apps Local True 50d
...
v1beta1.metrics.k8s.io kube-system/metrics-server True 50d
...
# default APIServices
$ kubectl get -o yaml APIService/v1.apps
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
labels:
kube-aggregator.kubernetes.io/automanaged: onstart
name: v1.apps
selfLink: /apis/apiregistration.k8s.io/v1/apiservices/v1.apps
spec:
group: apps
groupPriorityMinimum: 17800
version: v1
versionPriority: 15
status:
conditions:
- lastTransitionTime: "2020-10-20T10:39:48Z"
message: Local APIServices are always available
reason: Local
status: "True"
type: Available
# aggregated server
$ kubectl get -o yaml APIService/v1beta1.metrics.k8s.io
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
labels:
addonmanager.kubernetes.io/mode: Reconcile
kubernetes.io/cluster-service: "true"
name: v1beta1.metrics.k8s.io
selfLink: /apis/apiregistration.k8s.io/v1/apiservices/v1beta1.metrics.k8s.io
spec:
group: metrics.k8s.io
groupPriorityMinimum: 100
insecureSkipTLSVerify: true
service:
name: metrics-server
namespace: kube-system
port: 443
version: v1beta1
versionPriority: 100
status:
conditions:
- lastTransitionTime: "2020-12-05T00:50:48Z"
message: all checks passed
reason: Passed
status: "True"
type: Available
# CRD
$ kubectl get -o yaml APIService/v1.duyanghao.example.com
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
labels:
kube-aggregator.kubernetes.io/automanaged: "true"
name: v1.duyanghao.example.com
selfLink: /apis/apiregistration.k8s.io/v1/apiservices/v1.duyanghao.example.com
spec:
group: duyanghao.example.com
groupPriorityMinimum: 1000
version: v1
versionPriority: 100
status:
conditions:
- lastTransitionTime: "2020-12-11T08:45:37Z"
message: Local APIServices are always available
reason: Local
status: "True"
type: Available
aggregatorServer
創建過程中會根據所有 kube-apiserver 定義的API資源創建默認的 APIService 列表,名稱即是$VERSION.$GROUP
,這些 APIService 都會有標簽kube-aggregator.kubernetes.io/automanaged: onstart
,例如:v1.apps apiService。autoRegistrationController
創建並維護這些列表中的 APIService,也即我們看到的 Local apiService;對於自定義的APIService(aggregated server),則不會對其進行處理。aggregated server
實現CR(自定義API資源)的 CRUD API 接口,並可以靈活選擇后端存儲,可以與 core kube-apiserver 一起公用etcd,也可自己獨立部署etcd數據庫或者其它數據庫。 aggregated server 實現的CR API路徑為:/apis/VERSION,具體到 sample apiserver 為:/apis/wardle.example.com/v1alpha1,下面的資源類型有:flunders 以及 fischersaggregated server
通過部署 APIService 類型資源、service fields 指向對應的 aggregated server service 實現與 core kube-apiserver 的集成與交互sample-apiserver
目錄結構如下,可參考編寫自己的 aggregated server:
staging/src/k8s.io/sample-apiserver
├── artifacts
│ ├── example
│ │ ├── apiservice.yaml
...
├── hack
├── main.go
└── pkg
├── admission
├── apis
├── apiserver
├── cmd
├── generated
│ ├── clientset
│ │ └── versioned
...
│ │ └── typed
│ │ └── wardle
│ │ ├── v1alpha1
│ │ └── v1beta1
│ ├── informers
│ │ └── externalversions
│ │ └── wardle
│ │ ├── v1alpha1
│ │ └── v1beta1
│ ├── listers
│ │ └── wardle
│ │ ├── v1alpha1
│ │ └── v1beta1
└── registry
- 其中,artifacts用於部署yaml示例
- hack目錄存放自動腳本(eg: update-codegen)
- main.go是aggregated server啟動入口;pkg/cmd負責啟動aggregated server具體邏輯;pkg/apiserver用於aggregated server初始化以及路由注冊
- pkg/apis負責相關CR的結構體定義,自動生成(update-codegen)
- pkg/admission負責准入的相關代碼
- pkg/generated負責生成訪問CR的clientset,informers,以及listers
- pkg/registry目錄負責CR相關的RESTStorage實現
更多代碼原理詳情,參考 kubernetes-reading-notes
apiExtensionsServer
apiExtensionsServer主要負責CustomResourceDefinition(CRD) apiResources以及apiVersions的注冊,同時處理CRD以及相應CustomResource(CR)的REST請求(如果對應CR不能被處理的話則會返回404),也是apiserver Delegation的最后一環。
原理總結如下:
- Custom Resource,簡稱CR,是Kubernetes自定義資源類型,與之相對應的就是Kubernetes內置的各種資源類型,例如Pod、Service等。利用CR我們可以定義任何想要的資源類型。
- CRD通過yaml文件的形式向Kubernetes注冊CR實現自定義api-resources,屬於第二種擴展Kubernetes API資源的方式,也是普遍使用的一種。
- APIExtensionServer負責CustomResourceDefinition(CRD)apiResources以及apiVersions的注冊,同時處理CRD以及相應CustomResource(CR)的REST請求(如果對應CR不能被處理的話則會返回404),也是apiserver Delegation的最后一環。
- crdRegistrationController負責將CRD GroupVersions自動注冊到APIServices中。具體邏輯為:枚舉所有CRDs,然后根據CRD定義的crd.Spec.Group以及crd.Spec.Versions字段構建APIService,並添加到autoRegisterController.apiServicesToSync中,由autoRegisterController進行創建以及維護操作。這也是為什么創建完CRD后會產生對應的APIService對象。
- APIExtensionServer包含的controller以及功能如下所示:
1)openapiController
:將 crd 資源的變化同步至提供的 OpenAPI 文檔,可通過訪問 /openapi/v2 進行查看;
2)crdController
:負責將 crd 信息注冊到 apiVersions 和 apiResources 中,兩者的信息可通過 kubectl api-versions 和 kubectl api-resources 查看;
3)kubectl api-versions
命令返回所有Kubernetes集群資源的版本信息(實際發出了兩個請求,分別是https://127.0.0.1:6443/api以及https://127.0.0.1:6443/apis,並在最后將兩個請求的返回結果進行了合並)
$ kubectl -v=8 api-versions
I1211 11:44:50.276446 22493 loader.go:375] Config loaded from file: /root/.kube/config
I1211 11:44:50.277005 22493 round_trippers.go:420] GET https://127.0.0.1:6443/api?timeout=32s
...
I1211 11:44:50.290265 22493 request.go:1068] Response Body: {"kind":"APIVersions","versions":["v1"],"serverAddressByClientCIDRs":[{"clientCIDR":"0.0.0.0/0","serverAddress":"x.x.x.x:6443"}]}
I1211 11:44:50.293673 22493 round_trippers.go:420] GET https://127.0.0.1:6443/apis?timeout=32s
...
I1211 11:44:50.298360 22493 request.go:1068] Response Body: {"kind":"APIGroupList","apiVersion":"v1","groups":[{"name":"apiregistration.k8s.io","versions":[{"groupVersion":"apiregistration.k8s.io/v1","version":"v1"},{"groupVersion":"apiregistration.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"apiregistration.k8s.io/v1","version":"v1"}},{"name":"extensions","versions":[{"groupVersion":"extensions/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"extensions/v1beta1","version":"v1beta1"}},{"name":"apps","versions":[{"groupVersion":"apps/v1","version":"v1"}],"preferredVersion":{"groupVersion":"apps/v1","version":"v1"}},{"name":"events.k8s.io","versions":[{"groupVersion":"events.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"events.k8s.io/v1beta1","version":"v1beta1"}},{"name":"authentication.k8s.io","versions":[{"groupVersion":"authentication.k8s.io/v1","version":"v1"},{"groupVersion":"authentication.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"authentication.k8s.io/v1"," [truncated 4985 chars]
apiextensions.k8s.io/v1
apiextensions.k8s.io/v1beta1
apiregistration.k8s.io/v1
apiregistration.k8s.io/v1beta1
apps/v1
authentication.k8s.io/v1beta1
...
storage.k8s.io/v1
storage.k8s.io/v1beta1
v1
4)kubectl api-resources
命令就是先獲取所有API版本信息,然后對每一個API版本調用接口獲取該版本下的所有API資源類型
5)namingController
:檢查 crd obj 中是否有命名沖突,可在 crd .status.conditions 中查看;
6)establishingController
:檢查 crd 是否處於正常狀態,可在 crd .status.conditions 中查看;
7)nonStructuralSchemaController
:檢查 crd obj 結構是否正常,可在 crd .status.conditions 中查看;
8)apiApprovalController
:檢查 crd 是否遵循 Kubernetes API 聲明策略,可在 crd .status.conditions 中查看;
9)finalizingController
:類似於 finalizes 的功能,與 CRs 的刪除有關。
總結CR CRUD APIServer處理邏輯如下:
- createAPIExtensionsServer=>NewCustomResourceDefinitionHandler=>crdHandler=>注冊CR CRUD API接口:
// New returns a new instance of CustomResourceDefinitions from the given config.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
...
crdHandler, err := NewCustomResourceDefinitionHandler(
versionDiscoveryHandler,
groupDiscoveryHandler,
s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
delegateHandler,
c.ExtraConfig.CRDRESTOptionsGetter,
c.GenericConfig.AdmissionControl,
establishingController,
c.ExtraConfig.ServiceResolver,
c.ExtraConfig.AuthResolverWrapper,
c.ExtraConfig.MasterCount,
s.GenericAPIServer.Authorizer,
c.GenericConfig.RequestTimeout,
time.Duration(c.GenericConfig.MinRequestTimeout)*time.Second,
apiGroupInfo.StaticOpenAPISpec,
c.GenericConfig.MaxRequestBodyBytes,
)
if err != nil {
return nil, err
}
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
...
return s, nil
}
- crdHandler處理邏輯如下:
- 解析
req(GET /apis/duyanghao.example.com/v1/namespaces/default/students)
,根據請求路徑中的group(duyanghao.example.com),version(v1),以及resource字段(students)獲取對應CRD內容(crd, err := r.crdLister.Get(crdName)) - 通過crd.UID以及crd.Name獲取crdInfo,若不存在則創建對應的
crdInfo(crdInfo, err := r.getOrCreateServingInfoFor(crd.UID, crd.Name))
。crdInfo中包含了CRD定義以及該CRD對應Custom Resource的customresource.REST storage customresource.REST storage
由CR對應的Group(duyanghao.example.com),Version(v1),Kind(Student),Resource(students)等創建完成,由於CR在Kubernetes代碼中並沒有具體結構體定義,所以這里會先初始化一個范型結構體Unstructured(用於保存所有類型的Custom Resource),並對該結構體進行SetGroupVersionKind操作(設置具體Custom Resource Type)- 從
customresource.REST storage
獲取 Unstructured 結構體后會對其進行相應轉換然后返回
- 解析
// k8s.io/kubernetes/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go:223
func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
...
crdName := requestInfo.Resource + "." + requestInfo.APIGroup
crd, err := r.crdLister.Get(crdName)
...
crdInfo, err := r.getOrCreateServingInfoFor(crd.UID, crd.Name)
verb := strings.ToUpper(requestInfo.Verb)
resource := requestInfo.Resource
subresource := requestInfo.Subresource
scope := metrics.CleanScope(requestInfo)
...
switch {
case subresource == "status" && subresources != nil && subresources.Status != nil:
handlerFunc = r.serveStatus(w, req, requestInfo, crdInfo, terminating, supportedTypes)
case subresource == "scale" && subresources != nil && subresources.Scale != nil:
handlerFunc = r.serveScale(w, req, requestInfo, crdInfo, terminating, supportedTypes)
case len(subresource) == 0:
handlerFunc = r.serveResource(w, req, requestInfo, crdInfo, terminating, supportedTypes)
default:
responsewriters.ErrorNegotiated(
apierrors.NewNotFound(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Name),
Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
)
}
if handlerFunc != nil {
handlerFunc = metrics.InstrumentHandlerFunc(verb, requestInfo.APIGroup, requestInfo.APIVersion, resource, subresource, scope, metrics.APIServerComponent, handlerFunc)
handler := genericfilters.WithWaitGroup(handlerFunc, longRunningFilter, crdInfo.waitGroup)
handler.ServeHTTP(w, req)
return
}
}
更多代碼原理詳情,參考 kubernetes-reading-notes
Conclusion
本文從源碼層面對 Kubernetes apiserver 進行了一個概覽性總結,包括:aggregatorServer,kubeAPIServer,apiExtensionsServer 以及 bootstrap-controller 等。通過閱讀本文可以對 apiserver 內部原理有一個大致的理解,另外也有助於后續深入研究。