消費者客戶端
消費步驟:
1、配置消費者客戶端參數並創建相應的消費者實例。
2、訂閱主題。
3、拉取消息並消費
4、提交消費位移
5、關閉消費者實例
Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.25.10:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
prop.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
KafkaConsumer consumer = new KafkaConsumer(prop);
try{
consumer.subscribe(Arrays.asList("topic1"));
ConsumerRecords<String,String> records = consumer.poll(500);
for(ConsumerRecord record:records){
System.out.println(record.topic());
System.out.println(record.partition());
System.out.println(record.key());
System.out.println(record.value());
System.out.println(record.offset());
}
}catch (Exception e){
System.out.println(e.getMessage());
}finally {
consumer.close();
}
訂閱主題
通過使用subscribe()方法訂閱主題,既可以以集合的形式訂閱多個主題,也可以以正則表達式的形式訂閱特定模式的主題。
consumer.subscribe(Arrays.asList("topic1"));
consumer.subscribe(Pattern.compile("topic-.*"));
對於消費者使用集合的方式來訂閱主題而言,訂閱了什么主題就消費什么主題中的消息。
但是,對於消費者采用正則表達式的方式訂閱,在之后的過程中,如果創建了新的主題,並且主題名月正則表達式相匹配,那么這個消費者就可以消費到新添加的主題中的消息。如果應用程序需要消費多個主題,並且可以處理不同的類型,采取正則表達式的方式就會方便很多。
在subscriber的重載方法中,有一個參數類型ConsumerRebalanceListener
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
this.acquire();
try {
if (topics == null) {
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
}
if (topics.isEmpty()) {
this.unsubscribe();
} else {
Iterator i$ = topics.iterator();
String topic;
do {
if (!i$.hasNext()) {
log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
this.subscriptions.subscribe(new HashSet(topics), listener);
this.metadata.setTopics(this.subscriptions.groupSubscription());
return;
}
topic = (String)i$.next();
} while(topic != null && !topic.trim().isEmpty());
throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
}
} finally {
this.release();
}
}
這個是用來設置再均衡監聽器的。
訂閱分區
消費者可以通過assign()方法來訂閱分區
public void assign(Collection<TopicPartition> partitions)
public final class TopicPartition implements Serializable {
private int hash = 0;
private final int partition;
private final String topic;
public TopicPartition(String topic, int partition) {
this.partition = partition;
this.topic = topic;
}
TopicPartition 類 只有兩個屬性:topic和partition,可以通過這兩個屬性進行分區映射。
consumer.assign(Arrays.asList(new TopicPartition("topic",0)));
如果清楚主題中的分區數,可以通過
public List<PartitionInfo> partitionsFor(String topic) {
獲取 PartitionInfo ,PartitionInfo 類型為主題的分區元數據信息,結構如下
public class PartitionInfo {
private final String topic; //主題名稱
private final int partition; /分區編號
private final Node leader; //leader副本
private final Node[] replicas; //AR集合
private final Node[] inSyncReplicas; //ISR集合
取消訂閱
unsubscriber()方法用來取消訂閱,既可以取消對於主題的訂閱,也可以取消對於分區的訂閱。
同樣的,如果把subscribe或assign的參數設置為空集合,也就等同於unsubscriber()方法。
如果沒有訂閱任何主題或者分區,再繼續執行消費程序就會報出異常。
訂閱總結
集合訂閱方式 subscriber(Collection)、正則表達式subscriber(Pattern)和指定分區的訂閱assign(Collertion)分別代表了三種不同的訂閱狀態:AUTO_TOPICS、AUTO_PATTERN、USER_ASSUGNED(如果沒有訂閱,訂閱狀態為NONE)。這三種狀態是互斥的,在一個消費者中只能使用其中一種,否則會報錯。
通過subscribe()方法訂閱主題具有消費者自動再均衡的功能那個,在多個消費者的情況下可以根據分區分配策略自動分配各個消費者與分區的關系;當消費者組內的消費者增加或減少時,分區分配關系自動調整,以實現消費負載均衡和故障自動轉移。
通過assign()方法訂閱分區,不具備消費者自動均衡的功能。
消息消費
Kafka中的消費者是基於拉模式的,也就是消費者主動向服務端發起請求來拉取消息,區別與其他類型的消息隊列的推模式,也就是服務端主動將消息推送給消費者。
poll
kafka消息消費是一個不斷輪詢的過程,消費者重復的調用poll()方法,返回的是所訂閱主題(分區)上的一組消息。如果分區中沒有可供消費的消息,那額拉取結果為空。poll()方法還有一個參數:超時時間參數timeout,用來控制poll()方法的阻塞時間,在消費者的緩沖區里沒有可用數據時會發生阻塞。timeout的設置取決於應用程序對響應速度的要求,比如需要在多長時間內將控制權移交給執行輪詢的應用線程。如果將timeout設置為0.poll方法會立即返回,而不管是否已經拉取了消息。如果應用線程唯一的工作就是從kafka中拉取並消費消息,則可以將這個參數設置為最大值Long.MAX_VALUE。
ConsumerRecord
拉取到的消息ConsumerRecord封裝到ConsumerRecords集合中
public static final long NO_TIMESTAMP = -1L;
public static final int NULL_SIZE = -1;
public static final int NULL_CHECKSUM = -1;
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
private final V value;
private volatile Long checksum;
- topic 主題
- partition 分區
- offect 表示消息在所屬分區的偏移量。
- timestamp 消失時間戳,tiestampType表示時間戳的類型,createTime和LogAppendTime,分別代表消息創建的時間戳和消息追加到日志的時間戳。
- headers 表示消息的頭部內容
poll()方法返回值類型是ConsuerRecords,用來表示一次拉取操作所獲得的的消息集合,包含多個ConsumerRecord,它提供了一個iterator()方法來循環遍歷消息內部的消息。使用該方法來獲取消息集中的每一個ConsumerRecord。
ConsumerRecords類提供了一個records(TopicPartition)方法獲取消息集中指定分區的消息。
public List<ConsumerRecord<K, V>> records(TopicPartition partition)
用過此方法,可以獲取消息集中指定分區的消息。
count()方法可以用來計算出消息集中消息的個數。
isEmpty()方法判斷消息集是否為空,返回值boolean
empty() 用來獲取一個空的消息集。
位移提交
自動提交
在Kafka中的分區中,每條消息都有唯一的offect,用來表示消息在分區中對用的位置。
在消費中也有一個offect的概念,用來表示消費到分區中某條消息的位置。在每次調用poll()方法的時候,他返回的是還沒有被消費過的消息集,要做到這一點,就需要記錄上一次消費時的消費位移。消費位移需要持久化保存,因為當有新的消費者接替上一個消費者進行消費的時候,能夠正確的根據上一個消費者消費的位置進行繼續消費。新版本的kafka試講消費位移存儲在kafka內部的主題_consumer_offects中,消費者在消費完消息之后需要執行消費位移的提交。

X表示某一次拉取操作中分區消息的最大偏移量,也就是此次操作消費到的位移,但是!本次消費結束后,將要提交位移的時候,提交的不是X,而是x+1,也就是下一條需要拉取的消息的位置。
KafkaConsumer類提供了position(TopicPartition)和conmmitted(TopicPartition)兩個方法來分別獲取partition(下次消費位移)和committed offect(本次提交位移)的值。
對於位移提交的具體時機很有講究,可能會造成重復消費和消費都是的現象。
當前一次poll拉取的消息集為[x+2,x+7],當前在拉取到消息之后,就立即提交位移,也就是位移提交是x+8,當消息處理到x+5的后,出現異常,故障恢復后,重新拉取,將會從x+8進行拉取,從而造成消息丟失。
當位移提交在消息處理之后,當x+5時發生異常,故障恢復后,又重新從x+2進行拉取消息,造成了消息重復。
在kafka中,默認的消息位移提交方式為自動提交,由消費者客戶端參數enable.auto.commit 配置,默認值為true。這個默認的自動提交不是每消費一條消息就提交一次,而是定期提交,這個定期的周期時間由客戶端參數auto.commit.interval.ms 配置,默認值為5秒,此參數生效的前提是 enable.auto.commit 參數為 true。在默認的方式下,消費者每隔5秒會將拉取到的每個分區中最大的消息位移進行提交。自動位移提交的動作是在poll()方法的邏輯里完成的,在每次真正向服務端發起拉取請求之前會檢查是否可以進行位移提交,如果可以,那么就會提交上一次輪詢的位移。
默認提交雖然非常簡便,但也會出現重復消費和消息丟失的問題。
當剛提交一次消費位移,然后拉取一批消息進行消費,在在下一次自動提交之前,消費者出現異常,消費者恢復的時候又從上次一的分區位移進行消費,從而造成了消費重復的現象。可以通過減小自動提交的時間間隔來減小可能重復的消息大小,但不能完全避免重復消費,而且是位移提交變得頻繁。

當拉取線程A不斷的拉取消息到本地緩存,比如BlockingQueue,B線程從緩存中讀取消息並進行相應的邏輯處理。假設進行到了第y+1次拉取,以及第m次位移提交的時候,也就是x+6位移已經確認提交,此時B線程卻還在消費x+3的消息。此時B線程發生了異常,待回復后會從m位移,也就是x+6的位置開始拉取消息,那么 x+3 至 x+6 之間的消息就沒有得到相應的處理,這樣便發生消息丟失的現象。
自動位移提交的方式在正常情況下不會發生消息丟失或重復消費的現象,但是在編程的世界里異常無可避免,與此同時,自動位移提交也無法做到精確的位移管理。
手動提交
Kafka 中還提供了手動位移提交的方式,這樣可以使得開發人員對消費位移的管理控制更加靈活。很多時候並不是說拉取到消息就算消費完成,而是需要將消息寫入數據庫、寫入本地緩存,或者是更加復雜的業務處理。在這些場景下,所有的業務處理完成才能認為消息被成功消費,手動的提交方式可以讓開發人員根據程序的邏輯在合適的地方進行位移提交。開啟手動提交功能的前提是消費者客戶端參數 enable.auto.commit 配置為 false
手動提交可以細分為同步提交和異步提交,對應於 KafkaConsumer 中的 commitSync() 和 commitAsync() 兩種類型的方法。
同步提交
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
//do some logical processing.
}
consumer.commitSync();
批量處理+批量提交
final int minBatchSize = 200;
List<ConsumerRecord> buffer = new ArrayList<>();
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
//do some logical processing with buffer.
consumer.commitSync();
buffer.clear();
}
}
將拉取到的消息批量存入緩存buffer,到積累到足夠多的時候,也就是示例中大於等於200個的時候,再做相應的批量處理,之后再做批量提交。
這兩種實例都存在消費重復的問題,在同步位移提交前,程序出現了崩潰,那么待恢復之后又只能從上一次位移提交的地方拉取消息,就會發生消費重復。
同步提交會阻塞消費者線程直至位移提交完成。
,如果想尋求更細粒度的、更精准的提交,那么就需要使用 commitSync() 的另一個含參方法
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)
該方法提供了一個 offsets 參數,用來提交指定分區的位移。無參的 commitSync() 方法只能提交當前批次對應的position值。如果需要提交一個中間值,比如業務每消費一條消息就提交一次位移,那么就可以使用這種方式
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
//do some logical processing.
long offset = record.offset();
TopicPartition partition =
new TopicPartition(record.topic(), record.partition());
consumer.commitSync(Collections
.singletonMap(partition, new OffsetAndMetadata(offset + 1)));
}
}
commitSync() 方法本身是同步執行的,會耗費一定的性能,而示例中的這種提交方式會將性能拉到一個相當低的點。更多時候是按照分區的粒度划分提交位移的界限
try {
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords =
records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
//do some logical processing.
}
long lastConsumedOffset = partitionRecords
.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition,
new OffsetAndMetadata(lastConsumedOffset + 1)));
}
}
} finally {
consumer.close();
}
手動提交
與 commitSync() 方法相反,異步提交的方式(commitAsync())在執行的時候消費者線程不會被阻塞,可能在提交消費位移的結果還未返回之前就開始了新一次的拉取操作。異步提交可以使消費者的性能得到一定的增強。
commitAsync 方法有三個不同的重載方法
public void commitAsync()
public void commitAsync(OffsetCommitCallback callback)
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets,
OffsetCommitCallback callback)
第二個方法和第三個方法中的callback參數,它提供了一個異步提交的回調方法,當位移提交完成后會回調 OffsetCommitCallback 中的 onComplete() 方法。
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
//do some logical processing.
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
Exception exception) {
if (exception == null) {
System.out.println(offsets);
}else {
log.error("fail to commit offsets {}", offsets, exception);
}
}
});
}
如果位移提交失敗,比如,第n次消費提交位移x+1,提交失敗,進行重試,然而由於異步提交,也就是x+2已經提交,此時x+1重試提交,kafka的消費位移重新變為x+1,就會造成消費重復。
為此我們可以設置一個遞增的序號來維護異步提交的順序,每次位移提交之后就增加序號相對應的值。在遇到位移提交失敗需要重試的時候,可以檢查所提交的位移和序號的值的大小,如果前者小於后者,則說明有更大的位移已經提交了,不需要再進行本次重試;如果兩者相同,則說明可以進行重試提交。除非程序編碼錯誤,否則不會出現前者大於后者的情況。
如果位移提交失敗的情況經常發生,那么說明系統肯定出現了故障,在一般情況下,位移提交失敗的情況很少發生,不重試也沒有關系,后面的提交也會有成功的。重試會增加代碼邏輯的復雜度,不重試會增加重復消費的概率。如果消費者異常退出,那么這個重復消費的問題就很難避免,因為這種情況下無法及時提交消費位移;如果消費者正常退出或發生再均衡的情況,那么可以在退出或再均衡執行之前使用同步提交的方式做最后的把關。
try {
while (isRunning.get()) {
//poll records and do some logical processing.
consumer.commitAsync();
}
} finally {
try {
consumer.commitSync();
}finally {
consumer.close();
}
}
控制和關閉消費
KafkaConsumer 提供了對消費速度進行控制的方法,在有些應用場景下我們可能需要暫停某些分區的消費而先消費其他分區,當達到一定條件時再恢復這些分區的消費。KafkaConsumer 中使用 pause() 和 resume() 方法來分別實現暫停某些分區在拉取操作時返回數據給客戶端和恢復某些分區向客戶端返回數據的操作。
public void pause(Collection<TopicPartition> partitions)
public void resume(Collection<TopicPartition> partitions)
KafkaConsumer 還提供了一個無參的 paused() 方法來返回被暫停的分區集合
public Set<TopicPartition> paused()
KafkaConsumer 提供了 close() 方法來實現關閉,close() 方法有三種重載方法
public void close()
public void close(Duration timeout)
@Deprecated
public void close(long timeout, TimeUnit timeUnit)
第二種方法是通過 timeout 參數來設定關閉方法的最長執行時間,有些內部的關閉邏輯會耗費一定的時間,比如設置了自動提交消費位移,這里還會做一次位移提交的動作;而第一種方法沒有 timeout 參數,這並不意味着會無限制地等待,它內部設定了最長等待時間(30秒);第三種方法已被標記為 @Deprecated,可以不考慮。
指定位移消費
在 Kafka 中每當消費者查找不到所記錄的消費位移時,就會根據消費者客戶端參數 auto.offset.reset 的配置來決定從何處開始進行消費,這個參數的默認值為“latest”,表示從分區末尾開始消費消息。

按照默認的配置,消費者會從9開始進行消費(9是下一條要寫入消息的位置),更加確切地說是從9開始拉取消息。如果將 auto.offset.reset 參數配置為“earliest”,那么消費者會從起始處,也就是0開始消費。
除了查找不到消費位移,位移越界也會觸發 auto.offset.reset 參數的執行
auto.offset.reset 參數還有一個可配置的值—“none”,配置為此值就意味着出現查到不到消費位移的時候,既不從最新的消息位置處開始消費,也不從最早的消息位置處開始消費,此時會報出 NoOffsetForPartitionException 異常。
如果能夠找到消費位移,那么配置為“none”不會出現任何異常。如果配置的不是“latest”、“earliest”和“none”,則會報出 ConfigException 異常。
poll() 方法中的邏輯對於普通的開發人員而言是一個黑盒,無法精確地掌控其消費的起始位置。提供的 auto.offset.reset 參數也只能在找不到消費位移或位移越界的情況下粗粒度地從開頭或末尾開始消費。我們需要一種更細粒度的掌控,可以讓我們從特定的位移處開始拉取消息,
KafkaConsumer 中的 seek() 方法提供了這個功能。
public void seek(TopicPartition partition, long offset)
參數 partition 表示分區,而 offset 參數用來指定從分區的哪個位置開始消費。seek() 方法只能重置消費者分配到的分區的消費位置,而分區的分配是在 poll() 方法的調用過程中實現的。也就是說,在執行 seek() 方法之前需要先執行一次 poll() 方法,等到分配到分區之后才可以重置消費位置。
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
consumer.poll(Duration.ofMillis(10000)); ①
Set<TopicPartition> assignment = consumer.assignment(); ②
for (TopicPartition tp : assignment) {
consumer.seek(tp, 10); ③
}
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
//consume the record.
}
assignment() 方法是用來獲取消費者所分配到的分區信息
如果將oll() 方法的參數設置為0,在此之后,會發現 seek() 方法並未有任何作用。因為當 poll() 方法中的參數為0時,此方法立刻返回,那么 poll() 方法內部進行分區分配的邏輯就會來不及實施。也就是說,消費者此時並未分配到任何分區,如此第②行中的 assignment 便是一個空列表,第③行代碼也不會執行。timeout 參數設置為多少合適呢?太短會使分配分區的動作失敗,太長又有可能造成一些不必要的等待。我們可以通過 KafkaConsumer 的 assignment() 方法來判定是否分配到了相應的分區
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {//如果不為0,則說明已經成功分配到了分區
consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment();
}
for (TopicPartition tp : assignment) {
consumer.seek(tp, 10);
}
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
//consume the record.
}
如果對未分配到的分區執行 seek() 方法,那么會報出 IllegalStateException 的異常。類似在調用 subscribe() 方法之后直接調用 seek() 方法.
如果消費組內的消費者在啟動的時候能夠找到消費位移,除非發生位移越界,否則 auto.offset.reset 參數並不會奏效,此時如果想指定從開頭或末尾開始消費,就需要 seek() 方法
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment();
}
Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment); ①
for (TopicPartition tp : assignment) {
consumer.seek(tp, offsets.get(tp)); ②
}
endOffsets() 方法用來獲取指定分區的末尾的消息位置,是將要寫入最新消息的位置。
public Map<TopicPartition, Long> endOffsets(
Collection<TopicPartition> partitions)
public Map<TopicPartition, Long> endOffsets(
Collection<TopicPartition> partitions,
Duration timeout)
其中 partitions 參數表示分區集合,而 timeout 參數用來設置等待獲取的超時時間。如果沒有指定 timeout 參數的值,那么 endOffsets() 方法的等待時間由客戶端參數 request.timeout.ms 來設置,默認值為30000。與 endOffsets 對應的是 beginningOffsets() 方法,一個分區的起始位置起初是0,但並不代表每時每刻都為0,因為日志清理的動作會清理舊的數據,所以分區的起始位置會自然而然地增加。
KafkaConsumer 中直接提供了 seekToBeginning() 方法和 seekToEnd() 方法來實現這兩個功能
public void seekToBeginning(Collection<TopicPartition> partitions)
public void seekToEnd(Collection<TopicPartition> partitions)
有時候我們並不知道特定的消費位置,卻知道一個相關的時間點,比如我們想要消費昨天8點之后的消息,這個需求更符合正常的思維邏輯。此時我們無法直接使用 seek() 方法來追溯到相應的位置。KafkaConsumer 同樣考慮到了這種情況,它提供了一個 offsetsForTimes() 方法,通過 timestamp 來查詢與此對應的分區位置。
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
Map<TopicPartition, Long> timestampsToSearch)
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
Map<TopicPartition, Long> timestampsToSearch,
Duration timeout)
參數 timestampsToSearch 是一個 Map 類型,key 為待查詢的分區,而 value 為待查詢的時間戳,該方法會返回時間戳大於等於待查詢時間的第一條消息對應的位置和時間戳,對應於 OffsetAndTimestamp 中的 offset 和 timestamp 字段。
首先通過 offsetForTimes() 方法獲取一天之前的消息位置,然后使用 seek() 方法追溯到相應位置開始消費
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
for (TopicPartition tp : assignment) {
timestampToSearch.put(tp, System.currentTimeMillis()-1*24*3600*1000);
}
Map<TopicPartition, OffsetAndTimestamp> offsets =
consumer.offsetsForTimes(timestampToSearch);
for (TopicPartition tp : assignment) {
OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
if (offsetAndTimestamp != null) {
consumer.seek(tp, offsetAndTimestamp.offset());
}
}
位移越界也會觸發 auto.offset.reset 參數的執行,位移越界是指知道消費位置卻無法在實際的分區中查找到
Kafka 中的消費位移是存儲在一個內部主題中的,seek() 方法可以突破這一限制:消費位移可以保存在任意的存儲介質中,例如數據庫、文件系統等。以數據庫為例,我們將消費位移保存在其中的一個表中,在下次消費的時候可以讀取存儲在數據表中的消費位移並通過 seek() 方法指向這個具體的位置
消費位移保存在DB中
consumer.subscribe(Arrays.asList(topic));
//省略poll()方法及assignment的邏輯
for(TopicPartition tp: assignment){
long offset = getOffsetFromDB(tp);//從DB中讀取消費位移
consumer.seek(tp, offset);
}
while(true){
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords =
records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
//process the record.
}
long lastConsumedOffset = partitionRecords
.get(partitionRecords.size() - 1).offset();
//將消費位移存儲在DB中
storeOffsetToDB(partition, lastConsumedOffset+1);
}
}
seek() 方法為我們提供了從特定位置讀取消息的能力,我們可以通過這個方法來向前跳過若干消息,也可以通過這個方法來向后回溯若干消息,這樣為消息的消費提供了很大的靈活性。seek() 方法也為我們提供了將消費位移保存在外部存儲介質中的能力,還可以配合再均衡監聽器來提供更加精准的消費能力。
再均衡
再均衡是指分區的所屬權從一個消費者轉移到另一消費者的行為,它為消費組具備高可用性和伸縮性提供保障,使我們可以既方便又安全地刪除消費組內的消費者或往消費組內添加消費者。不過在再均衡發生期間,消費組內的消費者是無法讀取消息的。也就是說,在再均衡發生期間的這一小段時間內,消費組會變得不可用。
當一個分區被重新分配給另一個消費者時,消費者當前的狀態也會丟失。比如消費者消費完某個分區中的一部分消息時還沒有來得及提交消費位移就發生了再均衡操作,之后這個分區又被分配給了消費組內的另一個消費者,原來被消費完的那部分消息又被重新消費一遍,也就是發生了重復消費。一般情況下,應盡量避免不必要的再均衡的發生。
subscribe() 方法 再均衡監聽器 ConsumerRebalanceListener
在 subscribe(Collection
ConsumerRebalanceListener 是一個接口,包含2個方法,具體的釋義如下:
void onPartitionsRevoked(Collection partitions) 這個方法會在再均衡開始之前和消費者停止讀取消息之后被調用。可以通過這個回調方法來處理消費位移的提交,以此來避免一些不必要的重復消費現象的發生。參數 partitions 表示再均衡前所分配到的分區。
void onPartitionsAssigned(Collection partitions) 這個方法會在重新分配分區之后和消費者開始讀取消費之前被調用。參數 partitions 表示再均衡后所分配到的分區。
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(currentOffsets);
currentOffsets.clear();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//do nothing.
}
});
try {
while (isRunning.get()) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
//process the record.
currentOffsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
}
consumer.commitAsync(currentOffsets, null);
}
} finally {
consumer.close();
}
將消費位移暫存到一個局部變量 currentOffsets 中,這樣在正常消費的時候可以通過 commitAsync() 方法來異步提交消費位移,在發生再均衡動作之前可以通過再均衡監聽器的 onPartitionsRevoked() 回調執行 commitSync() 方法同步提交消費位移,以盡量避免一些不必要的重復消費。
再均衡監聽器還可以配合外部存儲使用。我們將消費位移保存在數據庫中,這里可以通過再均衡監聽器查找分配到的分區的消費位移,並且配合 seek() 方法來進一步優化代碼邏輯
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
//store offset in DB (storeOffsetToDB)
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for(TopicPartition tp: partitions){
consumer.seek(tp, getOffsetFromDB(tp));//從DB中讀取消費位移
}
}
});
消費者攔截器
消費者攔截器主要在消費到消息或在提交消費位移時進行一些定制化的操作。
費者攔截器需要自定義實現 org.apache.kafka.clients.consumer. ConsumerInterceptor 接口。ConsumerInterceptor 接口包含3個方法:
- public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
- public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
- public void close()。
KafkaConsumer 會在 poll() 方法返回之前調用攔截器的 onConsume() 方法來對消息進行相應的定制化操作,比如修改返回的消息內容、按照某種規則過濾消息(可能會減少 poll() 方法返回的消息的個數)。如果 onConsume() 方法中拋出異常,那么會被捕獲並記錄到日志中,但是異常不會再向上傳遞。
KafkaConsumer 會在提交完消費位移之后調用攔截器的 onCommit() 方法,可以使用這個方法來記錄跟蹤所提交的位移信息,比如當消費者使用 commitSync 的無參方法時,我們不知道提交的消費位移的具體細節,而使用攔截器的 onCommit() 方法卻可以做到這一點。
在某些業務場景中會對消息設置一個有效期的屬性,如果某條消息在既定的時間窗口內無法到達,那么就會被視為無效,它也就不需要再被繼續處理了。下面使用消費者攔截器來實現一個簡單的消息 TTL(Time to Live,即過期時間)的功能。
自定義的消費者攔截器 ConsumerInterceptorTTL 使用消息的 timestamp 字段來判定是否過期,如果消息的時間戳與當前的時間戳相差超過10秒則判定為過期,那么這條消息也就被過濾而不投遞給具體的消費者。
public class ConsumerInterceptorTTL implements
ConsumerInterceptor<String, String> {
private static final long EXPIRE_INTERVAL = 10 * 1000;
@Override
public ConsumerRecords<String, String> onConsume(
ConsumerRecords<String, String> records) {
long now = System.currentTimeMillis();
Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords
= new HashMap<>();
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecords =
records.records(tp);
List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
for (ConsumerRecord<String, String> record : tpRecords) {
if (now - record.timestamp() < EXPIRE_INTERVAL) {
newTpRecords.add(record);
}
}
if (!newTpRecords.isEmpty()) {
newRecords.put(tp, newTpRecords);
}
}
return new ConsumerRecords<>(newRecords);
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
offsets.forEach((tp, offset) ->
System.out.println(tp + ":" + offset.offset()));
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
實現自定義的 ConsumerInterceptorTTL 之后,需要在 KafkaConsumer 中配置指定這個攔截器,這個指定的配置和 KafkaProducer 中的一樣,也是通過 interceptor.classes 參數實現的,此參數的默認值為“”。
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
ConsumerInterceptorTTL.class.getName());
