Hostname | OS Version | IP Address | CPU/Memory/Disk | Purpose | Software Version |
---|---|---|---|---|---|
k8s_nfs | CentOS 7.5 | 172.16.1.60 | 2 cores/2GB/60GB | NFS storage for ZooKeeper and Kafka | nfs-utils-1.3.0-0.68 |
k8s-master1 | CentOS 7.5 | 172.16.1.81 | 2 cores/2GB/60GB | Kubernetes master1 node | k8s v1.20.0 |
k8s-master2 | CentOS 7.5 | 172.16.1.82 | 2 cores/2GB/60GB | Kubernetes master2 node | k8s v1.20.0 |
k8s-node1 | CentOS 7.5 | 172.16.1.83 | 4 cores/8GB/60GB | Kubernetes node1 node | k8s v1.20.0 |
k8s-node2 | CentOS 7.5 | 172.16.1.84 | 4 cores/8GB/60GB | Kubernetes node2 node | k8s v1.20.0 |
Note: the control-plane (master) nodes of my Kubernetes cluster are tainted so that they cannot be used for pod scheduling.
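For reference, the taint on the control-plane nodes was most likely applied with something like the following (a sketch; kubeadm adds an equivalent taint to masters by default):
# kubectl taint nodes k8s-master1 node-role.kubernetes.io/master=:NoSchedule
# kubectl taint nodes k8s-master2 node-role.kubernetes.io/master=:NoSchedule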
ZooKeeper image: guglecontainers/kubernetes-zookeeper:1.0-3.4.10 (ZooKeeper 3.4.10); Kafka image: cloudtrackinc/kubernetes-kafka:0.10.0.1 (kafka_2.11-0.10.0.1)
1 NFS Service Deployment
Node: k8s_nfs
Purpose: persistent data storage for Kubernetes pods
Note: setting up the NFS service itself is not covered here.
Verification:
[root@k8s_nfs ~]# ls /ifs/kubernetes/
kafka zk
# Note: the kafka directory stores Kafka data and the zk directory stores ZooKeeper data.
[root@k8s_nfs ~]# showmount -e 172.16.1.60
Export list for 172.16.1.60:
/ifs/kubernetes *
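For completeness, the export on the NFS server would look roughly like the following (an illustrative sketch; the real export options used on k8s_nfs may differ):
[root@k8s_nfs ~]# mkdir -p /ifs/kubernetes/{zk,kafka}
[root@k8s_nfs ~]# cat /etc/exports
/ifs/kubernetes *(rw,sync,no_root_squash)
[root@k8s_nfs ~]# systemctl enable --now nfs-server && exportfs -r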
2 nfs-subdir-external-provisioner Plugin Deployment
Nodes: the Kubernetes cluster
Purpose: automatic PVC provisioning for the middleware pods
Note: before deploying, install the NFS client on every Kubernetes node (yum install nfs-utils -y); otherwise the deployment will fail.
Project:
(1) GitHub project: https://github.com/kubernetes-sigs/nfs-subdir-external-provisioner
(2) Download the following files from the deploy directory (see the sketch below):
class.yaml, deployment.yaml, rbac.yaml
# The three files can be modified as needed.
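One way to fetch and lay out the three files (a sketch, assuming git is available on the master node; the target directory matches the listing below):
[root@k8s-master1 ~]# git clone https://github.com/kubernetes-sigs/nfs-subdir-external-provisioner.git
[root@k8s-master1 ~]# mkdir -p /root/zk-kafka-nfs
[root@k8s-master1 ~]# cp nfs-subdir-external-provisioner/deploy/rbac.yaml /root/zk-kafka-nfs/
[root@k8s-master1 ~]# cp nfs-subdir-external-provisioner/deploy/class.yaml /root/zk-kafka-nfs/class_zk.yaml
[root@k8s-master1 ~]# cp nfs-subdir-external-provisioner/deploy/class.yaml /root/zk-kafka-nfs/class_kafka.yaml
[root@k8s-master1 ~]# cp nfs-subdir-external-provisioner/deploy/deployment.yaml /root/zk-kafka-nfs/deployment_zk.yaml
[root@k8s-master1 ~]# cp nfs-subdir-external-provisioner/deploy/deployment.yaml /root/zk-kafka-nfs/deployment_kafka.yaml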
Configuration file overview:
(1) Create the namespace
[root@k8s-master1 ~]# kubectl create namespace zk-kafka
(2) Deployment files
[root@k8s-master1 zk-kafka-nfs]# ls -l /root/zk-kafka-nfs/
total 20
# StorageClass for the Kafka cluster
-rw-r--r-- 1 root root 392 Feb 21 17:09 class_kafka.yaml
# StorageClass for the ZooKeeper cluster
-rw-r--r-- 1 root root 386 Feb 21 17:08 class_zk.yaml
# nfs-client-provisioner for the Kafka cluster
-rw-r--r-- 1 root root 1355 Feb 21 17:07 deployment_kafka.yaml
# nfs-client-provisioner for the ZooKeeper cluster
-rw-r--r-- 1 root root 1325 Feb 21 17:03 deployment_zk.yaml
# RBAC shared by both provisioners
-rw-r--r-- 1 root root 1905 Feb 21 15:52 rbac.yaml
Note: ZooKeeper and Kafka both run in the zk-kafka namespace and share one set of RBAC objects; only the StorageClass and provisioner Deployment names differ.
(3) rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: nfs-client-provisioner
  # replace with namespace where provisioner is deployed
  namespace: zk-kafka
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: nfs-client-provisioner-runner
rules:
  - apiGroups: [""]
    resources: ["nodes"]
    verbs: ["get", "list", "watch"]
  - apiGroups: [""]
    resources: ["persistentvolumes"]
    verbs: ["get", "list", "watch", "create", "delete"]
  - apiGroups: [""]
    resources: ["persistentvolumeclaims"]
    verbs: ["get", "list", "watch", "update"]
  - apiGroups: ["storage.k8s.io"]
    resources: ["storageclasses"]
    verbs: ["get", "list", "watch"]
  - apiGroups: [""]
    resources: ["events"]
    verbs: ["create", "update", "patch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: run-nfs-client-provisioner
subjects:
  - kind: ServiceAccount
    name: nfs-client-provisioner
    # replace with namespace where provisioner is deployed
    namespace: zk-kafka
roleRef:
  kind: ClusterRole
  name: nfs-client-provisioner-runner
  apiGroup: rbac.authorization.k8s.io
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: leader-locking-nfs-client-provisioner
  # replace with namespace where provisioner is deployed
  namespace: zk-kafka
rules:
  - apiGroups: [""]
    resources: ["endpoints"]
    verbs: ["get", "list", "watch", "create", "update", "patch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: leader-locking-nfs-client-provisioner
  # replace with namespace where provisioner is deployed
  namespace: zk-kafka
subjects:
  - kind: ServiceAccount
    name: nfs-client-provisioner
    # replace with namespace where provisioner is deployed
    namespace: zk-kafka
roleRef:
  kind: Role
  name: leader-locking-nfs-client-provisioner
  apiGroup: rbac.authorization.k8s.io
(4) deployment_zk.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nfs-client-provisioner-zk
  labels:
    app: nfs-client-provisioner-zk
  # replace with namespace where provisioner is deployed
  namespace: zk-kafka
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: nfs-client-provisioner-zk
  template:
    metadata:
      labels:
        app: nfs-client-provisioner-zk
    spec:
      serviceAccountName: nfs-client-provisioner
      containers:
        - name: nfs-client-provisioner-zk
          #image: k8s.gcr.io/sig-storage/nfs-subdir-external-provisioner:v4.0.2
          image: registry.cn-hangzhou.aliyuncs.com/k8s-image01/nfs-subdir-external-provisione:v4.0.1
          volumeMounts:
            - name: nfs-client-root
              mountPath: /persistentvolumes
          env:
            - name: PROVISIONER_NAME
              value: k8s-sigs.io/nfs-subdir-external-provisioner-zk
            - name: NFS_SERVER
              value: 172.16.1.60 # change to your NFS server address
            - name: NFS_PATH
              value: /ifs/kubernetes/zk # change to your NFS shared directory
      volumes:
        - name: nfs-client-root
          nfs:
            server: 172.16.1.60 # change to your NFS server address
            path: /ifs/kubernetes/zk # change to your NFS shared directory
(5) deployment_kafka.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nfs-client-provisioner-kafka
  labels:
    app: nfs-client-provisioner-kafka
  # replace with namespace where provisioner is deployed
  namespace: zk-kafka
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: nfs-client-provisioner-kafka
  template:
    metadata:
      labels:
        app: nfs-client-provisioner-kafka
    spec:
      serviceAccountName: nfs-client-provisioner
      containers:
        - name: nfs-client-provisioner-kafka
          #image: k8s.gcr.io/sig-storage/nfs-subdir-external-provisioner:v4.0.2
          image: registry.cn-hangzhou.aliyuncs.com/k8s-image01/nfs-subdir-external-provisione:v4.0.1
          volumeMounts:
            - name: nfs-client-root
              mountPath: /persistentvolumes
          env:
            - name: PROVISIONER_NAME
              value: k8s-sigs.io/nfs-subdir-external-provisioner-kafka
            - name: NFS_SERVER
              value: 172.16.1.60 # change to your NFS server address
            - name: NFS_PATH
              value: /ifs/kubernetes/kafka # change to your NFS shared directory
      volumes:
        - name: nfs-client-root
          nfs:
            server: 172.16.1.60 # change to your NFS server address
            path: /ifs/kubernetes/kafka # change to your NFS shared directory
(6) class_zk.yaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: managed-nfs-storage-zk # name of the StorageClass used for dynamic PV provisioning
provisioner: k8s-sigs.io/nfs-subdir-external-provisioner-zk # or choose another name, must match deployment's env PROVISIONER_NAME
parameters:
  archiveOnDelete: "true" # when "true", the automatically created NFS directory is kept (archived) after the PV is deleted; the default is "false", i.e. it is deleted
(7) class_kafka.yaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: managed-nfs-storage-kafka # name of the StorageClass used for dynamic PV provisioning
provisioner: k8s-sigs.io/nfs-subdir-external-provisioner-kafka # or choose another name, must match deployment's env PROVISIONER_NAME
parameters:
  archiveOnDelete: "true" # when "true", the automatically created NFS directory is kept (archived) after the PV is deleted; the default is "false", i.e. it is deleted
(8) Apply the YAML files and check the results
[root@k8s-master1 zk-kafka-nfs]# ls | xargs -i kubectl apply -f {}
[root@k8s-master1 zk-kafka-nfs]# kubectl get deployment,pod,svc -n zk-kafka
[root@k8s-master1 zk-kafka-nfs]# kubectl get sc
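Optionally, dynamic provisioning can be sanity-checked with a throwaway PVC before the StatefulSets are created (an illustrative sketch; the name test-zk-claim is arbitrary):
[root@k8s-master1 zk-kafka-nfs]# cat <<'EOF' | kubectl apply -f -
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: test-zk-claim
  namespace: zk-kafka
spec:
  storageClassName: managed-nfs-storage-zk
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 1Gi
EOF
[root@k8s-master1 zk-kafka-nfs]# kubectl get pvc test-zk-claim -n zk-kafka   # should turn Bound and create a subdirectory under /ifs/kubernetes/zk/
[root@k8s-master1 zk-kafka-nfs]# kubectl delete pvc test-zk-claim -n zk-kafka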
3 ZooKeeper and Kafka Overview
1 Kafka and ZooKeeper are two typical stateful, clustered services. Both need persistent storage to keep their state, and every instance needs its own instance ID (broker.id for Kafka, myid for ZooKeeper) that identifies it as a member of the cluster; these IDs are used when the cluster nodes communicate with each other.
2 Deploying this kind of service means solving two big problems: state persistence and cluster management (managing multiple service instances). The StatefulSet in Kubernetes makes deploying and managing stateful clustered services much easier. In general, three mechanisms are used to deploy a stateful clustered service:
(1) Init Containers to perform cluster initialization.
(2) A Headless Service to maintain stable cluster membership.
(3) PersistentVolumes and PersistentVolumeClaims to provide network storage for data persistence.
Therefore, stateful services such as Kafka and ZooKeeper cannot be deployed in Kubernetes with a Deployment; they must be deployed with a StatefulSet. Put simply, "stateful" means the service needs to persist data, such as logs, database data, and service state.
3 StatefulSet use cases
(1) Stable persistent storage: after a Pod is rescheduled it can still access the same persisted data, implemented with PVCs.
(2) Stable network identity: after a Pod is rescheduled its Pod name and hostname stay the same, implemented with a Headless Service (a Service without a Cluster IP).
(3) Ordered deployment and scaling: Pods are ordered and are deployed or scaled up in the defined order (from 0 to N-1; all preceding Pods must be Running and Ready before the next Pod starts), implemented with init containers; scale-down and deletion are also ordered (from N-1 to 0).
4 StatefulSet components
A Headless Service that defines the network identity (DNS domain).
volumeClaimTemplates that create the PersistentVolumes.
The StatefulSet that defines the application itself.
5 DNS name format of each Pod in a StatefulSet
<statefulSetName-{0..N-1}>.<serviceName>.<namespace>.svc.cluster.local
(1) statefulSetName is the name of the StatefulSet, and 0..N-1 is the Pod's ordinal index.
(2) serviceName is the name of the Headless Service.
(3) namespace is the namespace the service runs in; the Headless Service and the StatefulSet must be in the same namespace.
(4) svc.cluster.local is the cluster root domain of the Kubernetes cluster.
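For example, with statefulSetName=zk, serviceName=zk-hs, and namespace=zk-kafka as used below, the first Pod is addressable as zk-0.zk-hs.zk-kafka.svc.cluster.local.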
4 ZooKeeper Cluster Deployment
4.1 YAML Configuration
1 Reference
https://kubernetes.io/zh/docs/tutorials/stateful-application/zookeeper/
2 zk-cluster.yml
ZooKeeper uses three ports, 2181, 3888, and 2888:
(1) 2181 # port serving client requests
(2) 3888 # port used for leader election
(3) 2888 # port used for communication between cluster nodes
[root@k8s-master1 ~]# mkdir -p zk-kafka-cluster
[root@k8s-master1 ~]# cd zk-kafka-cluster/
[root@k8s-master1 zk-kafka-cluster]# cat zk-cluster.yml
apiVersion: v1
kind: Service
metadata:
  namespace: zk-kafka
  name: zk-hs
  labels:
    app: zk
spec:
  ports:
    - port: 2888
      name: server
    - port: 3888
      name: leader-election
  clusterIP: None
  selector:
    app: zk
---
apiVersion: v1
kind: Service
metadata:
  namespace: zk-kafka
  name: zk-cs
  labels:
    app: zk
spec:
  #type: NodePort
  ports:
    - port: 2181
      targetPort: 2181
      name: client
      #nodePort: 32181
  selector:
    app: zk
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  namespace: zk-kafka
  name: zk-pdb
spec:
  selector:
    matchLabels:
      app: zk
  maxUnavailable: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: zk-kafka
  name: zk
spec:
  serviceName: zk-hs
  replicas: 3
  selector:
    matchLabels:
      app: zk
  template:
    metadata:
      labels:
        app: zk
    spec:
      containers:
        - name: kubernetes-zookeeper
          imagePullPolicy: Always
          image: registry.cn-hangzhou.aliyuncs.com/k8s-image01/kubernetes-zookeeper:1.0-3.4.10
          resources:
            requests:
              memory: "1024Mi"
              cpu: "500m"
          ports:
            - containerPort: 2181
              name: client
            - containerPort: 2888
              name: server
            - containerPort: 3888
              name: leader-election
          command:
            - sh
            - -c
            - "start-zookeeper \
              --servers=3 \
              --data_dir=/var/lib/zookeeper/data \
              --data_log_dir=/var/lib/zookeeper/data/log \
              --conf_dir=/opt/zookeeper/conf \
              --client_port=2181 \
              --election_port=3888 \
              --server_port=2888 \
              --tick_time=2000 \
              --init_limit=10 \
              --sync_limit=5 \
              --heap=512M \
              --max_client_cnxns=60 \
              --snap_retain_count=3 \
              --purge_interval=12 \
              --max_session_timeout=40000 \
              --min_session_timeout=4000 \
              --log_level=INFO"
          readinessProbe:
            exec:
              command:
                - sh
                - -c
                - "zookeeper-ready 2181"
            initialDelaySeconds: 10
            timeoutSeconds: 5
          livenessProbe:
            exec:
              command:
                - sh
                - -c
                - "zookeeper-ready 2181"
            initialDelaySeconds: 10
            timeoutSeconds: 5
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/zookeeper
  volumeClaimTemplates:
    - metadata:
        name: datadir
        #annotations:
        #  volume.beta.kubernetes.io/storage-class: "zk-nfs-storage"
      spec:
        storageClassName: "managed-nfs-storage-zk"
        accessModes:
          - ReadWriteMany
        resources:
          requests:
            storage: 10Gi
4.2 Deployment
1 Apply the YAML file
[root@k8s-master1 zk-kafka-cluster]# kubectl apply -f zk-cluster.yml
service/zk-hs created
service/zk-cs created
poddisruptionbudget.policy/zk-pdb created
statefulset.apps/zk created
2 Check the ZooKeeper cluster pods
[root@k8s-master1 zk-kafka-cluster]# kubectl get pod -n zk-kafka -l app=zk
3 Check the ZooKeeper cluster services
[root@k8s-master1 zk-kafka-cluster]# kubectl get svc,ep -n zk-kafka
4 Check the ZooKeeper cluster PVCs and PVs
PVCs are namespace-scoped, so the query needs "-n <namespace>"; PVs are cluster-scoped, so "-n <namespace>" is not required.
[root@k8s-master1 zk-kafka-cluster]# kubectl get pvc -n zk-kafka
[root@k8s-master1 zk-kafka-cluster]# kubectl get pv
5 Check the ZooKeeper cluster's NFS persistence directories
Note that each NFS persistence directory name is composed of <namespace>-<PVC name>-<PV name>; as long as the PVC and PV are not deleted, this directory name does not change.
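For example, the directory backing zk-0 will have a name of the form zk-kafka-datadir-zk-0-pvc-<uuid>, where datadir-zk-0 is the PVC created from the volumeClaimTemplate and pvc-<uuid> is the generated PV name (the uuid differs per environment).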
[root@k8s_nfs ~]# ls -l /ifs/kubernetes/zk/
[root@k8s_nfs ~]# ls -l /ifs/kubernetes/zk/*/
[root@k8s_nfs ~]# ls -l /ifs/kubernetes/zk/*/data/
[root@k8s_nfs ~]# cat /ifs/kubernetes/zk/*/data/myid
6 Check the ZooKeeper cluster leader/follower roles
[root@k8s-master1 zk-kafka-cluster]# kubectl exec -it zk-0 -n zk-kafka -- zkServer.sh status
[root@k8s-master1 zk-kafka-cluster]# kubectl exec -it zk-1 -n zk-kafka -- zkServer.sh status
[root@k8s-master1 zk-kafka-cluster]# kubectl exec -it zk-2 -n zk-kafka -- zkServer.sh status
7 Check the ZooKeeper cluster configuration file
[root@k8s-master1 ~]# kubectl exec -it pod/zk-0 -n zk-kafka -- bash
root@zk-0:/# cat /opt/zookeeper/conf/zoo.cfg
4.3 Verify Connections to the ZooKeeper Cluster
[root@k8s-master1 zk-kafka-cluster]# kubectl get svc -n zk-kafka
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
zk-cs ClusterIP 172.28.20.31 <none> 2181/TCP 79m
zk-hs ClusterIP None <none> 2888/TCP,3888/TCP 79m
[root@k8s-master1 zk-kafka-cluster]#
ZooKeeper cluster connection endpoints:
zk-cs:2181
zk-0.zk-hs.zk-kafka.svc.cluster.local:2181
zk-1.zk-hs.zk-kafka.svc.cluster.local:2181
zk-2.zk-hs.zk-kafka.svc.cluster.local:2181
[root@k8s-master1 zk-kafka-cluster]# kubectl exec -it zk-0 -n zk-kafka -- zkCli.sh -server zk-cs:2181
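Once connected, a few basic commands at the zkCli prompt can confirm that reads and writes work across the ensemble (illustrative; the znode name /zk-test is arbitrary):
create /zk-test "hello"
get /zk-test
ls /
delete /zk-test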
5 Kafka Cluster Deployment
5.1 YAML Configuration
[root@k8s-master1 zk-kafka-cluster]# cat kafka-cluster.yml
apiVersion: v1
kind: Service
metadata:
  namespace: zk-kafka
  name: kafka-cs
  labels:
    app: kafka
spec:
  #type: NodePort
  clusterIP: None
  ports:
    - port: 9092
      targetPort: 9092
      name: client
      #nodePort: 32092
  selector:
    app: kafka
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: kafka-pdb
  namespace: zk-kafka
spec:
  selector:
    matchLabels:
      app: kafka
  minAvailable: 2
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: zk-kafka
  name: kafka
spec:
  serviceName: kafka-cs
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
        - name: k8s-kafka
          imagePullPolicy: IfNotPresent
          image: cloudtrackinc/kubernetes-kafka:0.10.0.1
          ports:
            - containerPort: 9092
              name: client
          resources:
            requests:
              memory: "1024Mi"
              cpu: "500m"
          command:
            - sh
            - -c
            - "exec /opt/kafka_2.11-0.10.0.1/bin/kafka-server-start.sh /opt/kafka_2.11-0.10.0.1/config/server.properties --override broker.id=${HOSTNAME##*-} \
              --override listeners=PLAINTEXT://:9092 \
              --override zookeeper.connect=zk-0.zk-hs.zk-kafka.svc.cluster.local:2181,zk-1.zk-hs.zk-kafka.svc.cluster.local:2181,zk-2.zk-hs.zk-kafka.svc.cluster.local:2181 \
              --override log.dirs=/var/lib/kafka \
              --override auto.create.topics.enable=true \
              --override auto.leader.rebalance.enable=true \
              --override background.threads=10 \
              --override compression.type=producer \
              --override delete.topic.enable=true \
              --override leader.imbalance.check.interval.seconds=300 \
              --override leader.imbalance.per.broker.percentage=10 \
              --override log.flush.interval.messages=9223372036854775807 \
              --override log.flush.offset.checkpoint.interval.ms=60000 \
              --override log.flush.scheduler.interval.ms=9223372036854775807 \
              --override log.retention.bytes=-1 \
              --override log.retention.hours=168 \
              --override log.roll.hours=168 \
              --override log.roll.jitter.hours=0 \
              --override log.segment.bytes=1073741824 \
              --override log.segment.delete.delay.ms=60000 \
              --override message.max.bytes=1000012 \
              --override min.insync.replicas=1 \
              --override num.io.threads=8 \
              --override num.network.threads=3 \
              --override num.recovery.threads.per.data.dir=1 \
              --override num.replica.fetchers=1 \
              --override offset.metadata.max.bytes=4096 \
              --override offsets.commit.required.acks=-1 \
              --override offsets.commit.timeout.ms=5000 \
              --override offsets.load.buffer.size=5242880 \
              --override offsets.retention.check.interval.ms=600000 \
              --override offsets.retention.minutes=1440 \
              --override offsets.topic.compression.codec=0 \
              --override offsets.topic.num.partitions=50 \
              --override offsets.topic.replication.factor=3 \
              --override offsets.topic.segment.bytes=104857600 \
              --override queued.max.requests=500 \
              --override quota.consumer.default=9223372036854775807 \
              --override quota.producer.default=9223372036854775807 \
              --override replica.fetch.min.bytes=1 \
              --override replica.fetch.wait.max.ms=500 \
              --override replica.high.watermark.checkpoint.interval.ms=5000 \
              --override replica.lag.time.max.ms=10000 \
              --override replica.socket.receive.buffer.bytes=65536 \
              --override replica.socket.timeout.ms=30000 \
              --override request.timeout.ms=30000 \
              --override socket.receive.buffer.bytes=102400 \
              --override socket.request.max.bytes=104857600 \
              --override socket.send.buffer.bytes=102400 \
              --override unclean.leader.election.enable=true \
              --override zookeeper.session.timeout.ms=6000 \
              --override zookeeper.set.acl=false \
              --override broker.id.generation.enable=true \
              --override connections.max.idle.ms=600000 \
              --override controlled.shutdown.enable=true \
              --override controlled.shutdown.max.retries=3 \
              --override controlled.shutdown.retry.backoff.ms=5000 \
              --override controller.socket.timeout.ms=30000 \
              --override default.replication.factor=1 \
              --override fetch.purgatory.purge.interval.requests=1000 \
              --override group.max.session.timeout.ms=300000 \
              --override group.min.session.timeout.ms=6000 \
              --override log.cleaner.backoff.ms=15000 \
              --override log.cleaner.dedupe.buffer.size=134217728 \
              --override log.cleaner.delete.retention.ms=86400000 \
              --override log.cleaner.enable=true \
              --override log.cleaner.io.buffer.load.factor=0.9 \
              --override log.cleaner.io.buffer.size=524288 \
              --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
              --override log.cleaner.min.cleanable.ratio=0.5 \
              --override log.cleaner.min.compaction.lag.ms=0 \
              --override log.cleaner.threads=1 \
              --override log.cleanup.policy=delete \
              --override log.index.interval.bytes=4096 \
              --override log.index.size.max.bytes=10485760 \
              --override log.message.timestamp.difference.max.ms=9223372036854775807 \
              --override log.message.timestamp.type=CreateTime \
              --override log.preallocate=false \
              --override log.retention.check.interval.ms=300000 \
              --override max.connections.per.ip=2147483647 \
              --override num.partitions=1 \
              --override producer.purgatory.purge.interval.requests=1000 \
              --override replica.fetch.backoff.ms=1000 \
              --override replica.fetch.max.bytes=1048576 \
              --override replica.fetch.response.max.bytes=10485760 \
              --override reserved.broker.max.id=1000"
          env:
            - name: KAFKA_HEAP_OPTS
              value: "-Xmx512M -Xms512M"
            - name: KAFKA_OPTS
              value: "-Dlogging.level=INFO"
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/kafka
          lifecycle:
            postStart:
              exec:
                command: ["/bin/sh","-c","touch /tmp/health"]
          livenessProbe:
            exec:
              command: ["test","-e","/tmp/health"]
            initialDelaySeconds: 5
            timeoutSeconds: 5
            periodSeconds: 10
          readinessProbe:
            tcpSocket:
              port: client
            initialDelaySeconds: 15
            timeoutSeconds: 5
            periodSeconds: 20
  volumeClaimTemplates:
    - metadata:
        name: datadir
        #annotations:
        #  volume.beta.kubernetes.io/storage-class: "kafka-nfs-storage"
      spec:
        storageClassName: "managed-nfs-storage-kafka"
        accessModes:
          - ReadWriteMany
        resources:
          requests:
            storage: 10Gi
5.2 Deployment
1 Apply the configuration file
[root@k8s-master1 zk-kafka-cluster]# kubectl apply -f kafka-cluster.yml
service/kafka-cs created
poddisruptionbudget.policy/kafka-pdb created
statefulset.apps/kafka created
2 Check the Kafka cluster pods
[root@k8s-master1 zk-kafka-cluster]# kubectl get pod -n zk-kafka
3 Check the Kafka cluster PVCs
[root@k8s-master1 zk-kafka-cluster]# kubectl get pvc -n zk-kafka
4 Check the Kafka cluster PVs
[root@k8s-master1 zk-kafka-cluster]# kubectl get pv
5 Check the Kafka cluster services
[root@k8s-master1 zk-kafka-cluster]# kubectl get svc,ep -n zk-kafka
6 Check the Kafka cluster's NFS persistence directories
[root@k8s_nfs ~]# ls -l /ifs/kubernetes/kafka/
[root@k8s_nfs ~]# ls -l /ifs/kubernetes/kafka/*/
[root@k8s_nfs ~]# cat /ifs/kubernetes/kafka/*/meta.properties
7 Kafka cluster internal connection addresses
# kubectl run -i --tty --image busybox:1.28.4 dns-test --restart=Never --rm /bin/sh
If you don't see a command prompt, try pressing enter.
/ # nslookup kafka-cs.zk-kafka.svc.cluster.local
Server: 172.28.0.2
Address 1: 172.28.0.2 kube-dns.kube-system.svc.cluster.local
Name: kafka-cs.zk-kafka.svc.cluster.local
Address 1: 172.27.36.90 kafka-2.kafka-cs.zk-kafka.svc.cluster.local
Address 2: 172.27.169.156 kafka-0.kafka-cs.zk-kafka.svc.cluster.local
Address 3: 172.27.169.157 kafka-1.kafka-cs.zk-kafka.svc.cluster.local
/ # exit
pod "dns-test" deleted
[root@k8s-master1 zk-kafka-cluster]#
5.3 Test (Verify Producing and Consuming Data on the Kafka Cluster)
1 Overview
(1) ZooKeeper and Kafka service information
[root@k8s-master1 zk-kafka-cluster]# kubectl get svc -n zk-kafka
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kafka-cs ClusterIP None <none> 9092/TCP 76m
zk-cs ClusterIP 172.28.66.236 <none> 2181/TCP 107m
zk-hs ClusterIP None <none> 2888/TCP,3888/TCP 107m
(2) Client access to ZooKeeper
zk-cs:2181
zk-0.zk-hs.zk-kafka.svc.cluster.local:2181
zk-1.zk-hs.zk-kafka.svc.cluster.local:2181
zk-2.zk-hs.zk-kafka.svc.cluster.local:2181
(3) Client access to Kafka
kafka-0.kafka-cs.zk-kafka.svc.cluster.local:9092
kafka-1.kafka-cs.zk-kafka.svc.cluster.local:9092
kafka-2.kafka-cs.zk-kafka.svc.cluster.local:9092
2 Log in to any one of the three Kafka pods (for example kafka-0) and produce data to Kafka
[root@k8s-master1 zk-kafka-cluster]# kubectl exec -it pod/kafka-0 -n zk-kafka -- bash
> cd /opt/kafka_2.11-0.10.0.1/bin/
(1) Create a topic named test
> ./kafka-topics.sh --create \
--topic test \
--zookeeper zk-cs:2181 \
--partitions 3 \
--replication-factor 3
Output:
Created topic "test".
(2) List topics
> ./kafka-topics.sh --list --zookeeper zk-cs:2181
Output:
test
(3) Describe the test topic
> ./kafka-topics.sh --describe --zookeeper zk-cs:2181 --topic test
Output:
Topic:test PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: test Partition: 1 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: test Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
(4) Produce messages to the test topic
> ./kafka-console-producer.sh --topic test --broker-list localhost:9092
Enter the following lines one at a time:
1
2
3
3 In another window, log in to a different Kafka pod, for example kafka-1, and verify data consumption
[root@k8s-master1 zk-kafka-cluster]# kubectl exec -it pod/kafka-1 -n zk-kafka -- bash
> cd /opt/kafka_2.11-0.10.0.1/bin/
> ./kafka-console-consumer.sh --topic test --zookeeper zk-cs:2181 --from-beginning
Output:
1
2
3
4 Note
Producers connect to the Kafka cluster; consumers and other tools connect to either the ZooKeeper cluster or the Kafka cluster, depending on the Kafka version (the newer the Kafka version, the smaller its dependency on ZooKeeper). See the sketch below.
Older Kafka versions: --zookeeper <zookeeper cluster>:2181
Newer Kafka versions: --bootstrap-server <kafka cluster>:9092
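As a concrete illustration using the in-cluster service names defined above (a sketch; both commands are run from inside a Kafka pod):
# Older clients, e.g. the 0.10.x image used here:
./kafka-console-consumer.sh --topic test --zookeeper zk-cs.zk-kafka.svc.cluster.local:2181 --from-beginning
# Newer clients, e.g. 2.x:
./kafka-console-consumer.sh --topic test --bootstrap-server kafka-cs.zk-kafka.svc.cluster.local:9092 --from-beginning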
6 Additional Notes
6.1 Notes on Affinity
Because the two master nodes of my Kubernetes cluster cannot be used for pod scheduling and only the two worker nodes can, the cluster cannot satisfy the affinity/anti-affinity rules for the ZooKeeper and Kafka pods, so I removed that part of the YAML configuration. In production it is recommended to add it back; a sketch follows.
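A minimal sketch of the anti-affinity block that could be added back under the Pod template spec of the zk StatefulSet (and analogously for kafka), assuming there are at least as many schedulable nodes as replicas:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                      - zk
              topologyKey: "kubernetes.io/hostname"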
6.2 Building a Custom Kafka Image
1 Overview
The Kafka image used above is the public image cloudtrackinc/kubernetes-kafka:0.10.0.1 (kafka_2.11-0.10.0.1), which is fairly old. To use a newer Kafka version, build a custom Kafka image.
http://kafka.apache.org/downloads.html
https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
JDK version: jdk-8u45-linux-x64.tar.gz
2 Dockerfile
# cat Dockerfile
FROM centos:7
LABEL maintainer liuchang
RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
ADD jdk-8u45-linux-x64.tar.gz /usr/local
ADD kafka_2.13-2.8.0.tgz /opt
RUN ln -s /usr/local/jdk1.8.0_45 /usr/local/jdk && \
    ln -s /opt/kafka_2.13-2.8.0 /opt/kafka
ENV JAVA_HOME /usr/local/jdk
ENV CLASSPATH $JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib/rt.jar
ENV PATH $JAVA_HOME/bin:/opt/kafka/bin:$PATH
ENV LANG en_US.UTF-8
WORKDIR /opt
CMD ["/bin/bash"]
3 Build the Kafka image
[root@k8s-master1 kafka-cluster-image]# ls -l
total 238956
-rw-r--r-- 1 root root       464 Feb 24 23:15 Dockerfile
-rw-r--r-- 1 root root 173271626 Mar 14  2021 jdk-8u45-linux-x64.tar.gz
-rw-r--r-- 1 root root  71403603 Jul 25  2021 kafka_2.13-2.8.0.tgz
-rw-r--r-- 1 root root      7048 Feb 24 23:33 kafka-cluster.yml
# docker build -t registry.cn-hangzhou.aliyuncs.com/k8s-image01/kafka:2.13-2.8.0 .
Note: I have pushed the built image to my personal Alibaba Cloud image registry.
4 Deploy to the Kubernetes cluster for testing
(1) Modify kafka-cluster.yml
1) Change the image to
image: registry.cn-hangzhou.aliyuncs.com/k8s-image01/kafka:2.13-2.8.0
2) Change the Kafka start command to
exec kafka-server-start.sh /opt/kafka/config/server.properties
(2) Run the producer and consumer tests
Note: because this Kafka version is newer and depends less on ZooKeeper, use the --bootstrap-server parameter for the consumer test.
# kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092 --from-beginning
6.3 Deploying the Kafka Web Management UI (kafka-manager) on Kubernetes
1 Overview
(1) kafka-manager is Yahoo's open-source Apache Kafka management tool. It is written in Scala and lets you perform Kafka operations from a web UI:
1) Manage Kafka clusters
2) Convenient cluster state monitoring (topics, consumers, offsets, brokers, replica distribution, partition distribution)
3) Convenient preferred replica election
4) Generate partition assignments, including choosing which brokers to use
5) Reassign partitions
6) Various options for creating and deleting topics
7) The topic list marks topics that are being deleted
8) Batch generate partition assignments for multiple topics and the associated brokers
9) Batch run partition reassignment for multiple topics
10) Add partitions to existing topics
11) Update the configuration of existing topics
12) Optionally enable JMX polling for broker-level and topic-level metrics
13) Optionally filter out consumers that have no ids/owners/offsets/directories in ZK
(2) Source repository
https://github.com/yahoo/CMAK
(3) Download the source package
https://github.com/yahoo/CMAK/tree/2.0.0.2
https://github.com/yahoo/CMAK/archive/refs/tags/2.0.0.2.tar.gz
Requirements: Kafka 0.8.. or 0.9.. or 0.10.. or 0.11.. and Java 8+ (the JDK used here is jdk-8u45-linux-x64.tar.gz)
2 Build
Reference:
https://www.scala-sbt.org/download.html
https://www.cnblogs.com/coding-farmer/p/12097519.html
Because fetching the build dependencies requires going through a proxy, I downloaded a package that someone else had already built, "kafka-manager-2.0.0.2.zip".
3 Modify the configuration inside kafka-manager-2.0.0.2.zip
# unzip kafka-manager-2.0.0.2.zip
# vim kafka-manager-2.0.0.2/conf/application.conf
1) Change kafka-manager.zkhosts="kafka-manager-zookeeper:2181" to
kafka-manager.zkhosts="zk-0.zk-hs.zk-kafka.svc.cluster.local:2181,zk-1.zk-hs.zk-kafka.svc.cluster.local:2181,zk-2.zk-hs.zk-kafka.svc.cluster.local:2181"
2) Enable username/password login (default is false, disabled)
basicAuthentication.enabled=true
3) Change the login password
basicAuthentication.password="admin@123"
Repackage after the changes:
# tar -czf kafka-manager-2.0.0.2.tar.gz kafka-manager-2.0.0.2/
4 Build the Docker image
(1) Dockerfile
# cat Dockerfile
FROM centos:7
LABEL maintainer liuchang
RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
ADD jdk-8u45-linux-x64.tar.gz /usr/local
ADD kafka-manager-2.0.0.2.tar.gz /opt
RUN ln -s /usr/local/jdk1.8.0_45 /usr/local/jdk && \
    ln -s /opt/kafka-manager-2.0.0.2 /opt/kafka-manager
ENV JAVA_HOME /usr/local/jdk
ENV CLASSPATH $JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib/rt.jar
ENV PATH $JAVA_HOME/bin:$PATH
ENV LANG en_US.UTF-8
WORKDIR /opt
EXPOSE 9000
CMD ["/opt/kafka-manager/bin/kafka-manager"]
(2) Build and push to the personal Alibaba Cloud image registry
[root@k8s-master1 kafka-manager]# ls -l
total 357576
-rw-r--r-- 1 root root       509 Feb 25 14:39 Dockerfile
-rw-r--r-- 1 root root 173271626 Mar 14  2021 jdk-8u45-linux-x64.tar.gz
-rw-r--r-- 1 root root  96171216 Feb 25 14:30 kafka-manager-2.0.0.2.tar.gz
-rw-r--r-- 1 root root  96701356 Feb 25 13:48 kafka-manager-2.0.0.2.zip
-rw-r--r-- 1 root root      1839 Feb 25 15:00 kafka-manager.yml
# docker build -t registry.cn-hangzhou.aliyuncs.com/k8s-image01/kafka-manager:2.0.0.2 .
# docker push registry.cn-hangzhou.aliyuncs.com/k8s-image01/kafka-manager:2.0.0.2
5 kafka-manager.yml
# cat kafka-manager.yml
apiVersion: v1
kind: Service
metadata:
  name: kafka-manager
  namespace: zk-kafka
  labels:
    app: kafka-manager
spec:
  type: NodePort
  selector:
    app: kafka-manager
  ports:
    - name: http
      port: 9000
      targetPort: 9000
      nodePort: 30096
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-manager
  namespace: zk-kafka
spec:
  replicas: 1
  minReadySeconds: 10
  strategy:
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
    type: RollingUpdate
  selector:
    matchLabels:
      app: kafka-manager
  template:
    metadata:
      labels:
        app: kafka-manager
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                      - kafka-manager
              topologyKey: "kubernetes.io/hostname"
      terminationGracePeriodSeconds: 120
      containers:
        - name: kafka-manager
          image: registry.cn-hangzhou.aliyuncs.com/k8s-image01/kafka-manager:2.0.0.2
          imagePullPolicy: Always
          ports:
            - name: cport
              containerPort: 9000
          resources:
            requests:
              cpu: 100m
              memory: 100Mi
            limits:
              cpu: 500m
              memory: 400Mi
          lifecycle:
            postStart:
              exec:
                command: ["/bin/sh","-c","touch /tmp/health"]
          livenessProbe:
            exec:
              command: ["test","-e","/tmp/health"]
            initialDelaySeconds: 5
            timeoutSeconds: 5
            periodSeconds: 10
          readinessProbe:
            tcpSocket:
              port: cport
            initialDelaySeconds: 15
            timeoutSeconds: 5
            periodSeconds: 20
6 Apply kafka-manager.yml
[root@k8s-master1 kafka-manager]# kubectl apply -f kafka-manager.yml
service/kafka-manager created
deployment.apps/kafka-manager created
[root@k8s-master1 kafka-manager]# kubectl get pod -n zk-kafka
NAME                                            READY   STATUS    RESTARTS   AGE
kafka-0                                         1/1     Running   4          15h
kafka-1                                         1/1     Running   4          15h
kafka-2                                         1/1     Running   3          15h
kafka-manager-7d86bc79c8-v5gfx                  1/1     Running   11         60m
nfs-client-provisioner-kafka-7544b56556-sw5cq   1/1     Running   4          29h
nfs-client-provisioner-zk-85c888b6cf-zslhx      1/1     Running   4          29h
zk-0                                            1/1     Running   4          29h
zk-1                                            1/1     Running   4          29h
zk-2                                            1/1     Running   4          29h
[root@k8s-master1 kafka-manager]# kubectl get svc -n zk-kafka
NAME            TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)             AGE
kafka-cs        ClusterIP   None             <none>        9092/TCP            15h
kafka-manager   NodePort    172.28.196.251   <none>        9000:30096/TCP      61m
zk-cs           ClusterIP   172.28.66.236    <none>        2181/TCP            29h
zk-hs           ClusterIP   None             <none>        2888/TCP,3888/TCP   29h
7 Access kafka-manager
(1) Access kafka-manager through the Kubernetes NodePort
URL: http://<NodeIP>:30096/
Username: admin
Password: admin@123
(2) Add Cluster
(3) View the details of the created cluster
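When adding the cluster in the web UI, the key field is typically Cluster Zookeeper Hosts; the in-cluster connection string used earlier (zk-0.zk-hs.zk-kafka.svc.cluster.local:2181,zk-1.zk-hs.zk-kafka.svc.cluster.local:2181,zk-2.zk-hs.zk-kafka.svc.cluster.local:2181) can be entered there, and the Kafka version selected should match the deployed brokers.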