Spring Kafka中關於Kafka的配置參數


來源於    https://www.cnblogs.com/miracleYu/p/10213807.html

 

復制代碼
 1 #################consumer的配置參數(開始)#################
 2 #如果'enable.auto.commit'為true,則消費者偏移自動提交給Kafka的頻率(以毫秒為單位),默認值為5000。
 3 spring.kafka.consumer.auto-commit-interval;
 4  
 5 #當Kafka中沒有初始偏移量或者服務器上不再存在當前偏移量時該怎么辦,默認值為latest,表示自動將偏移重置為最新的偏移量
 6 #可選的值為latest, earliest, none
 7 spring.kafka.consumer.auto-offset-reset=latest;
 8  
 9 #以逗號分隔的主機:端口對列表,用於建立與Kafka群集的初始連接。
10 spring.kafka.consumer.bootstrap-servers;
11  
12 #ID在發出請求時傳遞給服務器;用於服務器端日志記錄。
13 spring.kafka.consumer.client-id;
14  
15 #如果為true,則消費者的偏移量將在后台定期提交,默認值為true
16 spring.kafka.consumer.enable-auto-commit=true;
17  
18 #如果沒有足夠的數據立即滿足“fetch.min.bytes”給出的要求,服務器在回答獲取請求之前將阻塞的最長時間(以毫秒為單位)
19 #默認值為500
20 spring.kafka.consumer.fetch-max-wait;
21  
22 #服務器應以字節為單位返回獲取請求的最小數據量,默認值為1,對應的kafka的參數為fetch.min.bytes。
23 spring.kafka.consumer.fetch-min-size;
24  
25 #用於標識此使用者所屬的使用者組的唯一字符串。
26 spring.kafka.consumer.group-id;
27  
28 #心跳與消費者協調員之間的預期時間(以毫秒為單位),默認值為3000
29 spring.kafka.consumer.heartbeat-interval;
30  
31 #密鑰的反序列化器類,實現類實現了接口org.apache.kafka.common.serialization.Deserializer
32 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
33  
34 #值的反序列化器類,實現類實現了接口org.apache.kafka.common.serialization.Deserializer
35 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
36  
37 #一次調用poll()操作時返回的最大記錄數,默認值為500
38 spring.kafka.consumer.max-poll-records;
39 #################consumer的配置參數(結束)#################
40 #################producer的配置參數(開始)#################
41 #procedure要求leader在考慮完成請求之前收到的確認數,用於控制發送記錄在服務端的持久化,其值可以為如下:
42 #acks = 0 如果設置為零,則生產者將不會等待來自服務器的任何確認,該記錄將立即添加到套接字緩沖區並視為已發送。在這種情況下,無法保證服務器已收到記錄,並且重試配置將不會生效(因為客戶端通常不會知道任何故障),為每條記錄返回的偏移量始終設置為-1。
43 #acks = 1 這意味着leader會將記錄寫入其本地日志,但無需等待所有副本服務器的完全確認即可做出回應,在這種情況下,如果leader在確認記錄后立即失敗,但在將數據復制到所有的副本服務器之前,則記錄將會丟失。
44 #acks = all 這意味着leader將等待完整的同步副本集以確認記錄,這保證了只要至少一個同步副本服務器仍然存活,記錄就不會丟失,這是最強有力的保證,這相當於acks = -1的設置。
45 #可以設置的值為:all, -1, 0, 1
46 spring.kafka.producer.acks=1
47  
48 #每當多個記錄被發送到同一分區時,生產者將嘗試將記錄一起批量處理為更少的請求, 
49 #這有助於提升客戶端和服務器上的性能,此配置控制默認批量大小(以字節為單位),默認值為16384
50 spring.kafka.producer.batch-size=16384
51  
52 #以逗號分隔的主機:端口對列表,用於建立與Kafka群集的初始連接
53 spring.kafka.producer.bootstrap-servers
54  
55 #生產者可用於緩沖等待發送到服務器的記錄的內存總字節數,默認值為33554432
56 spring.kafka.producer.buffer-memory=33554432
57  
58 #ID在發出請求時傳遞給服務器,用於服務器端日志記錄
59 spring.kafka.producer.client-id
60  
61 #生產者生成的所有數據的壓縮類型,此配置接受標准壓縮編解碼器('gzip','snappy','lz4'),
62 #它還接受'uncompressed'以及'producer',分別表示沒有壓縮以及保留生產者設置的原始壓縮編解碼器,
63 #默認值為producer
64 spring.kafka.producer.compression-type=producer
65  
66 #key的Serializer類,實現類實現了接口org.apache.kafka.common.serialization.Serializer
67 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
68  
69 #值的Serializer類,實現類實現了接口org.apache.kafka.common.serialization.Serializer
70 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
71  
72 #如果該值大於零時,表示啟用重試失敗的發送次數
73 spring.kafka.producer.retries
74 #################producer的配置參數(結束)#################
75 #################listener的配置參數(結束)#################
76 #偵聽器的AckMode,參見https://docs.spring.io/spring-kafka/reference/htmlsingle/#committing-offsets
77 #當enable.auto.commit的值設置為false時,該值會生效;為true時不會生效
78 spring.kafka.listener.ack-mode;
79  
80 #在偵聽器容器中運行的線程數
81 spring.kafka.listener.concurrency;
82  
83 #輪詢消費者時使用的超時(以毫秒為單位)
84 spring.kafka.listener.poll-timeout;
85  
86 #當ackMode為“COUNT”或“COUNT_TIME”時,偏移提交之間的記錄數
87 spring.kafka.listener.ack-count;
88  
89 #當ackMode為“TIME”或“COUNT_TIME”時,偏移提交之間的時間(以毫秒為單位)
90 spring.kafka.listener.ack-time;
91 #################listener的配置參數(結束)#################
復制代碼
復制代碼
一、線上問題

    出現Kafka手動提交失敗,堆棧信息如下:

通過堆棧信息可以看出,有兩個重要參數: session.timeout  和 max.poll.records

session.timeout.ms : 在使用Kafka的團隊管理設施時,用於檢測消費者失敗的超時時間。消費者定期發送心跳來向經紀人表明其活躍度。如果代理在該會話超時到期之前沒有收到心跳,那么代理將從該組中刪除該消費者並啟動重新平衡。

max.poll.records : 在一次調用poll()中返回的最大記錄數。

根據堆棧的提示,他讓增加 session.timeout.ms 時間 或者 減少 max.poll.records。

總結:

1、 使用Kafka時,消費者每次poll的數據業務處理時間不能超過kafka的max.poll.interval.ms,該參數在kafka0.10.2.1中的默認值是300s,所以要綜合業務處理時間和每次poll的數據數量。

2、Java線程池大小的選擇,

對於CPU密集型應用,也就是計算密集型,線程池大小應該設置為CPU核數+1;

對於IO密集型應用 ,線程池大小設置為    2*CPU核數+1.   


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM