什么是消息隊列?
簡單來說,消息隊列是存放消息的容器。客戶端可以將消息發送到消息服務器,也可以從消息服務器獲取消息。
問題導讀:
- 為什么需要消息系統?
- kafka架構?
- kafka如何存儲消息?
- Producer如何發送消息?
- Consumer如何消費消息?
- Offset如何保存?
- 如何保證消息不被重復消費?
- 如何保證消息的可靠性傳輸?
- 如何保證消息的順序性?
為什么需要消息系統?
削峰
數據庫的處理能力是有限的,在峰值期,過多的請求落到后台,一旦超過系統的處理能力,可能會使系統掛掉。

如上圖所是,系統的處理能力是2k/s,MQ處理能力是8k/s,峰值請求5k/s,MQ的處理能力遠遠大於數據庫,在高峰期,請求可以先積壓在MQ中,系統可以根據自身的處理能力以2k/s的速度消費這些請求。這樣等高峰期一過,請求可能只有100/s,系統可以很快的消費掉積壓在MQ中的請求。
注意,上面的請求指的是寫請求,查詢請求一般通過緩存解決。
解耦
如下場景,S系統與A、B、C系統緊密耦合。由於需求變動,A系統修改了相關代碼,S系統也需要調整A相關的代碼;過幾天,C系統需要刪除,S緊跟着刪除C相關代碼;又過了幾天,需要新增D系統,S系統又要添加與D相關的代碼;再過幾天,程序猿瘋了...

這樣各個系統緊密耦合,不利於維護,也不利於擴展。現在引入MQ,A系統變動,A自己修改自己的代碼即可;C系統刪除,直接取消訂閱;D系統新增,訂閱相關消息即可。

這樣通過引入消息中間件,使各個系統都與MQ交互,從而避免它們之間的錯綜復雜的調用關系。
Kafka架構

相關概念
1. broker
kafka 集群中包含的服務器。
2. producer
消息生產者。
3. consumer
消息消費者
4. consumer group
每個 consumer 都屬於一個 consumer group,每條消息只能被 consumer group 中的一個 consumer 消費,但可以被多個 consumer group 消費。
5. topic
消息的類別。每條消息都屬於某個topic,不同的topic之間是相互獨立的,即kafka是面向topic的。
6. partition
每個topic分為多個partition,partition是kafka分配的單位。kafka物理上的概念,相當於一個目錄,目錄下的日志文件構成這個partition。
7. replica
partition的副本,保障 partition 的高可用。
8. leader
replica 中的一個角色, producer 和 consumer 只跟 leader 交互。
9. follower
replica 中的一個角色,從 leader 中復制數據。
10. controller
kafka 集群中的其中一個服務器,用來進行 leader election 以及 各種 failover。
12. zookeeper
kafka 通過 zookeeper 來存儲集群的 meta 信息。
Topic and Logs
Message是按照topic來組織的,每個topic可以分成多個partition(對應server.properties/num.partitions)。partition是一個順序的追加日志,屬於順序寫磁盤(順序寫磁盤效率比隨機寫內存要高,保障 kafka 吞吐率)。其結構如下
server.properties/num.partitions 表示文件 server.properties 中的 num.partitions 配置項,下同

partition中的每條記錄(message)包含三個屬性:offset, messageSize和data。其中offset表示消息偏移量;messageSize表示消息的大小;data表示消息的具體內容。
partition是以文件的形式存儲在文件系統中,位置由server.properties/log.dirs指定,其命名規則為<topic_name>-<partition_id>。
比如,topic為"page_visits"的消息,分為5個partition,其目錄結構為:

partition可能位於不同的broker上
partition是分段的,每個段是一個segment文件。segment的常用配置有:
#server.properties
#segment文件的大小,默認為 1G
log.segment.bytes=1024*1024*1024
#滾動生成新的segment文件的最大時長
log.roll.hours=24*7
#segment文件保留的最大時長,超時將被刪除
log.retention.hours=24*7
partition目錄下包括了數據文件和索引文件,下圖是某個partition的目錄結構:

index采用稀疏存儲的方式,它不會為每一條message都建立索引,而是每隔一定的字節數建立一條索引,避免索引文件占用過多的空間。缺點是沒有建立索引的offset不能一次定位到message的位置,需要做一次順序掃描,但是掃描的范圍很小。
索引包含兩個部分(均為4個字節的數字),分別為相對offset和position。相對offset表示segment文件中的offset,position表示message在數據文件中的位置。
總結:Kafka的Message存儲采用了分區(partition),磁盤順序讀寫,分段(LogSegment)和稀疏索引這幾個手段來達到高效性
Partition and Replica
一個topic物理上分為多個partition,位於不同的broker上。如果沒有 replica,一旦broker宕機,其上所有的patition將不可用。
每個partition可以有多個replica(對應server.properties/default.replication.factor),分配到不同的broker上,其中有一個leader負責讀寫,處理來自producer和consumer的請求;其它作為follower從leader pull消息,保持與leader的同步。
如何分配partition和replica到broker上
- 將所有Broker(假設共n個Broker)和待分配的Partition排序
- 將第i個Partition分配到第(i mod n)個Broker上
- 將第i個Partition的第j個Replica分配到第((i + j) mode n)個Broker上
根據上面的分配規則,若replica的數量大於broker的數量,必定會有兩個相同的replica分配到同一個broker上,產生冗余。因此replica的數量應該小於或等於broker的數量。
leader選舉
kafka 在 zookeeper 中(/brokers/topics/[topic]/partitions/[partition]/state)動態維護了一個 ISR(in-sync replicas),ISR 里面的所有 replica 都"跟上"了 leader,controller將會從ISR里選一個做leader。具體流程如下:
1. controller 在 zookeeper 的 /brokers/ids/[brokerId] 節點注冊 Watcher,當 broker 宕機時 zookeeper 會 fire watch
2. controller 從 /brokers/ids 節點讀取可用broker
3. controller決定set_p,該集合包含宕機 broker 上的所有 partition
4. 對 set_p 中的每一個 partition
4.1 從/brokers/topics/[topic]/partitions/[partition]/state 節點讀取 ISR
4.2 決定新 leader
4.3 將新 leader、ISR、controller_epoch 和 leader_epoch 等信息寫入 state 節點
5. 通過 RPC 向相關 broker 發送 leaderAndISRRequest 命令
當ISR為空時,會選一個 replica(不一定是 ISR 成員)作為leader;當所有的 replica 都歇菜了,會等任意一個 replica 復活,將其作為leader。
ISR(同步列表)中的follower都"跟上"了leader,"跟上"並不表示完全一致,它由 server.properties/replica.lag.time.max.ms 配置,表示leader等待follower同步消息的最大時間,如果超時,leader將follower移除ISR。
配置項 replica.lag.max.messages 已經移除
replica同步
kafka通過"拉模式"同步消息,即follower從leader批量拉取數據來同步。具體的可靠性,是由生產者(根據配置項producer.properties/acks)來決定的。
In Kafka 0.9, request.required.acks=-1 which configration of producer is replaced by acks=all, but this old config is remained in docs.(在0.9版本,生產者配置項 request.required.acks=-1 被 acks=all 取代,但是老的配置項還保留在文檔中。ps: 最新的文檔2.2.x request.required.acks 已經不存在了)
| acks | description |
|---|---|
| 0 | producer發送消息后直接返回,不會等待服務器確認 |
| 1 | 服務器將記錄寫進本地log后返回,不會等待follower同步消息。leader宕機后可能丟失一部分未同步的消息 |
| -1/all | 服務器將記錄寫進本地log后,等待所有ISR內的消息同步后返回。除非leader和所有的ISR都掛掉,否則消息不會丟失 |
在acks=-1的時候,如果ISR少於min.insync.replicas指定的數目,將會拋出NotEnoughReplicas或NotEnoughReplicasAfterAppend異常。
Prodecer如何發送消息
Producer首先將消息封裝進一個ProducerRecord實例中。

