kafka生產消息生產者配置參數詳解


必選屬性有3個:

bootstrap.servers:該屬性指定broker的地址清單,地址的格式為host:port。清單里不需要包含所有的broker地址,生產者會從給定的broker里查詢其他broker的信息。不過最少提供2個broker的信息,一旦其中一個宕機,生產者仍能連接到集群上。
key.serializer:生產者接口允許使用參數化類型,可以把Java對象作為鍵和值傳broker,但是broker希望收到的消息的鍵和值都是字節數組,所以,必須提供將對象序列化成字節數組的序列化器。key.serializer必須設置為實現org.apache.kafka.common.serialization.Serializer的接口類,默認為

org.apache.kafka.common.serialization.StringSerializer,也可以實現自定義的序列化器。
value.serializer:同上。

可選參數:

acks:指定了必須要有多少個分區副本收到消息,生產者才會認為寫入消息是成功的,這個參數對消息丟失的可能性有重大影響。

acks=0:生產者在寫入消息之前不會等待任何來自服務器的響應,容易丟消息,但是吞吐量高。

acks=1:只要集群的首領節點收到消息,生產者會收到來自服務器的成功響應。如果消息無法到達首領節點(比如首領節點崩潰,新首領沒有選舉出來),生產者會收到一個錯誤響應,為了避免數據丟失,生產者會重發消息。不過,如果一個沒有收到消息的節點成為新首領,消息還是會丟失。默認使用這個配置。

acks=all:只有當所有參與復制的節點都收到消息,生產者才會收到一個來自服務器的成功響應。延遲高。

buffer.memory:設置生產者內存緩沖區的大小,生產者用它緩沖要發送到服務器的消息。

max.block.ms:指定了在調用send()方法或者使用partitionsFor()方法獲取元數據時生產者的阻塞時間。當生產者的發送緩沖區已滿,或者沒有可用的元數據時,這些方法就會阻塞。在阻塞時間達到max.block.ms時,生產者會拋出超時異常。

batch.size:當多個消息被發送同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節數計算。當批次內存被填滿后,批次里的所有消息會被發送出去。

retries:指定生產者可以重發消息的次數。

receive.buffer.bytes和send.buffer.bytes:指定TCP socket接受和發送數據包的緩存區大小。如果它們被設置為-1,則使用操作系統的默認值。如果生產者或消費者處在不同的數據中心,那么可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。

linger.ms:指定了生產者在發送批次前等待更多消息加入批次的時間。

 再來個詳細的:

 private String clientId;   //客戶端的一個標識
    private final Partitioner partitioner;   //分區選擇器,根據傳入的參數,決定該條消息被放到哪個分區
    private final int maxRequestSize;  //客戶端最大的消息大小
    private final long totalMemorySize; //單個消息的緩存區大小
    private final Metadata metadata;  //kafka 元數據維護
    private final RecordAccumulator accumulator; //消息暫存區
    private final Sender sender; //發送消息的sender任務
    private final Metrics metrics; //一些統計信息
    private final Thread ioThread; //執行Sender任務發送消息的線程
    private final CompressionType compressionType; //消息的壓縮策略
    private final Sensor errors; //
    private final Time time; 
    private final Serializer<K> keySerializer; //key序列化
    private final Serializer<V> valueSerializer; //value序列化
    private final ProducerConfig producerConfig; //生產者相關配置信息
    private final long maxBlockTimeMs; //在等待metadata更新的最大等待時間
    private final int requestTimeoutMs; //消息的超時時間
    private final ProducerInterceptors<K, V> interceptors; //消息攔截器

  詳細的見:https://blog.csdn.net/charry_a/article/details/79816575

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM