Kafka生產者producer簡要總結


Kafka producer在設計上要比consumer簡單,不涉及復雜的組管理操作,每個producer都是獨立進行工作的,與其他producer實例之間沒有關聯。Producer的主要功能就是向某個topic的某個分區發送消息,所以首先要確認向topic的哪個分區寫入消息——即分區器(partitioner)的功能。Kafka producer提供了一個默認的分區器。對於每條待發送的消息而言,如果該消息指定了key,則partitioner會根據key的哈希值來選擇目標分區,將具有相同key的所有消息都路由到相同的分區中;若該消息未指定key,則partitioner使用輪詢的方式確認目標分區。另外producer的API賦予了用戶自行指定目標分區的權力,即用戶可以在消息發送時跳過partitioner直接指定到要發送到的分區。

另外producer也允許用戶實現自定義的分區策略而非使用默認的partitioner。

確認分區后,producer要做的第二件事是尋找該分區對應的leader,也就是該分區leader副本所在的Kafka broker。每個topic分區都由若干個副本組成,其中一個副本充當leader角色,只有leader能響應clients發送的請求,剩下的副本中有一部分副本會與leader副本保持同步,即所謂的ISR(In-Sync Replicas, 副本同步隊列)。因此,在發送消息時,producer可以有多種選擇來實現消息發送,例如不等待任何副本的響應便返回成功,或者只是等待leader副本響應寫入操作后再返回成功。

補充副本概念:

副本(Replication)

同一個partition可能會有多個replication(對應server.properties 配置中的 default.replication.factor=N)。沒有replication的情況下,一旦broker 宕機,其上所有 patition 的數據都不可被消費,同時producer也不能再將數據存於其上的patition。引入replication之后,同一個partition可能會有多個replication,而這時需要在這些replication之間選出一個leader,producer和consumer只與這個leader交互,其它replication作為follower從leader 中復制數據。

Producer寫入流程

1)producer先從zookeeper的 “/brokers/…/state”節點找到該partition的leader

2)producer將消息發送給該leader

3)leader將消息寫入本地log

4)followers從leader pull消息,寫入本地log后向leader發送ACK

5)leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)並向producer發送ACK

Producer發送消息

在Java版代碼中,producer首先使用一個線程(用戶主線程,也就是用戶啟動producer的線程)將待發送的消息封裝進一個ProducerRecord類實例,然后將其序列化之后發送給partitioner,再由后者確定了目標分區后一同發送到位於producer程序中的一塊內存緩沖池中。而producer的另一個工作線程(I/O發送線程,也稱Sender線程)則負責實時地從該緩沖區中提取准備就緒的消息封裝進一個批次(batch),統一發送給對應的broker。

Kafka producer發送消息的主方法是send方法,producer在底層完全實現了異步化發送,並且通過Java提供的Future同時實現了同步發送和異步發送+回調(Callback)(默認異步)兩種發送方式。最后producer程序結束時需要關閉producer。

acks參數(用來平衡吞吐量與可靠性):

producers可以一步的並行向kafka發送消息,但是通常producer在發送完消息之后會得到一個響應,返回的是offset值(metadata)或者發送過程中遇到的錯誤(exception)。這其中有個非常重要的參數“request.required.acks”,這個參數決定了producer要求leader partition收到確認的副本個數:

如果acks設置為0,表示producer不會等待broker的響應,所以,producer無法知道消息是否發生成功,這樣有可能導致數據丟失,但同時,acks值為0會得到最大的系統吞吐量。一般這種情況下允許消息丟失,如統計服務器日志等;

若acks設置為1,表示producer會在leader partition收到消息時得到broker的一個確認,這樣會有更好的可靠性,因為客戶端會等待知道broker確認收到消息。此時,當發送消息時,leader broker僅將該消息寫入本地日志,然后便發送響應結果給producer,而無需等待ISR中其他副本寫入該消息;

若設置為-1或者all,producer會在所有備份的partition收到消息時得到broker的確認,這個設置可以得到最高的可靠性保證。此時,當發送消息時,leader broker不僅會將消息寫入本地日志,同時還會等待ISR中所有其他副本都成功寫入它們各自的本地日志后,才發送響應結果給producer。

buffer.memory參數:

指定producer端用於緩存消息的緩沖區的大小,單位是字節,默認32M,采用異步發送消息的架構,Java版Producer啟動時會首先創建一塊內存緩沖區用於保存待發送消息,然后由另一個專屬線程負責從緩沖區中讀取消息執行真正的發送,這部分內存空間的大小就是由buffer.memory參數指定。該參數指定的內存大小幾乎可以認為是producer程序使用的內存大小,若producer程序要給很多分區發送消息,那么就需要仔細設置該參數防止過小的內存緩沖區降低了producer程序整體的吞吐量。

batch.size參數:

batch.size是producer調優吞吐量與延時性能最重要的參數之一,producer會將發往同一分區的多條消息封裝進一個batch中,當batch滿了或者batch沒滿(linger.ms參數=0,默認消息立即發送)producer會發送該batch。

Batch小,吞吐量低,延時低,Batch大,吞吐量高,延時高

Batch.size默認為16KB。

request.timeout.ms參數:

當producer發送請求給broker后,broker需要在規定時間范圍內將處理結果返回給producer。超時時間默認30s。

自定義分區機制

  1. 在producer程序中創建一個類,實現org.apache.kafka.clients.producer.Partitioner接口,主要分區邏輯在Partitioner.partition中實現。入參(topic, key, keyBytes, value, valueBytes, Cluster);
  2. 在用於構造KafkaProducer的Properties對象中設置partitioner.class參數。

序列化

網絡上發送數據都是以字節的形式,kafka也不例外,Apache Kafka支持用戶給broker發送各種類型消息,如字符串、整數、數組或任意對象類型,序列化器serializer負責在producer發送前將消息轉換成字節數組;解序列化器deserializer則用於將consumer接收到的字節數組轉換成相應的對象。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM