helm安裝kafka集群並測試其高可用性


介紹

Kafka是由Apache軟件基金會開發的一個開源流處理平台,由ScalaJava編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者在網站中的所有動作流數據。 這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日志和日志聚合來解決。 對於像Hadoop一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行加載機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。

一、KAFKA

體系結構圖:

 

  1. Producer 生產者,也就是發送消息的一方。生產者負責創建消息,通過zookeeper找到broker然后將其投遞到 Kafka 中。
  2. Consumer 消費者,也就是接收消息的一方。通過zookeeper找對應的broker 進行消費進而進行相應的業務邏輯處理。
  3. Broker 服務代理節點。對於 Kafka 而言,Broker 可以簡單地看作一個獨立的 Kafka 服務節點或 Kafka 服務實例。大多數情況下也可以將 Broker 看作一台 Kafka 服務器,前提是這台服務器上只部署了一個 Kafka 實例。一個或多個 Broker 組成了一個 Kafka 集群。一般而言,我們更習慣使用首字母小寫的 broker 來表示服務代理節點

Send消息流程圖:

 

Kafka多副本(Replica)機制

 

如上圖所示,Kafka 集群中有4broker,某個主題中有3個分區,且副本因子(即副本個數)也為3,如此每個分區便有1leader 副本和2follower 副本。生產者和消費者只與 leader 副本進行交互,而 follower 副本只負責消息的同步,很多時候 follower 副本中的消息相對 leader 副本而言會有一定的滯后。

二、Zookeeper

ZooKeeper是一個分布式的,開放源碼的分布式應用程序協調服務,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要組件。它是一個為分布式應用提供一致性服務的軟件,提供的功能包括:配置維護、域名服務、分布式同步、組服務等。

原理:

ZooKeeper是以Fast Paxos算法為基礎的, Paxos 算法存在 活鎖的問題,即當有多個proposer交錯提交時,有可能互相排斥導致沒有一個proposer能提交成功,而Fast Paxos作了一些優化,通過選舉產生一個leader (領導者),只有leader才能提交proposer,具體算法可見Fast Paxos。因此,要想弄懂ZooKeeper首先得對Fast Paxos有所了解。
ZooKeeper的基本運轉流程:
1、選舉Leader。
2、同步數據。
3、選舉Leader過程中算法有很多,但要達到的選舉標准是一致的。
4、Leader要具有最高的執行ID,類似root權限。
5、集群中大多數的機器得到響應並接受選出的Leader。  

高可以用架構圖:

圖中每一個Server代表一個安裝Zookeeper服務的服務器。組成 ZooKeeper 服務的服務器都會在內存中維護當前的服務器狀態,並且每台服務器之間都互相保持着通信。集群間通過 Zab 協議(Zookeeper Atomic Broadcast)來保持數據的一致性。

三、部署kafka&zookeeper集群

我們選擇的是官方的chart地址:https://github.com/helm/charts/tree/master/incubator/kafka

1)編寫自己的values.yaml文件(注意我的storageClass是已經做好了的nfs存儲)

imageTag: "5.2.2"  
resources: 
   limits:
     cpu: 2
     memory: 4Gi
   requests:
     cpu: 1
     memory: 2Gi
kafkaHeapOptions: "-Xmx2G -Xms2G"
persistence:
  enabled: true
  storageClass: "managed-nfs-storage"
  size: "40Gi"
zookeeper:
  resources: 
    limits:
      cpu: 1
      memory: 2Gi
    requests:
      cpu: 100m
      memory: 536Mi
  persistence:
    enabled: true
    storageClass: "managed-nfs-storage"
    size: "10Gi"

 2)安裝kafka

添加chart倉庫:

$ helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator

 部署

$ helm install --name kafka -f values.yaml incubator/kafka

 最后我們能看到:

四、測試kafka高可用性

1)根據提示創建一個測試客戶端

apiVersion: v1
kind: Pod
metadata:
  name: testclient
  namespace: sscp-test
spec:
  containers:
  - name: kafka
    image: solsson/kafka:0.11.0.0
    command:
      - sh
      - -c
      - "exec tail -f /dev/null"

Once you have the testclient pod above running, you can list all kafka
topics with:

  kubectl -n sscp-test exec testclient -- kafka-topics --zookeeper kafka-test-zookeeper:2181 --list


To create a new topic:

  kubectl -n sscp-test exec testclient -- kafka-topics --zookeeper kafka-test-zookeeper:2181 --topic test1 --create --partitions 1 --replication-factor 1


To listen for messages on a topic:

  kubectl -n sscp-test exec -ti testclient -- for x in {1..1000}; do echo $x; sleep 2; done | kafka-console-producer --broker-list kafka-test-headless:9092 --topic test1

To stop the listener session above press: Ctrl+C

To start an interactive message producer session:

  kubectl -n sscp-test exec -ti testclient -- kafka-console-producer --broker-list kafka-test-headless:9092 --topic test1

To create a message in the above session, simply type the message and press "enter"

To end the producer session try: Ctrl+C

查看topic連接狀態

kafka-consumer-groups --bootstrap-server kafka-sit:9092 --describe --group device-group

泄洪命令

kafka-consumer-groups --bootstrap-server kafka-sit:9092 --group device-group --reset-offsets --to-latest --topic  transport-device-req --execute 


注意:有三個kafka節點,消息要發三個副本才能保持其高可用!!!

五、測試Zookeeper高可用性

1.Create a node by command below:

“kubectl exec -it testclient bash -n sscp-test”

“zookeeper-shell kafka-test-zookeeper-headless:2181 create /foo bar”

2. Check zookeeper status

Watch existing members:
$ kubectl run --attach bbox --image=busybox --restart=Never -- sh -c 'while true; do for i in 0 1 2; do echo zk-${i} $(echo stats | nc kafka-zookeeper-${i}.kafka-zookeeper-headless:2181 | grep Mode); sleep 1; done; done'
zk-2 Mode: follower
zk-0 Mode: follower
zk-1 Mode: leader
zk-2 Mode: follower

3.kill the leader by command below:

“Kubectl delete pod kafka-test-zookeeper-1”

4.Check the previously inserted key by command below:

““kubectl exec -it testclient bash -n sscp-test”

“zookeeper-shell kafka-test-zookeeper-headless:2181  get /foo”

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM