spring-kafka生產者消費者配置詳解


一、生產者
1、重要配置

    # 高優先級配置
    # 以逗號分隔的主機:端口對列表,用於建立與Kafka群集的初始連接
    spring.kafka.producer.bootstrap-servers=TopKafka1:9092,TopKafka2:9092,TopKafka3:9092
     
    # 設置大於0的值將使客戶端重新發送任何數據,一旦這些數據發送失敗。注意,這些重試與客戶端接收到發送錯誤時的重試沒有什么不同。允許重試將潛在的改變數據的順序,如果這兩個消息記錄都是發送到同一個partition,則第一個消息失敗第二個發送成功,則第二條消息會比第一條消息出現要早。
    spring.kafka.producer.retries=0
     
    # 每當多個記錄被發送到同一分區時,生產者將嘗試將記錄一起批量處理為更少的請求,
    # 這有助於提升客戶端和服務端之間的性能,此配置控制默認批量大小(以字節為單位),默認值為16384
    spring.kafka.producer.batch-size=16384
     
    # producer可以用來緩存數據的內存大小。如果數據產生速度大於向broker發送的速度,producer會阻塞或者拋出異常,以“block.on.buffer.full”來表明。這項設置將和producer能夠使用的總內存相關,但並不是一個硬性的限制,因為不是producer使用的所有內存都是用於緩存。一些額外的內存會用於壓縮(如果引入壓縮機制),同樣還有一些用於維護請求。
    spring.kafka.producer.buffer-memory=33554432
     
    # key的Serializer類,實現了org.apache.kafka.common.serialization.Serializer接口
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
     
    # 值的Serializer類,實現了org.apache.kafka.common.serialization.Serializer接口
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
     
    # procedure要求leader在考慮完成請求之前收到的確認數,用於控制發送記錄在服務端的持久化,其值可以為如下:
    # acks = 0 如果設置為零,則生產者將不會等待來自服務器的任何確認,該記錄將立即添加到套接字緩沖區並視為已發送。在這種情況下,無法保證服務器已收到記錄,並且重試配置將不會生效(因為客戶端通常不會知道任何故障),為每條記錄返回的偏移量始終設置為-1。
    # acks = 1 這意味着leader會將記錄寫入其本地日志,但無需等待所有副本服務器的完全確認即可做出回應,在這種情況下,如果leader在確認記錄后立即失敗,但在將數據復制到所有的副本服務器之前,則記錄將會丟失。
    # acks = all 這意味着leader將等待完整的同步副本集以確認記錄,這保證了只要至少一個同步副本服務器仍然存活,記錄就不會丟失,這是最強有力的保證,這相當於acks = -1的設置。
    # 可以設置的值為:all, -1, 0, 1
    spring.kafka.producer.acks=-1
     
    # 當向server發出請求時,這個字符串會發送給server。目的是能夠追蹤請求源頭,以此來允許ip/port許可列表之外的一些應用可以發送信息。這項應用可以設置任意字符串,因為沒有任何功能性的目的,除了記錄和跟蹤
    spring.kafka.producer.client-id=1
     
    # producer用於壓縮數據的壓縮類型。默認是無壓縮。正確的選項值是none、gzip、snappy。壓縮最好用於批量處理,批量處理消息越多,壓縮性能越好
    spring.kafka.producer.compression-type=none

2、其他配置

    # 中優先級配置
    # 以毫秒為單位的時間,是在我們強制更新metadata的時間間隔。即使我們沒有看到任何partition leadership改變。默認值:5 * 60 * 1000 = 300000
    spring.kafka.producer.properties.metadata.max.age.ms=300000
     
    # producer組將會匯總任何在請求與發送之間到達的消息記錄一個單獨批量的請求。通常來說,這只有在記錄產生速度大於發送速度的時候才能發生。然而,在某些條件下,客戶端將希望降低請求的數量,甚至降低到中等負載一下。這項設置將通過增加小的延遲來完成–即,不是立即發送一條記錄,producer將會等待給定的延遲時間以允許其他消息記錄發送,這些消息記錄可以批量處理。這可以認為是TCP種Nagle的算法類似。這項設置設定了批量處理的更高的延遲邊界:一旦我們獲得某個partition的batch.size,他將會立即發送而不顧這項設置,然而如果我們獲得消息字節數比這項設置要小的多,我們需要“linger”特定的時間以獲取更多的消息。 這個設置默認為0,即沒有延遲。設定linger.ms=5,例如,將會減少請求數目,但是同時會增加5ms的延遲。
    spring.kafka.producer.properties.linger.ms=0
     
    # 發送數據時的緩存空間大小,默認:128 * 1024 = 131072
    spring.kafka.producer.properties.send.buffer.bytes=131072
     
    # socket的接收緩存空間大小,當閱讀數據時使用,默認:32 * 1024 = 32768
    spring.kafka.producer.properties.receive.buffer.bytes=32768
     
    # 請求的最大字節數。這也是對最大記錄尺寸的有效覆蓋。注意:server具有自己對消息記錄尺寸的覆蓋,這些尺寸和這個設置不同。此項設置將會限制producer每次批量發送請求的數目,以防發出巨量的請求。默認:1 * 1024 * 1024 = 1048576
    spring.kafka.producer.properties.max.request.size=1048576
     
    # 連接失敗時,當我們重新連接時的等待時間。這避免了客戶端反復重連,默認值:50
    spring.kafka.producer.properties.reconnect.backoff.ms=50
     
    # producer客戶端連接一個kafka服務(broker)失敗重連的總時間,每次連接失敗,重連時間都會指數級增加,每次增加的時間會存在20%的隨機抖動,以避免連接風暴。默認:1000
    # spring.kafka.producer.properties.reconnect.backoff.max.ms=1000
     
    # 控制block的時長,當buffer空間不夠或者metadata丟失時產生block,默認:60 * 1000 = 60000
    spring.kafka.producer.properties.max.block.ms=60000
     
    # 在試圖重試失敗的produce請求之前的等待時間。避免陷入發送-失敗的死循環中,默認:100
    spring.kafka.producer.properties.retry.backoff.ms=100
     
    # metrics系統維護可配置的樣本數量,在一個可修正的window size。這項配置配置了窗口大小,例如。我們可能在30s的期間維護兩個樣本。當一個窗口退出后,我們會擦除並重寫最老的窗口,默認:30000
    spring.kafka.producer.properties.metrics.sample.window.ms=30000
     
    # 用於維護metrics的樣本數,默認:2
    spring.kafka.producer.properties.metrics.num.samples=2
     
    # 用於metrics的最高紀錄等級。
    # spring.kafka.producer.properties.metrics.recording.level=Sensor.RecordingLevel.INFO.toString()
     
    # 類的列表,用於衡量指標。實現MetricReporter接口,將允許增加一些類,這些類在新的衡量指標產生時就會改變。JmxReporter總會包含用於注冊JMX統計
    #spring.kafka.producer.properties.metric.reporters=Collections.emptyList()
     
    # kafka可以在一個connection中發送多個請求,叫作一個flight,這樣可以減少開銷,但是如果產生錯誤,可能會造成數據的發送順序改變,默認是5 (修改)
    spring.kafka.producer.properties.max.in.flight.requests.per.connection=5
     
    # 關閉連接空閑時間,默認:9 * 60 * 1000 = 540000
    spring.kafka.producer.properties.connections.max.idle.ms=540000
     
    # 分區類,默認:org.apache.kafka.clients.producer.internals.DefaultPartitioner
    spring.kafka.producer.properties.partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner
     
    # 客戶端將等待請求的響應的最大時間,如果在這個時間內沒有收到響應,客戶端將重發請求;超過重試次數將拋異常,默認:30 * 1000 = 30000
    spring.kafka.producer.properties.request.timeout.ms=30000
     
    # 用戶自定義interceptor。
    #spring.kafka.producer.properties.interceptor.classes=none
     
    # 是否使用冪等性。如果設置為true,表示producer將確保每一條消息都恰好有一份備份;如果設置為false,則表示producer因發送數據到broker失敗重試使,可能往數據流中寫入多分重試的消息。
    #spring.kafka.producer.properties.enable.idempotence=false
     
    # 在主動中止正在進行的事務之前,事務協調器將等待生產者的事務狀態更新的最長時間(以ms為單位)。
    #spring.kafka.producer.properties.transaction.timeout.ms=60000
     
    # 用於事務傳遞的TransactionalId。 這使得可以跨越多個生產者會話的可靠性語義,因為它允許客戶端保證在開始任何新事務之前使用相同的TransactionalId的事務已經完成。 如果沒有提供TransactionalId,則生產者被限制為冪等傳遞。請注意,如果配置了TransactionalId,則必須啟用enable.idempotence。默認值為空,這意味着無法使用事務。
    #spring.kafka.producer.properties.transactional.id=null

連接風暴

