Go Client
在進入代碼之前,理解k8s的go client項目是對我們又幫助的。它是k8s client中最古老的一個,因此具有很多特性。 Client-go 沒有使用Swagger生成器,就像前面我們介紹的openAPI一樣。它使用的是源於k8s項目中的源代碼生成工具,這個工具的目的是要生成k8s風格的對象和序列化程序。
該項目是一組包的集合,該包能夠滿足從REST風格的原語到復雜client的不同的編程需求。
RESTClient是一個基礎包,它使用api-machinery庫中的類型作為一組REST原語提供對API的訪問。作為對RESTClient之上的抽象,_clientset_將是你創建k8s client工具的起點。它暴露了公開化的API資源及其對應的序列化。
注意: 在 client-go中還包含了如discovery, dynamic, 和 scale這樣的包,雖然本次不介紹這些包,但是了解它們的能力還是很重要的。
一個簡單的k8s client工具
讓我們再次回顧我們將要構建的工具,來說明go client的用法。pvcwatch是一個簡單的命令行工具,它可以監聽集群中聲明的PVC容量。當總數到達一個閾值的時候,他會采取一個action(在這個例子中是在屏幕上通知顯示)
這個例子是為了展示k8s的go client的以下幾個方面: - 如何去連接 - 資源列表的檢索和遍歷 - 對象監聽
連接API Server
我們Go client的第一步就是建立一個於API Server的連接。為了做到這一點,我們要使用實體包中的clientcmd,如下代碼所示:
import (
...
"k8s.io/client-go/tools/clientcmd"
)
func main() {
kubeconfig := filepath.Join(
os.Getenv("HOME"), ".kube", "config",
)
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
log.Fatal(err)
}
...
}
_Client-go_通過提供實體功能來從不同的上下文中獲取你的配置,從而使之成為一個不重要的任務。
從config文件
正如上面的例子所做的那樣,你能從kubeconfig文件啟動配置來連接API server。當你的代碼運行在集群之外的時候這是一個理想的方案。 clientcmd.BuildConfigFromFlags("", configFile)
從集群
當你的代碼運行在這個集群中的時候,你可以用上面的函數並且不使用任何參數,這個函數就會通過集群的信息去連接api server。
clientcmd.BuildConfigFromFlags("", "")
或者我們可以通過rest包來創建一個使用集群中的信息去配置啟動的(譯者注:k8s里所有的Pod都會以Volume的方式自動掛載k8s里面默認的ServiceAccount,所以會實用默認的ServiceAccount的授權信息),如下:
import "k8s.io/client-go/rest"
...
rest.InClusterConfig()
創建一個clientset
我們需要創建一個序列化的client為了讓我們獲取API對象。在kubernetes包中的Clientset類型定義,提供了去訪問公開的API對象的序列化client,如下:
type Clientset struct {
*authenticationv1beta1.AuthenticationV1beta1Client
*authorizationv1.AuthorizationV1Client
...
*corev1.CoreV1Client
}
一旦我們有正確的配置連接,我們就能使用這個配置去初始化一個clientset,如下:
func main() {
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
...
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatal(err)
}
}
對於我們的例子,我們使用的是v1的API對象。下一步,我們要使用clientset通過CoreV1()去訪問核心api資源,如下:
func main() {
...
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatal(err)
}
api := clientset.CoreV1()
}
獲取集群的PVC列表
我們對clientset執行的最基本操作之一獲取存儲的API對象的列表。在我們的例子中,我們將要拿到一個namespace下面的pvc列表,如下:
import (
...
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func main() {
var ns, label, field string
flag.StringVar(&ns, "namespace", "", "namespace")
flag.StringVar(&label, "l", "", "Label selector")
flag.StringVar(&field, "f", "", "Field selector")
...
api := clientset.CoreV1()
// setup list options
listOptions := metav1.ListOptions{
LabelSelector: label,
FieldSelector: field,
}
pvcs, err := api.PersistentVolumeClaims(ns).List(listOptions)
if err != nil {
log.Fatal(err)
}
printPVCs(pvcs)
...
}
在上面的代碼中,我們使用ListOptions指定 label 和 field selectors (還有namespace)來縮小pvc列表的范圍,這個結果的返回類型是v1.PeristentVolumeClaimList。下面的這個代碼展示了我們如何去遍歷和打印從api server中獲取的pvc列表。
func printPVCs(pvcs *v1.PersistentVolumeClaimList) {
template := "%-32s%-8s%-8s\n"
fmt.Printf(template, "NAME", "STATUS", "CAPACITY")
for _, pvc := range pvcs.Items {
quant := pvc.Spec.Resources.Requests[v1.ResourceStorage]
fmt.Printf(
template,
pvc.Name,
string(pvc.Status.Phase),
quant.String())
}
}
監聽集群中pvc
k8s的Go client框架支持為指定的API對象在其生命周期事件中監聽集群的能力,包括創建,更新,刪除一個指定對象時候觸發的CREATED,MODIFIED,DELETED事件。對於我們的命令行工具,我們將要監聽在集群中已經聲明的PVC的總量。
對於某一個namespace,當pvc的容量到達了某一個閾值(比如說200Gi),我們將會采取某個動作。為了簡單起見,我們將要在屏幕上打印個通知。但是在更復雜的實現中,可以使用相同的辦法觸發一個自動操作。
啟動監聽功能
現在讓我們為PersistentVolumeClaim這個資源通過Watch去創建一個監聽器。然后這個監聽器通過ResultChan從go的channel中訪問事件通知。
func main() {
...
api := clientset.CoreV1()
listOptions := metav1.ListOptions{
LabelSelector: label,
FieldSelector: field,
}
watcher, err :=api.PersistentVolumeClaims(ns).
Watch(listOptions)
if err != nil {
log.Fatal(err)
}
ch := watcher.ResultChan()
...
}
循環事件
接下來我們將要處理資源事件。但是在我們處理事件之前,我們先聲明resource.Quantity類型的的兩個變量為maxClaimsQuant和totalClaimQuant來分別表示我們的申請資源閾值(譯者注:代表某個ns下集群中運行的PVC申請的上限)和運行總數。
import(
"k8s.io/apimachinery/pkg/api/resource"
...
)
func main() {
var maxClaims string
flag.StringVar(&maxClaims, "max-claims", "200Gi",
"Maximum total claims to watch")
var totalClaimedQuant resource.Quantity
maxClaimedQuant := resource.MustParse(maxClaims)
...
ch := watcher.ResultChan()
for event := range ch {
pvc, ok := event.Object.(*v1.PersistentVolumeClaim)
if !ok {
log.Fatal("unexpected type")
}
...
}
}
在上面的for-range循環中,watcher的channel用於處理來自服務器傳入的通知。每個事件賦值給變量event,並且event.Object的類型被聲明為PersistentVolumeClaim類型,所以我們能從中提取出來。
處理ADDED事件
當一個新的PVC創建的時候,event.Type的值被設置為watch.Added。然后我們用下面的代碼去獲取新增的聲明的容量(quant),將其添加到正在運行的總容量中(totalClaimedQuant)。最后我們去檢查是否當前的容量總值大於當初設定的最大值(maxClaimedQuant),如果大於的話我們就觸發一個事件。
import(
"k8s.io/apimachinery/pkg/watch"
...
)
func main() {
...
for event := range ch {
pvc, ok := event.Object.(*v1.PersistentVolumeClaim)
if !ok {
log.Fatal("unexpected type")
}
quant := pvc.Spec.Resources.Requests[v1.ResourceStorage]
switch event.Type {
case watch.Added:
totalClaimedQuant.Add(quant)
log.Printf("PVC %s added, claim size %s\n",
pvc.Name, quant.String())
if totalClaimedQuant.Cmp(maxClaimedQuant) == 1 {
log.Printf(
"\nClaim overage reached: max %s at %s",
maxClaimedQuant.String(),
totalClaimedQuant.String())
// trigger action
log.Println("*** Taking action ***")
}
}
...
}
}
}
處理DELETED事件
代碼也會在PVC被刪除的時候做出反應,它執行相反的邏輯以及把被刪除的這個PVC申請的容量在正在運行的容量的總值里面減去。
func main() {
...
for event := range ch {
...
switch event.Type {
case watch.Deleted:
quant := pvc.Spec.Resources.Requests[v1.ResourceStorage]
totalClaimedQuant.Sub(quant)
log.Printf("PVC %s removed, size %s\n",
pvc.Name, quant.String())
if totalClaimedQuant.Cmp(maxClaimedQuant) <= 0 {
log.Printf("Claim usage normal: max %s at %s",
maxClaimedQuant.String(),
totalClaimedQuant.String(),
)
// trigger action
log.Println("*** Taking action ***")
}
}
...
}
}
運行程序
當程序在一個運行中的集群被執行的時候,首先會列出PVC的列表。然后開始監聽集群中新的PersistentVolumeClaim事件。
$> ./pvcwatch
Using kubeconfig: /Users/vladimir/.kube/config
--- PVCs ----
NAME STATUS CAPACITY
my-redis-redis Bound 50Gi
my-redis2-redis Bound 100Gi
-----------------------------
Total capacity claimed: 150Gi
-----------------------------
--- PVC Watch (max claims 200Gi) ----
2018/02/13 21:55:03 PVC my-redis2-redis added, claim size 100Gi
2018/02/13 21:55:03
At 50.0% claim capcity (100Gi/200Gi)
2018/02/13 21:55:03 PVC my-redis-redis added, claim size 50Gi
2018/02/13 21:55:03
At 75.0% claim capcity (150Gi/200Gi)
下面讓我們部署一個應用到集群中,這個應用會申請75Gi容量的存儲。(例如,讓我們通過helm去部署一個實例influxdb)。
helm install --name my-influx \
--set persistence.enabled=true,persistence.size=75Gi stable/influxdb
正如下面你看到的,我們的工具立刻反應出來有個新的聲明以及一個警告因為當前的運行的聲明總量已經大於我們設定的閾值。
--- PVC Watch (max claims 200Gi) ----
...
2018/02/13 21:55:03
At 75.0% claim capcity (150Gi/200Gi)
2018/02/13 22:01:29 PVC my-influx-influxdb added, claim size 75Gi
2018/02/13 22:01:29
Claim overage reached: max 200Gi at 225Gi
2018/02/13 22:01:29 *** Taking action ***
2018/02/13 22:01:29
At 112.5% claim capcity (225Gi/200Gi)
相反,從集群中刪除一個PVC的時候,該工具會相應展示提示信息。
...
At 112.5% claim capcity (225Gi/200Gi)
2018/02/14 11:30:36 PVC my-redis2-redis removed, size 100Gi
2018/02/14 11:30:36 Claim usage normal: max 200Gi at 125Gi
2018/02/14 11:30:36 *** Taking action ***
client-go常用api
Example1
import (
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/kubernetes"
appsv1beta1 "k8s.io/api/apps/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/typed/apps/v1beta1"
"flag"
"fmt"
"encoding/json"
)
func main() {
//kubelet.kubeconfig 是文件對應地址
kubeconfig := flag.String("kubeconfig", "kubelet.kubeconfig", "(optional) absolute path to the kubeconfig file")
flag.Parse()
// 解析到config
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err.Error())
}
// 創建連接
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
deploymentsClient := clientset.AppsV1beta1().Deployments(apiv1.NamespaceDefault)
//創建deployment
go createDeployment(deploymentsClient)
//監聽deployment
startWatchDeployment(deploymentsClient)
}
//監聽Deployment變化
func startWatchDeployment(deploymentsClient v1beta1.DeploymentInterface) {
w, _ := deploymentsClient.Watch(metav1.ListOptions{})
for {
select {
case e, _ := <-w.ResultChan():
fmt.Println(e.Type, e.Object)
}
}
}
//創建deployemnt,需要謹慎按照部署的k8s版本來使用api接口
func createDeployment(deploymentsClient v1beta1.DeploymentInterface) {
var r apiv1.ResourceRequirements
//資源分配會遇到無法設置值的問題,故采用json反解析
j := `{"limits": {"cpu":"2000m", "memory": "1Gi"}, "requests": {"cpu":"2000m", "memory": "1Gi"}}`
json.Unmarshal([]byte(j), &r)
deployment := &appsv1beta1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "engine",
Labels: map[string]string{
"app": "engine",
},
},
Spec: appsv1beta1.DeploymentSpec{
Replicas: int32Ptr2(1),
Template: apiv1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "engine",
},
},
Spec: apiv1.PodSpec{
Containers: []apiv1.Container{
{ Name: "engine",
Image: "my.com/engine:v2",
Resources: r,
},
},
},
},
},
}
fmt.Println("Creating deployment...")
result, err := deploymentsClient.Create(deployment)
if err != nil {
panic(err)
}
fmt.Printf("Created deployment %q.\n", result.GetObjectMeta().GetName())
}
func int32Ptr2(i int32) *int32 { return &i }
Example2
package main
import (
"flag"
"fmt"
"k8s.io/client-go/1.4/kubernetes"
"k8s.io/client-go/1.4/pkg/api"
"k8s.io/client-go/1.4/pkg/api/unversioned"
"k8s.io/client-go/1.4/pkg/api/v1"
"k8s.io/client-go/1.4/tools/clientcmd"
"log"
)
var (
kubeconfig = flag.String("kubeconfig", "./config", "absolute path to the kubeconfig file")
)
func main() {
flag.Parse()
// uses the current context in kubeconfig
config, err := clientcmd.BuildConfigFromFlags("116.213.205.180:8080", *kubeconfig)
if err != nil {
panic(err.Error())
}
// creates the clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// 創建pod
pod := new(v1.Pod)
pod.TypeMeta = unversioned.TypeMeta{Kind: "Pod", APIVersion: "v1"}
pod.ObjectMeta = v1.ObjectMeta{Name: "testapi", Namespace: "default", Labels: map[string]string{"name": "testapi"}}
pod.Spec = v1.PodSpec{
RestartPolicy: v1.RestartPolicyAlways,
Containers: []v1.Container{
v1.Container{
Name: "testapi",
Image: "nginx",
Ports: []v1.ContainerPort{
v1.ContainerPort{
ContainerPort: 80,
Protocol: v1.ProtocolTCP,
},
},
},
},
}
_, err = clientset.Core().Pods("default").Create(pod)
if err != nil {
panic(err.Error())
}
// 獲取現有的pod數量
pods, err := clientset.Core().Pods("").List(api.ListOptions{})
if err != nil {
panic(err.Error())
}
fmt.Printf("There are %d pods in the cluster\n", len(pods.Items))
// 創建namespace
nc := new(v1.Namespace)
nc.TypeMeta = unversioned.TypeMeta{Kind: "NameSpace", APIVersion: "v1"}
nc.ObjectMeta = v1.ObjectMeta{
Name: "k8s-test",
}
nc.Spec = v1.NamespaceSpec{}
_, err = clientset.Core().Namespaces().Create(nc)
if err != nil {
log.Println(err)
}
// 獲取namespace
namespaces, err := clientset.Core().Namespaces().List(api.ListOptions{})
if err != nil {
panic(err.Error())
}
fmt.Printf("There are %d namespaces in the cluster\n", len(namespaces.Items))
}
deployment
// 列出 deploymentList
deploymentList, err := clientset.AppsV1().Deployments("default").List(metav1.ListOptions{})
// 查詢 deploymentGet
deploymentGet, err := clientset.AppsV1().Deployments("default").Get("nginx-web-v2", metav1.GetOptions{})
// 創建 deploymentCreate
deploymentCreate, err := clientset.AppsV1().Deployments("default").Create(deploymentName)
// 更新 deploymentUpdate
deploymentUpdate, err := clientset.AppsV1().Deployments("default").Update(deploymentName)
// 刪除deployment
err = clientset.AppsV1().Deployments("default").Delete("deploymentName", &metav1.DeleteOptions{})
Pod
不寫命令空間 即列出所有pod
//列出pod
podList, err := clientset.CoreV1().Pods("default").List(&meta_v1.ListOptions{})
//查詢pod
pod, err := clientset.CoreV1().Pods("default").Get(<podName>, meta_v1.GetOptions{})
//創建pod
pod, err := clientset.CoreV1().Pods("default").Create(web)
//更新pod
pod, err := clientset.CoreV1().Pods("default").Update(web)
//刪除pod
err := clientset.CoreV1().Pods("default").Delete(<podName>, &meta_v1.DeleteOptions{})
statefulset
// 列出 statefulList
statefulList, err := clientset.AppsV1().StatefulSets("default").List(metav1.ListOptions{})
// 查詢 statefulGet
statefulGet, err := clientset.AppsV1().StatefulSets("default").Get("web", metav1.GetOptions{})
// 創建 statefulCreate
statefulCreate, err := clientset.AppsV1().StatefulSets("default").Create(statefulName)
// 更新 statefulUpdate
statefulUpdate, err := clientset.AppsV1().StatefulSets("default").Update(statefulName)
// 刪除 stateful
err = clientset.AppsV1().StatefulSets("default").Delete("statefulName", &metav1.DeleteOptions{})
service
// 列出 serviceList
serviceList, err := clientset.CoreV1().Services("default").List(metav1.ListOptions{})
// 查詢 serviceGet
serviceGet, err := clientset.CoreV1().Services("default").Get("web", metav1.GetOptions{})
// 創建 serviceCreate
serviceCreate, err := clientset.CoreV1().Services("default").Create(web)
// 更新 serviceUpdate
serviceUpdate, err := clientset.CoreV1().Services("default").Update(web)
// 刪除 service
err = clientset.CoreV1().Services.("default").Delete("serviceName", &metav1.DeleteOptions{})
Ingress
// 列出 ingressList
ingressList, err := clientset.ExtensionsV1beta1().Ingresses("default").List(metav1.ListOptions{})
// 查詢 ingressGet
ingressGet, err := clientset.ExtensionsV1beta1().Ingresses("default").Get("web", metav1.GetOptions{})
// 創建 ingressCreate
ingressCreate, err := clientset.ExtensionsV1beta1().Ingresses("default").Create(web)
// 更新 ingressUpdate
ingressUpdate, err := clientset.ExtensionsV1beta1().Ingresses("default").Update(web)
// 刪除 ingress
err = clientset.ExtensionsV1beta1().Ingresses("default").Delete("web", &metav1.DeleteOptions{})