消息路由
- 發送消息時如果指定了partition,則直接使用;
- 如果指定了key,則對key進行哈希,選出一個partition。這個hash(即分區機制)由producer.properties/partitioner.class指定的類實現,這個路由類需要實現Partitioner接口;
- 如果都未指定,通過round-robin來選partition。
消息並不會立即發送,而是先進行序列化后,發送給Partitioner,也就是上面提到的hash函數,由Partitioner確定目標分區后,發送到一塊內存緩沖區中(發送隊列)。Producer的另一個工作線程(即Sender線程),則負責實時地從該緩沖區中提取出准備好的消息封裝到一個批次內,統一發送到對應的broker中。其過程大致是這樣的:

圖片來自123archu
Consumer
每個Consumer都划歸到一個邏輯Consumer Group中,一個partition只能被同一個Consumer Group中的一個Consumer消費,但可以被不同的Consumer Group消費。
若 topic 的 partition 數量為 p,Consumer Group 中訂閱此 topic 的 consumer 數量為 c; 則:
p < c: 會有 c - p 個 consumer閑置,造成浪費
p > c: 一個 consumer 對應多個 partition
p = c: 一個 consumer 對應一個 partition
應該合理分配consumer和partition的數量,避免造成資源傾斜,最好partiton數目是consumer數目的整數倍。
如何將Partition分配給Consumer
生產過程中broker要分配partition,消費過程這里,也要分配partition給消費者。類似broker中選了一個controller出來,消費也要從broker中選一個coordinator,用於分配partition。
當 partition 或 consumer 數量發生變化時,比如 增加 consumer, 減少 consumer(主動或被動),增加 partition,都會進行 rebalance。
其過程如下:
-
consumer 給 coordinator 發送 JoinGroupRequest 請求。這時其他consumer 發 heartbeat 請求過來時,coordinator 會告訴他們,要 rebalance了。其他 consumer 也發送 JoinGroupRequest 請求。
-
coordinator在consumer中選出一個leader,其他作為 follower,通知給各個 consumer,對於leader,還會把 follower 的 metadata 帶給它。
-
consumer leader 根據 consumer metadata 重新分配 partition
-
consumer向coordinator發送SyncGroupRequest,其中leader的SyncGroupRequest會包含分配的情況。coordinator回包,把分配的情況告訴consumer,包括leader。
Consumer Fetch Message
Consumer 采用"拉模式"消費消息,這樣 consumer 可以自行決定消費的行為。
Consumer 調用 poll(duration) 從服務器拉取消息。拉取消息的具體行為由下面的配置項決定:
#consumer.properties
#消費者最多 poll 多少個 record
max.poll.records=500
#消費者 poll 時 partition 返回的最大數據量
max.partition.fetch.bytes=1048576
#Consumer 最大 poll 間隔
#超過此值服務器會認為此 consumer failed
#並將此 consumer 踢出對應的 consumer group
max.poll.interval.ms=300000
在 partition 中,每個消息都有一個 offset。新消息會被寫到 partition 末尾(最新的一個 segment 文件末尾), 每個 partition 上的消息是順序消費的,不同的 partition 之間消息的消費順序是不確定的。
若一個 consumer 消費多個 partition, 則各個 partition 之前消費順序是不確定的,但在每個 partition 上是順序消費。
若來自不同 consumer group 的多個 consumer 消費同一個 partition,則各個 consumer 之間的消費互不影響,每個 Consumer 都會有自己的 offset。

Consumer A 和 Consumer B 屬於不同的 Consumer Group。Cosumer A 讀取到 offset = 9, Consumer B 讀取到 offset = 11,這個值表示下次讀取的位置。也就是說 Consumer A 已經讀取了 offset 為 0 ~ 8 的消息,Consumer B 已經讀取了 offset 為 0 ~ 10 的消息。
下次從 offset = 9 開始讀取的 Consumer 並不一定還是 Consumer A 因為可能發生 rebalance
offset的保存
Consumer 消費 partition 時,需要保存 offset 記錄當前消費位置。
offset 可以選擇自動提交或調用 Consumer 的 commitSync() 或 commitAsync() 手動提交,相關配置為:
#是否自動提交 offset
enable.auto.commit=true
#自動提交間隔。 enable.auto.commit=true 時有效
auto.commit.interval.ms=5000
offset 保存在名叫 __consumeroffsets 的 topic 中。寫消息的 key 由 groupid、topic、partition 組成,value 是 offset。
一般情況下,每個 key 的 offset 都是緩存在內存中,查詢的時候不用遍歷partition,如果沒有緩存,第一次就會遍歷 partition 建立緩存,然后查詢返回。
__consumeroffsets 的 partition 數量由下面的 server 配置決定:
offsets.topic.num.partitions=50
offset 保存在哪個分區上,即 __consumeroffsets 的分區機制,可以表示為:
groupId.hashCode() mode groupMetadataTopicPartitionCount
groupMetadataTopicPartitionCount 是上面配置的分區數。
因為一個 partition 只能被同一個 Consumer Group 的一個 consumer 消費,因此可以用 groupId 表示此 consumer 消費 offeset 所在分區
消息系統可能遇到那些問題
kafka支持3種消息投遞語義
- at most once:最多一次,消息可能會丟失,但不會重復
獲取數據 -> commit offset -> 業務處理 - at least once:最少一次,消息不會丟失,可能會重復
獲取數據 -> 業務處理 -> commit offset。 - exactly once:只且一次,消息不丟失不重復,只且消費一次(0.11中實現,僅限於下游也是kafka)
如何保證消息不被重復消費?(消息的冪等性)
對於更新操作,天然具有冪等性。
對於新增操作,可以給每條消息一個唯一的id,處理前判斷是否被處理過。這個id可以存儲在 Redis 中,如果是寫數據庫可以用主鍵約束。
如何保證消息的可靠性傳輸?(消息丟失的問題)
根據kafka架構,有三個地方可能丟失消息:Consumer,Producer和 Server
消費端弄丟了數據
當 server.properties/enable.auto.commit 設置為 true 的時候,kafka 會先 commit offset 再處理消息,如果這時候出現異常,這條消息就丟失了。
因此可以關閉自動提交 offset,在處理完成后手動提交 offset,這樣可以保證消息不丟失;但是如果提交 offset 失敗,可能導致重復消費的問題, 這時保證冪等性即可。
Kafka弄丟了消息
如果某個 broker 不小心掛了,此時若 replica 只有一個,broker 上的消息就丟失了;若 replica > 1 ,給 leader 重新選一個 follower 作為新的 leader, 如果 follower 還有些消息沒有同步,這部分消息便丟失了。
可以進行如下配置,避免上面的問題:
- 給 topic 設置 replication.factor 參數:這個值必須大於 1,要求每個 partition 必須有至少 2 個副本。
- 在 Kafka 服務端設置 min.insync.replicas 參數:這個值必須大於 1,這個是要求一個 leader 至少感知到有至少一個 follower 還跟自己保持聯系,沒掉隊,這樣才能確保 leader 掛了還有一個 follower 吧。
- 在 producer 端設置 acks=all:這個是要求每條數據,必須是寫入所有 replica 之后,才能認為是寫成功了。
- 在 producer 端設置 retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這里了。
Producer弄丟了消息
在 producer 端設置 acks=all,保證所有的ISR都同步了消息才認為寫入成功。
如何保證消息的順序性?
kafka 中 partition 上的消息是順序的,可以將需要順序消費的消息發送到同一個 partition 上,用單個 consumer 消費。
上面是學習kafka時總結的,如有錯誤或不合理的地方,歡迎指正!
參考:
1: kafka學習筆記:知識點整理
2: advanced-java
3: Kafka的Log存儲解析
4: kafka生產者Producer參數設置及參數調優建議-商業環境實戰系列
5: 震驚了!原來這才是kafka!
6: kafka configuration
7: kafka 2.3.0 API
8: kafka consumer 配置詳解和提交方式
