1. 消費者位置(consumer position)
因為kafka服務端不保存消息的狀態,所以消費端需要自己去做很多事情。我們每次調用poll()方法他總是返回已經保存在生產者隊列中還未被消費者消費的消息。消息在每一個分區中都是順序的,那么必然可以通過一個偏移量去確定每一條消息的位置。
偏移量在消費消息的過程中處於重要的作用。如果是自動提交消息,那么poll()方法會去在每次獲取消息的時候自動提交獲取最后一條消息的偏移量,告訴服務器我們已經消費到這個位置,下次從下一個位置開始消費。
我們把更新分區當前位置的操作叫做提交。消費者是如何提交偏移量的呢?kafka最新的api是這樣做的:創建一個叫做_consumer_offset的特殊主題用來保存消息的偏移量。消費者每次消費都會往這個主題發送消息,消息包含每個分區的偏移量。
如果消費者一直處於運行的狀態那么這個偏移量沒有什么用。不過如果這個消費者崩潰或者有新的消費者加入群組觸發再均衡策略,那么再均衡之后該分區的消費者如果不是之前的那一位,那么新的小伙伴怎么知道之前的伙計消費到哪里呢。所以提交他自己的offset就發揮作用了。
Consumer讀取partition中的數據是通過調用發起一個fetch請求來執行的。而從KafkaConsumer來看,它有一個poll方法。但是這個poll方法只是可能會發起fetch請求。原因是:Consumer每次發起fetch請求時,讀取到的數據是有限制的,通過配置項max.partition.fetch.bytes來限制的。而在執行poll方法時,會根據配置項個max.poll.records來限制一次最多pool多少個record。
那么就可能出現這樣的情況: 在滿足max.partition.fetch.bytes限制的情況下,假如fetch到了100個record,放到本地緩存后,由於max.poll.records限制每次只能poll出15個record。那么KafkaConsumer就需要執行7次才能將這一次通過網絡發起的fetch請求所fetch到的這100個record消費完畢。其中前6次是每次pool中15個record,最后一次是poll出10個record。
在consumer中,還有另外一個配置項:max.poll.interval.ms ,它表示最大的poll數據間隔,如果超過這個間隔沒有發起pool請求,但heartbeat仍舊在發,就認為該consumer處於 livelock狀態。就會將該consumer退出consumer group。所以為了不使Consumer 自己被退出,Consumer 應該不停的發起poll(timeout)操作。而這個動作 KafkaConsumer Client是不會幫我們做的,這就需要自己在程序中不停的調用poll方法了。
當一個consumer因某種原因退出Group時,進行重新分配partition后,同一group中的另一個consumer在讀取該partition時,怎么能夠知道上一個consumer該從哪個offset的message讀取呢?也是是如何保證同一個group內的consumer不重復消費消息呢?上面說了一次走網絡的fetch請求會拉取到一定量的數據,但是這些數據還沒有被消息完畢,Consumer就掛掉了,下一次進行數據fetch時,是否會從上次讀到的數據開始讀取,而導致Consumer消費的數據丟失嗎?
為了做到這一點,當使用完poll從本地緩存拉取到數據之后,需要client調用commitSync方法(或者commitAsync方法)去commit 下一次該去讀取 哪一個offset的message。
而這個commit方法會通過走網絡的commit請求將offset在coordinator中保留,這樣就能夠保證下一次讀取(不論是進行rebalance)時,既不會重復消費消息,也不會遺漏消息。
對於offset的commit,Kafka Consumer Java Client支持兩種模式:由KafkaConsumer自動提交,或者是用戶通過調用commitSync、commitAsync方法的方式完成offset的提交。
2. 位移管理(offset management)
2.1 自動提交
Kafka默認是定期幫你自動提交位移的(enable.auto.commit = true),使用這種簡單的方式之前你需要知道可能會帶來什么后果。
假設我們仍然使用默認的5s提交時間間隔,在最近一次提交之后的3s發生了再均衡,再均衡之后,消費者從最后一次提交的偏移量位置開始讀取消息。這個時候偏移量已經落后
了3s,所以在這3s內到達的消息會被重復處理。可以通過修改提交時間間隔來更頻繁地提交偏移量,減小可能出現重復悄息的時間窗,不過這種情況是無也完全避免的。
在使用自動提交時,每次調用輪詢方告都會把上一次調用返回的偏移量提交上去,它並不知道具體哪些消息已經被處理了,所以在再次調用之前最好確保所有當前調用返回的消息都已經處理完畢(在調用close()方法之前也會進行自動提交)。一般情況下不會有什么問題,不過在處理異常或提前退出輪詢時要格外小心。
2.2 手動提交
在多partition多consumer的場景下自動提交總會發生一些不可控的情況。所以消費者API也為我們提供了另外一種提交偏移量的方式。開發者可以在程序中自己決定何時提交,而不是基於時間間隔。
在使用手動提交之前我們需要先將:
properties.put("enable.auto.commit", "false");
然后使用:
consumer.commitSync();
來提交。
commitSync()方法會提交由poll()方法返回的最新偏移量,提交成功后馬上返回,否則跑出異常。
我們處理消息的邏輯可以變成這樣:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
try {
consumer.commitSync();
} catch (Exception e) {
System.out.println("commit failed");
}
}
}
每處理一次消息我們提交一次offset。
異步手動提交
上面我們使用commitSync()的方式提交數據,每次提交都需要等待broker返回確認結果。這樣沒提交一次等待一次會限制我們的吞吐量。
如果采用降低提交頻率來保證吞吐量,那么則有增加消息重復消費的風險。所以kafka消費者提供了異步提交的API。我們只管發送提交請求無需等待broker返回。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
commitAsync()方法提交最后一個偏移量。在成功提交或碰到無怯恢復的錯誤之前,commitAsync()會一直重試,但是commitAsync()不會,這也是commitAsync()不好的一個地方。它之所以不進行重試,是因為在它收到服務器響應的時候, 可能有一個更大的偏移量已經提交成功。假設我們發出一個請求用於提交偏移量2000,這個時候發生了短暫的通信問題,服務器收不到請求,自然也不會作出任何響應。與此同時,我們處理了另外一批消息,並成功提交了偏移量3000。如果commitAsync()重新嘗試提交偏移量2000 ,它有可能在偏移量3000之后提交成功。這個時候如果發生再均衡,就會出現重復消息。
當然使用手動提交最大的好處就是如果發生了錯誤我們可以記錄下來。commitAsync()也支持回調方法,提交offset發生錯誤我們可以記下當前的偏移量。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if(e != null){
System.out.println("commit failed"+map);
}
}
});
}
同步和異步組合提交
一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,那么后續的提交總會有成功的。但如果這是發生在關閉消費者或再均衡前的最后一次提交,就要確保能夠提交成功。因此,在消費者關閉前一般會組合使用commitAsync()和commitSync()。
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
} catch (Exception e) {
System.out.println("commit failed");
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
如果一切正常我們使用commitAsync()來提交。如果直接關閉消費者,就沒有所謂的下一次提交了。使用commitSync()會一直重試,直到提交成功。
2.3 提交特定偏移量
上面我們手動提交使用的commitAsync()和commitSync()都是提交每一次消費最后一條消息的偏移量,那么如果我們一次拉取了很多消息但是沒有消費完,想提交我們消費完成的位置該怎么處理呢?kafka也有相應的對策。
Map<TopicPartition,OffsetAndMetadata> currentOffset = new HashMap<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
currentOffset.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset(),"metadata"));
try {
System.out.println("模擬消息處理失敗的情況");
} catch (Exception e) {
consumer.commitAsync(currentOffset,null);
}
}
}
這里調用的是commitAsync(),調用commitSync()也是可以的。代碼中模擬我們在處理消息的過程中可能會出錯的情況,每次讀消息都把當前的offset存入map中,如果出錯就提交當前已經消費到的偏移量。
2.4 再均衡監聽器
前面我們說過當發生consumer退出或者新增,partition新增的時候會觸發再均衡。那么發生再均衡的時候如果某個consumer正在消費的任務沒有消費完該如何提交當前消費到的offset呢?kafka提供了再均衡監聽器,在發生再均衡之前監聽到,當前consumer可以在失去分區所有權之前處理offset關閉句柄等。
消費者API中有一個()方法:
subscribe(Collection<TopicPartition> var1, ConsumerRebalanceListener var2);
ConsumerRebalanceListener對象就是監聽器的接口對象,我們需要實現自己的監聽器繼承該接口。接口里面有兩個方法需要實現:
void onPartitionsRevoked(Collection<TopicPartition> var1);
void onPartitionsAssigned(Collection<TopicPartition> var1);
第一個方法會在再均衡開始之前和消費者停止讀取消息之后被調用。如果在這里提交偏移量,下一個接管分區的消費者就知道該從哪里開始讀取了。
第二個會在重新分配分區之后和消費者開始讀取消息之前被調用。、
我們來模擬一下再均衡的場景:
final Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("page_visits"));
final Map<TopicPartition,OffsetAndMetadata> currentOffset = new HashMap<>();
class HandleRebance implements ConsumerRebalanceListener{
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
System.out.println("partition is rebanlance");
consumer.commitAsync(currentOffset,null);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
}
}
consumer.subscribe(topic,new HandleRebance());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
currentOffset.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset(),"metadata"));
try {
System.out.println("模擬消息處理失敗的情況");
} catch (Exception e) {
consumer.commitAsync(currentOffset,null);
}
}
}
首先實現了ConsumerRebalanceListener接口,實現方法里面如果監聽到發生再均衡我們提交當前處理過的偏移量。
2.5 從特定偏移量處開始處理
前面都是consumer.poll()之后讀取該批次的消息,kafka還提供了從分區的開始或者末尾讀消息的功能:
seekToEnd(Collection<TopicPartition> partitions)
seekToBeginning(Collection<TopicPartition> partitions)
另外kafka還提供了從指定偏移量處讀取消息,可以通過seek()方法來處理:
seek(TopicPartition partition, long offset)
提交當前分區和當前消費位置信息。
2.6 獨立消費者–不屬於群組的消費者
到目前為止我們討論的都是消費者群組,分區被自動分配給群組的消費者,群組的消費者有變動會觸發再均衡。那么是不是可以回歸到別的消息隊列的方式:不需要群組消費者也可以自己訂閱主題?
kafka也提供了這樣的案例,因為kafka的主題有分區的概念,那么如果沒有群組就意味着你的自己訂閱到特定的一個分區才能消費內容。如果是這樣的話,就不需要訂閱主題,而是為自己分配分區。一個消費者可以訂閱主題(井加入消費者群組),或者為自己分配分區,但不能同時做這兩件事情。
下面的例子演示如何為自己分配分區並讀取消息的:
final Consumer<String, String> consumer = new KafkaConsumer<>(props);
List<PartitionInfo> partitionInfoList = consumer.partitionsFor("page_visits");
List<TopicPartition> topicPartitionList = new ArrayList<>();
if(partitionInfoList != null){
for(PartitionInfo partitionInfo : partitionInfoList){
topicPartitionList.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));
consumer.assign(topicPartitionList);
}
}
final Map<TopicPartition,OffsetAndMetadata> currentOffset = new HashMap<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
currentOffset.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset(),"metadata"));
try {
System.out.println("模擬消息處理失敗的情況");
} catch (Exception e) {
consumer.commitAsync(currentOffset,null);
}
}
}
- consumer.partitionsFor(“主題”)方法允許我們獲取某個主題的分區信息。
- 知道想消費的分區后使用assign()手動為該消費者分配分區。
除了不會發生再均衡,也不需要手動查找分區,其他的看起來一切正常。不過要記住,如果主題增加了新的分區,消費者並不會收到通知。所以,要么周期性地調用consumer.partitionsFor()方法來檢查是否有新分區加入,要么在添加新分區后重啟應用程序。