Kafka如何保證數據不重復消費,不丟失數據
不重復消費:
1.冪等操作,重復消費不會產生問題
2. dstream.foreachRDD {(rdd, time) =
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds,partitionId)將uniqueID存入數據庫中
//use this uniqueId to transationally commit the data in partitionIterator
}
}
對每個partitionID,產生一個uniqueID,.只有這個partition的數據被完全消費,才算成功,否則失敗回滾。下次若重復執行,就skip
不丟失數據:丟失情況:
1.生產者數據不丟失
同步模式:配置=1(只有Leader收到,-1所有副本成功,0不等待)。leader partition掛了,數據就會丟失。
解決:設置為-1保證produce寫入所有副本算成功
producer.type=sync
request.required.acks=-1
異步模式,當緩沖區滿了,如果配置為0(沒有收到確認,一滿就丟棄),數據立刻丟棄
解決:不限制阻塞超時時間。就是一滿生產者就阻塞
producer.type=async
request.required.acks=1
queue.buffering.max.ms=5000
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200
2.消費者數據不丟失 :流計算,基本數據源不適用。高級數據源以kafka為例,由2種方式:receiver(開啟WAL,失敗可恢復)和director(checkpoint保證)
3. 若是storm在消費,開啟storm的ackfail機制;若不是storm,數據處理完更新offset,低級API手動控制offset
4. Kafka發送數據過快,導致服務器網卡流量暴增。或磁盤過忙,出現丟包。
1》 首先,對kafka進行限速,
2》 其次啟用重試機制,使重試間隔變長。
3》 Kafka設置ack=all,即需要處於ISR(副本列表)的分區都確認,才算發送成功。 rops.put("compression.type", "gzip");
props.put("linger.ms", "50");
props.put("acks", "all")表示至少成功發送一次;
props.put("retries ", 30);
props.put("reconnect.backoff.ms ", 20000);
props.put("retry.backoff.ms", 20000)
5.消費者速度很慢,導致一個session周期(0.1版本是默認30s)內未完成消費。導致心跳機制檢測報告出問題。
導致消費了的數據未及時提交offset.配置由可能是自動提交
問題場景:1.offset為自動提交,正在消費數據,kill消費者線程,下次重復消費
2.設置自動提交,關閉kafka,close之前,調用consumer.unsubscribed()則由可能部分offset沒有提交。
3.消費程序和業務邏輯在一個線程,導致offset提交超時,