來源於 https://www.cnblogs.com/miracleYu/p/10213807.html
1 #################consumer的配置參數(開始)################# 2 #如果'enable.auto.commit'為true,則消費者偏移自動提交給Kafka的頻率(以毫秒為單位),默認值為5000。 3 spring.kafka.consumer.auto-commit-interval; 4 5 #當Kafka中沒有初始偏移量或者服務器上不再存在當前偏移量時該怎么辦,默認值為latest,表示自動將偏移重置為最新的偏移量 6 #可選的值為latest, earliest, none 7 spring.kafka.consumer.auto-offset-reset=latest; 8 9 #以逗號分隔的主機:端口對列表,用於建立與Kafka群集的初始連接。 10 spring.kafka.consumer.bootstrap-servers; 11 12 #ID在發出請求時傳遞給服務器;用於服務器端日志記錄。 13 spring.kafka.consumer.client-id; 14 15 #如果為true,則消費者的偏移量將在后台定期提交,默認值為true 16 spring.kafka.consumer.enable-auto-commit=true; 17 18 #如果沒有足夠的數據立即滿足“fetch.min.bytes”給出的要求,服務器在回答獲取請求之前將阻塞的最長時間(以毫秒為單位) 19 #默認值為500 20 spring.kafka.consumer.fetch-max-wait; 21 22 #服務器應以字節為單位返回獲取請求的最小數據量,默認值為1,對應的kafka的參數為fetch.min.bytes。 23 spring.kafka.consumer.fetch-min-size; 24 25 #用於標識此使用者所屬的使用者組的唯一字符串。 26 spring.kafka.consumer.group-id; 27 28 #心跳與消費者協調員之間的預期時間(以毫秒為單位),默認值為3000 29 spring.kafka.consumer.heartbeat-interval; 30 31 #密鑰的反序列化器類,實現類實現了接口org.apache.kafka.common.serialization.Deserializer 32 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer 33 34 #值的反序列化器類,實現類實現了接口org.apache.kafka.common.serialization.Deserializer 35 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer 36 37 #一次調用poll()操作時返回的最大記錄數,默認值為500 38 spring.kafka.consumer.max-poll-records; 39 #################consumer的配置參數(結束)################# 40 #################producer的配置參數(開始)################# 41 #procedure要求leader在考慮完成請求之前收到的確認數,用於控制發送記錄在服務端的持久化,其值可以為如下: 42 #acks = 0 如果設置為零,則生產者將不會等待來自服務器的任何確認,該記錄將立即添加到套接字緩沖區並視為已發送。在這種情況下,無法保證服務器已收到記錄,並且重試配置將不會生效(因為客戶端通常不會知道任何故障),為每條記錄返回的偏移量始終設置為-1。 43 #acks = 1 這意味着leader會將記錄寫入其本地日志,但無需等待所有副本服務器的完全確認即可做出回應,在這種情況下,如果leader在確認記錄后立即失敗,但在將數據復制到所有的副本服務器之前,則記錄將會丟失。 44 #acks = all 這意味着leader將等待完整的同步副本集以確認記錄,這保證了只要至少一個同步副本服務器仍然存活,記錄就不會丟失,這是最強有力的保證,這相當於acks = -1的設置。 45 #可以設置的值為:all, -1, 0, 1 46 spring.kafka.producer.acks=1 47 48 #每當多個記錄被發送到同一分區時,生產者將嘗試將記錄一起批量處理為更少的請求, 49 #這有助於提升客戶端和服務器上的性能,此配置控制默認批量大小(以字節為單位),默認值為16384 50 spring.kafka.producer.batch-size=16384 51 52 #以逗號分隔的主機:端口對列表,用於建立與Kafka群集的初始連接 53 spring.kafka.producer.bootstrap-servers 54 55 #生產者可用於緩沖等待發送到服務器的記錄的內存總字節數,默認值為33554432 56 spring.kafka.producer.buffer-memory=33554432 57 58 #ID在發出請求時傳遞給服務器,用於服務器端日志記錄 59 spring.kafka.producer.client-id 60 61 #生產者生成的所有數據的壓縮類型,此配置接受標准壓縮編解碼器('gzip','snappy','lz4'), 62 #它還接受'uncompressed'以及'producer',分別表示沒有壓縮以及保留生產者設置的原始壓縮編解碼器, 63 #默認值為producer 64 spring.kafka.producer.compression-type=producer 65 66 #key的Serializer類,實現類實現了接口org.apache.kafka.common.serialization.Serializer 67 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer 68 69 #值的Serializer類,實現類實現了接口org.apache.kafka.common.serialization.Serializer 70 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer 71 72 #如果該值大於零時,表示啟用重試失敗的發送次數 73 spring.kafka.producer.retries 74 #################producer的配置參數(結束)################# 75 #################listener的配置參數(結束)################# 76 #偵聽器的AckMode,參見https://docs.spring.io/spring-kafka/reference/htmlsingle/#committing-offsets 77 #當enable.auto.commit的值設置為false時,該值會生效;為true時不會生效 78 spring.kafka.listener.ack-mode; 79 80 #在偵聽器容器中運行的線程數 81 spring.kafka.listener.concurrency; 82 83 #輪詢消費者時使用的超時(以毫秒為單位) 84 spring.kafka.listener.poll-timeout; 85 86 #當ackMode為“COUNT”或“COUNT_TIME”時,偏移提交之間的記錄數 87 spring.kafka.listener.ack-count; 88 89 #當ackMode為“TIME”或“COUNT_TIME”時,偏移提交之間的時間(以毫秒為單位) 90 spring.kafka.listener.ack-time; 91 #################listener的配置參數(結束)#################
一、線上問題 出現Kafka手動提交失敗,堆棧信息如下: 通過堆棧信息可以看出,有兩個重要參數: session.timeout 和 max.poll.records session.timeout.ms : 在使用Kafka的團隊管理設施時,用於檢測消費者失敗的超時時間。消費者定期發送心跳來向經紀人表明其活躍度。如果代理在該會話超時到期之前沒有收到心跳,那么代理將從該組中刪除該消費者並啟動重新平衡。 max.poll.records : 在一次調用poll()中返回的最大記錄數。 根據堆棧的提示,他讓增加 session.timeout.ms 時間 或者 減少 max.poll.records。 總結: 1、 使用Kafka時,消費者每次poll的數據業務處理時間不能超過kafka的max.poll.interval.ms,該參數在kafka0.10.2.1中的默認值是300s,所以要綜合業務處理時間和每次poll的數據數量。 2、Java線程池大小的選擇, 對於CPU密集型應用,也就是計算密集型,線程池大小應該設置為CPU核數+1; 對於IO密集型應用 ,線程池大小設置為 2*CPU核數+1.