Kafka生產者——發送原理分析


整體架構

生產過程由兩個線程協調運行,分別為主線程和sender線程(發送線程)。

主線程中,由KafkaProducer創建消息,然后通過可能的攔截器、序列化器和分區器的作用,緩存消息到消息加載器(RecordAccumulator,也稱為消息收集器)中,Sender線程負責從消息加載器(RecordAccumulator)中獲取消息並將其發送到Kafka中。

消息加載器

消息加載器(RecordAccumulator)主要用來緩存消息以便Sender線程可以批量發送,進而減少網絡傳輸的資源消耗以提升性能。
消息加載器(RecordAccumulator)緩存的大小可以通過生產者參數buffer.memory配置,默認值為33444432b,即32mb。
如果生產者發送消息的速度大於發送到服務器的速度,也就是RecordAccumulator緩存不夠,此時kafkaproducer的send方法調用要被被阻塞,要么拋出異常,這個取決於參數max.block.ms參數,此參數的默認值為60000ms,60s。

主線程發送過來的消息會被追加到RecordAccumulator的某個雙端隊列中,在RecordAccumulator內部為每個分區都維護了一個雙端隊列,隊列中的內容就是ProducerBatch,即Deque 。消息寫入緩存時,追加到雙端隊列的尾部,sender讀取消息的時候,從雙端隊列的頭部讀取。
注意!ProducerBath不是ProducerRecord,ProducerBatch中可以包含一個或多個ProducerRecord。通俗的說ProducerRecord是生產者中創建的消息,而ProducerBatch是指一個消息批次,ProducerRecord會被包含在ProdicerBatch中。較小的ProducerRecord拼湊成一個較大的PeoducerBatch,可以減少網絡請求的次數提升整體的吞吐量。

消息在網絡上以字節的形式傳輸,在發送之前需要創建一塊內存區域來保存對應消息,在kafka生產端配置中,使用java.io.ByteBuffer來實現消息內存的創建和釋放。頻繁的創建和釋放是消耗資源的,在RecordAccumulator內部還有一個BufferPool,主要用來實現ByteBuffer的復用,實現緩存的高效利用。而BufferPool只對特定大小的ByteBuffer進行管理,其他大小的ByteBuffer不會緩存進BufferPool中,我們可以通過調整batch.size參數,以便多緩存消息。

ProducerBatch大小和batch.size參數也有密切聯系。當一條消息(ProducerRecord)流入RecordAccumulator時,會先尋找與消息分區相對應的雙端隊列(沒有則新建),查看Producer中是否還可以寫入這個ProducerRrcord,如果可以則寫入,如果不可以則需要創建一個新的ProducerBatch。在新建ProducerBatch時評估這條消息的大小是否超過batch.size的大小,如果不超過,就以batch.size的大小創建ProducerBatch,這樣在使用完這段內存區域之后,可以通過BufferPool的管理來進行復用;如果超過,就以評估大小來創建ProducerBatch,這段內存區域不會被復用,

Sender從RecordAccumulator中獲取緩存的消息之后,會進一步將原本<分區,Deque >的保存形式轉變成<Node,List >的形式,其中Node表示broker結點,對於網絡連接來說,生產者客戶端是與具體的broker結點建立的鏈接,也就是向具體的broker結點發送消息,並不關心消息屬於哪個分區;而對於KafkaProducer的應用邏輯來說,我們只關注向哪個分區發送那些消息,所以這里是一個應用邏輯層的網絡IO層面的轉換。

在轉換成<Node,List >的形式之后,sender還會進一步封裝成<Node,Request>的形式,這樣就可以將Requst的請求發各個Node了,這里的Request實施Kafka的各種協議請求,對於消息發送而言是指具體的ProducerRequest。

請求在從sender發往Kafka之前還會保存到InFlightRequest中,InFlightRequest保存對象的具體形式為Map<Nodeid,deque >,它的主要作用是緩存了已經發出去但還沒有收到響應的請求。InFlightRequest還提供了很多管理類的方法,通過參數配置可以限制每個鏈接(也就是客戶端和Node之間 的鏈接)最多緩存的請求數,參數為:max.in.flight.request.per.com=nnection,默認值為5,即每個連接最多只能緩存5個未響應的請求,超過這個數值之后就不能再向這個連接發送更多的請求了,除非有緩存的請求收到了響應。
相關參數:
batch.size:只有數據積累到 batch.size 之后,sender 才會發送數據。
linger.ms:如果數據遲遲未達到 batch.size,sender 等待 linger.time 之后就會發送數據

生產者客戶端可靠性保證

  • 1、(可選,在效率和可靠性之間進行取舍,配置參數acks)為保證 producer 發送的數據,能可靠的發送到指定的 topic,topic 的每個 partition 收到producer 發送的數據后,都需要向 producer 發送 ack(acknowledgement 確認收到),如果producer 收到 ack,就會進行下一輪的發送,否則重新發送數據。
  • 2、設想以下情景:leader 收到數據,所有 follower 都開始同步數據,但有一個 follower,因為某種故障,遲遲不能與 leader 進行同步,那 leader 就要一直等下去,直到它完成同步,才能發送 ack。這個問題怎么解決呢?
    Leader 維護了一個動態的 in-sync replica set (ISR),意為和 leader 保持同步的 follower 集合。當 ISR 中的 follower 完成數據的同步之后,leader 就會給 follower 發送 ack。如果 follower長時間未 向 leader 同 步 數 據 , 則 該 follower 將 被 踢 出 ISR , 該 時 間 閾 值 由replica.lag.time.max.ms 參數設定。Leader 發生故障之后,就會從 ISR 中選舉新的 leader。
  • 3、ack 應答機制
    對於某些不太重要的數據,對數據的可靠性要求不是很高,能夠容忍數據的少量丟失,所以沒必要等 ISR 中的 follower 全部接收成功。 Kafka 為用戶提供了三種可靠性級別,用戶根據對可靠性和延遲的要求進行權衡。
    acks 參數配置:
    0:producer 不等待 broker 的 ack,這一操作提供了一個最低的延遲,broker 一接收到還沒有寫入磁盤就已經返回,當 broker 故障時有可能丟失數據;
    1:producer 等待 broker 的 ack,partition 的 leader 落盤成功后返回 ack,如果在 follower同步成功之前 leader 故障,那么將會丟失數據;
    -1(all):producer 等待 broker 的 ack,partition 的 leader 和 follower 全部落盤成功后才返回 ack。但是如果在 follower 同步完成后,broker 發送 ack 之前,leader 發生故障,那么會造成數據重復。
  • 4、 Exactly Once 語義
    將服務器的 ACK 級別設置為-1,可以保證 Producer 到 Server 之間不會丟失數據,即 At Least Once 語義。相對的,將服務器 ACK 級別設置為 0,可以保證生產者每條消息只會被發送一次,即 At Most Once 語義。 At Least Once 可以保證數據不丟失,但是不能保證數據不重復;相對的,At Most Once
    可以保證數據不重復,但是不能保證數據不丟失。但是,對於一些非常重要的信息,比如說交易數據,下游數據消費者要求數據既不重復也不丟失,即 Exactly Once 語義。
    在 0.11 版本以前的 Kafka,對此是無能為力的,只能保證數據不丟失,再在下游消費者對數據做全局去重。對於多個下游應用的情況,每個都需要單獨做全局去重,這就對性能造成了很大影響。
    0.11 版本的 Kafka,引入了一項重大特性:冪等性。所謂的冪等性就是指 Producer 不論向 Server 發送多少次重復數據,Server 端都只會持久化一條。冪等性結合 At Least Once 語義,就構成了 Kafka 的 Exactly Once 語義。即:
    At Least Once + 冪等性 = Exactly Once
    要啟用冪等性,只需要將 Producer 的參數中 enable.idompotence 設置為 true 即可。Kafka的冪等性實現其實就是將原來下游需要做的去重放在了數據上游。開啟冪等性的 Producer 在初始化的時候會被分配一個 PID,發往同一 Partition 的消息會附帶 Sequence Number。而Broker 端會對<PID, Partition, SeqNumber>做緩存,當具有相同主鍵的消息提交時,Broker 只會持久化一條。
    但是 PID 重啟就會變化,同時不同的 Partition 也具有不同主鍵,所以冪等性無法保證跨分區跨會話的 Exactly Once。
  • 5、 Kafka 事務
    Kafka 從 0.11 版本開始引入了事務支持。事務可以保證 Kafka 在 Exactly Once 語義的基礎上,生產和消費可以跨分區和會話,要么全部成功,要么全部失敗。
    為了實現跨分區跨會話的事務,需要引入一個全局唯一的 Transaction ID,並將 Producer獲得的PID和Transaction ID綁定。這樣當Producer重啟后就可以通過正在進行的Transaction ID 獲得原來的 PID。
> 為了管理 Transaction,Kafka 引入了一個新的組件 Transaction Coordinator。Producer 就是通過和 Transaction Coordinator 交互獲得 Transaction ID 對應的任務狀態。Transaction Coordinator 還負責將事務所有寫入 Kafka 的一個內部 Topic,這樣即使整個服務重啟,由於事務狀態得到保存,進行中的事務狀態可以得到恢復,從而繼續進行。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM