消費者要從頭開始消費某個topic的全量數據,需要滿足2個條件(spring-kafka):
(1)使用一個全新的"group.id"(就是之前沒有被任何消費者使用過); (2)指定"auto.offset.reset"參數的值為earliest;
對應的spring-kafka消費者客戶端配置參數為:
<!-- 指定消費組名 --> <entry key="group.id" value="fg11"/> <!-- 從何處開始消費,latest 表示消費最新消息,earliest 表示從頭開始消費,none表示拋出異常,默認latest --> <entry key="auto.offset.reset" value="earliest"/>
注意:從kafka-0.9版本及以后,kafka的消費者組和offset信息就不存zookeeper了,而是存到broker服務器上,所以,如果你為某個消費者指定了一個消費者組名稱(group.id),那么,一旦這個消費者啟動,這個消費者組名和它要消費的那個topic的offset信息就會被記錄在broker服務器上。
比如我們為消費者A指定了消費者組(group.id)為fg11,那么可以使用如下命令查看消費者組的消費情況:
bin/kafka-consumer-groups.sh --bootstrap-server 172.17.6.10:9092 --describe --group fg11
顯示結果如下:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID friend 0 6 6 0 consumer-1-08c856a3-ae39-4f73-a2da-4de1795c6ad4 /192.168.207.127 consumer-1 friend 1 2 2 0 consumer-1-08c856a3-ae39-4f73-a2da-4de1795c6ad4 /192.168.207.127 consumer-1 friend 2 4 4 0 consumer-1-08c856a3-ae39-4f73-a2da-4de1795c6ad4 /192.168.207.127 consumer-1
其實friend這個topic共有3個分區,消息總數為12條,其實在消費者A啟動之前,這12條消息已經被其他某個組的消費者消費過了。而我們雖然為消費者A指定了一個全新的group.id為fg11,但是如果我們在啟動消費者A之前,指定的"auto.offset.reset"參數的值是latest而不是earliest的話(就算你停止消費者,然后改為earliest也是沒有用的),啟動之后它將不會消費以前的消息,除非friend這個topic的分區中有了新的消息它才會消費。
所以一定要在消費者啟動之前就保證group.id是全新的,而且要指定earliest而不是latest。