A Deep Dive into the controller-runtime Framework


controller-runtime is a framework, maintained by the Kubernetes community, that wraps the machinery needed to write controllers.
 
pkg/controller/controller.go defines the Controller interface:
type Controller interface {
     reconcile.Reconciler
     Watch(src source.Source, eventhandler  handler.EventHandler, predicates ...predicate.Predicate) error
     Start(ctx context.Context) error
     GetLogger() logr.Logger
}
and the options used when creating a controller:
type Options struct {
     MaxConcurrentReconciles int
     Reconciler reconcile.Reconciler
     RateLimiter ratelimiter.RateLimiter  // rate limiter used by the work queue
     Log logr.Logger    // logger used by the controller
}
 
The Controller struct defined in pkg/internal/controller/controller.go implements the Controller interface:
type Controller struct {
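     Name string      // unique identifier of the controller in tracing, logging and monitoring
     MaxConcurrentReconciles int    // maximum number of concurrent Reconciles; defaults to 1
     Do reconcile.Reconciler
     MakeQueue func() workqueue.RateLimitingInterface    // constructs the work queue once the controller is ready to start; events coming from the Informers add requests to this queue for processing
     Queue workqueue.RateLimitingInterface
     SetFields func(i interface{}) error   // injects dependencies into other objects such as Sources, EventHandlers and Predicates
     mu sync.Mutex   // mutex guarding the controller's state
     JitterPeriod time.Duration    // lets tests shorten the jitter period so they finish faster
     Started bool         // whether the controller has been started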
     ctx context.Context
     startWatches []watchDescription
     Log logr.Logger        
}
① Do is the Reconciler interface defined in pkg/reconcile/reconcile.go:
type Reconciler interface {
     Reconcile(context.Context, Request) (Result, error)
}
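② The Informer and Indexer wiring is wrapped by the startWatches field so that it can be started when the controller starts. The field is a slice of watchDescription.
A watchDescription holds everything needed to start one watch (a source, a handler and a slice of predicates):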
type watchDescription struct {
     src        source.Source
     handler    handler.EventHandler
     predicates []predicate.Predicate
}
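③ Queue is the rate-limited queue provided by client-go. The items put on it are not the plain unique object keys used by default in earlier client-go style controllers, but reconcile.Request objects that wrap the key: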
type Request struct {
     types.NamespacedName
}
type NamespacedName struct {
     Namespace string
     Name      string
}
The client struct in pkg/client/client.go provides methods such as Create, List and Delete, which make it easy to work with resource objects.
 
Watch, the most important method of Controller:
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
     c.mu.Lock()
     defer c.mu.Unlock()
     // inject the Cache into the arguments
     if err := c.SetFields(src); err != nil {
           return err
     }
     if err := c.SetFields(evthdler); err != nil {
           return err
     }
     for _, pr := range prct {
           if err := c.SetFields(pr); err != nil {
                return err
           }
     }
     if !c.Started {      // the Controller has not started yet
           c.startWatches = append(c.startWatches,  watchDescription{src: src, handler: evthdler, predicates: prct})  // watches are kept on the controller struct until Start(...) is called
           return nil     // store the watch locally and return
     }
     }
     c.Log.Info("Starting EventSource", "source", src)
     return src.Start(c.ctx, evthdler, c.Queue, prct...)
}
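The "buffer until started" behaviour of Watch can be sketched with the standard library alone. The following is a simplified model, not controller-runtime's code: `miniController` and `watch` are hypothetical stand-ins for Controller and watchDescription, and starting a source is reduced to appending its name to a slice.

```go
package main

import "fmt"

// watch is a hypothetical stand-in for watchDescription: just a named source.
type watch struct{ name string }

// miniController models only the "buffer until started" logic of Watch.
type miniController struct {
	started bool
	pending []watch  // plays the role of startWatches
	running []string // names of sources that have actually been started
}

// Watch stores the watch if the controller has not started yet,
// otherwise it starts the source immediately.
func (c *miniController) Watch(w watch) {
	if !c.started {
		c.pending = append(c.pending, w)
		return
	}
	c.running = append(c.running, w.name)
}

// Start starts every buffered watch, clears the buffer and marks the controller started.
func (c *miniController) Start() {
	for _, w := range c.pending {
		c.running = append(c.running, w.name)
	}
	c.pending = nil
	c.started = true
}

func main() {
	c := &miniController{}
	c.Watch(watch{"pods"}) // buffered: the controller is not running yet
	fmt.Println(len(c.pending), len(c.running))
	c.Start()
	c.Watch(watch{"services"}) // started right away
	fmt.Println(len(c.pending), c.running)
}
```

A watch registered before Start only takes effect when Start runs, while a watch registered afterwards is started on the spot, which mirrors the two branches of Watch above.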

The first parameter is the Source interface defined in pkg/source/source.go; it is the source of events:

type Source interface {
     Start(context.Context, handler.EventHandler,  workqueue.RateLimitingInterface, ...predicate.Predicate) error
}
The concrete implementation of Source is the Kind struct in pkg/source/source.go:
type Kind struct {
     Type client.Object  // the resource type to watch, e.g. &v1.Pod{}
     cache cache.Cache   // the cache used for the watch
}

  

 
func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
     prct ...predicate.Predicate) error {
     if ks.Type == nil {   // Kind.Type must be specified
           return fmt.Errorf("must specify Kind.Type")
     }
     if ks.cache == nil {  // the cache must have been injected before Start
           return fmt.Errorf("must call CacheInto on Kind before calling Start")
     }
     i, err := ks.cache.GetInformer(ctx, ks.Type)  // get the informer from the cache
     if err != nil {
           if kindMatchErr, ok := err.(*meta.NoKindMatchError);  ok {
                log.Error(err, "if kind is a CRD, it should be  installed before calling Start",
                     "kind", kindMatchErr.GroupKind)
           }
           return err
     }
     i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})  // register the EventHandler
     return nil
}
The argument to AddEventHandler is an internal.EventHandler struct. It implements the ResourceEventHandler interface provided by client-go, i.e. the OnAdd, OnUpdate and OnDelete callbacks, and passes each event on to its EventHandler member.
 
The doWatch() method in pkg/builder/controller.go is what actually calls Watch, passing in objects of type Kind.
Watch in turn ends up calling the Start method of the Kind object it was given.
func (blder *Builder) doWatch() error {
     // Reconcile type
     typeForSrc, err := blder.project(blder.forInput.object,  blder.forInput.objectProjection)
     if err != nil {
           return err
     }
     src := &source.Kind{Type: typeForSrc}
     hdler := &handler.EnqueueRequestForObject{}
     allPredicates := append(blder.globalPredicates,  blder.forInput.predicates...)
     if err := blder.ctrl.Watch(src, hdler, allPredicates...);  err != nil {
           return err
     }
     // Watches the managed types
     for _, own := range blder.ownsInput {
           typeForSrc, err := blder.project(own.object,  own.objectProjection)
           if err != nil {
                return err
           }
           src := &source.Kind{Type: typeForSrc}
           hdler := &handler.EnqueueRequestForOwner{
                OwnerType:    blder.forInput.object,
                IsController: true,
           }
           allPredicates := append([]predicate.Predicate(nil),  blder.globalPredicates...)
           allPredicates = append(allPredicates,  own.predicates...)
           if err := blder.ctrl.Watch(src, hdler,  allPredicates...); err != nil {
                return err
           }
     }
     // Do the watch requests
     for _, w := range blder.watchesInput {
           allPredicates := append([]predicate.Predicate(nil),  blder.globalPredicates...)
           allPredicates = append(allPredicates,  w.predicates...)
           // If the source of this watch is of type  *source.Kind, project it.
           if srckind, ok := w.src.(*source.Kind); ok {
                typeForSrc, err := blder.project(srckind.Type,  w.objectProjection)
                if err != nil {
                     return err
                }
                srckind.Type = typeForSrc
           }
           if err := blder.ctrl.Watch(w.src, w.eventhandler,  allPredicates...); err != nil {
                return err
           }
     }
     return nil
}

  

The handler.EventHandler passed to Watch is a handler.EnqueueRequestForObject struct, which implements the four methods Create, Update, Delete and Generic.
type EnqueueRequestForObject struct{}
 
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent,  q workqueue.RateLimitingInterface) {
     if evt.Object == nil {
           enqueueLog.Error(nil, "CreateEvent received with no  metadata", "event", evt)
           return
     }
     q.Add(reconcile.Request{NamespacedName:  types.NamespacedName{
           Name:      evt.Object.GetName(),
           Namespace: evt.Object.GetNamespace(),
     }})
}
// Update implements EventHandler
func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent,  q workqueue.RateLimitingInterface) {
     if evt.ObjectOld != nil {
           q.Add(reconcile.Request{NamespacedName:  types.NamespacedName{
                Name:      evt.ObjectOld.GetName(),
                Namespace: evt.ObjectOld.GetNamespace(),
           }})
     } else {
           enqueueLog.Error(nil, "UpdateEvent received with no  old metadata", "event", evt)
     }
     if evt.ObjectNew != nil {
           q.Add(reconcile.Request{NamespacedName:  types.NamespacedName{
                Name:      evt.ObjectNew.GetName(),
                Namespace: evt.ObjectNew.GetNamespace(),
           }})
     } else {
           enqueueLog.Error(nil, "UpdateEvent received with no  new metadata", "event", evt)
     }
}
// Delete implements EventHandler
func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent,  q workqueue.RateLimitingInterface) {
     if evt.Object == nil {
           enqueueLog.Error(nil, "DeleteEvent received with no  metadata", "event", evt)
           return
     }
     q.Add(reconcile.Request{NamespacedName:  types.NamespacedName{
           Name:      evt.Object.GetName(),
           Namespace: evt.Object.GetNamespace(),
     }})
}
// Generic implements EventHandler
func (e *EnqueueRequestForObject) Generic(evt  event.GenericEvent, q workqueue.RateLimitingInterface) {
     if evt.Object == nil {
           enqueueLog.Error(nil, "GenericEvent received with no  metadata", "event", evt)
           return
     }
     q.Add(reconcile.Request{NamespacedName:  types.NamespacedName{
           Name:      evt.Object.GetName(),
           Namespace: evt.Object.GetNamespace(),
     }})
}

  

All four methods build a reconcile.Request from the object's Name and Namespace and add it to the queue.
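Because a request is only a namespace/name pair, it is comparable, and a queue that deduplicates waiting items will collapse repeated events for the same object into a single pending item. The sketch below illustrates that idea under simplifying assumptions: `request`, `setQueue` and `onUpdate` are hypothetical names, the queue is single-goroutine, and it is not the client-go workqueue implementation.

```go
package main

import "fmt"

// request mirrors the shape of reconcile.Request: a comparable namespace/name pair.
type request struct {
	Namespace, Name string
}

// setQueue is a simplified, single-goroutine model of a deduplicating work queue.
type setQueue struct {
	order []request        // items in arrival order
	dirty map[request]bool // items currently waiting
}

func newSetQueue() *setQueue { return &setQueue{dirty: map[request]bool{}} }

// Add enqueues r unless an equal request is already waiting.
func (q *setQueue) Add(r request) {
	if q.dirty[r] {
		return
	}
	q.dirty[r] = true
	q.order = append(q.order, r)
}

// Len reports how many distinct requests are waiting.
func (q *setQueue) Len() int { return len(q.order) }

// onUpdate models the Update handler: it enqueues the old and the new object's key.
func onUpdate(q *setQueue, oldObj, newObj request) {
	q.Add(oldObj)
	q.Add(newObj)
}

func main() {
	q := newSetQueue()
	pod := request{Namespace: "default", Name: "web-0"}
	onUpdate(q, pod, pod) // same object before and after the update
	onUpdate(q, pod, pod) // a second update arrives before processing
	fmt.Println(q.Len())  // one pending request for default/web-0
}
```

This is also why a Reconcile method should read the current state of the object rather than rely on the event that triggered it: several events may be served by one call.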
 
 
Start, the other core method of Controller:
func (c *Controller) Start(ctx context.Context) error {
     c.mu.Lock()
     if c.Started {
           return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
     }
     // Set the internal context.
     c.ctx = ctx
     c.Queue = c.MakeQueue()
     defer c.Queue.ShutDown() // needs to be outside the iife  so that we shutdown after the stop channel is closed
     err := func() error {
           defer c.mu.Unlock()
           defer utilruntime.HandleCrash()
           for _, watch := range c.startWatches {
                c.Log.Info("Starting EventSource", "source",  watch.src)
                if err := watch.src.Start(ctx, watch.handler,  c.Queue, watch.predicates...); err != nil {
                     return err
                }
           }
           // Start the SharedIndexInformer factories to begin  populating the SharedIndexInformer caches
           c.Log.Info("Starting Controller")
           for _, watch := range c.startWatches {
                syncingSource, ok :=  watch.src.(source.SyncingSource)
                if !ok {
                     continue
                }
                if err := syncingSource.WaitForSync(ctx); err !=  nil {
                     // This code is unreachable in case of  kube watches since WaitForCacheSync will never return an error
                     // Leaving it here because that could  happen in the future
                     err := fmt.Errorf("failed to wait for %s  caches to sync: %w", c.Name, err)
                     c.Log.Error(err, "Could not wait for Cache  to sync")
                     return err
                }
           }
           c.startWatches = nil
           if c.JitterPeriod == 0 {
                c.JitterPeriod = 1 * time.Second
           }
           // Launch workers to process resources
           c.Log.Info("Starting workers", "worker count",  c.MaxConcurrentReconciles)
           ctrlmetrics.WorkerCount.WithLabelValues(c.Name).Set(float64(c.MaxConcurrentReconciles))
           for i := 0; i < c.MaxConcurrentReconciles; i++ {
                go wait.UntilWithContext(ctx, func(ctx  context.Context) {
                     for c.processNextWorkItem(ctx) {
                     }
                }, c.JitterPeriod)
           }
           c.Started = true
           return nil
     }()
     if err != nil {
           return err
     }
     <-ctx.Done()
     c.Log.Info("Stopping workers")
     return nil
}

  

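Start first waits for the Informers of the watched resources to finish syncing and then launches the workers that process the resources. Every worker runs the same function: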
func (c *Controller) processNextWorkItem(ctx context.Context)  bool {
     obj, shutdown := c.Queue.Get()
     if shutdown {
           return false   // Stop working
     }
     defer c.Queue.Done(obj)
     ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
     defer  ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)
     c.reconcileHandler(ctx, obj)
     return true
}
processNextWorkItem pops one item off the work queue and tries to process it by calling reconcileHandler.
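The worker pattern (N goroutines, each looping until the queue reports shutdown) can be sketched with a channel standing in for the work queue. This is an illustrative model only: `runWorkers` is a hypothetical helper, a closed channel plays the role of Queue.Get() returning shutdown, and there is no rate limiting or retry.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runWorkers starts n goroutines that drain queue until it is closed and empty,
// and returns how many items were handled in total.
func runWorkers(n int, queue <-chan string, handle func(string)) int64 {
	var handled int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// The range loop ends when the channel is closed and drained,
			// which plays the role of Queue.Get() reporting shutdown.
			for item := range queue {
				handle(item)
				atomic.AddInt64(&handled, 1)
			}
		}()
	}
	wg.Wait()
	return handled
}

func main() {
	queue := make(chan string, 8)
	for _, key := range []string{"default/a", "default/b", "kube-system/c"} {
		queue <- key
	}
	close(queue) // corresponds to shutting the queue down
	n := runWorkers(2, queue, func(string) {})
	fmt.Println("handled", n)
}
```

The number of goroutines corresponds to MaxConcurrentReconciles: each item is handled by exactly one worker, and raising the count only increases how many items are in flight at once.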
reconcileHandler is where an item is actually processed, including both event handling and error handling:
func (c *Controller) reconcileHandler(ctx context.Context, obj  interface{}) {
     // Update metrics after processing each item
     reconcileStartTS := time.Now()
     defer func() {
           c.updateMetrics(time.Since(reconcileStartTS))
     }()
     // Make sure that the object is a valid request.
     req, ok := obj.(reconcile.Request)
     if !ok {
           c.Queue.Forget(obj)
           c.Log.Error(nil, "Queue item was not a Request",  "type", fmt.Sprintf("%T", obj), "value", obj)
           return
     }
     log := c.Log.WithValues("name", req.Name, "namespace",  req.Namespace)
     ctx = logf.IntoContext(ctx, log)
     if result, err := c.Do.Reconcile(ctx, req); err != nil {
           c.Queue.AddRateLimited(req)
           ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
           ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name,  "error").Inc()
           return
     } else if result.RequeueAfter > 0 {
           c.Queue.Forget(obj)
           c.Queue.AddAfter(req, result.RequeueAfter)
           ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name,  "requeue_after").Inc()
           return
     } else if result.Requeue {
           c.Queue.AddRateLimited(req)
           ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name,  "requeue").Inc()
           return
     }
     c.Queue.Forget(obj)
     ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name,  "success").Inc()
}
The actual event handling is exposed to developers through c.Do.Reconcile(ctx, req). Even a direct call to c.Reconcile(ctx, req) still ends up in c.Do.Reconcile(ctx, req):
func (c *Controller) Reconcile(ctx context.Context, req  reconcile.Request) (reconcile.Result, error) {
     log := c.Log.WithValues("name", req.Name, "namespace",  req.Namespace)
     ctx = logf.IntoContext(ctx, log)
     return c.Do.Reconcile(ctx, req)
}
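Each CRD needs an XxxReconciler struct that implements the reconcile.Reconciler interface. The struct should hold a client.Client (the manager provides a GetClient method) so that the resource object named in req can be fetched, and a runtime.Scheme (the manager provides a GetScheme method), which maps resource Kinds to Go types.
Developers therefore only need to put their business logic in the Reconcile method of this struct.
The method's return value decides whether the item is put back on the rate-limited queue.
The return value is of type reconcile.Result: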
type Result struct {
     Requeue bool    
     RequeueAfter time.Duration
} 
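    If a non-nil error is returned, the item is re-added to the queue with rate limiting
    If result.RequeueAfter > 0, the item is forgotten first and then added back to the queue after result.RequeueAfter has elapsed
    If result.Requeue is true, the item is re-added directly to the rate-limited queue
    If a plain reconcile.Result{} is returned, the item is simply forgotten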
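The four cases above, in the order reconcileHandler checks them, can be written as a small pure function. This is a sketch for illustration: `result` copies only the two fields of reconcile.Result, and `decide` and the action strings are hypothetical names that merely label which queue calls are made.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// result copies the two fields of reconcile.Result.
type result struct {
	Requeue      bool
	RequeueAfter time.Duration
}

// errBoom is a sample error used by the demonstration.
var errBoom = errors.New("boom")

// decide names the queue action taken for a given Reconcile return value.
// The cases are checked in the same order as in reconcileHandler,
// so an error wins over RequeueAfter, which wins over Requeue.
func decide(res result, err error) string {
	switch {
	case err != nil:
		return "AddRateLimited"
	case res.RequeueAfter > 0:
		return "Forget+AddAfter"
	case res.Requeue:
		return "AddRateLimited"
	default:
		return "Forget"
	}
}

func main() {
	fmt.Println(decide(result{}, errBoom))
	fmt.Println(decide(result{RequeueAfter: time.Minute}, nil))
	fmt.Println(decide(result{Requeue: true}, nil))
	fmt.Println(decide(result{}, nil))
}
```

One consequence of the ordering is that a Result returned together with a non-nil error is ignored: the item is retried with rate-limited backoff regardless of RequeueAfter.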
 
Manager in controller-runtime is an interface for initializing shared dependencies:
type Manager interface {
     // Add will set requested dependencies on the component,  and cause the component to be
     // started when Start is called.  Add will inject any  dependencies for which the argument
     // implements the inject interface - e.g. inject.Client.
     // Depending on if a Runnable implements  LeaderElectionRunnable interface, a Runnable can be run in  either
     // non-leaderelection mode (always running) or leader  election mode (managed by leader election if enabled).
     Add(Runnable) error
     // Elected is closed when this manager is elected leader  of a group of
     // managers, either because it won a leader election or  because no leader
     // election was configured.
     Elected() <-chan struct{}
     // SetFields will set any dependencies on an object for  which the object has implemented the inject
     // interface - e.g. inject.Client.
     SetFields(interface{}) error
     // AddMetricsExtraHandler adds an extra handler served on  path to the http server that serves metrics.
     // Might be useful to register some diagnostic endpoints  e.g. pprof. Note that these endpoints meant to be
     // sensitive and shouldn't be exposed publicly.
     // If the simple path -> handler mapping offered here is  not enough, a new http server/listener should be added as
     // Runnable to the manager via Add method.
     AddMetricsExtraHandler(path string, handler http.Handler)  error
     // AddHealthzCheck allows you to add Healthz checker
     AddHealthzCheck(name string, check healthz.Checker) error
     // AddReadyzCheck allows you to add Readyz checker
     AddReadyzCheck(name string, check healthz.Checker) error
     // Start starts all registered Controllers and blocks  until the context is cancelled.
     // Returns an error if there is an error starting any  controller.
     //
     // If LeaderElection is used, the binary must be exited  immediately after this returns,
     // otherwise components that need leader election might  continue to run after the leader
     // lock was lost.
     Start(ctx context.Context) error
     // GetConfig returns an initialized Config
     GetConfig() *rest.Config
     // GetScheme returns an initialized Scheme
     GetScheme() *runtime.Scheme
     // GetClient returns a client configured with the Config.  This client may
     // not be a fully "direct" client -- it may read from a  cache, for
     // instance.  See Options.NewClient for more information  on how the default
     // implementation works.
     GetClient() client.Client
     // GetFieldIndexer returns a client.FieldIndexer  configured with the client
     GetFieldIndexer() client.FieldIndexer
     // GetCache returns a cache.Cache
     GetCache() cache.Cache
     // GetEventRecorderFor returns a new EventRecorder for the  provided name
     GetEventRecorderFor(name string) record.EventRecorder
     // GetRESTMapper returns a RESTMapper
     GetRESTMapper() meta.RESTMapper
     // GetAPIReader returns a reader that will be configured  to use the API server.
     // This should be used sparingly and only when the client  does not fit your
     // use case.
     GetAPIReader() client.Reader
     // GetWebhookServer returns a webhook.Server
     GetWebhookServer() *webhook.Server
     // GetLogger returns this manager's logger.
     GetLogger() logr.Logger
}
It is implemented by the controllerManager struct:
type controllerManager struct {
     // config is the rest.config used to talk to the  apiserver.  Required.
     config *rest.Config
     // scheme is the scheme injected into Controllers,  EventHandlers, Sources and Predicates.  Defaults
     // to scheme.scheme.
     scheme *runtime.Scheme
     // leaderElectionRunnables is the set of Controllers that  the controllerManager injects deps into and Starts.
     // These Runnables are managed by lead election.
     leaderElectionRunnables []Runnable
     // nonLeaderElectionRunnables is the set of webhook  servers that the controllerManager injects deps into and Starts.
     // These Runnables will not be blocked by lead election.
     nonLeaderElectionRunnables []Runnable
     cache cache.Cache
     // TODO(directxman12): Provide an escape hatch to get  individual indexers
     // client is the client injected into Controllers (and  EventHandlers, Sources and Predicates).
     client client.Client
     // apiReader is the reader that will make requests to the  api server and not the cache.
     apiReader client.Reader
     // fieldIndexes knows how to add field indexes over the  Cache used by this controller,
     // which can later be consumed via field selectors from  the injected client.
     fieldIndexes client.FieldIndexer
     // recorderProvider is used to generate event recorders  that will be injected into Controllers
     // (and EventHandlers, Sources and Predicates).
     recorderProvider *intrec.Provider
     // resourceLock forms the basis for leader election
     resourceLock resourcelock.Interface
     // leaderElectionReleaseOnCancel defines if the manager  should step back from the leader lease
     // on shutdown
     leaderElectionReleaseOnCancel bool
     // mapper is used to map resources to kind, and map kind  and version.
     mapper meta.RESTMapper
     // metricsListener is used to serve prometheus metrics
     metricsListener net.Listener
     // metricsExtraHandlers contains extra handlers to  register on http server that serves metrics.
     metricsExtraHandlers map[string]http.Handler
     // healthProbeListener is used to serve liveness probe
     healthProbeListener net.Listener
     // Readiness probe endpoint name
     readinessEndpointName string
     // Liveness probe endpoint name
     livenessEndpointName string
     // Readyz probe handler
     readyzHandler *healthz.Handler
     // Healthz probe handler
     healthzHandler *healthz.Handler
     mu             sync.Mutex
     started        bool
     startedLeader  bool
     healthzStarted bool
     errChan        chan error
     // Logger is the logger that should be used by this  manager.
     // If none is set, it defaults to log.Log global logger.
     logger logr.Logger
     // leaderElectionCancel is used to cancel the leader  election. It is distinct from internalStopper,
     // because for safety reasons we need to os.Exit() when we  lose the leader election, meaning that
     // it must be deferred until after gracefulShutdown is  done.
     leaderElectionCancel context.CancelFunc
     // stop procedure engaged. In other words, we should not  add anything else to the manager
     stopProcedureEngaged bool
     // elected is closed when this manager becomes the leader  of a group of
     // managers, either because it won a leader election or  because no leader
     // election was configured.
     elected chan struct{}
     startCache func(ctx context.Context) error
     // port is the port that the webhook server serves at.
     port int
     // host is the hostname that the webhook server binds to.
     host string
     // CertDir is the directory that contains the server key  and certificate.
     // if not set, webhook server would look up the server key  and certificate in
     // {TempDir}/k8s-webhook-server/serving-certs
     certDir string
     webhookServer *webhook.Server
     // leaseDuration is the duration that non-leader  candidates will
     // wait to force acquire leadership.
     leaseDuration time.Duration
     // renewDeadline is the duration that the acting  controlplane will retry
     // refreshing leadership before giving up.
     renewDeadline time.Duration
     // retryPeriod is the duration the LeaderElector clients  should wait
     // between tries of actions.
     retryPeriod time.Duration
     // waitForRunnable is holding the number of runnables  currently running so that
     // we can wait for them to exit before quitting the  manager
     waitForRunnable sync.WaitGroup
     // gracefulShutdownTimeout is the duration given to  runnable to stop
     // before the manager actually returns on stop.
     gracefulShutdownTimeout time.Duration
     // onStoppedLeading is called when the leader election lease is lost.
     // It can be overridden for tests.
     onStoppedLeading func()
     // shutdownCtx is the context that can be used during  shutdown. It will be cancelled
     // after the gracefulShutdownTimeout ended. It must not be  accessed before internalStop
     // is closed because it will be nil.
     shutdownCtx context.Context
     internalCtx    context.Context
     internalCancel context.CancelFunc
     // internalProceduresStop channel is used internally to  the manager when coordinating
     // the proper shutdown of servers. This channel is also  used for dependency injection.
     internalProceduresStop chan struct{}
}
Steps for creating a controller through the manager:
(1) Instantiate the manager
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(),  ctrl.Options{})  
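The first argument is the Config struct defined in client-go's rest/config.go
    ctrl.GetConfigOrDie() calls GetConfigWithContext(), which falls back to ~/.kube/config when no kubeconfig flag, KUBECONFIG environment variable or in-cluster configuration is available
The second argument is the Options struct defined in pkg/manager/manager.go, which holds the parameters for the new manager
    Setting the Namespace field of ctrl.Options restricts the namespace that the controller manages
    Alternatively, setting NewCache to cache.MultiNamespacedCacheBuilder(namespaces), where namespaces is a slice of strings, lets it manage a set of namespaces
NewManager actually calls manager.New, which initializes the Manager: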
func New(config *rest.Config, options Options) (Manager, error)  {
     // Initialize a rest.config if none was specified
     if config == nil {
           return nil, fmt.Errorf("must specify Config")
     }
     options = setOptionsDefaults(options)   // fill in default values for unset Options
     // Create the mapper provider
     mapper, err := options.MapperProvider(config)
     if err != nil {
           log.Error(err, "Failed to get API Group-Resources")
           return nil, err
     }
     // Create the cache for the cached read client and  registering informers
     cache, err := options.NewCache(config,  cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync:  options.SyncPeriod, Namespace: options.Namespace})
     if err != nil {
           return nil, err
     }
     apiReader, err := client.New(config,  client.Options{Scheme: options.Scheme, Mapper: mapper})
     if err != nil {
           return nil, err
     }
     writeObj, err := options.NewClient(cache, config,  client.Options{Scheme: options.Scheme, Mapper: mapper})
     if err != nil {
           return nil, err
     }
     if options.DryRunClient {
           writeObj = client.NewDryRunClient(writeObj)
     }
     // Create the recorder provider to inject event recorders  for the components.
     // TODO(directxman12): the log for the event provider  should have a context (name, tags, etc) specific
     // to the particular controller that it's being injected  into, rather than a generic one like is here.
     recorderProvider, err :=  options.newRecorderProvider(config, options.Scheme,  log.WithName("events"), options.makeBroadcaster)
     if err != nil {
           return nil, err
     }
     // Create the resource lock to enable leader election)
     leaderConfig := config
     if options.LeaderElectionConfig != nil {
           leaderConfig = options.LeaderElectionConfig
     }
     resourceLock, err := options.newResourceLock(leaderConfig,  recorderProvider, leaderelection.Options{
           LeaderElection:             options.LeaderElection,
           LeaderElectionResourceLock:  options.LeaderElectionResourceLock,
           LeaderElectionID:           options.LeaderElectionID,
           LeaderElectionNamespace:     options.LeaderElectionNamespace,
     })
     if err != nil {
           return nil, err
     }
     // Create the metrics listener. This will throw an error  if the metrics bind
     // address is invalid or already in use.
     metricsListener, err :=  options.newMetricsListener(options.MetricsBindAddress)
     if err != nil {
           return nil, err
     }
     // By default we have no extra endpoints to expose on  metrics http server.
     metricsExtraHandlers := make(map[string]http.Handler)
     // Create health probes listener. This will throw an error  if the bind
     // address is invalid or already in use.
     healthProbeListener, err :=  options.newHealthProbeListener(options.HealthProbeBindAddress)
     if err != nil {
           return nil, err
     }
     return &controllerManager{
           config:                  config,
           scheme:                  options.Scheme,
           cache:                   cache,
           fieldIndexes:            cache,
           client:                  writeObj,
           apiReader:               apiReader,
           recorderProvider:        recorderProvider,
           resourceLock:            resourceLock,
           mapper:                  mapper,
           metricsListener:         metricsListener,
           metricsExtraHandlers:    metricsExtraHandlers,
           logger:                  options.Logger,
           elected:                 make(chan struct{}),
           port:                    options.Port,
           host:                    options.Host,
           certDir:                 options.CertDir,
           leaseDuration:           *options.LeaseDuration,
           renewDeadline:           *options.RenewDeadline,
           retryPeriod:             *options.RetryPeriod,
           healthProbeListener:     healthProbeListener,
           readinessEndpointName:    options.ReadinessEndpointName,
           livenessEndpointName:     options.LivenessEndpointName,
           gracefulShutdownTimeout:  *options.GracefulShutdownTimeout,
           internalProceduresStop:  make(chan struct{}),
     }, nil
}
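(2) Register the API types with the manager's scheme. The Scheme provides the mapping between GVKs and Go types.
With multiple CRDs, AddToScheme has to be called several times, once for each API package.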
err = api.AddToScheme(mgr.GetScheme())
This calls the API schema definitions in api/vxxx/groupversion_info.go:
var (
     GroupVersion = schema.GroupVersion{Group: "data.fluid.io", Version: "v1alpha1"}  // the group/version used to register the resource objects
     SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion}        // SchemeBuilder adds Go types to the GroupVersionKind scheme
     AddToScheme = SchemeBuilder.AddToScheme        // AddToScheme adds the types in this group/version to a scheme
)
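To make the "GVK to Go type mapping" concrete, here is a toy registry built on the reflect package. It is a sketch of the idea only and not runtime.Scheme: `gvk`, `miniScheme`, `register` and `kindOf` are hypothetical names, the ChaosPod type is an empty placeholder, and deriving the Kind from the Go type name is an assumption of this model.

```go
package main

import (
	"fmt"
	"reflect"
)

// gvk is a hypothetical stand-in for a group/version/kind triple.
type gvk struct{ Group, Version, Kind string }

// miniScheme maps group/version/kind triples to Go types and back.
type miniScheme struct {
	toType map[gvk]reflect.Type
	toGVK  map[reflect.Type]gvk
}

func newMiniScheme() *miniScheme {
	return &miniScheme{toType: map[gvk]reflect.Type{}, toGVK: map[reflect.Type]gvk{}}
}

// structType strips one level of pointer so that T and *T resolve to the same type.
// obj must be non-nil.
func structType(obj interface{}) reflect.Type {
	t := reflect.TypeOf(obj)
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return t
}

// register records obj's type under group/version, using the Go type name as the Kind.
func (s *miniScheme) register(group, version string, obj interface{}) {
	t := structType(obj)
	k := gvk{Group: group, Version: version, Kind: t.Name()}
	s.toType[k] = t
	s.toGVK[t] = k
}

// kindOf looks up the triple registered for obj's type.
func (s *miniScheme) kindOf(obj interface{}) (gvk, bool) {
	k, ok := s.toGVK[structType(obj)]
	return k, ok
}

// ChaosPod is an empty placeholder for the CRD type used in this article.
type ChaosPod struct{}

func main() {
	s := newMiniScheme()
	s.register("data.fluid.io", "v1alpha1", &ChaosPod{})
	k, ok := s.kindOf(&ChaosPod{})
	fmt.Println(k.Group, k.Version, k.Kind, ok)
}
```

A client can use such a mapping in both directions: from a Go object to the API path it should talk to, and from the kind in a response back to the Go type to decode into.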
(3) Add a controller to the manager:
err = ctrl.NewControllerManagedBy(mgr).
     For(&api.ChaosPod{}).
     Owns(&corev1.Pod{}).
     Complete(&reconciler{
           Client: mgr.GetClient(),
           scheme: mgr.GetScheme(),
     })
① NewControllerManagedBy actually calls builder.ControllerManagedBy, which returns a new controller Builder object. The controller it builds is started by the given Manager.
type Builder struct {
     forInput         ForInput
     ownsInput        []OwnsInput
     watchesInput     []WatchesInput
     mgr              manager.Manager
     globalPredicates []predicate.Predicate
     config           *rest.Config
     ctrl             controller.Controller
     ctrlOptions      controller.Options
     name             string
}
 
func ControllerManagedBy(m manager.Manager) *Builder {
     return &Builder{mgr: m}
}
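② For defines the type of object being reconciled and configures the controller to respond to create/delete/update events by reconciling that object.
Calling For is equivalent to calling Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{})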
func (blder *Builder) For(object client.Object, opts  ...ForOption) *Builder {
     if blder.forInput.object != nil {
           blder.forInput.err = fmt.Errorf("For(...) should only  be called once, could not assign multiple objects for  reconciliation")
           return blder
     }
     input := ForInput{object: object}
     for _, opt := range opts {
           opt.ApplyToFor(&input)
     }
     blder.forInput = input
     return blder
}
③ Owns configures the child resources of the reconciled type that should also be watched:
func (blder *Builder) Owns(object client.Object, opts  ...OwnsOption) *Builder {
     input := OwnsInput{object: object}
     for _, opt := range opts {
           opt.ApplyToOwns(&input)
     }
     blder.ownsInput = append(blder.ownsInput, input)
     return blder
}
In this example, ChaosPod owns Pods as its child resource.
If there are no child resources, this call can be omitted.
④ The Complete function:
func (blder *Builder) Complete(r reconcile.Reconciler) error {
     _, err := blder.Build(r)
     return err
}


// Build builds the controller from the user-supplied Reconciler implementation and returns the Controller it created
func (blder *Builder) Build(r reconcile.Reconciler)  (controller.Controller, error) {
     if r == nil {
           return nil, fmt.Errorf("must provide a non-nil  Reconciler")
     }
     if blder.mgr == nil {
           return nil, fmt.Errorf("must provide a non-nil  Manager")
     }
     if blder.forInput.err != nil {
           return nil, blder.forInput.err
     }
     if blder.forInput.object == nil {   // Checking the reconcile type exist or not
           return nil, fmt.Errorf("must provide an object for  reconciliation")
     }
     blder.loadRestConfig()       // Set the Config
     if err := blder.doController(r); err != nil {  // Set the ControllerManagedBy
           return nil, err
     }
     if err := blder.doWatch(); err != nil {   // Set the Watch
           return nil, err
     }
     return blder.ctrl, nil
}
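One detail of the builder worth noting is that For does not return an error: a second call records the problem in forInput.err so the method chain can continue, and Build reports it later. The sketch below models just that bookkeeping; `miniBuilder` and its fields are hypothetical, kinds are plain strings, and no controller or watch is created.

```go
package main

import (
	"errors"
	"fmt"
)

// miniBuilder models only the For/Owns/Build bookkeeping of the builder.
type miniBuilder struct {
	forKind  string
	forErr   error
	ownKinds []string
}

// For records the primary kind. A second call stores an error instead of
// failing immediately, so that method chaining can continue.
func (b *miniBuilder) For(kind string) *miniBuilder {
	if b.forKind != "" {
		b.forErr = errors.New("For(...) should only be called once")
		return b
	}
	b.forKind = kind
	return b
}

// Owns records an additional owned kind to watch.
func (b *miniBuilder) Owns(kind string) *miniBuilder {
	b.ownKinds = append(b.ownKinds, kind)
	return b
}

// Build surfaces any deferred error and otherwise lists the kinds to watch,
// primary kind first.
func (b *miniBuilder) Build() ([]string, error) {
	if b.forErr != nil {
		return nil, b.forErr
	}
	if b.forKind == "" {
		return nil, errors.New("must provide an object for reconciliation")
	}
	return append([]string{b.forKind}, b.ownKinds...), nil
}

func main() {
	kinds, err := (&miniBuilder{}).For("ChaosPod").Owns("Pod").Build()
	fmt.Println(kinds, err)
	_, err = (&miniBuilder{}).For("ChaosPod").For("Pod").Build()
	fmt.Println(err)
}
```

Deferring the error keeps the fluent call style intact, at the cost that a misuse is only noticed when Build or Complete is called.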
doController stores the user's XxxReconciler struct as the Do member of the Controller, which is why c.Do.Reconcile(ctx, req) can be called:
func (blder *Builder) doController(r reconcile.Reconciler) error  {
     name, err := blder.getControllerName()
     if err != nil {
           return err
     }
     ctrlOptions := blder.ctrlOptions
     ctrlOptions.Reconciler = r
     blder.ctrl, err = newController(name, blder.mgr,  ctrlOptions)
     return err
}

It builds ctrlOptions, the options used to create the controller, from Builder.ctrlOptions and the user-supplied XxxReconciler struct, and then creates the new controller through newController.

(4) Add a webhook to the manager; its handling logic has to be implemented as well
err = ctrl.NewWebhookManagedBy(mgr).
     For(&api.ChaosPod{}).
     Complete()
(5) Start the manager by calling mgr.Start()
mgr.Start(ctrl.SetupSignalHandler())
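The contract of Start (run every registered component, block until the context is cancelled, return an error if a component fails) can be modelled in a few lines. This is a sketch under simplifying assumptions, not the controllerManager implementation: `miniManager`, `runnable`, `demo` and `errBoom` are hypothetical names, and leader election, caches and graceful shutdown timeouts are left out.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// runnable is a component that runs until ctx is cancelled or it fails.
type runnable func(ctx context.Context) error

type miniManager struct{ runnables []runnable }

// Add registers a component to be started by Start.
func (m *miniManager) Add(r runnable) { m.runnables = append(m.runnables, r) }

// Start runs every runnable in its own goroutine, blocks until ctx is cancelled
// or a runnable fails, then cancels the rest and waits for them to return.
func (m *miniManager) Start(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	errCh := make(chan error, len(m.runnables)) // buffered so senders never block
	var wg sync.WaitGroup
	for _, r := range m.runnables {
		wg.Add(1)
		go func(r runnable) {
			defer wg.Done()
			if err := r(ctx); err != nil {
				errCh <- err
			}
		}(r)
	}
	var firstErr error
	select {
	case <-ctx.Done(): // normal shutdown requested by the caller
	case firstErr = <-errCh: // a component failed: stop the others
		cancel()
	}
	wg.Wait()
	return firstErr
}

var errBoom = errors.New("boom")

// demo runs a manager with one long-lived component and one that either
// fails or requests shutdown (standing in for a received termination signal).
func demo(fail bool) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	m := &miniManager{}
	m.Add(func(ctx context.Context) error { <-ctx.Done(); return nil })
	m.Add(func(ctx context.Context) error {
		if fail {
			return errBoom
		}
		cancel()
		return nil
	})
	return m.Start(ctx)
}

func main() {
	fmt.Println(demo(false))
	fmt.Println(demo(true))
}
```

In the real call above, ctrl.SetupSignalHandler() supplies the context, so the cancellation that `demo` triggers by hand comes from the process receiving a termination signal.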

 

The user's definition of the CRD data structures:

type ChaosPod struct {
     metav1.TypeMeta   `json:",inline"`
     metav1.ObjectMeta `json:"metadata,omitempty"`
     Spec   ChaosPodSpec   `json:"spec,omitempty"`
     Status ChaosPodStatus `json:"status,omitempty"`
}
 
type ChaosPodList struct {
     metav1.TypeMeta `json:",inline"`
     metav1.ListMeta `json:"metadata,omitempty"`
     Items           []ChaosPod `json:"items"`
}

The deepcopy methods have to be generated automatically into zz_generated.deepcopy.go.

The call that adds the resource objects to the Group:
SchemeBuilder.Register(&ChaosPod{}, &ChaosPodList{})

  

