kafka消費者如何才能從頭開始消費某個topic的全量數據


消費者要從頭開始消費某個topic的全量數據,需要滿足2個條件(spring-kafka):

(1)使用一個全新的"group.id"(就是之前沒有被任何消費者使用過);

(2)指定"auto.offset.reset"參數的值為earliest;

對應的spring-kafka消費者客戶端配置參數為:

<!-- 指定消費組名 -->
<entry key="group.id" value="fg11"/>
<!-- 從何處開始消費,latest 表示消費最新消息,earliest 表示從頭開始消費,none表示拋出異常,默認latest -->
<entry key="auto.offset.reset" value="earliest"/>

 

注意:從kafka-0.9版本及以后,kafka的消費者組和offset信息就不存zookeeper了,而是存到broker服務器上,所以,如果你為某個消費者指定了一個消費者組名稱(group.id),那么,一旦這個消費者啟動,這個消費者組名和它要消費的那個topic的offset信息就會被記錄在broker服務器上。

比如我們為消費者A指定了消費者組(group.id)為fg11,那么可以使用如下命令查看消費者組的消費情況:

bin/kafka-consumer-groups.sh --bootstrap-server 172.17.6.10:9092 --describe --group fg11

顯示結果如下:

TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST              CLIENT-ID 
friend  0          6               6               0    consumer-1-08c856a3-ae39-4f73-a2da-4de1795c6ad4  /192.168.207.127  consumer-1
friend  1          2               2               0    consumer-1-08c856a3-ae39-4f73-a2da-4de1795c6ad4  /192.168.207.127  consumer-1
friend  2          4               4               0    consumer-1-08c856a3-ae39-4f73-a2da-4de1795c6ad4  /192.168.207.127  consumer-1

 

其實friend這個topic共有3個分區,消息總數為12條,其實在消費者A啟動之前,這12條消息已經被其他某個組的消費者消費過了。而我們雖然為消費者A指定了一個全新的group.id為fg11,但是如果我們在啟動消費者A之前,指定的"auto.offset.reset"參數的值是latest而不是earliest的話(就算你停止消費者,然后改為earliest也是沒有用的),啟動之后它將不會消費以前的消息,除非friend這個topic的分區中有了新的消息它才會消費。

 

所以一定要在消費者啟動之前就保證group.id是全新的,而且要指定earliest而不是latest。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM