正文
Operator 是 CoreOS 推出的旨在簡化復雜有狀態應用管理,它是一個感知應用狀態的控制器,通過擴展 Kubernetes API 來自動創建、管理和配置應用實例。 Operator 基於 CRD 擴展資源對象,並通過控制器來保證應用處於預期狀態。
- 通過 Kubernetes API 觀察集群的當前狀態;
- 分析當前狀態與期望狀態的差別;
- 調用k8s API消除這些差別。
為什么使用crd
Kubernetes 目前已經成為了集群調度領域最炙手可熱的開源項目之一 。其內置的 controller一般可以滿足大多數使用場景,但對於很多定制化需求,其表達能力還是有限的。因此 Kubernetes 支持 Custom Resource Definition,也就是我們一直提到的 CRD。通過這一特性,用戶可以自己定義資源類型,Kubernetes 會將其視為資源的一種,對其提供像內置資源對象一樣的支持,這樣的實現更加原生。CRD可以大大提高 Kubernetes 的擴展能力 ,以更原生的方式實現定制化要求。
operator設計初衷
我們在管理應用時,會遇到無狀態和有狀態的應用。管理無狀態的應用是相對來說比較簡單的,但是有狀態的應用則比較復雜。Operator 的設計旨在簡化復雜有狀態應用管理,其通過CRD擴展 Kubernetes API 來自動創建、管理和配置應用實例。其本質上是針對特定的場景去做有狀態服務,或者說針對復雜應用場景去簡化其運維管理的工具。
Operator以deployment的形式部署到K8S中。部署完這個Operator之后,想要部署一個集群,其實很方便。因為不需要再去管理這個集群的配置信息了,只需要創建一個CRD,指定創建多少個節點,需要什么版本,Operator會監聽該資源對象,創建出符合配置要求的集群,從而大大簡化運維的難度和成本。
開發不同中間件operator流程大體相同,下面以redis operator進行說明:
首先准備
- 需要一個資源對象定義(CRD)yaml,operator代碼中會根據該yaml去組裝並創建CRD。
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: redisclusters.redis.middleware.hc.cn
spec:
group: redis.middleware.hc.cn
version: v1alpha1
scope: Namespaced
names:
kind: RedisCluster
singular: rediscluster
listKind: RedisClusterList
plural: redisclusters
shortNames:
- rec
后面創建的該CRD類型的資源對象(CR),其kind為該yaml描述中spec.names.kind的值。CR相當於CRD的具體實現。(不同的operator,CRD、CR定義不同);
- 准備一個CR yaml文件,后面operator代碼要根據該yaml結構在types.go中定義結構體。redis的CR yaml如下。operator最終會監聽該CR,解析里面定義的節點數、版本號等參數,驅動做一些事情。
apiVersion: redis.middleware.hc.cn/v1alpha1
kind: RedisCluster
metadata:
name: example000-redis-cluster
namespace: kube-system
spec:
# 代表redis集群的個數
replicas: 7
# 代表是否進入維修狀態
pause: true
# 是否刪除crd以及redis集群
finalizers: foreground
# 鏡像地址
repository: library/redis
# 鏡像版本,便於后續多版本特化支持
version: 3.2.8
#redis集群升級策略
updateStrategy:
# 升級類型為AutoReceive(自動分配,不用AssignStrategies), AssignReceive(指定值分配,需要用AssignStrategies)
type: AssignReceive
pipeline: "100"
assignStrategies:
-
slots: 2000
fromReplicas: nodeId1
-
# 從nodeId3,nodeId4一共分配1000個卡槽
slots: 1000
# 多個nodeId用逗號分隔
fromReplicas: nodeId3,nodeId4
# redis 實例配置詳情
pod:
# 標簽管理:map[string][string]
- labels:
key: value
# 備注管理:map[string][string]
annotations:
key: value
# 環境變量管理
env:
- name: tony
value: aa
- name: MAXMEMORY
value: 2gb
# 親和性管理
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: HC_Status
operator: In
values:
- C
podAntiAffinity: {}
# 資源管理
resources:
limits:
#cpu, memory, storage,ephemeral-storage
cpu: "2"
memory: 4Gi
requests:
cpu: "1"
memory: 2Gi
#statefulset更新模式
updateStrategy:
type: RollingUpdate
# 支持掛載形式: hostPath(不需要persistentVolumeClaimName),nfs(需要persistentVolumeClaimName)
volumes:
type: nfs
persistentVolumeClaimName: pvcName
# 配置文件模板名
configmap: name
# 監控鏡像
monitorImage: string
# 初始化鏡像
initImage: string
# 中間件容器鏡像
middlewareImage: string
status:
#當前statefulset replicas情況
replicas: 6
# 集群階段,None,Creating,Running,Failed,Scaling
# None 或 “”, 就是代表該CRD剛創建
# Creating 代表等待redis資源對象創建完畢(operator 發現CRD創建,創建資源對象,更新狀態)
# Running 代表已進行初始化操作(在Creating之后,發現實例起來完畢,初始化操作)
# Failed 代表着某異常故障
# ---------------------
# Scaling 代表着實例不一致(用戶修改實例,operator發現實例不一致,更新statefulset,更新狀態)
# Upgrading 代表着升級中
# ---------------------
phase: Creating
# 異常問題解釋
reason: "異常問題"
conditions:
- name: redis-cluster-0
instance: 10.168.78.90:6379
type: master
masterNodeId: allkk111snknkcs
nodeId: allkk111snknkcs
domainName: redis-cluster-0.redis-cluster.kube-system.svc.cluster.local
slots: 1024-2048
hostname: docker-vm-3
hostIP: 192.168.26.122
# true or flase
status: "True"
reason: xxxx
message: xxxx
lastTransitionTime: 2019-03-25T03:10:29Z
代碼生成
主要生成符合k8s風格的代碼:
- 生成風格統一的DeepCopy(CustomResources必須實現runtime.Object接口——必須實現DeepCopy方法);
- clientset(自定義資源對象的客戶端);
- listers(用來提供對於 GET/List 資源對象的請求提供只讀緩存層);
- informers(List/Get 資源對象,還可以監聽事件並觸發回調函數。
結構體定義到$ProjectName/pkg/apis/{中間件名稱}/{版本號}/types.go
里:
types.go中結構體定義根據上面准備的CR yaml定義。如下,其中需要注意的是,必須要給結構體加以下兩個注解:
- // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object注解表示:為該類型生成 func (t* T) DeepCopy() *T方法。API類型都需要實現深拷貝;
- // +genclient注解表示為當前類型生成客戶端。
3、編寫$ProjectName/pkg/apis/{中間件名稱}/{版本號}/doc.go,其中定義全局tag:// +k8s:deepcopy-gen=package,表示為包中任何類型生成深拷貝方法。package指定版本。
4、編寫$ProjectName/pkg/apis/{中間件名稱}/{版本號}/register.go,通過scheme注冊自定義CR類型,這樣當和API Server通信的時候就能夠處理該類型;(不同operator需要修改SchemeGroupVersion的Group和Version以及addKnownTypes中注冊的結構體)
package v1alpha1
import (
"harmonycloud.cn/middleware-operator-manager/pkg/apis/redis"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion {Group: redis.GroupName, Version: "v1alpha1"}
// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
return SchemeGroupVersion.WithKind(kind).GroupKind()
}
// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
return SchemeGroupVersion.WithResource(resource).GroupResource()
}
var (
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
AddToScheme = SchemeBuilder.AddToScheme
)
//注冊CR對象
// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&RedisCluster{},
&RedisClusterList{},
)
v1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}
5、編寫$ProjectName/pkg/apis/{中間件名稱}/register.go,其中定義了上一步用到的GroupName;
6、使用kubernetes提供的code-generator代碼生成器工具,根據定義好的CR結構體對象生成風格統一的DeepCopy(CustomResources必須實現runtime.Object接口——必須實現DeepCopy方法)、clientset(自定義資源對象的客戶端)、listers(用來提供對於 GET/List 資源對象的請求提供只讀緩存層)、informers(List/Get 資源對象,還可以監聽事件並觸發回調函數)代碼。
code-generator地址如下,下載后放到$GOPATH/src/k8s.io/目錄下:
https://github.com/kubernetes/code-generator
然后執行以下命令,harmonycloud.cn/middleware-operator-manager/pkg/clients表示最終生成的clientset、informers、listers代碼目錄,最后的redis:v1alpha1需要改成{中間件名稱}:{版本}
./generate-groups.sh all "harmonycloud.cn/middleware-operator-manager/pkg/clients" "harmonycloud.cn/middleware-operator-manager/pkg/apis" "redis:v1alpha1"
執行后將生成以下代碼:
生成代碼時可能遇到的坑,請參考:
k8s自定義資源類型代碼自動生成:https://www.jianshu.com/p/cbeb513250d0
參考:
Extending Kubernetes: Create Controllers for Core and Custom Resources
operator主流程代碼開發
首先operator的入口為operator-manager.go里的main函數。
package main
import (
"fmt"
"github.com/spf13/pflag"
"harmonycloud.cn/middleware-operator-manager/cmd/operator-manager/app"
"harmonycloud.cn/middleware-operator-manager/cmd/operator-manager/app/options"
"k8s.io/apiserver/pkg/util/flag"
"k8s.io/apiserver/pkg/util/logs"
"k8s.io/kubernetes/pkg/version/verflag"
"os"
)
func main() {
//參數初始化配置
s := options.NewOMServer()
s.AddFlags(pflag.CommandLine, app.KnownOperators())
flag.InitFlags()
//日志初始化
logs.InitLogs()
defer logs.FlushLogs()
verflag.PrintAndExitIfRequested()
//進行operator初始化
if err := app.Run(s); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}
main函數中首先進行對參數的初始化,其中主要包括:operator多實例時的選主配置;事件同步時間;集群創建、升級超時時間;是否啟用leader功能;是否開啟pprof分析功能等,代碼在options.go中。
app.Run(s)根據參數配置進行operator初始化:
- 首先根據參數配置,構建默認客戶端(操作k8s已有資源對象)、leader選舉客戶端、操作擴展資源客戶端等;
- 之后創建CRD資源對象定義,后續創建的CR對象都是該CRD的實例;
- 注冊健康檢查接口、根據啟動參數配置決定是否開啟pprof分析接口功能;
- 創建recorder,主要用於記錄events(k8s資源),用於操作審計;
- 定義Run函數,進行啟動operator,選舉結果的leader執行該函數;
- 判斷是否開啟leader選舉功能;
- 創建leader選舉的資源鎖,目前資源鎖實現了configmaps和endpoints方式,具體代碼在client-go下,默認使用endpoints方式;
- 啟動leader選舉機制,爭搶到鎖,選舉為leader的實例執行OnStartedLeading,即上面定義的Run函數;失去鎖的實例執行OnStoppedLeading函數。
// Run runs the OMServer. This should never exit.
func Run(s *options.OperatorManagerServer) error {
// To help debugging, immediately log version
glog.Infof("Version: %+v", version.Get())
//根據參數配置,構建默認客戶端(操作k8s已有資源對象)、leader選舉客戶端、操作擴展資源客戶端等
kubeClient, leaderElectionClient, extensionCRClient, kubeconfig, err := createClients(s)
if err != nil {
return err
}
//根據提前准備好的CRD yaml文件,構建並創建CRD
err = CreateRedisClusterCRD(extensionCRClient)
if err != nil {
if errors.IsAlreadyExists(err) {
glog.Infof("redis cluster crd is already created.")
} else {
fmt.Fprint(os.Stderr, err)
return err
}
}
//注冊健康檢查接口、根據啟動參數配置決定是否開啟pprof分析接口功能
go startHTTP(s)
//創建recorder,主要用於記錄events(k8s資源)
recorder := createRecorder(kubeClient)
//定義Run函數,進行啟動operator,選舉結果的leader執行該函數
run := func(stop <-chan struct{}) {
operatorClientBuilder := operator.SimpleOperatorClientBuilder{
ClientConfig: kubeconfig,
}
rootClientBuilder := controller.SimpleControllerClientBuilder{
ClientConfig: kubeconfig,
}
otx, err := CreateOperatorContext(s, kubeconfig, operatorClientBuilder, rootClientBuilder, stop)
if err != nil {
glog.Fatalf("error building controller context: %v", err)
}
otx.InformerFactory = informers.NewSharedInformerFactory(kubeClient, time.Duration(s.ResyncPeriod)*time.Second)
if err := StartOperators(otx, NewOperatorInitializers()); err != nil {
glog.Fatalf("error starting operators: %v", err)
}
otx.RedisInformerFactory.Start(otx.Stop)
otx.InformerFactory.Start(otx.Stop)
close(otx.InformersStarted)
select {}
}
//判斷是否開啟leader選舉功能
if !s.LeaderElection.LeaderElect {
run(nil)
panic("unreachable")
}
id, err := os.Hostname()
if err != nil {
return err
}
//創建leader選舉的資源鎖,目前資源鎖實現了configmaps和endpoints方式,具體代碼在client-go下,默認使用endpoints方式
rl, err := resourcelock.New(s.LeaderElection.ResourceLock,
"kube-system",
"middleware-operator-manager",
leaderElectionClient.CoreV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorder,
})
if err != nil {
glog.Fatalf("error creating lock: %v", err)
}
//啟動leader選舉機制,爭搶到鎖,選舉為leader的實例執行OnStartedLeading,即上面定義的Run函數;失去鎖的實例執行OnStoppedLeading函數
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("leaderelection lost")
},
},
})
panic("unreachable")
}
CreateRedisClusterCRD方法根據上面准備的CRD yaml文件構建並創建CRD,只有創建了該CRD,redisCluster資源對象才可以被創建。
func CreateRedisClusterCRD(extensionCRClient *extensionsclient.Clientset) error {
//TODO add CustomResourceValidation due to guarantee redis operator work normally,k8s1.12
crd := &v1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: "redisclusters." + v1alpha1.SchemeGroupVersion.Group,
},
Spec: v1beta1.CustomResourceDefinitionSpec{
Group: v1alpha1.SchemeGroupVersion.Group,
Version: v1alpha1.SchemeGroupVersion.Version,
Scope: v1beta1.NamespaceScoped,
Names: v1beta1.CustomResourceDefinitionNames{
Kind: "RedisCluster",
ListKind: "RedisClusterList",
Plural: "redisclusters",
Singular: "rediscluster",
ShortNames: []string{"rec"},
},
},
}
_, err := extensionCRClient.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd)
return err
}
CR的apiVersion為CRD的spec.Group/spec.Version即生成代碼時register.go中的GroupName和doc.go中的版本號:
apiVersion: redis.middleware.hc.cn/v1alpha1
kind: RedisCluster
metadata:
name: example000-redis-cluster
namespace: kube-system
Run函數中主要創建context對象,context里包含啟動參數options,kubeconfig配置、RedisInformerFactory(監聽CR變化)、InformerFactory(監聽statefulsetset變化)等,進行啟動operator、啟動informer。
run := func(stop <-chan struct{}) {
operatorClientBuilder := operator.SimpleOperatorClientBuilder{
ClientConfig: kubeconfig,
}
rootClientBuilder := controller.SimpleControllerClientBuilder{
ClientConfig: kubeconfig,
}
//創建context對象,context里包含啟動參數options,kubeconfig配置、RedisInformerFactory(監聽CR變化)、InformerFactory(監聽statefulsetset變化)等
otx, err := CreateOperatorContext(s, kubeconfig, operatorClientBuilder, rootClientBuilder, stop)
if err != nil {
glog.Fatalf("error building controller context: %v", err)
}
//創建InformerFactory
otx.InformerFactory = informers.NewSharedInformerFactory(kubeClient, time.Duration(s.ResyncPeriod)*time.Second)
//啟動operator,NewOperatorInitializers()中定義了啟動哪些operator
if err := StartOperators(otx, NewOperatorInitializers()); err != nil {
glog.Fatalf("error starting operators: %v", err)
}
//啟動RedisInformerFactory
otx.RedisInformerFactory.Start(otx.Stop)
//啟動InformerFactory
otx.InformerFactory.Start(otx.Stop)
close(otx.InformersStarted)
//阻塞
select {}
}
NewOperatorInitializers()中定義了啟動哪些operator(新加operator直接在該方法中加):
func NewOperatorInitializers() map[string]InitFunc {
controllers := map[string]InitFunc{}
controllers["rediscluster"] = startRedisClusterController
return controllers
}
CreateOperatorContext函數里根據代碼生成器生成的redis客戶端versionedClient創建了RedisInformerFactory;(根據不同operator生成不同的客戶端,這里需要修改client_builder.go中ClientOrDie的返回值類型),最終創建context對象。
func CreateOperatorContext(s *options.OperatorManagerServer, kubeConfig *restclient.Config, operatorClientBuilder operator.OperatorClientBuilder, rootClientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (OperatorContext, error) {
versionedClient := operatorClientBuilder.ClientOrDie("middleware-shared-informers")
sharedInformers := redisInformerFactory.NewSharedInformerFactory(versionedClient, time.Duration(s.ResyncPeriod)*time.Second)
/*availableResources, err := GetAvailableResources(rootClientBuilder)
if err != nil {
return OperatorContext{}, err
}*/
otx := OperatorContext{
kubeConfig: kubeConfig,
OperatorClientBuilder: operatorClientBuilder,
DefaultClientBuilder: rootClientBuilder,
RedisInformerFactory: sharedInformers,
Options: *s,
//AvailableResources: availableResources,
Stop: stop,
InformersStarted: make(chan struct{}),
}
return otx, nil
}
StartOperators函數啟動所有NewOperatorInitializers中定義的operator,執行startRedisClusterController函數。(不同operator執行不同的啟動函數)。
startRedisClusterController定義在extensions.go中,用於創建operator、啟動worker協程從隊列中取出(用於處理informer監聽變化的資源對象)進行業務邏輯處理。(新增operator需要在extensions.go中增加對應的start函數)
func startRedisClusterController(otx OperatorContext) (bool, error) {
//創建redisOperator
rco, err := redis.NewRedisClusterOperator(
//注冊RedisInformer回調函數
otx.RedisInformerFactory.Cr().V1alpha1().RedisClusters(),
//注冊statefulsetInformer回調函數
otx.InformerFactory.Apps().V1().StatefulSets(),
//默認客戶端,用於操作k8s自身資源對象
otx.DefaultClientBuilder.ClientOrDie("default-kube-client"),
//代碼生成器生成的客戶端,用於操作CR
otx.OperatorClientBuilder.ClientOrDie("rediscluster-operator"),
//kubeconfig配置
otx.kubeConfig,
//啟動參數配置
otx.Options,
)
if err != nil {
return true, fmt.Errorf("error creating rediscluster operator: %v", err)
}
//啟動ConcurrentRedisClusterSyncs個worker協程處理變化的資源對象
go rco.Run(int(otx.Options.ConcurrentRedisClusterSyncs), otx.Stop)
return true, nil
}
NewRedisClusterOperator方法如下,主要創建該operator的結構體,隊列,redisInformer注冊回調函數,statefulsetInformer回調函數的注冊。(不同的operator,需要不同的Informer、處理業務邏輯的方法)
func NewRedisClusterOperator(redisInformer custominfomer.RedisClusterInformer, stsInformer appsinformers.StatefulSetInformer, kubeClient clientset.Interface, customCRDClient customclient.Interface, kubeConfig *rest.Config, options options.OperatorManagerServer) (*RedisClusterOperator, error) {
//創建該operator的recorder,記錄events
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
//創建該operator的結構體
rco := &RedisClusterOperator{
options: &options,
kubeConfig: kubeConfig,
defaultClient: kubeClient,
//extensionCRClient: extensionCRClient,
customCRDClient: customCRDClient,
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "operator-manager"}),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "rediscluster"),
}
//redisInformer注冊回調函數,當informer監聽到redis CR資源變化時,調用對應AddFunc、UpdateFunc、DeleteFunc回調函數將CR資源放到queue中
redisInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rco.addRedisCluster,
UpdateFunc: rco.updateRedisCluster,
// This will enter the sync loop and no-op, because the RedisCluster has been deleted from the store.
DeleteFunc: rco.deleteRedisCluster,
})
//定義最終處理業務邏輯的函數
rco.syncHandler = rco.syncRedisCluster
rco.enqueueRedisCluster = rco.enqueue
rco.redisClusterInformer = redisInformer.Informer()
//redisInformer是否已經開始同步事件變化
rco.redisClusterListerSynced = rco.redisClusterInformer.HasSynced
//lister提供操作informer中緩存的變化的資源接口
rco.redisClusterLister = redisInformer.Lister()
//statefulsetInformer注冊回調函數,當informer監聽到statefulset資源變化時,調用對應AddFunc、UpdateFunc、DeleteFunc回調函數將redis實例的statefulset加入到queue中
stsInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: rco.addStatefulSet,
UpdateFunc: func(old, cur interface{}) {
oldSts := old.(*appsv1.StatefulSet)
curSts := cur.(*appsv1.StatefulSet)
if oldSts.Status.Replicas != curSts.Status.Replicas {
glog.V(4).Infof("Observed updated replica count for StatefulSet: %v, %d->%d", curSts.Name, oldSts.Status.Replicas, curSts.Status.Replicas)
}
rco.updateStatefulSet(oldSts, curSts)
},
DeleteFunc: rco.deleteStatefulSet,
},
)
rco.stsLister = stsInformer.Lister()
//statefulsetInformer是否已經開始同步事件變化
rco.stsListerSynced = stsInformer.Informer().HasSynced
return rco, nil
}
Run函數中等待redis CR資源、statefulset資源對象同步,然后啟動指定個數worker,並永久阻塞,直到stopCh被close(不同operator需要修改rco.redisClusterListerSynced為對應的ListerSynced)
func (rco *RedisClusterOperator) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer rco.queue.ShutDown()
glog.Infof("Starting rediscluster operator")
defer glog.Infof("Shutting down rediscluster operator")
//等待redis CR資源、statefulset資源對象同步。
if !controller.WaitForCacheSync("rediscluster", stopCh, rco.redisClusterListerSynced, rco.stsListerSynced) {
return
}
//循環啟動指定個數worker,並永久阻塞,直到stopCh被close
for i := 0; i < workers; i++ {
go wait.Until(rco.worker, time.Second, stopCh)
}
<-stopCh
}
worker方法死循環rco.processNextWorkItem()在隊列Operator中定義的queue中取出變化的資源去處理(不同operator有不同的業務處理邏輯)
func (rco *RedisClusterOperator) worker() {
for rco.processNextWorkItem() {
}
}
從informer監聽到資源對象變化,回調函數將資源對象key(namespace/name)放到queue中,到worker取出queue中的key去做處理,處理完成后Done掉key流程圖如下:
回調函數將資源對象的key加入到queue中,worker從queue中取出key去處理業務,此時key會被放到processing集合中,表示該key正在被處理。worker處理key時如果遇到錯誤,該key會根據重試次數是否大於最大重試次數被加入到rateLimited(可以限制添加到queue中速度,最終還會被加入到queue)。worker處理key成功后,Forget(key)表示從rateLimited中清除,Done(key)表示key處理完畢,從processing集合中刪除。該代碼如下:
func (rco *RedisClusterOperator) processNextWorkItem() bool {
key, quit := rco.queue.Get()
if quit {
return false
}
// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
defer rco.queue.Done(key)
err := rco.syncHandler(key.(string))
//加入到rateLimited中、forget(key)
rco.handleErr(err, key)
//處理key,主業務邏輯
go rco.syncHandler(key.(string))
return true
}
開發注意事項
-
開啟worker時,調用cache.WaitForCacheSync等待緩存開始同步。
-
不要改變原始對象(從lister中取出的對象),而要使用DeepCopy,因為緩存在informer之間共享。
-
根據CRD構建Statefulset時,給Statefulset加OwnerReferences,這樣在刪除CRD的時候,可以設置是否級聯刪除statefulset。
參考:
k8s垃圾收集:https://kubernetes.io/zh/docs/concepts/workloads/controllers/garbage-collection/
Kubernetes之Garbage Collection:https://blog.csdn.net/dkfajsldfsdfsd/article/details/81130786
調試
本地用IDE--goland調試代碼時,配置如下:
Run kind:選File;
Files:指定main函數所在文件的全路徑;
Output directory:指定編譯后輸出的二進制文件位置。可輸入。(默認輸出exe格式windows可執行文件)
Run after build:勾選后,編譯完成后運行。
Go tool arguments:填寫-i(用於增量編譯提速)。
Program arguments:用於指定程序啟動參數:
--kubeconfig=D:\SoftwareAndProgram\program\Go\Development\src\harmonycloud.cn\middleware-operator-manager\artifacts\config60 --v=5
--kubeconfig指定kubeconfig文件所在全路徑(即k8s集群master節點的/root/.kube/config),其指定k8s集群apiserver地址已經訪問時的證書信息。
--v指定glog日志級別,--v=5表示只輸出info小於5和error、warn日志。
glog.V(4).Infof("Adding RedisCluster %s", rc.Name)
glog.Warningf("-----------redisCluster: %#v--", redisCluster)
glog.Errorf(err.Error())
鏡像制作
編譯前提
提前安裝好go語言開發環境,正確設置GOROOT和GOPATH環境變量,要求go1.8.3版本以上
編譯二進制
將middleware-operator-manager
放在$GOPATH/src/harmonycloud.cn/
目錄下,進入到 $GOPATH/src/harmonycloud.cn/middleware-operator-manager/cmd/operator-manager
目錄, 最終要生成linux的可執行文件:
- 如果是在windows上編譯:
打開cmd窗口,進入以上目錄后,執行以下命令:
set GOOS=linux
go build -a -o operator-manager
- 如果是在linux上編譯:
執行以下命令:
go build -a -o operator-manager
等待編譯完成,最終在當前目錄下生成operator-manager可執行文件
鏡像制作
$GOPATH/src/harmonycloud.cn/middleware-operator-manager/artifacts
目錄下有Dockerfile文件,基礎鏡像為busybox
FROM busybox
ADD operator-manager /usr/bin/
RUN chmod +x /usr/bin/operator-manager
同級目錄下有operator-manager deployment描述文件operator-manager.yaml:
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
generation: 2
labels:
app: operator-manager
name: operator-manager
namespace: kube-system
spec:
replicas: 2
selector:
matchLabels:
app: operator-manager
strategy:
rollingUpdate:
maxSurge: 1
maxUnavailable: 1
type: RollingUpdate
template:
metadata:
creationTimestamp: null
labels:
app: operator-manager
spec:
containers:
- command:
- operator-manager
- --v=5
- --leader-elect=true
image: 192.168.26.46/k8s-deploy/operator-manager:v1
resources:
limits:
cpu: 500m
memory: 512Mi
requests:
cpu: 200m
memory: 512Mi
imagePullPolicy: Always
name: operator-manager
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
dnsPolicy: ClusterFirst
restartPolicy: Always
schedulerName: default-scheduler
securityContext: {}
terminationGracePeriodSeconds: 30
同級目錄下有build.sh腳本,指定了docker鏡像倉庫地址為192.168.26.46
#!/bin/bash
docker build -f ./Dockerfile -t operator-manager:v1 .
docker tag operator-manager:v1 192.168.26.46/k8s-deploy/operator-manager:v1
docker push 192.168.26.46/k8s-deploy/operator-manager:v1
kubectl apply -f operator-manager.yaml
執行該腳本即可以將operator-manager二進制打成鏡像並推送到192.168.26.46倉庫的k8s-deploy項目下: 同時執行了
kubectl apply -f operator-manager.yaml
命令創建了operator-manager的deployment對象,完成了部署。
operator高可用
用k8s組件中leader選舉機制實現redis operator組件的高可用,即正常情況下redis operator組件的多個副本只有一個是處於業務邏輯運行狀態,其它副本則不斷的嘗試去獲取鎖,去競爭leader,直到自己成為leader。如果正在運行的leader因某種原因導致當前進程退出,或者鎖丟失,則由其它副本去競爭新的leader,獲取leader繼而執行業務邏輯。
啟動兩個operator-manager實例:
可以看到只有一個實例operator-manager-86d785b5fc-m5rgh在同步事件,處理業務:
operator-manager-86d785b5fc-sszj2實例一直在競爭嘗試獲取鎖:
刪除掉正在同步事件的實例operator-manager-86d785b5fc-m5rgh:
實例operator-manager-86d785b5fc-sszj2競爭獲取到鎖,開始處理業務邏輯:
故可以通過反親和性防止兩個operator-manager實例調度到同一主機上,達到主備高可用。
最后附上源碼地址:
https://github.com/ll837448792/middleware-operator-manager
本公眾號免費提供csdn下載服務,海量IT學習資源,如果你准備入IT坑,勵志成為優秀的程序猿,那么這些資源很適合你,包括但不限於java、go、python、springcloud、elk、嵌入式 、大數據、面試資料、前端 等資源。同時我們組建了一個技術交流群,里面有很多大佬,會不定時分享技術文章,如果你想來一起學習提高,可以公眾號后台回復【2】,免費邀請加技術交流群互相學習提高,會不定期分享編程IT相關資源。
掃碼關注,精彩內容第一時間推給你