Kafka 幾個實現細節
關於Kafka大方向上的介紹已經很多了,infoq上面不少不錯的資源
http://www.oschina.net/translate/kafka-design?cmp&p=1#
http://www.infoq.com/cn/articles/kafka-analysis-part-1
http://www.infoq.com/cn/articles/kafka-analysis-part-2
http://www.infoq.com/cn/articles/kafka-analysis-part-3
http://www.infoq.com/cn/articles/kafka-analysis-part-4
主要想從幾個細節出發簡單寫一下Kafka,也為自己做一些積累。
基本概念
下面的概念中有部分邏輯概念,部分實體概念。
Broker
物理概念,指服務於Kafka的一個node。
topic
MQ中的抽象概念,是一個消費標示。用於保證Producer以及Consumer能夠通過該標示進行對接。可以理解為一種Naming方式。
partition
Topic的一個子概念,一個topic可具有多個partition,但Partition一定屬於一個topic。
值得注意的是:
- 在實現上都是以每個Partition為基本實現單元的。
- 消費時,每個消費線程最多只能使用一個partition。
- 一個topic中partition的數量,就是每個user group中消費該topic的最大並行度數量。
User group
為了便於實現MQ中的多播,重復消費等引入的概念。如果ConsumerA以及ConsumerB同在一個UserGroup,那么ConsumerA消費的數據ConsumerB就無法消費了。
即:所有usergroup中的consumer使用一套offset。
Offset
Offset專指Partition以及User Group而言,記錄某個user group在某個partiton中當前已經消費到達的位置。
總結
Kafka使用了Topic以及Partition的概念。其中Partition隸屬於Topic,即topic1可以具有多個partition。而Partition則是Consumer消費的基本單元,即topic1有幾個partition,那么最多就可以有多少個consumer同時在一個User Group里消費這個topic。而Offset則是記錄了UserGroup在每個partiton中的偏移值。
概念介紹:生產者,消費者,消費語義
- 生產者
生產者直接向某topic的某partition發送數據。leader負責主備策略,寫入數據,發送ack。
- 消費者
消費者使用fetch的方式拉取數據。kafkaServer不直接負責每個consumer的當前消費到了哪里,所以需要client端和zk聯合維護每個partition讀到了哪里,即Offset。
所以這樣看上去,kafkaServer在一定程度上更像是一個大部分為順序讀取的,基於文件的日志系統。
因為簡單,所以穩定。
- 消費語義
對比其他MQ的多播,等語義,Kafka看上去略顯單薄,其主要通過User Group的概念實現消費語義。而UserGroup實際對應的就是Offset的更改策略。
User1,User2同屬一個userGroup時,即表示二者共用一套Offset。因每個partition 的offset只能由一個線程維護,因此注定了每個UserGroup里只能有一個消費線程對一個partition進行消費。
同樣,如果希望實現多播,那就User1和User2用兩個userGroup。
Kafka實現細節1 :Server端的日志存儲
Kafka因為采用順序寫+無狀態的方式,將可靠性發揮到了極致,使得Kafka成為了一集消息緩存以及MQ於一身的利器。首先第一個問題是搞清楚:Kafka內部存儲日志的方式。
我們知道Partition是Topic的實體,所以當Producer向某topic發送數據時,需要判定幾個問題。
問題1:發到哪個partition,誰來定?
這種問題沒有正確的答案,只有到底在犧牲誰的答案。
在目前0.8.2.1的Kafka中,是交由Producer來解決這個問題的,Producer中有個PartitionManager專門用於負責對每個Message分配partition,或者由使用者更改。
- 優勢
這樣的優勢在於Kafka Server不需要單獨一個LoadBalancer來決定消息去哪里。而且Producer完全可以根據partition的id在ZK里尋找當前Leader,直接與Leader建立連接。
- 劣勢
是不是看到這里發現問題了?是的,如果某個Partition完全不可用,這些消息就無法發送了。使用更加簡化的模型帶來的代價是犧牲了一部分可用性。
當然再有了副本策略之后,使一個partition變得不可用是一件很困難的事情。
問題2:日志如何存儲?
在這里,我們先討論單點存儲結構
Kafka Producer在確定partition leader之后開始與其所在的broker通信。為了使用磁盤的順序寫,即使用Log Structure storage。
為查找方便,Kafka同樣建立了基本的索引結構。想想查詢需求,有什么查詢需求?大部分消息都會被順序讀取,當然也會存在少量的隨機讀取消息(比如處理的時候這條消息處理失敗,需要重新處理)。所以索引在這里的意義僅為簡單支持少量隨機查詢。
所以在索引的實現上,基本上就是為了支持針對某個Offset進行二分查找而存在的索引。
所以在文件存儲上,每個消息被寫成了兩部分,一部分是『消息實體』,一部分是『消息索引』。消息實體格式如下:
On-disk format of a message
message length : 4 bytes (value: 1+4+n)
"magic" value : 1 byte
crc : 4 bytes
payload : n bytes
消息索引格式如下,官網那張圖是錯的,你們就別看那個了。
Segement file主要用來搜索offset的時候使用,如果是順序消費,只需要持續讀文件內部內容即可。
問題3:如何實現日志副本&&副本策略&&同步方式
副本問題的提出
日志副本策略是可靠性的核心問題之一,其實現方式也是多種多樣的。包括無主模型,通過paxos之類的協議保證消息順序,但更簡單直接的方式是使用主從結構,主決定順序,從拷貝主的信息。
如果主不掛,從節點沒有存在的意義。但主掛了時,我們需要從備份節點中選出一個主。與此同時,更重要的是:保證一致性。在這里一致性是指:
主ack了的消息,kafka切換主之后,依然可被消費。
主沒有ack的消息,kafka切換主之后,依然沒有被存儲。
因此這里產生了一個trade off:Leader應該什么時候ack呢?
這個問題簡直是分布式環境里永恆(最坑爹)的主題之一了。其引申出的本質問題是,你到底要什么?
- 要可靠性
當然可以,leader收到消息之后,等follower 返回ok了ack,慢死。但好處是,主掛了,哪個follower都可以做主,大家數據都一樣嘛
- 要速度
當然可以,leader收到消息寫入本地就ack,然后再發給follower。問題也很顯而易見,最壞得情況下,有個消息leader返回ack了,但follower因為各種原因沒有寫入,主掛了,丟數據了。
副本問題的集中解決方式
我們來集中討論一下幾種實現方式從而明白我們需要哪種策略吧。
- 方式1: Quorum及類似協議
Quorum直譯為決議團,即通過寫多份,讀多次的方式保證單個值是最新值,通俗理解為抽屜原理。
抽屜原理的適用范圍很廣,Dynamo在某種程度上也是使用了抽屜原理
在Dynamo的使用中,設共有N副本,每次寫保證W個副本ack,每次讀的時候讀R個副本並從中取最新值,則只要保證W+R>N,那么就一定能保證讀到最新數據。但那是在Key-Value的存儲中使用的,沒有數據順序問題。在Kafka里,我們還需要有一個數據順序問題。
Kafka中會持續寫入數據,主接收數據后,向所有follower發送數據。當然,因為網絡問題,每次成功ack的follower可能不完全相同,但可以當有W個節點ack的時候就進行主的ack。
這樣,在主掛的時候,需要R個點共同選主,因為W+R>N,所以對於每條消息,R個點里一定是有一個點是寫成功的。因此通過這R個點,一定可以拼湊出來一份齊全的,和Leader一樣的數據。把這些數據寫入單點,即可實現選新主。
當然,這里隱含了一個協議,就是leader每次向follower發送消息的時候是附帶了消息編號的,且消息編號自增。里面還有很多實現細節(such as precisely defined what makes a log more complete, ensuring log consistency during leader failure or changing the set of servers in the replica set),因為Kafka沒用這種方式實現,所以也就不再復述。
最經典的情況就是R=f+1,W=f+1,N=2f+1
這就是一個典型情況,有leader 以及F1F4四個follower,每次寫入寫入leader的同時,保證至少寫入f=2額外的點。所以當leader寫完所有信息后如果掛掉,從F1F4里任選三個都可以組合出所有的完整的消息。
-
優勢:
這個方式的優勢是,在寫入過程中,跳過了部分反應慢的節點。因為要求W+R>N,所以選主速度應該也還可以。 -
劣勢:性能較差,擁有2f+1的機器只能支持最多f台機器掛掉。假設我希望支持2台機器掛掉,我就需要5台機器。使用5台機器的存儲,但只能存儲一台機器的容量,以及一台機器的吞吐,這顯然不是一個划算的買賣。
這也就是為什么只有在保證主節點的關鍵信息時才會使用類似Quorom的實現方式,而對於大量的數據存儲並不是使用這種方式。(Dynamo應該算個特例吧)
當然還有一些其他相對類似的實現,比如 ZooKeeper的 Zab, Raft, 以及 Viewstamped Replication。Kafka等人認為最接近他們是實現的是微軟亞研院的一篇論文: PacificA(周立東老師的論文....當年就是他電面的我然后直接把我送走了....)
- Kafka自己的ISR機制
Kafka自己使用了一種稱之為In-Sync Replicas(ISR)的機制。
我們回想一下剛才Quorum實現里的問題,支持掛2台的環境需要5台機器,主要是比例太高。之前使用Quorum主要是對每個消息都做f+1的備份,即
以單個消息為進行備份的基本單位,進行可靠性保障
在這種情況下,為了保證每個消息的可靠,所以我們只有一個選擇,那就是寫夠f+1份數據。因為只有這樣,才能保障一個f+1份的讀可以獲取全部數據。其主要問題在於每次ack的機器不一樣。所以,找f+1份才會保險。
但是想想,使用kafka是為了高吞吐,每個機器上數據不全顯然需要多點讀,但我們可不可以讓節點在ack之后,自己慢慢補上自己缺失的數據呢?這樣讀數據的時候就可以讀單點了啊!
事實上,在Quorum默認的實現方式里,節點是不再進行數據交互的。也對,輸入數據量那么大,每個消息持有者都不相同,進行數據交互補充自己不含有的數據可能會帶來很大的網絡開銷,而且存儲同樣是問題。難道也要學dynamo用gossip協議?想想就頭疼,能不能把問題簡化一下呢?
不適合直接用gossip協議的原因是消息數量太大,但不用Gossip協議必然導致多份讀,這對於高吞吐的kafka是不能容忍的。是不是可以換一種思路呢?比如:
以一段時間而非以一個消息為基本單位,進行可靠性保障
個人認為這是ISR機制最核心的思想。
這同樣是基於一個節點故障模型的假設:
對大多數系統而言,其正常工作狀態與異常工作狀態成時間段分布。
所以,如果存在一種方式,能夠按照時間段進行ack,再進行gossip就會變得簡單很多。因為不會有多少個gossip消息傳來傳去。
下面介紹ISR機制
- 是
- 不是