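The API Server is one of the core components of Kubernetes. It exposes a single RESTful interface through which all clients create, update, delete, and query resources in the cluster, and it stores the state of those resources in etcd.
The API Server's entry point is in cmd/kube-apiserver/apiserver.go, where the kube-apiserver command is registered through cobra.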
cmd/kube-apiserver/apiserver.go
func main() {
	rand.Seed(time.Now().UnixNano())

	command := app.NewAPIServerCommand(server.SetupSignalHandler())

	// TODO: once we switch everything over to Cobra commands, we can go back to calling
	// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
	// normalize func and add the go flag set by hand.
	// utilflag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
}
NewAPIServerCommand registers the kube-apiserver command; its core is still the Run function.
Stepping into Run in cmd/kube-apiserver/app/server.go:
cmd/kube-apiserver/app/server.go
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
	// To help debugging, immediately log version
	klog.Infof("Version: %+v", version.Get())

	server, err := CreateServerChain(completeOptions, stopCh)
	if err != nil {
		return err
	}

	return server.PrepareRun().Run(stopCh)
}
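The function is simple and consists mainly of two statements: the first creates the API Server, and the second runs it. Run is examined later; the focus here is on creation.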
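The create-then-run split can be illustrated with a minimal sketch. The types server and preparedServer and the function run are hypothetical stand-ins invented for this example, not the real GenericAPIServer; the sketch only assumes that Run blocks until the stop channel is closed.

```go
package main

import (
	"fmt"
	"time"
)

// server is a hypothetical stand-in for the object returned by CreateServerChain.
type server struct{ name string }

// preparedServer makes Run reachable only after PrepareRun has been called.
type preparedServer struct{ *server }

// PrepareRun performs one-time setup and returns a runnable wrapper.
func (s *server) PrepareRun() preparedServer { return preparedServer{s} }

// Run blocks until stopCh is closed, then returns.
func (p preparedServer) Run(stopCh <-chan struct{}) error {
	<-stopCh
	return nil
}

// run mirrors the shape of Run in the article: create, prepare, run.
func run(stopCh <-chan struct{}) error {
	s := &server{name: "demo"}
	return s.PrepareRun().Run(stopCh)
}

func main() {
	stopCh := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(stopCh) // plays the role of the signal handler
	}()
	fmt.Println("run returned:", run(stopCh))
}
```

Returning a separate prepared type is one way to make it impossible to call Run on a server that has not been prepared.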
1. CreateServerChain
Stepping into CreateServerChain:
cmd/kube-apiserver/app/server.go
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) {
	nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
	if err != nil {
		return nil, err
	}

	kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
	if err != nil {
		return nil, err
	}

	// If additional API servers are added, they should be gated.
	apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount, serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
	if err != nil {
		return nil, err
	}
	apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
	if err != nil {
		return nil, err
	}

	kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, admissionPostStartHook)
	if err != nil {
		return nil, err
	}

	// otherwise go down the normal path of standing the aggregator up in front of the API server
	// this wires up openapi
	kubeAPIServer.GenericAPIServer.PrepareRun()

	// This will wire up openapi for extension api server
	apiExtensionsServer.GenericAPIServer.PrepareRun()

	// aggregator comes last in the chain
	aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
	if err != nil {
		return nil, err
	}
	aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
	if err != nil {
		// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
		return nil, err
	}

	if insecureServingInfo != nil {
		insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
		if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
			return nil, err
		}
	}

	return aggregatorServer.GenericAPIServer, nil
}
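It does the following:
(1) Calls CreateNodeDialer to create the tooling (a node tunneler and a proxy transport) used to communicate with nodes.
(2) Builds the Config for the API Server. The Config for the Extension API Server, which serves user-defined resources (CustomResourceDefinitions), is built here as well.
(3) Creates the Extension API Server and the API Server from their Configs.
(4) Calls PrepareRun on both servers. As the code comments note, this wires up OpenAPI; it prepares the servers but does not start them. The actual start happens later, in Run.
(5) Creates the aggregator, which comes last in the chain and sits in front of the API Server and the Extension API Server, tying them together (not covered for now). Its GenericAPIServer is what CreateServerChain returns.
Step 3 is carried out by CreateKubeAPIServer, which is analyzed in detail below.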
cmd/kube-apiserver/app/server.go
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, admissionPostStartHook genericapiserver.PostStartHookFunc) (*master.Master, error) {
	kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
	if err != nil {
		return nil, err
	}

	kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-admission-initializer", admissionPostStartHook)

	return kubeAPIServer, nil
}
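It first calls kubeAPIServerConfig.Complete().New to build a kubeAPIServer instance, then adds a post-start hook to that instance. Note that the Extension API Server created earlier is passed in as this API Server's delegation target (delegateAPIServer), that is, the server a request is handed to when this one has no handler for it.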
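The delegation idea can be sketched with plain net/http. Everything below is a simplified illustration under stated assumptions: delegatingServer, newChain, and the example paths are hypothetical, and the real servers decide what they handle in a far more involved way. The sketch only shows the ordering of the chain: aggregator, then kube-apiserver, then the extension server, then a 404.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// delegatingServer is a hypothetical, simplified server in the chain: it
// answers the paths it knows and hands everything else to its delegate.
type delegatingServer struct {
	name     string
	paths    map[string]bool
	delegate http.Handler
}

func (s *delegatingServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if s.paths[r.URL.Path] {
		fmt.Fprintf(w, "served by %s", s.name)
		return
	}
	s.delegate.ServeHTTP(w, r)
}

// newChain builds the chain back to front, like CreateServerChain does.
// The paths are made up for the example.
func newChain() http.Handler {
	ext := &delegatingServer{
		name:     "apiextensions",
		paths:    map[string]bool{"/apis/example.com/v1/widgets": true},
		delegate: http.NotFoundHandler(), // plays the role of the empty delegate
	}
	kube := &delegatingServer{
		name:     "kube-apiserver",
		paths:    map[string]bool{"/api/v1/pods": true},
		delegate: ext,
	}
	return &delegatingServer{
		name:     "aggregator",
		paths:    map[string]bool{"/apis": true},
		delegate: kube,
	}
}

// get sends a GET request through the chain and returns status and body.
func get(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	chain := newChain()
	for _, p := range []string{"/apis", "/api/v1/pods", "/apis/example.com/v1/widgets", "/nope"} {
		code, body := get(chain, p)
		fmt.Printf("%-32s %d %q\n", p, code, body)
	}
}
```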
The New method is in pkg/master/master.go. Stepping into it:
pkg/master/master.go
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
	...
	s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
	if err != nil {
		return nil, err
	}
	...
	m := &Master{
		GenericAPIServer: s,
	}

	// install legacy rest storage
	if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
		legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
			StorageFactory:              c.ExtraConfig.StorageFactory,
			ProxyTransport:              c.ExtraConfig.ProxyTransport,
			KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig,
			EventTTL:                    c.ExtraConfig.EventTTL,
			ServiceIPRange:              c.ExtraConfig.ServiceIPRange,
			ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange,
			LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
			ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
			ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
			APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
		}
		m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider)
	}

	// The order here is preserved in discovery.
	// If resources with identical names exist in more than one of these groups (e.g. "deployments.apps"" and "deployments.extensions"),
	// the order of this list determines which group an unqualified resource name (e.g. "deployments") should prefer.
	// This priority order is used for local discovery, but it ends up aggregated in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go
	// with specific priorities.
	// TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery
	// handlers that we have.
	restStorageProviders := []RESTStorageProvider{
		auditregistrationrest.RESTStorageProvider{},
		authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
		authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
		autoscalingrest.RESTStorageProvider{},
		batchrest.RESTStorageProvider{},
		certificatesrest.RESTStorageProvider{},
		coordinationrest.RESTStorageProvider{},
		extensionsrest.RESTStorageProvider{},
		networkingrest.RESTStorageProvider{},
		policyrest.RESTStorageProvider{},
		rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
		schedulingrest.RESTStorageProvider{},
		settingsrest.RESTStorageProvider{},
		storagerest.RESTStorageProvider{},
		// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
		// See https://github.com/kubernetes/kubernetes/issues/42392
		appsrest.RESTStorageProvider{},
		admissionregistrationrest.RESTStorageProvider{},
		eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
	}
	m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...)

	if c.ExtraConfig.Tunneler != nil {
		m.installTunneler(c.ExtraConfig.Tunneler, corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes())
	}

	m.GenericAPIServer.AddPostStartHookOrDie("ca-registration", c.ExtraConfig.ClientCARegistrationHook.PostStartHook)

	return m, nil
}
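First, GenericConfig.New creates an API Server object from the Config built earlier (and registers default paths on it, such as /version and /metrics); a Master object is then built around this API Server object.
Next, RESTStorageProviders are created for the Master and the APIs are registered. This is done by InstallLegacyAPI and InstallAPIs. The former registers the original core Kubernetes API (under the /api path); the latter registers the API groups added later (under the /apis path).
Finally, a hook function is added to the Master, which is then returned.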
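The hook registration used above (AddPostStartHookOrDie) can be pictured as a named map of callbacks that also inherits the delegate's hooks. The sketch below is a simplified model under stated assumptions: hookServer, postStartHook, and their methods are hypothetical names, the hook signature is reduced to a stop channel, and running hooks sequentially in sorted order is a choice made here for deterministic output, not a claim about how the real server schedules them.

```go
package main

import (
	"fmt"
	"sort"
)

// postStartHook is a simplified, hypothetical hook signature.
type postStartHook func(stopCh <-chan struct{}) error

// hookServer is a hypothetical stand-in that only keeps track of hooks.
type hookServer struct {
	postStartHooks map[string]postStartHook
}

// newHookServer copies the delegate's hooks so that one start runs them all.
func newHookServer(delegateHooks map[string]postStartHook) *hookServer {
	s := &hookServer{postStartHooks: map[string]postStartHook{}}
	for name, hook := range delegateHooks {
		s.postStartHooks[name] = hook
	}
	return s
}

// addPostStartHook registers a hook and rejects duplicate names.
func (s *hookServer) addPostStartHook(name string, hook postStartHook) error {
	if _, exists := s.postStartHooks[name]; exists {
		return fmt.Errorf("post-start hook %q is already registered", name)
	}
	s.postStartHooks[name] = hook
	return nil
}

// runPostStartHooks runs every hook in name order and returns the names of
// the hooks that succeeded.
func (s *hookServer) runPostStartHooks(stopCh <-chan struct{}) []string {
	names := make([]string, 0, len(s.postStartHooks))
	for name := range s.postStartHooks {
		names = append(names, name)
	}
	sort.Strings(names)

	ran := []string{}
	for _, name := range names {
		if err := s.postStartHooks[name](stopCh); err == nil {
			ran = append(ran, name)
		}
	}
	return ran
}

// noop is a hook that does nothing and always succeeds.
func noop(<-chan struct{}) error { return nil }

func main() {
	s := newHookServer(map[string]postStartHook{"delegate-hook": noop})
	fmt.Println("first add error:", s.addPostStartHook("ca-registration", noop))
	fmt.Println("second add error:", s.addPostStartHook("ca-registration", noop))
	fmt.Println("ran:", s.runPostStartHooks(make(chan struct{})))
}
```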
The New and InstallLegacyAPI methods are analyzed below. InstallAPIs follows almost the same call logic as InstallLegacyAPI.
2. New
Stepping into GenericConfig.New, which creates a GenericAPIServer:
k8s.io/apiserver/pkg/server/config.go
// New creates a new server which logically combines the handling chain with the passed server.
// name is used to differentiate for logging. The handler chain in particular can be difficult as it starts delgating.
// delegationTarget may not be nil.
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
	...
	handlerChainBuilder := func(handler http.Handler) http.Handler {
		return c.BuildHandlerChainFunc(handler, c.Config)
	}
	apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())

	s := &GenericAPIServer{
		discoveryAddresses:     c.DiscoveryAddresses,
		LoopbackClientConfig:   c.LoopbackClientConfig,
		legacyAPIGroupPrefixes: c.LegacyAPIGroupPrefixes,
		admissionControl:       c.AdmissionControl,
		Serializer:             c.Serializer,
		AuditBackend:           c.AuditBackend,
		Authorizer:             c.Authorization.Authorizer,
		delegationTarget:       delegationTarget,
		HandlerChainWaitGroup:  c.HandlerChainWaitGroup,

		minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
		ShutdownTimeout:   c.RequestTimeout,

		SecureServingInfo: c.SecureServing,
		ExternalAddress:   c.ExternalAddress,

		Handler: apiServerHandler,

		listedPathProvider: apiServerHandler,

		openAPIConfig: c.OpenAPIConfig,

		postStartHooks:         map[string]postStartHookEntry{},
		preShutdownHooks:       map[string]preShutdownHookEntry{},
		disabledPostStartHooks: c.DisabledPostStartHooks,

		healthzChecks: c.HealthzChecks,

		DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer),

		enableAPIResponseCompression: c.EnableAPIResponseCompression,
		maxRequestBodyBytes:          c.MaxRequestBodyBytes,
	}
	...
	for k, v := range delegationTarget.PostStartHooks() {
		s.postStartHooks[k] = v
	}

	for k, v := range delegationTarget.PreShutdownHooks() {
		s.preShutdownHooks[k] = v
	}

	genericApiServerHookName := "generic-apiserver-start-informers"
	if c.SharedInformerFactory != nil && !s.isPostStartHookRegistered(genericApiServerHookName) {
		err := s.AddPostStartHook(genericApiServerHookName, func(context PostStartHookContext) error {
			c.SharedInformerFactory.Start(context.StopCh)
			return nil
		})
		if err != nil {
			return nil, err
		}
	}
	...
	s.listedPathProvider = routes.ListedPathProviders{s.listedPathProvider, delegationTarget}

	installAPI(s, c.Config)
	...
	return s, nil
}
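The core of this method is NewAPIServerHandler, which initializes a Container. This Container is not a Kubernetes container but a go-restful concept; see https://www.cnblogs.com/ldaniel/p/5868384.html. Roughly speaking, a Container is a collection of WebServices, and different Containers can listen on different ports; a WebService in turn is a collection of Routes that share settings such as a common root path. The go-restful project lives at https://github.com/emicklei/go-restful.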
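The Container, WebService, and Route relationship can be modeled with the standard library alone. The types below (route, webService, container) are hypothetical simplifications written for this illustration; they are not the go-restful API, and they match on exact method and path only, without path parameters.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// route binds one HTTP method and a path (relative to the WebService root)
// to a handler.
type route struct {
	method  string
	path    string
	handler http.HandlerFunc
}

// webService groups routes under a shared root path.
type webService struct {
	rootPath string
	routes   []route
}

// container holds the routes of every registered webService, keyed by
// "METHOD /full/path".
type container struct {
	handlers map[string]http.HandlerFunc
}

func newContainer() *container {
	return &container{handlers: map[string]http.HandlerFunc{}}
}

// add registers each route of ws under the WebService's root path.
func (c *container) add(ws webService) {
	for _, r := range ws.routes {
		c.handlers[r.method+" "+ws.rootPath+r.path] = r.handler
	}
}

// ServeHTTP dispatches on exact method and path, or answers 404.
func (c *container) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	h, ok := c.handlers[req.Method+" "+req.URL.Path]
	if !ok {
		http.NotFound(w, req)
		return
	}
	h(w, req)
}

// reply returns a handler that writes a fixed text.
func reply(text string) http.HandlerFunc {
	return func(w http.ResponseWriter, _ *http.Request) { fmt.Fprint(w, text) }
}

// demoContainer builds a container with one WebService rooted at /api/v1.
func demoContainer() *container {
	c := newContainer()
	c.add(webService{rootPath: "/api/v1", routes: []route{
		{http.MethodGet, "/pods", reply("list pods")},
		{http.MethodPost, "/pods", reply("create pod")},
	}})
	return c
}

// do sends one request to h and returns status code and body.
func do(h http.Handler, method, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(method, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	c := demoContainer()
	for _, m := range []string{http.MethodGet, http.MethodPost, http.MethodDelete} {
		code, body := do(c, m, "/api/v1/pods")
		fmt.Printf("%s /api/v1/pods -> %d %q\n", m, code, body)
	}
}
```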
Stepping into NewAPIServerHandler:
k8s.io/apiserver/pkg/server/handler.go
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
	nonGoRestfulMux := mux.NewPathRecorderMux(name)
	if notFoundHandler != nil {
		nonGoRestfulMux.NotFoundHandler(notFoundHandler)
	}

	gorestfulContainer := restful.NewContainer()
	gorestfulContainer.ServeMux = http.NewServeMux()
	gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
	gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
		logStackOnRecover(s, panicReason, httpWriter)
	})
	gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
		serviceErrorHandler(s, serviceErr, request, response)
	})

	director := director{
		name:               name,
		goRestfulContainer: gorestfulContainer,
		nonGoRestfulMux:    nonGoRestfulMux,
	}

	return &APIServerHandler{
		FullHandlerChain:   handlerChainBuilder(director),
		GoRestfulContainer: gorestfulContainer,
		NonGoRestfulMux:    nonGoRestfulMux,
		Director:           director,
	}
}
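As shown, the function calls NewContainer to initialize gorestfulContainer and calls Router to set the router type to CurlyRouter, much like the official example at https://github.com/emicklei/go-restful/blob/master/examples/restful-curly-router.go. It also gives the container a ServeMux as its request multiplexer and registers two callbacks, RecoverHandler and ServiceErrorHandler. The resulting director is wrapped by handlerChainBuilder to form FullHandlerChain.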
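The line FullHandlerChain: handlerChainBuilder(director) wraps the innermost handler in a series of filters. A minimal sketch of that wrapping pattern follows; buildHandlerChain, record, and the filter names are hypothetical and chosen for illustration, so they should not be read as the actual contents of BuildHandlerChainFunc.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// filter wraps a handler with extra behavior (middleware).
type filter func(http.Handler) http.Handler

// buildHandlerChain wraps inner so that the first filter listed runs first.
func buildHandlerChain(inner http.Handler, filters ...filter) http.Handler {
	for i := len(filters) - 1; i >= 0; i-- {
		inner = filters[i](inner)
	}
	return inner
}

// record returns a filter that appends its name to trace and then calls
// the next handler.
func record(name string, trace *[]string) filter {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			*trace = append(*trace, name)
			next.ServeHTTP(w, r)
		})
	}
}

// runChain sends one request through the chain and reports the visit order.
func runChain() string {
	var trace []string
	// The innermost handler plays the role of the director.
	director := http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
		trace = append(trace, "director")
	})
	full := buildHandlerChain(director,
		record("authentication", &trace),
		record("authorization", &trace),
	)
	full.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest(http.MethodGet, "/api/v1/pods", nil))
	return strings.Join(trace, " -> ")
}

func main() {
	fmt.Println(runChain()) // authentication -> authorization -> director
}
```

Keeping the unwrapped director alongside the full chain, as APIServerHandler does, lets a caller bypass the filters when that is intended, as the insecure handler chain in CreateServerChain does through UnprotectedHandler.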
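Back in New, the call to installAPI registers a set of default paths on the API Server in advance.
3. InstallLegacyAPI
With the API Server instance created and the Container initialized, the next and most important part is registering the routes.
Back in master.go, stepping into InstallLegacyAPI: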
pkg/master/master.go
func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) {
	legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
	if err != nil {
		klog.Fatalf("Error building core storage: %v", err)
	}

	controllerName := "bootstrap-controller"
	coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
	bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient)
	m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
	m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)

	if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
		klog.Fatalf("Error in registering group versions: %v", err)
	}
}
It consists of the following steps:
(1) Call NewLegacyRESTStorage to produce the RESTStorage and the APIGroupInfo.
Stepping into NewLegacyRESTStorage:
pkg/registry/core/rest/storage_core.go
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
	apiGroupInfo := genericapiserver.APIGroupInfo{
		PrioritizedVersions:          legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
		VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
		Scheme:                       legacyscheme.Scheme,
		ParameterCodec:               legacyscheme.ParameterCodec,
		NegotiatedSerializer:         legacyscheme.Codecs,
	}

	var podDisruptionClient policyclient.PodDisruptionBudgetsGetter
	if policyGroupVersion := (schema.GroupVersion{Group: "policy", Version: "v1beta1"}); legacyscheme.Scheme.IsVersionRegistered(policyGroupVersion) {
		var err error
		podDisruptionClient, err = policyclient.NewForConfig(c.LoopbackClientConfig)
		if err != nil {
			return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
		}
	}
	restStorage := LegacyRESTStorage{}

	podTemplateStorage := podtemplatestore.NewREST(restOptionsGetter)

	eventStorage := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
	limitRangeStorage := limitrangestore.NewREST(restOptionsGetter)

	resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotastore.NewREST(restOptionsGetter)
	secretStorage := secretstore.NewREST(restOptionsGetter)
	persistentVolumeStorage, persistentVolumeStatusStorage := pvstore.NewREST(restOptionsGetter)
	persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcstore.NewREST(restOptionsGetter)
	configMapStorage := configmapstore.NewREST(restOptionsGetter)

	namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespacestore.NewREST(restOptionsGetter)

	endpointsStorage := endpointsstore.NewREST(restOptionsGetter)

	nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport)
	if err != nil {
		return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
	}

	podStorage := podstore.NewStorage(
		restOptionsGetter,
		nodeStorage.KubeletConnectionInfo,
		c.ProxyTransport,
		podDisruptionClient,
	)
	...
	restStorageMap := map[string]rest.Storage{
		"pods":             podStorage.Pod,
		"pods/attach":      podStorage.Attach,
		"pods/status":      podStorage.Status,
		"pods/log":         podStorage.Log,
		"pods/exec":        podStorage.Exec,
		"pods/portforward": podStorage.PortForward,
		"pods/proxy":       podStorage.Proxy,
		"pods/binding":     podStorage.Binding,
		"bindings":         podStorage.Binding,

		"podTemplates": podTemplateStorage,

		"replicationControllers":        controllerStorage.Controller,
		"replicationControllers/status": controllerStorage.Status,

		"services":        serviceRest,
		"services/proxy":  serviceRestProxy,
		"services/status": serviceStatusStorage,

		"endpoints": endpointsStorage,

		"nodes":        nodeStorage.Node,
		"nodes/status": nodeStorage.Status,
		"nodes/proxy":  nodeStorage.Proxy,

		"events": eventStorage,

		"limitRanges":                   limitRangeStorage,
		"resourceQuotas":                resourceQuotaStorage,
		"resourceQuotas/status":         resourceQuotaStatusStorage,
		"namespaces":                    namespaceStorage,
		"namespaces/status":             namespaceStatusStorage,
		"namespaces/finalize":           namespaceFinalizeStorage,
		"secrets":                       secretStorage,
		"serviceAccounts":               serviceAccountStorage,
		"persistentVolumes":             persistentVolumeStorage,
		"persistentVolumes/status":      persistentVolumeStatusStorage,
		"persistentVolumeClaims":        persistentVolumeClaimStorage,
		"persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
		"configMaps":                    configMapStorage,

		"componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
	}
	if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) {
		restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
	}
	if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "policy", Version: "v1beta1"}) {
		restStorageMap["pods/eviction"] = podStorage.Eviction
	}
	if serviceAccountStorage.Token != nil {
		restStorageMap["serviceaccounts/token"] = serviceAccountStorage.Token
	}
	apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap

	return restStorage, apiGroupInfo, nil
}
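As we can see, this method creates a large number of Storage objects, for pods, nodes, PVs, and so on. These Storage objects are the storage structures the API Server uses when talking to etcd. As the package name suggests, everything registered here belongs to the core group.
Once created, the Storage objects are all added to restStorageMap, which maps each API resource to its Storage. The keys are not yet complete API paths; a prefix such as /api/v1 still has to be put in front of them.
Finally, restStorageMap is placed into apiGroupInfo, which is returned.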
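How the map keys relate to URL prefixes can be sketched as follows. apiPrefix and splitResource are hypothetical helpers written for this illustration. The sketch assumes that a key such as "pods/status" denotes a resource and a subresource separated by the first slash, and it deliberately ignores the namespace and object-name segments that appear in real request paths.

```go
package main

import (
	"fmt"
	"strings"
)

// apiPrefix returns the URL prefix for a group/version. The core (legacy)
// group has an empty name and is served under /api; named groups are
// served under /apis/<group>.
func apiPrefix(group, version string) string {
	if group == "" {
		return "/api/" + version
	}
	return "/apis/" + group + "/" + version
}

// splitResource splits a storage-map key such as "pods/status" into its
// resource and subresource parts; the subresource is empty if absent.
func splitResource(key string) (resource, subresource string) {
	parts := strings.SplitN(key, "/", 2)
	if len(parts) == 2 {
		return parts[0], parts[1]
	}
	return parts[0], ""
}

func main() {
	fmt.Println(apiPrefix("", "v1"))     // /api/v1
	fmt.Println(apiPrefix("apps", "v1")) // /apis/apps/v1

	for _, key := range []string{"pods", "pods/status", "namespaces/finalize"} {
		resource, subresource := splitResource(key)
		fmt.Printf("%-20s resource=%q subresource=%q\n", key, resource, subresource)
	}
}
```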
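(2) Define the hook functions.
(3) Call InstallLegacyAPIGroup to register the "v1" routes on the API Server.
InstallLegacyAPIGroup adds handlers for the apiGroupInfo created above. This part is important and is analyzed in detail in the next post: https://www.cnblogs.com/00986014w/p/10348489.html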