應用啟動的時候,經常可能發生各應用服務器的連接數異常飆升的情況。假設連接數的設置為:min值3,max值10,正常的業務使用連接數在5個左右,當重啟應用時,各應用連接數可能會飆升到10個,瞬間甚至還有可能部分應用會報取不到連接。啟動完成后接下來的時間內,連接開始慢慢返回到業務的正常值。這就是所謂的連接風暴。
二、消費者
1、重要配置

    # 以逗號分隔的主機:端口對列表,用於建立與Kafka群集的初始連接
    spring.kafka.consumer.bootstrap-servers=TopKafka1:9092,TopKafka2:9092,TopKafka3:9092
     
    # 用來唯一標識consumer進程所在組的字符串,如果設置同樣的group id,表示這些processes都是屬於同一個consumer group,默認:""
    spring.kafka.consumer.group-id=TyyLoveZyy
     
    # max.poll.records條數據需要在session.timeout.ms這個時間內處理完,默認:500
    spring.kafka.consumer.max-poll-records=500
     
    # 消費超時時間,大小不能超過session.timeout.ms,默認:3000
    spring.kafka.consumer.heartbeat-interval=3000
     
    # 如果為真,consumer所fetch的消息的offset將會自動的同步到zookeeper。這項提交的offset將在進程掛掉時,由新的consumer使用,默認:true
    spring.kafka.consumer.enable-auto-commit=true
     
    # consumer自動向zookeeper提交offset的頻率,默認:5000
    spring.kafka.consumer.auto-commit-interval=5000
     
    # 沒有初始化的offset時,可以設置以下三種情況:(默認:latest)
    # earliest
    # 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
    # latest
    # 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
    # none
    # topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常
    spring.kafka.consumer.auto-offset-reset=earliest
     
    # 每次fetch請求時,server應該返回的最小字節數。如果沒有足夠的數據返回,請求會等待,直到足夠的數據才會返回。默認:1
    spring.kafka.consumer.fetch-min-size=1
     
    # Fetch請求發給broker后,在broker中可能會被阻塞的(當topic中records的總size小於fetch.min.bytes時),此時這個fetch請求耗時就會比較長。這個配置就是來配置consumer最多等待response多久。
    spring.kafka.consumer.fetch-max-wait=500
     
    # 消費者進程的標識。如果設置一個人為可讀的值,跟蹤問題會比較方便。。默認:""
    spring.kafka.consumer.client-id=1
     
    # key的反序列化類。實現了org.apache.kafka.common.serialization.Deserializer接口
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
     
    # 值的反序列化類。實現了org.apache.kafka.common.serialization.Deserializer接口
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

2、其他配置

    # consumer是通過拉取的方式向服務端拉取數據,當超過指定時間間隔max.poll.interval.ms沒有向服務端發送poll()請求,而心跳heartbeat線程仍然在繼續,會認為該consumer鎖死,就會將該consumer退出group,並進行再分配。默認:300000
    spring.kafka.consumer.properties.max.poll.interval.ms=300000
     
    # 會話的超時限制。如果consumer在這段時間內沒有發送心跳信息,則它會被認為掛掉了,並且reblance將會產生,必須在[group.min.session.timeout.ms, group.max.session.timeout.ms]范圍內。默認:10000
    spring.kafka.consumer.properties.session.timeout.ms=10000
     
    # 在“range”和“roundrobin”策略之間選擇一種作為分配partitions給consumer 數據流的策略; 循環的partition分配器分配所有可用的partitions以及所有可用consumer 線程。它會將partition循環的分配到consumer線程上。如果所有consumer實例的訂閱都是確定的,則partitions的划分是確定的分布。循環分配策略只有在以下條件滿足時才可以:(1)每個topic在每個consumer實例上都有同樣數量的數據流。(2)訂閱的topic的集合對於consumer group中每個consumer實例來說都是確定的。
    spring.kafka.consumer.properties.partition.assignment.strategy=range
     
    # 一次fetch請求,從一個broker中取得的records最大大小。如果在從topic中第一個非空的partition取消息時,如果取到的第一個record的大小就超過這個配置時,仍然會讀取這個record,也就是說在這片情況下,只會返回這一條record。默認:50 * 1024 * 1024 = 52428800
    spring.kafka.consumer.properties.fetch.max.bytes=52428800
     
    # Metadata數據的刷新間隔。即便沒有任何的partition訂閱關系變更也能執行。默認:5 * 60 * 1000 = 300000
    spring.kafka.consumer.properties.metadata.max.age.ms=300000
     
    # 一次fetch請求,從一個partition中取得的records最大大小。如果在從topic中第一個非空的partition取消息時,如果取到的第一個record的大小就超過這個配置時,仍然會讀取這個record,也就是說在這片情況下,只會返回這一條record。broker、topic都會對producer發給它的message size做限制。所以在配置這值時,可以參考broker的message.max.bytes 和 topic的max.message.bytes的配置。默認:1 * 1024 * 1024 = 1048576
    spring.kafka.consumer.properties.max.partition.fetch.bytes=1048576
     
    # 最大發送的TCP大小。默認:128 * 1024 = 131072,如果設置為 -1 則為操作系統默認大小
    spring.kafka.consumer.properties.send.buffer.bytes=131072
     
    # 消費者接受緩沖區的大小。這個值在創建Socket連接時會用到。取值范圍是:[-1, Integer.MAX]。默認值是:65536 (64 KB),如果值設置為-1,則會使用操作系統默認的值。默認:64 * 1024 = 65536
    spring.kafka.consumer.properties.receive.buffer.bytes=65536
     
    # 連接失敗時,當我們重新連接時的等待時間。這避免了客戶端反復重連,默認:50
    spring.kafka.consumer.properties.reconnect.backoff.ms=50
     
    # producer客戶端連接一個kafka服務(broker)失敗重連的總時間,每次連接失敗,重連時間都會指數級增加,每次增加的時間會存在20%的隨機抖動,以避免連接風暴。默認:1000
    spring.kafka.consumer.properties.reconnect.backoff.max.ms=1000
     
    # 在試圖重試失敗的produce請求之前的等待時間。避免陷入發送-失敗的死循環中,默認:100
    spring.kafka.consumer.properties.retry.backoff.ms=100
     
    # metrics系統維護可配置的樣本數量,在一個可修正的window size。這項配置配置了窗口大小,例如。我們可能在30s的期間維護兩個樣本。當一個窗口退出后,我們會擦除並重寫最老的窗口,默認:30000
    spring.kafka.consumer.properties.metrics.sample.window.ms=30000
     
    # 用於維護metrics的樣本數,默認:2
    spring.kafka.consumer.properties.metrics.num.samples=2
     
    # 用於metrics的最高紀錄等級。默認:Sensor.RecordingLevel.INFO.toString()
    #spring.kafka.consumer.properties.metrics.recording.level=Sensor.RecordingLevel.INFO.toString()
     
    # 類的列表,用於衡量指標。實現MetricReporter接口,將允許增加一些類,這些類在新的衡量指標產生時就會改變。JmxReporter總會包含用於注冊JMX統計。默認:Collections.emptyList()
    #spring.kafka.consumer.properties.metric.reporters=Collections.emptyList()
     
    # 自動檢查所消耗記錄的CRC32。這可以確保沒有線上或磁盤損壞的消息發生。此檢查會增加一些開銷,因此在尋求極高性能的情況下可能會被禁用。默認:true
    spring.kafka.consumer.properties.check.crcs=true
     
    # 連接空閑超時時間。因為consumer只與broker有連接(coordinator也是一個broker),所以這個配置的是consumer到broker之間的。默認:9 * 60 * 1000 = 540000
    spring.kafka.consumer.properties.connections.max.idle.ms=540000
     
    # 客戶端將等待請求的響應的最大時間,如果在這個時間內沒有收到響應,客戶端將重發請求;超過重試次數將拋異常,默認:30000
    spring.kafka.consumer.properties.request.timeout.ms=30000
     
    # 用於阻止的KafkaConsumer API的默認超時時間。KIP還為這樣的阻塞API添加了重載,以支持指定每個阻塞API使用的特定超時,而不是使用default.api.timeout.ms設置的默認超時。特別是,添加了一個新的輪詢(持續時間)API,它不會阻止動態分區分配。舊的poll(long)API已被棄用,將在以后的版本中刪除。還為其他KafkaConsumer方法添加了重載,例如partitionsFor,listTopics,offsetsForTimes,beginningOffsets,endOffsets和close,它們接收持續時間。默認:60 * 1000 = 60000
    spring.kafka.consumer.properties.default.api.timeout.ms=60000
     
    # 用戶自定義interceptor。默認:Collections.emptyList()
    #spring.kafka.consumer.properties.interceptor.classes=Collections.emptyList()
     
    # 是否將內部topics的消息暴露給consumer。默認:true
    spring.kafka.consumer.properties.exclude.internal.topics=true
     
    # 默認:true
    spring.kafka.consumer.properties.internal.leave.group.on.close=true
     
    # 默認:IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)
    #spring.kafka.consumer.properties.isolation.level=IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)

 
作者:天少弋  
來源:CSDN  
原文:https://blog.csdn.net/u014774648/article/details/90110508  
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM