spring-kafka_配置參數


參考:

配置application.yml

(一般項目自動生成的是applicaiton.properties,但為了書寫簡便,改成yml)

spring:
   kafka:
     bootstrap-servers: 10.3.13.213:9092,10.3.13.197:9092,10.3.13.194:9092
     producer:
       key-serializer: org.apache.kafka.common.serialization.StringSerializer
       value-serializer: org.apache.kafka.common.serialization.StringSerializer
     consumer:
       group-id: test
       enable-auto-commit: true
       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
View Code

生產者配置屬性

重要配置
bootstrap.servers  
用於建立與kafka集群建立連接的地址集合,書寫格式為:ip1:port,ip2:port2,ip3:port3     類型是list
spring.kafka.producer.bootstrap-servers=TopKafka1:9092,TopKafka2:9092,TopKafka3:9092

 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
Key的序列化方式,用於實現Serializer接口    類型是class

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
值的Serializer類,實現了org.apache.kafka.common.serialization.Serializer接口

spring.kafka.producer.retries=0
設置大於0的值將使客戶端重新發送任何數據,一旦這些數據發送失敗。注意,這些重試與客戶端接收到發送錯誤時的重試沒有什么不同。
允許重試將潛在的改變數據的順序,如果這兩個消息記錄都是發送到同一個partition,
則第一個消息失敗第二個發送成功,則第二條消息會比第一條消息出現要早。

spring.kafka.producer.batch-size=16384
每當多個記錄被發送到同一分區時,生產者將嘗試將記錄一起批量處理為更少的請求,
這有助於提升客戶端和服務端之間的性能,此配置控制默認批量大小(以字節為單位),默認值為16384

spring.kafka.producer.buffer-memory=33554432
producer可以用來緩存數據的內存大小。如果數據產生速度大於向broker發送的速度,producer會阻塞或者拋出異常,以“block.on.buffer.full”來表明。
這項設置將和producer能夠使用的總內存相關,但並不是一個硬性的限制,因為不是producer使用的所有內存都是用於緩存。
一些額外的內存會用於壓縮(如果引入壓縮機制),同樣還有一些用於維護請求。

spring.kafka.producer.acks=-1
procedure要求leader在考慮完成請求之前收到的確認數,用於控制發送記錄在服務端的持久化,其值可以為如下:
acks = 0 如果設置為零,則生產者將不會等待來自服務器的任何確認,該記錄將立即添加到套接字緩沖區並視為已發送。
在這種情況下,無法保證服務器已收到記錄,並且重試配置將不會生效(因為客戶端通常不會知道任何故障),
為每條記錄返回的偏移量始終設置為-1。

acks = 1 這意味着leader會將記錄寫入其本地日志,但無需等待所有副本服務器的完全確認即可做出回應,
在這種情況下,如果leader在確認記錄后立即失敗,但在將數據復制到所有的副本服務器之前,則記錄將會丟失。
 
acks = all 這意味着leader將等待完整的同步副本集以確認記錄,這保證了只要至少一個同步副本服務器仍然存活,
記錄就不會丟失,這是最強有力的保證,這相當於acks = -1的設置。

可以設置的值為:all, -1, 0, 1

 

消費者配置屬性

重要配置屬性
spring.kafka.consumer.bootstrap-servers
以逗號分隔的主機:端口對列表,用於建立與Kafka群集的初始連接
spring.kafka.consumer.bootstrap-servers=TopKafka1:9092,TopKafka2:9092,TopKafka3:9092

spring.kafka.consumer.group-id=TyyLoveZyy
用來唯一標識consumer進程所在組的字符串,如果設置同樣的group id,表示這些processes都是屬於同一個consumer group,默認:""

spring.kafka.consumer.max-poll-records
max.poll.records條數據需要在session.timeout.ms這個時間內處理完,默認:500
spring.kafka.consumer.max-poll-records=500

spring.kafka.consumer.heartbeat-interval=3000
消費超時時間,大小不能超過session.timeout.ms,默認:3000

spring.kafka.consumer.enable-auto-commit=true
如果為真,consumer所fetch的消息的offset將會自動的同步到zookeeper。
這項提交的offset將在進程掛掉時,由新的consumer使用,默認:true

spring.kafka.consumer.auto-commit-interval=5000
consumer自動向zookeeper提交offset的頻率,默認:5000

spring.kafka.consumer.auto-offset-reset=earliest
沒有初始化的offset時,可以設置以下三種情況:(默認:latest)
earliest
當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
latest
當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
none
topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常

spring.kafka.consumer.fetch-min-size=1
每次fetch請求時,server應該返回的最小字節數。如果沒有足夠的數據返回,
請求會等待,直到足夠的數據才會返回。默認:1

spring.kafka.consumer.fetch-max-wait=500
Fetch請求發給broker后,在broker中可能會被阻塞的(當topic中records的總size小於fetch.min.bytes時),
此時這個fetch請求耗時就會比較長。這個配置就是來配置consumer最多等待response多久

 spring.kafka.consumer.client-id=1
消費者進程的標識。如果設置一個人為可讀的值,跟蹤問題會比較方便。。默認:""

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
key的反序列化類。實現了org.apache.kafka.common.serialization.Deserializer接口

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
值的反序列化類。實現了org.apache.kafka.common.serialization.Deserializer接口

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM