Kafka中的事務是怎么實現的?
Kafka中的事務可以使應用程序將消費消息、生產消息、提交消費位移當作原子操作來處理,同時成功或失敗,即使該生產或消費會跨多個分區。
生產者必須提供唯一的transactionalId,啟動后請求事務協調器獲取一個PID,transactionalId與PID一一對應。
每次發送數據給<Topic, Partition>前,需要先向事務協調器發送AddPartitionsToTxnRequest,事務協調器會將該<Transaction, Topic, Partition>存於__transaction_state內,並將其狀態置為BEGIN。
在處理完 AddOffsetsToTxnRequest 之后,生產者還會發送 TxnOffsetCommitRequest 請求給 GroupCoordinator,從而將本次事務中包含的消費位移信息 offsets 存儲到主題 __consumer_offsets 中
一旦上述數據寫入操作完成,應用程序必須調用KafkaProducer的commitTransaction方法或者abortTransaction方法以結束當前事務。無論調用 commitTransaction() 方法還是 abortTransaction() 方法,生產者都會向 TransactionCoordinator 發送 EndTxnRequest 請求。
TransactionCoordinator 在收到 EndTxnRequest 請求后會執行如下操作:
- 將 PREPARE_COMMIT 或 PREPARE_ABORT 消息寫入主題 __transaction_state
- 通過 WriteTxnMarkersRequest 請求將 COMMIT 或 ABORT 信息寫入用戶所使用的普通主題和 __consumer_offsets
- 將 COMPLETE_COMMIT 或 COMPLETE_ABORT 信息寫入內部主題 __transaction_state標明該事務結束
在消費端有一個參數isolation.level,設置為“read_committed”,表示消費端應用不可以看到尚未提交的事務內的消息。如果生產者開啟事務並向某個分區值發送3條消息 msg1、msg2 和 msg3,在執行 commitTransaction() 或 abortTransaction() 方法前,設置為“read_committed”的消費端應用是消費不到這些消息的,不過在 KafkaConsumer 內部會緩存這些消息,直到生產者執行 commitTransaction() 方法之后它才能將這些消息推送給消費端應用。反之,如果生產者執行了 abortTransaction() 方法,那么 KafkaConsumer 會將這些緩存的消息丟棄而不推送給消費端應用。
失效副本是指什么?有那些應對措施?
正常情況下,分區的所有副本都處於 ISR 集合中,但是難免會有異常情況發生,從而某些副本被剝離出 ISR 集合中。在 ISR 集合之外,也就是處於同步失效或功能失效(比如副本處於非存活狀態)的副本統稱為失效副本,失效副本對應的分區也就稱為同步失效分區,即 under-replicated 分區。
Kafka 從 0.9.x 版本開始就通過唯一的 broker 端參數 replica.lag.time.max.ms 來抉擇,當 ISR 集合中的一個 follower 副本滯后 leader 副本的時間超過此參數指定的值時則判定為同步失敗,需要將此 follower 副本剔除出 ISR 集合。replica.lag.time.max.ms 參數的默認值為10000。
在 0.9.x 版本之前,Kafka 中還有另一個參數 replica.lag.max.messages(默認值為4000),它也是用來判定失效副本的,當一個 follower 副本滯后 leader 副本的消息數超過 replica.lag.max.messages 的大小時,則判定它處於同步失效的狀態。它與 replica.lag.time.max.ms 參數判定出的失效副本取並集組成一個失效副本的集合,從而進一步剝離出分區的 ISR 集合。
Kafka 源碼注釋中說明了一般有這幾種情況會導致副本失效:
- follower 副本進程卡住,在一段時間內根本沒有向 leader 副本發起同步請求,比如頻繁的 Full GC。
- follower 副本進程同步過慢,在一段時間內都無法追趕上 leader 副本,比如 I/O 開銷過大。
- 如果通過工具增加了副本因子,那么新增加的副本在趕上 leader 副本之前也都是處於失效狀態的。
- 如果一個 follower 副本由於某些原因(比如宕機)而下線,之后又上線,在追趕上 leader 副本之前也處於失效狀態。
應對措施
我們用UnderReplicatedPartitions代表leader副本在當前Broker上且具有失效副本的分區的個數。
如果集群中有多個Broker的UnderReplicatedPartitions保持一個大於0的穩定值時,一般暗示着集群中有Broker已經處於下線狀態。這種情況下,這個Broker中的分區個數與集群中的所有UnderReplicatedPartitions(處於下線的Broker是不會上報任何指標值的)之和是相等的。通常這類問題是由於機器硬件原因引起的,但也有可能是由於操作系統或者JVM引起的 。
如果集群中存在Broker的UnderReplicatedPartitions頻繁變動,或者處於一個穩定的大於0的值(這里特指沒有Broker下線的情況)時,一般暗示着集群出現了性能問題,通常這類問題很難診斷,不過我們可以一步一步的將問題的范圍縮小,比如先嘗試確定這個性能問題是否只存在於集群的某個Broker中,還是整個集群之上。如果確定集群中所有的under-replicated分區都是在單個Broker上,那么可以看出這個Broker出現了問題,進而可以針對這單一的Broker做專項調查,比如:操作系統、GC、網絡狀態或者磁盤狀態(比如:iowait、ioutil等指標)。
多副本下,各個副本中的HW和LEO的演變過程
某個分區有3個副本分別位於 broker0、broker1 和 broker2 節點中,假設 broker0 上的副本1為當前分區的 leader 副本,那么副本2和副本3就是 follower 副本,整個消息追加的過程可以概括如下:
- 生產者客戶端發送消息至 leader 副本(副本1)中。
- 消息被追加到 leader 副本的本地日志,並且會更新日志的偏移量。
- follower 副本(副本2和副本3)向 leader 副本請求同步數據。
- leader 副本所在的服務器讀取本地日志,並更新對應拉取的 follower 副本的信息。
- leader 副本所在的服務器將拉取結果返回給 follower 副本。
- follower 副本收到 leader 副本返回的拉取結果,將消息追加到本地日志中,並更新日志的偏移量信息。
某一時刻,leader 副本的 LEO 增加至5,並且所有副本的 HW 還都為0。
之后 follower 副本(不帶陰影的方框)向 leader 副本拉取消息,在拉取的請求中會帶有自身的 LEO 信息,這個 LEO 信息對應的是 FetchRequest 請求中的 fetch_offset。leader 副本返回給 follower 副本相應的消息,並且還帶有自身的 HW 信息,如上圖(右)所示,這個 HW 信息對應的是 FetchResponse 中的 high_watermark。
此時兩個 follower 副本各自拉取到了消息,並更新各自的 LEO 為3和4。與此同時,follower 副本還會更新自己的 HW,更新 HW 的算法是比較當前 LEO 和 leader 副本中傳送過來的HW的值,取較小值作為自己的 HW 值。當前兩個 follower 副本的 HW 都等於0(min(0,0) = 0)。
接下來 follower 副本再次請求拉取 leader 副本中的消息,如下圖(左)所示。
此時 leader 副本收到來自 follower 副本的 FetchRequest 請求,其中帶有 LEO 的相關信息,選取其中的最小值作為新的 HW,即 min(15,3,4)=3。然后連同消息和 HW 一起返回 FetchResponse 給 follower 副本,如上圖(右)所示。注意 leader 副本的 HW 是一個很重要的東西,因為它直接影響了分區數據對消費者的可見性。
兩個 follower 副本在收到新的消息之后更新 LEO 並且更新自己的 HW 為3(min(LEO,3)=3)。
Kafka在可靠性方面做了哪些改進?(HW, LeaderEpoch)
HW
HW 是 High Watermark 的縮寫,俗稱高水位,它標識了一個特定的消息偏移量(offset),消費者只能拉取到這個 offset 之前的消息。
分區 ISR 集合中的每個副本都會維護自身的 LEO,而 ISR 集合中最小的 LEO 即為分區的 HW,對消費者而言只能消費 HW 之前的消息。
leader epoch
leader epoch 代表 leader 的紀元信息(epoch),初始值為0。每當 leader 變更一次,leader epoch 的值就會加1,相當於為 leader 增設了一個版本號。
每個副本中還會增設一個矢量 <LeaderEpoch => StartOffset>,其中 StartOffset 表示當前 LeaderEpoch 下寫入的第一條消息的偏移量。
假設有兩個節點A和B,B是leader節點,里面的數據如圖:
A發生重啟,之后A不是先忙着截斷日志而是先發送OffsetsForLeaderEpochRequest請求給B,B作為目前的leader在收到請求之后會返回當前的LEO(LogEndOffset,注意圖中LE0和LEO的不同),與請求對應的響應為OffsetsForLeaderEpochResponse。如果 A 中的 LeaderEpoch(假設為 LE_A)和 B 中的不相同,那么 B 此時會查找 LeaderEpoch 為 LE_A+1 對應的 StartOffset 並返回給 A
如上圖所示,A 在收到2之后發現和目前的 LEO 相同,也就不需要截斷日志了,以此來保護數據的完整性。
再如,之后 B 發生了宕機,A 成為新的 leader,那么對應的 LE=0 也變成了 LE=1,對應的消息 m2 此時就得到了保留。后續的消息都可以以 LE1 為 LeaderEpoch 陸續追加到 A 中。這個時候A就會有兩個LE,第二LE所記錄的Offset從2開始。如果B恢復了,那么就會從A中獲取到LE+1的Offset為2的值返回給B。
再來看看LE如何解決數據不一致的問題:
當前 A 為 leader,B 為 follower,A 中有2條消息 m1 和 m2,而 B 中有1條消息 m1。假設 A 和 B 同時“掛掉”,然后 B 第一個恢復過來並成為新的 leader。
之后 B 寫入消息 m3,並將 LEO 和 HW 更新至2,如下圖所示。注意此時的 LeaderEpoch 已經從 LE0 增至 LE1 了。
緊接着 A 也恢復過來成為 follower 並向 B 發送 OffsetsForLeaderEpochRequest 請求,此時 A 的 LeaderEpoch 為 LE0。B 根據 LE0 查詢到對應的 offset 為1並返回給 A,A 就截斷日志並刪除了消息 m2,如下圖所示。之后 A 發送 FetchRequest 至 B 請求來同步數據,最終A和B中都有兩條消息 m1 和 m3,HW 和 LEO都為2,並且 LeaderEpoch 都為 LE1,如此便解決了數據不一致的問題。
為什么Kafka不支持讀寫分離?
因為這樣有兩個明顯的缺點:
- 數據一致性問題。數據從主節點轉到從節點必然會有一個延時的時間窗口,這個時間窗口會導致主從節點之間的數據不一致。
- 延時問題。數據從寫入主節點到同步至從節點中的過程需要經歷網絡→主節點內存→主節點磁盤→網絡→從節點內存→從節點磁盤這幾個階段。對延時敏感的應用而言,主寫從讀的功能並不太適用。
對於Kafka來說,必要性不是很高,因為在Kafka集群中,如果存在多個副本,經過合理的配置,可以讓leader副本均勻的分布在各個broker上面,使每個 broker 上的讀寫負載都是一樣的。
Kafka中的延遲隊列怎么實現
在發送延時消息的時候並不是先投遞到要發送的真實主題(real_topic)中,而是先投遞到一些 Kafka 內部的主題(delay_topic)中,這些內部主題對用戶不可見,然后通過一個自定義的服務拉取這些內部主題中的消息,並將滿足條件的消息再投遞到要發送的真實的主題中,消費者所訂閱的還是真實的主題。
如果采用這種方案,那么一般是按照不同的延時等級來划分的,比如設定5s、10s、30s、1min、2min、5min、10min、20min、30min、45min、1hour、2hour這些按延時時間遞增的延時等級,延時的消息按照延時時間投遞到不同等級的主題中,投遞到同一主題中的消息的延時時間會被強轉為與此主題延時等級一致的延時時間,這樣延時誤差控制在兩個延時等級的時間差范圍之內(比如延時時間為17s的消息投遞到30s的延時主題中,之后按照延時時間為30s進行計算,延時誤差為13s)。雖然有一定的延時誤差,但是誤差可控,並且這樣只需增加少許的主題就能實現延時隊列的功能。
發送到內部主題(delay_topic_*)中的消息會被一個獨立的 DelayService 進程消費,這個 DelayService 進程和 Kafka broker 進程以一對一的配比進行同機部署(參考下圖),以保證服務的可用性。
針對不同延時級別的主題,在 DelayService 的內部都會有單獨的線程來進行消息的拉取,以及單獨的 DelayQueue(這里用的是 JUC 中 DelayQueue)進行消息的暫存。與此同時,在 DelayService 內部還會有專門的消息發送線程來獲取 DelayQueue 的消息並轉發到真實的主題中。從消費、暫存再到轉發,線程之間都是一一對應的關系。如下圖所示,DelayService 的設計應當盡量保持簡單,避免鎖機制產生的隱患。
為了保障內部 DelayQueue 不會因為未處理的消息過多而導致內存的占用過大,DelayService 會對主題中的每個分區進行計數,當達到一定的閾值之后,就會暫停拉取該分區中的消息。
因為一個主題中一般不止一個分區,分區之間的消息並不會按照投遞時間進行排序,DelayQueue的作用是將消息按照再次投遞時間進行有序排序,這樣下游的消息發送線程就能夠按照先后順序獲取最先滿足投遞條件的消息。
Kafka中怎么實現死信隊列和重試隊列?
死信可以看作消費者不能處理收到的消息,也可以看作消費者不想處理收到的消息,還可以看作不符合處理要求的消息。比如消息內包含的消息內容無法被消費者解析,為了確保消息的可靠性而不被隨意丟棄,故將其投遞到死信隊列中,這里的死信就可以看作消費者不能處理的消息。再比如超過既定的重試次數之后將消息投入死信隊列,這里就可以將死信看作不符合處理要求的消息。
重試隊列其實可以看作一種回退隊列,具體指消費端消費消息失敗時,為了防止消息無故丟失而重新將消息回滾到 broker 中。與回退隊列不同的是,重試隊列一般分成多個重試等級,每個重試等級一般也會設置重新投遞延時,重試次數越多投遞延時就越大。
理解了他們的概念之后我們就可以為每個主題設置重試隊列,消息第一次消費失敗入重試隊列 Q1,Q1 的重新投遞延時為5s,5s過后重新投遞該消息;如果消息再次消費失敗則入重試隊列 Q2,Q2 的重新投遞延時為10s,10s過后再次投遞該消息。
然后再設置一個主題作為死信隊列,重試越多次重新投遞的時間就越久,並且需要設置一個上限,超過投遞次數就進入死信隊列。重試隊列與延時隊列有相同的地方,都需要設置延時級別。
Kafka中怎么做消息審計?
消息審計是指在消息生產、存儲和消費的整個過程之間對消息個數及延遲的審計,以此來檢測是否有數據丟失、是否有數據重復、端到端的延遲又是多少等內容。
目前與消息審計有關的產品也有多個,比如 Chaperone(Uber)、Confluent Control Center、Kafka Monitor(LinkedIn),它們主要通過在消息體(value 字段)或在消息頭(headers 字段)中內嵌消息對應的時間戳 timestamp 或全局的唯一標識 ID(或者是兩者兼備)來實現消息的審計功能。
內嵌 timestamp 的方式主要是設置一個審計的時間間隔 time_bucket_interval(可以自定義設置幾秒或幾分鍾),根據這個 time_bucket_interval 和消息所屬的 timestamp 來計算相應的時間桶(time_bucket)。
內嵌 ID 的方式就更加容易理解了,對於每一條消息都會被分配一個全局唯一標識 ID。如果主題和相應的分區固定,則可以為每個分區設置一個全局的 ID。當有消息發送時,首先獲取對應的 ID,然后內嵌到消息中,最后才將它發送到 broker 中。消費者進行消費審計時,可以判斷出哪條消息丟失、哪條消息重復。
Kafka中怎么做消息軌跡?
消息軌跡指的是一條消息從生產者發出,經由 broker 存儲,再到消費者消費的整個過程中,各個相關節點的狀態、時間、地點等數據匯聚而成的完整鏈路信息。生產者、broker、消費者這3個角色在處理消息的過程中都會在鏈路中增加相應的信息,將這些信息匯聚、處理之后就可以查詢任意消息的狀態,進而為生產環境中的故障排除提供強有力的數據支持。
對消息軌跡而言,最常見的實現方式是封裝客戶端,在保證正常生產消費的同時添加相應的軌跡信息埋點邏輯。無論生產,還是消費,在執行之后都會有相應的軌跡信息,我們需要將這些信息保存起來。
我們同樣可以將軌跡信息保存到 Kafka 的某個主題中,比如下圖中的主題 trace_topic。
生產者在將消息正常發送到用戶主題 real_topic 之后(或者消費者在拉取到消息消費之后)會將軌跡信息發送到主題 trace_topic 中。
怎么計算Lag?(注意read_uncommitted和read_committed狀態下的不同)
如果消費者客戶端的 isolation.level 參數配置為“read_uncommitted”(默認),它對應的 Lag 等於HW – ConsumerOffset 的值,其中 ConsumerOffset 表示當前的消費位移。
如果這個參數配置為“read_committed”,那么就要引入 LSO 來進行計算了。LSO 是 LastStableOffset 的縮寫,它對應的 Lag 等於 LSO – ConsumerOffset 的值。
- 首先通過 DescribeGroupsRequest 請求獲取當前消費組的元數據信息,當然在這之前還會通過 FindCoordinatorRequest 請求查找消費組對應的 GroupCoordinator。
- 接着通過 OffsetFetchRequest 請求獲取消費位移 ConsumerOffset。
- 然后通過 KafkaConsumer 的 endOffsets(Collection partitions)方法(對應於 ListOffsetRequest 請求)獲取 HW(LSO)的值。
- 最后通過 HW 與 ConsumerOffset 相減得到分區的 Lag,要獲得主題的總體 Lag 只需對旗下的各個分區累加即可。
Kafka有哪些指標需要着重關注?
比較重要的 Broker 端 JMX 指標:
- BytesIn/BytesOut:即 Broker 端每秒入站和出站字節數。你要確保這組值不要接近你的網絡帶寬,否則這通常都表示網卡已被“打滿”,很容易出現網絡丟包的情形。
- NetworkProcessorAvgIdlePercent:即網絡線程池線程平均的空閑比例。通常來說,你應該確保這個 JMX 值長期大於 30%。如果小於這個值,就表明你的網絡線程池非常繁忙,你需要通過增加網絡線程數或將負載轉移給其他服務器的方式,來給該 Broker 減負。
- RequestHandlerAvgIdlePercent:即 I/O 線程池線程平均的空閑比例。同樣地,如果該值長期小於 30%,你需要調整 I/O 線程池的數量,或者減少 Broker 端的負載。
- UnderReplicatedPartitions:即未充分備份的分區數。所謂未充分備份,是指並非所有的 Follower 副本都和 Leader 副本保持同步。一旦出現了這種情況,通常都表明該分區有可能會出現數據丟失。因此,這是一個非常重要的 JMX 指標。
- ISRShrink/ISRExpand:即 ISR 收縮和擴容的頻次指標。如果你的環境中出現 ISR 中副本頻繁進出的情形,那么這組值一定是很高的。這時,你要診斷下副本頻繁進出 ISR 的原因,並采取適當的措施。
- ActiveControllerCount:即當前處於激活狀態的控制器的數量。正常情況下,Controller 所在 Broker 上的這個 JMX 指標值應該是 1,其他 Broker 上的這個值是 0。如果你發現存在多台 Broker 上該值都是 1 的情況,一定要趕快處理,處理方式主要是查看網絡連通性。這種情況通常表明集群出現了腦裂。腦裂問題是非常嚴重的分布式故障,Kafka 目前依托 ZooKeeper 來防止腦裂。但一旦出現腦裂,Kafka 是無法保證正常工作的。
Kafka的那些設計讓它有如此高的性能?
- 分區
kafka是個分布式集群的系統,整個系統可以包含多個broker,也就是多個服務器實例。每個主題topic會有多個分區,kafka將分區均勻地分配到整個集群中,當生產者向對應主題傳遞消息,消息通過負載均衡機制傳遞到不同的分區以減輕單個服務器實例的壓力。
一個Consumer Group中可以有多個consumer,多個consumer可以同時消費不同分區的消息,大大的提高了消費者的並行消費能力。但是一個分區中的消息只能被一個Consumer Group中的一個consumer消費。
-
網絡傳輸上減少開銷
批量發送:
在發送消息的時候,kafka不會直接將少量數據發送出去,否則每次發送少量的數據會增加網絡傳輸頻率,降低網絡傳輸效率。kafka會先將消息緩存在內存中,當超過一個的大小或者超過一定的時間,那么會將這些消息進行批量發送。
端到端壓縮:
當然網絡傳輸時數據量小也可以減小網絡負載,kafaka會將這些批量的數據進行壓縮,將一批消息打包后進行壓縮,發送broker服務器后,最終這些數據還是提供給消費者用,所以數據在服務器上還是保持壓縮狀態,不會進行解壓,而且頻繁的壓縮和解壓也會降低性能,最終還是以壓縮的方式傳遞到消費者的手上。 -
順序讀寫
kafka將消息追加到日志文件中,利用了磁盤的順序讀寫,來提高讀寫效率。 -
零拷貝技術
零拷貝將文件內容從磁盤通過DMA引擎復制到內核緩沖區,而且沒有把數據復制到socket緩沖區,只是將數據位置和長度信息的描述符復制到了socket緩存區,然后直接將數據傳輸到網絡接口,最后發送。這樣大大減小了拷貝的次數,提高了效率。kafka正是調用linux系統給出的sendfile系統調用來使用零拷貝。Java中的系統調用給出的是FileChannel.transferTo接口。
5. 優秀的文件存儲機制
如果分區規則設置得合理,那么所有的消息可以均勻地分布到不同的分區中,這樣就可以實現水平擴展。不考慮多副本的情況,一個分區對應一個日志(Log)。為了防止 Log 過大,Kafka 又引入了日志分段(LogSegment)的概念,將 Log 切分為多個 LogSegment,相當於一個巨型文件被平均分配為多個相對較小的文件,這樣也便於消息的維護和清理。
Kafka 中的索引文件以稀疏索引(sparse index)的方式構造消息的索引,它並不保證每個消息在索引文件中都有對應的索引項。每當寫入一定量(由 broker 端參數 log.index.interval.bytes 指定,默認值為4096,即 4KB)的消息時,偏移量索引文件和時間戳索引文件分別增加一個偏移量索引項和時間戳索引項,增大或減小 log.index.interval.bytes 的值,對應地可以增加或縮小索引項的密度。