“嚴格的順序消費”有多么困難
下面就從3個方面來分析一下,對於一個消息中間件來說,”嚴格的順序消費”有多么困難,或者說不可能。
發送端
發送端不能異步發送,異步發送在發送失敗的情況下,就沒辦法保證消息順序。
比如你連續發了1,2,3。 過了一會,返回結果1失敗,2, 3成功。你把1再重新發送1遍,這個時候順序就亂掉了。
存儲端
對於存儲端,要保證消息順序,會有以下幾個問題:
(1)消息不能分區。也就是1個topic,只能有1個隊列。在Kafka中,它叫做partition;在RocketMQ中,它叫做queue。 如果你有多個隊列,那同1個topic的消息,會分散到多個分區里面,自然不能保證順序。
(2)即使只有1個隊列的情況下,會有第2個問題。該機器掛了之后,能否切換到其他機器?也就是高可用問題。
比如你當前的機器掛了,上面還有消息沒有消費完。此時切換到其他機器,可用性保證了。但消息順序就亂掉了。
要想保證,一方面要同步復制,不能異步復制;另1方面得保證,切機器之前,掛掉的機器上面,所有消息必須消費完了,不能有殘留。很明顯,這個很難!!!
接收端
對於接收端,不能並行消費,也即不能開多線程或者多個客戶端消費同1個隊列。
總結
從上面的分析可以看出,要保證消息的嚴格有序,有多么困難!
發送端和接收端的問題,還好解決一點,限制異步發送,限制並行消費。但對於存儲端,機器掛了之后,切換的問題,就很難解決了。
你切換了,可能消息就會亂;你不切換,那就暫時不可用。這2者之間,就需要權衡了。
業務需要全局有序嗎?
通過上面分析可以看出,要保證一個topic內部,消息嚴格的有序,是很困難的,或者說條件是很苛刻的。
那怎么辦呢?我們一定要使出所有力氣、用盡所有辦法,來保證消息的嚴格有序嗎?
這里就需要從另外一個角度去考慮這個問題:業務角度。正如在下面這篇博客中所說的:
http://www.jianshu.com/p/453c6e7ff81c
實際情況中:
(1)不關注順序的業務大量存在;
(2) 隊列無序不代表消息無序。
第(2)條的意思是說:我們不保證隊列的全局有序,但可以保證消息的局部有序。
舉個例子:保證來自同1個order id的消息,是有序的!
下面就看一下在Kafka和RocketMQ中,分別是如何對待這個問題的:
Kafka中:發送1條消息的時候,可以指定(topic, partition, key) 3個參數。partiton和key是可選的。
如果你指定了partition,那就是所有消息發往同1個partition,就是有序的。並且在消費端,Kafka保證,1個partition只能被1個consumer消費。
或者你指定key(比如order id),具有同1個key的所有消息,會發往同1個partition。也是有序的。
RocketMQ: RocketMQ在Kafka的基礎上,把這個限制更放寬了一步。只指定(topic, key),不指定具體發往哪個隊列。也就是說,它更加不希望業務方,非要去要一個全局的嚴格有序。
Apache Kafka官方保證了partition內部的數據有效性(追加寫、offset讀);為了提高Topic的並發吞吐能力,可以提高Topic的partition數,並通過設置partition的replica來保證數據高可靠;
但是在多個Partition時,不能保證Topic級別的數據有序性。
因此,如果你們就像死磕kafka,但是對數據有序性有嚴格要求,那我建議:
- 創建Topic只指定1個partition,這樣的壞處就是磨滅了kafka最優秀的特性。
所以可以思考下是不是技術選型有問題, kafka本身適合與流式大數據量,要求高吞吐,對數據有序性要求不嚴格的場景。
再談談數據一致性保證:
一致性定義:若某條消息對client可見,那么即使Leader掛了,在新Leader上數據依然可以被讀到
HW-HighWaterMark: client可以從Leader讀到的最大msg offset,即對外可見的最大offset, HW=max(replica.offset)
對於Leader新收到的msg,client不能立刻消費,Leader會等待該消息被所有ISR中的replica同步后,更新HW,此時該消息才能被client消費,這樣就保證了如果Leader fail,該消息仍然可以從新選舉的Leader中獲取。
對於來自內部Broker的讀取請求,沒有HW的限制。同時,Follower也會維護一份自己的HW,Folloer.HW = min(Leader.HW, Follower.offset)
數據存儲
Topic
一類消息稱為一個Topic
Topic可分為多個Parition;
Parition內部保證數據的有序,按照消息寫入順序給每個消息賦予一個遞增的offset;
為保證數據的安全性,每個Partition有多個Replica
多Parition的優點
並發讀寫,加快讀寫速度
多Partition分布式存儲,利於集群數據的均衡
加快數據恢復的速率:當某台機器掛了,每個Topic僅需恢復一部分的數據,多機器並發
缺點
Partition間Msg無序,若想保證Msg寫入與讀取的序不變,只能申請一個Partition
Partition

每個Partition分為多個Segment
每個Segment包含兩個文件:log文件和index文件,分別命名為start_offset.log和start_offset.index
log文件包含具體的msg數據,每條msg會有一個遞增的offset
Index文件是對log文件的索引:每隔一定大小的塊,索引msg在該segment中的相對offset和在log文件中的位置偏移量

根據msg的offset和log文件名中的start_offset,找到最后一個不大於msgoffset的segment,即為msg所在的segment;
根據對應segment的index文件,進一步查找msg在log文件中的偏移量
從log文件的偏移量開始讀取解析msg,比較msgoffset,找到所要讀取的msg
Partition recovery過程
每個Partition會在磁盤記錄一個RecoveryPoint, 記錄已經flush到磁盤的最大offset。當broker fail 重啟時,會進行loadLogs。首先會讀取該Partition的RecoveryPoint,找到包含RecoveryPoint的segment及以后的segment, 這些segment就是可能沒有完全flush到磁盤segments。然后調用segment的recover,重新讀取各個segment的msg,並重建索引
優點
以segment為單位管理Partition數據,方便數據生命周期的管理,刪除過期數據簡單
在程序崩潰重啟時,加快recovery速度,只需恢復未完全flush到磁盤的segment
通過命名中offset信息和index文件,大大加快msg查找時間,並且通過分多個Segment,每個index文件很小,查找速度更快
數據的同步

Partition的多個replica中一個為Leader,其余為follower
Producer只與Leader交互,把數據寫入到Leader中
Followers從Leader中拉取數據進行數據同步
Consumer只從Leader拉取數據
ISR:所有不落后的replica集合, 不落后有兩層含義:距離上次FetchRequest的時間不大於某一個值或落后的消息數不大於某一個值,Leader失敗后會從ISR中選取一個Follower做Leader
數據可靠性保證
當Producer向Leader發送數據時,可以通過acks參數設置數據可靠性的級別
0: 不論寫入是否成功,server不需要給Producer發送Response,如果發生異常,server會終止連接,觸發Producer更新meta數據;
1: Leader寫入成功后即發送Response,此種情況如果Leader fail,會丟失數據
-1: 等待所有ISR接收到消息后再給Producer發送Response,這是最強保證
僅設置acks=-1也不能保證數據不丟失,當Isr列表中只有Leader時,同樣有可能造成數據丟失。要保證數據不丟除了設置acks=-1, 還要保證ISR的大小大於等於2,具體參數設置:
request.required.acks:設置為-1 等待所有ISR列表中的Replica接收到消息后采算寫成功;
min.insync.replicas: 設置為大於等於2,保證ISR中至少有兩個Replica
Producer要在吞吐率和數據可靠性之間做一個權衡
數據一致性保證
一致性定義:若某條消息對client可見,那么即使Leader掛了,在新Leader上數據依然可以被讀到
HW-HighWaterMark: client可以從Leader讀到的最大msg offset,即對外可見的最大offset, HW=max(replica.offset)
對於Leader新收到的msg,client不能立刻消費,Leader會等待該消息被所有ISR中的replica同步后,更新HW,此時該消息才能被client消費,這樣就保證了如果Leader fail,該消息仍然可以從新選舉的Leader中獲取。
對於來自內部Broker的讀取請求,沒有HW的限制。同時,Follower也會維護一份自己的HW,Folloer.HW = min(Leader.HW, Follower.offset)
HDFS數據組織

數據分塊,比如以64M為一個數據塊;
流水線復制:每個數據塊沒有Leader和Follower之分,采用流水線的方式進行數據復制;
就近讀取:為了減少讀取時的網路IO,采用就近讀取,加快讀取速率