I. Background
Kafka and ZooKeeper are two typical stateful clustered services. First, both of them need persistent storage to keep their state; second, every instance needs its own instance ID (broker.id for Kafka, myid for ZooKeeper) that identifies it inside the cluster, and these IDs are used when cluster members communicate with each other.
Deploying this kind of service means solving two big problems: persisting state and managing the cluster (i.e. managing multiple service instances). The StatefulSet resource in Kubernetes makes it much easier to deploy and manage stateful clustered services on Kubernetes. In general, such a deployment relies on three mechanisms:
- An Init Container to perform cluster initialization work.
- A Headless Service to give cluster members stable identities.
- Persistent Volumes and Persistent Volume Claims providing network storage to persist the data.
Therefore, stateful services such as Kafka and ZooKeeper cannot be deployed in a K8S cluster with a Deployment; they must be deployed with a StatefulSet. "Stateful" simply means the service has data that must be persisted, such as logs, database files, or service state.
StatefulSet use cases:
- Stable persistent storage: a Pod can still reach the same persisted data after being rescheduled, implemented with PVCs.
- Stable network identity: a Pod keeps the same PodName and HostName after being rescheduled, implemented with a Headless Service (a Service without a Cluster IP).
- Ordered deployment and ordered scaling: Pods are created in a defined order (from 0 to N-1; the next Pod is only started once all previous Pods are Running and Ready), handled by the StatefulSet controller; init containers can take care of per-member initialization.
- Ordered scale-down and ordered deletion (from N-1 down to 0). A quick way to watch this ordering from the command line is sketched right after this list.
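The following is a minimal sketch (not part of the original deployment record) for observing the ordered rollout; it assumes the zok StatefulSet with label app=zk in the wiseco namespace that is created later in this article:

# Watch the Pods come up one by one (zok-0 -> zok-1 -> zok-2); each Pod is only
# created after the previous one is Running and Ready.
kubectl get pods -l app=zk -n wiseco -w

# Scaling down removes Pods in reverse ordinal order (zok-2 is terminated first).
kubectl scale statefulset zok -n wiseco --replicas=2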
A StatefulSet deployment consists of:
- A Headless Service that defines the network identity (DNS domain).
- volumeClaimTemplates, which create the PersistentVolumeClaims (and, via dynamic provisioning, the PersistentVolumes).
- The StatefulSet that defines the application itself.
The DNS name of each Pod in a StatefulSet has the form:
statefulSetName-{0..N-1}.serviceName.namespace.svc.cluster.local, where:
- statefulSetName is the name of the StatefulSet
- 0..N-1 is the ordinal of the Pod, ranging from 0 to N-1
- serviceName is the name of the Headless Service
- namespace is the namespace the service runs in; the Headless Service and the StatefulSet must be in the same namespace
- svc.cluster.local is the cluster root domain of the K8S cluster (a quick lookup of such a record is sketched right after this list)
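As an example (a sketch, not part of the original record), once the zok StatefulSet and the zk-hs Headless Service from section II are running, a per-Pod DNS record can be resolved from a throwaway Pod; the busybox image and the Pod name dns-test are only used for illustration:

# Resolve the DNS record of the first ZooKeeper Pod from inside the cluster
kubectl run dns-test -n wiseco --rm -ti --image=busybox:1.28 --restart=Never -- \
  nslookup zok-0.zk-hs.wiseco.svc.cluster.local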
II. Deployment walkthrough (NAS storage)
Here we use K8S to build a three-node Kafka cluster. Since the Kafka cluster needs storage, Persistent Volumes (PVs) have to be prepared first.
1. Configure dynamic persistent storage for the StatefulSets
1) Use Alibaba Cloud NAS storage
NAS mount address created on the Alibaba Cloud console: 1*********-beijing.nas.aliyuncs.com
First create the zk and kafka subdirectories in the NAS file system.
The NAS subdirectory to be mounted must actually exist; otherwise the mount fails with an error saying the subdirectory cannot be accessed.
How to create them: mount the NAS storage on any node, create the subdirectories, then unmount it.

[root@k8s-node04 ~]# ls /mnt/
[root@k8s-node04 ~]# mkdir /mnt/test
[root@k8s-node04 ~]# mount -t nfs -o vers=3,nolock,proto=tcp,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2,noresvport 1*********-beijing.nas.aliyuncs.com:/ /mnt/test/
[root@k8s-node04 ~]# cd /mnt/test/
[root@k8s-node04 test]# mkdir kafka
[root@k8s-node04 test]# mkdir zk
[root@k8s-node04 test]# ll
total 2
drwxr-xr-x 2 root root 4096 Jan 24 22:53 kafka
drwxr-xr-x 2 root root 4096 Jan 24 22:53 zk
[root@k8s-node04 ~]# df -h|grep mnt
1*********-beijing.nas.aliyuncs.com:/   10P   0   10P   0%   /mnt/test
[root@k8s-node04 ~]# umount /mnt/test
[root@k8s-node04 ~]# rm -rf /mnt/test
[root@k8s-node04 ~]# ls /mnt/
2) Create the RBAC objects for the NFS provisioner
[root@k8s-master01 ~]# mkdir -p /opt/k8s/k8s_project/kafka_zk
[root@k8s-master01 ~]# cd /opt/k8s/k8s_project/kafka_zk
[root@k8s-master01 kafka_zk]# vim nfs-rbac.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: nfs-provisioner
  namespace: wiseco
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: nfs-provisioner-runner
  namespace: wiseco
rules:
  - apiGroups: [""]
    resources: ["persistentvolumes"]
    verbs: ["get", "list", "watch", "create", "delete"]
  - apiGroups: [""]
    resources: ["persistentvolumeclaims"]
    verbs: ["get", "list", "watch", "update"]
  - apiGroups: ["storage.k8s.io"]
    resources: ["storageclasses"]
    verbs: ["get", "list", "watch"]
  - apiGroups: [""]
    resources: ["events"]
    verbs: ["watch", "create", "update", "patch"]
  - apiGroups: [""]
    resources: ["services", "endpoints"]
    verbs: ["get", "create", "list", "watch", "update"]
  - apiGroups: ["extensions"]
    resources: ["podsecuritypolicies"]
    resourceNames: ["nfs-provisioner"]
    verbs: ["use"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: run-nfs-provisioner
subjects:
  - kind: ServiceAccount
    name: nfs-provisioner
    namespace: wiseco
roleRef:
  kind: ClusterRole
  name: nfs-provisioner-runner
  apiGroup: rbac.authorization.k8s.io
Create and check:
[root@k8s-master01 kafka_zk]# kubectl apply -f nfs-rbac.yaml
serviceaccount/nfs-provisioner created
clusterrole.rbac.authorization.k8s.io/nfs-provisioner-runner created
clusterrolebinding.rbac.authorization.k8s.io/run-nfs-provisioner created
[root@k8s-master01 kafka_zk]# kubectl get sa -n wiseco|grep nfs
nfs-provisioner          1         14s
[root@k8s-master01 kafka_zk]# kubectl get clusterrole -n wiseco|grep nfs
nfs-provisioner-runner   2021-01-04T13:03:55Z
[root@k8s-master01 kafka_zk]# kubectl get clusterrolebinding -n wiseco|grep nfs
run-nfs-provisioner      ClusterRole/nfs-provisioner-runner   36s
3) Create the StorageClass for the zk cluster
[root@k8s-master01 kafka_zk]# mkdir zk
[root@k8s-master01 kafka_zk]# cd zk
[root@k8s-master01 zk]# pwd
/opt/k8s/k8s_project/kafka_zk/zk
[root@k8s-master01 zk]# cat zk-nfs-class.yaml
apiVersion: storage.k8s.io/v1beta1
kind: StorageClass
metadata:
  name: zk-nfs-storage
  namespace: wiseco
mountOptions:
  - vers=4.0
  - nolock,tcp,noresvport
provisioner: zk/nfs
reclaimPolicy: Retain
Create and check:
[root@k8s-master01 zk]# kubectl apply -f zk-nfs-class.yaml
storageclass.storage.k8s.io/zk-nfs-storage created
[root@k8s-master01 zk]# kubectl get sc -n wiseco
NAME             PROVISIONER   RECLAIMPOLICY   VOLUMEBINDINGMODE   ALLOWVOLUMEEXPANSION   AGE
zk-nfs-storage   zk/nfs        Retain          Immediate           false                  28s
4) Create the StorageClass for the kafka cluster
[root@k8s-master01 zk]# mkdir ../kafka
[root@k8s-master01 zk]# cd ../kafka
[root@k8s-master01 kafka]# pwd
/opt/k8s/k8s_project/kafka_zk/kafka
[root@k8s-master01 kafka]# cat kafka-nfs-class.yaml
apiVersion: storage.k8s.io/v1beta1
kind: StorageClass
metadata:
  name: kafka-nfs-storage
  namespace: wiseco
mountOptions:
  - vers=4.0
  - nolock,tcp,noresvport
provisioner: kafka/nfs
reclaimPolicy: Retain
Create and check:
[root@k8s-master01 kafka]# kubectl apply -f kafka-nfs-class.yaml
storageclass.storage.k8s.io/kafka-nfs-storage created
[root@k8s-master01 kafka]# kubectl get sc -n wiseco
NAME                PROVISIONER   RECLAIMPOLICY   VOLUMEBINDINGMODE   ALLOWVOLUMEEXPANSION   AGE
kafka-nfs-storage   kafka/nfs     Retain          Immediate           false                  4s
zk-nfs-storage      zk/nfs        Retain          Immediate           false                  12m
5) Create the nfs-client-provisioner for the zk cluster
[root@k8s-master01 zk]# pwd
/opt/k8s/k8s-project/kafka_zk/zk
[root@k8s-master01 zk]# cat zk-nfs.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: zk-nfs-client-provisioner
  namespace: wiseco
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zk-nfs-client-provisioner
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: zk-nfs-client-provisioner
    spec:
      serviceAccount: nfs-provisioner
      containers:
        - name: zk-nfs-client-provisioner
          image: registry.cn-hangzhou.aliyuncs.com/open-ali/nfs-client-provisioner
          imagePullPolicy: IfNotPresent
          volumeMounts:
            - name: nfs-client-root
              mountPath: /persistentvolumes
          env:
            - name: PROVISIONER_NAME
              value: zk/nfs
            - name: NFS_SERVER
              value: 1*********-beijing.nas.aliyuncs.com
            - name: NFS_PATH
              value: /zk
      volumes:
        - name: nfs-client-root
          nfs:
            server: 1*********-beijing.nas.aliyuncs.com
            path: /zk
Create and check:
[root@k8s-master01 zk]# kubectl apply -f zk-nfs.yml
deployment.apps/zk-nfs-client-provisioner created
[root@k8s-master01 zk]# kubectl get pods -n wiseco|grep nfs
zk-nfs-client-provisioner-bd8d65798-qrz87   1/1   Running   0   39s
6) Create the nfs-client-provisioner for the kafka cluster
[root@k8s-master01 kafka]# pwd
/opt/k8s/k8s-project/kafka_zk/kafka
[root@k8s-master01 kafka]# cat kafka-nfs.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-nfs-client-provisioner
  namespace: wiseco
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-nfs-client-provisioner
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: kafka-nfs-client-provisioner
    spec:
      serviceAccount: nfs-provisioner
      containers:
        - name: kafka-nfs-client-provisioner
          image: registry.cn-hangzhou.aliyuncs.com/open-ali/nfs-client-provisioner
          imagePullPolicy: IfNotPresent
          volumeMounts:
            - name: nfs-client-root
              mountPath: /persistentvolumes
          env:
            - name: PROVISIONER_NAME
              value: kafka/nfs
            - name: NFS_SERVER
              value: 1*********-beijing.nas.aliyuncs.com
            - name: NFS_PATH
              value: /kafka
      volumes:
        - name: nfs-client-root
          nfs:
            server: 1*********-beijing.nas.aliyuncs.com
            path: /kafka
Create and check:
[root@k8s-master01 kafka]# kubectl apply -f kafka-nfs.yml
deployment.apps/kafka-nfs-client-provisioner created
[root@k8s-master01 kafka]# kubectl get pods -n wiseco|grep nfs
kafka-nfs-client-provisioner-6747dc587c-8qjn7   1/1   Running   0   16s
zk-nfs-client-provisioner-bd8d65798-cdf6h       1/1   Running   0   47s
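Before deploying ZooKeeper and Kafka on top of these provisioners, dynamic provisioning can be smoke-tested with a throwaway PVC (a sketch, not part of the original record; the name test-claim is only used for illustration):

# Create a test PVC against the zk StorageClass and check that it becomes Bound
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: test-claim
  namespace: wiseco
  annotations:
    volume.beta.kubernetes.io/storage-class: "zk-nfs-storage"
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 1Gi
EOF

kubectl get pvc test-claim -n wiseco     # should show STATUS Bound within a few seconds
kubectl delete pvc test-claim -n wiseco  # clean up the test claim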
2. Create the ZK cluster
[root@k8s-master01 zk]# pwd
/opt/k8s/k8s_project/kafka_zk/zk
[root@k8s-master01 zk]# ls
zk-nfs-class.yaml  zk-nfs.yml
[root@k8s-master01 zk]# vim zk.yaml
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: zk-hs
  labels:
    app: zk
spec:
  ports:
    - port: 2888
      name: server
    - port: 3888
      name: leader-election
  clusterIP: None
  selector:
    app: zk
---
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: zk-cs
  labels:
    app: zk
spec:
  type: NodePort
  ports:
    - port: 2181
      targetPort: 2181
      name: client
      nodePort: 32181
  selector:
    app: zk
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  namespace: wiseco
  name: zk-pdb
spec:
  selector:
    matchLabels:
      app: zk
  maxUnavailable: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: wiseco
  name: zok
spec:
  serviceName: zk-hs
  replicas: 3
  selector:
    matchLabels:
      app: zk
  template:
    metadata:
      labels:
        app: zk
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                      - zk
              topologyKey: "kubernetes.io/hostname"
      containers:
        - name: kubernetes-zookeeper
          imagePullPolicy: Always
          image: leolee32/kubernetes-library:kubernetes-zookeeper1.0-3.4.10
          resources:
            requests:
              memory: "1024Mi"
              cpu: "500m"
          ports:
            - containerPort: 2181
              name: client
            - containerPort: 2888
              name: server
            - containerPort: 3888
              name: leader-election
          command:
            - sh
            - -c
            - "start-zookeeper \
              --servers=3 \
              --data_dir=/var/lib/zookeeper/data \
              --data_log_dir=/var/lib/zookeeper/data/log \
              --conf_dir=/opt/zookeeper/conf \
              --client_port=2181 \
              --election_port=3888 \
              --server_port=2888 \
              --tick_time=2000 \
              --init_limit=10 \
              --sync_limit=5 \
              --heap=512M \
              --max_client_cnxns=60 \
              --snap_retain_count=3 \
              --purge_interval=12 \
              --max_session_timeout=40000 \
              --min_session_timeout=4000 \
              --log_level=INFO"
          readinessProbe:
            exec:
              command:
                - sh
                - -c
                - "zookeeper-ready 2181"
            initialDelaySeconds: 10
            timeoutSeconds: 5
          livenessProbe:
            exec:
              command:
                - sh
                - -c
                - "zookeeper-ready 2181"
            initialDelaySeconds: 10
            timeoutSeconds: 5
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/zookeeper
  volumeClaimTemplates:
    - metadata:
        name: datadir
        annotations:
          volume.beta.kubernetes.io/storage-class: "zk-nfs-storage"
      spec:
        accessModes:
          - ReadWriteMany
        resources:
          requests:
            storage: 10Gi
Create and check:
[root@k8s-master01 zk]# kubectl apply -f zk.yaml
service/zk-hs created
service/zk-cs created
poddisruptionbudget.policy/zk-pdb created
statefulset.apps/zok created
Check the information of the created zk cluster:
Check the zk cluster Pods:
[root@k8s-master01 zk]# kubectl get pods -n wiseco|grep zok
zok-0   1/1   Running   0   2m1s
zok-1   1/1   Running   0   88s
zok-2   1/1   Running   0   51s

Check the zk cluster Services:
[root@k8s-master01 zk]# kubectl get svc -n wiseco|grep zk
zk-cs   NodePort    10.254.223.248   <none>   2181:32181/TCP      2m24s
zk-hs   ClusterIP   None             <none>   2888/TCP,3888/TCP   2m25s

Check the zk cluster PVs and PVCs. Note that PVs are cluster-scoped, so "-n <namespace>" is not required when querying them, while PVCs are namespace-scoped and do require "-n <namespace>".
[root@k8s-master01 zk]# kubectl get pv -n wiseco|grep zok
pvc-2bace45f-4567-4751-a7ee-233c7034bb09   10Gi   RWX   Delete   Bound   wiseco/datadir-zok-0   zk-nfs-storage   2m59s
pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e   10Gi   RWX   Delete   Bound   wiseco/datadir-zok-1   zk-nfs-storage   2m24s
pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219   10Gi   RWX   Delete   Bound   wiseco/datadir-zok-2   zk-nfs-storage   107s
[root@k8s-master01 zk]# kubectl get pvc -n wiseco|grep zok
datadir-zok-0   Bound   pvc-2bace45f-4567-4751-a7ee-233c7034bb09   10Gi   RWX   zk-nfs-storage   3m2s
datadir-zok-1   Bound   pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e   10Gi   RWX   zk-nfs-storage   2m29s
datadir-zok-2   Bound   pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219   10Gi   RWX   zk-nfs-storage   112s
On the NFS server side, look at the zk cluster's persistent shared directories.
As shown below, the name of each NFS persistence directory is composed of: namespace name + PVC name + PV name.
As long as the PVC and PV are not deleted, this directory name does not change.
[root@k8s-harbor01 ~]# cd /data/storage/k8s/zk/
[root@k8s-harbor01 zk]# ll
total 0
drwxrwxrwx 3 root root 18 Jan  4 22:22 wiseco-datadir-zok-0-pvc-2bace45f-4567-4751-a7ee-233c7034bb09
drwxrwxrwx 3 root root 18 Jan  4 22:23 wiseco-datadir-zok-1-pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e
drwxrwxrwx 3 root root 18 Jan  4 22:23 wiseco-datadir-zok-2-pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219
[root@k8s-harbor01 zk]# ls *
wiseco-datadir-zok-0-pvc-2bace45f-4567-4751-a7ee-233c7034bb09:
data
wiseco-datadir-zok-1-pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e:
data
wiseco-datadir-zok-2-pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219:
data
[root@k8s-harbor01 zk]# ls */*
wiseco-datadir-zok-0-pvc-2bace45f-4567-4751-a7ee-233c7034bb09/data:
log  myid  version-2
wiseco-datadir-zok-1-pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e/data:
log  myid  version-2
wiseco-datadir-zok-2-pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219/data:
log  myid  version-2
[root@k8s-harbor01 zk]# cat wiseco-datadir-zok-0-pvc-2bace45f-4567-4751-a7ee-233c7034bb09/data/myid
1
[root@k8s-harbor01 zk]# cat wiseco-datadir-zok-1-pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e/data/myid
2
[root@k8s-harbor01 zk]# cat wiseco-datadir-zok-2-pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219/data/myid
3
Check the leader/follower roles inside the zk cluster:
[root@k8s-master01 kafka]# kubectl get pods -n wiseco|grep zok
zok-0   1/1   Running   0   17h
zok-1   1/1   Running   0   17h
zok-2   1/1   Running   0   17h
[root@k8s-master01 kafka]# kubectl exec -ti zok-0 -n wiseco -- zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/bin/../etc/zookeeper/zoo.cfg
Mode: follower
[root@k8s-master01 kafka]# kubectl exec -ti zok-1 -n wiseco -- zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/bin/../etc/zookeeper/zoo.cfg
Mode: leader
[root@k8s-master01 kafka]# kubectl exec -ti zok-2 -n wiseco -- zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/bin/../etc/zookeeper/zoo.cfg
Mode: follower
Verify connecting to the zk cluster:
[root@k8s-master01 kafka]# kubectl get svc -n wiseco|grep zk
zk-cs   NodePort    10.254.223.248   <none>   2181:32181/TCP      17h
zk-hs   ClusterIP   None             <none>   2888/TCP,3888/TCP   17h
[root@k8s-master01 ~]# kubectl exec -ti zok-1 -n wiseco -- zkCli.sh -server zk-cs:2181
Connecting to zk-cs:2181
2021-01-05 08:16:10,169 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
2021-01-05 08:16:10,173 [myid:] - INFO [main:Environment@100] - Client environment:host.name=zok-1.zk-hs.wiseco.svc.cluster.local
2021-01-05 08:16:10,173 [myid:] - INFO [main:Environment@100] - Client environment:java.version=1.8.0_131
2021-01-05 08:16:10,175 [myid:] - INFO [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
2021-01-05 08:16:10,175 [myid:] - INFO [main:Environment@100] - Client environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
2021-01-05 08:16:10,175 [myid:] - INFO [main:Environment@100] - Client environment:java.class.path=/usr/bin/../build/classes:/usr/bin/../build/lib/*.jar:/usr/bin/../share/zookeeper/zookeeper-3.4.10.jar:/usr/bin/../share/zookeeper/slf4j-log4j12-1.6.1.jar:/usr/bin/../share/zookeeper/slf4j-api-1.6.1.jar:/usr/bin/../share/zookeeper/netty-3.10.5.Final.jar:/usr/bin/../share/zookeeper/log4j-1.2.16.jar:/usr/bin/../share/zookeeper/jline-0.9.94.jar:/usr/bin/../src/java/lib/*.jar:/usr/bin/../etc/zookeeper:
2021-01-05 08:16:10,175 [myid:] - INFO [main:Environment@100] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
2021-01-05 08:16:10,175 [myid:] - INFO [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
2021-01-05 08:16:10,175 [myid:] - INFO [main:Environment@100] - Client environment:java.compiler=<NA>
2021-01-05 08:16:10,175 [myid:] - INFO [main:Environment@100] - Client environment:os.name=Linux
2021-01-05 08:16:10,175 [myid:] - INFO [main:Environment@100] - Client environment:os.arch=amd64
2021-01-05 08:16:10,176 [myid:] - INFO [main:Environment@100] - Client environment:os.version=4.4.243-1.el7.elrepo.x86_64
2021-01-05 08:16:10,176 [myid:] - INFO [main:Environment@100] - Client environment:user.name=root
2021-01-05 08:16:10,176 [myid:] - INFO [main:Environment@100] - Client environment:user.home=/root
2021-01-05 08:16:10,176 [myid:] - INFO [main:Environment@100] - Client environment:user.dir=/
2021-01-05 08:16:10,177 [myid:] - INFO [main:ZooKeeper@438] - Initiating client connection, connectString=zk-cs:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@22d8cfe0
Welcome to ZooKeeper!
JLine support is enabled
2021-01-05 08:16:10,201 [myid:] - INFO [main-SendThread(zk-cs.wiseco.svc.cluster.local:2181):ClientCnxn$SendThread@1032] - Opening socket connection to server zk-cs.wiseco.svc.cluster.local/10.254.223.248:2181. Will not attempt to authenticate using SASL (unknown error)
2021-01-05 08:16:10,261 [myid:] - INFO [main-SendThread(zk-cs.wiseco.svc.cluster.local:2181):ClientCnxn$SendThread@876] - Socket connection established to zk-cs.wiseco.svc.cluster.local/10.254.223.248:2181, initiating session
[zk: zk-cs:2181(CONNECTING) 0] 2021-01-05 08:16:10,316 [myid:] - INFO [main-SendThread(zk-cs.wiseco.svc.cluster.local:2181):ClientCnxn$SendThread@1299] - Session establishment complete on server zk-cs.wiseco.svc.cluster.local/10.254.223.248:2181, sessionid = 0x376cdc83254000a, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
# press Enter here
[zk: zk-cs:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: zk-cs:2181(CONNECTED) 1]
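To double-check that the three members really replicate data, a znode can be written through one member and read back through another (a sketch, not part of the original record; the znode /k8s-test and its value are only used for illustration):

# Write a znode via zok-0, read it back via zok-2, then clean up
kubectl exec -ti zok-0 -n wiseco -- zkCli.sh create /k8s-test "hello"
kubectl exec -ti zok-2 -n wiseco -- zkCli.sh get /k8s-test
kubectl exec -ti zok-0 -n wiseco -- zkCli.sh delete /k8s-test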
3. Create the Kafka cluster
The Kafka cluster must be reachable from outside the K8S cluster, which requires either three public addresses, or one public address plus three ports, each one proxying one of the three Kafka Pod instances. For external clients to connect, the advertised.listeners setting must be configured:
- advertised.listeners is the endpoint exposed to external clients; Kafka components talk to each other through listeners.
- The advertised.listeners endpoints are registered in ZooKeeper.
- When a client connects to "<public IP>:port", the broker looks up the listeners registered in ZooKeeper, finds the matching advertised listener (the default listener name is PLAINTEXT; custom names are also possible), and maps it to the actual communication IP and port defined in listeners. The registered endpoints can be inspected as sketched right after this list.
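Once the brokers defined below are up, what each of them registered in ZooKeeper can be inspected from inside any Kafka Pod with zookeeper-shell.sh, which ships with Kafka (a sketch, not part of the original record; broker id 0 corresponds to kafoka1 in this article):

# Show the endpoints broker 0 advertised to clients (here PLAINTEXT://156.56.46.37:32092)
zookeeper-shell.sh zk-cs:2181 get /brokers/ids/0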
1) Building the Kafka 2.12-2.5.0 Docker image
FROM 192.168.10.10/wiseco/jdk1.8.0_192

RUN rm -f /etc/localtime \
  && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
  && echo "Asia/Shanghai" > /etc/timezone

ENV LANG en_US.UTF-8
ENV KAFKA_DATA_DIR /var/lib/kafka/data
ENV JAVA_HOME /usr/java/jdk1.8.0_192
ENV KAFKA_HOME /opt/kafka
ENV PATH /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/kafka/bin

WORKDIR /opt

RUN yum install -y wget \
  && wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz \
  && tar -zvxf kafka_2.12-2.5.0.tgz \
  && ln -s /opt/kafka_2.12-2.5.0 /opt/kafka \
  && rm -rf kafka_2.12-2.5.0.tgz \
  && mkdir -p /var/lib/kafka/data

CMD ["/bin/bash"]
Build the image and push it to Harbor:
# docker build -t 192.168.10.10/wiseco/kafka:v2.12-2.5.0 .
# docker push 192.168.10.10/wiseco/kafka:v2.12-2.5.0
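Before pushing, the image can be given a quick local sanity check (a sketch, not part of the original record; it assumes the build host can run the image):

# Verify that the Kafka scripts are on the PATH inside the freshly built image
docker run --rm 192.168.10.10/wiseco/kafka:v2.12-2.5.0 kafka-topics.sh --version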
2) Create the kafka cluster
[root@k8s-master01 ~]# mkdir -p /opt/k8s/k8s_project/kafka/
[root@k8s-master01 ~]# cd /opt/k8s/k8s_project/kafka/
Contents of kafka1.yaml:
[root@k8s-master01 kafka]# vim kafka1.yaml
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-hs1
  labels:
    app: kafka1
spec:
  ports:
    - port: 1099
      name: jmx1
  clusterIP: None
  selector:
    app: kafka1
---
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-cs1
  labels:
    app: kafka1
spec:
  type: NodePort
  ports:
    - port: 9092
      targetPort: 9092
      name: client1
      nodePort: 32092
  selector:
    app: kafka1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: wiseco
  name: kafoka1
spec:
  serviceName: kafka-hs1
  replicas: 1
  selector:
    matchLabels:
      app: kafka1
  template:
    metadata:
      labels:
        app: kafka1
    spec:
      containers:
        - name: k8skafka
          imagePullPolicy: Always
          image: 192.168.10.10/wiseco/kafka:v2.12-2.5.0
          ports:
            - containerPort: 9092
              name: client1
            - containerPort: 1099
              name: jmx1
          command:
            - sh
            - -c
            - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=0
              --override advertised.listeners=PLAINTEXT://156.56.46.37:32092
              --override zookeeper.connect=zok-0.zk-hs.wiseco.svc.cluster.local:2181,zok-1.zk-hs.wiseco.svc.cluster.local:2181,zok-2.zk-hs.wiseco.svc.cluster.local:2181
              --override log.dirs=/var/lib/kafka
              --override auto.create.topics.enable=true --override auto.leader.rebalance.enable=true --override background.threads=10
              --override compression.type=producer --override delete.topic.enable=true --override leader.imbalance.check.interval.seconds=300
              --override leader.imbalance.per.broker.percentage=10 --override log.flush.interval.messages=9223372036854775807 --override log.flush.offset.checkpoint.interval.ms=60000
              --override log.flush.scheduler.interval.ms=9223372036854775807 --override log.retention.bytes=-1 --override log.retention.hours=168
              --override log.roll.hours=168 --override log.roll.jitter.hours=0 --override log.segment.bytes=1073741824
              --override log.segment.delete.delay.ms=60000 --override message.max.bytes=1000012 --override min.insync.replicas=1
              --override num.io.threads=8 --override num.network.threads=3 --override num.recovery.threads.per.data.dir=1
              --override num.replica.fetchers=1 --override offset.metadata.max.bytes=4096 --override offsets.commit.required.acks=-1
              --override offsets.commit.timeout.ms=5000 --override offsets.load.buffer.size=5242880 --override offsets.retention.check.interval.ms=600000
              --override offsets.retention.minutes=1440 --override offsets.topic.compression.codec=0 --override offsets.topic.num.partitions=50
              --override offsets.topic.replication.factor=3 --override offsets.topic.segment.bytes=104857600 --override queued.max.requests=500
              --override quota.consumer.default=9223372036854775807 --override quota.producer.default=9223372036854775807 --override replica.fetch.min.bytes=1
              --override replica.fetch.wait.max.ms=500 --override replica.high.watermark.checkpoint.interval.ms=5000 --override replica.lag.time.max.ms=10000
              --override replica.socket.receive.buffer.bytes=65536 --override replica.socket.timeout.ms=30000 --override request.timeout.ms=30000
              --override socket.receive.buffer.bytes=102400 --override socket.request.max.bytes=104857600 --override socket.send.buffer.bytes=102400
              --override unclean.leader.election.enable=true --override zookeeper.session.timeout.ms=6000 --override zookeeper.set.acl=false
              --override broker.id.generation.enable=true --override connections.max.idle.ms=600000 --override controlled.shutdown.enable=true
              --override controlled.shutdown.max.retries=3 --override controlled.shutdown.retry.backoff.ms=5000 --override controller.socket.timeout.ms=30000
              --override default.replication.factor=1 --override fetch.purgatory.purge.interval.requests=1000 --override group.max.session.timeout.ms=300000
              --override group.min.session.timeout.ms=6000 --override log.cleaner.backoff.ms=15000 --override log.cleaner.dedupe.buffer.size=134217728
              --override log.cleaner.delete.retention.ms=86400000 --override log.cleaner.enable=true --override log.cleaner.io.buffer.load.factor=0.9
              --override log.cleaner.io.buffer.size=524288 --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 --override log.cleaner.min.cleanable.ratio=0.5
              --override log.cleaner.min.compaction.lag.ms=0 --override log.cleaner.threads=1 --override log.cleanup.policy=delete
              --override log.index.interval.bytes=4096 --override log.index.size.max.bytes=10485760 --override log.message.timestamp.difference.max.ms=9223372036854775807
              --override log.message.timestamp.type=CreateTime --override log.preallocate=false --override log.retention.check.interval.ms=300000
              --override max.connections.per.ip=2147483647 --override num.partitions=1 --override producer.purgatory.purge.interval.requests=1000
              --override replica.fetch.backoff.ms=1000 --override replica.fetch.max.bytes=1048576 --override replica.fetch.response.max.bytes=10485760
              --override reserved.broker.max.id=1000 "
          env:
            - name: KAFKA_HEAP_OPTS
              value: "-Xmx512M -Xms512M"
            - name: KAFKA_OPTS
              value: "-Dlogging.level=INFO"
          volumeMounts:
            - name: kafkadatadir
              mountPath: /var/lib/kafka
          lifecycle:
            postStart:
              exec:
                command: ["/bin/sh","-c","touch /tmp/health"]
          livenessProbe:
            exec:
              command: ["test","-e","/tmp/health"]
            initialDelaySeconds: 5
            timeoutSeconds: 5
            periodSeconds: 10
          readinessProbe:
            tcpSocket:
              port: client1
            initialDelaySeconds: 15
            timeoutSeconds: 5
            periodSeconds: 20
  volumeClaimTemplates:
    - metadata:
        name: kafkadatadir
        annotations:
          volume.beta.kubernetes.io/storage-class: "kafka-nfs-storage"
      spec:
        accessModes:
          - ReadWriteMany
        resources:
          requests:
            storage: 10Gi
Contents of kafka2.yaml:
[root@k8s-master01 kafka]# vim kafka2.yaml
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-hs2
  labels:
    app: kafka2
spec:
  ports:
    - port: 1099
      name: jmx2
  clusterIP: None
  selector:
    app: kafka2
---
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-cs2
  labels:
    app: kafka2
spec:
  type: NodePort
  ports:
    - port: 9092
      targetPort: 9092
      name: client2
      nodePort: 32093
  selector:
    app: kafka2
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: wiseco
  name: kafoka2
spec:
  serviceName: kafka-hs2
  replicas: 1
  selector:
    matchLabels:
      app: kafka2
  template:
    metadata:
      labels:
        app: kafka2
    spec:
      containers:
        - name: k8skafka
          imagePullPolicy: Always
          image: 192.168.10.10/wiseco/kafka:v2.12-2.5.0
          ports:
            - containerPort: 9092
              name: client2
            - containerPort: 1099
              name: jmx2
          command:
            - sh
            - -c
            - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=1
              --override advertised.listeners=PLAINTEXT://156.56.46.37:32093
              --override zookeeper.connect=zok-0.zk-hs.wiseco.svc.cluster.local:2181,zok-1.zk-hs.wiseco.svc.cluster.local:2181,zok-2.zk-hs.wiseco.svc.cluster.local:2181
              --override log.dirs=/var/lib/kafka
              --override auto.create.topics.enable=true --override auto.leader.rebalance.enable=true --override background.threads=10
              --override compression.type=producer --override delete.topic.enable=true --override leader.imbalance.check.interval.seconds=300
              --override leader.imbalance.per.broker.percentage=10 --override log.flush.interval.messages=9223372036854775807 --override log.flush.offset.checkpoint.interval.ms=60000
              --override log.flush.scheduler.interval.ms=9223372036854775807 --override log.retention.bytes=-1 --override log.retention.hours=168
              --override log.roll.hours=168 --override log.roll.jitter.hours=0 --override log.segment.bytes=1073741824
              --override log.segment.delete.delay.ms=60000 --override message.max.bytes=1000012 --override min.insync.replicas=1
              --override num.io.threads=8 --override num.network.threads=3 --override num.recovery.threads.per.data.dir=1
              --override num.replica.fetchers=1 --override offset.metadata.max.bytes=4096 --override offsets.commit.required.acks=-1
              --override offsets.commit.timeout.ms=5000 --override offsets.load.buffer.size=5242880 --override offsets.retention.check.interval.ms=600000
              --override offsets.retention.minutes=1440 --override offsets.topic.compression.codec=0 --override offsets.topic.num.partitions=50
              --override offsets.topic.replication.factor=3 --override offsets.topic.segment.bytes=104857600 --override queued.max.requests=500
              --override quota.consumer.default=9223372036854775807 --override quota.producer.default=9223372036854775807 --override replica.fetch.min.bytes=1
              --override replica.fetch.wait.max.ms=500 --override replica.high.watermark.checkpoint.interval.ms=5000 --override replica.lag.time.max.ms=10000
              --override replica.socket.receive.buffer.bytes=65536 --override replica.socket.timeout.ms=30000 --override request.timeout.ms=30000
              --override socket.receive.buffer.bytes=102400 --override socket.request.max.bytes=104857600 --override socket.send.buffer.bytes=102400
              --override unclean.leader.election.enable=true --override zookeeper.session.timeout.ms=6000 --override zookeeper.set.acl=false
              --override broker.id.generation.enable=true --override connections.max.idle.ms=600000 --override controlled.shutdown.enable=true
              --override controlled.shutdown.max.retries=3 --override controlled.shutdown.retry.backoff.ms=5000 --override controller.socket.timeout.ms=30000
              --override default.replication.factor=1 --override fetch.purgatory.purge.interval.requests=1000 --override group.max.session.timeout.ms=300000
              --override group.min.session.timeout.ms=6000 --override log.cleaner.backoff.ms=15000 --override log.cleaner.dedupe.buffer.size=134217728
              --override log.cleaner.delete.retention.ms=86400000 --override log.cleaner.enable=true --override log.cleaner.io.buffer.load.factor=0.9
              --override log.cleaner.io.buffer.size=524288 --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 --override log.cleaner.min.cleanable.ratio=0.5
              --override log.cleaner.min.compaction.lag.ms=0 --override log.cleaner.threads=1 --override log.cleanup.policy=delete
              --override log.index.interval.bytes=4096 --override log.index.size.max.bytes=10485760 --override log.message.timestamp.difference.max.ms=9223372036854775807
              --override log.message.timestamp.type=CreateTime --override log.preallocate=false --override log.retention.check.interval.ms=300000
              --override max.connections.per.ip=2147483647 --override num.partitions=1 --override producer.purgatory.purge.interval.requests=1000
              --override replica.fetch.backoff.ms=1000 --override replica.fetch.max.bytes=1048576 --override replica.fetch.response.max.bytes=10485760
              --override reserved.broker.max.id=1000 "
          env:
            - name: KAFKA_HEAP_OPTS
              value: "-Xmx512M -Xms512M"
            - name: KAFKA_OPTS
              value: "-Dlogging.level=INFO"
          volumeMounts:
            - name: kafkadatadir
              mountPath: /var/lib/kafka
          lifecycle:
            postStart:
              exec:
                command: ["/bin/sh","-c","touch /tmp/health"]
          livenessProbe:
            exec:
              command: ["test","-e","/tmp/health"]
            initialDelaySeconds: 5
            timeoutSeconds: 5
            periodSeconds: 10
          readinessProbe:
            tcpSocket:
              port: client2
            initialDelaySeconds: 15
            timeoutSeconds: 5
            periodSeconds: 20
  volumeClaimTemplates:
    - metadata:
        name: kafkadatadir
        annotations:
          volume.beta.kubernetes.io/storage-class: "kafka-nfs-storage"
      spec:
        accessModes:
          - ReadWriteMany
        resources:
          requests:
            storage: 10Gi
Contents of kafka3.yaml:
[root@k8s-master01 kafka]# vim kafka3.yaml
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-hs3
  labels:
    app: kafka3
spec:
  ports:
    - port: 1099
      name: jmx3
  clusterIP: None
  selector:
    app: kafka3
---
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-cs3
  labels:
    app: kafka3
spec:
  type: NodePort
  ports:
    - port: 9092
      targetPort: 9092
      name: client3
      nodePort: 32094
  selector:
    app: kafka3
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: wiseco
  name: kafoka3
spec:
  serviceName: kafka-hs3
  replicas: 1
  selector:
    matchLabels:
      app: kafka3
  template:
    metadata:
      labels:
        app: kafka3
    spec:
      containers:
        - name: k8skafka
          imagePullPolicy: Always
          image: 192.168.10.10/wiseco/kafka:v2.12-2.5.0
          ports:
            - containerPort: 9092
              name: client3
            - containerPort: 1099
              name: jmx3
          command:
            - sh
            - -c
            - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=2
              --override advertised.listeners=PLAINTEXT://156.56.46.37:32094
              --override zookeeper.connect=zok-0.zk-hs.wiseco.svc.cluster.local:2181,zok-1.zk-hs.wiseco.svc.cluster.local:2181,zok-2.zk-hs.wiseco.svc.cluster.local:2181
              --override log.dirs=/var/lib/kafka
              --override auto.create.topics.enable=true --override auto.leader.rebalance.enable=true --override background.threads=10
              --override compression.type=producer --override delete.topic.enable=true --override leader.imbalance.check.interval.seconds=300
              --override leader.imbalance.per.broker.percentage=10 --override log.flush.interval.messages=9223372036854775807 --override log.flush.offset.checkpoint.interval.ms=60000
              --override log.flush.scheduler.interval.ms=9223372036854775807 --override log.retention.bytes=-1 --override log.retention.hours=168
              --override log.roll.hours=168 --override log.roll.jitter.hours=0 --override log.segment.bytes=1073741824
              --override log.segment.delete.delay.ms=60000 --override message.max.bytes=1000012 --override min.insync.replicas=1
              --override num.io.threads=8 --override num.network.threads=3 --override num.recovery.threads.per.data.dir=1
              --override num.replica.fetchers=1 --override offset.metadata.max.bytes=4096 --override offsets.commit.required.acks=-1
              --override offsets.commit.timeout.ms=5000 --override offsets.load.buffer.size=5242880 --override offsets.retention.check.interval.ms=600000
              --override offsets.retention.minutes=1440 --override offsets.topic.compression.codec=0 --override offsets.topic.num.partitions=50
              --override offsets.topic.replication.factor=3 --override offsets.topic.segment.bytes=104857600 --override queued.max.requests=500
              --override quota.consumer.default=9223372036854775807 --override quota.producer.default=9223372036854775807 --override replica.fetch.min.bytes=1
              --override replica.fetch.wait.max.ms=500 --override replica.high.watermark.checkpoint.interval.ms=5000 --override replica.lag.time.max.ms=10000
              --override replica.socket.receive.buffer.bytes=65536 --override replica.socket.timeout.ms=30000 --override request.timeout.ms=30000
              --override socket.receive.buffer.bytes=102400 --override socket.request.max.bytes=104857600 --override socket.send.buffer.bytes=102400
              --override unclean.leader.election.enable=true --override zookeeper.session.timeout.ms=6000 --override zookeeper.set.acl=false
              --override broker.id.generation.enable=true --override connections.max.idle.ms=600000 --override controlled.shutdown.enable=true
              --override controlled.shutdown.max.retries=3 --override controlled.shutdown.retry.backoff.ms=5000 --override controller.socket.timeout.ms=30000
              --override default.replication.factor=1 --override fetch.purgatory.purge.interval.requests=1000 --override group.max.session.timeout.ms=300000
              --override group.min.session.timeout.ms=6000 --override log.cleaner.backoff.ms=15000 --override log.cleaner.dedupe.buffer.size=134217728
              --override log.cleaner.delete.retention.ms=86400000 --override log.cleaner.enable=true --override log.cleaner.io.buffer.load.factor=0.9
              --override log.cleaner.io.buffer.size=524288 --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 --override log.cleaner.min.cleanable.ratio=0.5
              --override log.cleaner.min.compaction.lag.ms=0 --override log.cleaner.threads=1 --override log.cleanup.policy=delete
              --override log.index.interval.bytes=4096 --override log.index.size.max.bytes=10485760 --override log.message.timestamp.difference.max.ms=9223372036854775807
              --override log.message.timestamp.type=CreateTime --override log.preallocate=false --override log.retention.check.interval.ms=300000
              --override max.connections.per.ip=2147483647 --override num.partitions=1 --override producer.purgatory.purge.interval.requests=1000
              --override replica.fetch.backoff.ms=1000 --override replica.fetch.max.bytes=1048576 --override replica.fetch.response.max.bytes=10485760
              --override reserved.broker.max.id=1000 "
          env:
            - name: KAFKA_HEAP_OPTS
              value: "-Xmx512M -Xms512M"
            - name: KAFKA_OPTS
              value: "-Dlogging.level=INFO"
          volumeMounts:
            - name: kafkadatadir
              mountPath: /var/lib/kafka
          lifecycle:
            postStart:
              exec:
                command: ["/bin/sh","-c","touch /tmp/health"]
          livenessProbe:
            exec:
              command: ["test","-e","/tmp/health"]
            initialDelaySeconds: 5
            timeoutSeconds: 5
            periodSeconds: 10
          readinessProbe:
            tcpSocket:
              port: client3
            initialDelaySeconds: 15
            timeoutSeconds: 5
            periodSeconds: 20
  volumeClaimTemplates:
    - metadata:
        name: kafkadatadir
        annotations:
          volume.beta.kubernetes.io/storage-class: "kafka-nfs-storage"
      spec:
        accessModes:
          - ReadWriteMany
        resources:
          requests:
            storage: 10Gi
Create and check:
[root@k8s-vm01 new_kafka]# kubectl apply -f kafka1.yaml
[root@k8s-vm01 new_kafka]# kubectl apply -f kafka2.yaml
[root@k8s-vm01 new_kafka]# kubectl apply -f kafka3.yaml
[root@k8s-vm01 new_kafka]# kubectl get pods -n wiseco|grep kafoka
kafoka1-0   1/1   Running   0   5d21h
kafoka2-0   1/1   Running   0   5d21h
kafoka3-0   1/1   Running   0   5d21h
[root@k8s-vm01 new_kafka]# kubectl get svc -n wiseco|grep kafka
kafka-cs1   NodePort    10.254.66.1      <none>   9092:32092/TCP   5d21h
kafka-cs2   NodePort    10.254.177.227   <none>   9092:32093/TCP   5d21h
kafka-cs3   NodePort    10.254.136.242   <none>   9092:32094/TCP   5d21h
kafka-hs1   ClusterIP   None             <none>   1099/TCP         5d21h
kafka-hs2   ClusterIP   None             <none>   1099/TCP         5d21h
kafka-hs3   ClusterIP   None             <none>   1099/TCP         5d21h
[root@k8s-vm01 new_kafka]# kubectl get statefulset -n wiseco|grep kafoka
kafoka1   1/1   5d21h
kafoka2   1/1   5d21h
kafoka3   1/1   5d21h
Check the kafka cluster's PVs and PVCs:
[root@k8s-vm01 new_kafka]# kubectl get pv -n wiseco|grep kafoka
pvc-17aeea13-4f2a-4f10-8f63-67888c468831   10Gi   RWX   Delete   Bound   wiseco/kafkadatadir-kafoka1-0   kafka-nfs-storage   6d15h
pvc-447d344d-2eaa-4db5-baf8-473ce9811378   10Gi   RWX   Delete   Bound   wiseco/kafkadatadir-kafoka2-0   kafka-nfs-storage   6d15h
pvc-c3b26e12-3c96-48ca-9019-6211e9e6843b   10Gi   RWX   Delete   Bound   wiseco/kafkadatadir-kafoka3-0   kafka-nfs-storage   6d15h
[root@k8s-vm01 new_kafka]# kubectl get pvc -n wiseco|grep kafoka
kafkadatadir-kafoka1-0   Bound   pvc-17aeea13-4f2a-4f10-8f63-67888c468831   10Gi   RWX   kafka-nfs-storage   6d15h
kafkadatadir-kafoka2-0   Bound   pvc-447d344d-2eaa-4db5-baf8-473ce9811378   10Gi   RWX   kafka-nfs-storage   6d15h
kafkadatadir-kafoka3-0   Bound   pvc-c3b26e12-3c96-48ca-9019-6211e9e6843b   10Gi   RWX   kafka-nfs-storage   6d15h
On the NFS server, check the kafka cluster's persistent storage:
[root@k8s_storage ~]# ll /data/storage/kafka/
total 12
drwxrwxrwx 54 root root 4096 Jan 18 14:30 wiseco-kafkadatadir-kafoka1-0-pvc-17aeea13-4f2a-4f10-8f63-67888c468831
drwxrwxrwx 54 root root 4096 Jan 18 14:30 wiseco-kafkadatadir-kafoka2-0-pvc-447d344d-2eaa-4db5-baf8-473ce9811378
drwxrwxrwx 52 root root 4096 Jan 18 14:30 wiseco-kafkadatadir-kafoka3-0-pvc-c3b26e12-3c96-48ca-9019-6211e9e6843b
[root@k8s_storage ~]# cat /data/storage/kafka/wiseco-kafkadatadir-kafoka1-0-pvc-17aeea13-4f2a-4f10-8f63-67888c468831/meta.properties
#
#Tue Jan 12 17:16:03 CST 2021
cluster.id=8H9vYTT_RBOiwUKqFRvn0w
version=0
broker.id=0
[root@k8s_storage ~]# cat /data/storage/kafka/wiseco-kafkadatadir-kafoka2-0-pvc-447d344d-2eaa-4db5-baf8-473ce9811378/meta.properties
#
#Tue Jan 12 17:17:11 CST 2021
cluster.id=8H9vYTT_RBOiwUKqFRvn0w
version=0
broker.id=1
[root@k8s_storage ~]# cat /data/storage/kafka/wiseco-kafkadatadir-kafoka3-0-pvc-c3b26e12-3c96-48ca-9019-6211e9e6843b/meta.properties
#
#Tue Jan 12 17:17:29 CST 2021
cluster.id=8H9vYTT_RBOiwUKqFRvn0w
version=0
broker.id=2
3) Verify producing and consuming data with the kafka cluster
[root@k8s-vm01 new_kafka]# kubectl get svc -n wiseco|grep kafka
kafka-cs1   NodePort    10.254.66.1      <none>   9092:32092/TCP   5d21h
kafka-cs2   NodePort    10.254.177.227   <none>   9092:32093/TCP   5d21h
kafka-cs3   NodePort    10.254.136.242   <none>   9092:32094/TCP   5d21h
kafka-hs1   ClusterIP   None             <none>   1099/TCP         5d21h
kafka-hs2   ClusterIP   None             <none>   1099/TCP         5d21h
kafka-hs3   ClusterIP   None             <none>   1099/TCP         5d21h
[root@k8s-vm01 new_kafka]# kubectl get svc -n wiseco|grep zk
zk-cs   NodePort    10.254.209.162   <none>   2181:32181/TCP      6d16h
zk-hs   ClusterIP   None             <none>   2888/TCP,3888/TCP   6d16h
Log in to any one of the three Kafka Pod instances (for example the kafka2 node) and produce some data:
[root@k8s-vm01 new_kafka]# kubectl get pods -n wiseco|grep kafoka
kafoka1-0   1/1   Running   0   5d21h
kafoka2-0   1/1   Running   0   5d21h
kafoka3-0   1/1   Running   0   5d21h
[root@k8s-vm01 new_kafka]# kubectl exec -ti kafoka2-0 -n wiseco -- /bin/bash
[root@kafoka2-0 opt]#

List the topics:
[root@kafoka2-0 opt]# kafka-topics.sh --list --zookeeper zk-cs:2181

Create a topic, for example one named wisecotest:
[root@kafoka2-0 opt]# kafka-topics.sh --create --topic wisecotest --zookeeper zk-cs:2181 --partitions 1 --replication-factor 1
Created topic wisecotest.

List the topics again:
[root@kafoka2-0 opt]# kafka-topics.sh --list --zookeeper zk-cs:2181
wisecotest

Produce data into the wisecotest topic created above:
[root@kafoka2-0 opt]# kafka-console-producer.sh --broker-list 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest
>test1
>test2
>test3
>
Then log in to another Kafka Pod instance, for example kafka3, and verify that the data can be consumed.
The data produced above can now be consumed:
[root@k8s-vm01 ~]# kubectl exec -ti kafoka3-0 -n wiseco -- /bin/bash
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest --from-beginning
test1
test2
test3

Produce more data in the kafka2 container:
[root@kafoka2-0 opt]# kafka-console-producer.sh --broker-list 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest
>test1
>test2
>test3
>hahahaha
>heiheihei
>

Consume again in the kafka3 container:
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei

A single public address also works for consuming:
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server 156.56.46.37:32094 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei

Using the internal service addresses, the kafka data can also be consumed normally from inside the K8S cluster:
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server kafka-cs1:9092 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server kafka-cs2:9092 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei

The warning below is caused by a wrong port in the bootstrap list: the kafka-cs3 Service only exposes port 9092, but 9093 was given, so bootstrap node -3 (kafka-cs3/10.254.136.242:9093) cannot be reached. Consumption still works because the other bootstrap servers are reachable; with kafka-cs3:9092 the warning does not appear (see the re-run sketched after this block).
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server kafka-cs1:9092,kafka-cs2:9092,kafka-cs3:9093 --topic wisecotest --from-beginning
[2021-01-18 14:47:33,820] WARN [Consumer clientId=consumer-console-consumer-4149-1, groupId=console-consumer-4149] Connection to node -3 (kafka-cs3/10.254.136.242:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-18 14:47:33,821] WARN [Consumer clientId=consumer-console-consumer-4149-1, groupId=console-consumer-4149] Bootstrap broker kafka-cs3:9093 (id: -3 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
test1
test2
test3
hahahaha
heiheihei
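As a quick cross-check (a sketch, not part of the original record), the same consumer with the corrected service port on all three bootstrap addresses should connect to every node without the warning:

# Re-run inside a kafka Pod with kafka-cs3 on its actual service port 9092
kafka-console-consumer.sh \
  --bootstrap-server kafka-cs1:9092,kafka-cs2:9092,kafka-cs3:9092 \
  --topic wisecotest --from-beginning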
Now verify that Kafka data can be connected to and consumed from outside the K8S cluster through the public addresses.
Log in to an external server that has a Kafka client installed:
[root@dev-env ~]# cd /usr/kafka/kafka_2.11-2.1.0/bin/
[root@dev-env bin]#

As shown below, Kafka data produced inside the cluster can now be consumed from outside the K8S cluster through the public addresses:
[root@dev-env bin]# ./kafka-console-consumer.sh --bootstrap-server 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei

Produce more data inside a kafka Pod:
[root@kafoka2-0 opt]# kafka-console-producer.sh --broker-list 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest
>test1
>test2
>test3
>hahahaha
>heiheihei
>123456789
>abcdefghijk
>

Check from the outside again; the new Kafka data is consumed normally:
[root@dev-env bin]# ./kafka-console-consumer.sh --bootstrap-server 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei
123456789
abcdefghijk
Internal connection addresses of the kafka cluster:
kafka-cs1:9092,kafka-cs2:9092,kafka-cs3:9092
External connection addresses of the kafka cluster (a quick reachability check is sketched below):
156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094
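From outside the cluster, broker reachability on each public address can also be verified without producing or consuming anything (a sketch, not part of the original record; run it from a host with the Kafka client installed, such as the dev-env server above):

# A successful response means the advertised.listeners address is reachable from outside K8S
./kafka-broker-api-versions.sh --bootstrap-server 156.56.46.37:32092
./kafka-broker-api-versions.sh --bootstrap-server 156.56.46.37:32093
./kafka-broker-api-versions.sh --bootstrap-server 156.56.46.37:32094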