大家都知道 Kafka 是一個非常牛逼的消息隊列框架,阿里的 RocketMQ 也是在 Kafka 的基礎上進行改進的。對於初學者來說,一開始面對這么一個龐然大物會不知道怎么入手。那么這篇文章就帶你先了解一下 Kafka 的技術架構,讓你從全局的視野認識 Kafka。了解了 Kafka 的整體架構和消息流程之后,腦海里就會有一個大致的結構,這時候再去學習每個部分就容易得多了。
我們先來看一下 Kafka 的整體架構圖:
Kafka 的架構圖可以分為四個部分:
- Producer Cluster:生產者集群。一般由許多個實際的業務項目組成,其不斷地往 Kafka 集群中寫入數據。
- Kafka Cluster:Kafka 服務器集群。這里就是 Kafka 作為重要的一部分,這里負責接收生產者寫入的數據,並將其持久化到文件里,最終將消息提供給 Consumer Cluster。
- Zookeeper Cluster:Zookeeper 集群。Zookeeper 負責維護整個 Kafka 集群的 Topic 信息、Kafka Controller 等信息。
- Consumer Cluster:消費者集群。與 Producer Cluster 一樣,其一般是由許多個實際的業務項目組成,不斷地從 Kafka Cluster 中讀取數據。
了解了 Kafka 的整體架構,那一個消息是怎么從生產者到 Kafka Server,又是如何從 Kafka Server 到消費者的呢?一般來說,一個消息的流轉可以分為下面幾個階段:
- 服務器啟動階段
- 生產者發送消息階段
- Kafka存儲消息階段
- 消費者拉取消息階段
服務器啟動階段
首先,我們會啟動 Zookeeper 服務器,作為集群管理服務器。接着,啟動 Kafka Server。Kafka Server 會向 Zookeeper 服務器注冊信息,接着啟動線程池監聽客戶端的連接請求。最后,啟動生產者和消費者,連接到 Zookeeper 服務器,從 Zookeeper 服務器獲取到對應的 Kafka Server 信息[1]。
生產者發送消息階段
當需要將消息存入消息隊列中時,生產者根據配置的分片算法,選擇分到哪一個 partition 中。在發送一條消息時,可以指定這條消息的 key,Producer 根據這個 key 和 Partition 機制來判斷應該將這條消息發送到哪個 Parition。
Paritition 機制可以通過指定 Producer 的 paritition.class 這一參數來指定,該 class 必須實現 kafka.producer.Partitioner 接口。如果不實現 Partition 接口,那么會使用默認的分區算法,即根據根據 key 哈希后取余[2]。
隨后生產者與該 Partition Leader 建立聯系,之后將消息發送至該 partition leader。之后生產者會根據設置的 request.required.acks 參數不同,選擇等待或或直接發送下一條消息。
- request.required.acks = 0 表示 Producer 不等待來自 Leader 的 ACK 確認,直接發送下一條消息。在這種情況下,如果 Leader 分片所在服務器發生宕機,那么這些已經發送的數據會丟失。
- request.required.acks = 1 表示 Producer 等待來自 Leader 的 ACK 確認,當收到確認后才發送下一條消息。在這種情況下,消息一定會被寫入到 Leader 服務器,但並不保證 Follow 節點已經同步完成。所以如果在消息已經被寫入 Leader 分片,但是還未同步到 Follower 節點,此時Leader 分片所在服務器宕機了,那么這條消息也就丟失了,無法被消費到。
- request.required.acks = -1 表示 Producer 等待來自 Leader 和所有 Follower 的 ACK 確認之后,才發送下一條消息。在這種情況下,除非 Leader 節點和所有 Follower 節點都宕機了,否則不會發生消息的丟失。
Kafka存儲消息階段
當 Kafka 接收到消息后,其並不直接將消息寫入磁盤,而是先寫入內存中。之后根據生產者設置參數的不同,選擇是否回復 ack 給生產者。之后有一個線程會定期將內存中的數據刷入磁盤,這里有兩個參數控制着這個過程:
# 數據達到多少條就將消息刷到磁盤
#log.flush.interval.messages=10000
# 多久將累積的消息刷到磁盤,任何一個達到指定值就觸發寫入
#log.flush.interval.ms=1000
如果我們設置 log.flush.interval.messages=1,那么每次來一條消息,就會刷一次磁盤。通過這種方式,就可以達到消息絕對不丟失的目的,這種情況我們稱之為同步刷盤。反之,我們稱之為異步刷盤。
於此同時,Kafka 服務器也會進行副本的復制,該 Partition 的 Follower 會從 Leader 節點拉取數據進行保存。然后將數據存儲到 Partition 的 Follower 節點中。
消費者拉取消息階段
在消費者啟動時,其會連接到 zk 注冊節點,之后根據所連接 topic 的 partition 個數和消費者個數,進行 partition 分配。一個 partition 最多只能被一個線程消費,但一個線程可以消費多個 partition。其分配算法如下:
1. 將目標 topic 下的所有 partirtion 排序,存於PT
2. 對某 consumer group 下所有 consumer 排序,存於 CG,第 i 個consumer 記為 Ci
3. N=size(PT)/size(CG),向上取整
4. 解除 Ci 對原來分配的 partition 的消費權(i從0開始)
5. 將第i*N到(i+1)*N-1個 partition 分配給 Ci
我們用例子簡單描述下這個算法的內容:假設我們連接的 topic 有 8 個 partition,此時有 3 個消費線程。那么 partition 的分配過程大致是這樣的:
- 8/3=2.667,向上取整就是3,也就是說每個consumer分配3個分區。
- 那么給第一個消費者分配p0/p1/p2三個分區。
- 給第二個消費者分配p3/p4/p5三個分區。
- 給第三個消費者分配p6/p7兩個分區。
接着消費者連接對應分區的 Kafka Server,並從該分區服務器拉取數據。
總結
這篇文章簡單介紹了 Kafka 框架的技術架構以及消息流轉過程,並介紹了其中的某些細節。通過這篇文章,相信大家對 Kafka 框架應該有個大致的了解。