Kubelet是Kubernetes集群中node節點的核心組件之一,其作用是管理運行在Pod中的容器,使其處於正常運行狀態。
Kubelet的啟動函數代碼位於cmd/kubelet/kubelet.go中,仍是通過cobra注冊。
cmd/kubelet/kubelet.go
func main() { rand.Seed(time.Now().UnixNano()) command := app.NewKubeletCommand(server.SetupSignalHandler()) logs.InitLogs() defer logs.FlushLogs() if err := command.Execute(); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } }
一、NewKubeletCommand
在NewKubeletCommand命令中注冊了kubelet命令。進入NewKubeletCommand方法:
cmd/kubelet/app/server.go func NewKubeletCommand(stopCh <-chan struct{}) *cobra.Command { cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError) cleanFlagSet.SetNormalizeFunc(flag.WordSepNormalizeFunc) kubeletFlags := options.NewKubeletFlags() kubeletConfig, err := options.NewKubeletConfiguration() // programmer error if err != nil { klog.Fatal(err) } cmd := &cobra.Command{ Use: componentKubelet, Long: `...`, // The Kubelet has special flag parsing requirements to enforce flag precedence rules, // so we do all our parsing manually in Run, below. // DisableFlagParsing=true provides the full set of flags passed to the kubelet in the // `args` arg to Run, without Cobra's interference. DisableFlagParsing: true, Run: func(cmd *cobra.Command, args []string) { // initial flag parse, since we disable cobra's flag parsing if err := cleanFlagSet.Parse(args); err != nil { cmd.Usage() klog.Fatal(err) } // check if there are non-flag arguments in the command line cmds := cleanFlagSet.Args() if len(cmds) > 0 { cmd.Usage() klog.Fatalf("unknown command: %s", cmds[0]) } // short-circuit on help help, err := cleanFlagSet.GetBool("help") if err != nil { klog.Fatal(`"help" flag is non-bool, programmer error, please correct`) } if help { cmd.Help() return } ... // load kubelet config file, if provided if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 { kubeletConfig, err = loadConfigFile(configFile) if err != nil { klog.Fatal(err) } // We must enforce flag precedence by re-parsing the command line into the new object. // This is necessary to preserve backwards-compatibility across binary upgrades. // See issue #56171 for more details. if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil { klog.Fatal(err) } // update feature gates based on new config if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil { klog.Fatal(err) } } // We always validate the local configuration (command line + config file). // This is the default "last-known-good" config for dynamic config, and must always remain valid. if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil { klog.Fatal(err) } // use dynamic kubelet config, if enabled var kubeletConfigController *dynamickubeletconfig.Controller if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 { var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir, func(kc *kubeletconfiginternal.KubeletConfiguration) error { // Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence, // so that we get a complete validation at the same point where we can decide to reject dynamic config. // This fixes the flag-precedence component of issue #63305. // See issue #56171 for general details on flag precedence. return kubeletConfigFlagPrecedence(kc, args) }) if err != nil { klog.Fatal(err) } // If we should just use our existing, local config, the controller will return a nil config if dynamicKubeletConfig != nil { kubeletConfig = dynamicKubeletConfig // Note: flag precedence was already enforced in the controller, prior to validation, // by our above transform function. Now we simply update feature gates from the new config. if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil { klog.Fatal(err) } } } // construct a KubeletServer from kubeletFlags and kubeletConfig kubeletServer := &options.KubeletServer{ KubeletFlags: *kubeletFlags, KubeletConfiguration: *kubeletConfig, } // use kubeletServer to construct the default KubeletDeps kubeletDeps, err := UnsecuredDependencies(kubeletServer) if err != nil { klog.Fatal(err) } // add the kubelet config controller to kubeletDeps kubeletDeps.KubeletConfigController = kubeletConfigController ... // run the kubelet klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration) if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil { klog.Fatal(err) } }, } // keep cleanFlagSet separate, so Cobra doesn't pollute it with the global flags kubeletFlags.AddFlags(cleanFlagSet) options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig) options.AddGlobalFlags(cleanFlagSet) cleanFlagSet.BoolP("help", "h", false, fmt.Sprintf("help for %s", cmd.Name())) // ugly, but necessary, because Cobra's default UsageFunc and HelpFunc pollute the flagset with global flags const usageFmt = "Usage:\n %s\n\nFlags:\n%s" cmd.SetUsageFunc(func(cmd *cobra.Command) error { fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2)) return nil }) cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) { fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2)) }) return cmd }
核心仍在於后面的Run字段。可以看到,kubelet在cobra默認的flag和help function處理之外,又在Run中添加了自己的flag和help function處理。
在這些內容之后,有一段加載kubelet config的代碼,因為kubelet在啟動時可以使用本地的config,也可以動態加載自定義的config。
之后,通過kubeletServer := &options.KubeletServer{ }這行代碼,利用之前設置的config和flag創建一個kubeletServer的依賴配置。
最后,調用Run方法,將依賴配置和依賴組件傳入,運行這個kubelet。
Run方法的核心則是調用同一個文件下的run方法。
二、run
進入run方法:
cmd/kubelet/app/server.go func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) { ... // About to get clients and such, detect standaloneMode standaloneMode := true if len(s.KubeConfig) > 0 { standaloneMode = false } if kubeDeps == nil { kubeDeps, err = UnsecuredDependencies(s) if err != nil { return err } } if kubeDeps.Cloud == nil { if !cloudprovider.IsExternal(s.CloudProvider) { cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) if err != nil { return err } if cloud == nil { klog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile) } else { klog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile) } kubeDeps.Cloud = cloud } } ... // if in standalone mode, indicate as much by setting all clients to nil switch { case standaloneMode: kubeDeps.KubeClient = nil kubeDeps.DynamicKubeClient = nil kubeDeps.EventClient = nil kubeDeps.HeartbeatClient = nil klog.Warningf("standalone mode, no API client") case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil, kubeDeps.DynamicKubeClient == nil: clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName) if err != nil { return err } kubeDeps.OnHeartbeatFailure = closeAllConns kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig) if err != nil { return fmt.Errorf("failed to initialize kubelet client: %v", err) } kubeDeps.DynamicKubeClient, err = dynamic.NewForConfig(clientConfig) if err != nil { return fmt.Errorf("failed to initialize kubelet dynamic client: %v", err) } // make a separate client for events eventClientConfig := *clientConfig eventClientConfig.QPS = float32(s.EventRecordQPS) eventClientConfig.Burst = int(s.EventBurst) kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig) if err != nil { return fmt.Errorf("failed to initialize kubelet event client: %v", err) } // make a separate client for heartbeat with throttling disabled and a timeout attached heartbeatClientConfig := *clientConfig heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration // if the NodeLease feature is enabled, the timeout is the minimum of the lease duration and status update frequency if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) { leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second if heartbeatClientConfig.Timeout > leaseTimeout { heartbeatClientConfig.Timeout = leaseTimeout } } heartbeatClientConfig.QPS = float32(-1) kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig) if err != nil { return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err) } // CRDs are JSON only, and client renegotiation for streaming is not correct as per #67803 csiClientConfig := restclient.CopyConfig(clientConfig) csiClientConfig.ContentType = "application/json" kubeDeps.CSIClient, err = csiclientset.NewForConfig(csiClientConfig) if err != nil { return fmt.Errorf("failed to initialize kubelet storage client: %v", err) } } if kubeDeps.Auth == nil { auth, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration) if err != nil { return err } kubeDeps.Auth = auth } if kubeDeps.CAdvisorInterface == nil { imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint) kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint)) if err != nil { return err } } if kubeDeps.ContainerManager == nil { ... kubeDeps.ContainerManager, err = cm.NewContainerManager(......) if err != nil { return err } } if err := checkPermissions(); err != nil { klog.Error(err) } ... if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil { return err } if s.HealthzPort > 0 { healthz.DefaultHealthz() go wait.Until(func() { err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil) if err != nil { klog.Errorf("Starting health server failed: %v", err) } }, 5*time.Second, wait.NeverStop) } if s.RunOnce { return nil } // If systemd is used, notify it that we have started go daemon.SdNotify(false, "READY=1") select { case <-done: break case <-stopCh: break } return nil }
方法很長,這里撿重點說。開始一段是加載配置,略去。然后做了以下幾件事:
(1)如果kubelet的依賴組件還沒配置全,則調用UnsecuredDepencencies方法為kubelet配置默認依賴組件。方法會返回一個Dependencies的結構體:
cmd/kubelet/app/server.go // UnsecuredDependencies returns a Dependencies suitable for being run, or an error if the server setup // is not valid. It will not start any background processes, and does not include authentication/authorization func UnsecuredDependencies(s *options.KubeletServer) (*kubelet.Dependencies, error) { // Initialize the TLS Options tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration) if err != nil { return nil, err } mounter := mount.New(s.ExperimentalMounterPath) var pluginRunner = exec.New() if s.Containerized { klog.V(2).Info("Running kubelet in containerized mode") ne, err := nsenter.NewNsenter(nsenter.DefaultHostRootFsPath, exec.New()) if err != nil { return nil, err } mounter = mount.NewNsenterMounter(s.RootDirectory, ne) // an exec interface which can use nsenter for flex plugin calls pluginRunner = nsenter.NewNsenterExecutor(nsenter.DefaultHostRootFsPath, exec.New()) } var dockerClientConfig *dockershim.ClientConfig if s.ContainerRuntime == kubetypes.DockerContainerRuntime { dockerClientConfig = &dockershim.ClientConfig{ DockerEndpoint: s.DockerEndpoint, RuntimeRequestTimeout: s.RuntimeRequestTimeout.Duration, ImagePullProgressDeadline: s.ImagePullProgressDeadline.Duration, } } return &kubelet.Dependencies{ Auth: nil, // default does not enforce auth[nz] CAdvisorInterface: nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here Cloud: nil, // cloud provider might start background processes ContainerManager: nil, DockerClientConfig: dockerClientConfig, KubeClient: nil, HeartbeatClient: nil, CSIClient: nil, EventClient: nil, Mounter: mounter, OOMAdjuster: oom.NewOOMAdjuster(), OSInterface: kubecontainer.RealOS{}, VolumePlugins: ProbeVolumePlugins(), DynamicPluginProber: GetDynamicPluginProber(s.VolumePluginDir, pluginRunner), TLSOptions: tlsOptions}, nil }
可以看到,Dependencies中包含了kubelet的所有依賴組件,包括與API Server交互的KubeClient,管理容器的ContainerManager,與Docker交互的DockerClient的配置等等;
(2)判斷kubelet是否運行在standalone模式,如果是,則將kubelet運行時依賴的4個client都設為空;如果不是且這4個client皆為空,則為這4個client創建配置;
(3)為kubelet的Dependencies創建Auth、CAdvisorInterface和ContainerManager。
(4)運行kubelet。這一步通過調用RunKubelet方法實現,后面詳細分析。
(5)通過select設置為持續運行。
三、RunKubelet
下面仔細看一看RunKubelet方法:
cmd/kubelet/app/server.go // RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications: // 1 Integration tests // 2 Kubelet binary // 3 Standalone 'kubernetes' binary // Eventually, #2 will be replaced with instances of #3 func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error { ... hostNetworkSources, err := kubetypes.GetValidatedSources(kubeServer.HostNetworkSources) if err != nil { return err } hostPIDSources, err := kubetypes.GetValidatedSources(kubeServer.HostPIDSources) if err != nil { return err } hostIPCSources, err := kubetypes.GetValidatedSources(kubeServer.HostIPCSources) if err != nil { return err } privilegedSources := capabilities.PrivilegedSources{ HostNetworkSources: hostNetworkSources, HostPIDSources: hostPIDSources, HostIPCSources: hostIPCSources, } capabilities.Setup(kubeServer.AllowPrivileged, privilegedSources, 0) credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory) klog.V(2).Infof("Using root directory: %v", kubeServer.RootDirectory) if kubeDeps.OSInterface == nil { kubeDeps.OSInterface = kubecontainer.RealOS{} } k, err := CreateAndInitKubelet(......) if err != nil { return fmt.Errorf("failed to create kubelet: %v", err) } if runOnce { if _, err := k.RunOnce(podCfg.Updates()); err != nil { return fmt.Errorf("runonce failed: %v", err) } klog.Info("Started kubelet as runonce") } else { startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer) klog.Info("Started kubelet") } ... return nil }
方法的核心就在於中間的創建並初始化這一段以及后面的運行這一段。runOnce是只運行一次kubelet,將特定的一組pod創建后即退出的運行模式,已基本廢棄,目前主要是調用startKubelet方法持續運行kubelet。startKubelet將在后面的文章中涉及。
進入CreateAndInitKubelet,發現它調用了NewMainKubelet、BirthCry、StartGarbageCollection三個方法。BirthCry做的事情就是記錄一個啟動的event,不提。StartGarbageCollection則是運行容器和鏡像的垃圾回收機制,后面會提到。下一篇將重點對NewMainKubelet方法進行分析。https://www.cnblogs.com/00986014w/p/10895532.html
