在Kubernetes中,一個API對象在Etcd里的完整資源路徑,是由:Group(API組)、Version(API版本)和Resource(API資源類型)三個部分組成的。
通過這樣的結構,整個Kubernetes里的所有API對象,可以用如下的樹形結構表示出來
如果現在要聲明一個CronJob對象,那么YAML的開始部分會這么寫
apiVersion: batch/v2alpha1
kind: CronJob
...
CronJob就是這個API對象的資源類型,Batch就是它們的組,v2alpha1就是它的版本
Kubernetes是如何對Resource、Group和Version進行解析,從而找到Kubernetes里找到CronJob對象的定義呢?
1、Kubernetes會匹配API對象的組
對於Kubernetes里的核心API對象(如Pod,Node)是不需要Group的,Kubernetes會直接在/api這個層級進行下一步的匹配過程
對於非核心對象,Kubernetes必須在/apis這個層級里查找它對應的Group,進而根據batch這個Group名字,找到/apis/batch。
API Group的分類是以對象功能為依據的
2、Kubernetes會進一步匹配到API對象的版本號
在Kubernetes中,同一種API對象可以有多個版本,對於會影響到用戶的變更就可以通過升級新版本來處理,從而保證了向后兼容。
3、Kubernetes會匹配API對象的資源類型
APIServer創建對象
在前面匹配到正確的版本之后,Kubernetes就知道要創建的是一個/apis/batch/v2alpha1下的CronJob對象,APIServer會繼續創建這個Cronjob對象。創建過程如下圖
- 當發起創建CronJob的POST請求之后,YAML的信息就被提交給了APIServer,APIServer的第一個功能就是過濾這個請求,並完成一些前置性的工作,比如授權、超時處理、審計等
- 請求進入MUX和Routes流程,MUX和Routes是APIServer完成URL和Handler綁定的場所。APIServer的Handler要做的事情,就是按照上面介紹的匹配過程,找到對應的CronJob類型定義。
- 根據這個CronJob類型定義,使用用戶提交的YAML文件里的字段,創建一個CronJob對象。這個過程中,APIServer會把用戶提交的YAML文件,轉換成一個叫做Super Version的對象,它正是該API資源類型所有版本的字段全集,這樣用戶提交的不同版本的YAML文件,就都可以用這個SuperVersion對象來進行處理了。
- APIServer會先后進行Admission(如Admission Controller 和 Initializer)和Validation操作(負責驗證這個對象里的各個字段是否何方,被驗證過得API對象都保存在APIServer里一個叫做Registry的數據結構中)。
- APIServer會把驗證過得API對象轉換成用戶最初提交的版本,進行系列化操作,並調用Etcd的API把它保存起來。
API插件CRD(Custom Resource Definition)
CRD允許用戶在Kubernetes中添加一個跟Pod、Node類似的、新的API資源類型,即:自定義API資源
舉個栗子,添加一個叫Network的API資源類型,它的作用是一旦用戶創建一個Network對象,那么Kubernetes就可以使用這個對象定義的網絡參數,調用真實的網絡插件,為用戶創建一個真正的網絡
Network對象YAML文件,名叫example-network.yaml,API資源類型是Network,API組是samplecrd.k8s.io,版本是v1
apiVersion: samplecrd.k8s.io/v1 kind: Network metadata: name: example-network spec: cidr: "192.168.0.0/16" gateway: "192.168.0.1"
上面這個YAML文件,就是一個具體的自定義API資源實例,也叫CR(Custom Resource),為了讓Kubernetes認識這個CR,就需要讓Kubernetes明白這個CR的宏觀定義CRD。接下來編寫一個CRD的YAML文件,名字是network.yaml
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: networks.samplecrd.k8s.io
spec:
group: samplecrd.k8s.io
version: v1
names:
kind: Network
plural: networks
scope: Namespaced
在這個CRD中,指定了“group:samplecrd.k8s.io“ ”version:v1”的API信息,也指定了這個CR的資源類型叫做Network, 復數(plural)是networks。scope是Namespaced,定義的這個Network是屬於一個Namespace的對象,類似於Pod
下面是一些代碼部分的操作
首先,在GOPATH下創建一個結構如下的項目
$ tree $GOPATH/src/github.com/<your-name>/k8s-controller-custom-resource . ├── controller.go ├── crd │ └── network.yaml ├── example │ └── example-network.yaml ├── main.go └── pkg └── apis └── samplecrd ├── register.go └── v1 ├── doc.go ├── register.go └── types.go
其中,pkg/apis/samplecrd就是API組的名字,v1是版本,v1下面的types.go文件里,有Network對象的完整描述
然后在pkg/api/samplecrd目錄下創建了一個register.go文件,用來防止后面要用到的全局變量
package samplecrd const ( GroupName = "samplecrd.k8s.io" Version = "v1" )
接着在pkg/api/samplecrd目錄下添加一個doc.go文件(Golang的文檔源文件)
// +k8s:deepcopy-gen=package // +groupName=samplecrd.k8s.io package v1
+ <tag_name>[=value]格式的注釋,是Kubernetes進行代碼生成要用的Annotation風格的注釋
+k8s:deepcopy-gen=package的意思是請為整個v1包里的所有類型定義自動生成Deepcopy方法
+groupName=samplecrd.k8s.io 定義了這個包對應的API組的名稱
這些定義在doc.go文件的注釋,起到的是全局的代碼生成控制的作用,所以也被稱為Global Tags
接下來添加types.go文件,它定義一個Network類型到底有哪些字段(比如,spec字段里的內容)
package v1 ... // +genclient // +genclient:noStatus // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // Network describes a Network resource type Network struct { // TypeMeta is the metadata for the resource, like kind and apiversion metav1.TypeMeta `json:",inline"` // ObjectMeta contains the metadata for the particular object, including // things like... // - name // - namespace // - self link // - labels // - ... etc ... metav1.ObjectMeta `json:"metadata,omitempty"` Spec networkspec `json:"spec"` } // networkspec is the spec for a Network resource type networkspec struct { Cidr string `json:"cidr"` Gateway string `json:"gateway"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // NetworkList is a list of Network resources type NetworkList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata"` Items []Network `json:"items"` }
+genclient 指請為下面這個API資源類型生成Client代碼
+genclient:noStatus 指在這個API資源類型定義里,沒有Status字段,否則生成的Client就會自動帶上UpdateStatus
Network類型定義方法和標准的Kubernetes對象一樣,都包括的TypeMeta(API 元數據)和OgjectMeta(對象元數據)字段
而其中的Spec字段是需要自己定義的部分,在networkspec里,定義個Cidr和Gateway兩個字段,其中,每個字段最后面的部分,比如 json:"cidr",指的就是這個字段被轉換成JOSN格式之后的名字,也就是YAML文件里字段的名稱
除了定義一個Network類型,還需要定義一個Network類型用來描述一組Network對象應該包括哪些字段。因為在Kubernetes中,獲取所以X對象的List()方法,返回值都是List類型,而不是X類型的數組。+genclient只需要寫在Network上,而不用寫在NetworkList上,因為NetworkList只是一個返回值類型,Network才是主類型。
+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object 指請在生成DeepCopy時,實現Kubernetes提供的runtime.Object接口
最后還需要編寫一個pkg/apis/samplecrd/v1/register.go文件
registry的作用就是注冊一個類型給APIServer。其中Network資源類型待在服務器端的注冊工作,APIServer會自動完成。但要讓客戶端知道這個Network資源類型的定義,就需要我們在項目里添加一個register.go文件。
package v1 ... // addKnownTypes adds our types to the API scheme by registering // Network and NetworkList func addKnownTypes(scheme *runtime.Scheme) error { scheme.AddKnownTypes( SchemeGroupVersion, &Network{}, &NetworkList{}, ) // register the type in the scheme metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil }
接着使用Kubernetes提供的代碼生成工具(k8s.io/code-generator),為上面定義的Network資源類型自動生成clientset,、informer和lister。其中client就是操作Network對象所需要使用的客戶端
# 代碼生成的工作目錄,也就是我們的項目路徑 $ ROOT_PACKAGE="github.com/resouer/k8s-controller-custom-resource" # API Group $ CUSTOM_RESOURCE_NAME="samplecrd" # API Version $ CUSTOM_RESOURCE_VERSION="v1" # 安裝 k8s.io/code-generator $ go get -u k8s.io/code-generator/... $ cd $GOPATH/src/k8s.io/code-generator # 執行代碼自動生成,其中 pkg/client 是生成目標目錄,pkg/apis 是類型定義目錄 $ ./generate-groups.sh all "$ROOT_PACKAGE/pkg/client" "$ROOT_PACKAGE/pkg/apis" "$CUSTOM_RESOURCE_NAME:$CUSTOM_RESOURCE_VERSION" $ tree . ├── controller.go ├── crd │ └── network.yaml ├── example │ └── example-network.yaml ├── main.go └── pkg ├── apis │ └── samplecrd │ ├── constants.go │ └── v1 │ ├── doc.go │ ├── register.go │ ├── types.go │ └── zz_generated.deepcopy.go └── client ├── clientset ├── informers └── listers
其中pkg/apis/samplecre/v1下面的zz_generated.deepcoy.go文件,就是自動是哪個從的DeepCopy代碼文件。整個Client目錄都是Kubernetes為Network類型生成的客戶端庫 。
下面是在Kubernetes集群里創建一個API對象
首先使用network.yaml文件,在Kubernetes中創建Network對象的CRD(Custom Resource Definition):
$ kubectl apply -f crd/network.yaml customresourcedefinition.apiextensions.k8s.io/networks.samplecrd.k8s.io created $ kubectl get crd NAME CREATED AT networks.samplecrd.k8s.io 2018-09-15T10:57:12Z
然后可以創建一個Network對象
$ kubectl apply -f example/example-network.yaml network.samplecrd.k8s.io/example-network created $ kubectl get network NAME AGE example-network 8s $ kubectl describe network example-network Name: example-network Namespace: default Labels: <none> ...API Version: samplecrd.k8s.io/v1 Kind: Network Metadata: ... Generation: 1 Resource Version: 468239 ... Spec: Cidr: 192.168.0.0/16 Gateway: 192.168.0.1
自定義控制器
上面舉了一個在Kubernetes里添加API資源的過程,下面將為Network這個自定義API對象編寫一個自定義控制器(Custom Controller)
基於聲明式API業務功能的實現,往往需要通過控制器模式來監視API對象的變化(如,創建或刪除Network),然后以此來決定實際要執行的具體工作。
自定義控制器工作原理
控制器要做的第一件事是從Kubernetes的APIServer里獲取它所關心的對象,即這里定義的Network對象
這個操作依靠的Informer的代碼塊完成的,Informer與API對象是一一對應的,因此傳遞給自定義控制器的是一個Network對象的Informer
在創建Informer工廠時,需要給它傳遞一個networkClient,Network Informer使用這個networkClient跟APIServer建立了連接。負責維護這個連接的是Informer所使用的Reflector包中的ListAndWatch方法,它用來獲取並監聽這些Network對象實例的變化。
在ListAndWatch機制下,一旦APIServer端有新的Network實例被創建、刪除或者更新,Reflector都會收到事件通知,這時,該事件及它對應的API對象這個組合,就被稱為增量Delta,它會被放假一個Delta FIFO Queue中。另一方面,Inform會不斷地從這個Delta FIFO Queue讀取增量,每拿到一個增量,Informer就會判斷這個增量里的事件類型,然后創建或更新本地對象的緩沖。
同步本地緩存是Informer的第一個職責,也是它最重要的職責。它的第二個職責是根據這些事件的類型,觸發實現注冊好的ResourceEventHandler。這些Handler需要在創建控制器的時候注冊給它對應的Informer。
編寫main函數
定義並初始化一個自定義控制器,然后期待它
func main() { ... //根據Master配置(APIServer的地址端口和kubeconfig的路徑)創建一個Kubernetes的client(kubeClient)和Network對象的client(networkClient) cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) ... kubeClient, err := kubernetes.NewForConfig(cfg) ... networkClient, err := clientset.NewForConfig(cfg) ... //為Network對象創建一個叫做InformerFactory的工廠,並使用它生產一個Network對象的Informer,傳遞給控制器 networkInformerFactory := informers.NewSharedInformerFactory(networkClient, ...) controller := NewController(kubeClient, networkClient, networkInformerFactory.Samplecrd().V1().Networks())
//啟動上述Informer go networkInformerFactory.Start(stopCh)
//執行controller.run,啟動自定義控制器 if err = controller.Run(2, stopCh); err != nil { glog.Fatalf("Error running controller: %s", err.Error()) } }
編寫控制器
func NewController( kubeclientset kubernetes.Interface, networkclientset clientset.Interface, networkInformer informers.NetworkInformer) *Controller { ... controller := &Controller{ kubeclientset: kubeclientset, networkclientset: networkclientset, networksLister: networkInformer.Lister(), networksSynced: networkInformer.Informer().HasSynced, workqueue: workqueue.NewNamedRateLimitingQueue(..., "Networks"), ... } networkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueNetwork, UpdateFunc: func(old, new interface{}) { oldNetwork := old.(*samplecrdv1.Network) newNetwork := new.(*samplecrdv1.Network) if oldNetwork.ResourceVersion == newNetwork.ResourceVersion { return } controller.enqueueNetwork(new) }, DeleteFunc: controller.enqueueNetworkForDelete, return controller }
在前面main函數里創建了兩個client(kubeclientset 和 networkclientset),然后在這段代碼里,使用這個client 和前面創建的 Informer,初始化了自定義控制器
並且還設置了一個工作隊列(work queue ),它的作用是復制同步Informer和控制循環之間的數據。
然后,為networkInformer注冊了三個Handler(AddFunc、 UpdateFunc 和 DeleteFunc),分別對應API對象的添加、更新、刪除操作。
具體的處理操作,是將該事件對應的API對象加入到工作隊列中,實際入隊的是API對象的Key,即該API對象的<namespace>/<name>, 控制循環會不斷從這個工作隊列里拿到這些key,然后開始執行真正的控制邏輯。
因而,所謂的Informer,其實就是一個帶有本地緩存和索引機制的、可以注冊EventHandler的client。它是自定義控制跟API Server進行數據同步的重要組件。Informer通過ListAndWatch方法,把APIServer中的API對象緩存在本地,並負責更新和維護這個緩存。其中ListAndWatch方法的含義是:首先通過APIServer的LIST API獲取所有最新版本的API對象,然后再通過WATCH API監聽所有這些API對象的變化,通過監聽到的事件變化,Informer就可以實時地更新本地緩存,並且調用這些事件對應的EventHandler。在這個過程中,每經過resyncPeriod指定的時間,Informer維護的本地緩存,都會使用最近的LIST返回的結果強制更新一次,從而保證緩存的有效性。在Kubernetes中,這個緩存強制更新的操作叫做:resync
編寫控制循環
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { ... if ok := cache.WaitForCacheSync(stopCh, c.networksSynced); !ok {
//等待Informer完成一次本地緩存數據的同步操作 return fmt.Errorf("failed to wait for caches to sync") } ... for i := 0; i < threadiness; i++ {
// 通過gorouting並發啟動多個無限循環的任務 go wait.Until(c.runWorker, time.Second, stopCh) } ... return nil }
編寫業務邏輯
func (c *Controller) runWorker() { for c.processNextWorkItem() { } } func (c *Controller) processNextWorkItem() bool { obj, shutdown := c.workqueue.Get() ... err := func(obj interface{}) error { ... if err := c.syncHandler(key); err != nil { return fmt.Errorf("error syncing '%s': %s", key, err.Error()) } c.workqueue.Forget(obj) ... return nil }(obj) ... return true } func (c *Controller) syncHandler(key string) error { namespace, name, err := cache.SplitMetaNamespaceKey(key) ... network, err := c.networksLister.Networks(namespace).Get(name) if err != nil { if errors.IsNotFound(err) { glog.Warningf("Network does not exist in local cache: %s/%s, will delete it from Neutron ...", namespace, name) glog.Warningf("Network: %s/%s does not exist in local cache, will delete it from Neutron ...", namespace, name) // FIX ME: call Neutron API to delete this network by name. // // neutron.Delete(namespace, name) return nil } ... return err } glog.Infof("[Neutron] Try to process network: %#v ...", network) // FIX ME: Do diff(). // // actualNetwork, exists := neutron.Get(namespace, name) // // if !exists { // neutron.Create(namespace, name) // } else if !reflect.DeepEqual(actualNetwork, network) { // neutron.Update(namespace, name) // } return nil }
編譯部署
# Clone repo $ git clone https://github.com/resouer/k8s-controller-custom-resource$ cd k8s-controller-custom-resource ### Skip this part if you don't want to build # Install dependency $ go get github.com/tools/godep $ godep restore # Build $ go build -o samplecrd-controller . $ ./samplecrd-controller -kubeconfig=$HOME/.kube/config -alsologtostderr=true I0915 12:50:29.051349 27159 controller.go:84] Setting up event handlers I0915 12:50:29.051615 27159 controller.go:113] Starting Network control loop I0915 12:50:29.051630 27159 controller.go:116] Waiting for informer caches to sync E0915 12:50:29.066745 27159 reflector.go:134] github.com/resouer/k8s-controller-custom-resource/pkg/client/informers/externalversions/factory.go:117: Failed to list *v1.Network: the server could not find the requested resource (get networks.samplecrd.k8s.io) ... $ kubectl apply -f crd/network.yaml ... I0915 12:50:29.051630 27159 controller.go:116] Waiting for informer caches to sync ... I0915 12:52:54.346854 25245 controller.go:121] Starting workers I0915 12:52:54.346914 25245 controller.go:127] Started workers $ cat example/example-network.yaml apiVersion: samplecrd.k8s.io/v1 kind: Network metadata: name: example-network spec: cidr: "192.168.0.0/16" gateway: "192.168.0.1" $ kubectl apply -f example/example-network.yaml network.samplecrd.k8s.io/example-network created ... I0915 12:50:29.051349 27159 controller.go:84] Setting up event handlers I0915 12:50:29.051615 27159 controller.go:113] Starting Network control loop I0915 12:50:29.051630 27159 controller.go:116] Waiting for informer caches to sync ... I0915 12:52:54.346854 25245 controller.go:121] Starting workers I0915 12:52:54.346914 25245 controller.go:127] Started workers I0915 12:53:18.064409 25245 controller.go:229] [Neutron] Try to process network: &v1.Network{TypeMeta:v1.TypeMeta{Kind:"", APIVersion:""}, ObjectMeta:v1.ObjectMeta{Name:"example-network", GenerateName:"", Namespace:"default", ... ResourceVersion:"479015", ... Spec:v1.NetworkSpec{Cidr:"192.168.0.0/16", Gateway:"192.168.0.1"}} ... I0915 12:53:18.064650 25245 controller.go:183] Successfully synced 'default/example-network' ...