kafka學習筆記
Kafka背景及架構介紹
Kafka背景
對網站使用情況做報表,如活動數據(page view、查看內容、搜索內容等)和運營數據(CPU、IO使用率、請求時間、服務器日志等)要用到的數據的收集和分析。
Kafka簡介
Kafka是一個分布式的,基於發布/訂閱的消息系統:
消息處理時間不受數據量的影響,即使TB級別的消息也可以保證訪問的性能。
高吞吐量
支持Kafka Server間的消息分區,及分布式消費,同時保證每個Partition內的消息順序傳輸
支持離線數據處理和在線實時數據處理
支持水平擴展
Kafka與常用消息中間件的區別
Kafka架構
Broker: 一個Kafka集群包含一個或多個borker
topic: 一個broker下包含多個topic
partition: 一個topic包含一個或多個partition
productor: 生產者
consumer : 消費者
Consumer Group: 每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於默認的group)
Kafka消息投遞的方式
productor: push
consumer: pull(消費者可以根據自身的能力來進行消費)
消息投遞保障
Kafka默認保證At least once,並且允許通過設置Producer異步提交來實現At most once。而Exactly once要求與外部存儲系統協作,幸運的是Kafka提供的offset可以非常直接非常容易得使用這種方式。
At most once 消息可能會丟,但絕不會重復傳輸
At least one 消息絕不會丟,但可能會重復傳輸 (Kafka默認支持該種方式)
Exactly once 每條消息肯定會被傳輸一次且僅傳輸一次,很多時候這是用戶所想要的。
productor 消息投遞保障
默認情況下一條消息從Producer到broker是確保了At least once,可通過設置Producer異步發送實現At most once
也就是說默認情況小Kafka消息是會重復投遞的。
consumer 消息投遞保障
zookeeper中保存了consumer 消費的offset;consumer offset commit的方式有:
獲取消息后馬上commit:有可能消息得不到處理。
At most once
獲取消息后先處理再commit:消息至少被處理一遍。At least one
Exactly once:在At least one的基礎上,從架構設計上(與外部系統結合)保證不重復處理。
Kafka的高可用架構
Data Replication
Leader Election: 針對partition而言的
為何需要Data Replication
沒有Replication的,一旦某一個Broker宕機,則其上所有的Partition數據都不可被消費,這與Kafka數據持久性及Delivery Guarantee的設計目標相悖。同時Producer都不能再將數據存於這些Partition中。
如果Producer使用
同步模式:嘗試重新發送message.send.max.retries(默認值為3)次后拋出Exception
- 停止發送后續數據:造成數據的阻塞
- 繼續發送后續數據:造成本應發往該Broker的數據的丟失
如果Producer使用
異步模式:嘗試重新發送message.send.max.retries(默認值為3)次后
- 記錄該異常並繼續發送后續數據,這會造成數據丟失並且用戶只能通過日志發現該問題。
為何需要Leader Election
指Replica之間的Leader Election
保證同一個Partition的多個Replica之間的數據一致性(其中一個宕機后其它Replica必須要能繼續服務並且即不能造成數據重復也不能造成數據丟失)。
Kafka HA設計解析
如何將所有Replica均勻分布到整個集群
- 將所有Broker(假設共n個Broker)和待分配的Partition排序
- 將第i個Partition分配到第(i mod n)個Broker上
- 將第i個Partition的第j個Replica分配到第((i + j) mod n)個Broker上
Data Replication
消息備份
Title:消息備份流程
producter->partition: (leader,負責讀寫數據)
partition->follower: ACK(先應答[內存中]再寫入log)
follower->follower內存
follower內存->ACK應答
ACK應答->follower
follower->log
follower->partition: ISR列表中的所有follower都應答作為partition應答
partition->producter: 所有的ISR replica都應答后,leader partition應答
ACK前需要保證有多少個備份
備份同步列表:ISR(即in-sync Replica)
Leader會跟蹤與其保持同步的Replica列表,該列表稱為ISR(即in-sync Replica)
同步復制:同步復制要求所有能工作的Follower都復制完,這條消息才會被認為commit,這種復制方式極大的影響了吞吐率
異步復制:Follower異步的從Leader復制數據,數據只要被Leader寫入log就被認為已經commit,這種情況下如果Follower都復制完都落后於Leader,而如果Leader突然宕機,則會丟失數據。
而Kafka采用的既不是同步復制,也不是異步復制。而是采用ISR的方式來均衡確保數據不丟失以及吞吐率。
領導選舉
領導選舉的方式
少數服從多數: 如果我們有2f+1個Replica(包含Leader和Follower),那在commit之前必須保證有f+1個Replica復制完消息,為了保證正確選出新的Leader,fail的Replica不能超過f個。在生產環境下為了保證較高的容錯程度,必須要有大量的Replica,而大量的Replica又會在大數據量下導致性能的急劇下降。
Kafka在Zookeeper中動態維護了一個ISR(in-sync replicas),這個ISR里的所有Replica都跟上了leader,只有ISR里的成員才有被選為Leader的可能。在這種模式下,對於f+1個Replica,一個Partition能在保證不丟失已經commit的消息的前提下容忍f個Replica的失敗。
如何處理所有Replica都不工作
在ISR中至少有一個follower時,Kafka可以確保已經commit的數據不丟失,但如果某個Partition的所有Replica都宕機了,就無法保證數據不丟失了。這種情況下有兩種可行的方案:
等待ISR中的任一個Replica“活”過來,並且選它作為Leader: 等待時間可能較長。
選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader: 可能會導致數據不全,部分消息丟失。
如何選舉Leader
Title:傳統Leader選舉
clound->Leader: leader
Leader->zk: (leader 宕機)
zk->zk節點: znode會自動刪除
clound->follower: follower
follower->zk節點: 創建節點
zk節點->Leader: 競選leader成功
該方法會有3個問題:
- split-brain 這是由Zookeeper的特性引起的,雖然Zookeeper能保證所有Watch按順序觸發,但並不能保證同一時刻所有Replica“看”到的狀態是一樣的,這就可能造成不同Replica的響應不一致
- herd effect 如果宕機的那個Broker上的Partition比較多,會造成多個Watch被觸發,造成集群內大量的調整
- Zookeeper負載過重 每個Replica都要為此在Zookeeper上注冊一個Watch,當集群規模增加到幾千個Partition時Zookeeper負載會過重。
Kafka 0.8.*的Leader Election方案解決了上述問題,它在所有broker中選出一個controller,所有Partition的Leader選舉都由controller決定。controller會將Leader的改變直接通過RPC的方式(比Zookeeper Queue的方式更高效)通知需為此作出響應的Broker。同時controller也負責增刪Topic以及Replica的重新分配。
broker failover過程簡介
- Controller在Zookeeper注冊Watch,一旦有Broker宕機(這是用宕機代表任何讓系統認為其die的情景,包括但不限於機器斷電,網絡不可用,GC導致的Stop The World,進程crash等),其在Zookeeper對應的znode會自動被刪除,Zookeeper會fire Controller注冊的watch,Controller讀取最新的幸存的Broker
- Controller決定set_p,該集合包含了宕機的所有Broker上的所有Partition
- 對set_p中的每一個Partition
- 從/brokers/topics/[topic]/partitions/[partition]/state讀取該Partition當前的ISR
- 決定該Partition的新Leader。如果當前ISR中有至少一個Replica還幸存,則選擇其中一個作為新Leader,新的ISR則包含當前ISR中所有幸存的Replica。否則選擇該Partition中任意一個幸存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的數據丟失)。如果該Partition的所有Replica都宕機了,則將新的Leader設置為-1。
- 將新的Leader,ISR和新的leader_epoch及controller_epoch寫入/brokers/topics/[topic]/partitions/[partition]/state。注意,該操作只有其version在3.1至3.3的過程中無變化時才會執行,否則跳轉到3.1
- 直接通過RPC向set_p相關的Broker發送LeaderAndISRRequest命令。Controller可以在一個RPC操作中發送多個命令從而提高效率。
Kafka的高可用架構二
Kafka Consumer設計解析
High Level Consumer: Consumer提供了一個從Kafka消費數據的高層抽象,從而屏蔽掉其中的細節並提供豐富的語義。
Consumer Group: High Level Consumer將從某個Partition中讀取最后一條消息的offset存於Zookeeper中,這個offset基於客戶程序提供給Kafka的名字來保存,這個名字就叫做consumer group。也就是說用來保存consumer offset的名字。
與傳統mq的不同:
- 很多傳統的Message Queue都會在消息被消費完后將消息刪除 (一方面避免重復消費,另一方面可以保證Queue的長度比較短,提高效率)。
- Kafka保證一條消息在同一個consumer group中只消費一次。
- 允許不同的consumer group同時消費同一條消息。
High Level Consumer Rebalance`
每個Consumer被創建時會觸發Consumer Group的Rebalance:
- High Level Cousumer 啟動時將ID注冊到Zookeeper的其Counsumer group下,路徑為/consumers/[consumer group]/ids/[consumer id]
- 在/consumers/[consumer group]/ids上注冊Watch
- 在/brokers/ids上注冊Watch
- 如果Consumer通過Topic Filter創建消息流,則它會同時在/brokers/topics上也創建Watch
- 強制自己在其Consumer Group內啟動Rebalance流程
存在的問題:
- 任何Broker或者Consumer的增減都會觸發所有的Consumer的Rebalance。
- 多個consumer同時觸發Rebalance,會導致不正確的Rebalance。
Low Level Consumer
需要對partion或者做特殊處理
- 同一條消息讀多次
- 只讀取某個Topic的部分Partition
- 管理事務,從而確保每條消息被處理一次,且僅被處理一次
與Consumer Group相比,Low Level Consumer要求用戶做大量的額外工作。
- 必須在應用程序中跟蹤offset,從而確定下一條應該消費哪條消息
- 應用程序需要通過程序獲知每個Partition的Leader是誰
- 必須處理Leader的變化
使用Low Level Consumer的一般流程如下
- 查找到一個“活着”的Broker,並且找出每個Partition的Leader
- 找出每個Partition的Follower
- 定義好請求,該請求應該能描述應用程序需要哪些數據
- Fetch數據
- 識別Leader的變化,並對之作出必要的響應
Kafka高性能架構之道
宏觀架構層面
利用Partition實現並行處理
- 由於不同Partition可位於不同機器,因此可以充分利用集群優勢,實現機器間的並行處理。
- 由於Partition在物理上對應一個文件夾,即使多個Partition位於同一個節點,也可通過配置讓同一節點上的不同Partition置於不同的disk drive上,從而實現磁盤間的並行處理,充分發揮多磁盤的優勢。
利用多磁盤的具體方法是,將不同磁盤mount到不同目錄,然后在server.properties中,將log.dirs設置為多目錄(用逗號分隔)。Kafka會自動將所有Partition盡可能均勻分配到不同目錄也即不同目錄(也即不同disk)上。
Partition是最小並發粒度
- 多Consumer消費同一個Topic時,同一條消息只會被同一Consumer Group內的一個Consumer所消費。
- 如果Consumer的個數多於Partition的個數,那么會有部分Consumer無法消費該Topic的任何數據,也即當Consumer個數超過Partition后,增加Consumer並不能增加並行度。
Partition個數決定了可能的最大並行度。
ISR實現可用性與數據一致性的動態平衡
CAP理論
CAP理論是指,分布式系統中,一致性、可用性和分區容忍性最多只能同時滿足兩個。
一致性
- 通過某個節點的寫操作結果對后面通過其它節點的讀操作可見
- 如果更新數據后,並發訪問情況下后續讀操作可立即感知該更新,稱為強一致性
- 如果允許之后部分或者全部感知不到該更新,稱為弱一致性
- 若在之后的一段時間(通常該時間不固定)后,一定可以感知到該更新,稱為最終一致性
可用性
- 任何一個沒有發生故障的節點必須在有限的時間內返回合理的結果
分區容忍性
- 部分節點宕機或者無法與其它節點通信時,各分區間還可保持分布式系統的功能
一般而言,都要求保證分區容忍性。所以在CAP理論下,更多的是需要在可用性和一致性之間做權衡。
常用數據復制及一致性方案
Master-Slave
- RDBMS的讀寫分離即為典型的Master-Slave方案
- 同步復制可保證強一致性但會影響可用性
- 異步復制可提供高可用性但會降低一致性
基於ISR的數據復制方案
只有ISR中的所有Replica都復制完,Leader才會將其置為Commit,它才能被Consumer所消費。
使用ISR方案的原因
- 由於Leader可移除不能及時與之同步的Follower,故與同步復制相比可避免最慢的Follower拖慢整體速度,也即ISR提高了系統可用性。
- ISR中的所有Follower都包含了所有Commit過的消息,而只有Commit過的消息才會被Consumer消費,故從Consumer的角度而言,ISR中的所有Replica都始終處於同步狀態,從而與異步復制方案相比提高了數據一致性。
- ISR可動態調整,極限情況下,可以只包含Leader,極大提高了可容忍的宕機的Follower的數量。與Majority Quorum方案相比,容忍相同個數的節點失敗,所要求的總節點數少了近一半。
具體實現層面
高效使用磁盤
順序寫磁盤
- 將Partition分為多個Segment,每個Segment對應一個物理文件;
- 通過刪除整個文件的方式去刪除Partition內的數據。這種方式清除舊數據的方式,也避免了對文件的隨機寫操作。
** 充分利用Page Cache **
Page Cache的優勢:
- I/O Scheduler會將連續的小塊寫組裝成大塊的物理寫從而提高性能
- I/O Scheduler會嘗試將一些寫操作重新按順序排好,從而減少磁盤頭的移動時間
- 充分利用所有空閑內存(非JVM內存)。如果使用應用層Cache(即JVM堆內存),會增加GC負擔
- 讀操作可直接在Page Cache內進行。如果消費和生產速度相當,甚至不需要通過物理磁盤(直接通過Page Cache)交換數據
- 如果進程重啟,JVM內的Cache會失效,但Page Cache仍然可用
存在問題:
- 將數據寫入Page Cache,並不保證數據一定完全寫入磁盤。
- 可能會造成機器宕機時,Page Cache內的數據未寫入磁盤從而造成數據丟失。
解決措施:
- 這種場景完全可以由Kafka層面的Replication機制去解決
- 提供了flush.messages和flush.ms兩個參數將Page Cache中的數據強制Flush到磁盤,但是Kafka並不建議使用。(降低性能)
支持多Disk Drive
Broker的log.dirs配置項,允許配置多個文件夾。如果機器上有多個Disk Drive,可將不同的Disk掛載到不同的目錄,然后將這些目錄都配置到log.dirs里。Kafka會盡可能將不同的Partition分配到不同的目錄,也即不同的Disk上,從而充分利用了多Disk的優勢。
零拷貝
- 傳統模式下的四次拷貝與四次上下文切換
- sendfile和transferTo實現零拷貝
Linux 2.4+內核通過sendfile系統調用,提供了零拷貝。數據通過DMA拷貝到內核態Buffer后,直接通過DMA拷貝到NIC Buffer,無需CPU拷貝。這也是零拷貝這一說法的來源。除了減少數據拷貝外,因為整個讀文件-網絡發送由一個sendfile調用完成,整個過程只有兩次上下文切換,因此大大提高了性能。
注: transferTo和transferFrom並不保證一定能使用零拷貝。實際上是否能使用零拷貝與操作系統相關,如果操作系統提供sendfile這樣的零拷貝系統調用,則這兩個方法會通過這樣的系統調用充分利用零拷貝的優勢,否則並不能通過這兩個方法本身實現零拷貝。
減少網絡開銷
- 批處理: 批處理既減少了網絡傳輸的Overhead,又提高了寫磁盤的效率。
- 數據壓縮降低網絡負載: 將整個Batch的消息一起壓縮后傳輸。數據壓縮的一個基本原理是,重復數據越多壓縮效果越好。因此將整個Batch的數據一起壓縮能更大幅度減小數據量,從而更大程度提高網絡傳輸效率。
- 高效的序列化方式: 減少實際網絡傳輸和磁盤存儲的數據規模,從而提高吞吐率。這里要注意,如果使用的序列化方法太慢,即使壓縮比非常高,最終的效率也不一定高。
