系列目錄
kafka原理和實踐(三)spring-kafka生產者源碼
kafka原理和實踐(四)spring-kafka消費者源碼
一、官方配置
官方配置文檔飛機票建議看Importance=medium以上的,即重要性為中級以上的,其他的用到了再說。
二、實踐中的配置
properties配置如下:
bootstrap.servers=192.168.49.206:9092,192.168.49.205:9092,192.168.49.204:9092 brokers集群
acks=all 即所有副本都同步到數據時send方法才返回, 以此來完全判斷數據是否發送成功, 理論上來講數據不會丟失.
retries=10 發送失敗重試次數
batch.size=1638 批處理條數:當多個記錄被發送到同一個分區時,生產者會嘗試將記錄合並到更少的請求中。這有助於客戶端和服務器的性能。
linger.ms=1 批處理延遲時間上限:即1ms過后,不管是否達到批處理數,都直接發送一次請求
buffer.memory=33554432 即32MB的批處理緩沖區
group.id=order-beta 消費者群組ID,發布-訂閱模式,即如果一個生產者,多個消費者都要消費,那么需要定義自己的群組,同一群組內的消費者只有一個能消費到消息
enable.auto.commit=true 如果為true,消費者的偏移量將在后台定期提交。
auto.commit.interval.ms=1000 如何設置為自動提交(enable.auto.commit=true),這里設置自動提交周期
session.timeout.ms=15000 在使用Kafka的組管理時,用於檢測消費者故障的超時
concurrency = 3 消費監聽器容器並發數
1、生產者配置
具體對應第二章中xml配置:
1 <bean id="producerProperties" class="java.util.HashMap"> 2 <constructor-arg> 3 <map> 4 <entry key="bootstrap.servers" value="${bootstrap.servers}" /> 6 <entry key="retries" value="${retries}" /> 7 <entry key="batch.size" value="${batch.size}" /> 8 <entry key="linger.ms" value="${linger.ms}" /> 9 <entry key="buffer.memory" value="${buffer.memory}" /> 11 <entry key="acks" value="${acks}" /> 13 <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />源碼預制的UTF8字符串反序列化實現類 byte[]-》String 15 <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" /> 17 </map> 18 </constructor-arg> 19 </bean>
2、消費者配置
具體對應第二章中xml配置:
1 <!-- 定義consumer的參數 --> 2 <bean id="consumerProperties" class="java.util.HashMap"> 3 <constructor-arg> 4 <map> 5 <entry key="bootstrap.servers" value="${bootstrap.servers}" /> 6 <entry key="group.id" value="${group.id}" /> 7 <entry key="enable.auto.commit" value="${enable.auto.commit}" /> 8 <entry key="session.timeout.ms" value="${session.timeout.ms}" /> 9 <entry key="key.deserializer" 10 value="org.apache.kafka.common.serialization.StringDeserializer" /> 11 <entry key="value.deserializer" 12 value="org.apache.kafka.common.serialization.StringDeserializer" /> 13 </map> 14 </constructor-arg> 15 </bean>
1 <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" > 2 <constructor-arg ref="consumerFactory" /> 4 <property name="concurrency" value="${concurrency}" />消費監聽器容器並發數 5 </bean>
3. 使用規范
這里發布一個真實的公司要求的使用規范,當然比較簡單哈,但貴在真實:
a: Producer 部分參數設定:
1: acks 設置為 "all" 即所有副本都同步到數據時send方法才返回, 以此來完全判斷數據是否發送成功, 理論上來講數據不會丟失.
2: retries = MAX 無限重試,直到你意識到出現了問題.
3: 使用 callback 來處理消息失敗發送邏輯.
4: min.insync.replicas > 1 消息至少要被寫入到這么多副本才算成功,也是提升數據持久性的一個參數。與acks配合使用.
5: 其他一些超時參數: reconnect.backoff.ms, retry.backoff.ms , linger.ms 結合 batch.size 等.
b: Consumer 部分參數設定:
1: auto.offset.reset 設置為 "earliest" 避免 offset 丟失時跳過未消費的消息. 目前消息存儲不統一, 部分使用 zookeeper, 部分使用 kafka topic.
2: enable.auto.commit=false 關閉自動提交位移, 在消息被完整處理之后再手動提交位移.
3: consumer 的並發受 partition 的限制. 如果消息處理量比較大的情況請提前與運維聯系, 增加 partition 數量應對消費端並發. 默認topic partition 為6-8個.
partition 也不是越多越好. 首先會增加 file 和 memory, 其次會延長選舉時間, 並且會延長 offset 的查詢時間. partition可以擴容但無法縮減.
極限情況的數據丟失現象.
a: 即使將 ack 設置為 "all" 也會在一定情況下丟失消息. 因為 kafka 的高性能特性, 消息在寫入 kafka 時並沒有落盤 而是寫入了 OS buffer 中. 使用 OS 的臟頁刷新策略周期性落盤, 就算落盤 仍然會有 raid buffer. 前者機器宕機數據丟失, 后者機器跳電數據丟失.
b: 對數據可靠性較高的場景建議 offset 手動提交. 自動提交當遇到業務系統上線被關閉時, 消息讀取並且 offset 已經提交, 但是數據沒有存儲或者仍沒來得及消費時, 消息狀態在內存中無法保留, 重啟應用會跳過消息 致使消息丟失.