一、kafka的存儲機制
(1)存儲機制:Kafka 中消息是以 topic 進行分類的,生產者生產消息,消費者消費消息,都是面向 topic的。
topic 是邏輯上的概念,而 partition 是物理上的概念,每個 partition 對應於一個 log 文件,該 log 文件中存儲的就是 producer 生產的數據。Producer 生產的數據會被不斷追加到該log 文件末端,且每條數據都有自己的 offset。消費者組中的每個消費者,都會實時記錄自己消費到了哪個 offset,以便出錯恢復時,從上次的位置繼續消費。 數據存儲示意圖如下:
1、數據存儲圖解釋解釋:由於生產者生產的消息會不斷追加到 log 文件末尾,為防止 log 文件過大導致數據定位效率低下,Kafka 采取了分片和索引機制,將每個 partition 分為多個 segment。每個 segment對應兩個文件——“.index”文件和“.log”文件。這些文件位於一個文件夾下,該文件夾的命名規則為:topic 名稱+分區序號。例如,first 這個 topic 有三個分區,則其對應的文件夾為 first-0,first-1,first-2。
2、index 和 log 文件以當前 segment 的第一條消息的 offset 命名。下圖為 index 文件和 log 文件的結構示意圖。其中 “.index”文件存儲大量的索引信息,“.log”文件存儲大量的數據,索引文件中的元 數據指向對應數據文件中 message 的物理偏移地址。
(2)分區策略
1、分區原因:
a、方便在集群中擴展,每個 Partition 可以通過調整以適應它所在的機器,而一個 topic又可以有多個 Partition 組成,因此整個集群就可以適應任意大小的數據了;
b、可以提高並發,因為可以以 Partition 為單位讀寫了。
2、分區器:首先需要將 producer 發送的數據封裝成一個 ProducerRecord 對象。
a、若ProducerRecord對象指明 partition 的情況下,直接將指明的值直接作為 partiton 值;
b、沒有指明 partition 值但有 key 的情況下,將 key 的 hash 值與 topic 的 partition 數進行取模得到 partition 值;
c、既沒有 partition 值又沒有 key 值的情況下,第一次調用時隨機生成一個整數(后面每次調用在這個整數上自增),將這個值與 topic 可用的 partition 總數取余得到 partition 值,也就是常說的 round-robin 算法。
(3)數據的可靠性保證即kafka的ACK機制
為保證 producer 發送的數據,能可靠的發送到指定的 topic,topic 的每個 partition 收到producer 發送的數據后,都需要向 producer 發送 ack(acknowledgement 確認收到),如果producer 收到 ack,就會進行下一輪的發送,否則重新發送數據。
1、kafka何時發送ack確認給生產者已確定收到數據:確保有follower與leader同步完成,leader再發送ack,這樣才能保證leader掛掉后,能在follower選舉出新的leader。
2、副本同步策略即多少個follower同步完成后發送ack:全部follower完成同步,才發送ack;優點:選舉新的 leader 時,容忍 n 台節點的故障,需要 n+1 個副本,缺點:延遲高
(4)ISR同步策略:
1、引入ISR的原因:采用全部follower完成同步才發送ack;當leader收到數據,所有 follower 都開始同步數據,但有一個 follower,因為某種故障,遲遲不能與 leader 進行同步,那leader就要一直等下去,直到它完成同步,才能發送ack。
2、Leader 維護了一個動態的 in-sync replica set (ISR),意為和 leader 保持同步的 follower 集合。當 ISR 中的 follower 完成數據的同步之后,leader 就會給 生產者發送 ack。如果 follower長時間未 向 leader 同 步 數 據 , 則 該 follower 將 被 踢 出 ISR , 該 時 間 閾 值 由replica.lag.time.max.ms 參數設定。Leader 發生故障之后,就會從 ISR 中選舉新的 leader。
(5)kafka的ack應答機制提供了以下三種可靠機制,用戶根據對可靠性和延遲的要求進行權衡,
1、ack=0時:producer 不等待broker的ack,這一操作提供了一個最低的延遲,生產者每條消息只會被發送一次,若broker故障則可能會丟失數據
2、ack=1時:producer 等待broker的ack,partition 的 leader 落盤成功后返回 ack,如果在 follower同步成功之前 leader 故障,那么將會丟失數據;
3、ack=-1時:producer 等待 broker 的 ack,partition 的 leader 和 follower 全部落盤成功后才返回 ack。但是如果在 follower 同步完成后,broker 發送 ack 之前,leader 發生故障,那么會造成數據重復。
(6)leader和follower中的HW與LEO
1、LEO(Log End Offset):每個副本最大的offset
2、HW(High WaterMark):消費者能見到的最大的offset,ISR集合中最小的LEO
3、LEO與HW所解決的問題
a、follower故障:follower 發生故障后會被臨時踢出 ISR,待 該 follower 恢復后,follower 會讀取本地磁盤記錄的上次的 HW,並將 log 文件高於 HW 的部分截取掉,從 HW 開始向 leader 進行同步。等該 follower 的 LEO 大於等於 該Partition 的 HW(leader的HW),即 follower 追上 leader 之后,就可以重新加入 ISR 了。
b、leader故障:leader 發生故障之后,會從 ISR 中選出一個新的 leader,之后,為保證多個副本之間的數據一致性,其余的 follower 會先將各自的 log 文件高於 HW 的部分截掉,然后從新的 leader同步數據。
注意:這只能保證副本之間的數據一致性,並不能保證數據不丟失或者不重復。
4、如何解決HW進行數據恢復時可能存在的數據丟失和重復的問題:引入Lead Epoch,(epoch,offset)。epoch表示leader的版本號,從0開始,當leader變更過1次時epoch就會+1,而offset則對應於該epoch版本的leader寫入第一條消息的位移。因此假設有兩對值:(0, 0),(1, 120),表示第一個leader從位移0開始寫入消息;共寫了120條[0, 119];而第二個leader版本號是1,從位移120處開始寫入消息。leader broker中會保存這樣的一個緩存,並定期地寫入到一個checkpoint文件中。當leader寫底層log時它會嘗試更新整個緩存——如果這個leader首次寫消息,則會在緩存中增加一個記錄;否則就不做更新。而每次副本重新成為leader時會查詢這部分緩存,獲取出對應leader版本的位移,這就不會發生數據不一致和丟失的情況。
(6)kafka的Exactly Once語義
1、At Least Once 語義:將服務器的 ACK 級別設置為-1,可以保證 Producer 到 Server 之間不會丟失數據,但是不能保證數據不重復
2、At Most Once 語義:將服務器 ACK 級別設置為 0,可以保證生產者每條消息只會被發送一次;可以保證數據不重復,但是不能保證數據不丟失
3、Exactly Once語義:對於一些非常重要的信息,比如說交易數據,下游數據消費者要求數據既不重復也不丟失,即 Exactly Once 語義。0.11 版本的 Kafka,引入了一項重大特性:冪等性。所謂的冪等性就是指 Producer 不論向 Server 發送多少次重復數據,Server 端都只會持久化一條。冪等性結合 At Least Once 語義,就構成了 Kafka 的 Exactly Once 語義
4、Exactly Once實現原理:首先啟用冪等性,將 Producer 的參數中 enable.idompotence 設置為 true 。Kafka的冪等性實現其實就是將原來下游需要做的去重放在了數據上游。開啟冪等性的 Producer 在初始化的時候會被分配一個 PID,發往同一 Partition 的消息會附帶 Sequence Number。而Broker 端會對<PID, Partition, SeqNumber>做緩存,當具有相同主鍵的消息提交時,Broker 只會持久化一條。 但是 PID 重啟就會變化,同時不同的 Partition 也具有不同主鍵,所以冪等性無法保證跨分區跨會話的 Exactly Once。
二、kafka的消費者
(1)消費方式
1、kafka采用的消費模式pull:consumer采用pull(拉)的方式從broker中讀取數據,若采用push(推)模式很難適應消費速率不同的消費者,因為消息發送是由broker決定的,盡管push模式的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及消費消息,從而導致消息堆積。而pull模式可以根據consumer的消費能力以適當的速率消費消息。
2、pull模式的不足:若kafka沒有數據,消費者可能陷入循環之中,一直返回空數據,此時kafka消費者在消費數據時會傳入一個時長參數 timeout,如果當前沒有數據可供消費,consumer會等待一段時間之后再返回,這段時長即為 timeout。
(2)kafka的消費分區策略即一個消費者如何消費分區中的數據
1、前提:一個consumer group中有多個consumer,一個topic有多個partition,所以必然會涉及到partition的分配問題,即確定那個partition由哪個consumer來消費。
2、分區策略1:Range
1)分區方式:按照消費者總數和分區總數進行整除運算來獲得一個跨度,然后將分區按照跨度進行平均分配,以保證分區盡可能均勻地分配給所有的消費者。對於每一個topic,Range策略會將消費組內所有訂閱這個topic的消費者按照名稱的字典序排序,然后為每個消費者划分固定的分區范圍,如果不夠平均分配,那么字典序靠前的消費者會被多分配一個分區。即n=分區數/消費者數量,m=分區數%消費者數量,那么前m個消費者每個分配n+1個分區,后面的(消費者數量-m)個消費者每個分配n個分區。
2)分區過程如下:
a、假設目前只有一個消費者C0,訂閱了一個主題,這個主題包含7個分區。如下圖1。
b、此時消費組內又加入了一個新的消費者C1,按照既定的邏輯需要將原來7個分區重新分配給C0和C1,如下圖2。
c、接着消費組內又加入了一個新的消費者C2,如此消費者C0、C1和C2按照下圖3中的方式各自負責消費所分配到的分區。
3)缺點:針對多個主題的不同分區,可能會造成分區不均勻。
假設消費組中有2個消費者C0和C1,都訂閱了主題t0和t1,並且每個主題都有3個分區,那么所訂閱的所有分區可以標識為:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最終的分配結果為:消費者C0:t0p0、t0p1、t1p0、t1p1,消費者C1:t0p2、t1p2。明顯消費者消費不均勻。
3、分區策略2:RoundRobin
1)分區方式:原理是將消費組內所有消費者以及消費者所訂閱的所有topic的partition按照字典序排序,然后通過輪詢方式逐個將分區以此分配給每個消費者。
假設消費組中有2個消費者C0和C1,都訂閱了主題t0和t1,並且每個主題都有3個分區,那么所訂閱的所有分區可以標識為:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最終的分配結果為:
消費者C0:t0p0、t0p2、t1p1;
消費者C1:t0p1、t1p0、t1p2。
2)缺點:如果同一個消費組內的消費者所訂閱的信息是不相同的,那么在執行分區分配的時候就不是完全的輪詢分配,有可能會導致分區分配的不均勻。如果某個消費者沒有訂閱消費組內的某個topic,那么在分配分區的時候此消費者將分配不到這個topic的任何分區。
假設消費組內有3個消費者C0、C1和C2,消費者C0訂閱的是主題t0,消費者C1訂閱的是主題t0和t1,消費者C2訂閱的是主題t0、t1和t2,t0、t1、t2,這3個主題分別有1、2、3個分區,最終的分配結果為:
消費者C0:t0p0;
消費者C1:t1p0;
消費者C2:t1p1、t2p0、t2p1、t2p2;
3、分區策略3:Sticky
1)分區方式:分區的分配要盡可能的均勻;分區的分配盡可能的與上次分配的保持相同。當兩者發生沖突時,第一個目標優先於第二個目標
舉例,同樣消費組內有3個消費者:C0、C1和C2,集群中有3個主題:t0、t1和t2,這3個主題分別有1、2、3個分區,也就是說集群中有t0p0、t1p0、t1p1、t2p0、t2p1、t2p2這6個分區。消費者C0訂閱了主題t0,消費者C1訂閱了主題t0和t1,消費者C2訂閱了主題t0、t1和t2。
采用的是StickyAssignor策略,那么最終的分配結果為:
消費者C0:t0p0;
消費者C1:t1p0、t1p1;
消費者C2:t2p0、t2p1、t2p2;
假如此時消費者C0脫離了消費組,則StickyAssignor分配結果為:
消費者C1:t1p0、t1p1、t0p0;
消費者C2:t2p0、t2p1、t2p2;
可以看到StickyAssignor策略保留了消費者C1和C2中原有的5個分區的分配:t1p0、t1p1、t2p0、t2p1、t2p2。
(3)consumer消費時offset的情況:Kafka0.9版本之前,consumer默認將 offset保存在 Zookeeper中,從0.9版本開始,consumer默認將offset保存在Kafka一個內置的topic 中,該topic為__consumer_offsets。 offset的存儲格式:(groupID,topic,partitionID)
對於消費者,kafka中有兩個設置的地方:對於老的消費者,由--zookeeper參數設置;對於新的消費者,由--bootstrap-server參數設置
如果使用了--zookeeper參數,那么consumer的消費信息如offset將會存放在zk之中
如果使用了--bootstrap-server參數,那么consumer的消費信息如offset將會存放在kafka之中
(4)kafka如何高效讀寫數據
1、順序寫磁盤:Kafka的producer生產數據,要寫入到log文件中,寫的過程是一直追加到文件末端,為順序寫。隨機寫會導致磁頭不停地換道,造成效率的極大降低;順序寫磁頭幾乎不用換道,或者換道的時間很短,省去了大量磁頭尋址的時間。
2、零復制技術:零拷貝避免讓CPU做大量的數據拷貝任務,減少不必要的拷貝,讓CPU解脫出來專注於別的任務。
1)傳統讀取文件並發送到網絡的步驟
a、操作系統將數據從磁盤文件中讀取到內核空間的頁面緩存;
b、應用程序將數據從內核空間讀入用戶空間緩沖區;
c、應用程序將讀到數據寫回內核空間並放入socket緩沖區;
e、數據從socket緩沖區通過網絡發送。
“零拷貝技術”只用將磁盤文件的數據復制到頁面緩存中一次,然后將數據拷貝到socket
緩沖區中,最后直接發送到網絡中(發送給不同的訂閱者時,都可以使用同一個頁面緩存),避免了重復復制操作。
(5)zookeeper在kafka中的作用?
kafka使用zookeeper來保存broker,topic和partition的元數據信息,在kafka0.9版本之前消費者使用kafka保存消費分區的偏移量。
1、broker注冊:Broker是分布式部署並且相互之間相互獨立,但是需要有一個注冊系統能夠將整個集群中的Broker管理起來,此時就使用到了Zookeeper。在Zookeeper上會有一個專門用來進行Broker服務器列表記錄的節點:/brokers/ids;每個Broker在啟動時,都會到Zookeeper上進行注冊,即到/brokers/ids下創建屬於自己的節點,如/brokers/ids/[0...N]。Kafka使用了全局唯一的數字來指代每個Broker服務器,不同的Broker必須使用不同的Broker ID進行注冊,創建完節點后,每個Broker就會將自己的IP地址和端口信息記錄到該節點中去。其中,Broker創建的節點類型是臨時節點,一旦Broker宕機,則對應的臨時節點也會被自動刪除。
2、topic和partition注冊:在Kafka中,同一個Topic的消息會被分成多個分區並將其分布在多個Broker上,這些分區信息及與Broker的對應關系也都是由Zookeeper在維護,由專門的節點來記錄,如:/borkers/topics;如/brokers/topics/login/3->2,這個節點表示Broker ID為3的一個Broker服務器,對於"login"這個Topic的消息,提供了2個分區進行消息存儲,同樣,這個分區節點也是臨時節點。
3、保存消費者分區的offset。
(6)kafka的事務特性
1、Produce事務:期望一個 Producer 在 Fail 恢復后能主動 abort 上次未完成的事務(接上之前未完成的事務),然后重新開始一個事務,這種情況應該怎么辦?之前冪等性引入的 PID 是無法解決這個問題的,因為每次 Producer 在重啟時,PID 都會更新為一個新值。為了實現跨分區跨會話的事務,Kafka 在 Producer 端引入了一個 TransactionalId 來解決這個問題。TransactionalId 的引入還有一個好處,就是跟 consumer group 類似,它可以用來標識一個事務操作,便於這個事務的所有操作都能在一個地方進行處理;
為了管理 Transaction,Kafka 引入了一個新的組件 Transaction Coordinator。Producer 就是通過和 Transaction Coordinator 交互獲得 Transaction ID 對應的任務狀態。Transaction Coordinator 還負責將事務所有寫入 Kafka 的一個內部 Topic,這樣即使整個服務重啟,由於事務狀態得到保存,進行中的事務狀態可以得到恢復,從而繼續進行。
2、consumer的事務特性:對於 Consumer 而言,事務的保證就會相對較弱,尤其時無法保證 Commit 的信息被精確消費。這是由於 Consumer 可以通過 offset 訪問任意信息,而且不同的 Segment File 生命周期不同,同一事務的消息可能會出現重啟后被刪除的情況。