Deploying a Kafka Cluster on K8S Without Backend Storage (Messages and Message Queues) - 20210207


Kafka overview:

    Kafka is a distributed message queue based on the publish/subscribe model, mainly used for real-time processing in the big data field.

Message queue overview:

    Typical application scenarios of a traditional message queue (figure from the 尚硅谷 Kafka course materials on Bilibili)

 

     Benefits of using a message queue:

1. Decoupling: lets you scale or modify the processing on either side independently, as long as both sides honor the same interface contract.

2. Recoverability: when one component of the system fails, the whole system is not affected. A message queue lowers the coupling between processes, so even if a message-processing process dies, the messages already in the queue can still be processed after the system recovers.

3. Buffering: helps control and optimize the rate at which data flows through the system, smoothing out mismatches between the speed of producing messages and the speed of consuming them.

4. Flexibility & peak-load handling: the application still needs to work when traffic spikes, but such bursts are not the norm. Permanently provisioning resources just to stand by for these occasional peaks would be an enormous waste. A message queue lets the key components withstand sudden bursts of load without collapsing under unexpected, excessive requests.

5. Asynchronous communication: often users do not want, or do not need, to process a message immediately. A message queue provides an asynchronous processing mechanism that lets you put a message on the queue without processing it right away. You can enqueue as many messages as you like and process them whenever you need to.

    The two messaging models:

1. Point-to-point model (one-to-one; consumers actively pull data, and a message is removed once it has been received)

    The producer sends messages to a queue, and the consumer pulls messages from the queue and consumes them. Once a message has been consumed it is no longer stored in the queue, so a consumer can never consume a message that has already been consumed. A queue can have multiple consumers, but any given message can be consumed by only one of them.

 

 2. Publish/subscribe model (one-to-many; messages are not removed after consumers consume them)

     The producer publishes messages to a topic, and multiple consumers subscribe to and consume those messages. Unlike the point-to-point model, a message published to a topic is consumed by all subscribers.

 Kafka basic architecture

 

1. Producer: the message producer, i.e. the client that sends messages to Kafka brokers.

2. Consumer: the message consumer, i.e. the client that fetches messages from Kafka brokers.

3. Consumer group: a group made up of multiple consumers. Each consumer in the group is responsible for different partitions; a given partition can be consumed by only one consumer within the group, and consumer groups do not affect one another. Every consumer belongs to some consumer group, so the consumer group is the logical subscriber.

4. Broker: one Kafka server is one broker. A cluster consists of multiple brokers, and one broker can host multiple topics.

5. Topic: can be thought of as a queue, i.e. a subject; both producers and consumers work against a topic.

6. Partition: for scalability, a very large topic can be spread across multiple brokers (Kafka servers). A topic is divided into multiple partitions, and each partition is an ordered queue.

7. Replica: to ensure that a node's partition data is not lost when that node fails, and that Kafka can keep working, Kafka provides a replication mechanism. Every partition of a topic has several replicas: one leader and several followers.

8. Leader: the "primary" among a partition's replicas; producers send data to the leader, and consumers read data from the leader.

9. Follower: the "secondary" replicas of a partition. Followers continuously sync data from the leader to stay consistent with it; when the leader fails, one of the followers becomes the new leader. (The sketch after this list shows how these concepts appear when describing a topic.)
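
  To make partitions, replicas, leaders and followers concrete, here is a minimal sketch run from inside one of the Kafka containers deployed later in this article (using the scripts under /opt/kafka/bin, as in the testing section). The topic name demo is purely an illustrative assumption; zoo1 is one of the ZooKeeper Services created below.

# Create a topic with 3 partitions and 2 replicas per partition (demo is a hypothetical topic name)
bash-4.4# kafka-topics.sh --create --zookeeper zoo1:2181 --topic demo --partitions 3 --replication-factor 2

# Describe it: each partition line lists its Leader broker id, its Replicas, and the ISR (in-sync replicas)
bash-4.4# kafka-topics.sh --describe --zookeeper zoo1:2181 --topic demo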

 

Deploying the Kafka cluster in the K8S cluster:

Cluster plan:

master:192.168.11.113

node1:192.168.11.106

node2:192.168.11.116

 

All of the following operations are performed on the master:

1. Create a namespace for Kafka.

[root@master ~]# kubectl create namespace kafka

  Check the namespace that was created:

[root@master ~]# kubectl get namespace kafka
NAME    STATUS   AGE
kafka   Active   23h
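
  Optionally, to avoid passing -n kafka to every later command, you can make kafka the default namespace of the current kubectl context (purely a convenience; the commands in this article pass -n kafka explicitly):

[root@master ~]# kubectl config set-context --current --namespace=kafka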

2. Install the ZooKeeper cluster; Kafka relies on ZooKeeper for service registration and discovery.

     2.1 Create the YAML file for the ZooKeeper Services

[root@master ~]# cd yaml/kafka/
[root@master kafka]# vim zookeeper-svc.yaml

apiVersion: v1
kind: Service
metadata:
  name: zoo1
  labels:
    app: zookeeper-1
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  selector:
    app: zookeeper-1
---
apiVersion: v1
kind: Service
metadata:
  name: zoo2
  labels:
    app: zookeeper-2
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  selector:
    app: zookeeper-2
---
apiVersion: v1
kind: Service
metadata:
  name: zoo3
  labels:
    app: zookeeper-3
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  selector:
    app: zookeeper-3

  Create the ZooKeeper Services with kubectl:

[root@master kafka]# kubectl apply -f zookeeper-svc.yaml -n kafka

  Check the ZooKeeper Services:

[root@master kafka]# kubectl get svc -n kafka
NAME              TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)                      AGE
zoo1              ClusterIP   10.1.125.242   <none>        2181/TCP,2888/TCP,3888/TCP   21h
zoo2              ClusterIP   10.1.211.184   <none>        2181/TCP,2888/TCP,3888/TCP   21h
zoo3              ClusterIP   10.1.85.246    <none>        2181/TCP,2888/TCP,3888/TCP   21h

  2.2 Create zookeeper-deployment.yaml, the YAML file for the ZooKeeper Deployments

[root@master kafka]# vim zookeeper-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: zookeeper-deployment-1
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-1
      name: zookeeper-1
  template:
    metadata:
      labels:
        app: zookeeper-1
        name: zookeeper-1
    spec:
      containers:
      - name: zoo1
        image: zookeeper
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 2181
        env:
        - name: ZOO_MY_ID
          value: "1"
        - name: ZOO_SERVERS
          value: "server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: zookeeper-deployment-2
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-2
      name: zookeeper-2
  template:
    metadata:
      labels:
        app: zookeeper-2
        name: zookeeper-2
    spec:
      containers:
      - name: zoo2
        image: zookeeper
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 2181
        env:
        - name: ZOO_MY_ID
          value: "2"
        - name: ZOO_SERVERS
          value: "server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: zookeeper-deployment-3
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-3
      name: zookeeper-3
  template:
    metadata:
      labels:
        app: zookeeper-3
        name: zookeeper-3
    spec:
      containers:
      - name: zoo3
        image: zookeeper
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 2181
        env:
        - name: ZOO_MY_ID
          value: "3"
        - name: ZOO_SERVERS
          value: "server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181"

  Create the ZooKeeper Deployments:

[root@master kafka]# kubectl apply -f zookeeper-deployment.yaml -n kafka

  Check the status of the ZooKeeper pods:

[root@master kafka]# kubectl get pod -n kafka
NAME                                      READY   STATUS    RESTARTS   AGE
zookeeper-deployment-1-66cb7dcb-j4zcr     1/1     Running   1          20h
zookeeper-deployment-2-56fc4644c7-q6nx4   1/1     Running   1          20h
zookeeper-deployment-3-7c7789cf98-9m5kn   1/1     Running   1          20h
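
  Optionally, verify that the ensemble has actually formed a quorum. A minimal sketch, assuming the official zookeeper image (whose bin directory containing zkServer.sh is on the PATH) and reusing a pod name from the output above:

# Run the status check in each of the three pods; exactly one should report "Mode: leader",
# the other two "Mode: follower"
[root@master kafka]# kubectl exec -it zookeeper-deployment-1-66cb7dcb-j4zcr -n kafka -- zkServer.sh status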

3. Deploy Kafka

    3.1 Create the YAML file for the Kafka Services

vim kafka-svc.yaml

apiVersion: v1
kind: Service
metadata:
  name: kafka-service-1
  labels:
    app: kafka-service-1
spec:
  type: NodePort
  ports:
  - port: 9092
    name: kafka-service-1
    targetPort: 9092
    nodePort: 30901
    protocol: TCP
  selector:
    app: kafka-service-1
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service-2
  labels:
    app: kafka-service-2
spec:
  type: NodePort
  ports:
  - port: 9092
    name: kafka-service-2
    targetPort: 9092
    nodePort: 30902
    protocol: TCP
  selector:
    app: kafka-service-2
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service-3
  labels:
    app: kafka-service-3
spec:
  type: NodePort
  ports:
  - port: 9092
    name: kafka-service-3
    targetPort: 9092
    nodePort: 30903
    protocol: TCP
  selector:
    app: kafka-service-3

  Create the Kafka Services with kubectl:

[root@master kafka]# kubectl apply -f kafka-svc.yaml -n kafka

  Check the Services:

[root@master kafka]# kubectl get svc -n kafka
NAME              TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)                      AGE
kafka-service-1   NodePort    10.1.246.238   <none>        9092:30901/TCP               24h
kafka-service-2   NodePort    10.1.214.60    <none>        9092:30902/TCP               24h
kafka-service-3   NodePort    10.1.252.232   <none>        9092:30903/TCP               24h
zoo1              ClusterIP   10.1.125.242   <none>        2181/TCP,2888/TCP,3888/TCP   24h
zoo2              ClusterIP   10.1.211.184   <none>        2181/TCP,2888/TCP,3888/TCP   24h
zoo3              ClusterIP   10.1.85.246    <none>        2181/TCP,2888/TCP,3888/TCP   24h

  3.2 Create kafka-deployment.yaml, the YAML file for the Kafka Deployments

[root@master kafka]# vim kafka-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment-1
spec:
  replicas: 1
  selector:
    matchLabels:
      name: kafka-service-1
  template:
    metadata:
      labels:
        name: kafka-service-1
        app: kafka-service-1
    spec:
      containers:
      - name: kafka-1
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: 10.1.246.238 
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181,zoo2:2181,zoo3:2181
        - name: KAFKA_BROKER_ID
          value: "1"
        - name: KAFKA_CREATE_TOPICS
          value: mytopic:2:1
        - name: KAFKA_ADVERTISED_LISTENERS
          value: PLAINTEXT://192.168.11.113:30901
        - name: KAFKA_LISTENERS
          value: PLAINTEXT://0.0.0.0:9092
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment-2
spec:
  replicas: 1
  selector:
    matchLabels:
      name: kafka-service-2
  template:
    metadata:
      labels:
        name: kafka-service-2
        app: kafka-service-2
    spec:
      containers:
      - name: kafka-2
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: 10.1.214.60 
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181,zoo2:2181,zoo3:2181
        - name: KAFKA_BROKER_ID
          value: "2"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: PLAINTEXT://192.168.11.113:30902
        - name: KAFKA_LISTENERS
          value: PLAINTEXT://0.0.0.0:9092
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment-3
spec:
  replicas: 1
  selector:
    matchLabels:
      name: kafka-service-3
  template:
    metadata:
      labels:
        name: kafka-service-3
        app: kafka-service-3
    spec:
      containers:
      - name: kafka-3
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: 10.1.252.232 
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181,zoo2:2181,zoo3:2181
        - name: KAFKA_BROKER_ID
          value: "3"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: PLAINTEXT://192.168.11.113:30903
        - name: KAFKA_LISTENERS
          value: PLAINTEXT://0.0.0.0:9092

  Notes:

1. KAFKA_ADVERTISED_HOST_NAME is set to the cluster IP of the corresponding Kafka Service, which can be looked up with kubectl get svc -n kafka, for example:

[root@master kafka]# kubectl get svc -n kafka
NAME              TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)                      AGE
kafka-service-1   NodePort    10.1.246.238   <none>        9092:30901/TCP               24h
kafka-service-2   NodePort    10.1.214.60    <none>        9092:30902/TCP               24h
kafka-service-3   NodePort    10.1.252.232   <none>        9092:30903/TCP               24h

2. KAFKA_ADVERTISED_LISTENERS is the listener address advertised to clients: fill in the host address of master, node1 or node2 plus the NodePort exposed by K8S (in the 30000-32767 range), for example:

metadata:
  name: kafka-deployment-1
      ………………
          - name: KAFKA_ADVERTISED_LISTENERS
          value: PLAINTEXT://192.168.11.113:30901   
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment-2
      ………………
          - name: KAFKA_ADVERTISED_LISTENERS
          value: PLAINTEXT://192.168.11.113:30902
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment-3
      ………………
          - name: KAFKA_ADVERTISED_LISTENERS
          value: PLAINTEXT://192.168.11.113:30903
          

  Create the Kafka Deployments with kubectl:

[root@master kafka]# kubectl apply -f kafka-deployment.yaml -n kafka

  Check the status of the Kafka pods:

[root@master kafka]# kubectl get pod -n kafka
NAME                                      READY   STATUS    RESTARTS   AGE
kafka-deployment-1-6d8d6c9bd4-vj9tf       1/1     Running   1          25h
kafka-deployment-2-6c94d5cf5b-cghvc       1/1     Running   1          25h
kafka-deployment-3-566cf5994b-mz4mh       1/1     Running   1          25h
zookeeper-deployment-1-66cb7dcb-j4zcr     1/1     Running   1          25h
zookeeper-deployment-2-56fc4644c7-q6nx4   1/1     Running   1          25h
zookeeper-deployment-3-7c7789cf98-9m5kn   1/1     Running   1          25h
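
  You can also confirm that all three brokers registered themselves in ZooKeeper. A minimal sketch, again assuming the official zookeeper image (zkCli.sh on the PATH) and reusing a pod name from the output above:

# List the broker ids registered under /brokers/ids; the final line of the output should
# show something like [1, 2, 3]
[root@master kafka]# kubectl exec -it zookeeper-deployment-1-66cb7dcb-j4zcr -n kafka -- zkCli.sh -server localhost:2181 ls /brokers/ids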

Testing from the command line

      You can exec into any of the pods and operate Kafka from the command line.

For this test, we exec into kafka-deployment-1-6d8d6c9bd4-vj9tf to act as the message producer:

[root@master ~]# kubectl exec -it kafka-deployment-1-6d8d6c9bd4-vj9tf -n kafka /bin/bash
bash-4.4#

  And exec into kafka-deployment-3-566cf5994b-mz4mh to act as the message consumer:

[root@master kafka]# kubectl exec -it kafka-deployment-3-566cf5994b-mz4mh -n kafka /bin/bash
bash-4.4#

  Inside the container, change to Kafka's command directory (same for both the producer and the consumer):

bash-4.4# cd /opt/kafka/bin/

  On the producer side, create the test topic and get ready to send messages.

The command format is:

kafka-console-producer.sh --broker-list <kafka-svc1-clusterIP>:9092,<kafka-svc2-clusterIP>:9092,<kafka-svc3-clusterIP>:9092 --topic test

 My VMs are underpowered, so I only use one address here; you can also connect to multiple addresses for testing.

bash-4.4# kafka-console-producer.sh --broker-list 10.1.246.238:9092 --topic test
>

  In the consumer container's terminal, start a consumer to receive messages:

bin/kafka-console-consumer.sh --bootstrap-server <any-kafka-svc-clusterIP>:9092 --topic test --from-beginning

  Again, I connect to just one address:

bash-4.4# kafka-console-consumer.sh --bootstrap-server 10.1.252.232:9092 --topic test --from-beginning

  Then, in the producer container, type the messages you want to send:

bash-4.4# kafka-console-producer.sh --broker-list 10.1.246.238:9092 --topic test
>野豬套天下第一
>test
>!@#$
>123
>

  After sending, the messages produced can be seen in the consumer container (type messages on the producer side and check whether the consumer receives them; if it does, the cluster is working).

bash-4.4# kafka-console-consumer.sh --bootstrap-server 10.1.252.232:9092 --topic test --from-beginning
野豬套天下第一
test
!@#$
123

  Finally, you can also run the following command to list all topics:

kafka-topics.sh --list --zookeeper [zookeeper-service-clusterIP]:2181


bash-4.4# kafka-topics.sh --list --zookeeper zoo1:2181
__consumer_offsets
mytopic
test                    # the test topic we created

  

Commonly used commands and directories:

# Enter the container
kubectl exec -it kafka-deployment-1-xxxxxxxxxxx -n kafka /bin/bash
cd /opt/kafka
# List topics
bin/kafka-topics.sh --list --zookeeper <any-zookeeper-svc-clusterIP>:2181
# Manually create a topic
bin/kafka-topics.sh --create --zookeeper <zookeeper-svc1-clusterIP>:2181,<zookeeper-svc2-clusterIP>:2181,<zookeeper-svc3-clusterIP>:2181 --topic test --partitions 3 --replication-factor 1
# Produce (press CTRL+D to finish writing)
bin/kafka-console-producer.sh --broker-list <kafka-svc1-clusterIP>:9092,<kafka-svc2-clusterIP>:9092,<kafka-svc3-clusterIP>:9092 --topic test
# Consume (press CTRL+C to stop reading)
bin/kafka-console-consumer.sh --bootstrap-server <any-kafka-svc-clusterIP>:9092 --topic test --from-beginning
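
  A few more commands that can be handy for inspection (a hedged sketch, assuming the same /opt/kafka/bin tools used above; console consumers show up as auto-generated groups named console-consumer-*):

# Show partition/leader/replica/ISR details of a topic
bin/kafka-topics.sh --describe --zookeeper <any-zookeeper-svc-clusterIP>:2181 --topic test
# List consumer groups and inspect one group's offsets and lag
bin/kafka-consumer-groups.sh --bootstrap-server <any-kafka-svc-clusterIP>:9092 --list
bin/kafka-consumer-groups.sh --bootstrap-server <any-kafka-svc-clusterIP>:9092 --describe --group <group-name>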

  

Note: if Kafka is used from inside the cluster, clients can use the cluster IP or the in-cluster DNS service name plus the corresponding port.

            If Kafka is used from outside the cluster, clients can use the IP address of any host in the cluster plus the mapped NodePort (30000-32767), for example:
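
  The following is a minimal sketch from a machine outside the cluster that happens to have the Kafka command-line tools installed locally (an assumption; any Kafka client library would work the same way), going through the NodePorts advertised in KAFKA_ADVERTISED_LISTENERS:

# Quick reachability check of broker 1's NodePort on the master node (nc is assumed to be installed)
nc -vz 192.168.11.113 30901
# Produce to and consume from the cluster through node IP + NodePort
kafka-console-producer.sh --broker-list 192.168.11.113:30901 --topic test
kafka-console-consumer.sh --bootstrap-server 192.168.11.113:30901 --topic test --from-beginning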

 

You can also use Kuboard to get a convenient view of the cluster, for example:

 

 

 

# At this point, the deployment of a Kafka cluster without backend storage in the K8S cluster is complete; no persistence is configured.

  

 

