controller-runtime框架是社區封裝的一個控制器處理的框架
pkg/controllers/controller.go中,定義了Controller接口:
type Controller interface { reconcile.Reconciler Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error Start(ctx context.Context) error GetLogger() logr.Logger }
以及創建controller時的參數:
type Options struct { MaxConcurrentReconciles int Reconciler reconcile.Reconciler RateLimiter ratelimiter.RateLimiter //限速隊列的速率限制 Log logr.Logger //controller使用的logger }
pkg/internal/controller/controller.go中定義的Controller結構體,實現了Controller接口:
type Controller struct { Name string //用於跟蹤、記錄和監控中控制器的唯一標識 MaxConcurrentReconciles int //可以運行的最大並發Reconciles數量,默認值為1 Do reconcile.Reconciler MakeQueue func() workqueue.RateLimitingInterface //一旦控制器准備好啟動,MakeQueue就會為這個控制器構造工作隊列。隊列通過監聽來自Infomer的事件,添加對象到隊列中進行處理 Queue workqueue.RateLimitingInterface SetFields func(i interface{}) error //SetFields將依賴關系注入到其他對象,比如Sources、EventHandlers以及Predicates mu sync.Mutex // 控制器同步信號量 JitterPeriod time.Duration // 允許測試減少JitterPeriod,使其更快完成 Started bool //控制器是否已經啟動 ctx context.Context startWatches []watchDescription Log logr.Logger }
①Do是pkg/reconcile/reconcile.go中定義的Reconcile接口:
type Reconciler interface { Reconcile(context.Context, Request) (Result, error) }
②Informer、Indexer的數據通過startWatches屬性做了一層封裝,以方便在控制器啟動的時候啟動,該屬性是一個watchDescription切片
一個watchDescription包含所有啟動watch操作所需的信息(一個sources、一個handler以及predicates切片):
type watchDescription struct { src source.Source handler handler.EventHandler predicates []predicate.Predicate }
③Queue是client-go中提供的限速隊列,放入到隊列中的元素不是以前默認的元素唯一的KEY,而是經過封裝的reconcile.Request對象:
type Request struct { types.NamespacedName } type NamespacedName struct { Namespace string Name string }
pkg/client/client.go中的client結構體,提供了Create、List、Delete等方法,通過它們可以很方便地得到資源對象
Controller最核心的方法Watch:
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error { c.mu.Lock() defer c.mu.Unlock() // 注入Cache到參數中 if err := c.SetFields(src); err != nil { return err } if err := c.SetFields(evthdler); err != nil { return err } for _, pr := range prct { if err := c.SetFields(pr); err != nil { return err } } if !c.Started { // Controller還沒啟動 c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct}) // watches會被保存到控制器結構體中,直到調用Start(...) 函數 return nil //把watches存放到本地然后返回 } c.Log.Info("Starting EventSource", "source", src) return src.Start(c.ctx, evthdler, c.Queue, prct...) }
第一個參數是pkg/source/source.go中定義的Source接口,它是事件的源:
type Source interface { Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error }
Source的具體實現在pkg/source/source.go中的Kind結構體:
type Kind struct { Type client.Object //要watch的資源對象類型,例如&v1.Pod{} cache cache.Cache //watch時使用的cache }
func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error { if ks.Type == nil { //必須指明Kind.Type return fmt.Errorf("must specify Kind.Type") } if ks.cache == nil { //start前cache必須准備完成 return fmt.Errorf("must call CacheInto on Kind before calling Start") } i, err := ks.cache.GetInformer(ctx, ks.Type) //從cache中獲取informer if err != nil { if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok { log.Error(err, "if kind is a CRD, it should be installed before calling Start", "kind", kindMatchErr.GroupKind) } return err } i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) //添加EventHandler return nil }
AddEventHandler方法中的參數是一個internal.EventHandler結構體,它的成員結構體EventHandler實現了client-go中提供的 ResourceEventHandler接口,也就是實現了OnAdd、OnUpdate、OnDelete等回調函數
pkg/builder/controller.go中的doWatch()方法實際調用了Watch,調用時傳入的就是Kind類型的對象
Watch最終就是調用了傳入的Kind類型的對象的Start方法
func (blder *Builder) doWatch() error { // Reconcile type typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection) if err != nil { return err } src := &source.Kind{Type: typeForSrc} hdler := &handler.EnqueueRequestForObject{} allPredicates := append(blder.globalPredicates, blder.forInput.predicates...) if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { return err } // Watches the managed types for _, own := range blder.ownsInput { typeForSrc, err := blder.project(own.object, own.objectProjection) if err != nil { return err } src := &source.Kind{Type: typeForSrc} hdler := &handler.EnqueueRequestForOwner{ OwnerType: blder.forInput.object, IsController: true, } allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, own.predicates...) if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { return err } } // Do the watch requests for _, w := range blder.watchesInput { allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, w.predicates...) // If the source of this watch is of type *source.Kind, project it. if srckind, ok := w.src.(*source.Kind); ok { typeForSrc, err := blder.project(srckind.Type, w.objectProjection) if err != nil { return err } srckind.Type = typeForSrc } if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil { return err } } return nil }
調用Watch時傳入的handler.EventHandler是一個handler.EnqueueRequestForObject結構體,它實現了Create、Update、Delete、Generic四個方法。
type EnqueueRequestForObject struct{} func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { if evt.Object == nil { enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt) return } q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace(), }}) } // Update implements EventHandler func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { if evt.ObjectOld != nil { q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.ObjectOld.GetName(), Namespace: evt.ObjectOld.GetNamespace(), }}) } else { enqueueLog.Error(nil, "UpdateEvent received with no old metadata", "event", evt) } if evt.ObjectNew != nil { q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.ObjectNew.GetName(), Namespace: evt.ObjectNew.GetNamespace(), }}) } else { enqueueLog.Error(nil, "UpdateEvent received with no new metadata", "event", evt) } } // Delete implements EventHandler func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { if evt.Object == nil { enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt) return } q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace(), }}) } // Generic implements EventHandler func (e *EnqueueRequestForObject) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { if evt.Object == nil { enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt) return } q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace(), }}) }
四個方法均會將資源對象的Name和Namespace組成reconcile.Request放入隊列中
Controller的核心方法Start:
func (c *Controller) Start(ctx context.Context) error { c.mu.Lock() if c.Started { return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times") } // Set the internal context. c.ctx = ctx c.Queue = c.MakeQueue() defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed err := func() error { defer c.mu.Unlock() defer utilruntime.HandleCrash() for _, watch := range c.startWatches { c.Log.Info("Starting EventSource", "source", watch.src) if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { return err } } // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches c.Log.Info("Starting Controller") for _, watch := range c.startWatches { syncingSource, ok := watch.src.(source.SyncingSource) if !ok { continue } if err := syncingSource.WaitForSync(ctx); err != nil { // This code is unreachable in case of kube watches since WaitForCacheSync will never return an error // Leaving it here because that could happen in the future err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err) c.Log.Error(err, "Could not wait for Cache to sync") return err } } c.startWatches = nil if c.JitterPeriod == 0 { c.JitterPeriod = 1 * time.Second } // Launch workers to process resources c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles) ctrlmetrics.WorkerCount.WithLabelValues(c.Name).Set(float64(c.MaxConcurrentReconciles)) for i := 0; i < c.MaxConcurrentReconciles; i++ { go wait.UntilWithContext(ctx, func(ctx context.Context) { for c.processNextWorkItem(ctx) { } }, c.JitterPeriod) } c.Started = true return nil }() if err != nil { return err } <-ctx.Done() c.Log.Info("Stopping workers") return nil }
先等待資源對象的Informer同步完成,然后啟動workers來處理資源對象,而且worker函數都是一樣的實現方式:
func (c *Controller) processNextWorkItem(ctx context.Context) bool { obj, shutdown := c.Queue.Get() if shutdown { return false // Stop working } defer c.Queue.Done(obj) ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1) defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1) c.reconcileHandler(ctx, obj) return true } processNextWorkItem從工作隊列中彈出一個元素,並嘗試通過調用reconcileHandler來處理它 reconcileHandler方法是真正執行元素業務處理的地方,包含了事件處理以及錯誤處理: func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) { // Update metrics after processing each item reconcileStartTS := time.Now() defer func() { c.updateMetrics(time.Since(reconcileStartTS)) }() // Make sure that the the object is a valid request. req, ok := obj.(reconcile.Request) if !ok { c.Queue.Forget(obj) c.Log.Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj) return } log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace) ctx = logf.IntoContext(ctx, log) if result, err := c.Do.Reconcile(ctx, req); err != nil { c.Queue.AddRateLimited(req) ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc() ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "error").Inc() return } else if result.RequeueAfter > 0 { c.Queue.Forget(obj) c.Queue.AddAfter(req, result.RequeueAfter) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "requeue_after").Inc() return } else if result.Requeue { c.Queue.AddRateLimited(req) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "requeue").Inc() return } c.Queue.Forget(obj) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "success").Inc() }
真正的事件處理通過c.Do.Reconcile(ctx, req)暴露給開發者;就算直接調用c.Reconcile(ctx, req),還是會調用c.Do.Reconcile(ctx, req)
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace) ctx = logf.IntoContext(ctx, log) return c.Do.Reconcile(ctx, req) }
每種CRD必須定義一個實現了reconcile.Reconcile接口的XxxReconcile結構體,結構體需要包含client.Client(manager定義了GetClient方法),便於從req中獲取資源對象;包含runtime.Scheme,以提供資源對象Kind和Go type的映射(manager定義了runtime.GetScheme方法)
所以對於開發者來說,只需要在此結構體的Reconcile方法中去處理業務邏輯就可以了:
根據方法的返回值來判斷是否需要將元素重新加入限速隊列進行處理
方法的返回值為reconcile.Result類型:
type Result struct { Requeue bool RequeueAfter time.Duration }
如果返回error!=nil,則將元素重新添加到隊列中
如果返回的result.RequeueAfter > 0,則先將元素忘記,然后在result.RequeueAfter時間后加入到隊列中
如果返回result.Requeue = true,則直接將元素重新加入到限速隊列中
如果正常返回reconcile.Result{},則直接忘記這個元素
controller-runtime中的Manager是一個用於初始化共享依賴關系的接口
type Manager interface { // Add will set requested dependencies on the component, and cause the component to be // started when Start is called. Add will inject any dependencies for which the argument // implements the inject interface - e.g. inject.Client. // Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either // non-leaderelection mode (always running) or leader election mode (managed by leader election if enabled). Add(Runnable) error // Elected is closed when this manager is elected leader of a group of // managers, either because it won a leader election or because no leader // election was configured. Elected() <-chan struct{} // SetFields will set any dependencies on an object for which the object has implemented the inject // interface - e.g. inject.Client. SetFields(interface{}) error // AddMetricsExtraHandler adds an extra handler served on path to the http server that serves metrics. // Might be useful to register some diagnostic endpoints e.g. pprof. Note that these endpoints meant to be // sensitive and shouldn't be exposed publicly. // If the simple path -> handler mapping offered here is not enough, a new http server/listener should be added as // Runnable to the manager via Add method. AddMetricsExtraHandler(path string, handler http.Handler) error // AddHealthzCheck allows you to add Healthz checker AddHealthzCheck(name string, check healthz.Checker) error // AddReadyzCheck allows you to add Readyz checker AddReadyzCheck(name string, check healthz.Checker) error // Start starts all registered Controllers and blocks until the context is cancelled. // Returns an error if there is an error starting any controller. // // If LeaderElection is used, the binary must be exited immediately after this returns, // otherwise components that need leader election might continue to run after the leader // lock was lost. Start(ctx context.Context) error // GetConfig returns an initialized Config GetConfig() *rest.Config // GetScheme returns an initialized Scheme GetScheme() *runtime.Scheme // GetClient returns a client configured with the Config. This client may // not be a fully "direct" client -- it may read from a cache, for // instance. See Options.NewClient for more information on how the default // implementation works. GetClient() client.Client // GetFieldIndexer returns a client.FieldIndexer configured with the client GetFieldIndexer() client.FieldIndexer // GetCache returns a cache.Cache GetCache() cache.Cache // GetEventRecorderFor returns a new EventRecorder for the provided name GetEventRecorderFor(name string) record.EventRecorder // GetRESTMapper returns a RESTMapper GetRESTMapper() meta.RESTMapper // GetAPIReader returns a reader that will be configured to use the API server. // This should be used sparingly and only when the client does not fit your // use case. GetAPIReader() client.Reader // GetWebhookServer returns a webhook.Server GetWebhookServer() *webhook.Server // GetLogger returns this manager's logger. GetLogger() logr.Logger }
具體實現在controllerManager結構體:
type controllerManager struct { // config is the rest.config used to talk to the apiserver. Required. config *rest.Config // scheme is the scheme injected into Controllers, EventHandlers, Sources and Predicates. Defaults // to scheme.scheme. scheme *runtime.Scheme // leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts. // These Runnables are managed by lead election. leaderElectionRunnables []Runnable // nonLeaderElectionRunnables is the set of webhook servers that the controllerManager injects deps into and Starts. // These Runnables will not be blocked by lead election. nonLeaderElectionRunnables []Runnable cache cache.Cache // TODO(directxman12): Provide an escape hatch to get individual indexers // client is the client injected into Controllers (and EventHandlers, Sources and Predicates). client client.Client // apiReader is the reader that will make requests to the api server and not the cache. apiReader client.Reader // fieldIndexes knows how to add field indexes over the Cache used by this controller, // which can later be consumed via field selectors from the injected client. fieldIndexes client.FieldIndexer // recorderProvider is used to generate event recorders that will be injected into Controllers // (and EventHandlers, Sources and Predicates). recorderProvider *intrec.Provider // resourceLock forms the basis for leader election resourceLock resourcelock.Interface // leaderElectionReleaseOnCancel defines if the manager should step back from the leader lease // on shutdown leaderElectionReleaseOnCancel bool // mapper is used to map resources to kind, and map kind and version. mapper meta.RESTMapper // metricsListener is used to serve prometheus metrics metricsListener net.Listener // metricsExtraHandlers contains extra handlers to register on http server that serves metrics. metricsExtraHandlers map[string]http.Handler // healthProbeListener is used to serve liveness probe healthProbeListener net.Listener // Readiness probe endpoint name readinessEndpointName string // Liveness probe endpoint name livenessEndpointName string // Readyz probe handler readyzHandler *healthz.Handler // Healthz probe handler healthzHandler *healthz.Handler mu sync.Mutex started bool startedLeader bool healthzStarted bool errChan chan error // Logger is the logger that should be used by this manager. // If none is set, it defaults to log.Log global logger. logger logr.Logger // leaderElectionCancel is used to cancel the leader election. It is distinct from internalStopper, // because for safety reasons we need to os.Exit() when we lose the leader election, meaning that // it must be deferred until after gracefulShutdown is done. leaderElectionCancel context.CancelFunc // stop procedure engaged. In other words, we should not add anything else to the manager stopProcedureEngaged bool // elected is closed when this manager becomes the leader of a group of // managers, either because it won a leader election or because no leader // election was configured. elected chan struct{} startCache func(ctx context.Context) error // port is the port that the webhook server serves at. port int // host is the hostname that the webhook server binds to. host string // CertDir is the directory that contains the server key and certificate. // if not set, webhook server would look up the server key and certificate in // {TempDir}/k8s-webhook-server/serving-certs certDir string webhookServer *webhook.Server // leaseDuration is the duration that non-leader candidates will // wait to force acquire leadership. leaseDuration time.Duration // renewDeadline is the duration that the acting controlplane will retry // refreshing leadership before giving up. renewDeadline time.Duration // retryPeriod is the duration the LeaderElector clients should wait // between tries of actions. retryPeriod time.Duration // waitForRunnable is holding the number of runnables currently running so that // we can wait for them to exit before quitting the manager waitForRunnable sync.WaitGroup // gracefulShutdownTimeout is the duration given to runnable to stop // before the manager actually returns on stop. gracefulShutdownTimeout time.Duration // onStoppedLeading is callled when the leader election lease is lost. // It can be overridden for tests. onStoppedLeading func() // shutdownCtx is the context that can be used during shutdown. It will be cancelled // after the gracefulShutdownTimeout ended. It must not be accessed before internalStop // is closed because it will be nil. shutdownCtx context.Context internalCtx context.Context internalCancel context.CancelFunc // internalProceduresStop channel is used internally to the manager when coordinating // the proper shutdown of servers. This channel is also used for dependency injection. internalProceduresStop chan struct{} }
通過manager創建controller的步驟:
(1)實例化 manager
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
第一個參數是client-go的rest/config.go中定義的Config結構體
ctrl.GetConfigOrDie()會調用GetConfigWithContext(),使用~/.kube/config作為默認配置文件生成Config對象
第二個參數是pkg/manager/manager.go中定義的Options結構體,包含了新建manager時候的參數
通過在ctrl.Options結構體中設置namespace字符串,可以設置controller可以管理的namespace
也可以通過設置NewCache為cache.MultiNamespacedCacheBuilder(namespaces字符串切片)讓其管理一系列namespace
NewManager實際調用了manager.new函數,為Manager執行初始化工作:
func New(config *rest.Config, options Options) (Manager, error) { // Initialize a rest.config if none was specified if config == nil { return nil, fmt.Errorf("must specify Config") } options = setOptionsDefaults(options) // 為Options設置一些默認的參數值 // Create the mapper provider mapper, err := options.MapperProvider(config) if err != nil { log.Error(err, "Failed to get API Group-Resources") return nil, err } // Create the cache for the cached read client and registering informers cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace}) if err != nil { return nil, err } apiReader, err := client.New(config, client.Options{Scheme: options.Scheme, Mapper: mapper}) if err != nil { return nil, err } writeObj, err := options.NewClient(cache, config, client.Options{Scheme: options.Scheme, Mapper: mapper}) if err != nil { return nil, err } if options.DryRunClient { writeObj = client.NewDryRunClient(writeObj) } // Create the recorder provider to inject event recorders for the components. // TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific // to the particular controller that it's being injected into, rather than a generic one like is here. recorderProvider, err := options.newRecorderProvider(config, options.Scheme, log.WithName("events"), options.makeBroadcaster) if err != nil { return nil, err } // Create the resource lock to enable leader election) leaderConfig := config if options.LeaderElectionConfig != nil { leaderConfig = options.LeaderElectionConfig } resourceLock, err := options.newResourceLock(leaderConfig, recorderProvider, leaderelection.Options{ LeaderElection: options.LeaderElection, LeaderElectionResourceLock: options.LeaderElectionResourceLock, LeaderElectionID: options.LeaderElectionID, LeaderElectionNamespace: options.LeaderElectionNamespace, }) if err != nil { return nil, err } // Create the metrics listener. This will throw an error if the metrics bind // address is invalid or already in use. metricsListener, err := options.newMetricsListener(options.MetricsBindAddress) if err != nil { return nil, err } // By default we have no extra endpoints to expose on metrics http server. metricsExtraHandlers := make(map[string]http.Handler) // Create health probes listener. This will throw an error if the bind // address is invalid or already in use. healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress) if err != nil { return nil, err } return &controllerManager{ config: config, scheme: options.Scheme, cache: cache, fieldIndexes: cache, client: writeObj, apiReader: apiReader, recorderProvider: recorderProvider, resourceLock: resourceLock, mapper: mapper, metricsListener: metricsListener, metricsExtraHandlers: metricsExtraHandlers, logger: options.Logger, elected: make(chan struct{}), port: options.Port, host: options.Host, certDir: options.CertDir, leaseDuration: *options.LeaseDuration, renewDeadline: *options.RenewDeadline, retryPeriod: *options.RetryPeriod, healthProbeListener: healthProbeListener, readinessEndpointName: options.ReadinessEndpointName, livenessEndpointName: options.LivenessEndpointName, gracefulShutdownTimeout: *options.GracefulShutdownTimeout, internalProceduresStop: make(chan struct{}), }, nil }
(2)向manager添加scheme,以將api注冊到scheme,Scheme 提供了GVK 到go type的映射。
如果多個crd,需要多次調用 AddToScheme
err = api.AddToScheme(mgr.GetScheme())
實際調用了api/vxxx/groupversion_info.go中API schema的相關定義:
var ( GroupVersion = schema.GroupVersion{Group: "data.fluid.io", Version: "v1alpha1"} //用於注冊資源對象的GV SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion} //SchemeBuilder用於將go type添加到GVK scheme AddToScheme = SchemeBuilder.AddToScheme //AddToScheme用於將GV中的type添加到scheme )
(3)向manager添加controller:
err = ctrl.NewControllerManagedBy(mgr). For(&api.ChaosPod{}). Owns(&corev1.Pod{}). Complete(&reconciler{ Client: mgr.GetClient(), scheme: mgr.GetScheme(), })
①NewControllerManagedBy實際調用了builder.ControllerManagedBy函數,它會返回一個新的控制器構造器Builder對象,生成的控制器將管理器Manager啟動,
type Builder struct { forInput ForInput ownsInput []OwnsInput watchesInput []WatchesInput mgr manager.Manager globalPredicates []predicate.Predicate config *rest.Config ctrl controller.Controller ctrlOptions controller.Options name string } func ControllerManagedBy(m manager.Manager) *Builder { return &Builder{mgr: m} }
②For函數定義了被調諧的對象類型並配置 ControllerManagedBy 通過調諧對象來響應 create/delete/update 事件
調用For函數相當於調用Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{})
func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder { if blder.forInput.object != nil { blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation") return blder } input := ForInput{object: object} for _, opt := range opts { opt.ApplyToFor(&input) } blder.forInput = input return blder }
③Owns函數就是來配置監聽的資源對象的子資源:
func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder { input := OwnsInput{object: object} for _, opt := range opts { opt.ApplyToOwns(&input) } blder.ownsInput = append(blder.ownsInput, input) return blder }
例如此處,ChaosPod擁有Pod作為子資源
沒有子資源的話,此部分可省略
④Complete 函數:
func (blder *Builder) Complete(r reconcile.Reconciler) error { _, err := blder.Build(r) return err } //Build根據用戶傳入的自己自己實現的Reconciler結構體,構建應用程序ControllerManagedBy並返回它創建的 Controller func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) { if r == nil { return nil, fmt.Errorf("must provide a non-nil Reconciler") } if blder.mgr == nil { return nil, fmt.Errorf("must provide a non-nil Manager") } if blder.forInput.err != nil { return nil, blder.forInput.err } if blder.forInput.object == nil { // Checking the reconcile type exist or not return nil, fmt.Errorf("must provide an object for reconciliation") } blder.loadRestConfig() // Set the Config if err := blder.doController(r); err != nil { // Set the ControllerManagedBy return nil, err } if err := blder.doWatch(); err != nil { // Set the Watch return nil, err } return blder.ctrl, nil } doController方法會將用戶實現的XxxReconciler結構體傳入為Controller的成員Do,因此可以調用c.Do.Reconcile(ctx, req): func (blder *Builder) doController(r reconcile.Reconciler) error { name, err := blder.getControllerName() if err != nil { return err } ctrlOptions := blder.ctrlOptions ctrlOptions.Reconciler = r blder.ctrl, err = newController(name, blder.mgr, ctrlOptions) return err }
會根據Builder.ctrlOptions和用戶傳入的XxxReconciler結構體構建創建controller所用的參數ctrlOptions
再通過newController創建新controller
(4)向manager添加webhook,同樣需要實現邏輯處理
err = ctrl.NewWebhookManagedBy(mgr). For(&api.ChaosPod{}). Complete()
(5)啟動 manager.start()
mgr.Start(ctrl.SetupSignalHandler())
用戶對CRD的數據結構的定義:
type ChaosPod struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` Spec ChaosPodSpec `json:"spec,omitempty"` Status ChaosPodStatus `json:"status,omitempty"` } type ChaosPodList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` Items []ChaosPod `json:"items"` }
需要在zz_generated.deepcopy.go中自動生成deepcopy的相關反法
將資源對象添加到Group的方法:
SchemeBuilder.Register(&ChaosPod{}, &ChaosPodList{})