一個正常的消費邏輯需要具備以下幾個步驟:

1. 消息訂閱
1.1 subscribe訂閱主題
subscribe有如下重載方法:
1.1 前面兩種是通過集合的方式訂閱一到多個topic
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
public void subscribe(Collection<String> topics)
1.2 后兩種主要是采用正則的方式訂閱一到多個topic
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)
如果消費則采用正則表達式的方式訂閱,如果新創建的新的主題並且符合正則表達式,那么該消費者就可以消費到新添加主題中的消息。如果應用程序需要消費多個主題,並且可以處理不同類型的消息,正則表達式匹配的方式就比較適用了。使用方式如consumer.subscribe(Pattern.compile("topic_k_*"));
1.2 assign訂閱主題
assign訂閱主題,相比較subscribe的訂閱在訂閱主題的同時還可以指定消費的分區,具體用法如下:
subscribe和assign的區別:
通過subscribe的方式訂閱主題具有消費者自動再均衡功能;
在多個消費者的情況下可以根據分區分配策略來自動分配各消費者與分區的關系。當消費組的消費者增加或者減少時,分區分配關系會自動調整,以實現消費負載均衡及故障自動轉移。
assign訂閱分區時是不具備消費者自動均衡功能的;
其實在subscribe和assign兩者方法如參就可以比較出來,subscribe的重載的方法中有ConsumerRebalanceListener,而assign中沒有該參數
1.3 取消訂閱
consumer取消訂閱很簡單,調用方法cosumer.unsubscribe()。當然也可以通過將consumer.subscribe(Collection) 或 consumer.assign(Collection)集合參數設置為空集合的方式實現同樣的效果。示例如下: consumer.subscribe(new ArrayList<String>()); consumer.assgin(new ArrayList<TopicPartation>());
2. 消息的消費模式
Kafka的消費模式是基於拉取模式的。消息的消費一般有兩種模式:推送模式和拉取模式。推送模式是服務器端主動向消費者端推送消息,而拉取模式是消費者主動請求服務器端拉取消息。
2.1 poll()拉取消息
Kafka中消息的消費是一個不斷循環的過程,消費者所要做的就是不斷的重復調用poll()方法,poll()方法返回的是所訂閱的主題(分區)上的一組消息。對於poll()方法而言,如果某些分區中沒有可供消費的消息,
那么對應該分區的消息拉取結果就為空。如果訂閱的所有分區中都沒有消費的消息,那么poll()方法返回為空的消息集。方法示例如下: public ConsumerRecords<K, V> poll(final Duration timeout)
超時時間參數timeout,用來控制poll()方法的阻塞時間,在消費者緩沖區沒有可用的消息時會發生阻塞。如果消費者只是單純的拉取消息並消費數據,則為了提高吞吐率,可以把timeout設置為Long.MAX_VALUE;
2.2 seek()從指定的偏移量拉取消息
有時候我們需要對消息消費做到更細粒度的控制,可以讓我們從指定的位移拉取消息,而KafkaConsumer中的seek()方法,可以讓我們追前消息和回溯消息。示例如下:
public void seek(TopicPartition partition, long offset)
2.3 ConsumerRebalanceListener再均衡監聽器
一般同一個消費組中,一旦有觸發消費者的增減變化,都會觸發消費組的rebalance再均衡,如果消費者a消費一批消息后還沒來得及提交偏移量offset,而它所負責的分區在rebalance中轉移給了消費者b,則有可能發生消息的重復消費,那么此時可以通過再均衡器做一定程度的補救。
示例代碼如下:
consumer.subscribe(Arrays.asList("brian_t"), new ConsumerRebalanceListener(){
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
log.info("<><> Before start consume the message <><>");
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
log.info("<><> After stop consume the message <><>");
}
});
2.4 offset位移提交
2.4.1 offset自動位移提交
Kafka的默認位移提交是自動提交的方式,對應的消費者客戶端配置為enable.auto.commit配置,默認值為true。但是這個默認提交不是消費一條記錄就提交一次,而是定期提交,對應的配置為auto.commit.interval.ms,默認值為5秒,次參數生效的前提是enabe.auto.commit為true。
在默認的方式下, 消費者客戶端每隔5秒會拉取到每個分區中最大消費位移進行提交。自動位移提交是在poll()方法中完成的,在每次真正向服務器端發起拉取消息請求會檢查是否可以進行位移提交,如果可以,那么就會提交上一次輪訓的位移。
Kafka消費者客戶端編程邏輯中位移提交是一個大難點,自動位移提交免去了復雜的位移提交邏輯,讓編碼更簡潔,但同時也帶來了重復消費和消息丟失的問題。
重復消費
假設剛剛提交完一次消費位移,然后拉取一批消息進行消費,在下一次進行自動位移提交之前,消費者崩潰了或者發生再均衡,那么又得從上一次的位移處重新開始消費。
我們可以通過減少自動位移提交的時間間隔來減少重復消息的窗口大小,但這樣不能從根本上解決重復消費的問題,而且會使位移提交更加頻繁。
消息丟失
比如在如下圖場景:拉取線程不斷拉取消息並存入本地緩存,比如存入到BlockingQueue中,另外一個線程負責從緩存中讀取消息並進行相應的邏輯處理。假設目前已經進行第y+1次拉取和第Z次位移提交,也就是第X+9以前的位移一經提交,但是處理消息的線程還在處理第X+4條消息,此時如果處理消息的線程發生異常,然后恢復正常后,則再次拉取消息會從第Z次提交的位置X+9處開始拉取消息然后處理,此時從X+4到X+9處的消息就被丟失了。
2.4.2 offset手動位移提交
自動位移提交的方式在正常情況下不會發生消息的重復消費和消息的丟失,但是在編程的世界異常是無法避免的。同時,kafka的自動位移提交是無法做到精確的位移管理。很多時候並不是說拉取到消息就算消息消費完成,而是需要將消息寫入數據庫,寫入緩存 或者進行復雜的業務邏輯處理才算完成。手動提交的方式可以讓開發人員根據業務邏輯在合適的位置進行位移提交。開啟手動提交前,要將消費者客戶端參數enable.auto.commit設置為false。
手動提交位移分為同步提交( consumer.commitSync() )和異步提交( consumer.commitAsync() ).
consumer.commitSync()
示例代碼如下:
public void getMessageManualCommit() { consumer.subscribe(Arrays.asList("brian_t")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2)); if (records.isEmpty()) { log.info("===== topic: brian_t not have data! ====="); continue; } records.forEach(r -> { log.info("{ key:{}, value:{}, offset:{} }", r.key(), r.value(), r.offset()); }); // manual commit consumer.commitSync(); log.info("===== manual commit the offset ====="); } }
對於采用consumer.commitSync()無參方式提交,它提交消費位移的頻次,拉取消息的頻次和處理批次消息的頻次是一樣的。如果需要精心更加細粒度的提交,就要采用由參的提交方式consumer.commitSync(offsets).示例代碼如下:
public void getMessageManualCommitWithOffser() { consumer.subscribe(Arrays.asList("brian_t")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2)); if (records.isEmpty()) { log.info("===== topic: brian_t not have data! ====="); continue; } records.forEach(r -> { log.info("{ key:{}, value:{}, offset:{} }", r.key(), r.value(), r.offset()); TopicPartition tp = new TopicPartition(r.topic(), r.partition()); // commit with offset consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(r.offset()+1))); log.info("===== manual commit the offset ====="); }); } }
consumer.commitAsync()
代碼示例如下:
public void getMessageManualAsyncCommit() { consumer.subscribe(Arrays.asList("brian_t")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2)); if (records.isEmpty()) { log.info("===== topic: brian_t not have data! ====="); continue; } records.forEach(r -> { log.info("{ key:{}, value:{}, offset:{} }", r.key(), r.value(), r.offset()); TopicPartition tp = new TopicPartition(r.topic(), r.partition()); // commit with offset consumer.commitAsync(Collections.singletonMap(tp, new OffsetAndMetadata(r.offset()+1)), new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if(exception == null){ log.info("===== manual commit the offset success =====: {}", offsets.size()); } else { log.info("===== manual commit error =====: {}", exception.getMessage()); } } }); }); } }
3.消費者客戶端其他重要參數
fetch.min.bytes=1B #一次拉取最小字節數
fetch.max.bytes=50M #一次拉取最大字節數
fetch.max.wat.ms=500ms #拉取時最大等待時長
max.partation.fetch.bytes=1MB #每個分區一次拉取最大字節數
max.poll.records=500 #一次拉取的最大條數
connection.max.idle.ms=540000ms #網絡連接的最大閑置時長
request.timeout.ms=30000ms #一次請求等待響應的最大超時時間
metadata.max.age.ms=300000 #元數據在限定的時間內沒有更新,則會被強制更新
reconnect.backoff.ms=50ms #嘗試重試連接指定主機之前的間隔時間
retry.backoff.ms=100ms #嘗試重新拉取數據之前的時間間隔
isolation.level=read_uncommitted #隔離級別 決定消費者能讀到什么樣的數據
read_uncommitted: 可以消費到LSO(LastStableOffset)位置
read_committed: 可以消費到 HW(High Watermark)位置
max.poll.interval.ms=60000ms #超時限沒有發起poll,消費額組認為該消費者離開消費組
enable.auto.commit=true #開啟自動位移提交
auto.commit.interval.ms=5000 #自動提交位移的時間間隔
