[Original] Multi-Threaded Consumption with the Kafka Consumer


The previous post, "Kafka Consumer Multi-Threaded Example, Continued", fixed the offset-commit problem in that design, but data loss was still possible: multiple threads could receive records from the same partition, and their processing order could diverge from the records' order within the partition, corrupting the committed offsets. This time I use KafkaConsumer's pause and resume methods to prevent that from happening. I also write a test class to verify that, for the same number of messages, single-threaded consumption is far slower than multi-threaded consumption.

Overview

This time I wrote five Java files:

  • OrdinaryConsumer.java: a plain single-threaded Consumer, used later as the baseline for the performance comparison.
  • ConsumerWorker.java: the message-processing class, essentially a Runnable that is submitted to a thread pool to do the actual processing.
  • MultiThreadedConsumer.java: the main class of the multi-threaded Consumer; it distributes records to different ConsumerWorkers and manages offset commits.
  • MultiThreadedRebalanceListener.java: the rebalance listener that serves the multi-threaded Consumer.
  • Test.java: compares single-threaded and multi-threaded performance.

OrdinaryConsumer.java

The single-threaded Consumer is the simplest, so here is its code first:

package huxihx.mtc;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Single-threaded Consumer
 */
public class OrdinaryConsumer {

    private final Consumer<String, String> consumer;
    private final int expectedCount; // number of messages to consume in the test

    public OrdinaryConsumer(String brokerId, String topic, String groupID, int expectedCount) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerId);
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
        this.expectedCount = expectedCount;
    }

    public void run() {
        try {
            int alreadyConsumed = 0;
            while (alreadyConsumed < expectedCount) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                alreadyConsumed += records.count();
                records.forEach(this::handleRecord);
            }
        } finally {
            consumer.close();
        }
    }

    private void handleRecord(ConsumerRecord<String, String> record) {
        try {
            // simulate up to 10 ms of processing per record
            Thread.sleep(ThreadLocalRandom.current().nextInt(10));
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
        System.out.println(Thread.currentThread().getName() + " finished processing record. Record offset = " + record.offset());
    }
} 

The code is straightforward. The only thing worth noting is that the Consumer simulates up to 10 ms of processing per record; the multi-threaded Consumer below uses the same standard.

ConsumerWorker.java

Next comes the Runnable-style message-processing class, ConsumerWorker. The biggest change from the previous post is that each Worker now processes records from a single partition only, rather than records mixed from several partitions. The benefit is that once a partition's records have been handed to a Worker, I can pause consumption of that partition until the Worker has processed them all; with records from multiple partitions mixed together, that would be much harder to implement. The ConsumerWorker code is as follows:

package huxihx.mtc;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

public class ConsumerWorker<K, V> {

    private final List<ConsumerRecord<K, V>> recordsOfSamePartition;
    private volatile boolean started = false;
    private volatile boolean stopped = false;
    private final ReentrantLock lock = new ReentrantLock();

    private static final long INVALID_COMMITTED_OFFSET = -1L;
    private final AtomicLong latestProcessedOffset = new AtomicLong(INVALID_COMMITTED_OFFSET);
    private final CompletableFuture<Long> future = new CompletableFuture<>();

    public ConsumerWorker(List<ConsumerRecord<K, V>> recordsOfSamePartition) {
        this.recordsOfSamePartition = recordsOfSamePartition;
    }

    public boolean run() {
        lock.lock();
        try {
            if (stopped)
                return false;
            started = true;
        } finally {
            lock.unlock(); // always release the lock, even on the early return above
        }
        for (ConsumerRecord<K, V> record : recordsOfSamePartition) {
            if (stopped)
                break;
            handleRecord(record);
            if (latestProcessedOffset.get() < record.offset() + 1)
                latestProcessedOffset.set(record.offset() + 1);
        }
        return future.complete(latestProcessedOffset.get());
    }

    public long getLatestProcessedOffset() {
        return latestProcessedOffset.get();
    }

    private void handleRecord(ConsumerRecord<K, V> record) {
        try {
            // simulate up to 10 ms of processing per record, as in OrdinaryConsumer
            Thread.sleep(ThreadLocalRandom.current().nextInt(10));
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
        System.out.println(Thread.currentThread().getName() + " finished processing record. Record offset = " + record.offset());
    }

    public void close() {
        lock.lock();
        try {
            this.stopped = true;
            if (!started) {
                future.complete(latestProcessedOffset.get());
            }
        } finally {
            lock.unlock();
        }
    }

    public boolean isFinished() {
        return future.isDone();
    }

    public long waitForCompletion(long timeout, TimeUnit timeUnit) {
        try {
            return future.get(timeout, timeUnit);
        } catch (Exception e) {
            if (e instanceof InterruptedException)
                Thread.currentThread().interrupt();
            return INVALID_COMMITTED_OFFSET;
        }
    }
}

A few points worth noting:

  • latestProcessedOffset: tracks this Worker's consumption progress; it stores the offset of the latest processed record plus one, which is exactly the value to commit.
  • future: a CompletableFuture that carries the offset this Worker wants committed.
  • The Worker has completed successfully exactly when the future is completed with the latestProcessedOffset value.
  • handleRecord is identical to the single-threaded Consumer's: it simulates up to 10 ms of processing per record.
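
To make the Worker's contract concrete, here is a minimal, hypothetical usage sketch (ConsumerWorkerDemo is not one of the article's five classes): hand one partition's records to a Worker, run it asynchronously, then wait for its final offset and commit it.

package huxihx.mtc;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical usage sketch of ConsumerWorker, for illustration only;
 * the real driver is MultiThreadedConsumer below.
 */
public class ConsumerWorkerDemo {

    static void processOnePartition(Consumer<String, String> consumer,
                                    ConsumerRecords<String, String> records,
                                    TopicPartition tp,
                                    Executor executor) {
        // one Worker gets exactly one partition's records
        ConsumerWorker<String, String> worker = new ConsumerWorker<>(records.records(tp));
        CompletableFuture.supplyAsync(worker::run, executor);

        // block until the Worker finishes (or the timeout expires), then commit
        long offset = worker.waitForCompletion(1, TimeUnit.SECONDS);
        if (offset > 0) {
            consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(offset)));
        }
    }
}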

MultiThreadedConsumer.java

With ConsumerWorker built, the next step is the main class of the multi-threaded Consumer. It works as follows: 1) create the Consumer; then, in a loop, 2) poll records from the subscribed partitions; 3) group the records by partition and dispatch each group to a different thread; 4) pause those partitions while the Worker threads process their records; 5) commit those partitions' offsets; 6) resume consumption of the completed partitions.

The complete MultiThreadedConsumer code:

package huxihx.mtc;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class MultiThreadedConsumer {

    private final Map<TopicPartition, ConsumerWorker<String, String>> outstandingWorkers = new HashMap<>();
    private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    private long lastCommitTime = System.currentTimeMillis();
    private final Consumer<String, String> consumer;
    private final int DEFAULT_COMMIT_INTERVAL = 3000; // ms
    private final Map<TopicPartition, Long> currentConsumedOffsets = new HashMap<>();
    private final long expectedCount;

    private static final Executor executor = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors() * 10, r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            });

    public MultiThreadedConsumer(String brokerId, String topic, String groupID, long expectedCount) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerId);
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic), new MultiThreadedRebalanceListener(consumer, outstandingWorkers, offsetsToCommit));
        this.expectedCount = expectedCount;
    }

    public void run() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                distributeRecords(records);
                checkOutstandingWorkers();
                commitOffsets();
                // Offsets start at 0 for the fresh test group, so the sum of the
                // per-partition processed offsets equals the total number consumed.
                if (currentConsumedOffsets.values().stream().mapToLong(Long::longValue).sum() >= expectedCount) {
                    break;
                }
            }
        } finally {
            consumer.close();
        }
    }

    /**
     * Record the progress of each outstanding Worker, stage its offset for
     * commit, and resume the partitions whose Workers have finished.
     */
    private void checkOutstandingWorkers() {
        Set<TopicPartition> completedPartitions = new HashSet<>();
        outstandingWorkers.forEach((tp, worker) -> {
            if (worker.isFinished()) {
                completedPartitions.add(tp);
            }
            long offset = worker.getLatestProcessedOffset();
            if (offset > 0L) { // skip Workers that have not processed anything yet
                currentConsumedOffsets.put(tp, offset);
                offsetsToCommit.put(tp, new OffsetAndMetadata(offset));
            }
        });
        completedPartitions.forEach(outstandingWorkers::remove);
        consumer.resume(completedPartitions);
    }

    /**
     * Commit staged offsets, at most once per DEFAULT_COMMIT_INTERVAL.
     */
    private void commitOffsets() {
        try {
            long currentTime = System.currentTimeMillis();
            if (currentTime - lastCommitTime > DEFAULT_COMMIT_INTERVAL && !offsetsToCommit.isEmpty()) {
                consumer.commitSync(offsetsToCommit);
                offsetsToCommit.clear();
                // only reset the timer after an actual commit; resetting it on
                // every call could postpone commits indefinitely
                lastCommitTime = currentTime;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Dispatch each partition's records to a dedicated Worker thread and pause
     * consumption of those partitions until their Workers have finished.
     */
    private void distributeRecords(ConsumerRecords<String, String> records) {
        if (records.isEmpty())
            return;
        Set<TopicPartition> pausedPartitions = new HashSet<>();
        records.partitions().forEach(tp -> {
            List<ConsumerRecord<String, String>> partitionedRecords = records.records(tp);
            pausedPartitions.add(tp);
            final ConsumerWorker<String, String> worker = new ConsumerWorker<>(partitionedRecords);
            CompletableFuture.supplyAsync(worker::run, executor);
            outstandingWorkers.put(tp, worker);
        });
        consumer.pause(pausedPartitions);
    }
}  

Points worth noting about this class:

  • executor: I create a thread pool with 10 times as many threads as CPU cores. Choose a count that fits your own workload: if your processing logic is I/O-bound (say, writing to an external system), a larger pool usually helps, though in my view it should not exceed the total number of partitions assigned to this Consumer (see the sketch after this list).
  • Be sure to set enable.auto.commit to false. Committing offsets manually is a key part of the multi-threaded Consumer design.
  • The rebalance listener is set to MultiThreadedRebalanceListener; how it handles partition revocation and assignment is discussed shortly.
  • The run method follows the flow described above: poll -> dispatch -> check progress -> commit offsets.
  • expectedCount: the total number of messages to consume, used later in the performance comparison.
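
As a hedged illustration of the pool-sizing note above, one could cap the pool at the topic's partition count by asking the consumer for topic metadata before building the pool. partitionsFor is a standard Consumer API, but the sizing policy and the buildWorkerPool helper are just one reasonable choice, not the article's code:

import org.apache.kafka.clients.consumer.Consumer;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: bound the Worker pool by the topic's partition count, since more
// threads than partitions cannot all be kept busy under this
// one-partition-per-Worker design. buildWorkerPool is a hypothetical helper.
static ExecutorService buildWorkerPool(Consumer<String, String> consumer, String topic) {
    int partitionCount = consumer.partitionsFor(topic).size();
    int poolSize = Math.min(partitionCount, Runtime.getRuntime().availableProcessors() * 10);
    return Executors.newFixedThreadPool(poolSize, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true); // daemon threads will not block JVM shutdown
        return t;
    });
}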

MultiThreadedRebalanceListener.java

A multi-threaded Consumer must be careful when a rebalance starts. The main thread's poll runs in parallel with the Worker threads' processing, so if a rebalance happens, some partitions may be assigned to another Consumer while Worker threads are still processing their records. Two Consumers could then both be processing messages from the same partitions, which breaks the consumer-group design. We must therefore ensure that processing of partitions about to be revoked completes before they are reassigned.

Overall, before its partitions are revoked, the multi-threaded Consumer must:

  1. Stop the corresponding Worker threads
  2. Commit their offsets

Once partitions have been reassigned, things are simple: we call resume to make them consumable again. If a partition was already consumable, resume has no effect, so it is a harmless operation either way. The complete MultiThreadedRebalanceListener code is as follows:

package huxihx.mtc;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class MultiThreadedRebalanceListener implements ConsumerRebalanceListener {

    private final Consumer<String, String> consumer;
    private final Map<TopicPartition, ConsumerWorker<String, String>> outstandingWorkers;
    private final Map<TopicPartition, OffsetAndMetadata> offsets;

    public MultiThreadedRebalanceListener(Consumer<String, String> consumer,
                                          Map<TopicPartition, ConsumerWorker<String, String>> outstandingWorkers,
                                          Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.consumer = consumer;
        this.outstandingWorkers = outstandingWorkers;
        this.offsets = offsets;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 1. signal the Workers of the revoked partitions to stop
        Map<TopicPartition, ConsumerWorker<String, String>> stoppedWorkers = new HashMap<>();
        for (TopicPartition tp : partitions) {
            ConsumerWorker<String, String> worker = outstandingWorkers.remove(tp);
            if (worker != null) {
                worker.close();
                stoppedWorkers.put(tp, worker);
            }
        }

        // 2. wait for each stopped Worker and stage its last processed offset
        stoppedWorkers.forEach((tp, worker) -> {
            long offset = worker.waitForCompletion(1, TimeUnit.SECONDS);
            if (offset > 0L) {
                offsets.put(tp, new OffsetAndMetadata(offset));
            }
        });

        // 3. commit the revoked partitions' offsets before losing ownership
        Map<TopicPartition, OffsetAndMetadata> revokedOffsets = new HashMap<>();
        partitions.forEach(tp -> {
            OffsetAndMetadata offset = offsets.remove(tp);
            if (offset != null) {
                revokedOffsets.put(tp, offset);
            }
        });

        try {
            consumer.commitSync(revokedOffsets);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        consumer.resume(partitions);
    }
}

Points worth noting:

  • Any rebalance listener must implement the ConsumerRebalanceListener interface.
  • The class holds three fields: the Consumer instance, the Worker threads to stop, and the offsets to commit.
  • The main logic lives in onPartitionsRevoked: step one stops the Worker threads; step two commits their offsets manually.

Test.java

With the four classes above in place, let's now write a test class to compare the performance of the single-threaded and multi-threaded Consumers. First create a topic with 50 partitions and a single replica, and use the kafka-producer-perf-test tool to produce 50,000 messages, 1,000 per partition. Then run the following code to time each Consumer (each run consumes 45,000 of those messages, i.e. 900 per partition):
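
For reference, the topic and test data can be created with commands along these lines (exact script names and flags may vary with your Kafka version; treat this as a sketch):

# create a 50-partition, single-replica topic named "test"
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create \
    --topic test --partitions 50 --replication-factor 1

# produce 50,000 records; with no keys, the producer spreads them across
# partitions (the article assumes roughly 1,000 per partition)
bin/kafka-producer-perf-test.sh --topic test --num-records 50000 \
    --record-size 100 --throughput -1 \
    --producer-props bootstrap.servers=localhost:9092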

package huxihx.mtc;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        int expectedCount = 50 * 900; // consume 900 of the 1,000 messages in each of the 50 partitions
        String brokerId = "localhost:9092";
        String groupId = "test-group";
        String topic = "test";

        OrdinaryConsumer consumer = new OrdinaryConsumer(brokerId, topic, groupId + "-single", expectedCount);
        long start = System.currentTimeMillis();
        consumer.run();
        System.out.println("Single-threaded consumer costs " + (System.currentTimeMillis() - start));

        Thread.sleep(1L); // brief pause between the two runs

        MultiThreadedConsumer multiThreadedConsumer =
                new MultiThreadedConsumer(brokerId, topic, groupId + "-multi", expectedCount);
        start = System.currentTimeMillis();
        multiThreadedConsumer.run();
        System.out.println("Multi-threaded consumer costs " + (System.currentTimeMillis() - start));
    }
}

The results: consuming 45,000 messages took the single-threaded Consumer 232 seconds and the multi-threaded Consumer 6.2 seconds:

Single-threaded consumer costs 232336

Multi-threaded consumer costs 6246

That makes the multi-threaded Consumer roughly 37 times faster than the single-threaded one here, which is plausible: with an average of roughly 4-5 ms of simulated processing per record, 45,000 records need about 200 seconds serially, whereas the multi-threaded version processes up to 50 partitions in parallel. The exact speedup depends on your environment, but the conclusion holds: with many CPU cores and I/O-bound processing logic, a multi-threaded Consumer clearly outperforms a single-threaded one.

