kubebuilder 的運行邏輯
概述
下面是kubebuilder 的架構圖。可以看到最外層是通過名為Manager
的組件驅動的,Manager中包含了多個組件,其中Cache
中保存了gvk和informer的映射關系,用於通過informer的方式緩存kubernetes 的對象。Controller
使用workqueue的方式緩存informer傳遞過來的對象,后續提取workqueue中的對象,傳遞給Reconciler
進行處理。
本文不介紹kuberbuilder的用法,如有需要可以參考如下三篇文章:
- Kubernetes Operator for Beginners — What, Why, How
- Advanced Kubernetes Operators Development
- Advanced Kubernetes Operator Development with Finalizer, Informer, and Webhook
本次使用的controller-runtime的版本是:v0.11.0
引用自:Controller Runtime 的四種使用姿勢
下面展示了用戶創建的 Manager 和 Reconciler 以及 Controller Runtime 自己啟動的 Cache 和 Controller。先看用戶側的,Manager 是用戶初始化的時候需要創建的,用來啟動 Controller Runtime 的組件;Reconciler 是用戶自己需要提供的組件,用於處理自己的業務邏輯。
而 controller-runtime 側的組件,Cache 顧名思義就是緩存,用於建立 Informer 對 ApiServer 進行連接 watch 資源,並將 watch 到的 object 推入隊列;Controller 一方面會向 Informer 注冊 eventHandler,另一方面會從隊列中拿數據並執行用戶側 Reconciler 的函數。
![]()
controller-runtime 側整個工作流程如下:
![]()
首先 Controller 會先向 Informer 注冊特定資源的 eventHandler;然后 Cache 會啟動 Informer,Informer 向 ApiServer 發出請求,建立連接;當 Informer 檢測到有資源變動后,使用 Controller 注冊進來的 eventHandler 判斷是否推入隊列中;當隊列中有元素被推入時,Controller 會將元素取出,並執行用戶側的 Reconciler。
下述例子的代碼生成參考:Building your own kubernetes CRDs
Managers
manager負責運行controllers和webhooks,並設置公共依賴,如clients、caches、schemes等。
kubebuilder的處理
kubebuilder會自動在main.go中創建Manager:
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Port: 9443,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "3b9f5c61.com.bolingcavalry",
})
controllers是通過調用Manager.Start
接口啟動的。
Controllers
controller使用events
來觸發reconcile的請求。通過controller.New接口可以初始化一個controller,並通過manager.Start啟動該controller。
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
c, err := NewUnmanaged(name, mgr, options)
if err != nil {
return nil, err
}
// Add the controller as a Manager components
return c, mgr.Add(c) // 將controller添加到manager中
}
kubebuilder的處理
kubebuilder會自動在main.go中生成一個SetupWithManager
函數,在Complete
中創建並將controller添加到manager,具體見下文:
func (r *GuestbookReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&webappv1.Guestbook{}).
Complete(r)
}
在main.go中調用Manager.Start
接口來啟動controller:
mgr.Start(ctrl.SetupSignalHandler())
Reconcilers
Controller的核心是實現了Reconciler接口。Reconciler 會接收到一個reconcile請求,該請求中包含對象的name和namespace。reconcile會對比對象和其所擁有(own)的資源的當前狀態與期望狀態,並據此做出相應的調整。
通常Controller會根據集群事件(如Creating、Updating、Deleting Kubernetes對象)或外部事件(如GitHub Webhooks、輪詢外部資源等)觸發reconcile。
注意:Reconciler中傳入的
reqeust
中僅包含對象的名稱和命名空間,並沒有對象的其他信息,因此需要通過kubernetes client來獲取對象的相關信息。type Request struct { // NamespacedName is the name and namespace of the object to reconcile. types.NamespacedName }
type NamespacedName struct { Namespace string Name string }
Reconciler接口的描述如下,其中給出了其處理邏輯的例子:
- 讀取一個對象以及其所擁有的所有pod
- 觀察到對象期望的副本數為5,但實際只有一個pod副本
- 創建4個pods,並設置OwnerReferences
/*
Reconciler implements a Kubernetes API for a specific Resource by Creating, Updating or Deleting Kubernetes
objects, or by making changes to systems external to the cluster (e.g. cloudproviders, github, etc).
reconcile implementations compare the state specified in an object by a user against the actual cluster state,
and then perform operations to make the actual cluster state reflect the state specified by the user.
Typically, reconcile is triggered by a Controller in response to cluster Events (e.g. Creating, Updating,
Deleting Kubernetes objects) or external Events (GitHub Webhooks, polling external sources, etc).
Example reconcile Logic:
* Read an object and all the Pods it owns.
* Observe that the object spec specifies 5 replicas but actual cluster contains only 1 Pod replica.
* Create 4 Pods and set their OwnerReferences to the object.
reconcile may be implemented as either a type:
type reconcile struct {}
func (reconcile) reconcile(controller.Request) (controller.Result, error) {
// Implement business logic of reading and writing objects here
return controller.Result{}, nil
}
Or as a function:
controller.Func(func(o controller.Request) (controller.Result, error) {
// Implement business logic of reading and writing objects here
return controller.Result{}, nil
})
Reconciliation is level-based, meaning action isn't driven off changes in individual Events, but instead is
driven by actual cluster state read from the apiserver or a local cache.
For example if responding to a Pod Delete Event, the Request won't contain that a Pod was deleted,
instead the reconcile function observes this when reading the cluster state and seeing the Pod as missing.
*/
type Reconciler interface {
// Reconcile performs a full reconciliation for the object referred to by the Request.
// The Controller will requeue the Request to be processed again if an error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
Reconcile(context.Context, Request) (Result, error)
}
重新執行Reconciler
Reconcile
除了根據事件執行之外還可以重復調用,方法比較簡單,即在實現Reconcile(context.Context, Request) (Result, error)
的方法中,將返回值Result.Requeue
設置為true
,此時會非周期性地重復調用Reconcile
;另一種是給Result.RequeueAfter
設置一個時間范圍,當超時之后會重新調用Reconcile
。其重復執行的處理邏輯位於reconcileHandler
方法中,其實就是將老的obj從workqueue中刪除,然后重新入隊列,兩種情況的處理邏輯如下:
case err != nil:
c.Queue.AddRateLimited(req)
ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
log.Error(err, "Reconciler error")
case result.RequeueAfter > 0:
// The result.RequeueAfter request will be lost, if it is returned
// along with a non-nil error. But this is intended as
// We need to drive to stable reconcile loops before queuing due
// to result.RequestAfter
c.Queue.Forget(obj)
c.Queue.AddAfter(req, result.RequeueAfter)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
case result.Requeue:
c.Queue.AddRateLimited(req)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
Requeue和RequeueAfter對結果的影響如下:
Requeue | RequeueAfter | Error | Result |
---|---|---|---|
any | any | !nil | Requeue with rate limiting. |
true | 0 | nil | Requeue with rate limiting. |
any | >0 | nil | Requeue after specified RequeueAfter . |
false | 0 | nil | Do not requeue. |
kubebuilder的處理
kubebuilder會在guestbook_controller.go 中生成一個實現了Reconciler接口的模板:
func (r *GuestbookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)
// TODO(user): your logic here
return ctrl.Result{}, nil
}
那么Reconciler又是怎么和controller關聯起來的呢?在上文提到 kubebuilder 會通過Complete
(SetupWithManager
中調用)創建並添加controller到manager,同時可以看到Complete
中傳入的就是reconcile.Reconciler
接口,這就是controller和Reconciler關聯的入口:
func (blder *Builder) Complete(r reconcile.Reconciler) error {
_, err := blder.Build(r)
return err
}
后續會通過: Builder.Build -->Builder.doController-->newController 最終傳遞給controller的初始化接口controller.New
,並賦值給Controller.Do
變量。controller.New
中創建的controller結構如下,可以看到還為MakeQueue
賦予了一個創建workqueue的函數,新事件會緩存到該workqueue中,后續傳遞給Reconcile進行處理:
// Create controller with dependencies set
return &controller.Controller{
Do: options.Reconciler,
MakeQueue: func() workqueue.RateLimitingInterface {
return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
},
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
CacheSyncTimeout: options.CacheSyncTimeout,
SetFields: mgr.SetFields,
Name: name,
Log: options.Log.WithName("controller").WithName(name),
RecoverPanic: options.RecoverPanic,
}, nil
上面有講controller會根據事件來調用Reconciler,那它是如何傳遞事件的呢?
可以看下Controller的啟動接口(Manager.Start中會調用Controller.Start接口),可以看到其調用了processNextWorkItem
來處理workqueue中的事件:
func (c *Controller) Start(ctx context.Context) error {
...
c.Queue = c.MakeQueue() //通過MakeQueue初始化一個workqueue
...
wg := &sync.WaitGroup{}
err := func() error {
...
wg.Add(c.MaxConcurrentReconciles)
for i := 0; i < c.MaxConcurrentReconciles; i++ {
go func() {
defer wg.Done()
for c.processNextWorkItem(ctx) {
}
}()
}
...
}()
...
}
繼續查看processNextWorkItem
,可以看到該處理邏輯與client-go中的workqueue的處理方式一樣,從workqueue中拿出事件對象,然后傳遞給reconcileHandler
:
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := c.Queue.Get() //獲取workqueue中的對象
if shutdown {
// Stop working
return false
}
defer c.Queue.Done(obj)
ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)
c.reconcileHandler(ctx, obj)
return true
}
后續會通過Controller.reconcileHandler --> Controller.Reconcile -->Controller.Do.Reconcile 最終將事件傳遞給Reconcile(自己實現的Reconcile賦值給了controller的Do
變量)。
總結一下:kubebuilder首先通過SetupWithManager
將Reconcile
賦值給controller,在Manager啟動時會調用Controller.Start
啟動controller,controller會不斷獲取其workqueue中的對象,並傳遞給Reconcile進行處理。
Controller事件來源
上面講了controller是如何處理事件的,那么workqueue中的事件是怎么來的呢?
回到Builder.Complete-->Builder.build,從上面內容可以知道在doController
函數中進行了controller的初始化,並將Reconciler和controller關聯起來。在下面有個doWatch
函數,該函數中注冊了需要watch的對象類型,以及eventHandler(類型為handler.EnqueueRequestForObject
),並通過controller的Watch
接口啟動對資源的監控:
func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
...
// Set the ControllerManagedBy
if err := blder.doController(r); err != nil {//初始化controller
return nil, err
}
// Set the Watch
if err := blder.doWatch(); err != nil {
return nil, err
}
return blder.ctrl, nil
}
func (blder *Builder) doWatch() error {
// Reconcile type
typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)//格式化資源類型
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc} //初始化資源類型
hdler := &handler.EnqueueRequestForObject{} //初始化eventHandler
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { //啟動對資源的監控
return err
}
...
}
上述的
blder.forInput.object
就是SetupWithManager
中的For
的參數(&webappv1.Guestbook{})func (r *GuestbookReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&webappv1.Guestbook{}). Complete(r) }
繼續看controller.Watch接口,可以看到其調用了src.Start
(src的類型為 source.Kind),將evthdler(&handler.EnqueueRequestForObject{})、c.Qeueue關聯起來(c.Qeueue為Reconciler提供參數)
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
...
return src.Start(c.ctx, evthdler, c.Queue, prct...)
}
在Kind.Start 中會根據ks.Type選擇合適的informer,並添加事件管理器internal.EventHandler
:
在Manager初始化時(如未指定)默認會創建一個Cache,該Cache中保存了gvk到cache.SharedIndexInformer 的映射關系,ks.cache.GetInformer 中會提取對象的gvk信息,並根據gvk獲取informer。
在Manager.Start的時候會啟動Cache中的informer。
func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
...
go func() {
...
if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
...
return true, nil
});
...
i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
...
}()
return nil
}
internal.EventHandler
中實現了SharedIndexInformer
所需的ResourceEventHandler
接口
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
看下EventHandler 是如何將OnAdd
監聽到的對象添加到隊列中的:
func (e EventHandler) OnAdd(obj interface{}) {
...
e.EventHandler.Create(c, e.Queue)
}
可以看到在EnqueueRequestForObject.Create中提取了對象的名稱和命名空間,並添加到了隊列中:
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
...
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
}})
}
至此將整個Kubebuilder串起來了。
與使用client-go的區別
client-go
在需要操作kubernetes資源時,通常會使用client-go來編寫資源的CRUD邏輯,或使用informer機制來監聽資源的變更,並在OnAdd、OnUpdate、OnDelete中進行相應的處理。
kubebuilder Operator
從上述講解可以了解到,Operator一般會涉及兩方面:object以及其所有(own)的資源。Reconcilers是核心處理邏輯,但其只能獲取到資源的名稱和命名空間,並不知道資源的操作(增刪改)是什么,也不知道資源的其他信息,目的就是在收到資源變更時,根據object的期望狀態來調整資源的狀態。
kubebuilder也提供了client庫,可以對kubernetes資源進行CRUD操作,但建議這種情況下直接使用client-go進行操作:
package main import ( "context" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/client" ) var c client.Client func main() { // Using a typed object. pod := &corev1.Pod{} // c is a created client. _ = c.Get(context.Background(), client.ObjectKey{ Namespace: "namespace", Name: "name", }, pod) // Using a unstructured object. u := &unstructured.Unstructured{} u.SetGroupVersionKind(schema.GroupVersionKind{ Group: "apps", Kind: "Deployment", Version: "v1", }) _ = c.Get(context.Background(), client.ObjectKey{ Namespace: "namespace", Name: "name", }, u) }