kafka原理和實踐(五)spring-kafka配置詳解


系列目錄

kafka原理和實踐(一)原理:10分鍾入門

kafka原理和實踐(二)spring-kafka簡單實踐

kafka原理和實踐(三)spring-kafka生產者源碼

kafka原理和實踐(四)spring-kafka消費者源碼

kafka原理和實踐(五)spring-kafka配置詳解

kafka原理和實踐(六)總結升華

一、官方配置

官方配置文檔飛機票建議看Importance=medium以上的,即重要性為中級以上的,其他的用到了再說。

二、實踐中的配置

properties配置如下:

bootstrap.servers=192.168.49.206:9092,192.168.49.205:9092,192.168.49.204:9092 brokers集群
acks=all     即所有副本都同步到數據時send方法才返回, 以此來完全判斷數據是否發送成功, 理論上來講數據不會丟失.        
retries=10 發送失敗重試次數
batch.size=1638 批處理條數:當多個記錄被發送到同一個分區時,生產者會嘗試將記錄合並到更少的請求中。這有助於客戶端和服務器的性能。
linger.ms=1 批處理延遲時間上限:即1ms過后,不管是否達到批處理數,都直接發送一次請求
buffer.memory=33554432 即32MB的批處理緩沖區

group.id=order-beta  消費者群組ID,發布-訂閱模式,即如果一個生產者,多個消費者都要消費,那么需要定義自己的群組,同一群組內的消費者只有一個能消費到消息
enable.auto.commit=true 如果為true,消費者的偏移量將在后台定期提交。
auto.commit.interval.ms=1000 如何設置為自動提交(enable.auto.commit=true),這里設置自動提交周期
session.timeout.ms=15000 在使用Kafka的組管理時,用於檢測消費者故障的超時
concurrency = 3 消費監聽器容器並發數

1、生產者配置

具體對應第二章中xml配置:

 1 <bean id="producerProperties" class="java.util.HashMap">
 2         <constructor-arg>
 3             <map>
 4                 <entry key="bootstrap.servers" value="${bootstrap.servers}" />
 6                 <entry key="retries" value="${retries}" />
 7                 <entry key="batch.size" value="${batch.size}" />
 8                 <entry key="linger.ms" value="${linger.ms}" />
 9                 <entry key="buffer.memory" value="${buffer.memory}" />
11                 <entry key="acks" value="${acks}" />   
13                 <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />源碼預制的UTF8字符串反序列化實現類  byte[]-》String
15                 <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
17             </map>
18         </constructor-arg>
19     </bean>

2、消費者配置

具體對應第二章中xml配置:

 1 <!-- 定義consumer的參數 -->
 2     <bean id="consumerProperties" class="java.util.HashMap">
 3         <constructor-arg>
 4             <map>
 5                 <entry key="bootstrap.servers" value="${bootstrap.servers}" />
 6                 <entry key="group.id" value="${group.id}" />
 7                 <entry key="enable.auto.commit" value="${enable.auto.commit}" />
 8                 <entry key="session.timeout.ms" value="${session.timeout.ms}" />
 9                 <entry key="key.deserializer"
10                     value="org.apache.kafka.common.serialization.StringDeserializer" />
11                 <entry key="value.deserializer"
12                     value="org.apache.kafka.common.serialization.StringDeserializer" />
13             </map>
14         </constructor-arg>
15     </bean>
1 <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
2         <constructor-arg ref="consumerFactory" />
4         <property name="concurrency" value="${concurrency}" />消費監聽器容器並發數
5 </bean>

 

3. 使用規范

這里發布一個真實的公司要求的使用規范,當然比較簡單哈,但貴在真實:

        a: Producer 部分參數設定:

         1: acks 設置為 "all" 即所有副本都同步到數據時send方法才返回, 以此來完全判斷數據是否發送成功, 理論上來講數據不會丟失.           

    2: retries = MAX 無限重試,直到你意識到出現了問題.

    3: 使用 callback 來處理消息失敗發送邏輯.

    4: min.insync.replicas > 1 消息至少要被寫入到這么多副本才算成功,也是提升數據持久性的一個參數。與acks配合使用.

    5: 其他一些超時參數: reconnect.backoff.ms, retry.backoff.ms , linger.ms 結合 batch.size 等.    

       

       

        b: Consumer 部分參數設定:

            1: auto.offset.reset 設置為 "earliest" 避免 offset 丟失時跳過未消費的消息. 目前消息存儲不統一, 部分使用 zookeeper, 部分使用 kafka topic.          

            2: enable.auto.commit=false  關閉自動提交位移, 在消息被完整處理之后再手動提交位移.

            3: consumer 的並發受 partition 的限制. 如果消息處理量比較大的情況請提前與運維聯系, 增加 partition 數量應對消費端並發. 默認topic partition 為6-8個.

               partition 也不是越多越好. 首先會增加 file 和 memory, 其次會延長選舉時間, 並且會延長 offset 的查詢時間.  partition可以擴容但無法縮減.

       

       

    極限情況的數據丟失現象.

        a: 即使將 ack 設置為 "all" 也會在一定情況下丟失消息. 因為 kafka 的高性能特性, 消息在寫入 kafka 時並沒有落盤 而是寫入了 OS buffer 中. 使用 OS 的臟頁刷新策略周期性落盤, 就算落盤 仍然會有 raid buffer. 前者機器宕機數據丟失, 后者機器跳電數據丟失.

        b: 對數據可靠性較高的場景建議 offset 手動提交. 自動提交當遇到業務系統上線被關閉時, 消息讀取並且 offset 已經提交, 但是數據沒有存儲或者仍沒來得及消費時, 消息狀態在內存中無法保留, 重啟應用會跳過消息 致使消息丟失.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM