更好的閱讀體驗建議點擊下方原文鏈接。
原文地址:http://maoqide.live/post/cloud/sample-controller/
自己構建 sample-controller.
https://github.com/maoqide/sample-controller
https://github.com/kubernetes/sample-controller
編寫 CRD 定義
sample-controller
├── hack
│ ├── boilerplate.go.txt
│ ├── custom-boilerplate.go.txt
│ ├── update-codegen.sh
│ └── verify-codegen.sh
└── pkg
└── apis
└── samplecontroller
├── register.go
└── v1alpha1
├── doc.go
├── register.go
└── types.go
首先,項目初始如上結構:
hack
目錄下的腳本可以復用,主要是調用了 https://github.com/kubernetes/code-generator 項目中的 generate-groups.sh
腳本,code-gengrator 項目 cmd
目錄下的代碼,需要提前go install
生成對應二進制文件。
pkg
目錄下的文件,需要自己手動編寫,pkg/apis/samplecontroller
是 CRD 所屬的 apiGroup
,v1alpha1
是 apiVersion
,v1alpha1目錄下的types.go
文件,包含了 CRD 類型 Foo
的完整定義。
pkg/apis/samplecontroller/register.go
中,定義了后面所需的全局變量。
pkg/apis/samplecontroller/v1alpha1/doc.go
中,包含了 +<tag-name>[=value]
格式的注釋,這就是 Kubernetes 進行源碼生成用的 Annotation 風格的注釋,doc.go 中的注釋,起到的是全局范圍的作用,包下面的每個 go 文件,同樣可以定義自己的 Annotation 注釋。(關於代碼生成,可以看這篇文章)
pkg/apis/samplecontroller/v1alpha1/types.go
,包含了Foo
類型的完整定義。Foo
是Kubernetes對象的標准定義;FooSpec
是我們需要定義的Foo
類型的具體結構;FooList
包含一組 Foo
對象,apiserver 的 List 接口,返回的是 List 對象類型;FooStatus
描述Foo
類型實例狀態的結構體,可以使用+genclient:noStatus
注釋,則不需要定義FooStatus
。
pkg/apis/samplecontroller/v1alpha1/register.go
,主要作用是通過addKnownTypes()
方法,將我們定義的 CRD 類型 Foo
添加到 Scheme。
代碼生成
pkg
下的上述文件完成,即可執行./hack/update-codegen.sh
,即可生成管理新定義的 CRD 類型所需的 Kubernetes 代碼:
sample-controller
├── hack
│ ├── boilerplate.go.txt
│ ├── custom-boilerplate.go.txt
│ ├── update-codegen.sh
│ └── verify-codegen.sh
└── pkg
├── apis
│ └── samplecontroller
│ ├── register.go
│ └── v1alpha1
│ ├── doc.go
│ ├── register.go
│ ├── types.go
│ └── zz_generated.deepcopy.go
└── generated
├── clientset
│ └── versioned
│ ├── clientset.go
│ ├── doc.go
│ ├── fake
│ │ ├── clientset_generated.go
│ │ ├── doc.go
│ │ └── register.go
│ ├── scheme
│ │ ├── doc.go
│ │ └── register.go
│ └── typed
│ └── samplecontroller
│ └── v1alpha1
│ ├── doc.go
│ ├── fake
│ │ ├── doc.go
│ │ ├── fake_foo.go
│ │ └── fake_samplecontroller_client.go
│ ├── foo.go
│ ├── generated_expansion.go
│ └── samplecontroller_client.go
├── informers
│ └── externalversions
│ ├── factory.go
│ ├── generic.go
│ ├── internalinterfaces
│ │ └── factory_interfaces.go
│ └── samplecontroller
│ ├── interface.go
│ └── v1alpha1
│ ├── foo.go
│ └── interface.go
└── listers
└── samplecontroller
└── v1alpha1
├── expansion_generated.go
└── foo.go
自動生成了 clientset
,informers
,listers
三個文件夾下的文件和apis
下的zz_generated.deepcopy.go
文件。
其中zz_generated.deepcopy.go
中包含 pkg/apis/samplecontroller/v1alpha1/types.go
中定義的結構體的 DeepCopy()
方法。
另外三個文件夾clientset
,informers
,listers
下都是 Kubernetes 生成的客戶端庫,在 controller 中會用到。
controller 代碼編寫
接下來就是編寫具體 controller 的代碼,通過上述步驟生成的客戶端庫訪問 apiserver,監聽 CRD 資源的變化,並觸發對應的動作,如創建或刪除 Deployment
等。
編寫自定義controller(Operator)時,可以使用 Kubernetes 提供的 client-go
客戶端庫。下圖是 Kubernetes 提供的在使用client-go
開發 controller 的過程中,client-go
和 controller 的交互流程:
client-go 組件
-
Reflector: 定義在 cache 包的 Reflector 類中,它監聽特定資源類型(Kind)的 Kubernetes API,在
ListAndWatch
方法中執行。監聽的對象可以是 Kubernetes 的內置資源類型或者是自定義資源類型。當 reflector 通過 watch API 發現新的資源實例被創建,它將通過對應的 list API 獲取到新創建的對象並在watchHandler
方法中將其加入到Delta Fifo
隊列中。 -
Informer: 定義在 cache 包的 base controller 中,它從
Delta Fifo
隊列中 pop 出對象,在processLoop
方法中執行。base controller 的工作是將對象保存一遍后續獲取,並調用 controller 將對象傳給 controller。 -
Indexer: 提供對象的 indexing 方法,定義在 cache 包的 Indexer中。一個典型的 indexing 的應用場景是基於對象的 label 創建索引。Indexer 基於幾個 indexing 方法維護索引,它使用線程安全的 data store 來存儲對象和他們的key。在 cache 包的 Store 類中定義了一個名為
MetaNamespaceKeyFunc
的默認方法,可以為對象生成一個<namespace>/<name>
形式的key。
自定義 controller 組件
- Informer reference: 它是對 Informer 實例的引用,知道如何使用自定義資源對象。你編寫的自定義 controller 需要創建正確的 Informer。
- Indexer reference: 它是對 Indexer 實例的引用,你編寫的自定義 controller 代碼中需要創建它,在獲取對象供后續使用時你會用到這個引用。
client-go 中的 base controller 提供了NewIndexerInformer
來創建 Informer 和 Indexer。在你的代碼中,你可以直接使用 此方法,或者使用 工廠方法 創建 informer。
- Resource Event Handlers: 一些回調方法,當 Informer 想要發送一個對象給 controller 時,會調用這些方法。典型的編寫回調方法的模式,是獲取資源對象的 key 並放入一個
work queue
隊列,等待進一步的處理(Proceess item)。 - Work queue: 在 controller 代碼中創建的隊列,用來解耦對象的傳遞和對應的處理。Resource Event Handlers 的方法就是用來接收對象並將其加入
work queue
。 - Process Item: 在 controller 代碼中創建的方法,用來對
work queue
中的對象做對應處理,可以有一個或多個其他的方法實際做處理,這些方法一般會使用Indexer reference
,或者 list 方法來獲取 key 對應的對象。
編寫自定義 controller
以 sample-controller 為例,整體流程如下:
/*
*** main.go
*/
// 創建 clientset
kubeClient, err := kubernetes.NewForConfig(cfg) // k8s clientset, "k8s.io/client-go/kubernetes"
exampleClient, err := clientset.NewForConfig(cfg) // sample clientset, "k8s.io/sample-controller/pkg/generated/clientset/versioned"
// 創建 Informer
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) // k8s informer, "k8s.io/client-go/informers"
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30) // sample informer, "k8s.io/sample-controller/pkg/generated/informers/externalversions"
// 創建 controller,傳入 clientset 和 informer
controller := NewController(kubeClient, exampleClient,
kubeInformerFactory.Apps().V1().Deployments(),
exampleInformerFactory.Samplecontroller().V1alpha1().Foos())
// 運行 Informer,Start 方法為非阻塞,會運行在單獨的 goroutine 中
kubeInformerFactory.Start(stopCh)
exampleInformerFactory.Start(stopCh)
// 運行 controller
controller.Run(2, stopCh)
/*
*** controller.go
*/
NewController() *Controller {}
// 將 CRD 資源類型定義加入到 Kubernetes 的 Scheme 中,以便 Events 可以記錄 CRD 的事件
utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))
// 創建 Broadcaster
eventBroadcaster := record.NewBroadcaster()
// ... ...
// 監聽 CRD 類型'Foo'並注冊 ResourceEventHandler 方法,當'Foo'的實例變化時進行處理
fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueFoo,
UpdateFunc: func(old, new interface{}) {
controller.enqueueFoo(new)
},
})
// 監聽 Deployment 變化並注冊 ResourceEventHandler 方法,
// 當它的 ownerReferences 為 Foo 類型實例時,將該 Foo 資源加入 work queue
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newDepl := new.(*appsv1.Deployment)
oldDepl := old.(*appsv1.Deployment)
if newDepl.ResourceVersion == oldDepl.ResourceVersion {
return
}
controller.handleObject(new)
},
DeleteFunc: controller.handleObject,
})
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {}
// 在啟動 worker 前等待緩存同步
if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
// 運行兩個 worker 來處理資源
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
// 無限循環,不斷的調用 processNextWorkItem 處理下一個對象
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}
// 從workqueue中獲取下一個對象並進行處理,通過調用 syncHandler
func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
// 調用 workqueue.Done(obj) 方法告訴 workqueue 當前項已經處理完畢,
// 如果我們不想讓當前項重新入隊,一定要調用 workqueue.Forget(obj)。
// 當我們沒有調用Forget時,當前項會重新入隊 workqueue 並在一段時間后重新被獲取。
defer c.workqueue.Done(obj)
var key string
var ok bool
// 我們期望的是 key 'namespace/name' 格式的 string
if key, ok = obj.(string); !ok {
// 無效的項調用Forget方法,避免重新入隊。
c.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
if err := c.syncHandler(key); err != nil {
// 放回workqueue避免偶發的異常
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// 如果沒有異常,Forget當前項,同步成功
c.workqueue.Forget(obj)
klog.Infof("Successfully synced '%s'", key)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}
// 對比真實的狀態和期望的狀態並嘗試合並,然后更新Foo類型實例的狀態信息
func (c *Controller) syncHandler(key string) error {
// 通過 workqueue 中的 key 解析出 namespace 和 name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
// 調用 lister 接口通過 namespace 和 name 獲取 Foo 實例
foo, err := c.foosLister.Foos(namespace).Get(name)
deploymentName := foo.Spec.DeploymentName
// 獲取 Foo 實例中定義的 deploymentname
deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
// 沒有發現對應的 deployment,新建一個
if errors.IsNotFound(err) {
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo))
}
// OwnerReferences 不是 Foo 實例,warning並返回錯誤
if !metav1.IsControlledBy(deployment, foo) {
msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
return fmt.Errorf(msg)
}
// deployment 中 的配置和 Foo 實例中 Spec 的配置不一致,即更新 deployment
if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo))
}
// 更新 Foo 實例狀態
err = c.updateFooStatus(foo, deployment)
c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
}
接下來編寫對應的 CRD 和 對應 CRD 實例的 yaml 文件及 operator 的 Dockerfile:
sample-controller
├── artifacts
│ └── examples
│ ├── crd.yaml
│ └── example-foo.yaml
├── controller.go
├── Dockerfile
├── hack
│ ├── boilerplate.go.txt
│ ├── custom-boilerplate.go.txt
│ ├── update-codegen.sh
│ └── verify-codegen.sh
├── main.go
└── pkg
├── apis
│ └── samplecontroller
│ ├── register.go
│ └── v1alpha1
│ ├── doc.go
│ ├── register.go
│ ├── types.go
│ └── zz_generated.deepcopy.go
├── generated
│ ├── clientset
│ │ └── ...
│ ├── informers
│ │ └── ...
│ └── listers
│ └── ...
└── signals
└── signal.go
部署到 k8s
controller 鏡像 Dockerfile:
FROM golang
RUN mkdir -p /go/src/k8s.io/sample-controller
ADD . /go/src/k8s.io/sample-controller
WORKDIR /go
RUN go get -v ./...
RUN go install -v ./...
CMD ["/go/bin/sample-controller"]
controller RBAC 及 Deployment yaml:
apiVersion: apps/v1beta1
kind: Deployment
metadata:
name: sample-controller
spec:
replicas: 1
template:
metadata:
labels:
app: sample
spec:
containers:
- name: sample
image: "maoqide/sample-controller"
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: operator-role
rules:
- apiGroups:
- ""
resources:
- events
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- apps
resources:
- deployments
- events
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- samplecontroller.k8s.io
resources:
- foos
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: operator-rolebinding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: operator-role
subjects:
- kind: ServiceAccount
name: default
namespace: default
將 operator 部署到 k8s 中並創建一個 CRD 對象,即可看到 operator 自動按照 CRD 對象 的配置創建出一個 nginx Deployment。