整體架構
生產過程由兩個線程協調運行,分別為主線程和sender線程(發送線程)。
主線程中,由KafkaProducer創建消息,然后通過可能的攔截器、序列化器和分區器的作用,緩存消息到消息加載器(RecordAccumulator,也稱為消息收集器)中,Sender線程負責從消息加載器(RecordAccumulator)中獲取消息並將其發送到Kafka中。
消息加載器
消息加載器(RecordAccumulator)主要用來緩存消息以便Sender線程可以批量發送,進而減少網絡傳輸的資源消耗以提升性能。
消息加載器(RecordAccumulator)緩存的大小可以通過生產者參數buffer.memory配置,默認值為33444432b,即32mb。
如果生產者發送消息的速度大於發送到服務器的速度,也就是RecordAccumulator緩存不夠,此時kafkaproducer的send方法調用要被被阻塞,要么拋出異常,這個取決於參數max.block.ms參數,此參數的默認值為60000ms,60s。
主線程發送過來的消息會被追加到RecordAccumulator的某個雙端隊列中,在RecordAccumulator內部為每個分區都維護了一個雙端隊列,隊列中的內容就是ProducerBatch,即Deque
注意!ProducerBath不是ProducerRecord,ProducerBatch中可以包含一個或多個ProducerRecord。通俗的說ProducerRecord是生產者中創建的消息,而ProducerBatch是指一個消息批次,ProducerRecord會被包含在ProdicerBatch中。較小的ProducerRecord拼湊成一個較大的PeoducerBatch,可以減少網絡請求的次數提升整體的吞吐量。
消息在網絡上以字節的形式傳輸,在發送之前需要創建一塊內存區域來保存對應消息,在kafka生產端配置中,使用java.io.ByteBuffer來實現消息內存的創建和釋放。頻繁的創建和釋放是消耗資源的,在RecordAccumulator內部還有一個BufferPool,主要用來實現ByteBuffer的復用,實現緩存的高效利用。而BufferPool只對特定大小的ByteBuffer進行管理,其他大小的ByteBuffer不會緩存進BufferPool中,我們可以通過調整batch.size參數,以便多緩存消息。
ProducerBatch大小和batch.size參數也有密切聯系。當一條消息(ProducerRecord)流入RecordAccumulator時,會先尋找與消息分區相對應的雙端隊列(沒有則新建),查看Producer中是否還可以寫入這個ProducerRrcord,如果可以則寫入,如果不可以則需要創建一個新的ProducerBatch。在新建ProducerBatch時評估這條消息的大小是否超過batch.size的大小,如果不超過,就以batch.size的大小創建ProducerBatch,這樣在使用完這段內存區域之后,可以通過BufferPool的管理來進行復用;如果超過,就以評估大小來創建ProducerBatch,這段內存區域不會被復用,
Sender從RecordAccumulator中獲取緩存的消息之后,會進一步將原本<分區,Deque
在轉換成<Node,List
請求在從sender發往Kafka之前還會保存到InFlightRequest中,InFlightRequest保存對象的具體形式為Map<Nodeid,deque
相關參數:
batch.size:只有數據積累到 batch.size 之后,sender 才會發送數據。
linger.ms:如果數據遲遲未達到 batch.size,sender 等待 linger.time 之后就會發送數據
生產者客戶端可靠性保證
- 1、(可選,在效率和可靠性之間進行取舍,配置參數acks)為保證 producer 發送的數據,能可靠的發送到指定的 topic,topic 的每個 partition 收到producer 發送的數據后,都需要向 producer 發送 ack(acknowledgement 確認收到),如果producer 收到 ack,就會進行下一輪的發送,否則重新發送數據。
- 2、設想以下情景:leader 收到數據,所有 follower 都開始同步數據,但有一個 follower,因為某種故障,遲遲不能與 leader 進行同步,那 leader 就要一直等下去,直到它完成同步,才能發送 ack。這個問題怎么解決呢?
Leader 維護了一個動態的 in-sync replica set (ISR),意為和 leader 保持同步的 follower 集合。當 ISR 中的 follower 完成數據的同步之后,leader 就會給 follower 發送 ack。如果 follower長時間未 向 leader 同 步 數 據 , 則 該 follower 將 被 踢 出 ISR , 該 時 間 閾 值 由replica.lag.time.max.ms 參數設定。Leader 發生故障之后,就會從 ISR 中選舉新的 leader。 - 3、ack 應答機制
對於某些不太重要的數據,對數據的可靠性要求不是很高,能夠容忍數據的少量丟失,所以沒必要等 ISR 中的 follower 全部接收成功。 Kafka 為用戶提供了三種可靠性級別,用戶根據對可靠性和延遲的要求進行權衡。
acks 參數配置:
0:producer 不等待 broker 的 ack,這一操作提供了一個最低的延遲,broker 一接收到還沒有寫入磁盤就已經返回,當 broker 故障時有可能丟失數據;
1:producer 等待 broker 的 ack,partition 的 leader 落盤成功后返回 ack,如果在 follower同步成功之前 leader 故障,那么將會丟失數據;
-1(all):producer 等待 broker 的 ack,partition 的 leader 和 follower 全部落盤成功后才返回 ack。但是如果在 follower 同步完成后,broker 發送 ack 之前,leader 發生故障,那么會造成數據重復。 - 4、 Exactly Once 語義
將服務器的 ACK 級別設置為-1,可以保證 Producer 到 Server 之間不會丟失數據,即 At Least Once 語義。相對的,將服務器 ACK 級別設置為 0,可以保證生產者每條消息只會被發送一次,即 At Most Once 語義。 At Least Once 可以保證數據不丟失,但是不能保證數據不重復;相對的,At Most Once
可以保證數據不重復,但是不能保證數據不丟失。但是,對於一些非常重要的信息,比如說交易數據,下游數據消費者要求數據既不重復也不丟失,即 Exactly Once 語義。
在 0.11 版本以前的 Kafka,對此是無能為力的,只能保證數據不丟失,再在下游消費者對數據做全局去重。對於多個下游應用的情況,每個都需要單獨做全局去重,這就對性能造成了很大影響。
0.11 版本的 Kafka,引入了一項重大特性:冪等性。所謂的冪等性就是指 Producer 不論向 Server 發送多少次重復數據,Server 端都只會持久化一條。冪等性結合 At Least Once 語義,就構成了 Kafka 的 Exactly Once 語義。即:
At Least Once + 冪等性 = Exactly Once
要啟用冪等性,只需要將 Producer 的參數中 enable.idompotence 設置為 true 即可。Kafka的冪等性實現其實就是將原來下游需要做的去重放在了數據上游。開啟冪等性的 Producer 在初始化的時候會被分配一個 PID,發往同一 Partition 的消息會附帶 Sequence Number。而Broker 端會對<PID, Partition, SeqNumber>做緩存,當具有相同主鍵的消息提交時,Broker 只會持久化一條。
但是 PID 重啟就會變化,同時不同的 Partition 也具有不同主鍵,所以冪等性無法保證跨分區跨會話的 Exactly Once。 - 5、 Kafka 事務
Kafka 從 0.11 版本開始引入了事務支持。事務可以保證 Kafka 在 Exactly Once 語義的基礎上,生產和消費可以跨分區和會話,要么全部成功,要么全部失敗。
為了實現跨分區跨會話的事務,需要引入一個全局唯一的 Transaction ID,並將 Producer獲得的PID和Transaction ID綁定。這樣當Producer重啟后就可以通過正在進行的Transaction ID 獲得原來的 PID。
> 為了管理 Transaction,Kafka 引入了一個新的組件 Transaction Coordinator。Producer 就是通過和 Transaction Coordinator 交互獲得 Transaction ID 對應的任務狀態。Transaction Coordinator 還負責將事務所有寫入 Kafka 的一個內部 Topic,這樣即使整個服務重啟,由於事務狀態得到保存,進行中的事務狀態可以得到恢復,從而繼續進行。