作者 | 劉洋(炎尋) 阿里雲高級開發工程師
導讀:自定義資源 CRD(Custom Resource Definition)可以擴展 Kubernetes API,掌握 CRD 是成為 Kubernetes 高級玩家的必備技能,本文將介紹 CRD 和 Controller 的概念,並對 CRD 編寫框架 Kubebuilder 進行深入分析,讓您真正理解並能快速開發 CRD。
概覽
控制器模式與聲明式 API
在正式介紹 Kubebuidler 之前,我們需要先了解下 K8s 底層實現大量使用的控制器模式,以及讓用戶大呼過癮的聲明式 API,這是介紹 CRDs 和 Kubebuidler 的基礎。
控制器模式
K8s 作為一個“容器編排”平台,其核心的功能是編排,Pod 作為 K8s 調度的最小單位,具備很多屬性和字段,K8s 的編排正是通過一個個控制器根據被控制對象的屬性和字段來實現。
下面我們看一個例子:
apiVersion: apps/v1
kind: Deployment
metadata:
name: test
spec:
selector:
matchLabels:
app: test
replicas: 2
template:
metadata:
labels:
app: test
spec:
containers:
- name: nginx
image: nginx:1.7.9
ports:
- containerPort: 80
K8s 集群在部署時包含了 Controllers 組件,里面對於每個 build-in 的資源類型(比如 Deployments, Statefulset, CronJob, ...)都有對應的 Controller,基本是 1:1 的關系。上面的例子中,Deployment 資源創建之后,對應的 Deployment Controller 編排動作很簡單,確保攜帶了 app=test 的 Pod 個數永遠等於 2,Pod 由 template 部分定義,具體來說,K8s 里面是 kube-controller-manager 這個組件在做這件事,可以看下 K8s 項目的 pkg/controller 目錄,里面包含了所有控制器,都以獨有的方式負責某種編排功能,但是它們都遵循一個通用編排模式,即:調諧循環(Reconcile loop),其偽代碼邏輯為:
for {
actualState := GetResourceActualState(rsvc)
expectState := GetResourceExpectState(rsvc)
if actualState == expectState {
// do nothing
} else {
Reconcile(rsvc)
}
}
就是一個無限循環(實際是事件驅動+定時同步來實現,不是無腦循環)不斷地對比期望狀態和實際狀態,如果有出入則進行 Reconcile(調諧)邏輯將實際狀態調整為期望狀態。期望狀態就是我們的對象定義(通常是 YAML 文件),實際狀態是集群里面當前的運行狀態(通常來自於 K8s 集群內外相關資源的狀態匯總),控制器的編排邏輯主要是第三步做的,這個操作被稱為調諧(Reconcile),整個控制器調諧的過程稱為“Reconcile Loop”,調諧的最終結果一般是對被控制對象的某種寫操作,比如增/刪/改 Pod。
在控制器中定義被控制對象是通過“模板”完成的,比如 Deployment 里面的 template 字段里的內容跟一個標准的 Pod 對象的 API 定義一樣,所有被這個 Deployment 管理的 Pod 實例,都是根據這個 template 字段的創建的,這就是 PodTemplate,一個控制對象的定義一般是由上半部分的控制定義(期望狀態),加上下半部分的被控制對象的模板組成。
聲明式 API
所謂聲明式就是“告訴 K8s 你要什么,而不是告訴它怎么做的命令”,一個很熟悉的例子就是 SQL,你“告訴 DB 根據條件和各類算子返回數據,而不是告訴它怎么遍歷,過濾,聚合”。在 K8s 里面,聲明式的體現就是 kubectl apply 命令,在對象創建和后續更新中一直使用相同的 apply 命令,告訴 K8s 對象的終態即可,底層是通過執行了一個對原有 API 對象的 PATCH 操作來實現的,可以一次性處理多個寫操作,具備 Merge 能力 diff 出最終的 PATCH,而命令式一次只能處理一個寫請求。
聲明式 API 讓 K8s 的“容器編排”世界看起來溫柔美好,而控制器(以及容器運行時,存儲,網絡模型等)才是這太平盛世的幕后英雄。說到這里,就會有人希望也能像 build-in 資源一樣構建自己的自定義資源(CRD-Customize Resource Definition),然后為自定義資源寫一個對應的控制器,推出自己的聲明式 API。K8s 提供了 CRD 的擴展方式來滿足用戶這一需求,而且由於這種擴展方式十分靈活,在最新的 1.15 版本對 CRD 做了相當大的增強。對於用戶來說,實現 CRD 擴展主要做兩件事:
- 編寫 CRD 並將其部署到 K8s 集群里;
這一步的作用就是讓 K8s 知道有這個資源及其結構屬性,在用戶提交該自定義資源的定義時(通常是 YAML 文件定義),K8s 能夠成功校驗該資源並創建出對應的 Go struct 進行持久化,同時觸發控制器的調諧邏輯。
- 編寫 Controller 並將其部署到 K8s 集群里。
這一步的作用就是實現調諧邏輯。
Kubebuilder 就是幫我們簡化這兩件事的工具,現在我們開始介紹主角。
Kubebuilder 是什么?
摘要
Kubebuilder 是一個使用 CRDs 構建 K8s API 的 SDK,主要是:
- 提供腳手架工具初始化 CRDs 工程,自動生成 boilerplate 代碼和配置;
- 提供代碼庫封裝底層的 K8s go-client;
方便用戶從零開始開發 CRDs,Controllers 和 Admission Webhooks 來擴展 K8s。
核心概念
GVKs&GVRs
GVK = GroupVersionKind,GVR = GroupVersionResource。
API Group & Versions(GV)
API Group 是相關 API 功能的集合,每個 Group 擁有一或多個 Versions,用於接口的演進。
Kinds & Resources
每個 GV 都包含多個 API 類型,稱為 Kinds,在不同的 Versions 之間同一個 Kind 定義可能不同, Resource 是 Kind 的對象標識(resource type),一般來說 Kinds 和 Resources 是 1:1 的,比如 pods Resource 對應 Pod Kind,但是有時候相同的 Kind 可能對應多個 Resources,比如 Scale Kind 可能對應很多 Resources:deployments/scale,replicasets/scale,對於 CRD 來說,只會是 1:1 的關系。
每一個 GVK 都關聯着一個 package 中給定的 root Go type,比如 apps/v1/Deployment 就關聯着 K8s 源碼里面 k8s.io/api/apps/v1 package 中的 Deployment struct,我們提交的各類資源定義 YAML 文件都需要寫:
- apiVersion:這個就是 GV 。
- kind:這個就是 K。
根據 GVK K8s 就能找到你到底要創建什么類型的資源,根據你定義的 Spec 創建好資源之后就成為了 Resource,也就是 GVR。GVK/GVR 就是 K8s 資源的坐標,是我們創建/刪除/修改/讀取資源的基礎。
Scheme
每一組 Controllers 都需要一個 Scheme,提供了 Kinds 與對應 Go types 的映射,也就是說給定 Go type 就知道他的 GVK,給定 GVK 就知道他的 Go type,比如說我們給定一個 Scheme: "tutotial.kubebuilder.io/api/v1".CronJob{} 這個 Go type 映射到 batch.tutotial.kubebuilder.io/v1 的 CronJob GVK,那么從 Api Server 獲取到下面的 JSON:
{
"kind": "CronJob",
"apiVersion": "batch.tutorial.kubebuilder.io/v1",
...
}
就能構造出對應的 Go type了,通過這個 Go type 也能正確地獲取 GVR 的一些信息,控制器可以通過該 Go type 獲取到期望狀態以及其他輔助信息進行調諧邏輯。
Manager
Kubebuilder 的核心組件,具有 3 個職責:
- 負責運行所有的 Controllers;
- 初始化共享 caches,包含 listAndWatch 功能;
- 初始化 clients 用於與 Api Server 通信。
Cache
Kubebuilder 的核心組件,負責在 Controller 進程里面根據 Scheme 同步 Api Server 中所有該 Controller 關心 GVKs 的 GVRs,其核心是 GVK -> Informer 的映射,Informer 會負責監聽對應 GVK 的 GVRs 的創建/刪除/更新操作,以觸發 Controller 的 Reconcile 邏輯。
Controller
Kubebuidler 為我們生成的腳手架文件,我們只需要實現 Reconcile 方法即可。
Clients
在實現 Controller 的時候不可避免地需要對某些資源類型進行創建/刪除/更新,就是通過該 Clients 實現的,其中查詢功能實際查詢是本地的 Cache,寫操作直接訪問 Api Server。
Index
由於 Controller 經常要對 Cache 進行查詢,Kubebuilder 提供 Index utility 給 Cache 加索引提升查詢效率。
Finalizer
在一般情況下,如果資源被刪除之后,我們雖然能夠被觸發刪除事件,但是這個時候從 Cache 里面無法讀取任何被刪除對象的信息,這樣一來,導致很多垃圾清理工作因為信息不足無法進行,K8s 的 Finalizer 字段用於處理這種情況。在 K8s 中,只要對象 ObjectMeta 里面的 Finalizers 不為空,對該對象的 delete 操作就會轉變為 update 操作,具體說就是 update deletionTimestamp 字段,其意義就是告訴 K8s 的 GC“在deletionTimestamp 這個時刻之后,只要 Finalizers 為空,就立馬刪除掉該對象”。
所以一般的使用姿勢就是在創建對象時把 Finalizers 設置好(任意 string),然后處理 DeletionTimestamp 不為空的 update 操作(實際是 delete),根據 Finalizers 的值執行完所有的 pre-delete hook(此時可以在 Cache 里面讀取到被刪除對象的任何信息)之后將 Finalizers 置為空即可。
OwnerReference
K8s GC 在刪除一個對象時,任何 ownerReference 是該對象的對象都會被清除,與此同時,Kubebuidler 支持所有對象的變更都會觸發 Owner 對象 controller 的 Reconcile 方法。
所有概念集合在一起如圖 1 所示:
圖 1-Kubebuilder 核心概念
Kubebuilder 怎么用?
1. 創建腳手架工程
kubebuilder init --domain edas.io
這一步創建了一個 Go module 工程,引入了必要的依賴,創建了一些模板文件。
2. 創建 API
kubebuilder create api --group apps --version v1alpha1 --kind Application
這一步創建了對應的 CRD 和 Controller 模板文件,經過 1、2 兩步,現有的工程結構如圖 2 所示:
圖 2-Kubebuilder 生成的工程結構說明
3. 定義 CRD
在圖 2 中對應的文件定義 Spec 和 Status。
4. 編寫 Controller 邏輯
在圖 3 中對應的文件實現 Reconcile 邏輯。
5. 測試發布
本地測試完之后使用 Kubebuilder 的 Makefile 構建鏡像,部署我們的 CRDs 和 Controller 即可。
Kubebuilder 出現的意義?
讓擴展 K8s 變得更簡單,K8s 擴展的方式很多,Kubebuilder 目前專注於 CRD 擴展方式。
深入
在使用 Kubebuilder 的過程中有些問題困擾着我:
- 如何同步自定義資源以及 K8s build-in 資源?
- Controller 的 Reconcile 方法是如何被觸發的?
- Cache 的工作原理是什么?
- ...
帶着這些問題我們去看看源碼 😄。
源碼閱讀
從 main.go 開始
Kubebuilder 創建的 main.go 是整個項目的入口,邏輯十分簡單:
var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)
func init() {
appsv1alpha1.AddToScheme(scheme)
// +kubebuilder:scaffold:scheme
}
func main() {
...
// 1、init Manager
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme, MetricsBindAddress: metricsAddr})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
// 2、init Reconciler(Controller)
err = (&controllers.ApplicationReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("Application"),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr)
if err != nil {
setupLog.Error(err, "unable to create controller", "controller", "EDASApplication")
os.Exit(1)
}
// +kubebuilder:scaffold:builder
setupLog.Info("starting manager")
// 3、start Manager
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
可以看到在 init 方法里面我們將 appsv1alpha1 注冊到 Scheme 里面去了,這樣一來 Cache 就知道 watch 誰了,main 方法里面的邏輯基本都是 Manager 的:
- 初始化了一個 Manager;
- 將 Manager 的 Client 傳給 Controller,並且調用 SetupWithManager 方法傳入 Manager 進行 Controller 的初始化;
- 啟動 Manager。
我們的核心就是看這 3 個流程。
Manager 初始化
Manager 初始化代碼如下:
// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
...
// Create the cache for the cached read client and registering informers
cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
if err != nil {
return nil, err
}
apiReader, err := client.New(config, client.Options{Scheme: options.Scheme, Mapper: mapper})
if err != nil {
return nil, err
}
writeObj, err := options.NewClient(cache, config, client.Options{Scheme: options.Scheme, Mapper: mapper})
if err != nil {
return nil, err
}
...
return &controllerManager{
config: config,
scheme: options.Scheme,
errChan: make(chan error),
cache: cache,
fieldIndexes: cache,
client: writeObj,
apiReader: apiReader,
recorderProvider: recorderProvider,
resourceLock: resourceLock,
mapper: mapper,
metricsListener: metricsListener,
internalStop: stop,
internalStopper: stop,
port: options.Port,
host: options.Host,
leaseDuration: *options.LeaseDuration,
renewDeadline: *options.RenewDeadline,
retryPeriod: *options.RetryPeriod,
}, nil
}
可以看到主要是創建 Cache 與 Clients:
創建 Cache
Cache 初始化代碼如下:
// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) {
opts, err := defaultOpts(config, opts)
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
return &informerCache{InformersMap: im}, nil
}
// newSpecificInformersMap returns a new specificInformersMap (like
// the generical InformersMap, except that it doesn't implement WaitForCacheSync).
func newSpecificInformersMap(...) *specificInformersMap {
ip := &specificInformersMap{
Scheme: scheme,
mapper: mapper,
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
codecs: serializer.NewCodecFactory(scheme),
resync: resync,
createListWatcher: createListWatcher,
namespace: namespace,
}
return ip
}
// MapEntry contains the cached data for an Informer
type MapEntry struct {
// Informer is the cached informer
Informer cache.SharedIndexInformer
// CacheReader wraps Informer and implements the CacheReader interface for a single type
Reader CacheReader
}
func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
...
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(opts)
}
return dynamicClient.Resource(mapping.Resource).List(opts)
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
// Watch needs to be set to true separately
opts.Watch = true
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).Watch(opts)
}
return dynamicClient.Resource(mapping.Resource).Watch(opts)
},
}, nil
}
可以看到 Cache 主要就是創建了 InformersMap,Scheme 里面的每個 GVK 都創建了對應的 Informer,通過 informersByGVK 這個 map 做 GVK 到 Informer 的映射,每個 Informer 會根據 ListWatch 函數對對應的 GVK 進行 List 和 Watch。
創建 Clients
創建 Clients 很簡單:
// defaultNewClient creates the default caching client
func defaultNewClient(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
// Create the Client for Write operations.
c, err := client.New(config, options)
if err != nil {
return nil, err
}
return &client.DelegatingClient{
Reader: &client.DelegatingReader{
CacheReader: cache,
ClientReader: c,
},
Writer: c,
StatusClient: c,
}, nil
}
讀操作使用上面創建的 Cache,寫操作使用 K8s go-client 直連。
Controller 初始化
下面看看 Controller 的啟動:
func (r *EDASApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
err := ctrl.NewControllerManagedBy(mgr).
For(&appsv1alpha1.EDASApplication{}).
Complete(r)
return err
}
使用的是 Builder 模式,NewControllerManagerBy 和 For 方法都是給 Builder 傳參,最重要的是最后一個方法 Complete,其邏輯是:
func (blder *Builder) Build(r reconcile.Reconciler) (manager.Manager, error) {
...
// Set the Manager
if err := blder.doManager(); err != nil {
return nil, err
}
// Set the ControllerManagedBy
if err := blder.doController(r); err != nil {
return nil, err
}
// Set the Watch
if err := blder.doWatch(); err != nil {
return nil, err
}
...
return blder.mgr, nil
}
主要是看看 doController 和 doWatch 方法:
doController 方法
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
if options.Reconciler == nil {
return nil, fmt.Errorf("must specify Reconciler")
}
if len(name) == 0 {
return nil, fmt.Errorf("must specify Name for Controller")
}
if options.MaxConcurrentReconciles <= 0 {
options.MaxConcurrentReconciles = 1
}
// Inject dependencies into Reconciler
if err := mgr.SetFields(options.Reconciler); err != nil {
return nil, err
}
// Create controller with dependencies set
c := &controller.Controller{
Do: options.Reconciler,
Cache: mgr.GetCache(),
Config: mgr.GetConfig(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor(name),
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
Name: name,
}
// Add the controller as a Manager components
return c, mgr.Add(c)
}
該方法初始化了一個 Controller,傳入了一些很重要的參數:
- Do:Reconcile 邏輯;
- Cache:找 Informer 注冊 Watch;
- Client:對 K8s 資源進行 CRUD;
- Queue:Watch 資源的 CUD 事件緩存;
- Recorder:事件收集。
doWatch 方法
func (blder *Builder) doWatch() error {
// Reconcile type
src := &source.Kind{Type: blder.apiType}
hdler := &handler.EnqueueRequestForObject{}
err := blder.ctrl.Watch(src, hdler, blder.predicates...)
if err != nil {
return err
}
// Watches the managed types
for _, obj := range blder.managedObjects {
src := &source.Kind{Type: obj}
hdler := &handler.EnqueueRequestForOwner{
OwnerType: blder.apiType,
IsController: true,
}
if err := blder.ctrl.Watch(src, hdler, blder.predicates...); err != nil {
return err
}
}
// Do the watch requests
for _, w := range blder.watchRequest {
if err := blder.ctrl.Watch(w.src, w.eventhandler, blder.predicates...); err != nil {
return err
}
}
return nil
}
可以看到該方法對本 Controller 負責的 CRD 進行了 watch,同時底下還會 watch 本 CRD 管理的其他資源,這個 managedObjects 可以通過 Controller 初始化 Buidler 的 Owns 方法傳入,說到 Watch 我們關心兩個邏輯:
- 注冊的 handler
type EnqueueRequestForObject struct{}
// Create implements EventHandler
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
...
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Meta.GetName(),
Namespace: evt.Meta.GetNamespace(),
}})
}
// Update implements EventHandler
func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
if evt.MetaOld != nil {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.MetaOld.GetName(),
Namespace: evt.MetaOld.GetNamespace(),
}})
} else {
enqueueLog.Error(nil, "UpdateEvent received with no old metadata", "event", evt)
}
if evt.MetaNew != nil {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.MetaNew.GetName(),
Namespace: evt.MetaNew.GetNamespace(),
}})
} else {
enqueueLog.Error(nil, "UpdateEvent received with no new metadata", "event", evt)
}
}
// Delete implements EventHandler
func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
...
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Meta.GetName(),
Namespace: evt.Meta.GetNamespace(),
}})
}
可以看到 Kubebuidler 為我們注冊的 Handler 就是將發生變更的對象的 NamespacedName 入隊列,如果在 Reconcile 邏輯中需要判斷創建/更新/刪除,需要有自己的判斷邏輯。
- 注冊的流程
// Watch implements controller.Controller
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
...
log.Info("Starting EventSource", "controller", c.Name, "source", src)
return src.Start(evthdler, c.Queue, prct...)
}
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (is *Informer) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
...
is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
return nil
}
我們的 Handler 實際注冊到 Informer 上面,這樣整個邏輯就串起來了,通過 Cache 我們創建了所有 Scheme 里面 GVKs 的 Informers,然后對應 GVK 的 Controller 注冊了 Watch Handler 到對應的 Informer,這樣一來對應的 GVK 里面的資源有變更都會觸發 Handler,將變更事件寫到 Controller 的事件隊列中,之后觸發我們的 Reconcile 方法。
Manager 啟動
func (cm *controllerManager) Start(stop <-chan struct{}) error {
...
go cm.startNonLeaderElectionRunnables()
...
}
func (cm *controllerManager) startNonLeaderElectionRunnables() {
...
// Start the Cache. Allow the function to start the cache to be mocked out for testing
if cm.startCache == nil {
cm.startCache = cm.cache.Start
}
go func() {
if err := cm.startCache(cm.internalStop); err != nil {
cm.errChan <- err
}
}()
...
// Start Controllers
for _, c := range cm.nonLeaderElectionRunnables {
ctrl := c
go func() {
cm.errChan <- ctrl.Start(cm.internalStop)
}()
}
cm.started = true
}
主要就是啟動 Cache,Controller,將整個事件流運轉起來,我們下面來看看啟動邏輯。
Cache 啟動
func (ip *specificInformersMap) Start(stop <-chan struct{}) {
func() {
...
// Start each informer
for _, informer := range ip.informersByGVK {
go informer.Informer.Run(stop)
}
}()
}
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
...
// informer push resource obj CUD delta to this fifo queue
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
// handler to process delta
Process: s.HandleDeltas,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// this is internal controller process delta generate by reflector
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
...
wg.StartWithChannel(processorStopCh, s.processor.run)
s.controller.Run(stopCh)
}
func (c *controller) Run(stopCh <-chan struct{}) {
...
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
...
// reflector is delta producer
wg.StartWithChannel(stopCh, r.Run)
// internal controller's processLoop is comsume logic
wait.Until(c.processLoop, time.Second, stopCh)
}
Cache 的初始化核心是初始化所有的 Informer,Informer 的初始化核心是創建了 reflector 和內部 controller,reflector 負責監聽 Api Server 上指定的 GVK,將變更寫入 delta 隊列中,可以理解為變更事件的生產者,內部 controller 是變更事件的消費者,他會負責更新本地 indexer,以及計算出 CUD 事件推給我們之前注冊的 Watch Handler。
Controller 啟動
// Start implements controller.Controller
func (c *Controller) Start(stop <-chan struct{}) error {
...
for i := 0; i < c.MaxConcurrentReconciles; i++ {
// Process work items
go wait.Until(func() {
for c.processNextWorkItem() {
}
}, c.JitterPeriod, stop)
}
...
}
func (c *Controller) processNextWorkItem() bool {
...
obj, shutdown := c.Queue.Get()
...
var req reconcile.Request
var ok bool
if req, ok = obj.(reconcile.Request);
...
// RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the
// resource to be synced.
if result, err := c.Do.Reconcile(req); err != nil {
c.Queue.AddRateLimited(req)
...
}
...
}
Controller 的初始化是啟動 goroutine 不斷地查詢隊列,如果有變更消息則觸發到我們自定義的 Reconcile 邏輯。
整體邏輯串連
上面我們通過源碼閱讀已經十分清楚整個流程,但是正所謂一圖勝千言,我制作了一張整體邏輯串連圖(圖 3)來幫助大家理解:
圖 3-Kubebuidler 整體邏輯串連圖
Kubebuilder 作為腳手架工具已經為我們做了很多,到最后我們只需要實現 Reconcile 方法即可,這里不再贅述。
守得雲開見月明
剛開始使用 Kubebuilder 的時候,因為封裝程度很高,很多事情都是懵逼狀態,剖析完之后很多問題就很明白了,比如開頭提出的幾個:
- 如何同步自定義資源以及 K8s build-in 資源?
需要將自定義資源和想要 Watch 的 K8s build-in 資源的 GVKs 注冊到 Scheme 上,Cache 會自動幫我們同步。
- Controller 的 Reconcile 方法是如何被觸發的?
通過 Cache 里面的 Informer 獲取資源的變更事件,然后通過兩個內置的 Controller 以生產者消費者模式傳遞事件,最終觸發 Reconcile 方法。
- Cache 的工作原理是什么?
GVK -> Informer 的映射,Informer 包含 Reflector 和 Indexer 來做事件監聽和本地緩存。
還有很多問題我就不一一說了,總之,現在 Kubebuilder 現在不再是黑盒。
同類工具對比
Operator Framework 與 Kubebuilder 很類似,這里因為篇幅關系不再展開。
最佳實踐
模式
- 使用 OwnerRefrence 來做資源關聯,有兩個特性:
- Owner 資源被刪除,被 Own 的資源會被級聯刪除,這利用了 K8s 的 GC;
- 被 Own 的資源對象的事件變更可以觸發 Owner 對象的 Reconcile 方法;
- 使用 Finalizer 來做資源的清理。
注意點
- 不使用 Finalizer 時,資源被刪除無法獲取任何信息;
- 對象的 Status 字段變化也會觸發 Reconcile 方法;
- Reconcile 邏輯需要冪等;
優化
總結
通過深入分析,我們可以看到 Kubebuilder 提供的功能對於快速編寫 CRD 和 Controller 是十分有幫助的,無論是 Istio、Knative 等知名項目還是各種自定義 Operators,都大量使用了 CRD,將各種組件抽象為 CRD,Kubernetes 變成控制面板將成為一個趨勢,希望本文能夠幫助大家理解和把握這個趨勢。
“ 阿里巴巴雲原生微信公眾號(ID:Alicloudnative)關注微服務、Serverless、容器、Service Mesh等技術領域、聚焦雲原生流行技術趨勢、雲原生大規模的落地實踐,做最懂雲原生開發者的技術公眾號。”