Deploying ZooKeeper and Kafka Clusters on Kubernetes


1 Environment

Hostname     OS version   IP address    CPU/Mem/Disk      Role                                  Software version
k8s_nfs      CentOS 7.5   172.16.1.60   2 cores/2GB/60GB  NFS storage for zookeeper and kafka   nfs-utils-1.3.0-0.68
k8s-master1  CentOS 7.5   172.16.1.81   2 cores/2GB/60GB  kubernetes master1 node               k8s v1.20.0
k8s-master2  CentOS 7.5   172.16.1.82   2 cores/2GB/60GB  kubernetes master2 node               k8s v1.20.0
k8s-node1    CentOS 7.5   172.16.1.83   4 cores/8GB/60GB  kubernetes node1 node                 k8s v1.20.0
k8s-node2    CentOS 7.5   172.16.1.84   4 cores/8GB/60GB  kubernetes node2 node                 k8s v1.20.0

Notes:
(1) The kubernetes control-plane (master) nodes are tainted, so they cannot be used for pod scheduling.
(2) ZooKeeper image: guglecontainers/kubernetes-zookeeper:1.0-3.4.10 (ZooKeeper 3.4.10).
(3) Kafka image: cloudtrackinc/kubernetes-kafka:0.10.0.1 (kafka_2.11-0.10.0.1).
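Since the masters are tainted, you can confirm that workloads will only land on the worker nodes with a quick check like the following (not part of the deployment itself):
# kubectl describe node k8s-master1 k8s-master2 | grep -i taint
# kubectl get nodes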

 
           
1.1 NFS server deployment
Node: k8s_nfs
Purpose: persistent data storage for the k8s pods
Note: setting up the NFS service itself is not covered here
Verification:
[root@k8s_nfs ~]# ls /ifs/kubernetes/
kafka  zk
# Note: the kafka directory stores kafka data and the zk directory stores zookeeper data.

[root@k8s_nfs ~]# showmount -e 172.16.1.60
Export list for 172.16.1.60:
/ifs/kubernetes *
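Since the export setup is not shown above, a minimal /etc/exports entry on k8s_nfs might look like this (an assumed example; adjust the options to your environment):
/ifs/kubernetes *(rw,sync,no_root_squash)
# reload the exports after editing
# exportfs -rav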

1.2 nfs-subdir-external-provisioner deployment
Nodes: the kubernetes cluster
Purpose: automatic PVC provisioning (dynamic PV supply) for the middleware pods
Note: before deploying, the NFS client must be installed on every k8s node, otherwise the deployment will not succeed.
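As a quick sketch, the per-node preparation and a sanity check look like this (a plain nfs-utils install plus a showmount against the NFS server):
# yum install nfs-utils -y
# showmount -e 172.16.1.60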

Project:
(1) GitHub project: https://github.com/kubernetes-sigs/nfs-subdir-external-provisioner
(2) Download the following files from its deploy directory:
class.yaml, deployment.yaml, rbac.yaml
# The three files can be modified as needed

Configuration file overview:
(1) Create the namespace
[root@k8s-master1 ~]# kubectl create namespace zk-kafka


(2) Deployment files
[root@k8s-master1 zk-kafka-nfs]# ls -l /root/zk-kafka-nfs/
total 20
# storageclass for the kafka cluster
-rw-r--r-- 1 root root  392 Feb 21 17:09 class_kafka.yaml
# storageclass for the zk cluster
-rw-r--r-- 1 root root  386 Feb 21 17:08 class_zk.yaml
# nfs-client-provisioner for the kafka cluster
-rw-r--r-- 1 root root 1355 Feb 21 17:07 deployment_kafka.yaml
# nfs-client-provisioner for the zk cluster
-rw-r--r-- 1 root root 1325 Feb 21 17:03 deployment_zk.yaml
# rbac for the nfs provisioner
-rw-r--r-- 1 root root 1905 Feb 21 15:52 rbac.yaml

Note: zk and kafka both live in the zk-kafka namespace and share a single set of RBAC objects; the StorageClass and provisioner Deployment objects use separate names.

(3) rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: nfs-client-provisioner
  # replace with namespace where provisioner is deployed
  namespace: zk-kafka
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: nfs-client-provisioner-runner
rules:
  - apiGroups: [""]
    resources: ["nodes"]
    verbs: ["get", "list", "watch"]
  - apiGroups: [""]
    resources: ["persistentvolumes"]
    verbs: ["get", "list", "watch", "create", "delete"]
  - apiGroups: [""]
    resources: ["persistentvolumeclaims"]
    verbs: ["get", "list", "watch", "update"]
  - apiGroups: ["storage.k8s.io"]
    resources: ["storageclasses"]
    verbs: ["get", "list", "watch"]
  - apiGroups: [""]
    resources: ["events"]
    verbs: ["create", "update", "patch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: run-nfs-client-provisioner
subjects:
  - kind: ServiceAccount
    name: nfs-client-provisioner
    # replace with namespace where provisioner is deployed
    namespace: zk-kafka
roleRef:
  kind: ClusterRole
  name: nfs-client-provisioner-runner
  apiGroup: rbac.authorization.k8s.io
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: leader-locking-nfs-client-provisioner
  # replace with namespace where provisioner is deployed
  namespace: zk-kafka
rules:
  - apiGroups: [""]
    resources: ["endpoints"]
    verbs: ["get", "list", "watch", "create", "update", "patch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: leader-locking-nfs-client-provisioner
  # replace with namespace where provisioner is deployed
  namespace: zk-kafka
subjects:
  - kind: ServiceAccount
    name: nfs-client-provisioner
    # replace with namespace where provisioner is deployed
    namespace: zk-kafka
roleRef:
  kind: Role
  name: leader-locking-nfs-client-provisioner
  apiGroup: rbac.authorization.k8s.io


(4) deployment_zk.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nfs-client-provisioner-zk
  labels:
    app: nfs-client-provisioner-zk
  # replace with namespace where provisioner is deployed
  namespace: zk-kafka
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: nfs-client-provisioner-zk
  template:
    metadata:
      labels:
        app: nfs-client-provisioner-zk
    spec:
      serviceAccountName: nfs-client-provisioner
      containers:
        - name: nfs-client-provisioner-zk
          #image: k8s.gcr.io/sig-storage/nfs-subdir-external-provisioner:v4.0.2
          image: registry.cn-hangzhou.aliyuncs.com/k8s-image01/nfs-subdir-external-provisione:v4.0.1
          volumeMounts:
            - name: nfs-client-root
              mountPath: /persistentvolumes
          env:
            - name: PROVISIONER_NAME
              value: k8s-sigs.io/nfs-subdir-external-provisioner-zk
            - name: NFS_SERVER
              value: 172.16.1.60         # change to your NFS server address
            - name: NFS_PATH
              value: /ifs/kubernetes/zk  # change to your NFS export directory
      volumes:
        - name: nfs-client-root
          nfs:
            server: 172.16.1.60          # change to your NFS server address
            path: /ifs/kubernetes/zk     # change to your NFS export directory


(5) deployment_kafka.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nfs-client-provisioner-kafka
  labels:
    app: nfs-client-provisioner-kafka
  # replace with namespace where provisioner is deployed
  namespace: zk-kafka
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: nfs-client-provisioner-kafka
  template:
    metadata:
      labels:
        app: nfs-client-provisioner-kafka
    spec:
      serviceAccountName: nfs-client-provisioner
      containers:
        - name: nfs-client-provisioner-kafka
          #image: k8s.gcr.io/sig-storage/nfs-subdir-external-provisioner:v4.0.2
          image: registry.cn-hangzhou.aliyuncs.com/k8s-image01/nfs-subdir-external-provisione:v4.0.1
          volumeMounts:
            - name: nfs-client-root
              mountPath: /persistentvolumes
          env:
            - name: PROVISIONER_NAME
              value: k8s-sigs.io/nfs-subdir-external-provisioner-kafka
            - name: NFS_SERVER
              value: 172.16.1.60            # change to your NFS server address
            - name: NFS_PATH
              value: /ifs/kubernetes/kafka  # change to your NFS export directory
      volumes:
        - name: nfs-client-root
          nfs:
            server: 172.16.1.60             # change to your NFS server address
            path: /ifs/kubernetes/kafka     # change to your NFS export directory


(6) class_zk.yaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: managed-nfs-storage-zk  # name of the StorageClass used for dynamic PV provisioning
provisioner: k8s-sigs.io/nfs-subdir-external-provisioner-zk  # must match the PROVISIONER_NAME env in deployment_zk.yaml
parameters:
  archiveOnDelete: "true"    # "true" archives (keeps) the automatically created NFS directory when the PV is deleted; the default "false" deletes it


(7) class_kafka.yaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: managed-nfs-storage-kafka  # name of the StorageClass used for dynamic PV provisioning
provisioner: k8s-sigs.io/nfs-subdir-external-provisioner-kafka  # must match the PROVISIONER_NAME env in deployment_kafka.yaml
parameters:
  archiveOnDelete: "true"    # "true" archives (keeps) the automatically created NFS directory when the PV is deleted; the default "false" deletes it


(8) Apply the yaml files and check the result
[root@k8s-master1 zk-kafka-nfs]# ls | xargs -i kubectl apply -f {}
[root@k8s-master1 zk-kafka-nfs]# kubectl get deployment,pod,svc -n zk-kafka
[root@k8s-master1 zk-kafka-nfs]# kubectl get sc

Figure: image-20220221192700378

2 ZooKeeper and Kafka overview

1 Kafka and ZooKeeper are two typical stateful clustered services. First, both need persistent storage to keep their state; second, every instance needs its own instance ID (broker.id for Kafka, myid for ZooKeeper) to identify it as a member of the cluster, and these IDs are used when the members communicate with each other.

2 Deploying this kind of service means solving two major problems: preserving state and managing the cluster (multiple service instances). The StatefulSet in kubernetes makes deploying and managing stateful clustered services much easier. Generally, three mechanisms are used to deploy a stateful clustered service:
(1) Init Containers perform the cluster initialization work.
(2) A Headless Service maintains a stable identity for each cluster member.
(3) PersistentVolumes and PersistentVolumeClaims provide network storage to persist the data.
Therefore, stateful services such as kafka and zookeeper cannot be deployed in a K8S cluster with a Deployment; they must be deployed with a StatefulSet. "Stateful" simply means the service needs to persist data, such as logs, database data, or service state.

3 StatefulSet use cases
(1) Stable persistent storage: after a Pod is rescheduled it still reaches the same persisted data, implemented with PVCs.
(2) Stable network identity: after a Pod is rescheduled its PodName and HostName do not change, implemented with a Headless Service (a Service without a Cluster IP).
(3) Ordered deployment and scale-out: Pods are ordered, and deployment or scale-out proceeds in the defined order (from 0 to N-1; before the next Pod starts, all earlier Pods must be Running and Ready), implemented with init containers. Scale-in and deletion are also ordered (from N-1 down to 0).

4 StatefulSet components
A Headless Service that defines the network identity (DNS domain).
volumeClaimTemplates that create the PersistentVolumes.
The StatefulSet that defines the application itself.

5 DNS name of each Pod in a StatefulSet
<statefulSetName-{0..N-1}>.<serviceName>.<namespace>.svc.cluster.local
(1) statefulSetName is the name of the StatefulSet, and 0..N-1 is the ordinal of the Pod, from 0 to N-1.
(2) serviceName is the name of the Headless Service.
(3) namespace is the namespace the service runs in; the Headless Service and the StatefulSet must be in the same namespace.
(4) svc.cluster.local is the cluster root domain of K8S.
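For this article's deployment (StatefulSet zk, Headless Service zk-hs, namespace zk-kafka) the first Pod is therefore reachable as zk-0.zk-hs.zk-kafka.svc.cluster.local. Once the cluster is running, this can be checked from a throwaway busybox pod (a quick sketch; "dns-test" is just a temporary pod name):
# kubectl run dns-test --image=busybox:1.28.4 --restart=Never --rm -it -- nslookup zk-0.zk-hs.zk-kafka.svc.cluster.local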

3 ZooKeeper cluster deployment

3.1 yaml configuration file

1 Reference
https://kubernetes.io/zh/docs/tutorials/stateful-application/zookeeper/

2 zk-cluster.yml
ZooKeeper uses three ports, 2181, 3888 and 2888, with the following roles:
(1) 2181  # serves client connections
(2) 3888  # used for leader election
(3) 2888  # used for communication between nodes inside the cluster
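For reference, these ports show up in the server list of the generated zoo.cfg roughly like this (a sketch of what step 7 in section 3.2 prints; the actual file is written by the image's start-zookeeper script):
server.1=zk-0.zk-hs.zk-kafka.svc.cluster.local:2888:3888
server.2=zk-1.zk-hs.zk-kafka.svc.cluster.local:2888:3888
server.3=zk-2.zk-hs.zk-kafka.svc.cluster.local:2888:3888
clientPort=2181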

[root@k8s-master1 ~]# mkdir -p zk-kafka-cluster
[root@k8s-master1 ~]# cd zk-kafka-cluster/
[root@k8s-master1 zk-kafka-cluster]# cat zk-cluster.yml
apiVersion: v1
kind: Service
metadata:
  namespace: zk-kafka
  name: zk-hs
  labels:
    app: zk
spec:
  ports:
  - port: 2888
    name: server
  - port: 3888
    name: leader-election
  clusterIP: None
  selector:
    app: zk
---
apiVersion: v1
kind: Service
metadata:
  namespace: zk-kafka
  name: zk-cs
  labels:
    app: zk
spec:
  #type: NodePort
  ports:
  - port: 2181
    targetPort: 2181
    name: client
    #nodePort: 32181
  selector:
    app: zk
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  namespace: zk-kafka
  name: zk-pdb
spec:
  selector:
    matchLabels:
      app: zk
  maxUnavailable: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: zk-kafka
  name: zk
spec:
  serviceName: zk-hs
  replicas: 3
  selector:
    matchLabels:
      app: zk
  template:
    metadata:
      labels:
        app: zk
    spec:
      containers:
      - name: kubernetes-zookeeper
        imagePullPolicy: Always
        image: registry.cn-hangzhou.aliyuncs.com/k8s-image01/kubernetes-zookeeper:1.0-3.4.10
        resources:
          requests:
            memory: "1024Mi"
            cpu: "500m"
        ports:
        - containerPort: 2181
          name: client
        - containerPort: 2888
          name: server
        - containerPort: 3888
          name: leader-election
        command:
        - sh
        - -c
        - "start-zookeeper \
          --servers=3 \
          --data_dir=/var/lib/zookeeper/data \
          --data_log_dir=/var/lib/zookeeper/data/log \
          --conf_dir=/opt/zookeeper/conf \
          --client_port=2181 \
          --election_port=3888 \
          --server_port=2888 \
          --tick_time=2000 \
          --init_limit=10 \
          --sync_limit=5 \
          --heap=512M \
          --max_client_cnxns=60 \
          --snap_retain_count=3 \
          --purge_interval=12 \
          --max_session_timeout=40000 \
          --min_session_timeout=4000 \
          --log_level=INFO"
        readinessProbe:
          exec:
            command:
            - sh
            - -c
            - "zookeeper-ready 2181"
          initialDelaySeconds: 10
          timeoutSeconds: 5
        livenessProbe:
          exec:
            command:
            - sh
            - -c
            - "zookeeper-ready 2181"
          initialDelaySeconds: 10
          timeoutSeconds: 5
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/zookeeper
  volumeClaimTemplates:
  - metadata:
      name: datadir
      #annotations:
      #  volume.beta.kubernetes.io/storage-class: "zk-nfs-storage"
    spec:
      storageClassName: "managed-nfs-storage-zk"
      accessModes:
        - ReadWriteMany
      resources:
        requests:
          storage: 10Gi

3.2 Deployment
1 Apply the yml file
[root@k8s-master1 zk-kafka-cluster]# kubectl apply -f zk-cluster.yml
service/zk-hs created
service/zk-cs created
poddisruptionbudget.policy/zk-pdb created
statefulset.apps/zk created
2 Check the zk cluster pods
[root@k8s-master1 zk-kafka-cluster]# kubectl get pod -n zk-kafka -l app=zk

Figure: image-20220222173530581

3 Check the zk cluster services
[root@k8s-master1 zk-kafka-cluster]# kubectl get svc,ep -n zk-kafka

Figure: image-20220222173611859

4 Check the zk cluster PVCs and PVs
PVCs are namespaced, so the query needs "-n <namespace>"; PVs are cluster-scoped, so "-n <namespace>" is not required.
[root@k8s-master1 zk-kafka-cluster]# kubectl get pvc -n zk-kafka
[root@k8s-master1 zk-kafka-cluster]# kubectl get pv

Figure: image-20220222173706308

5 Check the zk cluster's persistent NFS directories
Note that each NFS directory is named <namespace>-<PVC name>-<PV name>; as long as the PVC and PV are not deleted, this directory name does not change.
[root@k8s_nfs ~]# ls -l /ifs/kubernetes/zk/
[root@k8s_nfs ~]# ls -l /ifs/kubernetes/zk/*/
[root@k8s_nfs ~]# ls -l /ifs/kubernetes/zk/*/data/
[root@k8s_nfs ~]# cat /ifs/kubernetes/zk/*/data/myid

Figure: image-20220222175207333

6 Check the leader/follower roles of the zk cluster
[root@k8s-master1 zk-kafka-cluster]# kubectl exec -it zk-0 -n zk-kafka -- zkServer.sh status
[root@k8s-master1 zk-kafka-cluster]# kubectl exec -it zk-1 -n zk-kafka -- zkServer.sh status
[root@k8s-master1 zk-kafka-cluster]# kubectl exec -it zk-2 -n zk-kafka -- zkServer.sh status

Figure: image-20220222185124578

7 Check the zk cluster configuration file
[root@k8s-master1 ~]# kubectl exec -it pod/zk-0 -n zk-kafka -- bash
root@zk-0:/# cat /opt/zookeeper/conf/zoo.cfg

Figure: image-20220223172225249

3.3 Verify connections to the zk cluster
[root@k8s-master1 zk-kafka-cluster]# kubectl get svc -n zk-kafka
NAME    TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)             AGE
zk-cs   ClusterIP   172.28.20.31   <none>        2181/TCP            79m
zk-hs   ClusterIP   None           <none>        2888/TCP,3888/TCP   79m
[root@k8s-master1 zk-kafka-cluster]#

Ways to connect to the zookeeper cluster:
zk-cs:2181
zk-0.zk-hs.zk-kafka.svc.cluster.local:2181
zk-1.zk-hs.zk-kafka.svc.cluster.local:2181
zk-2.zk-hs.zk-kafka.svc.cluster.local:2181

[root@k8s-master1 zk-kafka-cluster]# kubectl exec -it zk-0 -n zk-kafka -- zkCli.sh -server zk-cs:2181

Figure: image-20220222192944650

4 Kafka cluster deployment

4.1 yaml configuration file

[root@k8s-master1 zk-kafka-cluster]# cat kafka-cluster.yml
apiVersion: v1
kind: Service
metadata:
  namespace: zk-kafka
  name: kafka-cs
  labels:
    app: kafka
spec:
  #type: NodePort
  clusterIP: None
  ports:
  - port: 9092
    targetPort: 9092
    name: client
    #nodePort: 32092
  selector:
    app: kafka
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: kafka-pdb
  namespace: zk-kafka
spec:
  selector:
    matchLabels:
      app: kafka
  minAvailable: 2
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: zk-kafka
  name: kafka
spec:
  serviceName: kafka-cs
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - name: k8s-kafka
        imagePullPolicy: IfNotPresent
        image: cloudtrackinc/kubernetes-kafka:0.10.0.1
        ports:
        - containerPort: 9092
          name: client
        resources:
          requests:
            memory: "1024Mi"
            cpu: "500m"
        command:
        - sh
        - -c
        - "exec /opt/kafka_2.11-0.10.0.1/bin/kafka-server-start.sh /opt/kafka_2.11-0.10.0.1/config/server.properties --override broker.id=${HOSTNAME##*-} \
          --override listeners=PLAINTEXT://:9092 \
          --override zookeeper.connect=zk-0.zk-hs.zk-kafka.svc.cluster.local:2181,zk-1.zk-hs.zk-kafka.svc.cluster.local:2181,zk-2.zk-hs.zk-kafka.svc.cluster.local:2181 \
          --override log.dirs=/var/lib/kafka \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=true \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000 "
        env:
        - name: KAFKA_HEAP_OPTS
          value : "-Xmx512M -Xms512M"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=INFO"
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/kafka
        lifecycle:
          postStart:
            exec:
              command: ["/bin/sh","-c","touch /tmp/health"]
        livenessProbe:
          exec:
            command: ["test","-e","/tmp/health"]
          initialDelaySeconds: 5
          timeoutSeconds: 5
          periodSeconds: 10
        readinessProbe:
          tcpSocket:
            port: client
          initialDelaySeconds: 15
          timeoutSeconds: 5
          periodSeconds: 20
  volumeClaimTemplates:
  - metadata:
      name: datadir
      #annotations:
      #  volume.beta.kubernetes.io/storage-class: "kafka-nfs-storage"
    spec:
      storageClassName: "managed-nfs-storage-kafka"
      accessModes:
        - ReadWriteMany
      resources:
        requests:
          storage: 10Gi



4.2 Deployment
1 Apply the configuration file
[root@k8s-master1 zk-kafka-cluster]# kubectl apply -f kafka-cluster.yml
service/kafka-cs created
poddisruptionbudget.policy/kafka-pdb created
statefulset.apps/kafka created
2 Check the kafka cluster pods
[root@k8s-master1 zk-kafka-cluster]# kubectl get pod -n zk-kafka

Figure: image-20220224110920223

3 Check the kafka cluster PVCs
[root@k8s-master1 zk-kafka-cluster]# kubectl get pvc -n zk-kafka

Figure: image-20220224111040113

4 Check the kafka cluster PVs
[root@k8s-master1 zk-kafka-cluster]# kubectl get pv

Figure: image-20220224111932085

5 Check the kafka cluster services
[root@k8s-master1 zk-kafka-cluster]# kubectl get svc,ep -n zk-kafka

Figure: image-20220224112219552

6 Check the kafka cluster's persistent NFS directories
[root@k8s_nfs ~]# ls -l /ifs/kubernetes/kafka/
[root@k8s_nfs ~]# ls -l /ifs/kubernetes/kafka/*/
[root@k8s_nfs ~]# cat /ifs/kubernetes/kafka/*/meta.properties

Figure: image-20220224112423792

7 Kafka in-cluster connection addresses
# kubectl run -i --tty --image busybox:1.28.4 dns-test --restart=Never --rm /bin/sh
If you don't see a command prompt, try pressing enter.
/ # nslookup kafka-cs.zk-kafka.svc.cluster.local
Server: 172.28.0.2
Address 1: 172.28.0.2 kube-dns.kube-system.svc.cluster.local

Name: kafka-cs.zk-kafka.svc.cluster.local
Address 1: 172.27.36.90 kafka-2.kafka-cs.zk-kafka.svc.cluster.local
Address 2: 172.27.169.156 kafka-0.kafka-cs.zk-kafka.svc.cluster.local
Address 3: 172.27.169.157 kafka-1.kafka-cs.zk-kafka.svc.cluster.local
/ # exit
pod "dns-test" deleted
[root@k8s-master1 zk-kafka-cluster]#
4.3 Test (verify producing and consuming data on the kafka cluster)
1 Overview
(1) zookeeper and kafka svc information
[root@k8s-master1 zk-kafka-cluster]# kubectl get svc -n zk-kafka
NAME       TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
kafka-cs   ClusterIP   None            <none>        9092/TCP            76m
zk-cs      ClusterIP   172.28.66.236   <none>        2181/TCP            107m
zk-hs      ClusterIP   None            <none>        2888/TCP,3888/TCP   107m

(2) How clients access zookeeper
zk-cs:2181
zk-0.zk-hs.zk-kafka.svc.cluster.local:2181
zk-1.zk-hs.zk-kafka.svc.cluster.local:2181
zk-2.zk-hs.zk-kafka.svc.cluster.local:2181

(3) How clients access kafka
kafka-0.kafka-cs.zk-kafka.svc.cluster.local:9092
kafka-1.kafka-cs.zk-kafka.svc.cluster.local:9092
kafka-2.kafka-cs.zk-kafka.svc.cluster.local:9092

2 Log in to any one of the three kafka pods (for example kafka-0) and produce some data
[root@k8s-master1 zk-kafka-cluster]# kubectl exec -it pod/kafka-0 -n zk-kafka -- bash
> cd /opt/kafka_2.11-0.10.0.1/bin/

(1) Create a topic named test
> ./kafka-topics.sh --create \
--topic test \
--zookeeper zk-cs:2181 \
--partitions 3 \
--replication-factor 3
The output is:
Created topic "test".

(2) List the topics
> ./kafka-topics.sh --list --zookeeper zk-cs:2181
The output is:
test

(3) Describe the topic named test
> ./kafka-topics.sh --describe --zookeeper zk-cs:2181 --topic test
The output is:
Topic:test    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: test    Partition: 1    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: test    Partition: 2    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0

(4) Produce messages on the topic named test
> ./kafka-console-producer.sh --topic test --broker-list localhost:9092
Enter the following lines one by one:
1
2
3

3 In a second terminal, log in to another kafka pod, for example kafka-1, and verify that the data can be consumed
[root@k8s-master1 zk-kafka-cluster]# kubectl exec -it pod/kafka-1 -n zk-kafka -- bash
> cd /opt/kafka_2.11-0.10.0.1/bin/
> ./kafka-console-consumer.sh --topic test --zookeeper zk-cs:2181 --from-beginning
The output is:
1
2
3

4 Note
Producers connect to the kafka cluster; consumers (and other tools) connect either to the zookeeper cluster or to the kafka cluster, depending on the kafka version (the newer the kafka version, the less it depends on zookeeper):
Older kafka: --zookeeper <zookeeper cluster>:2181
Newer kafka: --bootstrap-server <kafka cluster>:9092
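Put side by side, the two consumer invocations used in this article look like this (the first is what this section ran against kafka 0.10, the second is what section 5.2 uses with the newer image, both run from inside a broker pod):
# ./kafka-console-consumer.sh --topic test --zookeeper zk-cs:2181 --from-beginning
# kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092 --from-beginning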

5 Additional notes

5.1 About affinity
My k8s cluster has 2 master nodes that do not accept pod scheduling and only 2 worker nodes that do, which is not enough to satisfy pod affinity/anti-affinity rules for the zookeeper and kafka cluster pods, so I removed that part of the yaml configuration. In real production it is recommended to add it back, for example as sketched below.
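A minimal sketch of the anti-affinity that could be added under the zk StatefulSet's pod template spec (spec.template.spec), so that no two ZooKeeper pods are scheduled onto the same node; the kafka StatefulSet would use the same block with "app: kafka" instead:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                      - zk
              topologyKey: "kubernetes.io/hostname"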
5.2 Building a custom kafka image

1 Overview
The kafka image used above is the public image cloudtrackinc/kubernetes-kafka:0.10.0.1 (kafka_2.11-0.10.0.1), which is fairly old. To run a newer kafka version you need to build your own kafka image.
http://kafka.apache.org/downloads.html
https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
JDK version: jdk-8u45-linux-x64.tar.gz

2 Dockerfile
# cat Dockerfile
FROM centos:7
LABEL maintainer liuchang

RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

ADD jdk-8u45-linux-x64.tar.gz /usr/local
ADD kafka_2.13-2.8.0.tgz /opt

RUN ln -s /usr/local/jdk1.8.0_45 /usr/local/jdk && \
    ln -s /opt/kafka_2.13-2.8.0 /opt/kafka

ENV JAVA_HOME /usr/local/jdk
ENV CLASSPATH $JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib/rt.jar
ENV PATH $JAVA_HOME/bin:/opt/kafka/bin:$PATH
ENV LANG en_US.UTF-8

WORKDIR /opt

CMD ["/bin/bash"]

3 Build the kafka image
[root@k8s-master1 kafka-cluster-image]# ls -l
total 238956
-rw-r--r-- 1 root root       464 Feb 24 23:15 Dockerfile
-rw-r--r-- 1 root root 173271626 Mar 14  2021 jdk-8u45-linux-x64.tar.gz
-rw-r--r-- 1 root root  71403603 Jul 25  2021 kafka_2.13-2.8.0.tgz
-rw-r--r-- 1 root root      7048 Feb 24 23:33 kafka-cluster.yml

# docker build -t registry.cn-hangzhou.aliyuncs.com/k8s-image01/kafka:2.13-2.8.0 .
Note: I have already pushed the built image to my personal Alibaba Cloud registry.

4 Deploy it to the k8s cluster for testing
(1) Modify kafka-cluster.yml
1) Change the image to:
image: registry.cn-hangzhou.aliyuncs.com/k8s-image01/kafka:2.13-2.8.0
2) Change the kafka start command to:
exec kafka-server-start.sh /opt/kafka/config/server.properties
(the resulting container section is sketched after step (2) below)

(2) Run the consumer and producer tests
Note: because this kafka version is newer and depends much less on zookeeper, use the --bootstrap-server option for the consumer test.
# kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092 --from-beginning
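Putting the two changes together, the container section of kafka-cluster.yml ends up roughly like this (a sketch, not the full file; the long list of --override flags from section 4.1 can be kept or trimmed as needed):
      containers:
      - name: k8s-kafka
        image: registry.cn-hangzhou.aliyuncs.com/k8s-image01/kafka:2.13-2.8.0
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties \
          --override broker.id=${HOSTNAME##*-} \
          --override listeners=PLAINTEXT://:9092 \
          --override zookeeper.connect=zk-0.zk-hs.zk-kafka.svc.cluster.local:2181,zk-1.zk-hs.zk-kafka.svc.cluster.local:2181,zk-2.zk-hs.zk-kafka.svc.cluster.local:2181 \
          --override log.dirs=/var/lib/kafka"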



5.3 Deploying the Kafka web management UI (kafka-manager) on K8S

1 Overview
(1) kafka-manager is Yahoo's open-source apache-kafka management tool, written in Scala, that lets you operate kafka from a web page:
1) manage kafka clusters
2) convenient cluster state monitoring (topics, consumers, offsets, brokers, replica distribution, partition distribution)
3) convenient selection of partition replicas
4) configure partition assignments, including choosing which brokers to use
5) reassign existing partition assignments
6) different options for creating and deleting topics
7) the topic list marks which topics have been deleted
8) generate partition assignments in batch across multiple topics and brokers
9) run partition reassignment for multiple topics in batch
10) add partitions to existing topics
11) update the configuration of existing topics
12) optionally enable JMX polling for broker-level and topic-level metrics
13) optionally filter out consumers that have no ids/owners/offsets/directories in ZK

(2) Repository
https://github.com/yahoo/CMAK

(3) Source download
https://github.com/yahoo/CMAK/tree/2.0.0.2
https://github.com/yahoo/CMAK/archive/refs/tags/2.0.0.2.tar.gz

Requirements:
Kafka 0.8.. or 0.9.. or 0.10.. or 0.11..
Java 8+ (I use jdk-8u45-linux-x64.tar.gz here)

2 Build
References:
https://www.scala-sbt.org/download.html
https://www.cnblogs.com/coding-farmer/p/12097519.html
Because the build dependencies can only be fetched through a proxy, I downloaded a package that someone else had already built, "kafka-manager-2.0.0.2.zip".

3 Modify the kafka-manager-2.0.0.2.zip configuration
# unzip kafka-manager-2.0.0.2.zip
# vim kafka-manager-2.0.0.2/conf/application.conf
1) Change kafka-manager.zkhosts="kafka-manager-zookeeper:2181" to
kafka-manager.zkhosts="zk-0.zk-hs.zk-kafka.svc.cluster.local:2181,zk-1.zk-hs.zk-kafka.svc.cluster.local:2181,zk-2.zk-hs.zk-kafka.svc.cluster.local:2181"
2) Enable username/password login (the default is false, disabled)
basicAuthentication.enabled=true
3) Change the login password
basicAuthentication.password="admin@123"
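After these edits, the relevant lines in conf/application.conf look roughly like this (the username stays at its default "admin", which is the account used to log in later in step 7):
kafka-manager.zkhosts="zk-0.zk-hs.zk-kafka.svc.cluster.local:2181,zk-1.zk-hs.zk-kafka.svc.cluster.local:2181,zk-2.zk-hs.zk-kafka.svc.cluster.local:2181"
basicAuthentication.enabled=true
basicAuthentication.username="admin"
basicAuthentication.password="admin@123"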

Repackage after the changes:
# tar -czf kafka-manager-2.0.0.2.tar.gz kafka-manager-2.0.0.2/

4 Build the Docker image
(1) Dockerfile
# cat Dockerfile
FROM centos:7
LABEL maintainer liuchang

RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

ADD jdk-8u45-linux-x64.tar.gz /usr/local
ADD kafka-manager-2.0.0.2.tar.gz /opt

RUN ln -s /usr/local/jdk1.8.0_45 /usr/local/jdk && \
    ln -s /opt/kafka-manager-2.0.0.2 /opt/kafka-manager

ENV JAVA_HOME /usr/local/jdk
ENV CLASSPATH $JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib/rt.jar
ENV PATH $JAVA_HOME/bin:$PATH
ENV LANG en_US.UTF-8

WORKDIR /opt

EXPOSE 9000
CMD ["/opt/kafka-manager/bin/kafka-manager"]

(2) Build and push to the personal Alibaba Cloud registry
[root@k8s-master1 kafka-manager]# ls -l
total 357576
-rw-r--r-- 1 root root       509 Feb 25 14:39 Dockerfile
-rw-r--r-- 1 root root 173271626 Mar 14  2021 jdk-8u45-linux-x64.tar.gz
-rw-r--r-- 1 root root  96171216 Feb 25 14:30 kafka-manager-2.0.0.2.tar.gz
-rw-r--r-- 1 root root  96701356 Feb 25 13:48 kafka-manager-2.0.0.2.zip
-rw-r--r-- 1 root root      1839 Feb 25 15:00 kafka-manager.yml

# docker build -t registry.cn-hangzhou.aliyuncs.com/k8s-image01/kafka-manager:2.0.0.2 .
# docker push registry.cn-hangzhou.aliyuncs.com/k8s-image01/kafka-manager:2.0.0.2

5 kafka-manager.yml
# cat kafka-manager.yml
apiVersion: v1
kind: Service
metadata:
  name: kafka-manager
  namespace: zk-kafka
  labels:
    app: kafka-manager
spec:
  type: NodePort
  selector:
    app: kafka-manager
  ports:
  - name: http
    port: 9000
    targetPort: 9000
    nodePort: 30096
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-manager
  namespace: zk-kafka
spec:
  replicas: 1
  minReadySeconds: 10
  strategy:
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
    type: RollingUpdate
  selector:
    matchLabels:
      app: kafka-manager
  template:
    metadata:
      labels:
        app: kafka-manager
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                      - kafka-manager
              topologyKey: "kubernetes.io/hostname"
      terminationGracePeriodSeconds: 120
      containers:
      - name: kafka-manager
        image: registry.cn-hangzhou.aliyuncs.com/k8s-image01/kafka-manager:2.0.0.2
        imagePullPolicy: Always
        ports:
        - name: cport
          containerPort: 9000
        resources:
          requests:
            cpu: 100m
            memory: 100Mi
          limits:
            cpu: 500m
            memory: 400Mi
        lifecycle:
          postStart:
            exec:
              command: ["/bin/sh","-c","touch /tmp/health"]
        livenessProbe:
          exec:
            command: ["test","-e","/tmp/health"]
          initialDelaySeconds: 5
          timeoutSeconds: 5
          periodSeconds: 10
        readinessProbe:
          tcpSocket:
            port: cport
          initialDelaySeconds: 15
          timeoutSeconds: 5
          periodSeconds: 20

6 Apply kafka-manager.yml
[root@k8s-master1 kafka-manager]# kubectl apply -f kafka-manager.yml
service/kafka-manager created
deployment.apps/kafka-manager created

[root@k8s-master1 kafka-manager]# kubectl get pod -n zk-kafka
NAME                                            READY   STATUS    RESTARTS   AGE
kafka-0                                         1/1     Running   4          15h
kafka-1                                         1/1     Running   4          15h
kafka-2                                         1/1     Running   3          15h
kafka-manager-7d86bc79c8-v5gfx                  1/1     Running   11         60m
nfs-client-provisioner-kafka-7544b56556-sw5cq   1/1     Running   4          29h
nfs-client-provisioner-zk-85c888b6cf-zslhx      1/1     Running   4          29h
zk-0                                            1/1     Running   4          29h
zk-1                                            1/1     Running   4          29h
zk-2                                            1/1     Running   4          29h

[root@k8s-master1 kafka-manager]# kubectl get svc -n zk-kafka
NAME            TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)             AGE
kafka-cs        ClusterIP   None             <none>        9092/TCP            15h
kafka-manager   NodePort    172.28.196.251   <none>        9000:30096/TCP      61m
zk-cs           ClusterIP   172.28.66.236    <none>        2181/TCP            29h
zk-hs           ClusterIP   None             <none>        2888/TCP,3888/TCP   29h

7 Access kafka-manager
(1) Access kafka-manager through the K8S NodePort
URL: http://<NodeIP>:30096/
Username: admin
Password: admin@123



Figure: image-20220225161042662

(2) Add Cluster

Create cluster: image-20220225161330895

Configuration: image-20220225161522230

After saving: image-20220225162133319

(3) View the details of the created cluster

Figure: image-20220225162314405

Figure: image-20220225162528818


