kafka如何保證數據不被重復消費並且不丟失數據


Kafka如何保證數據不重復消費,不丟失數據

不重復消費:

1.冪等操作,重復消費不會產生問題

2. dstream.foreachRDD {(rdd, time) =

  rdd.foreachPartition { partitionIterator =>

    val partitionId = TaskContext.get.partitionId()

    val uniqueId = generateUniqueId(time.milliseconds,partitionId)將uniqueID存入數據庫中

    //use this uniqueId to transationally commit the data in partitionIterator

 }

}

對每個partitionID,產生一個uniqueID,.只有這個partition的數據被完全消費,才算成功,否則失敗回滾。下次若重復執行,就skip

 

不丟失數據:丟失情況: 
1.生產者數據不丟失
      同步模式:配置=1(只有Leader收到,-1所有副本成功,0不等待)。leader partition掛了,數據就會丟失。
                  解決:設置為-1保證produce寫入所有副本算成功
          producer.type=sync
          request.required.acks=-1
      異步模式,當緩沖區滿了,如果配置為0(沒有收到確認,一滿就丟棄),數據立刻丟棄
                解決:不限制阻塞超時時間。就是一滿生產者就阻塞
          producer.type=async 
          request.required.acks=1 
          queue.buffering.max.ms=5000 
          queue.buffering.max.messages=10000 
          queue.enqueue.timeout.ms = -1 
          batch.num.messages=200
2.消費者數據不丟失 :流計算,基本數據源不適用。高級數據源以kafka為例,由2種方式:receiver(開啟WAL,失敗可恢復)和director(checkpoint保證)
3.   若是storm在消費,開啟storm的ackfail機制;若不是storm,數據處理完更新offset,低級API手動控制offset
4.   Kafka發送數據過快,導致服務器網卡流量暴增。或磁盤過忙,出現丟包。
      1》  首先,對kafka進行限速,
      2》  其次啟用重試機制,使重試間隔變長。
      3》  Kafka設置ack=all,即需要處於ISR(副本列表)的分區都確認,才算發送成功。    rops.put("compression.type", "gzip");
               props.put("linger.ms", "50");
               props.put("acks", "all")表示至少成功發送一次;
               props.put("retries ", 30);
               props.put("reconnect.backoff.ms ", 20000);

               props.put("retry.backoff.ms", 20000)

 5.消費者速度很慢,導致一個session周期(0.1版本是默認30s)內未完成消費。導致心跳機制檢測報告出問題。

   導致消費了的數據未及時提交offset.配置由可能是自動提交

            問題場景:1.offset為自動提交,正在消費數據,kill消費者線程,下次重復消費

         2.設置自動提交,關閉kafka,close之前,調用consumer.unsubscribed()則由可能部分offset沒有提交。

              3.消費程序和業務邏輯在一個線程,導致offset提交超時,

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM