kubernetes源碼閱讀筆記——API Server(之一)


API Server是Kubernetes的核心組件之一,其作用是通過RESTFUL的方式,向所有客戶端提供一個集群內資源的統一的增改刪查的接口,並將資源的狀態存儲在etcd中。

API Server入口函數的位置在cmd/kube-apiserver/apiserver.go中,也是通過cobra注冊了kube-apiserver的命令。

cmd/kube-apiserver/apiserver.go

func main() {
    rand.Seed(time.Now().UnixNano())

    command := app.NewAPIServerCommand(server.SetupSignalHandler())

    // TODO: once we switch everything over to Cobra commands, we can go back to calling
    // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
    // normalize func and add the go flag set by hand.
    // utilflag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()

    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "error: %v\n", err)
        os.Exit(1)
    }
}

在NewAPIServerCommand方法里注冊了kube-apiserver命令,其核心仍然是Run命令。

進入cmd/kube-apiserver/app/server.go的Run方法:

cmd/kube-apiserver/app/server.go

func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
	// To help debugging, immediately log version
	klog.Infof("Version: %+v", version.Get())

	server, err := CreateServerChain(completeOptions, stopCh)
	if err != nil {
		return err
	}

	return server.PrepareRun().Run(stopCh)
}

方法很簡單,主要包含兩條語句。第一句創建了API Server,第二句運行這個server。Run方法后面再研究,首先重點看創建。

一、CreateServerChain

進入CreateServerChain:

cmd/kube-apiserver/app/server.go

func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) {
    nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
    if err != nil {
        return nil, err
    }

    kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
    if err != nil {
        return nil, err
    }

    // If additional API servers are added, they should be gated.
    apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
        serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
    if err != nil {
        return nil, err
    }
    apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
    if err != nil {
        return nil, err
    }

    kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, admissionPostStartHook)
    if err != nil {
        return nil, err
    }

    // otherwise go down the normal path of standing the aggregator up in front of the API server
    // this wires up openapi
    kubeAPIServer.GenericAPIServer.PrepareRun()

    // This will wire up openapi for extension api server
    apiExtensionsServer.GenericAPIServer.PrepareRun()

    // aggregator comes last in the chain
    aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
    if err != nil {
        return nil, err
    }
    aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
    if err != nil {
        // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
        return nil, err
    }

    if insecureServingInfo != nil {
        insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
        if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
            return nil, err
        }
    }

    return aggregatorServer.GenericAPIServer, nil
}

主要進行了以下幾件事:

(1)調用CreateNodeDialer,創建與節點交互的工具。

(2)配置API Server的Config。這里同時還配置了Extension API Server的Config,用於配置用戶自己編寫的API Server。

(3)根據Config,創建API Server和Extension API Server。

(4)運行API Server。通過調用PrepareRun方法實現。

(5)創建並運行aggregator(將API Server和Extension API Server整合在一起,暫時不提)。

第三步通過調用CreateKubeAPIServer實現,下面詳細分析一下CreateKubeAPIServer。

cmd/kube-apiserver/app/server.go

func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, admissionPostStartHook genericapiserver.PostStartHookFunc) (*master.Master, error) { kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer) if err != nil { return nil, err } kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-admission-initializer", admissionPostStartHook) return kubeAPIServer, nil }

首先調用kubeAPIServerConfig.Complete().New方法生成一個kubeAPIServer實例,之后為這個實例添加啟動后執行的鈎子函數。可以看到,這個API Server將前面創建的Extension API Server作為了代理Server。

New方法位於pkg/master/master.go中。進入New方法:

pkg/master/master.go

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
    
        ...

    s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
    if err != nil {
        return nil, err
    }

    ...

    m := &Master{
        GenericAPIServer: s,
    }

    // install legacy rest storage
    if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
        legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
            StorageFactory:              c.ExtraConfig.StorageFactory,
            ProxyTransport:              c.ExtraConfig.ProxyTransport,
            KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig,
            EventTTL:                    c.ExtraConfig.EventTTL,
            ServiceIPRange:              c.ExtraConfig.ServiceIPRange,
            ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange,
            LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
            ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
            ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
            APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
        }
        m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider)
    }

    // The order here is preserved in discovery.
    // If resources with identical names exist in more than one of these groups (e.g. "deployments.apps"" and "deployments.extensions"),
    // the order of this list determines which group an unqualified resource name (e.g. "deployments") should prefer.
    // This priority order is used for local discovery, but it ends up aggregated in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go
    // with specific priorities.
    // TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery
    // handlers that we have.
    restStorageProviders := []RESTStorageProvider{
        auditregistrationrest.RESTStorageProvider{},
        authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
        authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
        autoscalingrest.RESTStorageProvider{},
        batchrest.RESTStorageProvider{},
        certificatesrest.RESTStorageProvider{},
        coordinationrest.RESTStorageProvider{},
        extensionsrest.RESTStorageProvider{},
        networkingrest.RESTStorageProvider{},
        policyrest.RESTStorageProvider{},
        rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
        schedulingrest.RESTStorageProvider{},
        settingsrest.RESTStorageProvider{},
        storagerest.RESTStorageProvider{},
        // keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
        // See https://github.com/kubernetes/kubernetes/issues/42392
        appsrest.RESTStorageProvider{},
        admissionregistrationrest.RESTStorageProvider{},
        eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
    }
    m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...)

    if c.ExtraConfig.Tunneler != nil {
        m.installTunneler(c.ExtraConfig.Tunneler, corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes())
    }

    m.GenericAPIServer.AddPostStartHookOrDie("ca-registration", c.ExtraConfig.ClientCARegistrationHook.PostStartHook)

    return m, nil
}

首先,通過GenericConfig.New,基於前面配置的Config創建一個API Server對象(並為這個對象添加默認的path,如/versions、/metrics等),再基於這個API Server對象創建一個master對象。

其次,為這個master創建RestStorageProvider,並注冊API。這通過調用InstallLegacyAPI和InstallAPIs方法實現。前者用於注冊k8s前期的核心API(在/api路徑下),后者用於注冊k8s新增的API(在/apis路徑下)。

最后,為master添加鈎子函數,並返回。

下面對New、InstallLegacyAPI方法進行分析。InstallAPIs方法與InstallLegacyAPI的調用邏輯幾乎一致。

二、New

進入GenericConfig.New方法,這個方法創建了一個GenericAPIServer:

k8s.io/aposerver/pkg/server/config.go // New creates a new server which logically combines the handling chain with the passed server.
// name is used to differentiate for logging. The handler chain in particular can be difficult as it starts delgating.
// delegationTarget may not be nil.
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
    ...

    handlerChainBuilder := func(handler http.Handler) http.Handler {
        return c.BuildHandlerChainFunc(handler, c.Config)
    }
    apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())

    s := &GenericAPIServer{
        discoveryAddresses:     c.DiscoveryAddresses,
        LoopbackClientConfig:   c.LoopbackClientConfig,
        legacyAPIGroupPrefixes: c.LegacyAPIGroupPrefixes,
        admissionControl:       c.AdmissionControl,
        Serializer:             c.Serializer,
        AuditBackend:           c.AuditBackend,
        Authorizer:             c.Authorization.Authorizer,
        delegationTarget:       delegationTarget,
        HandlerChainWaitGroup:  c.HandlerChainWaitGroup,

        minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
        ShutdownTimeout:   c.RequestTimeout,

        SecureServingInfo: c.SecureServing,
        ExternalAddress:   c.ExternalAddress,

        Handler: apiServerHandler,

        listedPathProvider: apiServerHandler,

        openAPIConfig: c.OpenAPIConfig,

        postStartHooks:         map[string]postStartHookEntry{},
        preShutdownHooks:       map[string]preShutdownHookEntry{},
        disabledPostStartHooks: c.DisabledPostStartHooks,

        healthzChecks: c.HealthzChecks,

        DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer),

        enableAPIResponseCompression: c.EnableAPIResponseCompression,
        maxRequestBodyBytes:          c.MaxRequestBodyBytes,
    }

    ...

    for k, v := range delegationTarget.PostStartHooks() {
        s.postStartHooks[k] = v
    }

    for k, v := range delegationTarget.PreShutdownHooks() {
        s.preShutdownHooks[k] = v
    }

    genericApiServerHookName := "generic-apiserver-start-informers"
    if c.SharedInformerFactory != nil && !s.isPostStartHookRegistered(genericApiServerHookName) {
        err := s.AddPostStartHook(genericApiServerHookName, func(context PostStartHookContext) error {
            c.SharedInformerFactory.Start(context.StopCh)
            return nil
        })
        if err != nil {
            return nil, err
        }
    }

    ...

    s.listedPathProvider = routes.ListedPathProviders{s.listedPathProvider, delegationTarget}

    installAPI(s, c.Config)

    ...

    return s, nil
}

這一方法的核心是NewAPIServerHandler方法,此方法初始化了一個Container。這里的Container並非k8s中的容器,而是go-restful的一個概念,可參考https://www.cnblogs.com/ldaniel/p/5868384.html。大體說來,Container是一組WebService的集合,可以監聽在不同的端口上,而WebService又是一組Route的集合,為這些Route創建統一的root path等。這個go-restful項目來源於https://github.com/emicklei/go-restful

進入NewAPIServerHandler方法:

k8s.io/aposerver/pkg/server/handler.go

func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
    nonGoRestfulMux := mux.NewPathRecorderMux(name)
    if notFoundHandler != nil {
        nonGoRestfulMux.NotFoundHandler(notFoundHandler)
    }

    gorestfulContainer := restful.NewContainer()
    gorestfulContainer.ServeMux = http.NewServeMux()
    gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
    gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
        logStackOnRecover(s, panicReason, httpWriter)
    })
    gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
        serviceErrorHandler(s, serviceErr, request, response)
    })

    director := director{
        name:               name,
        goRestfulContainer: gorestfulContainer,
        nonGoRestfulMux:    nonGoRestfulMux,
    }

    return &APIServerHandler{
        FullHandlerChain:   handlerChainBuilder(director),
        GoRestfulContainer: gorestfulContainer,
        NonGoRestfulMux:    nonGoRestfulMux,
        Director:           director,
    }
}

可以看到,方法調用NewContainer方法初始化了一個gorestfulContainer,還調用了Router方法注冊了路由類型為CurlyRouter。這些都與官方https://github.com/emicklei/go-restful/blob/master/examples/restful-curly-router.go上的操作相近。此外,方法還添加了路由選擇器Mux,以及RecoverHandler和ServiceErrorHandler兩個回調函數。

回到New方法,在New方法中通過調用installAPI方法,為API Server預先注冊了一些默認path。

三、InstallLegacyAPI

創建了API Server實例,初始化了Container,接下來就是最關鍵的注冊路由的部分了。

回到master.go,進入InstallLegacyAPI方法:

pkg/master/master.go

func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) { legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter) if err != nil { klog.Fatalf("Error building core storage: %v", err) } controllerName := "bootstrap-controller" coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient) m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook) m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook) if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil { klog.Fatalf("Error in registering group versions: %v", err) } }

包括以下幾步:

(1)調用NewLegacyRESTStorage方法,生成RESTStorage和APIGroupInfo。

進入NewLegacyRESTStorage方法:

pkg/registry/core/rest/storage_core.go

