springboot+Kafka(生產者和消費者)


1、配置信息

spring.application.name=kafka-producer
server.port=8091

spring.kafka.producer.bootstrapServers=192.168.229.13:9092,192.168.229.13:9093,192.168.229.13:9094

#安全認證
#spring.kafka.producer.ssl=

# procedure要求leader在考慮完成請求之前收到的確認數,用於控制發送記錄在服務端的持久化,其值可以為如下:
# acks = 0 如果設置為零,則生產者將不會等待來自服務器的任何確認,該記錄將立即添加到套接字緩沖區並視為已發送。在這種情況下,無法保證服務器已收到記錄,並且重試配置將不會生效(因為客戶端通常不會知道任何故障),為每條記錄返回的偏移量始終設置為-1。
# acks = 1 這意味着leader會將記錄寫入其本地日志,但無需等待所有副本服務器的完全確認即可做出回應,在這種情況下,如果leader在確認記錄后立即失敗,但在將數據復制到所有的副本服務器之前,則記錄將會丟失。
# acks = all 這意味着leader將等待完整的同步副本集以確認記錄,這保證了只要至少一個同步副本服務器仍然存活,記錄就不會丟失,這是最強有力的保證,這相當於acks = -1的設置。
spring.kafka.producer.acks=all

# 每當多個記錄被發送到同一分區時,生產者將嘗試將記錄一起批量處理為更少的請求,
# 這有助於提升客戶端和服務端之間的性能,此配置控制默認批量大小(以字節為單位),默認值為16384
spring.kafka.producer.batchSize=

#緩存數據內存大小
spring.kafka.producer.bufferMemory=

#server記錄日志
spring.kafka.producer.clientId=kafka-producer

#producer用於壓縮數據的壓縮類型。默認是無壓縮。正確的選項值是none、gzip、snappy。壓縮最好用於批量處理,批量處理消息越多,壓縮性能越好
spring.kafka.producer.compressionType=gzip

#key反序列化類,實現org.apache.kafka.common.serialization.Deserializer接口
spring.kafka.producer.keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer
#value反序列化類,實現org.apache.kafka.common.serialization.Deserializer接口
spring.kafka.producer.valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer

#客戶端發送重試
spring.kafka.producer.retries=0

#生產者事務
spring.kafka.producer.transactionIdPrefix=

#其他配置
#spring.kafka.producer.properties=
spring.application.name=kafka-consumer1
server.port=8092

spring.kafka.consumer.bootstrapServers=192.168.229.13:9092,192.168.229.13:9093,192.168.229.13:9094


#spring.kafka.consumer.clientId="kafka-consumer"
spring.kafka.consumer.group-id=myGroup1

#自動提交
spring.kafka.consumer.enableAutoCommit=true

#自動向zookeeper提交offset的頻率,默認:5000
spring.kafka.consumer.autoCommitInterval=10

#0:READ_UNCOMMITTED, 1:READ_COMMITTED;
spring.kafka.consumer.isolationLevel=READ_COMMITTED

# 沒有初始化的offset時,可以設置以下三種情況:(默認:latest)
# earliest
# 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
# latest
# 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
# none
# topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常
spring.kafka.consumer.autoOffsetReset=latest

#安全認證
#spring.kafka.consumer.ssl=

#Fetch請求發給broker后,在broker中可能會被阻塞的(當topic中records的總size小於fetch.min.bytes時),此時這個fetch請求耗時就會比較長。
#這個配置就是來配置consumer最多等待response多久。
spring.kafka.consumer.fetchMaxWait=100
#每次fetch請求時,server應該返回的最小字節數。如果沒有足夠的數據返回,請求會等待,直到足夠的數據才會返回。默認:1
spring.kafka.consumer.fetchMinSize=10

#消費超時時間,大小不能超過session.timeout.ms,默認:3000
spring.kafka.consumer.heartbeatInterval=100

#key反序列化類,實現org.apache.kafka.common.serialization.Deserializer接口
spring.kafka.consumer.keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer
#value反序列化類,實現org.apache.kafka.common.serialization.Deserializer接口
spring.kafka.consumer.valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer

#max.poll.records條數據需要在session.timeout.ms這個時間內處理完,默認:500
spring.kafka.consumer.maxPollRecords=500

 

spring.application.name=kafka-consumer2
server.port=8093

spring.kafka.consumer.bootstrapServers=192.168.229.13:9092,192.168.229.13:9093,192.168.229.13:9094


#spring.kafka.consumer.clientId="kafka-consumer"
spring.kafka.consumer.group-id=myGroup2

#自動提交
spring.kafka.consumer.enableAutoCommit=true

#自動向zookeeper提交offset的頻率,默認:5000
spring.kafka.consumer.autoCommitInterval=10

#0:READ_UNCOMMITTED, 1:READ_COMMITTED;
spring.kafka.consumer.isolationLevel=READ_COMMITTED

# 沒有初始化的offset時,可以設置以下三種情況:(默認:latest)
# earliest
# 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
# latest
# 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
# none
# topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常
spring.kafka.consumer.autoOffsetReset=latest

#安全認證
#spring.kafka.consumer.ssl=

#Fetch請求發給broker后,在broker中可能會被阻塞的(當topic中records的總size小於fetch.min.bytes時),此時這個fetch請求耗時就會比較長。
#這個配置就是來配置consumer最多等待response多久。
spring.kafka.consumer.fetchMaxWait=100
#每次fetch請求時,server應該返回的最小字節數。如果沒有足夠的數據返回,請求會等待,直到足夠的數據才會返回。默認:1
spring.kafka.consumer.fetchMinSize=10

#消費超時時間,大小不能超過session.timeout.ms,默認:3000
spring.kafka.consumer.heartbeatInterval=100

#key反序列化類,實現org.apache.kafka.common.serialization.Deserializer接口
spring.kafka.consumer.keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer
#value反序列化類,實現org.apache.kafka.common.serialization.Deserializer接口
spring.kafka.consumer.valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer

#max.poll.records條數據需要在session.timeout.ms這個時間內處理完,默認:500
spring.kafka.consumer.maxPollRecords=500

 

Git地址:https://github.com/wangymd/myKafka.git

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM