kafka總結


參考 http://zqhxuyuan.github.io/2016/01/14/2016-01-14-Kafka-ISR/

http://zqhxuyuan.github.io/2016/01/13/2016-01-13-Kafka-Picture/

http://jianbeike.blogspot.com.au/2016/04/kafka.html

http://zqhxuyuan.github.io/2016/01/14/2016-01-14-Kafka-ISR/

http://www.oschina.net/translate/kafka-replication?print

http://www.jianshu.com/p/3f24d4b53f7f

https://www.iteblog.com/archives/1805

http://www.cnblogs.com/fxjwind/p/4972244.html

http://www.jasongj.com/2015/04/24/KafkaColumn2/

 

//kafka安裝

http://yanliu.org/2015/08/31/kafka%E9%9B%86%E7%BE%A4%E9%85%8D%E7%BD%AE/

 

http://bbs.kekeyun.com/thread-101-1-1.html

http://blog.csdn.net/lizhitao/article/details/45066437

分布式系統 的優點 就是 將原本一台服務器受到的壓力,分散到不同服務器上去 

 

HW表示的是所有ISR中的節點都已經復制完的消息的offset.也是消費者所能獲取到的消息的最大offset,所以叫做high watermark.
注意Leader Partition保存了ISR信息.所以可以看到maybeIncrementLeaderHW()是在appendToLocalLog()內一起執行的

任何Replication的LEO發生變化 (ISR中的followers有任何一個節點LEO改變,看看所有ISR是否都復制了,然后更新HW)

private def maybeIncrementLeaderHW(leaderReplica: Replica): Boolean = { // 所有inSync副本中最小的LEO(因為每個follower的LEO都可能不一樣), 表示的是最新的hw
  val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset) val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering) // Leader本身的hw, 是舊的
  val oldHighWatermark = leaderReplica.highWatermark  // 是一個LogOffsetMetadata
  if(oldHighWatermark.precedes(newHighWatermark)) {   // 比較Leader的messageOffset是否小於ISR的
    leaderReplica.highWatermark = newHighWatermark    // Leader小於ISR, 更新Leader為ISR中最小的
    true }else false     // Returns true if the HW was incremented, and false otherwise.
}

 

delay operation complete http://zqhxuyuan.github.io/2016/01/14/2016-01-14-Kafka-ISR/

觸發條件(延遲請求以及增加HW)中關於ISR的部分都是環環相扣的:

  • leader有新消息寫到本地日志(生產者寫新數據) –> A.2 –> DelayedFetch
  • leader replication的LEO發生變化(追加了新消息) –> C.2 –> HW
  • follower向Leader發起fetch請求(ISR的follower會和Leader保持同步) –> B.2 –> DelayedProduce
  • follower所在replication的LEO發生變化(拉取了新消息到本地) –> C.2 –> HW
  • 所有replication的LEO發生變化,Leader的HW也會變化(成功提交了消息) –> C
  • http://zqhxuyuan.github.io/2016/01/14/2016-01-14-Kafka-ISR/.2 –> HW
  • consumer讀取至多Leader的HW,HW變化了,解鎖consumer –> A.1 –> DelayedFetch
  • producer等待ISR都同步成功,導致HW變化,就可以返回響應 –> B.1 –> DelayedProduce

 

 

 

  • Partition副本由Leader和follower組成,只有ISR列表中的副本是僅僅跟着Leader的
  • Leader管理了ISR列表,只有ISR列表中的所有副本都復制了消息,才能認為這條消息是提交的
  • Leader和follower副本都叫做Replica,同一個Partition的不同副本分布在不同Broker上
  • Replica很重要的兩個信息是HighWatermark(HW)和LogEndOffset(LEO)
  • 只有Leader Partition負責客戶端的讀寫,follower從Leader同步數據
  • 所有Replica都會對HW做checkpoint,Leader會在follower的拉取請求時廣播HW給follower

 

 生產者生產數據后,相應broker會感知到offset的變化,然后通知它的follower,同時返回leader的HW, follower會主動向leader拉取數據,但每個follower所在的機器性能不同,可能拉取數據的個數也不一樣,導致各個的LEO也不一樣,為了分區對應的broker中的

數據一致,leader挑選follower返回的各自LEO中,選擇最小的offset,做為HW,並更新,並通知生產者。

  消費者只能消費處於HW以下的數據,因為以下的數據,在follower各個機器中都存在,可理解為數據是一致的

 

關於leader的選舉原理

每一個分區對應的broker中,都只能有一個leader和若干個follower,

kafka沒有采用 少數服從多數 分布式的選舉方法,而是自己實現了一個 ISR (in sync replication) ISR中的follower都是在速度上能跟得上leader的broker

kafka集群第一次使用的時候,里面是沒有數據的,

隨機選一個broker作為leader,余下的broker放到ISR中, 同時啟動一個線程,專門檢查對應的follower,看他們在規定時間內是否fetch數據(默認1s),如果不符合這個條件,就將此follower踢出ISR
當leader宕機后,從ISR中挑選一個做為新leader,但如何挑選新的leader?

 

關於leader的選舉方法

在kafka 0.8之前,在創建一個topic時,相應broker里面是沒有數據的,那么隨機找一個broker做為leader,余下的放到ISR中

 

每個分區中的follower所在的機器 ,都要/broker/ids/[0,1,2]做一個watch ,這樣當/broker/ids/中的某個leader宕機后,zookeeper能通知相應follower,但這樣zookeeper的負載很重

比如說100台broker,有2000個分區,每個分區有3個備份,那么在zookeeper中要安放2000*3=6000個watch,zookeeper本身也是集群,負載過重

kafka從0.8開始,不再采用上面的方法,在所有broker中選舉出一個controller, 這個controler將決定各個分區中leader的選舉

同時 在/broker/ids 做watch,/broker/topics 也做watch

在創建topic時,controler 在  /broker/ids 中讀取broker id  列表,針對每一個分區,在所有的broker 中選取一個做為leader,余下的作為ISR,因為此時剛創建完topic,也沒有數據

同時將leader 以及  ISR寫到 /broker/topic/[topic_name]/partions/0/state 中去, 其內容大概為 leader 為 brokerA , ISR為[brokerB, brokerC, brokerD],

同時告訴相應的broker,因為有些broker是leader,有些broker是follower,要做一些初始化的工作

 

 

詳見 http://jianbeike.blogspot.com.au/2016/04/kafka.html

 makeLeader過程點評

leader的作用除了接收produce和consume請求,還有一點就是管理ISR以及highwatermark。而makeLeader過程就是為了開啟leader的這些功能准備的,首先它要根據topic-partition創建(如果沒有)message log目錄,然后將自己的endlogoffset作為highwatermark,開啟定期檢測isr follower是否脫離isr(長時間未發fetch或者落后leaderlogendoffset太多)。

 

makeFollower過程點評

makeFollower的過程比makeLeader的過程要復雜,剛才說了,leader管理ISR和highwatermark(可以看概念說明那節),那么highwatermark對於Follower可見嗎?當然Follower發送fetch請求時會將自身endlogoffset帶過去,而返回結果中會有leader返回的

highwatermark。

 

為什么要有highwatermark?

答:看上圖,假設某個topic-partition(比如topic1的partition0)的replicalist分配在4台機器上,A,B,C,D,produce端設置的ack為1,也就是只要leader 接收處理message成功就返回成功,那么這時replica list的endlogoffset會出現分化。

A作為leader肯定是endlogoffset最高,B緊隨其后,C機器由於配置比較低,同步較慢,D機器配置最低,已經被A移除了ISR。

假設這個時候某幾個機器出現故障,比如A,C宕機,這時B會成為leader,假如沒有highwatermark,在A重啟時的時候會做makeFollower操作,在宕機時log文件之后直接追加message,而假如B機器的endlogoffset已經達到A的endlogoffset,會產生數據不一致的情況,所以使用highwatermark來避免這種情況。

在A 做makeFollower操作時,將log文件truncate到highwatermark位置,以防止發生數據不一致情況發生。

還有一種情形會導致數據不一致,那就是uncleanleader election,ABC機器都宕機的情況,D機器已經啟動,controller會將D作為leader,很明顯即便有了highwatermark,也會發生數據不一致,同樣消息數據也會丟失。目前kafka 0.8.1.1的版本,沒有將unclean election 開關開放給用戶,所以這塊要做好監控

3.一個follower掛掉重啟的時候首先扔掉它自己highWatermark之后的數據,然后開始追趕leader
 
如果失敗的follower恢復過來,它首先將自己的日志截斷到上次checkpointed時刻的HW.
因為checkpoint記錄的是所有Partition的hw offset. 當follower失敗時,checkpoint中關於這個Partition的HW就不會再更新了.
而這個時候存儲的HW信息和follower partition replica的offset並不一定是一致的. 比如這個follower獲取消息比較快,
但是ISR中有其他follower復制消息比較慢,這樣Leader並不會很快地更新HW,這個快的follower的hw也不會更新(leader廣播hw給follower)
這種情況下,這個follower日志的offset是比hw要大的.所以在它恢復之后,要將比hw多的部分截掉,然后繼續從leader拉取消息(跟平時一樣).
實際上,ISR中的每個follower日志的offset一定是比hw大的.因為只有ISR中所有follower都復制完消息,leader才會增加hw.
也就是說有可能有些follower復制完了,而有些follower還沒有復制完,那么hw是不會增加的,復制完的follower的offset就比hw要大.
 
4.leader掛掉的會重新選,新leader把自己的endOffset當做新的highWatermark,然后讓其它的replica開始追趕
 

停止對這些成為follower的partition的拉取線程,把這些partition的Log截斷到highWaterMark的位置,並啟動對那些成為leader的partition的拉取線程

 

 

 

 

 當 broker 宕機時,controller在/broker/ids做的watch也會觸發 詳見

跟創建 topic一樣,遍歷/broker/topic/[topic_name]/partition/0/state ,在ISR中選擇一個活着的broker,如果沒有,就在所有的備份broker中找一個,

然后再將新的 leadre, ISP 寫到上面路徑去,同時通知相關broker,做好makeLeader,makeFollower工作

 

 

 

producers

producer發送消息。

producer 可以直接發送到broker對應的leader partition中,不需要經歷任何一個中介的轉發。為實現這個特性,每個broker都可以響應producer的請求,並返回topic的一些元信息,這些元信息包括哪些機器是存活的,topic的你leader partition都在哪。現階段哪些leader partition 是可以直接訪問的?

如果訪問的不是leader partition 怎么搞? 而且我看是可以指定多個進行訪問的。

producer 和 partition 。

producer 可以控制以什么樣的將消息推送到客戶端。實現方法包括隨機,實現一類隨機負載均衡的算法,或者指定一些分區算法。kafka 提供了用戶自定義分區的方法,用戶可以為每一個消息指定一個partitionkey,通過這個key來實現一些hash 分區算法。

 

 http://www.aichengxu.com/view/4683767

 副本獲取器線程,主要定義了以下幾個方法:
1. handlePartitionsWithErrors:處理有錯誤(leader已經發生編程)的分區,當前什么都不做因為controller會應對這些變更
2. handleOffsetOutOfRange:處理一個位移越界的分區返回新的獲取位移值(fetch offset),具體邏輯如下:

獲取給定topic分區在該broker上的副本

獲取該分區leader的結束位移值,如果leader的結束位移值比該副本的結束位移還小的話,先判斷一下是否啟用了unclean leader選舉。若沒有啟用,直接報錯;否則就將follower副本的位移直接截取成leader的結束位移

若follower位移比leader的還小,直接截取所有位移並設置leader的初始位移處開始讀取leader

如果啟用了unclean leader選舉,那么就有可能出現這樣的情景:一個follower宕機了,而同時leader還在不停地寫入消息。當這個follower重啟回來的時候它需要完整地追上leader的進度。就在這個過程中,ISR中所有的副本都宕掉了。那么此時這個follower就會被unclean leader選舉為新的leader,然后它開始寫入從客戶端發來的消息。之后舊的leader恢復,成為了一個follower,它會發現當前leader的最新位移居然比自己的還要小。這種情況下,只能截斷自己的位移使之與當前leader的最新位移保持一致然后繼續處理。
3. processPartitionData:處理獲取到的數據。主要邏輯就是將給定的response數據解析出來並更新到該broker上的副本對象中,比如獲取到的消息集合以及更新高水位值

 

 

Replicated Logs

 

Kafka的partition可以看成是一個replicated log, 每個replica就是這個replicated log其中的一個log。多個replica是為了容忍機器故障,因此同一個partition的不同replica需要被分配到不同的broker上。所以,對於一個partition,broker id即可唯一代表一個replica,也被當作replica id。

 

為了一致性,Kafka在同一個partition的replicas中選出一個作為leader,由它接受client的所有讀寫請求,而其它的replica作為follower,從leader處拉取數據,leader作為唯一的"source of truth"。在有些情況下,follower會truncate自己的log(這個log和以下的log都是指"replicated log"這個概念里的log),然后重新從leader處抓取數據,以求與leader一致(下面會講到)。

 

leader和follower的角色區分,也主要是ReplicaManager來實現。具體地講

 

  • leader
    • leader會接受client的讀取請求和寫入請求。
    • leader需要接受follwer抓取message的請求,返回message給follower
    • leader需要維護ISR(in-sync replicas)列表。“保持同步”的含義有些復雜,0.9之前版本對這個概念的定義與0.9不同,詳情參見KIP-16 - Automated Replica Lag Tuning。0.9版本,broker的參數replica.lag.time.max.ms用來指定ISR的定義,如果leader在這么長時間沒收到follower的拉取請求,或者在這么長時間內,follower沒有fetch到leader的log end offset,就會被leader從ISR中移除。ISR是個很重要的指標,controller選取partition的leader replica時會使用它,因此leader選取ISR后會把結果記到Zookeeper上。
    • leader需要維護high watermark。high watermark以下的消息就是所有ISR列表里的replica都已經讀取的消息(注意,並不是所有replica都一定有這些消息,而只是ISR里的那些才肯定會有)。因此leader會根據follower拉取數據時提供的offset和ISR列表,決定HW,並且在返回給follower的請求中附帶最新的HW。
  • follower
    • follower需要不停地去leader處拉取最新的log
    • follower需要根據leader在fetch reponse中提供的HW,更新自己本地保存的leader的HW信息。在它過行leader或follower轉變時,會用到這個HW。

 

HW與LEO

 

HW、ISR以及leader對於partition這個多副本系統算是一種元數據。ISR和leader確要在controller和所有replica之間保持一致,HW需要在leader和follower之間保持一致,因為在leader轉換的時候,HW是安全線。

 

下面明確一下high watermark和log end offset在源碼里的意義

 

HW  high watermark  offset的數據小於被認為是commit的,注意,offset為high watermark的message並不是commit的。

 

LEO log end offset 這個replica的log里最后一條消息的下一條消息的offset

 

這些數據根據實際需求,以不同的方式在Kafka中傳遞:

 

  • HW。隨着follower的拉取進度的即時變化,HW是隨時在變化的。follower總是向leader請求自己已有messages的下一個offset開始的數據,因此當follower發出了一個fetch request,要求offset為A以上的數據,leader就知道了這個follower的log end offset至少為A。此時就可以統計下ISR里的所有replica的LEO是否已經大於了HW,如果是的話,就提高HW。同時,leader在fetch本地消息給follower時,也會在返回給follower的reponse里附帶自己的HW。這樣follower也就知道了leader處的HW(但是在實現中,follower獲取的只是讀leader本地log時的HW,並不能保證是最新的HW)。但是leader和follower的HW是不同步的,follower處記的HW可能會落后於leader。
  • ISR以及leader。 在需要選舉leader的場景下,leader和ISR是由controller決定的。在選出leader以后,ISR是leader決定。如果誰是leader和ISR只存在於ZK上,那么每個broker都需要在Zookeeper上監聽它host的每個partition的leader和ISR的變化,這樣效率比較低。如果不放在Zookeeper上,那么當controller fail以后,需要從所有broker上重新獲得這些信息,考慮到這個過程中可能出現的問題,也不靠譜。所以leader和ISR的信息存在於Zookeeper上,但是在變更leader時,controller會先在Zookeeper上做出變更,然后再發送LeaderAndIsrRequest給相關的broker。這樣可以在一個LeaderAndIsrRequest里包括這個broker上有變動的所有partition,即batch一批變更新信息給broker,更有效率。另外,在leader變更ISR時,會先在Zookeeper上做出變更,然后再修改本地內存中的ISR。

 

Hight Watermark Checkpoint

 

以外,由於HW是隨時變化的,如果即時更新到Zookeeper,會帶來效率的問題。而HW是如此重要,因此需要持久化,ReplicaManager就啟動了單獨的線程定期把所有的partition的HW的值記到文件中,即做highwatermark-checkpoint。

 

Epoch

 

 除了leader,ISR之外,在replica系統中還有其它三個對於一致性有重要作用的參數:controller epoch、leader epoch和zookeeper version。

 

  • controller epoch: 當新的controller開始工作后,舊的controller可能還在工作,這時就會有兩個自認為是的controller,那么broker該聽哪個的呢?cpmtroller epoch是一個整數,記在Zookeeper的/controller_epoch path的數據中,當新的controller當選后,它更新Zookeeper中的這個數據,把這個整數的值+1,並且以每個命令中都附帶上controller epoch。這樣broker收到一個controller的命令后,就與自己內存中保存的controller epoch比較,如果命令中的值小於內存中的值,就代表是舊的controller的命令,如果大於內存中的值,就更新內存中的controller epoch為新值,並且執行命令。
  • leader epoch: 對於同一個controller,也存在它的LeaderAndIsrRequest以錯誤的順序到達broker的可能,這樣broker就可以在檢查controller的epoch之后,再檢查leader epoch,以確認該執行哪個命令。
  • zkVersion 對於在zookeeper path中存儲的controller epoch, leaderAndIsr信息進行更新時,始終都得進行條件更新,以避免產生競態。比如,在controller讀取Zookeeper上的leaderAndIsr信息后,更新leaderAndIsr信息前,如果leader更改了ISR的信息,而controller以更改前的ISR進行leader選舉的話,就可能會產生異常狀態;或者在controller更新完leaderAndIsr之后,舊的leader又去更新zk上的這個數據,也會使集群不一致。所以,就需要zkVersion來進行條件變更。controller和replica在內存中存儲上一次狀態更新時讀取到的zkVersion,當它依據此狀態做出決定時,需要帶上這個zkVersion做條件更新,以保證根據舊狀態做出的更新不會生效。這種條件更新是使用的kafka.utils.ZkUtils的conditionalUpdatePersistentPath方法。

 

由這三個版本號共同作用,Kafka基本都保證對於leader, ISR, controller的認知在各個broker間不會出現大問題(但是還會有bug和潛在的問題導致認知不一致)。

 

update MetadataCache

 

此外,當broker收到UpdateMetadataRequest時,它會把這個request交給ReplicaManager處理,而ReplicaManager在確定UpdateMetadataRequest的controller epoch有效之后,就會交由MetadataCache來處理。之所以不直接收MetadataCache處理,可能是ReplicaManager處會保存controller epoch, 不過MetadataCache內部也可能獲取controller epoch,只是沒有做為單獨的一個field保存起來。這樣做顯得有些混亂,不知道是什么原因。

 

 

詳見

The Producer

負載均衡

1)producer可以自定義發送到哪個partition的路由規則。默認路由規則:hash(key)%numPartitions,如果key為null則隨機選擇一個partition。

2)自定義路由:如果key是一個user id,可以把同一個user的消息發送到同一個partition,這時consumer就可以從同一個partition讀取同一個user的消息。

異步批量發送

批量發送:配置不多於固定消息數目一起發送並且等待時間小於一個固定延遲的數據。

The Consumer

consumer控制消息的讀取。

Push vs Pull

1)producer push data to broker,consumer pull data from broker

2)consumer pull的優點:consumer自己控制消息的讀取速度和數量。

3)consumer pull的缺點:如果broker沒有數據,則可能要pull多次忙等待,Kafka可以配置consumer long pull一直等到有數據。

Consumer Position

1)大部分消息系統由broker記錄哪些消息被消費了,但Kafka不是。

2)Kafka由consumer控制消息的消費,consumer甚至可以回到一個old offset的位置再次消費消息。

Message Delivery Semantics

三種:

At most once—Messages may be lost but are never redelivered.

At least once—Messages are never lost but may be redelivered.

Exactly once—this is what people actually want, each message is delivered once and only once.

Producer:有個”acks“配置可以控制接收的leader的在什么情況下就回應producer消息寫入成功。

Consumer:

* 讀取消息,寫log,處理消息。如果處理消息失敗,log已經寫入,則無法再次處理失敗的消息,對應”At most once“。

* 讀取消息,處理消息,寫log。如果消息處理成功,寫log失敗,則消息會被處理兩次,對應”At least once“。

* 讀取消息,同時處理消息並把result和log同時寫入。這樣保證result和log同時更新或同時失敗,對應”Exactly once“。

Kafka默認保證at-least-once delivery,容許用戶實現at-most-once語義,exactly-once的實現取決於目的存儲系統,kafka提供了讀取offset,實現也沒有問題。

 

 

詳見

ü  controllerEpoch

為了防止先發的請求后到來導致broker數據不一致,所以使用版本管理數據,每次更換controller,epoch加1,所以broker永遠只響應本次請求中epoch>=上次請求epoch的請求。

 

 

ü  leaderEpoch

為了防止先發的請求后到來導致broker數據不一致,所以使用版本管理數據,每次選主更換leader,epoch加1,所以broker永遠只響應本次請求中epoch>=上次請求epoch的請求

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM