The previous post, 《Kafka Consumer多線程實例續篇》, fixed the problem of committing offsets from multiple threads, but data loss was still possible: several threads could receive records from the same partition, and the order in which they processed those records could violate the order of the messages within the partition, corrupting the offsets being committed. This time I use KafkaConsumer's pause and resume methods to prevent that situation. I also write a test class to verify that, for the same number of messages, a single-threaded consumer is far slower than a multi-threaded one.
Overview
This time I wrote five Java files:
- OrdinaryConsumer.java: a plain single-threaded consumer, used later as the baseline in the performance comparison.
- ConsumerWorker.java: the message-processing class, essentially a task that gets submitted to a thread pool, where it does the actual message processing.
- MultiThreadedConsumer.java: the main driver of the multi-threaded consumer; it distributes records to ConsumerWorker instances and manages offset commits.
- MultiThreadedRebalanceListener.java: the rebalance listener serving the multi-threaded consumer.
- Test.java: compares single-threaded and multi-threaded performance.
OrdinaryConsumer.java
The single-threaded consumer is the simplest of the five, so here is its code first:
```java
package huxihx.mtc;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Single-threaded consumer.
 */
public class OrdinaryConsumer {
    private final Consumer<String, String> consumer;
    private final int expectedCount;  // number of messages to consume in the test

    public OrdinaryConsumer(String brokerId, String topic, String groupID, int expectedCount) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerId);
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
        this.expectedCount = expectedCount;
    }

    public void run() {
        try {
            int alreadyConsumed = 0;
            while (alreadyConsumed < expectedCount) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                alreadyConsumed += records.count();
                records.forEach(this::handleRecord);
            }
        } finally {
            consumer.close();
        }
    }

    private void handleRecord(ConsumerRecord<String, String> record) {
        try {
            // Simulate up to 10 ms of processing per message
            Thread.sleep(ThreadLocalRandom.current().nextInt(10));
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
        System.out.println(Thread.currentThread().getName()
                + " finished message processed. Record offset = " + record.offset());
    }
}
```
The code is straightforward. The only thing worth pointing out is that the consumer simulates up to 10 ms of processing per message. The multi-threaded consumer below uses the same standard, so the comparison is fair.
ConsumerWorker.java
Next up is the class that processes messages on the worker threads: ConsumerWorker. Compared with the previous post, the biggest difference is that each worker now only processes records from a single partition, rather than records mixed from several partitions as before. The benefit is that once a partition's records have been handed to a worker, I can pause consumption of that partition until the worker has processed them all. With records from multiple partitions mixed together, that would be much harder to implement. The ConsumerWorker code follows:
```java
package huxihx.mtc;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

public class ConsumerWorker<K, V> {
    // All records handled by one worker come from the same partition
    private final List<ConsumerRecord<K, V>> recordsOfSamePartition;
    private volatile boolean started = false;
    private volatile boolean stopped = false;
    private final ReentrantLock lock = new ReentrantLock();

    private final long INVALID_COMMITTED_OFFSET = -1L;
    // The offset to commit next, i.e. one past the latest processed record
    private final AtomicLong latestProcessedOffset = new AtomicLong(INVALID_COMMITTED_OFFSET);
    private final CompletableFuture<Long> future = new CompletableFuture<>();

    public ConsumerWorker(List<ConsumerRecord<K, V>> recordsOfSamePartition) {
        this.recordsOfSamePartition = recordsOfSamePartition;
    }

    public boolean run() {
        lock.lock();
        try {
            if (stopped)
                return false;
            started = true;
        } finally {
            lock.unlock();  // always release the lock, even on the early return
        }
        for (ConsumerRecord<K, V> record : recordsOfSamePartition) {
            if (stopped)
                break;
            handleRecord(record);
            if (latestProcessedOffset.get() < record.offset() + 1)
                latestProcessedOffset.set(record.offset() + 1);
        }
        return future.complete(latestProcessedOffset.get());
    }

    public long getLatestProcessedOffset() {
        return latestProcessedOffset.get();
    }

    private void handleRecord(ConsumerRecord<K, V> record) {
        try {
            // Simulate up to 10 ms of processing per message
            Thread.sleep(ThreadLocalRandom.current().nextInt(10));
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
        System.out.println(Thread.currentThread().getName()
                + " finished message processed. Record offset = " + record.offset());
    }

    public void close() {
        lock.lock();
        try {
            this.stopped = true;
            // If the worker never started, complete the future now so that
            // waitForCompletion does not block until the timeout
            if (!started) {
                future.complete(latestProcessedOffset.get());
            }
        } finally {
            lock.unlock();
        }
    }

    public boolean isFinished() {
        return future.isDone();
    }

    public long waitForCompletion(long timeout, TimeUnit timeUnit) {
        try {
            return future.get(timeout, timeUnit);
        } catch (Exception e) {
            if (e instanceof InterruptedException)
                Thread.currentThread().interrupt();
            return INVALID_COMMITTED_OFFSET;
        }
    }
}
```
A few points worth explaining:
- latestProcessedOffset: holds the latest position this worker has reached. Note that it stores record.offset() + 1 rather than record.offset(), because the offset you commit to Kafka is the position of the next message to fetch, not of the last message processed.
- future: a CompletableFuture that carries the offset this worker wants committed (a toy demo of this handshake follows the list).
- Whether the worker completed successfully is signalled by whether this future was completed with the latestProcessedOffset value.
- handleRecord is the same as in the single-threaded consumer: it simulates up to 10 ms of processing per message.
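To make that handshake concrete, here is a toy demo. It is a hypothetical class, not one of the five files of this post, and it fabricates ConsumerRecord instances by hand, which only test code would do. It submits a worker to a pool the same way MultiThreadedConsumer does below, then blocks on the future for the offset to commit:

```java
package huxihx.mtc;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConsumerWorkerDemo {
    public static void main(String[] args) {
        // Fabricate three records from the same partition (topic "test", partition 0)
        List<ConsumerRecord<String, String>> records = Arrays.asList(
                new ConsumerRecord<>("test", 0, 0L, "k", "v0"),
                new ConsumerRecord<>("test", 0, 1L, "k", "v1"),
                new ConsumerRecord<>("test", 0, 2L, "k", "v2"));
        ConsumerWorker<String, String> worker = new ConsumerWorker<>(records);

        ExecutorService pool = Executors.newSingleThreadExecutor();
        // worker::run is a Supplier<Boolean>, so it can be handed to supplyAsync
        CompletableFuture.supplyAsync(worker::run, pool);
        // Blocks until the worker completes its future, then returns the offset
        // to commit: one past the last processed record
        System.out.println("offset to commit: " + worker.waitForCompletion(5, TimeUnit.SECONDS));
        pool.shutdown();
    }
}
```

With the three records at offsets 0 through 2, waitForCompletion returns 3, the next offset to fetch.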
MultiThreadedConsumer.java
With ConsumerWorker in place, the next step is the main driver class of the multi-threaded consumer. After creating the consumer, it loops through the following steps: 1. poll records from the subscribed partitions; 2. group the records by partition and dispatch each group to a dedicated worker thread; 3. pause those partitions while waiting for the workers to finish processing; 4. commit the offsets of those partitions; 5. resume those partitions.
The complete code of the MultiThreadedConsumer class follows:
```java
package huxihx.mtc;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class MultiThreadedConsumer {
    // Workers still processing records, keyed by partition
    private final Map<TopicPartition, ConsumerWorker<String, String>> outstandingWorkers = new HashMap<>();
    // Offsets accumulated for the next commit
    private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    private long lastCommitTime = System.currentTimeMillis();
    private final Consumer<String, String> consumer;
    private final int DEFAULT_COMMIT_INTERVAL = 3000;
    private final Map<TopicPartition, Long> currentConsumedOffsets = new HashMap<>();
    private final long expectedCount;

    private final static Executor executor = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors() * 10, r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            });

    public MultiThreadedConsumer(String brokerId, String topic, String groupID, long expectedCount) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerId);
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic),
                new MultiThreadedRebalanceListener(consumer, outstandingWorkers, offsetsToCommit));
        this.expectedCount = expectedCount;
    }

    public void run() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                distributeRecords(records);
                checkOutstandingWorkers();
                commitOffsets();
                if (currentConsumedOffsets.values().stream().mapToLong(Long::longValue).sum() >= expectedCount) {
                    break;
                }
            }
        } finally {
            consumer.close();
        }
    }

    /**
     * Resume the partitions whose workers have finished processing.
     */
    private void checkOutstandingWorkers() {
        Set<TopicPartition> completedPartitions = new HashSet<>();
        outstandingWorkers.forEach((tp, worker) -> {
            if (worker.isFinished()) {
                completedPartitions.add(tp);
            }
            long offset = worker.getLatestProcessedOffset();
            currentConsumedOffsets.put(tp, offset);
            if (offset > 0L) {
                offsetsToCommit.put(tp, new OffsetAndMetadata(offset));
            }
        });
        completedPartitions.forEach(outstandingWorkers::remove);
        consumer.resume(completedPartitions);
    }

    /**
     * Commit offsets, at most once per commit interval.
     */
    private void commitOffsets() {
        try {
            long currentTime = System.currentTimeMillis();
            if (currentTime - lastCommitTime > DEFAULT_COMMIT_INTERVAL && !offsetsToCommit.isEmpty()) {
                consumer.commitSync(offsetsToCommit);
                offsetsToCommit.clear();
                // Reset the timer only after an actual commit; updating it on every
                // call would keep postponing the next commit indefinitely
                lastCommitTime = currentTime;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Hand each partition's records to a dedicated worker thread, then pause
     * consumption of those partitions until their workers finish.
     * @param records
     */
    private void distributeRecords(ConsumerRecords<String, String> records) {
        if (records.isEmpty())
            return;
        Set<TopicPartition> pausedPartitions = new HashSet<>();
        records.partitions().forEach(tp -> {
            List<ConsumerRecord<String, String>> partitionedRecords = records.records(tp);
            pausedPartitions.add(tp);
            final ConsumerWorker<String, String> worker = new ConsumerWorker<>(partitionedRecords);
            CompletableFuture.supplyAsync(worker::run, executor);
            outstandingWorkers.put(tp, worker);
        });
        consumer.pause(pausedPartitions);
    }
}
```
Points worth explaining in this class:
- executor: I create a thread pool with 10 times as many threads as there are CPU cores. Pick the number based on your own workload: if your processing logic is I/O bound (for example, it writes to external systems), a larger pool is usually worthwhile. Personally, I would not go beyond the total number of partitions assigned to the consumer, since each worker handles one partition's batch at a time.
- Be sure to set enable.auto.commit to false: manual offset commits are a key part of this multi-threaded consumer design.
- The rebalance listener is set to MultiThreadedRebalanceListener. How that class reacts to partition revocation and assignment is discussed shortly.
- The run method follows the flow described above: poll -> dispatch -> check progress -> commit offsets. A standalone sketch of the pause/resume semantics it relies on follows this list.
- expectedCount: the total number of messages to consume, used later in the performance comparison.
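Since the whole run loop hinges on pause and resume, here is a minimal standalone sketch of those semantics. This is a hypothetical class, not one of the five files of this post; the broker address, topic, and group id are placeholders. The key point is that pause only suppresses fetching: the consumer keeps its partition assignment and must keep calling poll, which is exactly what lets the main thread stay alive in the group while the workers drain their batches.

```java
package huxihx.mtc;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class PauseResumeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "pause-resume-demo");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // Pause the partitions that just returned data. The consumer keeps its
            // assignment and keeps polling (serving heartbeats and rebalances);
            // the paused partitions simply return no records.
            consumer.pause(records.partitions());
            ConsumerRecords<String, String> whilePaused = consumer.poll(Duration.ofSeconds(1));
            System.out.println("records from paused partitions: " + whilePaused.count());
            // resume() makes the partitions fetchable again; resuming a partition
            // that was never paused is a harmless no-op.
            consumer.resume(consumer.paused());
        }
    }
}
```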
MultiThreadedRebalanceListener.java
A multi-threaded consumer has to be careful when a rebalance starts. The main thread's poll call runs in parallel with the worker threads that are processing records. If a rebalance happens at that moment, some partitions get assigned to another consumer while workers here may still be processing those very partitions. We could then end up with two consumers processing messages from the same partitions, which breaks the design contract of consumer groups. To prevent this, we must make sure that processing of the partitions about to be revoked finishes first, and only then let them be reassigned.
In short, before partitions are revoked, the multi-threaded consumer must:
- Stop the corresponding worker threads
- Commit their offsets
Once partitions have been reassigned, things get simpler: we just call resume to make them consumable again. If those partitions were already consumable, resume has no effect; either way it is a harmless operation. The complete MultiThreadedRebalanceListener code follows:
```java
package huxihx.mtc;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class MultiThreadedRebalanceListener implements ConsumerRebalanceListener {
    private final Consumer<String, String> consumer;
    private final Map<TopicPartition, ConsumerWorker<String, String>> outstandingWorkers;
    private final Map<TopicPartition, OffsetAndMetadata> offsets;

    public MultiThreadedRebalanceListener(Consumer<String, String> consumer,
                                          Map<TopicPartition, ConsumerWorker<String, String>> outstandingWorkers,
                                          Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.consumer = consumer;
        this.outstandingWorkers = outstandingWorkers;
        this.offsets = offsets;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Step 1: signal the workers of the revoked partitions to stop
        Map<TopicPartition, ConsumerWorker<String, String>> stoppedWorkers = new HashMap<>();
        for (TopicPartition tp : partitions) {
            ConsumerWorker<String, String> worker = outstandingWorkers.remove(tp);
            if (worker != null) {
                worker.close();
                stoppedWorkers.put(tp, worker);
            }
        }

        // Step 2: wait for each stopped worker and collect its latest offset
        stoppedWorkers.forEach((tp, worker) -> {
            long offset = worker.waitForCompletion(1, TimeUnit.SECONDS);
            if (offset > 0L) {
                offsets.put(tp, new OffsetAndMetadata(offset));
            }
        });

        // Step 3: commit the offsets of the revoked partitions before giving them up
        Map<TopicPartition, OffsetAndMetadata> revokedOffsets = new HashMap<>();
        partitions.forEach(tp -> {
            OffsetAndMetadata offset = offsets.remove(tp);
            if (offset != null) {
                revokedOffsets.put(tp, offset);
            }
        });

        try {
            consumer.commitSync(revokedOffsets);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Harmless no-op for partitions that were never paused
        consumer.resume(partitions);
    }
}
```
Points worth noting in this class:
- Any rebalance listener must implement the ConsumerRebalanceListener interface.
- The class holds three fields: the Consumer instance, the worker threads to be stopped, and the offset data to be committed.
- The main logic lives in onPartitionsRevoked: step one stops the worker threads; step two commits their offsets manually.
Test.java
Having covered the four classes above, let's now write a test class to compare the performance of the single-threaded and multi-threaded consumers. First, create a topic with 50 partitions and a single replica, then use the kafka-producer-perf-test tool to produce 50,000 messages, 1,000 per partition, as sketched below.
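One way to set this up (the exact script flags can differ slightly between Kafka releases, and the record size here is an arbitrary choice):

```bash
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --topic test --partitions 50 --replication-factor 1

bin/kafka-producer-perf-test.sh --topic test --num-records 50000 \
  --record-size 100 --throughput -1 \
  --producer-props bootstrap.servers=localhost:9092
```

With the data in place, the following code measures how long each consumer takes to consume the same number of messages: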
```java
package huxihx.mtc;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        // Consume 900 of the 1,000 messages in each of the 50 partitions: 45,000 total
        int expectedCount = 50 * 900;
        String brokerId = "localhost:9092";
        String groupId = "test-group";
        String topic = "test";

        OrdinaryConsumer consumer = new OrdinaryConsumer(brokerId, topic, groupId + "-single", expectedCount);
        long start = System.currentTimeMillis();
        consumer.run();
        System.out.println("Single-threaded consumer costs " + (System.currentTimeMillis() - start));

        Thread.sleep(1L);

        MultiThreadedConsumer multiThreadedConsumer =
                new MultiThreadedConsumer(brokerId, topic, groupId + "-multi", expectedCount);
        start = System.currentTimeMillis();
        multiThreadedConsumer.run();
        System.out.println("Multi-threaded consumer costs " + (System.currentTimeMillis() - start));
    }
}
```
The results: consuming 45,000 messages took the single-threaded consumer about 232 seconds, versus about 6.2 seconds for the multi-threaded consumer:
```
Single-threaded consumer costs 232336
Multi-threaded consumer costs 6246
```
That makes the multi-threaded consumer roughly 37 times faster here, which is plausible: the simulated processing averages about 4.5 ms per message (nextInt(10) sleeps 0 to 9 ms), so 45,000 messages amount to roughly 200 seconds of strictly sequential work, consistent with the 232 seconds observed, whereas dozens of workers draining 50 partitions in parallel collapse that time. The exact speed-up will depend on your environment, but the conclusion is solid: on a machine with many cores, and with I/O-bound message processing, a multi-threaded consumer beats a single-threaded one by a wide margin.