文章來源於本人的印象筆記,如出現格式問題可訪問該鏈接查看原文
原創聲明:作者:Arnold.zhao 博客園地址:https://www.cnblogs.com/zh94
副本機制
Kafka的副本機制會在多個服務端節點上對每個主題分區的日志進行復制,當集群中的某個節點上出現故障時,訪問故障節點的請求會被轉移到其他正常節點的副本上,副本的單位是主題的分區;
kafka每個主題的每個分區都會有一個主副本(Leader)以及0個或多個備份副本(Follower),主副本負責客戶端的讀和寫,備份副本則負責向主副本拉取數據,以便和主副本的數據同步,當主副本掛掉后,kafka會在備份副本中選擇一個作為主副本,繼續為客戶端提供讀寫服務;

分區的主副本應該均勻的分布在各個服務器上,如上圖所示,通常主題分區的數量要比服務器的數量多很多,所以每個服務器都可以成為一些分區的主副本,也能同時成為一些分區的備份副本;
備份副本始終盡量保持與主副本的數據同步,備份副本的日志文件和主副本的日志總是相同的,它們都有相同的偏移量和相同順序的消息。
容錯處理
分布式系統處理故障容錯時,需要明確的定義節點是否處於存活狀態,kafka對節點的存活定義有如下兩個條件
- 節點必須和ZK保持會話
- 如果這個節點是某個分區的備份副本,他必須對分區主副本的寫操作進行復制,並且復制的進度不能落后太多
滿足上述兩個條件,叫做(in - sync)正在同步中,每個分區的主副本會跟蹤正在同步中的備份副本節點(In Sync Replicas 即ISR ),如果一個備份副本掛掉,沒有響應或者同步進度落戶太多,主副本就會將其從同步副本集合中移除,反之,如果備份副本重新趕上主副本,它就會加入到主副本的同步集合中;(這個主副本的同步集合,包含主副本自己以及主副本保持一定同步的備份副本 則統稱為ISR)
在Kafka 中,一條消息只有被ISR集合的所有副本都運用到本地的日志文件,才會認為消息被成功
提交了, 任何時刻,只要ISR至少有一個副本是存活的, Kafka就可以保證“一條消息一旦被提交,就不會丟失” 只有已經提交的消息才能被消費者消費,因此消費者不用擔心會看到因為主副本失敗而
丟失的消息;
對於生產者所發送的一條消息已經寫入到主副本中,但是此時備份副本還沒來及進行數據copy時,主副本就掛掉的情況,那么此時消息就不算寫入成功,生產者會重新發送該消息,當所發送的一條消息成功的復制到ISR的所有副本后,它們才會被認為是提交的,此時才對消費者是可見的。
服務驗證
驗證kafka的服務副本機制和容錯處理。
驗證kafka的副本機制,只需部署一台測試用kafka即可,驗證kafka的容錯處理,及服務節點down機后的處理方式,則需要最低3個服務搭建kafka集群進行模擬;
單節點驗證
先官網下載一個kafka,也可以在此處網盤直接下載
鏈接:https://pan.baidu.com/s/1_tmQA1AqTgh4T81HuW8OlA
提取碼:zc1q
修改config目錄下server.properties內容的基本信息,此處修改內容如下
broker.id=1
listeners=PLAINTEXT://10.0.3.17:9092
log.dirs=/opt/gangtise/kafka/kafka_2.11-2.1.0/logs/kakfa_tmp
zookeeper.connect=10.0.3.17:2181
zookeeper.properties內容保持不變,然后啟動kafka默認自帶的zk服務,以及kafka服務即可;
啟動zk
./zookeeper-server-start.sh -daemon /opt/gangtise/kafka/kafka_2.11-2.1.0/config/zookeeper.properties
啟動kafka
./kafka-server-start.sh -daemon /opt/gangtise/kafka/kafka_2.11-2.1.0/config/server.properties
新建一個只有一個副本一個分區的topic主題
[root@dev bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic arnold_test
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "arnold_test".
此時在我們所配置的tmp文件夾下則對應新建了一個相同topic名稱的日志文件,由於我們此時新建
arnold_test topic時只指定創建了一個分區,所以該日志目錄下只創建了一個分區文件夾;索引位為0;
除此之外,還可以看到在我們所產生的分區文件同級別的目錄下,存在如下4個 checkpoint結尾的文件,其中 replication-offset-checkpoint(復制偏移檢查點) 記錄對應的各主題分區的偏移量;recovery-point-offset-checkpoint(恢復點偏移檢查點) 記錄對應的各主題分區的恢復點的偏移量;做個標記,后續給詳細的說明
[root@dev kakfa_tmp]# pwd
/opt/gangtise/kafka/kafka_2.11-2.1.0/logs/kakfa_tmp
[root@dev kakfa_tmp]# ll
total 16
drwxr-xr-x. 2 root root 141 Nov 25 17:21 arnold_test-0
-rw-r--r--. 1 root root 0 Nov 25 17:12 cleaner-offset-checkpoint
-rw-r--r--. 1 root root 4 Nov 25 17:24 log-start-offset-checkpoint
-rw-r--r--. 1 root root 54 Nov 25 17:12 meta.properties
-rw-r--r--. 1 root root 20 Nov 25 17:24 recovery-point-offset-checkpoint
-rw-r--r--. 1 root root 20 Nov 25 17:24 replication-offset-checkpoint
查看arnold_test-0這個分區下的內容如下,其中 .log 結尾的文件則是二進制的日志文件,該日志文件中所存儲的內容則是我們通過kafka生產者向這個topic中所輸入的消息內容;可以使用 string 命令查看該二進制文件的內容。
[root@dev arnold_test-0]# pwd
/opt/gangtise/kafka/kafka_2.11-2.1.0/logs/kakfa_tmp/arnold_test-0
[root@dev arnold_test-0]# ll
total 4
-rw-r--r--. 1 root root 10485760 Nov 25 17:21 00000000000000000000.index
-rw-r--r--. 1 root root 0 Nov 25 17:21 00000000000000000000.log
-rw-r--r--. 1 root root 10485756 Nov 25 17:21 00000000000000000000.timeindex
-rw-r--r--. 1 root root 8 Nov 25 17:21 leader-epoch-checkpoint
再新建一個只有一個副本三個分區的topic主題
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic arnold_test_many
查看日志分區文件如下
[root@dev kakfa_tmp]# pwd
/opt/gangtise/kafka/kafka_2.11-2.1.0/logs/kakfa_tmp
[root@dev kakfa_tmp]# ll
total 16
drwxr-xr-x. 2 root root 141 Nov 25 17:21 arnold_test-0
drwxr-xr-x. 2 root root 141 Nov 25 19:18 arnold_test_many-0
drwxr-xr-x. 2 root root 141 Nov 25 19:18 arnold_test_many-1
drwxr-xr-x. 2 root root 141 Nov 25 19:18 arnold_test_many-2
-rw-r--r--. 1 root root 0 Nov 25 17:12 cleaner-offset-checkpoint
-rw-r--r--. 1 root root 4 Nov 25 19:19 log-start-offset-checkpoint
-rw-r--r--. 1 root root 54 Nov 25 17:12 meta.properties
-rw-r--r--. 1 root root 83 Nov 25 19:19 recovery-point-offset-checkpoint
-rw-r--r--. 1 root root 83 Nov 25 19:20 replication-offset-checkpoint
使用console 生產者,向arnold_test_many topic生產6條數據,可以看到此時我們生成6條content內容,按照我們上一章的說法,由於我們沒有使用key值的方式進行存儲,而是直接存儲的內容,那么此時所產生的6條內容,則會按照輪訓的方式依次插入對應的分區中。
[root@dev bin]# ./kafka-console-producer.sh --broker-list 10.0.3.17:9092 --topic arnold_test_many
>message1
>message2
>message3
>message4
>message5
>message6
此時我們直接查看kafka分區偏移量記錄的文件,可以看到arnold_test_many 0,arnold_test_many 1,arnold_test_many 2 所對應的分區 所對應的偏移量都是2,因為我們此時插入了6條數據,輪訓均衡的插入到了3個分區中,所以各個分區中的檢查點偏移量都為2。
[root@dev kakfa_tmp]# pwd
/opt/gangtise/kafka/kafka_2.11-2.1.0/logs/kakfa_tmp
[root@dev kakfa_tmp]# cat replication-offset-checkpoint
0
4
arnold_test_many 2 2
arnold_test_many 1 2
arnold_test 0 0
arnold_test_many 0 2
此時我們使用strings 直接查看各分區中所存儲的日志內容
[root@dev arnold_test_many-0]# pwd
/opt/gangtise/kafka/kafka_2.11-2.1.0/logs/kakfa_tmp/arnold_test_many-0
[root@dev arnold_test_many-0]# strings 00000000000000000000.log
message2
message5
[root@dev arnold_test_many-1]# pwd
/opt/gangtise/kafka/kafka_2.11-2.1.0/logs/kakfa_tmp/arnold_test_many-1
[root@dev arnold_test_many-1]# strings 00000000000000000000.log
message1
message4
[root@dev arnold_test_many-2]# pwd
/opt/gangtise/kafka/kafka_2.11-2.1.0/logs/kakfa_tmp/arnold_test_many-2
[root@dev arnold_test_many-2]# strings 00000000000000000000.log
message3
message6
如上所示,數據采用輪訓的方式均勻的插入到了各個分區中,整理后結果如下所示。
arnold_test_many1 arnold_test_many0 arnold_test_many2
message1 message2 message3
message4 message5 message6
此時可能你會有如此疑問,為什么數據的輪訓不是從 arnold_test_many0開始輪訓插入的,而是從arnold_test_many1開始的?這個我還沒做嚴格測試,所以不排除輪訓的起始分區的確是隨機的,至少並不是按照分區的下標來確定順序的。不過該topic的分區的順序只要第一次插入數據確定后,后續就都將按照這個順序執行,所以,起始分區的確也不是最重要的問題了。
此時我們使用消費者來消費該topic下的所有分區內的數據,可以看到消費者消費kafka中的數據時,並沒有按照生產者的插入順序來讀取出來;這也驗證了我們上一章的說法,kafka並不保證當前topic的全局消息順序,而只是保證當前該topic下各分區的消息順序,也就是我們看到的 分區arnold_test_many2中的message3消息一定在message6前面,分區 arnold_test_many1中的 message1肯定在message4前面;
[root@dev bin]# ./kafka-console-consumer.sh --bootstrap-server 10.0.3.17:9092 --from-beginning --topic arnold_test_many
message3
message6
message1
message4
message2
message5
原創聲明:作者:Arnold.zhao 博客園地址:https://www.cnblogs.com/zh94
分布式模式驗證
直接將我們kafka的部署包,分別cp到不同的機器上,然后修改對應的配置,啟動即可,主要配置內容如下
10.0.3.17
broker.id=1
listeners=PLAINTEXT://10.0.3.17:9092
log.dirs=/opt/gangtise/kafka/kafka_2.11-2.1.0/logs/kakfa_tmp
zookeeper.connect=10.0.3.17:2181
10.0.6.39
broker.id=2
listeners=PLAINTEXT://10.0.3.17:9092
log.dirs=/home/gangtise/kafka/kafka_2.11-2.1.0/logs/kafka_tmp
zookeeper.connect=10.0.3.17:2181
10.0.3.18
broker.id=3
listeners=PLAINTEXT://10.0.3.18:9092
log.dirs=/opt/gangtise/kafka/kafka_2.11-2.1.0/logs/kafka_tmp
zookeeper.connect=10.0.3.17:2181
啟動對應的kafka節點后,通過zk的圖形化工具查看當前的kafka注冊信息,可以看到對應的kafka節點信息都已經注冊到zk上,此時表明部署完成;(由於此處主要是驗證下kafka集群的特性,所以zk的節點則只部署了一個,測試用即可)

新建Topic
由於我們此時已經有了3個服務器節點,對應的服務器broker.id分別為:1,2,3;
此時我們創建一個新的Topic,指定該Topic為三個副本,三個分區,如下:--replication-factor 3 表示創建對應的副本數為3, --partitions 3 表示創建對應的分區數為3
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic arnold_test_many_three
使用--describe命令查看該topic的詳細信息
[root@dev bin]# ./kafka-topics.sh --describe --zookeeper 10.0.3.17:2181 --topic arnold_test_many_three
Topic:arnold_test_many_three PartitionCount:3 ReplicationFactor:3 Configs:
Topic: arnold_test_many_three Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: arnold_test_many_three Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: arnold_test_many_three Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
上述查詢結果詳細解釋一下,如下:
第一行含義是:
Topic arnold_test_many_three 一共有三個分區(PartitionCount:3),三個副本(ReplicationFactor:3),對應的配置的具體信息,則是看下方的詳情(Config)
第二行含義是:
Topic arnold_test_many_three 的第0個分區(Partition: 0),對應的主副本Leader節點是3(Leader: 3),該0分區對應的副本分別是在3,1,2節點上存儲(Replicas: 3,1,2),其中當前的ISR信息分別是3,1,2(Isr: 3,1,2);
那么對應的第三行的含義則也是與第二行相似,表示當前的 Partition 1分區的三個副本,分別是在broker.id 為1,2,3的三個服務節點上存儲,其中主副本Leader副本則是broker.id為1的服務器上的副本,則是主副本;當前該分區的同步信息 isr 是1,2,3,表示所有節點均同步正常;
ISR表示正在同步的節點集合,詳細解釋,可以看上述容錯處理中的內容解釋;
手動停止節點
此時我們將broker.id為2的節點服務停止掉,再觀察下當前的副本情況;
直接使用kill -9 強制進程退出的方式;
[gangtise@yinhua-ca000003 arnold_test_many_three-0]$ kill -9 94130
此時再觀察對應的topic的詳細信息
[root@dev bin]# ./kafka-topics.sh --describe --zookeeper 10.0.3.17:2181 --topic arnold_test_many_three
Topic:arnold_test_many_three PartitionCount:3 ReplicationFactor:3 Configs:
Topic: arnold_test_many_three Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1
Topic: arnold_test_many_three Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,3
Topic: arnold_test_many_three Partition: 2 Leader: 3 Replicas: 2,3,1 Isr: 3,1
可以看到,分區的主副本已經進行了轉移,原先的Partition 2 分區的,Leader節點由原來的2,變更為了現在的3;
也就是說當前的Partition2分區的Leader節點主副本任務由原來的2服務器節點,變更成了由服務器節點3來接管Partition2分區的主副本任務;
另外則是,所有的Replicas 副節點的信息,是沒有變化的,還是和以前的一摸一樣,畢竟2節點只是down機,修復好以后還是要重新承擔副本的任務的;
雖然每個分區的Replicas沒有變化,但是每個分區的ISR中不再包含 2節點;(也就是說,后續的消息的同步集合中,只需要1,3節點同步完成,則同步完成,便會直接給生產者返回一個消息,OK同步完成;而2節點,等到后續節點修復后,主動copy其他分區的副本內容到自身的節點上,等到進度完全和其它節點保持一致后,則重新加入到ISR同步集合中。)
那么此時,我們再重新啟動下 2節點;
[gangtise@yinhua-ca000003 kafka_2.11-2.1.0]$ ./bin/kafka-server-start.sh -daemon /home/gangtise/kafka/kafka_2.11-2.1.0/config/server.properties
然后再重新查看當前Topic的詳細信息,啪一下啊,很快啊,可以看到ISR中的同步集合中,2節點也已經正常回歸;
[root@dev bin]# ./kafka-topics.sh --describe --zookeeper 10.0.3.17:2181 --topic arnold_test_many_three
Topic:arnold_test_many_three PartitionCount:3 ReplicationFactor:3 Configs:
Topic: arnold_test_many_three Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: arnold_test_many_three Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,3,2
Topic: arnold_test_many_three Partition: 2 Leader: 3 Replicas: 2,3,1 Isr: 3,1,2
如上,可以看到,此時的ISR中已經包含了2節點;但是,觀察上面結果可以看到,Partition0分區和Partition2分區的主節點都還是在 3服務器節點上,也就是當前該兩個分區都過於依賴3服務器的主副本節點,而主副本節點又要負責消息的讀寫的任務,所以3服務節點,壓力較大,此時通過執行平衡操作,來解決分區主節點的問題
執行自平衡
[root@dev bin]# ./kafka-preferred-replica-election.sh --zookeeper 10.0.3.17:2181
[root@dev bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic arnold_test_many_three
Topic:arnold_test_many_three PartitionCount:3 ReplicationFactor:3 Configs:
Topic: arnold_test_many_three Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: arnold_test_many_three Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,3,2
Topic: arnold_test_many_three Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 3,1,2
Partition2的副本集[2,3,1],原有的Leader編號是3,通過執行上述平衡操作后,分區2的主副本節點由原有3,遷移到了節點2。
不過,如果在服務down機后,不手動執行平衡操作的情況下,那么過一段時間,kafka會自動進行一下平衡,只要新上來的服務是穩定的情況下,kafka會自動將主副本平衡到新的節點上;
命令匯總
上述所使用到的幾個命令匯總
啟動zk
./zookeeper-server-start.sh -daemon /opt/gangtise/kafka/kafka_2.11-2.1.0/config/zookeeper.properties
啟動kafka
./kafka-server-start.sh -daemon /opt/gangtise/kafka/kafka_2.11-2.1.0/config/server.properties
創建Topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic arnold_test_many
生產者
./kafka-console-producer.sh --broker-list 10.0.3.17:9092 --topic arnold_test_many
消費者
./kafka-console-consumer.sh --bootstrap-server 10.0.3.17:9092 --from-beginning --topic arnold_test_many
查看Topic詳細信息(包含節點的分布情況)
./kafka-topics.sh --describe --zookeeper 10.0.3.17:2181 --topic arnold_test_many_three
分區自平衡
./kafka-preferred-replica-election.sh --zookeeper 10.0.3.17:2181