func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
    apiGroupInfo := genericapiserver.APIGroupInfo{
        PrioritizedVersions:          legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
        VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
        Scheme:                       legacyscheme.Scheme,
        ParameterCodec:               legacyscheme.ParameterCodec,
        NegotiatedSerializer:         legacyscheme.Codecs,
    }

    var podDisruptionClient policyclient.PodDisruptionBudgetsGetter
    if policyGroupVersion := (schema.GroupVersion{Group: "policy", Version: "v1beta1"}); legacyscheme.Scheme.IsVersionRegistered(policyGroupVersion) {
        var err error
        podDisruptionClient, err = policyclient.NewForConfig(c.LoopbackClientConfig)
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
    }
    restStorage := LegacyRESTStorage{}

    podTemplateStorage := podtemplatestore.NewREST(restOptionsGetter)

    eventStorage := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
    limitRangeStorage := limitrangestore.NewREST(restOptionsGetter)

    resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotastore.NewREST(restOptionsGetter)
    secretStorage := secretstore.NewREST(restOptionsGetter)
    persistentVolumeStorage, persistentVolumeStatusStorage := pvstore.NewREST(restOptionsGetter)
    persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcstore.NewREST(restOptionsGetter)
    configMapStorage := configmapstore.NewREST(restOptionsGetter)

    namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespacestore.NewREST(restOptionsGetter)

    endpointsStorage := endpointsstore.NewREST(restOptionsGetter)

    nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    podStorage := podstore.NewStorage(
        restOptionsGetter,
        nodeStorage.KubeletConnectionInfo,
        c.ProxyTransport,
        podDisruptionClient,
    )

    ...

    restStorageMap := map[string]rest.Storage{
        "pods":             podStorage.Pod,
        "pods/attach":      podStorage.Attach,
        "pods/status":      podStorage.Status,
        "pods/log":         podStorage.Log,
        "pods/exec":        podStorage.Exec,
        "pods/portforward": podStorage.PortForward,
        "pods/proxy":       podStorage.Proxy,
        "pods/binding":     podStorage.Binding,
        "bindings":         podStorage.Binding,

        "podTemplates": podTemplateStorage,

        "replicationControllers":        controllerStorage.Controller,
        "replicationControllers/status": controllerStorage.Status,

        "services":        serviceRest,
        "services/proxy":  serviceRestProxy,
        "services/status": serviceStatusStorage,

        "endpoints": endpointsStorage,

        "nodes":        nodeStorage.Node,
        "nodes/status": nodeStorage.Status,
        "nodes/proxy":  nodeStorage.Proxy,

        "events": eventStorage,

        "limitRanges":                   limitRangeStorage,
        "resourceQuotas":                resourceQuotaStorage,
        "resourceQuotas/status":         resourceQuotaStatusStorage,
        "namespaces":                    namespaceStorage,
        "namespaces/status":             namespaceStatusStorage,
        "namespaces/finalize":           namespaceFinalizeStorage,
        "secrets":                       secretStorage,
        "serviceAccounts":               serviceAccountStorage,
        "persistentVolumes":             persistentVolumeStorage,
        "persistentVolumes/status":      persistentVolumeStatusStorage,
        "persistentVolumeClaims":        persistentVolumeClaimStorage,
        "persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
        "configMaps":                    configMapStorage,

        "componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
    }
    if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) {
        restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
    }
    if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "policy", Version: "v1beta1"}) {
        restStorageMap["pods/eviction"] = podStorage.Eviction
    }
    if serviceAccountStorage.Token != nil {
        restStorageMap["serviceaccounts/token"] = serviceAccountStorage.Token
    }
    apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap

    return restStorage, apiGroupInfo, nil
}

我們看到,這個方法創建了大量的Storage,包括pod、node、pv等。這些Storage是APIServer與etcd對接時采用的存儲結構。從包名也可以看出,這里注冊的都是core組下的API。

創建后,將這些Storage都添加到restStorageMap中,作為api和Storage的對應關系。這些還不是完整的api路徑,還需要在前面加上諸如core/v1之類的前綴。

最后,將這個restStorageMap放進apiGroupInfo中,並返回。

(2)定義鈎子函數。

(3)調用InstallLegacyAPIGroup方法,為API Server注冊“v1”路由。

InstallLegacyAPIGroup方法為前面創建的apiGroupInfo添加處理器。這一部分很重要,下一篇詳細分析。https://www.cnblogs.com/00986014w/p/10348489.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM