消費者客戶端多線程實現
KafkaProducer 是線程安全的,然而 KafkaConsumer 卻是非線程安全的。KafkaConsumer 中定義了一個 acquire() 方法,用來檢測當前是否只有一個線程在操作,若有其他線程正在操作則會拋出 ConcurrentModifcationException 異常:
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access.
KafkaConsumer 中的每個公用方法在執行所要執行的動作之前都會調用這個 acquire() 方法,只有 wakeup() 方法是個例外.
acquire () 方法的具體定義如下:
private final AtomicLong currentThread
= new AtomicLong(NO_CURRENT_THREAD); //KafkaConsumer中的成員變量
private void acquire() {
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get() &&
!currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException
("KafkaConsumer is not safe for multi-threaded access");
refcount.incrementAndGet();
}
acquire() 方法通過線程操作計數標記的方式來檢測線程是否發生了並發操作,以此保證只有一個線程在操作。acquire() 方法和 release() 方法成對出現,表示相應的加鎖和解鎖操作。
acquire() 方法和 release() 方法都是私有方法,因此在實際應用中不需要我們顯式地調用,但了解其內部的機理之后可以促使我們正確、有效地編寫相應的程序邏輯。
多線程的方式來實現消息消費,多線程的目的就是為了提高整體的消費能力。多線程的實現方式有多種,第一種也是最常見的方式:線程封閉,即為每個線程實例化一個 KafkaConsumer 對象
一個線程對應一個 KafkaConsumer 實例,我們可以稱之為消費線程。一個消費線程可以消費一個或多個分區中的消息,所有的消費線程都隸屬於同一個消費組。這種實現方式的並發度受限於分區的實際個數,當消費線程的個數大於分區數時,就有部分消費線程一直處於空閑的狀態。
public class FirstMultiConsumerThreadDemo {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";
public static Properties initConfig(){
Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return props;
}
public static void main(String[] args) {
Properties props = initConfig();
int consumerThreadNum = 4;
for(int i=0;i<consumerThreadNum;i++) {
new KafkaConsumerThread(props,topic).start();
}
}
public static class KafkaConsumerThread extends Thread{
private KafkaConsumer<String, String> kafkaConsumer;
public KafkaConsumerThread(Properties props, String topic) {
this.kafkaConsumer = new KafkaConsumer<>(props);
this.kafkaConsumer.subscribe(Arrays.asList(topic));
}
@Override
public void run(){
try {
while (true) {
ConsumerRecords<String, String> records =
kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
//處理消息模塊 ①
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaConsumer.close();
}
}
}
}
內部類 KafkaConsumerThread 代表消費線程,其內部包裹着一個獨立的 KafkaConsumer 實例。通過外部類的 main() 方法來啟動多個消費線程,消費線程的數量由 consumerThreadNum 變量指定。一般一個主題的分區數事先可以知曉,可以將 consumerThreadNum 設置成不大於分區數的值,如果不知道主題的分區數,那么也可以通過 KafkaConsumer 類的 partitionsFor() 方法來間接獲取,進而再設置合理的 consumerThreadNum 值。
上面這種多線程的實現方式和開啟多個消費進程的方式沒有本質上的區別,它的優點是每個線程可以按順序消費各個分區中的消息。缺點也很明顯,每個消費線程都要維護一個獨立的TCP連接,如果分區數和 consumerThreadNum 的值都很大,那么會造成不小的系統開銷。
如果對消息的處理非常迅速,那么 poll() 拉取的頻次也會更高,進而整體消費的性能也會提升;相反,如果在這里對消息的處理緩慢,比如進行一個事務性操作,或者等待一個RPC的同步響應,那么 poll() 拉取的頻次也會隨之下降,進而造成整體消費性能的下降。一般而言,poll() 拉取消息的速度是相當快的,而整體消費的瓶頸也正是在處理消息這一塊,如果我們通過一定的方式來改進這一部分,那么我們就能帶動整體消費性能的提升。
與此對應的第二種方式是多個消費線程同時消費同一個分區,這個通過 assign()、seek() 等方法實現,這樣可以打破原有的消費線程的個數不能超過分區數的限制,進一步提高了消費的能力。不過這種實現方式對於位移提交和順序控制的處理就會變得非常復雜,實際應用中使用得極少.
第三種實現方式,將處理消息模塊改成多線程的實現方式
public class ThirdMultiConsumerThreadDemo {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";
//省略initConfig()方法,具體請參考代碼清單14-1
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumerThread consumerThread =
new KafkaConsumerThread(props, topic,
Runtime.getRuntime().availableProcessors());
consumerThread.start();
}
public static class KafkaConsumerThread extends Thread {
private KafkaConsumer<String, String> kafkaConsumer;
private ExecutorService executorService;
private int threadNumber;
public KafkaConsumerThread(Properties props,
String topic, int threadNumber) {
kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Collections.singletonList(topic));
this.threadNumber = threadNumber;
executorService = new ThreadPoolExecutor(threadNumber, threadNumber,
0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
}
@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records =
kafkaConsumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
executorService.submit(new RecordsHandler(records));
} ①
}
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaConsumer.close();
}
}
}
public static class RecordsHandler extends Thread{
public final ConsumerRecords<String, String> records;
public RecordsHandler(ConsumerRecords<String, String> records) {
this.records = records;
}
@Override
public void run(){
//處理records.
}
}
}
RecordHandler 類是用來處理消息的,而 KafkaConsumerThread 類對應的是一個消費線程,里面通過線程池的方式來調用 RecordHandler 處理一批批的消息。注意 KafkaConsumerThread 類中 ThreadPoolExecutor 里的最后一個參數設置的是 CallerRunsPolicy(),這樣可以防止線程池的總體消費能力跟不上 poll() 拉取的能力,從而導致異常現象的發生。第三種實現方式還可以橫向擴展,通過開啟多個 KafkaConsumerThread 實例來進一步提升整體的消費能力。
第三種實現方式相比第一種實現方式而言,除了橫向擴展的能力,還可以減少TCP連接對系統資源的消耗,不過缺點就是對於消息的順序處理就比較困難了。
對於第一種實現方式而言,如果要做具體的位移提交,直接在 KafkaConsumerThread 中的 run() 方法里實現即可。而對於第三種實現方式,這里引入一個共享變量 offsets 來參與提交
每一個處理消息的 RecordHandler 類在處理完消息之后都將對應的消費位移保存到共享變量 offsets 中,KafkaConsumerThread 在每一次 poll() 方法之后都讀取 offsets 中的內容並對其進行位移提交。注意在實現的過程中對 offsets 讀寫需要加鎖處理,防止出現並發問題。並且在寫入 offsets 的時候需要注意位移覆蓋的問題,針對這個問題,可以將 RecordHandler 類中的 run() 方法實現改為如下內容
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
//處理tpRecords.
long lastConsumedOffset = tpRecords.get(tpRecords.size() - 1).offset();
synchronized (offsets) {
if (!offsets.containsKey(tp)) {
offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1));
}else {
long position = offsets.get(tp).offset();
if (position < lastConsumedOffset + 1) {
offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1));
}
}
}
}
對應的位移提交實現添加在 KafkaConsumerThread 類第①行代碼
synchronized (offsets) {
if (!offsets.isEmpty()) {
kafkaConsumer.commitSync(offsets);
offsets.clear();
}
}
這種位移提交的方式會有數據丟失的風險。對於同一個分區中的消息,假設一個處理線程 RecordHandler1 正在處理 offset 為0~99的消息,而另一個處理線程 RecordHandler2 已經處理完了 offset 為100~199的消息並進行了位移提交,此時如果 RecordHandler1 發生異常,則之后的消費只能從200開始而無法再次消費0~99的消息,從而造成了消息丟失的現象。這里雖然針對位移覆蓋做了一定的處理,但還沒有解決異常情況下的位移覆蓋問題。
對此就要引入更加復雜的處理機制,這里再提供一種解決思路,參考下圖,總體結構上是基於滑動窗口實現的。對於第三種實現方式而言,它所呈現的結構是通過消費者拉取分批次的消息,然后提交給多線程進行處理,而這里的滑動窗口式的實現方式是將拉取到的消息暫存起來,多個消費線程可以拉取暫存的消息,這個用於暫存消息的緩存大小即為滑動窗口的大小,總體上而言沒有太多的變化,不同的是對於消費位移的把控。
如上圖所示,每一個方格代表一個批次的消息,一個滑動窗口包含若干方格,startOffset 標注的是當前滑動窗口的起始位置,endOffset 標注的是末尾位置。每當 startOffset 指向的方格中的消息被消費完成,就可以提交這部分的位移,與此同時,窗口向前滑動一格,刪除原來 startOffset 所指方格中對應的消息,並且拉取新的消息進入窗口。滑動窗口的大小固定,所對應的用來暫存消息的緩存大小也就固定了,這部分內存開銷可控。
方格大小和滑動窗口的大小同時決定了消費線程的並發數:一個方格對應一個消費線程,對於窗口大小固定的情況,方格越小並行度越高;對於方格大小固定的情況,窗口越大並行度越高。不過,若窗口設置得過大,不僅會增大內存的開銷,而且在發生異常(比如 Crash)的情況下也會引起大量的重復消費,同時還考慮線程切換的開銷,建議根據實際情況設置一個合理的值,不管是對於方格還是窗口而言,過大或過小都不合適。
如果一個方格內的消息無法被標記為消費完成,那么就會造成 startOffset 的懸停。為了使窗口能夠繼續向前滑動,那么就需要設定一個閾值,當 startOffset 懸停一定的時間后就對這部分消息進行本地重試消費,如果重試失敗就轉入重試隊列,如果還不奏效就轉入死信隊列。真實應用中無法消費的情況極少,一般是由業務代碼的處理邏輯引起的,比如消息中的內容格式與業務處理的內容格式不符,無法對這條消息進行決斷,這種情況可以通過優化代碼邏輯或采取丟棄策略來避免。如果需要消息高度可靠,也可以將無法進行業務邏輯的消息(這類消息可以稱為死信)存入磁盤、數據庫或 Kafka,然后繼續消費下一條消息以保證整體消費進度合理推進,之后可以通過一個額外的處理任務來分析死信進而找出異常的原因。