kafka集群並測試其高可用性
介紹
Kafka是由Apache軟件基金會開發的一個開源流處理平台,由Scala和Java編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者在網站中的所有動作流數據。 這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日志和日志聚合來解決。 對於像Hadoop一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行加載機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。
一、KAFKA
體系結構圖:
- Producer: 生產者,也就是發送消息的一方。生產者負責創建消息,通過zookeeper找到broker,然后將其投遞到 Kafka 中。
- Consumer: 消費者,也就是接收消息的一方。通過zookeeper找對應的broker 進行消費,進而進行相應的業務邏輯處理。
- Broker: 服務代理節點。對於 Kafka 而言,Broker 可以簡單地看作一個獨立的 Kafka 服務節點或 Kafka 服務實例。大多數情況下也可以將 Broker 看作一台 Kafka 服務器,前提是這台服務器上只部署了一個 Kafka 實例。一個或多個 Broker 組成了一個 Kafka 集群。一般而言,我們更習慣使用首字母小寫的 broker 來表示服務代理節點
Send消息流程圖:
Kafka多副本(Replica)機制:
如上圖所示,Kafka 集群中有4個 broker,某個主題中有3個分區,且副本因子(即副本個數)也為3,如此每個分區便有1個 leader 副本和2個 follower 副本。生產者和消費者只與 leader 副本進行交互,而 follower 副本只負責消息的同步,很多時候 follower 副本中的消息相對 leader 副本而言會有一定的滯后。
二、Zookeeper
ZooKeeper是一個分布式的,開放源碼的分布式應用程序協調服務,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要組件。它是一個為分布式應用提供一致性服務的軟件,提供的功能包括:配置維護、域名服務、分布式同步、組服務等。
原理:
高可以用架構圖:
圖中每一個Server代表一個安裝Zookeeper服務的服務器。組成 ZooKeeper 服務的服務器都會在內存中維護當前的服務器狀態,並且每台服務器之間都互相保持着通信。集群間通過 Zab 協議(Zookeeper Atomic Broadcast)來保持數據的一致性。
三、部署kafka&zookeeper集群
我們選擇的是官方的chart地址:https://github.com/helm/charts/tree/master/incubator/kafka
1)編寫自己的values.yaml文件(注意我的storageClass是已經做好了的nfs存儲)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
imageTag:
"5.2.2"
resources:
limits:
cpu: 2
memory: 4Gi
requests:
cpu: 1
memory: 2Gi
kafkaHeapOptions:
"-Xmx2G -Xms2G"
persistence:
enabled:
true
storageClass:
"managed-nfs-storage"
size:
"40Gi"
zookeeper:
resources:
limits:
cpu: 1
memory: 2Gi
requests:
cpu: 100m
memory: 536Mi
persistence:
enabled:
true
storageClass:
"managed-nfs-storage"
size:
"10Gi"
|
2)安裝kafka
添加chart倉庫:
1
|
$ helm repo add incubator http:
//storage
.googleapis.com
/kubernetes-charts-incubator
|
部署
1
|
$ helm
install
--name kafka -f values.yaml incubator
/kafka
|
最后我們能看到:
四、測試kafka高可用性
1)根據提示創建一個測試客戶端
1
2
3
4
5
6
7
8
9
10
11
12
13
|
apiVersion: v1
kind: Pod
metadata:
name: testclient
namespace: sscp-
test
spec:
containers:
- name: kafka
image: solsson
/kafka
:0.11.0.0
command
:
- sh
- -c
-
"exec tail -f /dev/null"
|
Once you have the testclient pod above running, you can list all kafka
topics with:
1
|
kubectl -n sscp-
test
exec
testclient -- kafka-topics --zookeeper kafka-
test
-zookeeper:2181 --list
|
To create a new topic:
1
|
kubectl -n sscp-
test
exec
testclient -- kafka-topics --zookeeper kafka-
test
-zookeeper:2181 --topic test1 --create --partitions 1 --replication-factor 1
|
To listen for messages on a topic:
1
|
kubectl -n sscp-
test
exec
-ti testclient --
for
x
in
{1..1000};
do
echo
$x;
sleep
2;
done
| kafka-console-producer --broker-list kafka-
test
-headless:9092 --topic test1
|
To stop the listener session above press: Ctrl+C
To start an interactive message producer session:
1
|
kubectl -n sscp-
test
exec
-ti testclient -- kafka-console-producer --broker-list kafka-
test
-headless:9092 --topic test1
|
To create a message in the above session, simply type the message and press "enter"
To end the producer session try: Ctrl+C
注意:有三個kafka節點,消息要發三個副本才能保持其高可用!!!
五、測試Zookeeper高可用性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
1.Create a node by
command
below:
“kubectl
exec
-it testclient
bash
-n sscp-
test
”
“zookeeper-shell kafka-
test
-zookeeper-headless:2181 create
/foo
bar”
2. Check zookeeper status
Watch existing members:
$ kubectl run --attach bbox --image=busybox --restart=Never -- sh -c
'while true; do for i in 0 1 2; do echo zk-${i} $(echo stats | nc kafka-zookeeper-${i}.kafka-zookeeper-headless:2181 | grep Mode); sleep 1; done; done'
zk-2 Mode: follower
zk-0 Mode: follower
zk-1 Mode: leader
zk-2 Mode: follower
3.
kill
the leader by
command
below:
“Kubectl delete pod kafka-
test
-zookeeper-1”
4.Check the previously inserted key by
command
below:
““kubectl
exec
-it testclient
bash
-n sscp-
test
”
“zookeeper-shell kafka-
test
-zookeeper-headless:2181 get
/foo
”
|