kubebuilder is an SDK for building Kubernetes APIs with CRDs. Its main features are:
providing a scaffolding tool that initializes a CRD project and auto-generates boilerplate code and configuration
providing libraries that wrap the underlying Kubernetes go-client
Initialize the project and create an API and a webhook:
kubebuilder init --domain fluid.io
kubebuilder create api --group data --version v1alpha1 --kind DataBackup --namespaced true
kubebuilder create webhook --group batch --version v1 --kind CronJob --defaulting --programmatic-validation
What each generated folder is for:
-
config
The subdirectories under config hold the Kustomize bases, the CustomResourceDefinitions, the RBAC configuration, the WebhookConfigurations, and so on.
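For reference, a typical layout looks roughly like this (the exact set of directories depends on the kubebuilder version and on which features, such as webhooks or cert-manager, are enabled):

config/
├── certmanager/   # cert-manager resources for webhook certificates
├── crd/           # CustomResourceDefinitions and their Kustomize patches
├── default/       # top-level Kustomization wiring everything together
├── manager/       # Deployment for the controller manager
├── prometheus/    # ServiceMonitor for metrics scraping
├── rbac/          # Role/ClusterRole and bindings for the controller
├── samples/       # example custom resource manifests
└── webhook/       # Service and webhook-related Kustomize patches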
-
api/v1alpha1
groupversion_info.go contains the API schema definitions:
var (
	// GroupVersion is the group and version used to register these resource objects
	GroupVersion = schema.GroupVersion{Group: "data.fluid.io", Version: "v1alpha1"}

	// SchemeBuilder is used to add Go types to the GVK scheme
	SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion}

	// AddToScheme adds the types in this group-version to a scheme
	AddToScheme = SchemeBuilder.AddToScheme
)
After creating a resource object xxx, xxx_types.go contains the related struct definitions:
type CronJobSpec struct {
	Schedule                   string                       `json:"schedule"`
	StartingDeadlineSeconds    *int64                       `json:"startingDeadlineSeconds,omitempty"`
	ConcurrencyPolicy          ConcurrencyPolicy            `json:"concurrencyPolicy,omitempty"`
	Suspend                    *bool                        `json:"suspend,omitempty"`
	JobTemplate                batchv1beta1.JobTemplateSpec `json:"jobTemplate"`
	SuccessfulJobsHistoryLimit *int32                       `json:"successfulJobsHistoryLimit,omitempty"`
	FailedJobsHistoryLimit     *int32                       `json:"failedJobsHistoryLimit,omitempty"`
}

type ConcurrencyPolicy string

const (
	AllowConcurrent   ConcurrencyPolicy = "Allow"
	ForbidConcurrent  ConcurrencyPolicy = "Forbid"
	ReplaceConcurrent ConcurrencyPolicy = "Replace"
)

type CronJobStatus struct {
	Active           []corev1.ObjectReference `json:"active,omitempty"`
	LastScheduleTime *metav1.Time             `json:"lastScheduleTime,omitempty"`
}

type CronJob struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   CronJobSpec   `json:"spec,omitempty"`
	Status CronJobStatus `json:"status,omitempty"`
}

type CronJobList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []CronJob `json:"items"`
}
Register the resource objects with the Group:
func init() {
	SchemeBuilder.Register(&CronJob{}, &CronJobList{})
}
xxx_webhook.go:
First, set up a logger named cronjoblog for the webhook, using the controller-runtime/pkg/log package:
var cronjoblog = logf.Log.WithName("cronjob-resource")
Set up the webhook with the manager:
func (r *CronJob) SetupWebhookWithManager(mgr ctrl.Manager) error {
	return ctrl.NewWebhookManagedBy(mgr).For(r).Complete()
}
The webhook.Defaulter interface is used to fill in default values for the custom resource object:
var _ webhook.Defaulter = &CronJob{}
The Default method implements the webhook.Defaulter interface:
func (r *CronJob) Default() {
	cronjoblog.Info("default", "name", r.Name)

	if r.Spec.ConcurrencyPolicy == "" {
		r.Spec.ConcurrencyPolicy = AllowConcurrent
	}
	if r.Spec.Suspend == nil {
		r.Spec.Suspend = new(bool)
	}
	if r.Spec.SuccessfulJobsHistoryLimit == nil {
		r.Spec.SuccessfulJobsHistoryLimit = new(int32)
		*r.Spec.SuccessfulJobsHistoryLimit = 3
	}
	if r.Spec.FailedJobsHistoryLimit == nil {
		r.Spec.FailedJobsHistoryLimit = new(int32)
		*r.Spec.FailedJobsHistoryLimit = 1
	}
}
The webhook.Validator interface is used to validate the custom resource object's values:
var _ webhook.Validator = &CronJob{}
Implementation:
// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
func (r *CronJob) ValidateCreate() error {
	cronjoblog.Info("validate create", "name", r.Name)
	return r.validateCronJob()
}

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
func (r *CronJob) ValidateUpdate(old runtime.Object) error {
	cronjoblog.Info("validate update", "name", r.Name)
	return r.validateCronJob()
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
func (r *CronJob) ValidateDelete() error {
	cronjoblog.Info("validate delete", "name", r.Name)

	// TODO(user): fill in your validation logic upon object deletion.
	return nil
}
Create and update both call this logic:
// validate both the name and the spec
func (r *CronJob) validateCronJob() error {
	var allErrs field.ErrorList
	if err := r.validateCronJobName(); err != nil {
		allErrs = append(allErrs, err)
	}
	if err := r.validateCronJobSpec(); err != nil {
		allErrs = append(allErrs, err)
	}
	if len(allErrs) == 0 {
		return nil
	}
	return apierrors.NewInvalid(
		schema.GroupKind{Group: "batch.tutorial.kubebuilder.io", Kind: "CronJob"},
		r.Name, allErrs)
}

func (r *CronJob) validateCronJobName() *field.Error {
	if len(r.ObjectMeta.Name) > validationutils.DNS1035LabelMaxLength-11 {
		return field.Invalid(field.NewPath("metadata").Child("name"), r.Name, "must be no more than 52 characters")
	}
	return nil
}

func (r *CronJob) validateCronJobSpec() *field.Error {
	return validateScheduleFormat(
		r.Spec.Schedule,
		field.NewPath("spec").Child("schedule"))
}

func validateScheduleFormat(schedule string, fldPath *field.Path) *field.Error {
	if _, err := cron.ParseStandard(schedule); err != nil {
		return field.Invalid(fldPath, schedule, err.Error())
	}
	return nil
}
-
controllers
xxx_controller.go contains the controller logic.
First define the XxxReconciler struct, which reconciles Xxx objects:
type CronJobReconciler struct {
	client.Client
	Log    logr.Logger
	Scheme *runtime.Scheme
	Clock
}
The scaffold comment says: "We can pre-assign some pairs at the top of our reconcile method to have those attached to all log lines in this reconciler." My understanding is that controller-runtime structures its logging through the logr library, so the main work here is attaching key/value pairs to log messages.
The Reconcile method holds the processing logic:
func (r *CronJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	ctx := context.Background()
	log := r.Log.WithValues("cronjob", req.NamespacedName)
	......
}
The elided part is the concrete logic the user implements. Taking CronJob as the example, it breaks down into the following steps:
1. Fetch the resource object with the client
var cronJob batch.CronJob
if err := r.Get(ctx, req.NamespacedName, &cronJob); err != nil {
	log.Error(err, "unable to fetch CronJob")
	return ctrl.Result{}, client.IgnoreNotFound(err)
}
In the client's Get method, the first argument is the ctx and the third is the pointer into which the fetched object is written.
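For reference, a rough, compilable excerpt mirroring controller-runtime's client.Reader for the generation this scaffolding targets (newer releases take client.Object and extra option arguments); it is shown only to illustrate the Get/List signatures used in these steps:

package sketch

import (
	"context"

	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// reader mirrors client.Reader: Get writes the object identified by key into obj,
// and List writes the objects matching the options into list.
type reader interface {
	Get(ctx context.Context, key client.ObjectKey, obj runtime.Object) error
	List(ctx context.Context, list runtime.Object, opts ...client.ListOption) error
}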
2. List all active Jobs owned by this CronJob and update their status
// use the client's List method to fetch all Jobs owned by this CronJob
var childJobs kbatch.JobList
err := r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingFields{jobOwnerKey: req.Name})
if err != nil {
	log.Error(err, "unable to list child Jobs")
	return ctrl.Result{}, err
}

var activeJobs []*kbatch.Job
var successfulJobs []*kbatch.Job
var failedJobs []*kbatch.Job
var mostRecentTime *time.Time // find the last run so we can update the status

// a Job is considered finished if it is Complete or Failed
isJobFinished := func(job *kbatch.Job) (bool, kbatch.JobConditionType) {
	for _, c := range job.Status.Conditions {
		if (c.Type == kbatch.JobComplete || c.Type == kbatch.JobFailed) && c.Status == corev1.ConditionTrue {
			return true, c.Type
		}
	}
	return false, ""
}

// parse the scheduled time out of the annotation
getScheduledTimeForJob := func(job *kbatch.Job) (*time.Time, error) {
	timeRaw := job.Annotations[scheduledTimeAnnotation]
	if len(timeRaw) == 0 {
		return nil, nil
	}
	timeParsed, err := time.Parse(time.RFC3339, timeRaw)
	if err != nil {
		return nil, err
	}
	return &timeParsed, nil
}

// iterate over the child Jobs
for i, job := range childJobs.Items {
	_, finishedType := isJobFinished(&job)
	switch finishedType {
	case "": // ongoing
		activeJobs = append(activeJobs, &childJobs.Items[i])
	case kbatch.JobFailed:
		failedJobs = append(failedJobs, &childJobs.Items[i])
	case kbatch.JobComplete:
		successfulJobs = append(successfulJobs, &childJobs.Items[i])
	}

	scheduledTimeForJob, err := getScheduledTimeForJob(&job)
	if err != nil {
		log.Error(err, "unable to parse schedule time for child job", "job", &job)
		continue
	}
	if scheduledTimeForJob != nil {
		if mostRecentTime == nil {
			mostRecentTime = scheduledTimeForJob
		} else if mostRecentTime.Before(*scheduledTimeForJob) {
			mostRecentTime = scheduledTimeForJob
		}
	}
}

if mostRecentTime != nil {
	cronJob.Status.LastScheduleTime = &metav1.Time{Time: *mostRecentTime}
} else {
	cronJob.Status.LastScheduleTime = nil
}
cronJob.Status.Active = nil
for _, activeJob := range activeJobs {
	jobRef, err := ref.GetReference(r.Scheme, activeJob)
	if err != nil {
		log.Error(err, "unable to make reference to active job", "job", activeJob)
		continue
	}
	cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
}

log.V(1).Info("job count", "active jobs", len(activeJobs), "successful jobs", len(successfulJobs), "failed jobs", len(failedJobs))

// use the client's Status().Update method to update this CronJob's status
if err := r.Status().Update(ctx, &cronJob); err != nil {
	log.Error(err, "unable to update CronJob status")
	return ctrl.Result{}, err
}
3. Clean up old Jobs
// deletion here is best effort; it does not have to succeed
if cronJob.Spec.FailedJobsHistoryLimit != nil {
	sort.Slice(failedJobs, func(i, j int) bool {
		if failedJobs[i].Status.StartTime == nil {
			return failedJobs[j].Status.StartTime != nil
		}
		return failedJobs[i].Status.StartTime.Before(failedJobs[j].Status.StartTime)
	})
	for i, job := range failedJobs {
		if int32(i) >= int32(len(failedJobs))-*cronJob.Spec.FailedJobsHistoryLimit {
			break
		}
		if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
			log.Error(err, "unable to delete old failed job", "job", job)
		} else {
			log.V(0).Info("deleted old failed job", "job", job)
		}
	}
}

if cronJob.Spec.SuccessfulJobsHistoryLimit != nil {
	sort.Slice(successfulJobs, func(i, j int) bool {
		if successfulJobs[i].Status.StartTime == nil {
			return successfulJobs[j].Status.StartTime != nil
		}
		return successfulJobs[i].Status.StartTime.Before(successfulJobs[j].Status.StartTime)
	})
	for i, job := range successfulJobs {
		if int32(i) >= int32(len(successfulJobs))-*cronJob.Spec.SuccessfulJobsHistoryLimit {
			break
		}
		if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil {
			log.Error(err, "unable to delete old successful job", "job", job)
		} else {
			log.V(0).Info("deleted old successful job", "job", job)
		}
	}
}
4. Check whether the CronJob is suspended; if it is, create no Jobs and return immediately
if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
	log.V(1).Info("cronjob suspended, skipping")
	return ctrl.Result{}, nil
}
5. Work out the next scheduled run
// compute the next time a Job needs to be created
getNextSchedule := func(cronJob *batch.CronJob, now time.Time) (lastMissed time.Time, next time.Time, err error) {
	sched, err := cron.ParseStandard(cronJob.Spec.Schedule)
	if err != nil {
		return time.Time{}, time.Time{}, fmt.Errorf("Unparseable schedule %q: %v", cronJob.Spec.Schedule, err)
	}
	var earliestTime time.Time
	if cronJob.Status.LastScheduleTime != nil {
		earliestTime = cronJob.Status.LastScheduleTime.Time
	} else {
		earliestTime = cronJob.ObjectMeta.CreationTimestamp.Time
	}
	if cronJob.Spec.StartingDeadlineSeconds != nil {
		// controller is not going to schedule anything below this point
		schedulingDeadline := now.Add(-time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds))
		if schedulingDeadline.After(earliestTime) {
			earliestTime = schedulingDeadline
		}
	}
	if earliestTime.After(now) {
		return time.Time{}, sched.Next(now), nil
	}

	starts := 0
	for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
		lastMissed = t
		starts++
		if starts > 100 {
			return time.Time{}, time.Time{}, fmt.Errorf("Too many missed start times (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew.")
		}
	}
	return lastMissed, sched.Next(now), nil
}

missedRun, nextRun, err := getNextSchedule(&cronJob, r.Now())
if err != nil {
	log.Error(err, "unable to figure out CronJob schedule")
	return ctrl.Result{}, nil
}

scheduledResult := ctrl.Result{RequeueAfter: nextRun.Sub(r.Now())} // save the result
log = log.WithValues("now", r.Now(), "next run", nextRun)
6. Run a new job if it's on schedule, not past the deadline, and not blocked by our concurrency policy
If we've missed a run, and we're still within the deadline to start it, we'll need to run a job.
if missedRun.IsZero() {
	log.V(1).Info("no upcoming scheduled times, sleeping until next")
	return scheduledResult, nil
}

// make sure we're not too late to start the run
log = log.WithValues("current run", missedRun)
tooLate := false
if cronJob.Spec.StartingDeadlineSeconds != nil {
	tooLate = missedRun.Add(time.Duration(*cronJob.Spec.StartingDeadlineSeconds) * time.Second).Before(r.Now())
}
if tooLate {
	log.V(1).Info("missed starting deadline for last run, sleeping till next")
	// TODO(directxman12): events
	return scheduledResult, nil
}

// the concurrency policy decides whether a new run may start alongside active ones
if cronJob.Spec.ConcurrencyPolicy == batch.ForbidConcurrent && len(activeJobs) > 0 {
	log.V(1).Info("concurrency policy blocks concurrent runs, skipping", "num active", len(activeJobs))
	return scheduledResult, nil
}

// ...or whether the active runs should be replaced
if cronJob.Spec.ConcurrencyPolicy == batch.ReplaceConcurrent {
	for _, activeJob := range activeJobs {
		err := r.Delete(ctx, activeJob, client.PropagationPolicy(metav1.DeletePropagationBackground)) // we don't care if the job was already deleted
		if client.IgnoreNotFound(err) != nil {
			log.Error(err, "unable to delete active job", "job", activeJob)
			return ctrl.Result{}, err
		}
	}
}

constructJobForCronJob := func(cronJob *batch.CronJob, scheduledTime time.Time) (*kbatch.Job, error) {
	// the Job name includes the nominal start time so the same run is not launched twice
	name := fmt.Sprintf("%s-%d", cronJob.Name, scheduledTime.Unix())

	job := &kbatch.Job{
		ObjectMeta: metav1.ObjectMeta{
			Labels:      make(map[string]string),
			Annotations: make(map[string]string),
			Name:        name,
			Namespace:   cronJob.Namespace,
		},
		Spec: *cronJob.Spec.JobTemplate.Spec.DeepCopy(),
	}
	for k, v := range cronJob.Spec.JobTemplate.Annotations {
		job.Annotations[k] = v
	}
	job.Annotations[scheduledTimeAnnotation] = scheduledTime.Format(time.RFC3339)
	for k, v := range cronJob.Spec.JobTemplate.Labels {
		job.Labels[k] = v
	}
	if err := ctrl.SetControllerReference(cronJob, job, r.Scheme); err != nil {
		return nil, err
	}
	return job, nil
}

// actually make the job
job, err := constructJobForCronJob(&cronJob, missedRun)
if err != nil {
	log.Error(err, "unable to construct job from template")
	// don't bother requeuing until we get a change to the spec
	return scheduledResult, nil
}

// create the job on the cluster
if err := r.Create(ctx, job); err != nil {
	log.Error(err, "unable to create Job for CronJob", "job", job)
	return ctrl.Result{}, err
}
7. Return the result so the request is requeued
return scheduledResult, nil
My understanding here: for the reconciler to find its Jobs quickly, an index is needed.
We declare the indexed value on the Job object; the indexer automatically takes care of the namespace, so we only need to extract the owner name from the Job.
The CronJob controller must also be notified automatically when a Job changes or is deleted.
Wire the controller up to the manager:
func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
	if r.Clock == nil {
		r.Clock = realClock{}
	}

	if err := mgr.GetFieldIndexer().IndexField(context.Background(), &kbatch.Job{}, jobOwnerKey, func(rawObj runtime.Object) []string {
		// grab the Job object and extract its owner
		job := rawObj.(*kbatch.Job)
		owner := metav1.GetControllerOf(job)
		if owner == nil {
			return nil
		}
		// make sure it's a CronJob
		if owner.APIVersion != apiGVStr || owner.Kind != "CronJob" {
			return nil
		}
		return []string{owner.Name}
	}); err != nil {
		return err
	}

	return ctrl.NewControllerManagedBy(mgr).
		For(&batch.CronJob{}).
		Owns(&kbatch.Job{}).
		Complete(r)
}
For the details of how a controller is added to the manager, see the controller-runtime framework.
-
main.go
First create a Scheme for the controller:
scheme = runtime.NewScheme()
Create a logger named setupLog:
setupLog = ctrl.Log.WithName("setup")
The init function executes these two statements:
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(batchv1.AddToScheme(scheme))
The main function:
It first parses flags such as metricsAddr and enableLeaderElection.
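A rough sketch of that flag parsing as it appears in the kubebuilder v2 scaffolding (the flag names and defaults come from that scaffold and may differ in newer versions, which use --metrics-bind-address and similar):

var metricsAddr string
var enableLeaderElection bool
flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
	"Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
flag.Parse()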
Then it calls the SetLogger function:
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
Set up the manager:
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
	Scheme:             scheme,
	MetricsBindAddress: metricsAddr,
	Port:               9443,
	LeaderElection:     enableLeaderElection,
	LeaderElectionID:   "80807133.tutorial.kubebuilder.io",
})
if err != nil {
	setupLog.Error(err, "unable to start manager")
	os.Exit(1)
}
Add the controller to the manager; the controller wraps an XxxReconciler struct:
if err = (&controllers.CronJobReconciler{
	Client: mgr.GetClient(),
	Log:    ctrl.Log.WithName("controllers").WithName("CronJob"),
	Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
	setupLog.Error(err, "unable to create controller", "controller", "CronJob")
	os.Exit(1)
}
This actually calls the SetupWithManager method written above.
An environment variable decides whether the webhook is enabled, so the webhook can be deployed separately from the controller.
if os.Getenv("ENABLE_WEBHOOKS") != "false" {
	if err = (&batchv1.CronJob{}).SetupWebhookWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create webhook", "webhook", "CronJob")
		os.Exit(1)
	}
}
Start the manager (which starts the controllers):
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
	setupLog.Error(err, "problem running manager")
	os.Exit(1)
}
-
Dockerfile
The default Dockerfile:
FROM golang:1.15 as builder

WORKDIR /workspace
COPY go.mod go.mod
COPY go.sum go.sum
RUN go mod download

COPY main.go main.go
COPY api/ api/
COPY controllers/ controllers/

RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o manager main.go

FROM gcr.io/distroless/static:nonroot
WORKDIR /
COPY --from=builder /workspace/manager .
USER nonroot:nonroot

ENTRYPOINT ["/manager"]
-
Makefile
The Makefile actually invokes controller-gen (part of controller-tools, a kubebuilder sub-project) to generate code and manifests. Its main arguments:
crd:trivialVersions=true generates the CRD manifests
rbac:roleName=xxx generates the ClusterRole manifest
webhook generates the MutatingWebhookConfiguration and ValidatingWebhookConfiguration manifests
object:headerFile generates the concrete DeepCopy, DeepCopyInto, and DeepCopyObject implementations
output:<generator>:<rule> controls where the generated artifacts go
<generator> can be crd, etc.
<rule> can be artifacts:config=xxx, stdout, none, etc.
A <generator> without an explicit <rule> writes to its default location; for example, RBAC output goes to the config/rbac folder
paths=xxx specifies the package root(s) to scan
First, the controller-gen tool is located or downloaded:
controller-gen:
ifeq (, $(shell which controller-gen))
	@{ \
	set -e ;\
	CONTROLLER_GEN_TMP_DIR=$$(mktemp -d) ;\
	cd $$CONTROLLER_GEN_TMP_DIR ;\
	export GO111MODULE=on ;\
	go mod init tmp ;\
	go get sigs.k8s.io/controller-tools/cmd/controller-gen@v0.3.0 ;\
	rm -rf $$CONTROLLER_GEN_TMP_DIR ;\
	}
CONTROLLER_GEN=$(GOBIN)/controller-gen
else
CONTROLLER_GEN=$(shell which controller-gen)
endif
Generate code:
generate: controller-gen
	GO111MODULE=off $(CONTROLLER_GEN) object:headerFile=./hack/boilerplate.go.txt paths="./..."
boilerplate.go.txt contains the license header comment.
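Typically it holds just a license header (a Go block comment) that controller-gen prepends to every generated file, along these lines (the copyright line here is a placeholder):

/*
Copyright <YEAR> <AUTHORS>.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0
*/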
Before the docker-build target (which builds the image) and the manager target (which runs go build directly to produce the binary), the generate target runs first to generate code, followed by the fmt and vet targets (go fmt and go vet).
The docker-build target additionally depends on the test target (go test).
The manifests target generates the CRD, RBAC, webhook, and other manifest files:
CRD_OPTIONS ?= "crd:trivialVersions=true"

# Generate manifests e.g. CRD, RBAC etc.
manifests: controller-gen
	GO111MODULE=off $(CONTROLLER_GEN) $(CRD_OPTIONS) rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
Installing and uninstalling the CRDs happens in the install and uninstall targets (which use the kustomize tool under the hood).
The run target runs main.go directly, so it first requires generate and manifests, plus the fmt and vet targets.
The deploy target also uses kustomize, to install the controller into the Kubernetes cluster. A sketch of how these targets are usually wired together follows.
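A sketch of the relevant targets as they typically appear in the scaffolded Makefile (recipes and variable names such as IMG vary slightly between kubebuilder versions):

# Run tests
test: generate fmt vet manifests
	go test ./... -coverprofile cover.out

# Build manager binary
manager: generate fmt vet
	go build -o bin/manager main.go

# Run against the cluster configured in ~/.kube/config
run: generate fmt vet manifests
	go run ./main.go

# Install CRDs into a cluster
install: manifests
	kustomize build config/crd | kubectl apply -f -

# Uninstall CRDs from a cluster
uninstall: manifests
	kustomize build config/crd | kubectl delete -f -

# Deploy the controller into the cluster configured in ~/.kube/config
deploy: manifests
	cd config/manager && kustomize edit set image controller=${IMG}
	kustomize build config/default | kubectl apply -f -

# Run go fmt / go vet against the code
fmt:
	go fmt ./...

vet:
	go vet ./...

# Build the docker image
docker-build: test
	docker build . -t ${IMG}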
Adding subresources to a custom resource
This is done by adding markers in api/v1alpha1 (a sketch of a fully annotated type appears at the end of this section):
// +kubebuilder:subresource:xxxxx
Only two kinds are supported by default:
① Make status a subresource of the CR
Although kubebuilder generates a status struct by default, status is not a subresource of the CR by default.
Making it a subresource ensures that users can only modify the spec, while only the controller updates the status. Add the marker:
// +kubebuilder:subresource:status
② Allow the CR's replicas value to be changed conveniently with kubectl scale, just like a Deployment/StatefulSet
Add to the markers:
// +kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.currentWorkerNumberScheduled,selectorpath=.status.selector
Querying the API then shows the scale subresource:
$ kubectl get --raw /apis/data.fluid.io/v1alpha1/namespaces/default/alluxioruntimes/hbase/scale | jq
{
  "kind": "Scale",
  "apiVersion": "autoscaling/v1",
  "metadata": {
    "name": "hbase",
    "namespace": "default",
    "selfLink": "/apis/data.fluid.io/v1alpha1/namespaces/default/alluxioruntimes/hbase/scale",
    "uid": "a3225118-b789-40eb-9c2e-9cf5a82d28b3",
    "resourceVersion": "1280009304",
    "creationTimestamp": "2021-03-25T06:02:21Z"
  },
  "spec": {
    "replicas": 2
  },
  "status": {
    "replicas": 2,
    "selector": "app=alluxio,release=hbase,role=alluxio-worker"
  }
}
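A minimal sketch of where these markers sit on the type definition. The type and field paths mirror the AlluxioRuntime example above but are illustrative rather than the literal Fluid source; the specpath/statuspath/selectorpath must point at fields that actually exist in the spec and status:

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.currentWorkerNumberScheduled,selectorpath=.status.selector
type AlluxioRuntime struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   AlluxioRuntimeSpec   `json:"spec,omitempty"`   // Spec.Replicas backs specpath
	Status AlluxioRuntimeStatus `json:"status,omitempty"` // Status fields back statuspath and selectorpath
}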