This post briefly introduces how to consume Kafka messages with multiple threads.
Note: the examples below are based on Kafka version 2.2.
## Consumer Configuration

A consumer reading messages from Kafka needs to take the following settings into account.
| Parameter | Description |
| --- | --- |
| `max.poll.records` (default: 500) | The maximum number of records returned in a single `poll()` call. |
| `fetch.max.bytes` (default: 52428800) | The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the batch is still returned to ensure the consumer can make progress, so this is not an absolute maximum. The maximum record batch size accepted by the broker is defined via `message.max.bytes` (broker config) or `max.message.bytes` (topic config). Note that the consumer performs multiple fetches in parallel. |
| `session.timeout.ms` (default: 10000) | The consumer periodically sends heartbeats to the broker. If the broker receives no heartbeat before this timeout expires, it removes the consumer from the group and starts a rebalance. |
| `max.poll.interval.ms` (default: 300000) | The maximum delay between two `poll()` calls. If it is exceeded, the broker removes the consumer from the group and starts a rebalance. |
| `heartbeat.interval.ms` (default: 3000) | How frequently the consumer sends heartbeats. |
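For reference, a minimal sketch of how these settings can be applied when constructing a consumer; the bootstrap server and group id are placeholders, and the values shown are just the defaults from the table above:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerConfigDemo {
    public static KafkaConsumer<String, String> newConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // The settings from the table above, at their default values
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
        return new KafkaConsumer<>(props);
    }
}
```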
## Rebalance

### What triggers a rebalance?

While managing a consumer group, Kafka triggers a partition rebalance whenever the group's membership or its subscription changes. For example:
- A new partition is added to the topic
- A consumer joins the group: a new instance of the program is deployed using the same groupId
- A consumer leaves the group:
  - `max.poll.interval.ms` is exceeded because the polled records were not processed in time
  - `session.timeout.ms` is exceeded because no heartbeat was sent, due to an application crash or a network error
  - the consumer is closed, i.e. the service is shut down
### How should a rebalance be handled?

If automatic offset commits are enabled, you don't need to worry about group rebalances; everything is done for you by the poll method. However, if you disable auto-commit and commit offsets manually, you need to commit them before the group (rejoin) request is sent. You can do this in two ways:
- Call `commitSync()` after processing a batch of messages:

```java
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        // Failing after the DB insert but before the offset commit
        // still leads to duplicate consumption.
        consumer.commitSync();
        buffer.clear();
    }
}
```
- Implement `ConsumerRebalanceListener` to get notified when partitions are about to be revoked, and commit the corresponding offsets at that point:

```java
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {

    private Consumer<?, ?> consumer;

    public SaveOffsetsOnRebalance(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Save the offsets before the partitions are taken away
        for (TopicPartition partition : partitions)
            saveOffsetInExternalStore(consumer.position(partition));
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Nothing to do here; the method is required by the interface in Kafka 2.2
    }
}
```
The first approach is simpler, but when processing is very fast it can lead to offsets being committed too frequently. The second approach is more efficient, and it is necessary when consumption and processing are fully decoupled.
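Both options assume automatic offset commits are disabled (`props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")`). If the blocking `commitSync()` in the first option adds too much latency to the poll loop, the client also offers `commitAsync` with a completion callback; a minimal sketch (the logger is an assumption):

```java
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // Unlike commitSync, async commits are not retried by the client;
        // log the failure and rely on a later commit to move the offsets forward.
        log.error("Commit failed for offsets {}", offsets, exception);
    }
});
```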
## Example Code

The idea: the main thread polls a batch of Kafka messages and creates tasks with the partition as the smallest unit of granularity, handing them to a thread pool; each task processes the records of one partition, while the main thread keeps polling, monitors the tasks' progress, and commits offsets.
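In skeleton form, the main loop looks like this (all details are in the classes below):

```java
while (!stopped.get()) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    handleFetchedRecords(records); // one MyConsumerTask per partition, then pause those partitions
    checkActiveTasks();            // resume partitions whose task has finished, collect offsets
    commitOffsets();               // commitSync the collected offsets every 5 seconds
}
```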
### Creating the task
```java
package com.mirai.boot.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author mirai
 * @version 2021/9/23
 * @since 1.8
 */
@Slf4j
public class MyConsumerTask<K, V> implements Runnable {

    private final List<ConsumerRecord<K, V>> records;
    private volatile boolean stopped = false;
    private volatile boolean started = false;
    private volatile boolean finished = false;
    private final CompletableFuture<Long> completion = new CompletableFuture<>();
    private final ReentrantLock startStopLock = new ReentrantLock();
    private final AtomicLong currentOffset = new AtomicLong();

    public MyConsumerTask(List<ConsumerRecord<K, V>> records) {
        this.records = records;
    }

    @Override
    public void run() {
        startStopLock.lock();
        try {
            if (stopped) {
                return;
            }
            started = true;
        } finally {
            startStopLock.unlock();
        }
        for (ConsumerRecord<K, V> record : records) {
            if (stopped) {
                break;
            }
            // Process the record here and make sure you catch all exceptions:
            // an uncaught exception would leave the task unfinished and the future incomplete.
            currentOffset.set(record.offset() + 1);
        }
        finished = true;
        completion.complete(currentOffset.get());
    }

    public long getCurrentOffset() {
        return currentOffset.get();
    }

    public void stop() {
        startStopLock.lock();
        try {
            this.stopped = true;
            if (!started) {
                finished = true;
                completion.complete(currentOffset.get());
            }
        } finally {
            startStopLock.unlock();
        }
    }

    public long waitForCompletion() {
        try {
            return completion.get();
        } catch (InterruptedException | ExecutionException e) {
            return -1;
        }
    }

    public boolean isFinished() {
        return finished;
    }
}
```
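The intended lifecycle of a task, as a quick sketch (`partitionRecords` and `executor` stand in for the real objects used below):

```java
MyConsumerTask<String, String> task = new MyConsumerTask<>(partitionRecords);
executor.submit(task);
// ... later, e.g. when the partition is revoked:
task.stop();                                // a not-yet-started task is marked finished and never processes anything
long lastOffset = task.waitForCompletion(); // blocks until the in-flight record is done
```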
### Implementing ConsumerRebalanceListener
```java
package com.mirai.boot.kafka.demo;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
 * @author mirai
 */
@Slf4j
@AllArgsConstructor
public class MultiThreadedRebalancedListener implements ConsumerRebalanceListener {

    private final KafkaConsumer<String, String> consumer;
    private final Map<TopicPartition, MyConsumerTask<String, String>> activeTasks;
    private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit;

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 1. Stop all tasks handling records from revoked partitions
        Map<TopicPartition, MyConsumerTask<String, String>> stoppedTasks = new HashMap<>();
        for (TopicPartition partition : partitions) {
            MyConsumerTask<String, String> task = activeTasks.remove(partition);
            if (task != null) {
                task.stop();
                stoppedTasks.put(partition, task);
            }
        }

        // 2. Wait for stopped tasks to finish processing their current record
        stoppedTasks.forEach((partition, task) -> {
            long offset = task.waitForCompletion();
            if (offset > 0) {
                offsetsToCommit.put(partition, new OffsetAndMetadata(offset));
            }
        });

        // 3. Collect offsets for revoked partitions
        Map<TopicPartition, OffsetAndMetadata> revokedPartitionOffsets = new HashMap<>();
        partitions.forEach(partition -> {
            OffsetAndMetadata offset = offsetsToCommit.remove(partition);
            if (offset != null) {
                revokedPartitionOffsets.put(partition, offset);
            }
        });

        // 4. Commit offsets for revoked partitions
        try {
            consumer.commitSync(revokedPartitionOffsets);
        } catch (Exception e) {
            log.warn("Failed to commit offsets for revoked partitions!", e);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // A partition that was paused before the rebalance may be assigned back to
        // this consumer; resume everything so poll() can fetch from it again.
        consumer.resume(partitions);
    }
}
```
### The multi-threaded consumer
```java
package com.mirai.boot.kafka.demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author mirai
 * @version 2021/9/23
 * @since 1.8
 */
@Slf4j
public class MyMultiThreadConsumer implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private final Map<TopicPartition, MyConsumerTask<String, String>> activeTasks = new HashMap<>();
    private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private long lastCommitTime = System.currentTimeMillis();

    // CallerRunsPolicy rather than DiscardPolicy: a silently discarded task would
    // never finish, so its partition would stay paused and stuck in activeTasks forever.
    private final static ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2,
            0L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10),
            r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            },
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public MyMultiThreadConsumer(Properties properties, String topic) {
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic),
                new MultiThreadedRebalancedListener(consumer, activeTasks, offsetsToCommit));
    }

    @Override
    public void run() {
        try {
            while (!stopped.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                handleFetchedRecords(records);
                checkActiveTasks();
                commitOffsets();
            }
        } catch (WakeupException we) {
            if (!stopped.get()) {
                throw we;
            }
        } finally {
            consumer.close();
        }
    }

    private void handleFetchedRecords(ConsumerRecords<String, String> records) {
        if (records.count() > 0) {
            List<TopicPartition> partitionsToPause = new ArrayList<>();
            records.partitions().forEach(partition -> {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                MyConsumerTask<String, String> task = new MyConsumerTask<>(partitionRecords);
                partitionsToPause.add(partition);
                EXECUTOR.submit(task);
                activeTasks.put(partition, task);
            });
            // Pause the partitions being processed so the next poll() does not
            // fetch more records for them while their tasks are still running.
            consumer.pause(partitionsToPause);
        }
    }

    private void checkActiveTasks() {
        List<TopicPartition> finishedTasksPartitions = new ArrayList<>();
        activeTasks.forEach((partition, task) -> {
            if (task.isFinished()) {
                finishedTasksPartitions.add(partition);
            }
            long offset = task.getCurrentOffset();
            if (offset > 0) {
                offsetsToCommit.put(partition, new OffsetAndMetadata(offset));
            }
        });
        finishedTasksPartitions.forEach(activeTasks::remove);
        consumer.resume(finishedTasksPartitions);
    }

    private void commitOffsets() {
        try {
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - lastCommitTime > 5000) {
                if (!offsetsToCommit.isEmpty()) {
                    consumer.commitSync(offsetsToCommit);
                    offsetsToCommit.clear();
                }
                lastCommitTime = currentTimeMillis;
            }
        } catch (Exception e) {
            log.error("Failed to commit offsets!", e);
        }
    }
}
```
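To tie everything together, a hypothetical entry point that wires up the consumer; the bootstrap server, group id, and topic name are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Main {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Manual commits only: the main loop and the rebalance listener own the offsets
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        new Thread(new MyMultiThreadConsumer(props, "demo-topic")).start();  // topic is a placeholder
    }
}
```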