kafka學習筆記


kafka學習筆記

參考文檔

Kafka背景及架構介紹

Kafka背景

對網站使用情況做報表,如活動數據(page view、查看內容、搜索內容等)和運營數據(CPU、IO使用率、請求時間、服務器日志等)要用到的數據的收集和分析。

Kafka簡介

Kafka是一個分布式的,基於發布/訂閱的消息系統:

消息處理時間不受數據量的影響,即使TB級別的消息也可以保證訪問的性能。
高吞吐量
支持Kafka Server間的消息分區,及分布式消費,同時保證每個Partition內的消息順序傳輸
支持離線數據處理和在線實時數據處理
支持水平擴展

Kafka與常用消息中間件的區別

Kafka架構

Broker: 一個Kafka集群包含一個或多個borker
topic: 一個broker下包含多個topic
partition: 一個topic包含一個或多個partition
productor: 生產者
consumer : 消費者
Consumer Group: 每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於默認的group)

Kafka消息投遞的方式

productor: push
consumer: pull(消費者可以根據自身的能力來進行消費)

消息投遞保障

Kafka默認保證At least once,並且允許通過設置Producer異步提交來實現At most once。而Exactly once要求與外部存儲系統協作,幸運的是Kafka提供的offset可以非常直接非常容易得使用這種方式。

At most once 消息可能會丟,但絕不會重復傳輸
At least one 消息絕不會丟,但可能會重復傳輸 (Kafka默認支持該種方式
Exactly once 每條消息肯定會被傳輸一次且僅傳輸一次,很多時候這是用戶所想要的。

productor 消息投遞保障

默認情況下一條消息從Producer到broker是確保了At least once,可通過設置Producer異步發送實現At most once

也就是說默認情況小Kafka消息是會重復投遞的。

consumer 消息投遞保障

zookeeper中保存了consumer 消費的offset;consumer offset commit的方式有:

獲取消息后馬上commit:有可能消息得不到處理。At most once
獲取消息后先處理再commit:消息至少被處理一遍。At least one
Exactly once:在At least one的基礎上,從架構設計上(與外部系統結合)保證不重復處理。

Kafka的高可用架構

Data Replication
Leader Election: 針對partition而言的

為何需要Data Replication

沒有Replication的,一旦某一個Broker宕機,則其上所有的Partition數據都不可被消費,這與Kafka數據持久性及Delivery Guarantee的設計目標相悖。同時Producer都不能再將數據存於這些Partition中。

如果Producer使用同步模式:嘗試重新發送message.send.max.retries(默認值為3)次后拋出Exception

  • 停止發送后續數據:造成數據的阻塞
  • 繼續發送后續數據:造成本應發往該Broker的數據的丟失

如果Producer使用異步模式:嘗試重新發送message.send.max.retries(默認值為3)次后

  • 記錄該異常並繼續發送后續數據,這會造成數據丟失並且用戶只能通過日志發現該問題。

為何需要Leader Election

指Replica之間的Leader Election

保證同一個Partition的多個Replica之間的數據一致性(其中一個宕機后其它Replica必須要能繼續服務並且即不能造成數據重復也不能造成數據丟失)。

Kafka HA設計解析

如何將所有Replica均勻分布到整個集群

  • 將所有Broker(假設共n個Broker)和待分配的Partition排序
  • 將第i個Partition分配到第(i mod n)個Broker上
  • 將第i個Partition的第j個Replica分配到第((i + j) mod n)個Broker上

Data Replication

消息備份

Title:消息備份流程
producter->partition: (leader,負責讀寫數據)
partition->follower: ACK(先應答[內存中]再寫入log)
follower->follower內存
follower內存->ACK應答
ACK應答->follower
follower->log
follower->partition: ISR列表中的所有follower都應答作為partition應答
partition->producter: 所有的ISR replica都應答后,leader partition應答

ACK前需要保證有多少個備份

備份同步列表:ISR(即in-sync Replica)

Leader會跟蹤與其保持同步的Replica列表,該列表稱為ISR(即in-sync Replica)

同步復制:同步復制要求所有能工作的Follower都復制完,這條消息才會被認為commit,這種復制方式極大的影響了吞吐率
異步復制:Follower異步的從Leader復制數據,數據只要被Leader寫入log就被認為已經commit,這種情況下如果Follower都復制完都落后於Leader,而如果Leader突然宕機,則會丟失數據。

而Kafka采用的既不是同步復制,也不是異步復制。而是采用ISR的方式來均衡確保數據不丟失以及吞吐率。

領導選舉

領導選舉的方式

少數服從多數: 如果我們有2f+1個Replica(包含Leader和Follower),那在commit之前必須保證有f+1個Replica復制完消息,為了保證正確選出新的Leader,fail的Replica不能超過f個。在生產環境下為了保證較高的容錯程度,必須要有大量的Replica,而大量的Replica又會在大數據量下導致性能的急劇下降。
Kafka在Zookeeper中動態維護了一個ISR(in-sync replicas),這個ISR里的所有Replica都跟上了leader,只有ISR里的成員才有被選為Leader的可能。在這種模式下,對於f+1個Replica,一個Partition能在保證不丟失已經commit的消息的前提下容忍f個Replica的失敗。

如何處理所有Replica都不工作

在ISR中至少有一個follower時,Kafka可以確保已經commit的數據不丟失,但如果某個Partition的所有Replica都宕機了,就無法保證數據不丟失了。這種情況下有兩種可行的方案:

等待ISR中的任一個Replica“活”過來,並且選它作為Leader: 等待時間可能較長。
選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader: 可能會導致數據不全,部分消息丟失。

如何選舉Leader

Title:傳統Leader選舉
clound->Leader: leader
Leader->zk: (leader 宕機)
zk->zk節點: znode會自動刪除
clound->follower: follower
follower->zk節點: 創建節點
zk節點->Leader: 競選leader成功

該方法會有3個問題:   

  • split-brain 這是由Zookeeper的特性引起的,雖然Zookeeper能保證所有Watch按順序觸發,但並不能保證同一時刻所有Replica“看”到的狀態是一樣的,這就可能造成不同Replica的響應不一致
  • herd effect 如果宕機的那個Broker上的Partition比較多,會造成多個Watch被觸發,造成集群內大量的調整
  • Zookeeper負載過重 每個Replica都要為此在Zookeeper上注冊一個Watch,當集群規模增加到幾千個Partition時Zookeeper負載會過重。

Kafka 0.8.*的Leader Election方案解決了上述問題,它在所有broker中選出一個controller,所有Partition的Leader選舉都由controller決定。controller會將Leader的改變直接通過RPC的方式(比Zookeeper Queue的方式更高效)通知需為此作出響應的Broker。同時controller也負責增刪Topic以及Replica的重新分配。

broker failover過程簡介

  • Controller在Zookeeper注冊Watch,一旦有Broker宕機(這是用宕機代表任何讓系統認為其die的情景,包括但不限於機器斷電,網絡不可用,GC導致的Stop The World,進程crash等),其在Zookeeper對應的znode會自動被刪除,Zookeeper會fire Controller注冊的watch,Controller讀取最新的幸存的Broker
  • Controller決定set_p,該集合包含了宕機的所有Broker上的所有Partition
  • 對set_p中的每一個Partition
  • 從/brokers/topics/[topic]/partitions/[partition]/state讀取該Partition當前的ISR
  • 決定該Partition的新Leader。如果當前ISR中有至少一個Replica還幸存,則選擇其中一個作為新Leader,新的ISR則包含當前ISR中所有幸存的Replica。否則選擇該Partition中任意一個幸存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的數據丟失)。如果該Partition的所有Replica都宕機了,則將新的Leader設置為-1。
  • 將新的Leader,ISR和新的leader_epoch及controller_epoch寫入/brokers/topics/[topic]/partitions/[partition]/state。注意,該操作只有其version在3.1至3.3的過程中無變化時才會執行,否則跳轉到3.1
  • 直接通過RPC向set_p相關的Broker發送LeaderAndISRRequest命令。Controller可以在一個RPC操作中發送多個命令從而提高效率。

Kafka的高可用架構二

更多詳細信息

Kafka Consumer設計解析

High Level Consumer: Consumer提供了一個從Kafka消費數據的高層抽象,從而屏蔽掉其中的細節並提供豐富的語義。
Consumer Group: High Level Consumer將從某個Partition中讀取最后一條消息的offset存於Zookeeper中,這個offset基於客戶程序提供給Kafka的名字來保存,這個名字就叫做consumer group。也就是說用來保存consumer offset的名字。

與傳統mq的不同:

  • 很多傳統的Message Queue都會在消息被消費完后將消息刪除 (一方面避免重復消費,另一方面可以保證Queue的長度比較短,提高效率)。
  • Kafka保證一條消息在同一個consumer group中只消費一次。
  • 允許不同的consumer group同時消費同一條消息。

High Level Consumer Rebalance`

每個Consumer被創建時會觸發Consumer Group的Rebalance:

  • High Level Cousumer 啟動時將ID注冊到Zookeeper的其Counsumer group下,路徑為/consumers/[consumer group]/ids/[consumer id]
  • 在/consumers/[consumer group]/ids上注冊Watch
  • 在/brokers/ids上注冊Watch
  • 如果Consumer通過Topic Filter創建消息流,則它會同時在/brokers/topics上也創建Watch
  • 強制自己在其Consumer Group內啟動Rebalance流程

存在的問題:

  • 任何Broker或者Consumer的增減都會觸發所有的Consumer的Rebalance。
  • 多個consumer同時觸發Rebalance,會導致不正確的Rebalance。

Low Level Consumer

需要對partion或者做特殊處理

  • 同一條消息讀多次
  • 只讀取某個Topic的部分Partition
  • 管理事務,從而確保每條消息被處理一次,且僅被處理一次

與Consumer Group相比,Low Level Consumer要求用戶做大量的額外工作。

  • 必須在應用程序中跟蹤offset,從而確定下一條應該消費哪條消息
  • 應用程序需要通過程序獲知每個Partition的Leader是誰
  • 必須處理Leader的變化

使用Low Level Consumer的一般流程如下

  • 查找到一個“活着”的Broker,並且找出每個Partition的Leader
  • 找出每個Partition的Follower
  • 定義好請求,該請求應該能描述應用程序需要哪些數據
  • Fetch數據
  • 識別Leader的變化,並對之作出必要的響應

Kafka高性能架構之道

宏觀架構層面

利用Partition實現並行處理

  • 由於不同Partition可位於不同機器,因此可以充分利用集群優勢,實現機器間的並行處理。
  • 由於Partition在物理上對應一個文件夾,即使多個Partition位於同一個節點,也可通過配置讓同一節點上的不同Partition置於不同的disk drive上,從而實現磁盤間的並行處理,充分發揮多磁盤的優勢。

利用多磁盤的具體方法是,將不同磁盤mount到不同目錄,然后在server.properties中,將log.dirs設置為多目錄(用逗號分隔)。Kafka會自動將所有Partition盡可能均勻分配到不同目錄也即不同目錄(也即不同disk)上。

Partition是最小並發粒度

  • 多Consumer消費同一個Topic時,同一條消息只會被同一Consumer Group內的一個Consumer所消費。
  • 如果Consumer的個數多於Partition的個數,那么會有部分Consumer無法消費該Topic的任何數據,也即當Consumer個數超過Partition后,增加Consumer並不能增加並行度。

Partition個數決定了可能的最大並行度。

ISR實現可用性與數據一致性的動態平衡

CAP理論

CAP理論是指,分布式系統中,一致性、可用性和分區容忍性最多只能同時滿足兩個。

一致性

  • 通過某個節點的寫操作結果對后面通過其它節點的讀操作可見
  • 如果更新數據后,並發訪問情況下后續讀操作可立即感知該更新,稱為強一致性
  • 如果允許之后部分或者全部感知不到該更新,稱為弱一致性
  • 若在之后的一段時間(通常該時間不固定)后,一定可以感知到該更新,稱為最終一致性

可用性

  • 任何一個沒有發生故障的節點必須在有限的時間內返回合理的結果

分區容忍性

  • 部分節點宕機或者無法與其它節點通信時,各分區間還可保持分布式系統的功能

一般而言,都要求保證分區容忍性。所以在CAP理論下,更多的是需要在可用性和一致性之間做權衡。

常用數據復制及一致性方案

Master-Slave

  • RDBMS的讀寫分離即為典型的Master-Slave方案
  • 同步復制可保證強一致性但會影響可用性
  • 異步復制可提供高可用性但會降低一致性

基於ISR的數據復制方案

只有ISR中的所有Replica都復制完,Leader才會將其置為Commit,它才能被Consumer所消費。

使用ISR方案的原因

  • 由於Leader可移除不能及時與之同步的Follower,故與同步復制相比可避免最慢的Follower拖慢整體速度,也即ISR提高了系統可用性。
  • ISR中的所有Follower都包含了所有Commit過的消息,而只有Commit過的消息才會被Consumer消費,故從Consumer的角度而言,ISR中的所有Replica都始終處於同步狀態,從而與異步復制方案相比提高了數據一致性。
  • ISR可動態調整,極限情況下,可以只包含Leader,極大提高了可容忍的宕機的Follower的數量。與Majority Quorum方案相比,容忍相同個數的節點失敗,所要求的總節點數少了近一半。

具體實現層面

高效使用磁盤

順序寫磁盤

  • 將Partition分為多個Segment,每個Segment對應一個物理文件;
  • 通過刪除整個文件的方式去刪除Partition內的數據。這種方式清除舊數據的方式,也避免了對文件的隨機寫操作。

** 充分利用Page Cache **

Page Cache的優勢:

  • I/O Scheduler會將連續的小塊寫組裝成大塊的物理寫從而提高性能
  • I/O Scheduler會嘗試將一些寫操作重新按順序排好,從而減少磁盤頭的移動時間
  • 充分利用所有空閑內存(非JVM內存)。如果使用應用層Cache(即JVM堆內存),會增加GC負擔
  • 讀操作可直接在Page Cache內進行。如果消費和生產速度相當,甚至不需要通過物理磁盤(直接通過Page Cache)交換數據
  • 如果進程重啟,JVM內的Cache會失效,但Page Cache仍然可用

存在問題:

  • 將數據寫入Page Cache,並不保證數據一定完全寫入磁盤。
  • 可能會造成機器宕機時,Page Cache內的數據未寫入磁盤從而造成數據丟失。

解決措施:

  • 這種場景完全可以由Kafka層面的Replication機制去解決
  • 提供了flush.messages和flush.ms兩個參數將Page Cache中的數據強制Flush到磁盤,但是Kafka並不建議使用。(降低性能)

支持多Disk Drive

Broker的log.dirs配置項,允許配置多個文件夾。如果機器上有多個Disk Drive,可將不同的Disk掛載到不同的目錄,然后將這些目錄都配置到log.dirs里。Kafka會盡可能將不同的Partition分配到不同的目錄,也即不同的Disk上,從而充分利用了多Disk的優勢。

零拷貝

  • 傳統模式下的四次拷貝與四次上下文切換
  • sendfile和transferTo實現零拷貝

Linux 2.4+內核通過sendfile系統調用,提供了零拷貝。數據通過DMA拷貝到內核態Buffer后,直接通過DMA拷貝到NIC Buffer,無需CPU拷貝。這也是零拷貝這一說法的來源。除了減少數據拷貝外,因為整個讀文件-網絡發送由一個sendfile調用完成,整個過程只有兩次上下文切換,因此大大提高了性能。

注: transferTo和transferFrom並不保證一定能使用零拷貝。實際上是否能使用零拷貝與操作系統相關,如果操作系統提供sendfile這樣的零拷貝系統調用,則這兩個方法會通過這樣的系統調用充分利用零拷貝的優勢,否則並不能通過這兩個方法本身實現零拷貝。

減少網絡開銷

  • 批處理: 批處理既減少了網絡傳輸的Overhead,又提高了寫磁盤的效率。
  • 數據壓縮降低網絡負載: 將整個Batch的消息一起壓縮后傳輸。數據壓縮的一個基本原理是,重復數據越多壓縮效果越好。因此將整個Batch的數據一起壓縮能更大幅度減小數據量,從而更大程度提高網絡傳輸效率。
  • 高效的序列化方式: 減少實際網絡傳輸和磁盤存儲的數據規模,從而提高吞吐率。這里要注意,如果使用的序列化方法太慢,即使壓縮比非常高,最終的效率也不一定高。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM