Kafka consumer消息的拉取及偏移的管理


消費者拉取消息並處理主要有4個步驟:

  • 獲取消費者所拉取分區的偏移位置OffsetFetchRequest(新的消息是從偏移位置開始的)
  • 創建FetchReqeust,生成Map<Node, FetchRequest>,以消費者所拉取消息的節點為key來分組,所消費的TopicPartition的數據為value,並放入到unsent隊列
  • 調用poll方法實際發送請求給相應的node,如果返回成功,在onSecuss方法中,消息被保存在completedFetches中
  • 從completedFetches中取出數據,轉換成consumerRecord,清空緩沖區及更新消費偏移位置

偏移管理:更新拉取偏移,updateFetchPositions,發送OffsetFetchRequest請求

消費者在啟動后,需要獲取其所消費的分區的最后提交的偏移位置。消費者在消費完消息后需要提交消費偏移(committed offset),當發生再平衡(reblance)后,分區(partition)有可能被不同的消費者去拉取消息,那新的消費者需要知道上次是消費到哪個偏移位置的,那么新的消費者就需要發出請求給coordinator,以取得提交偏移(committed offset,前一個消費者最后的提交偏移)並更新本地的拉取偏移(fetch position)。消費者在提交偏移的時候,有2種策略可以選擇,自動提交(auto commit)和手動提交(manually commit)

自動提交:

通常情況下,為了提高性能,會使用自動提交方式,自動提交的間隔(auto.commit.interval.ms)默認為5000毫秒,是通過延時隊列的任務來實現的,在consumer每次拉取消息消費后,如果延時隊列的auto commit task到了提交間隔時間,則提交任務更新committed offset,如果沒有到延遲任務的timeout時間,則不執行延遲任務,繼續拉取消息,但在實際消費處理消息后,提交偏移前,消費者有可能崩潰,這就導致存在重復消費

 

手動提交:

在某些場景下,為了能更准確的控制消費偏移,以保證消息不會重復消費或者不會丟失,由消費者客戶端手動控制是否提交偏移

 

偏移與消費語義

 參考:Kafka 消費語義

 

消息的拉取及消費

消費者如果在上一次的消息拉取過程中有消息存在,則直接返回,否則從前面更新的拉取偏移位置處重新發送拉取消息的請求。

拉取消息的請求以消費者所消費的TopicPartition所在的節點分組Map<Node, FetchRequest>,然后再通過poll到相應的節點來獲取分區消息,一旦成功獲取掉消息,將被保存在completedFetches,在返回時轉換為按TopicPartition分組的record

Map<TopicPartition, List<ConsumerRecord<K, V>>> drained

另外,會在本地記錄所消費的最后一條消息的偏移+1,在下次消費時,進行偏移檢查,判斷第一條記錄的offset必須與這個值相等,否則則忽略

 private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,
                       PartitionRecords<K, V> partitionRecords,
                       int maxRecords) {
    ......
    List<ConsumerRecord<K, V>> partRecords = partitionRecords.take(maxRecords);
    long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1; //下一條消費位置
    ......

    subscriptions.position(partitionRecords.partition, nextOffset); //記錄消費的此TopicPartition的位置為nextOffset
    return partRecords.size();
    .......
  }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM