kafka producer 生產者客戶端參數配置


 

  在生產者向broker發送消息時,需要配置不同的參數來確保發送成功。

acks = all         #指定分區中有多少副本必須收到這條消息,生產者才認為這條消息發送成功
    
        acks = 0      #生產者發送消息之后不需要等待任何服務端的響應
        acks = 1        #只要分區的leader副本成功寫入消息,那么它就會收到服務端的成功響應
        acks = -1 或all   #生產者在發送消息之后,需要等待ISR中的所有副本都成功寫入消息之后,才能夠收到來自服務端的成功響應。

batch.size = 16384        #ProducerBatch 最大緩存空間,默認16KB
    
bootstrap.servers =[192.1.1.2:9092]    #kafka集群
    
buffer.memory = 33554432            #RecordAccumulator消息累加器最大存儲空間,默認32MB 
    
client.id =                         #客戶端id
    
compression.type = none             #設置消息的壓縮格式("gzip,snappy,lz4")\
      對消息壓縮可以極大的減少網絡傳輸、降低網絡IO,從而提高整體性能。這是一種時間換空間的優化方式,如果對延時性要求高,則不推薦對消息進行壓縮 connections.max.idle.ms
= 540000 #設置多久之后關閉閑置連接,默認9分鍾 enable.idempotence = false interceptor.classes = [] #攔截器配置 key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer #key的序列化器 linger.ms = 0 #生產者在客戶端發送ProducerBatch被填滿或等待時間超過linger.ms值是發出去 #指定生產者發送producerBatch 之前等待更多消息加入producerRecord加入ProducerBatch的時間 max.block.ms = 60000 #用來控制KafkaProducer中send()方法和partitionsFor()方法的阻塞時間,當生產者的發送緩沖區 #已滿,或者沒有可用的元數據時,這些方法就會阻塞。 max.in.flight.requests.per.connection = 5 #客戶端與broker端連接最多緩存5個未響應的請求(即發送到broker端,沒來得及收到響應), #如果超過了就不能再發送請求,可以通過這個參數大小來判斷是否有消息堆積 max.request.size = 1048576 #生產者客戶端能發送的消息的最大值,默認1M(不建議改,會引起聯動) metadata.max.age.ms = 300000 #元數據更新時間,5分鍾 partitioner.class = class com.zpb.partitioner.CustomPartition   #自定義key分區器,可以根據指定的key來作特殊的的相關業務

retries
= 0 #生產者重試次數,默認0,在發生異常時不進行任何的重試。在發送數據時會遇到2種異常,一種是可恢復的,一種是不可 #恢復的,如:leader的選舉,網絡抖動等這些異常是可以恢復的,這個時候設置retries大於0就可以進行重試,在網絡穩定 #或者leader選舉完后,這種異常就會消失,數據在重發時就會正常,在不可恢復異常時,如超過了max.request.size最大值 #時,這種錯誤是不可恢復的 retry.backoff.ms = 100 #重試之間的時間間隔,最好預估一下異常恢復的時間間隔,讓重試時間大於異常恢復時間, value.serializer = class org.apache.kafka.common.serialization.StringSerializer #value序列化器

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM