Kafka 幾個實現細節


Kafka 幾個實現細節

關於Kafka大方向上的介紹已經很多了,infoq上面不少不錯的資源

http://www.oschina.net/translate/kafka-design?cmp&p=1#

http://www.infoq.com/cn/articles/kafka-analysis-part-1

http://www.infoq.com/cn/articles/kafka-analysis-part-2

http://www.infoq.com/cn/articles/kafka-analysis-part-3

http://www.infoq.com/cn/articles/kafka-analysis-part-4

主要想從幾個細節出發簡單寫一下Kafka,也為自己做一些積累。

基本概念

下面的概念中有部分邏輯概念,部分實體概念。

Broker

物理概念,指服務於Kafka的一個node。

topic

MQ中的抽象概念,是一個消費標示。用於保證Producer以及Consumer能夠通過該標示進行對接。可以理解為一種Naming方式。

partition

Topic的一個子概念,一個topic可具有多個partition,但Partition一定屬於一個topic。

值得注意的是:

  • 在實現上都是以每個Partition為基本實現單元的。
  • 消費時,每個消費線程最多只能使用一個partition。
  • 一個topic中partition的數量,就是每個user group中消費該topic的最大並行度數量。

User group

為了便於實現MQ中的多播,重復消費等引入的概念。如果ConsumerA以及ConsumerB同在一個UserGroup,那么ConsumerA消費的數據ConsumerB就無法消費了。

即:所有usergroup中的consumer使用一套offset。

Offset

Offset專指Partition以及User Group而言,記錄某個user group在某個partiton中當前已經消費到達的位置。

總結

Kafka使用了Topic以及Partition的概念。其中Partition隸屬於Topic,即topic1可以具有多個partition。而Partition則是Consumer消費的基本單元,即topic1有幾個partition,那么最多就可以有多少個consumer同時在一個User Group里消費這個topic。而Offset則是記錄了UserGroup在每個partiton中的偏移值。

概念介紹:生產者,消費者,消費語義

  • 生產者

生產者直接向某topic的某partition發送數據。leader負責主備策略,寫入數據,發送ack。

  • 消費者

消費者使用fetch的方式拉取數據。kafkaServer不直接負責每個consumer的當前消費到了哪里,所以需要client端和zk聯合維護每個partition讀到了哪里,即Offset。

所以這樣看上去,kafkaServer在一定程度上更像是一個大部分為順序讀取的,基於文件的日志系統。

因為簡單,所以穩定。
  • 消費語義

對比其他MQ的多播,等語義,Kafka看上去略顯單薄,其主要通過User Group的概念實現消費語義。而UserGroup實際對應的就是Offset的更改策略。

User1,User2同屬一個userGroup時,即表示二者共用一套Offset。因每個partition 的offset只能由一個線程維護,因此注定了每個UserGroup里只能有一個消費線程對一個partition進行消費。

同樣,如果希望實現多播,那就User1和User2用兩個userGroup。

Kafka實現細節1 :Server端的日志存儲

Kafka因為采用順序寫+無狀態的方式,將可靠性發揮到了極致,使得Kafka成為了一集消息緩存以及MQ於一身的利器。首先第一個問題是搞清楚:Kafka內部存儲日志的方式。

我們知道Partition是Topic的實體,所以當Producer向某topic發送數據時,需要判定幾個問題。

問題1:發到哪個partition,誰來定?

這種問題沒有正確的答案,只有到底在犧牲誰的答案。

在目前0.8.2.1的Kafka中,是交由Producer來解決這個問題的,Producer中有個PartitionManager專門用於負責對每個Message分配partition,或者由使用者更改。

  • 優勢

這樣的優勢在於Kafka Server不需要單獨一個LoadBalancer來決定消息去哪里。而且Producer完全可以根據partition的id在ZK里尋找當前Leader,直接與Leader建立連接。

  • 劣勢

是不是看到這里發現問題了?是的,如果某個Partition完全不可用,這些消息就無法發送了。使用更加簡化的模型帶來的代價是犧牲了一部分可用性。

當然再有了副本策略之后,使一個partition變得不可用是一件很困難的事情。

問題2:日志如何存儲?

在這里,我們先討論單點存儲結構

Kafka Producer在確定partition leader之后開始與其所在的broker通信。為了使用磁盤的順序寫,即使用Log Structure storage。

為查找方便,Kafka同樣建立了基本的索引結構。想想查詢需求,有什么查詢需求?大部分消息都會被順序讀取,當然也會存在少量的隨機讀取消息(比如處理的時候這條消息處理失敗,需要重新處理)。所以索引在這里的意義僅為簡單支持少量隨機查詢。

所以在索引的實現上,基本上就是為了支持針對某個Offset進行二分查找而存在的索引。

所以在文件存儲上,每個消息被寫成了兩部分,一部分是『消息實體』,一部分是『消息索引』。消息實體格式如下:

On-disk format of a message

message length : 4 bytes (value: 1+4+n) 
"magic" value  : 1 byte
crc            : 4 bytes
payload        : n bytes

消息索引格式如下,官網那張圖是錯的,你們就別看那個了。

Segement file主要用來搜索offset的時候使用,如果是順序消費,只需要持續讀文件內部內容即可。

kafka_message

問題3:如何實現日志副本&&副本策略&&同步方式

副本問題的提出

日志副本策略是可靠性的核心問題之一,其實現方式也是多種多樣的。包括無主模型,通過paxos之類的協議保證消息順序,但更簡單直接的方式是使用主從結構,主決定順序,從拷貝主的信息。

如果主不掛,從節點沒有存在的意義。但主掛了時,我們需要從備份節點中選出一個主。與此同時,更重要的是:保證一致性。在這里一致性是指:

主ack了的消息,kafka切換主之后,依然可被消費。
主沒有ack的消息,kafka切換主之后,依然沒有被存儲。

因此這里產生了一個trade off:Leader應該什么時候ack呢?

這個問題簡直是分布式環境里永恆(最坑爹)的主題之一了。其引申出的本質問題是,你到底要什么?

  • 要可靠性

當然可以,leader收到消息之后,等follower 返回ok了ack,慢死。但好處是,主掛了,哪個follower都可以做主,大家數據都一樣嘛

  • 要速度

當然可以,leader收到消息寫入本地就ack,然后再發給follower。問題也很顯而易見,最壞得情況下,有個消息leader返回ack了,但follower因為各種原因沒有寫入,主掛了,丟數據了。

副本問題的集中解決方式

我們來集中討論一下幾種實現方式從而明白我們需要哪種策略吧。

  • 方式1: Quorum及類似協議

Quorum直譯為決議團,即通過寫多份,讀多次的方式保證單個值是最新值,通俗理解為抽屜原理。

抽屜原理的適用范圍很廣,Dynamo在某種程度上也是使用了抽屜原理

在Dynamo的使用中,設共有N副本,每次寫保證W個副本ack,每次讀的時候讀R個副本並從中取最新值,則只要保證W+R>N,那么就一定能保證讀到最新數據。但那是在Key-Value的存儲中使用的,沒有數據順序問題。在Kafka里,我們還需要有一個數據順序問題。

Kafka中會持續寫入數據,主接收數據后,向所有follower發送數據。當然,因為網絡問題,每次成功ack的follower可能不完全相同,但可以當有W個節點ack的時候就進行主的ack。

這樣,在主掛的時候,需要R個點共同選主,因為W+R>N,所以對於每條消息,R個點里一定是有一個點是寫成功的。因此通過這R個點,一定可以拼湊出來一份齊全的,和Leader一樣的數據。把這些數據寫入單點,即可實現選新主。

當然,這里隱含了一個協議,就是leader每次向follower發送消息的時候是附帶了消息編號的,且消息編號自增。里面還有很多實現細節(such as precisely defined what makes a log more complete, ensuring log consistency during leader failure or changing the set of servers in the replica set),因為Kafka沒用這種方式實現,所以也就不再復述。

最經典的情況就是R=f+1,W=f+1,N=2f+1

ISR1

這就是一個典型情況,有leader 以及F1F4四個follower,每次寫入寫入leader的同時,保證至少寫入f=2額外的點。所以當leader寫完所有信息后如果掛掉,從F1F4里任選三個都可以組合出所有的完整的消息。

  1. 優勢:
    這個方式的優勢是,在寫入過程中,跳過了部分反應慢的節點。因為要求W+R>N,所以選主速度應該也還可以。

  2. 劣勢:性能較差,擁有2f+1的機器只能支持最多f台機器掛掉。假設我希望支持2台機器掛掉,我就需要5台機器。使用5台機器的存儲,但只能存儲一台機器的容量,以及一台機器的吞吐,這顯然不是一個划算的買賣。

這也就是為什么只有在保證主節點的關鍵信息時才會使用類似Quorom的實現方式,而對於大量的數據存儲並不是使用這種方式。(Dynamo應該算個特例吧)

當然還有一些其他相對類似的實現,比如 ZooKeeper的 Zab, Raft, 以及 Viewstamped Replication。Kafka等人認為最接近他們是實現的是微軟亞研院的一篇論文: PacificA(周立東老師的論文....當年就是他電面的我然后直接把我送走了....)

  • Kafka自己的ISR機制

Kafka自己使用了一種稱之為In-Sync Replicas(ISR)的機制。

我們回想一下剛才Quorum實現里的問題,支持掛2台的環境需要5台機器,主要是比例太高。之前使用Quorum主要是對每個消息都做f+1的備份,即

以單個消息為進行備份的基本單位,進行可靠性保障

在這種情況下,為了保證每個消息的可靠,所以我們只有一個選擇,那就是寫夠f+1份數據。因為只有這樣,才能保障一個f+1份的讀可以獲取全部數據。其主要問題在於每次ack的機器不一樣。所以,找f+1份才會保險。

但是想想,使用kafka是為了高吞吐,每個機器上數據不全顯然需要多點讀,但我們可不可以讓節點在ack之后,自己慢慢補上自己缺失的數據呢?這樣讀數據的時候就可以讀單點了啊!

事實上,在Quorum默認的實現方式里,節點是不再進行數據交互的。也對,輸入數據量那么大,每個消息持有者都不相同,進行數據交互補充自己不含有的數據可能會帶來很大的網絡開銷,而且存儲同樣是問題。難道也要學dynamo用gossip協議?想想就頭疼,能不能把問題簡化一下呢?

不適合直接用gossip協議的原因是消息數量太大,但不用Gossip協議必然導致多份讀,這對於高吞吐的kafka是不能容忍的。是不是可以換一種思路呢?比如:

以一段時間而非以一個消息為基本單位,進行可靠性保障

個人認為這是ISR機制最核心的思想。

這同樣是基於一個節點故障模型的假設:

對大多數系統而言,其正常工作狀態與異常工作狀態成時間段分布。

所以,如果存在一種方式,能夠按照時間段進行ack,再進行gossip就會變得簡單很多。因為不會有多少個gossip消息傳來傳去。

下面介紹ISR機制

  • 不是

問題6: Kafka的主掛掉的情況討論

Kafka實現細節2:新玩法.Log Compaction

Kafka實現細節3:消費怎么保證不丟數據?

問題1: Offset怎么存?

問題2: Consumer如何做loadbalance?

問題3: Consumer的關閉異常,會不會存在Offset異常導致多消費或者少消費?


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM